refactor: workbench store and move logic into action runner (#4)

This commit is contained in:
Dominic Elm
2024-07-22 17:40:28 +02:00
committed by GitHub
parent cae55a7026
commit f4987a4ecd
10 changed files with 295 additions and 237 deletions

View File

@@ -24,6 +24,8 @@ export default class SwitchableStream extends TransformStream {
await this._currentReader.cancel();
}
console.log('Switching stream');
this._currentReader = newStream.getReader();
this._pumpStream();

View File

@@ -1,68 +1,187 @@
import { WebContainer } from '@webcontainer/api';
import { map, type MapStore } from 'nanostores';
import * as nodePath from 'node:path';
import type { BoltAction } from '../../types/actions';
import { createScopedLogger } from '../../utils/logger';
import { unreachable } from '../../utils/unreachable';
import type { ActionCallbackData } from './message-parser';
const logger = createScopedLogger('ActionRunner');
export type ActionStatus = 'pending' | 'running' | 'complete' | 'aborted' | 'failed';
export type BaseActionState = BoltAction & {
status: Exclude<ActionStatus, 'failed'>;
abort: () => void;
executed: boolean;
abortSignal: AbortSignal;
};
export type FailedActionState = BoltAction &
Omit<BaseActionState, 'status'> & {
status: Extract<ActionStatus, 'failed'>;
error: string;
};
export type ActionState = BaseActionState | FailedActionState;
type BaseActionUpdate = Partial<Pick<BaseActionState, 'status' | 'abort' | 'executed'>>;
export type ActionStateUpdate =
| BaseActionUpdate
| (Omit<BaseActionUpdate, 'status'> & { status: 'failed'; error: string });
type ActionsMap = MapStore<Record<string, ActionState>>;
export class ActionRunner {
#webcontainer: Promise<WebContainer>;
#currentExecutionPromise: Promise<void> = Promise.resolve();
actions: ActionsMap = import.meta.hot?.data.actions ?? map({});
constructor(webcontainerPromise: Promise<WebContainer>) {
this.#webcontainer = webcontainerPromise;
if (import.meta.hot) {
import.meta.hot.data.actions = this.actions;
}
}
async runAction({ action }: ActionCallbackData, abortSignal?: AbortSignal) {
logger.trace('Running action', action);
addAction(data: ActionCallbackData) {
const { actionId } = data;
const { content } = action;
const action = this.actions.get()[actionId];
if (action) {
// action already added
return;
}
const abortController = new AbortController();
this.actions.setKey(actionId, {
...data.action,
status: 'pending',
executed: false,
abort: () => {
abortController.abort();
this.#updateAction(actionId, { status: 'aborted' });
},
abortSignal: abortController.signal,
});
this.#currentExecutionPromise.then(() => {
this.#updateAction(actionId, { status: 'running' });
});
}
async runAction(data: ActionCallbackData) {
const { actionId } = data;
const action = this.actions.get()[actionId];
if (!action) {
unreachable(`Action ${actionId} not found`);
}
if (action.executed) {
return;
}
this.#updateAction(actionId, { ...action, ...data.action, executed: true });
this.#currentExecutionPromise = this.#currentExecutionPromise
.then(() => {
return this.#executeAction(actionId);
})
.catch((error) => {
console.error('Action execution failed:', error);
});
}
async #executeAction(actionId: string) {
const action = this.actions.get()[actionId];
this.#updateAction(actionId, { status: 'running' });
try {
switch (action.type) {
case 'shell': {
await this.#runShellAction(action);
break;
}
case 'file': {
await this.#runFileAction(action);
break;
}
}
this.#updateAction(actionId, { status: action.abortSignal.aborted ? 'aborted' : 'complete' });
} catch (error) {
this.#updateAction(actionId, { status: 'failed', error: 'Action failed' });
// re-throw the error to be caught in the promise chain
throw error;
}
}
async #runShellAction(action: ActionState) {
if (action.type !== 'shell') {
unreachable('Expected shell action');
}
const webcontainer = await this.#webcontainer;
switch (action.type) {
case 'file': {
let folder = nodePath.dirname(action.filePath);
const process = await webcontainer.spawn('jsh', ['-c', action.content]);
// remove trailing slashes
folder = folder.replace(/\/$/g, '');
action.abortSignal.addEventListener('abort', () => {
process.kill();
});
if (folder !== '.') {
try {
await webcontainer.fs.mkdir(folder, { recursive: true });
logger.debug('Created folder', folder);
} catch (error) {
logger.error('Failed to create folder\n', error);
}
}
process.output.pipeTo(
new WritableStream({
write(data) {
console.log(data);
},
}),
);
try {
await webcontainer.fs.writeFile(action.filePath, content);
logger.debug(`File written ${action.filePath}`);
} catch (error) {
logger.error('Failed to write file\n', error);
}
const exitCode = await process.exit;
break;
}
case 'shell': {
const process = await webcontainer.spawn('jsh', ['-c', content]);
logger.debug(`Process terminated with code ${exitCode}`);
}
abortSignal?.addEventListener('abort', () => {
process.kill();
});
async #runFileAction(action: ActionState) {
if (action.type !== 'file') {
unreachable('Expected file action');
}
process.output.pipeTo(
new WritableStream({
write(data) {
console.log(data);
},
}),
);
const webcontainer = await this.#webcontainer;
const exitCode = await process.exit;
let folder = nodePath.dirname(action.filePath);
logger.debug(`Process terminated with code ${exitCode}`);
// remove trailing slashes
folder = folder.replace(/\/+$/g, '');
if (folder !== '.') {
try {
await webcontainer.fs.mkdir(folder, { recursive: true });
logger.debug('Created folder', folder);
} catch (error) {
logger.error('Failed to create folder\n', error);
}
}
try {
await webcontainer.fs.writeFile(action.filePath, action.content);
logger.debug(`File written ${action.filePath}`);
} catch (error) {
logger.error('Failed to write file\n', error);
}
}
#updateAction(id: string, newState: ActionStateUpdate) {
const actions = this.actions.get();
this.actions.setKey(id, { ...actions[id], ...newState });
}
}

