/**
 * Options accepted by {@link StreamRecoveryManager}.
 */
export interface StreamRecoveryOptions {
  /**
   * Number of timeouts that may trigger a recovery attempt before monitoring
   * is stopped. Defaults to 3. An explicit `0` means "never retry".
   */
  maxRetries?: number;

  /** Inactivity window in milliseconds before a timeout is reported. Defaults to 30000. */
  timeout?: number;

  /** Called on every timeout that still has retry budget, before the timer is re-armed. */
  onTimeout?: () => void;

  /** Called after the timer has been re-armed following a timeout. */
  onRecovery?: () => void;
}

/**
 * Inactivity watchdog for a streaming response.
 *
 * The manager arms a timer when monitoring starts and re-arms it on every
 * `updateActivity()` call. If the timer fires, the `onTimeout` / `onRecovery`
 * callbacks are invoked, at most `maxRetries` times; the next timeout after
 * that stops monitoring. The class does not restart a stream itself; any
 * actual recovery has to happen inside the callbacks.
 *
 * Logging goes through the module-level `logger`.
 *
 * Once `stop()` has been called the instance stays inactive:
 * `startMonitoring()` and `updateActivity()` will not arm a new timer.
 */
export class StreamRecoveryManager {
  private static readonly _DEFAULT_MAX_RETRIES = 3;
  private static readonly _DEFAULT_TIMEOUT_MS = 30000; // 30 seconds

  private _retryCount = 0;
  private _timeoutHandle: ReturnType<typeof setTimeout> | null = null;
  private _lastActivity = Date.now();
  private _isActive = true;

  /** Caller options with `maxRetries` and `timeout` always resolved to numbers. */
  private readonly _options: StreamRecoveryOptions & { maxRetries: number; timeout: number };

  /**
   * @param options Optional overrides. Properties that are missing or
   *   explicitly `undefined` fall back to the defaults.
   */
  constructor(options: StreamRecoveryOptions = {}) {
    /*
     * Resolve defaults with `??` instead of relying on spread order or `||`:
     * a spread lets an explicit `undefined` overwrite the default, and `||`
     * would turn a deliberate `maxRetries: 0` into 3.
     */
    this._options = {
      ...options,
      maxRetries: options.maxRetries ?? StreamRecoveryManager._DEFAULT_MAX_RETRIES,
      timeout: options.timeout ?? StreamRecoveryManager._DEFAULT_TIMEOUT_MS,
    };
  }

  /** Arms the inactivity timer. Has no effect after `stop()`. */
  startMonitoring() {
    this._resetTimeout();
  }

  /** Records activity "now" and restarts the inactivity window. */
  updateActivity() {
    this._lastActivity = Date.now();
    this._resetTimeout();
  }

  /** Cancels any pending timer and, while still active, schedules a fresh one. */
  private _resetTimeout() {
    if (this._timeoutHandle) {
      clearTimeout(this._timeoutHandle);
      this._timeoutHandle = null;
    }

    if (!this._isActive) {
      return;
    }

    this._timeoutHandle = setTimeout(() => {
      // stop() may have been called between scheduling and firing
      if (this._isActive) {
        logger.warn('Stream timeout detected');
        this._handleTimeout();
      }
    }, this._options.timeout);
  }

  /**
   * Handles one expired inactivity window: either gives up (retry budget
   * exhausted) or counts a retry, notifies the callbacks and re-arms the timer.
   */
  private _handleTimeout() {
    if (this._retryCount >= this._options.maxRetries) {
      logger.error('Max retries reached for stream recovery');
      this.stop();

      return;
    }

    this._retryCount++;
    logger.info(`Attempting stream recovery (attempt ${this._retryCount})`);

    this._invokeCallback(this._options.onTimeout, 'onTimeout');

    // Reset monitoring after recovery attempt (no-op if a callback called stop())
    this._resetTimeout();

    this._invokeCallback(this._options.onRecovery, 'onRecovery');
  }

  /**
   * Runs a user callback without letting it throw. This code executes inside a
   * timer callback, so an escaping exception would be uncaught and would also
   * skip re-arming the timer.
   */
  private _invokeCallback(callback: (() => void) | undefined, name: string) {
    if (!callback) {
      return;
    }

    try {
      callback();
    } catch (error: unknown) {
      logger.error(`Stream recovery ${name} callback failed:`, error);
    }
  }

  /** Permanently deactivates the manager and clears any pending timer. */
  stop() {
    this._isActive = false;

    if (this._timeoutHandle) {
      clearTimeout(this._timeoutHandle);
      this._timeoutHandle = null;
    }
  }

  /**
   * @returns A snapshot of the manager state; `timeSinceLastActivity` is in
   *   milliseconds, measured at call time.
   */
  getStatus() {
    return {
      isActive: this._isActive,
      retryCount: this._retryCount,
      lastActivity: this._lastActivity,
      timeSinceLastActivity: Date.now() - this._lastActivity,
    };
  }
}
streamRecovery.startMonitoring(); + const filePaths = getFilePaths(files || {}); let filteredFiles: FileMap | undefined = undefined; let summary: string | undefined = undefined; @@ -314,9 +325,12 @@ async function chatAction({ context, request }: ActionFunctionArgs) { (async () => { for await (const part of result.fullStream) { + streamRecovery.updateActivity(); + if (part.type === 'error') { const error: any = part.error; logger.error('Streaming error:', error); + streamRecovery.stop(); // Enhanced error handling for common streaming issues if (error.message?.includes('Invalid JSON response')) { @@ -328,6 +342,7 @@ async function chatAction({ context, request }: ActionFunctionArgs) { return; } } + streamRecovery.stop(); })(); result.mergeIntoDataStream(dataStream); },