diff --git a/workers/site/sdk/chat-sdk.ts b/workers/site/sdk/chat-sdk.ts index 4872067..6b8ce61 100644 --- a/workers/site/sdk/chat-sdk.ts +++ b/workers/site/sdk/chat-sdk.ts @@ -1,74 +1,14 @@ -import { OpenAI } from "openai"; +import {OpenAI} from "openai"; import Message from "../models/Message"; -import { executePreprocessingWorkflow } from "../workflows"; -import { MarkdownSdk } from "./markdown-sdk"; -import { AssistantSdk } from "./assistant-sdk"; -import { IMessage } from "../../../src/stores/ClientChatStore"; -import { getModelFamily } from "../../../src/components/chat/SupportedModels"; +import {AssistantSdk} from "./assistant-sdk"; +import {IMessage} from "../../../src/stores/ClientChatStore"; +import {getModelFamily} from "../../../src/components/chat/SupportedModels"; export class ChatSdk { static async preprocess({ - tools, messages, - contextContainer, - eventHost, - streamId, - openai, - env, }) { - const { latestAiMessage, latestUserMessage } = - ChatSdk.extractMessageContext(messages); - - if (tools.includes("web-search")) { - try { - const { results } = await executePreprocessingWorkflow({ - latestUserMessage, - latestAiMessage, - eventHost, - streamId, - chat: { - messages, - openai, - }, - }); - - const { webhook } = results.get("preprocessed"); - - if (webhook) { - const objectId = env.SITE_COORDINATOR.idFromName("stream-index"); - - const durableObject = env.SITE_COORDINATOR.get(objectId); - - await durableObject.saveStreamData( - streamId, - JSON.stringify({ - webhooks: [webhook], - }), - ); - - await durableObject.saveStreamData( - webhook.id, - JSON.stringify({ - parent: streamId, - url: webhook.url, - }), - ); - } - - console.log("handleOpenAiStream::workflowResults", { - webhookUrl: webhook?.url, - }); - } catch (workflowError) { - console.error( - "handleOpenAiStream::workflowError::Failed to execute workflow", - workflowError, - ); - } - return Message.create({ - role: "assistant", - content: MarkdownSdk.formatContextContainer(contextContainer), - }); - } + // a custom implementation for preprocessing would go here return Message.create({ role: "assistant", content: "", @@ -92,20 +32,10 @@ export class ChatSdk { return new Response("No messages provided", { status: 400 }); } - const contextContainer = new Map(); - const preprocessedContext = await ChatSdk.preprocess({ - tools, messages, - eventHost: ctx.env.EVENTSOURCE_HOST, - contextContainer: contextContainer, - streamId, - openai: ctx.openai, - env: ctx.env, }); - console.log({ preprocessedContext: JSON.stringify(preprocessedContext) }); - const objectId = ctx.env.SITE_COORDINATOR.idFromName("stream-index"); const durableObject = ctx.env.SITE_COORDINATOR.get(objectId); @@ -139,20 +69,6 @@ export class ChatSdk { ); } - private static extractMessageContext(messages: any[]) { - const latestUserMessageObj = [...messages] - .reverse() - .find((msg) => msg.role === "user"); - const latestAiMessageObj = [...messages] - .reverse() - .find((msg) => msg.role === "assistant"); - - return { - latestUserMessage: latestUserMessageObj?.content || "", - latestAiMessage: latestAiMessageObj?.content || "", - }; - } - static async calculateMaxTokens( messages: any[], ctx: Record & { @@ -230,18 +146,18 @@ export class ChatSdk { return messagesToSend; } - static async handleWebhookStream( + static async handleAgentStream( eventSource: EventSource, dataCallback: any, ): Promise { - console.log("sdk::handleWebhookStream::start"); + // console.log("sdk::handleWebhookStream::start"); let done = false; return new Promise((resolve, reject) => { if (!done) { - console.log("sdk::handleWebhookStream::promise::created"); + // console.log("sdk::handleWebhookStream::promise::created"); eventSource.onopen = () => { - console.log("sdk::handleWebhookStream::eventSource::open"); - console.log("Connected to webhook"); + // console.log("sdk::handleWebhookStream::eventSource::open"); + console.log("Connected to agent"); }; const parseEvent = (data) => { diff --git a/workers/site/services/ChatService.ts b/workers/site/services/ChatService.ts index 18bcf3c..742a689 100644 --- a/workers/site/services/ChatService.ts +++ b/workers/site/services/ChatService.ts @@ -89,23 +89,23 @@ const ChatService = types }; - const handleWebhookProcessing = async ( + const handleAgentProcess = async ( {controller, encoder, webhook, dynamicContext}: StreamHandlerParams ) => { - console.log("handleWebhookProcessing::start"); + console.log("handleAgentProcess::start"); if (!webhook) return; - console.log("handleWebhookProcessing::[Loading Live Search]"); + console.log("handleAgentProcess::[Loading Live Search]"); dynamicContext.append("\n## Live Search\n~~~markdown\n"); - for await (const chunk of self.streamWebhookData({webhook})) { + for await (const chunk of self.streamAgentData({webhook})) { controller.enqueue(encoder.encode(chunk)); dynamicContext.append(chunk); } dynamicContext.append("\n~~~\n"); - console.log(`handleWebhookProcessing::[Finished loading Live Search!][length: ${dynamicContext.content.length}]`); + console.log(`handleAgentProcess::[Finished loading Live Search!][length: ${dynamicContext.content.length}]`); ChatSdk.sendDoubleNewline(controller, encoder); - console.log("handleWebhookProcessing::exit") + console.log("handleAgentProcess::exit") }; const createStreamParams = async ( @@ -189,8 +189,8 @@ const ChatService = types self.webhookStreamActive = value; }, - streamWebhookData: async function* ({webhook}) { - console.log("streamWebhookData::start"); + streamAgentData: async function* ({webhook}) { + console.log("streamAgentData::start"); if (self.webhookStreamActive) { return } @@ -206,10 +206,10 @@ const ChatService = types let currentPromise = dataPromise(); const eventSource = new EventSource(webhook.url.trim()); - console.log("streamWebhookData::setWebhookStreamActive::true"); + console.log("streamAgentData::setWebhookStreamActive::true"); self.setWebhookStreamActive(true) try { - ChatSdk.handleWebhookStream(eventSource, (data) => { + ChatSdk.handleAgentStream(eventSource, (data) => { const formattedData = `data: ${JSON.stringify(data)}\n\n`; queue.push(formattedData); if (resolveQueueItem) resolveQueueItem(); @@ -218,7 +218,7 @@ const ChatService = types finished = true; if (resolveQueueItem) resolveQueueItem(); }).catch((err) => { - console.log(`chatService::streamWebhookData::STREAM_ERROR::${err}`); + console.log(`chatService::streamAgentData::STREAM_ERROR::${err}`); errorOccurred = err; if (resolveQueueItem) resolveQueueItem(); }); @@ -234,9 +234,9 @@ const ChatService = types } self.setWebhookStreamActive(false); eventSource.close(); - console.log(`chatService::streamWebhookData::complete`); + // console.log(`chatService::streamAgentData::complete`); } catch (error) { - console.log(`chatService::streamWebhookData::error`); + console.log(`chatService::streamAgentData::error`); eventSource.close(); self.setWebhookStreamActive(false); console.error("Error while streaming webhook data:", error); @@ -290,10 +290,10 @@ const ChatService = types } }, /** - * handleWebhookIfNeeded - * Checks if a webhook exists, and if so, processes it. + * bootstrapAgents + * Checks if an agent exists, and if so, bootstraps it. */ - async handleWebhookIfNeeded(params: { + async bootstrapAgents(params: { savedStreamConfig: string; controller: ReadableStreamDefaultController; encoder: TextEncoder; @@ -306,7 +306,7 @@ const ChatService = types if (webhook) { console.log(`chatService::handleSseStream::ReadableStream::webhook:start`); - await handleWebhookProcessing({ + await handleAgentProcess({ controller, encoder, webhook, @@ -337,7 +337,7 @@ const ChatService = types // Process webhook if configured - await self.handleWebhookIfNeeded({ + await self.bootstrapAgents({ savedStreamConfig, controller, encoder,