View File

@@ -179,7 +179,7 @@ function runTest(input: string | string[], outputOrExpectedResult: string | Expe
};
const parser = new StreamingMessageParser({
artifactElement: '',
artifactElement: () => '',
callbacks,
});

View File

@@ -31,11 +31,15 @@ export interface ParserCallbacks {
onActionClose?: ActionCallback;
}
type ElementFactory = () => string;
interface ElementFactoryProps {
messageId: string;
}
type ElementFactory = (props: ElementFactoryProps) => string;
export interface StreamingMessageParserOptions {
callbacks?: ParserCallbacks;
artifactElement?: string | ElementFactory;
artifactElement?: ElementFactory;
}
interface MessageState {
@@ -193,9 +197,9 @@ export class StreamingMessageParser {
this._options.callbacks?.onArtifactOpen?.({ messageId, ...currentArtifact });
output +=
this._options.artifactElement ??
`<div class="__boltArtifact__" data-artifact-id="${artifactId}" data-message-id="${messageId}"></div>`;
const artifactFactory = this._options.artifactElement ?? createArtifactElement;
output += artifactFactory({ messageId });
i = openTagEnd + 1;
} else {
@@ -264,3 +268,18 @@ export class StreamingMessageParser {
return match ? match[1] : undefined;
}
}
const createArtifactElement: ElementFactory = (props) => {
const elementProps = [
'class="__boltArtifact__"',
Object.entries(props).map(([key, value]) => {
return `data-${camelToDashCase(key)}=${JSON.stringify(value)}`;
}),
];
return `<div ${elementProps.join(' ')}></div>`;
};
function camelToDashCase(input: string) {
return input.replace(/([a-z])([A-Z])/g, '$1-$2').toLowerCase();
}

View File

@@ -25,6 +25,13 @@ export class PreviewsStore {
webcontainer.on('port', (port, type, url) => {
let previewInfo = this.#availablePreviews.get(port);
if (type === 'close' && previewInfo) {
this.#availablePreviews.delete(port);
this.previews.set(this.previews.get().filter((preview) => preview.port !== port));
return;
}
const previews = this.previews.get();
if (!previewInfo) {

View File

@@ -1,48 +1,24 @@
import { atom, map, type MapStore, type ReadableAtom, type WritableAtom } from 'nanostores';
import type { EditorDocument, ScrollPosition } from '../../components/editor/codemirror/CodeMirrorEditor';
import type { BoltAction } from '../../types/actions';
import { unreachable } from '../../utils/unreachable';
import { ActionRunner } from '../runtime/action-runner';
import type { ActionCallbackData, ArtifactCallbackData } from '../runtime/message-parser';
import { webcontainer } from '../webcontainer';
import { chatStore } from './chat';
import { EditorStore } from './editor';
import { FilesStore, type FileMap } from './files';
import { PreviewsStore } from './previews';
const MIN_SPINNER_TIME = 200;
export type BaseActionState = BoltAction & {
status: 'running' | 'complete' | 'pending' | 'aborted';
executing: boolean;
abort?: () => void;
};
export type FailedActionState = BoltAction &
Omit<BaseActionState, 'status'> & {
status: 'failed';
error: string;
};
export type ActionState = BaseActionState | FailedActionState;
type BaseActionUpdate = Partial<Pick<BaseActionState, 'status' | 'executing' | 'abort'>>;
export type ActionStateUpdate =
| BaseActionUpdate
| (Omit<BaseActionUpdate, 'status'> & { status: 'failed'; error: string });
export interface ArtifactState {
title: string;
closed: boolean;
currentActionPromise: Promise<void>;
actions: MapStore<Record<string, ActionState>>;
runner: ActionRunner;
}
export type ArtifactUpdateState = Pick<ArtifactState, 'title' | 'closed'>;
type Artifacts = MapStore<Record<string, ArtifactState>>;
export class WorkbenchStore {
#actionRunner = new ActionRunner(webcontainer);
#previewsStore = new PreviewsStore(webcontainer);
#filesStore = new FilesStore(webcontainer);
#editorStore = new EditorStore(webcontainer);
@@ -102,148 +78,63 @@ export class WorkbenchStore {
}
abortAllActions() {
for (const [, artifact] of Object.entries(this.artifacts.get())) {
for (const [, action] of Object.entries(artifact.actions.get())) {
if (action.status === 'running') {
action.abort?.();
}
}
}
// TODO: what do we wanna do and how do we wanna recover from this?
}
addArtifact({ id, messageId, title }: ArtifactCallbackData) {
const artifacts = this.artifacts.get();
const artifactKey = getArtifactKey(id, messageId);
const artifact = artifacts[artifactKey];
addArtifact({ messageId, title }: ArtifactCallbackData) {
const artifact = this.#getArtifact(messageId);
if (artifact) {
return;
}
this.artifacts.setKey(artifactKey, {
this.artifacts.setKey(messageId, {
title,
closed: false,
actions: map({}),
currentActionPromise: Promise.resolve(),
runner: new ActionRunner(webcontainer),
});
}
updateArtifact({ id, messageId }: ArtifactCallbackData, state: Partial<ArtifactState>) {
const artifacts = this.artifacts.get();
const key = getArtifactKey(id, messageId);
const artifact = artifacts[key];
updateArtifact({ messageId }: ArtifactCallbackData, state: Partial<ArtifactUpdateState>) {
const artifact = this.#getArtifact(messageId);
if (!artifact) {
return;
}
this.artifacts.setKey(key, { ...artifact, ...state });
this.artifacts.setKey(messageId, { ...artifact, ...state });
}
async addAction(data: ActionCallbackData) {
const { artifactId, messageId, actionId } = data;
const { messageId } = data;
const artifacts = this.artifacts.get();
const key = getArtifactKey(artifactId, messageId);
const artifact = artifacts[key];
const artifact = this.#getArtifact(messageId);
if (!artifact) {
unreachable('Artifact not found');
}
const actions = artifact.actions.get();
const action = actions[actionId];
if (action) {
return;
}
artifact.actions.setKey(actionId, { ...data.action, status: 'pending', executing: false });
artifact.currentActionPromise.then(() => {
if (chatStore.get().aborted) {
return;
}
this.#updateAction(key, actionId, { status: 'running' });
});
artifact.runner.addAction(data);
}
async runAction(data: ActionCallbackData) {
const { artifactId, messageId, actionId } = data;
const { messageId } = data;
const artifacts = this.artifacts.get();
const key = getArtifactKey(artifactId, messageId);
const artifact = artifacts[key];
const artifact = this.#getArtifact(messageId);
if (!artifact) {
unreachable('Artifact not found');
}
const actions = artifact.actions.get();
const action = actions[actionId];
if (!action) {
unreachable('Expected action to exist');
}
if (action.executing || action.status === 'complete' || action.status === 'failed' || action.status === 'aborted') {
return;
}
artifact.currentActionPromise = artifact.currentActionPromise.then(async () => {
if (chatStore.get().aborted) {
return;
}
const abortController = new AbortController();
this.#updateAction(key, actionId, {
status: 'running',
executing: true,
abort: () => {
abortController.abort();
this.#updateAction(key, actionId, { status: 'aborted' });
},
});
try {
await Promise.all([
this.#actionRunner.runAction(data, abortController.signal),
new Promise((resolve) => setTimeout(resolve, MIN_SPINNER_TIME)),
]);
if (!abortController.signal.aborted) {
this.#updateAction(key, actionId, { status: 'complete' });
}
} catch (error) {
this.#updateAction(key, actionId, { status: 'failed', error: 'Action failed' });
throw error;
} finally {
this.#updateAction(key, actionId, { executing: false });
}
});
artifact.runner.runAction(data);
}
#updateAction(artifactId: string, actionId: string, newState: ActionStateUpdate) {
#getArtifact(id: string) {
const artifacts = this.artifacts.get();
const artifact = artifacts[artifactId];
if (!artifact) {
return;
}
const actions = artifact.actions.get();
artifact.actions.setKey(actionId, { ...actions[actionId], ...newState });
return artifacts[id];
}
}
export function getArtifactKey(artifactId: string, messageId: string) {
return `${artifactId}_${messageId}`;
}
export const workbenchStore = new WorkbenchStore();
if (import.meta.hot) {