From 400c70f17df242228ca4e4dcac595cb7b509f833 Mon Sep 17 00:00:00 2001 From: geoffsee <> Date: Tue, 2 Sep 2025 14:45:16 -0400 Subject: [PATCH] streaming implementation re-added to UI --- Cargo.lock | 2 + crates/chat-ui/Cargo.toml | 18 ++- crates/chat-ui/LICENSE | 24 ---- crates/chat-ui/src/app.rs | 252 ++++++++++++++++++++++++++++++--- crates/chat-ui/style/main.scss | 39 +++++ scripts/curl_chat_stream.sh | 2 +- 6 files changed, 295 insertions(+), 42 deletions(-) delete mode 100644 crates/chat-ui/LICENSE diff --git a/Cargo.lock b/Cargo.lock index 0d806ae..8fa9f2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -893,6 +893,7 @@ dependencies = [ "axum", "console_error_panic_hook", "gloo-net", + "js-sys", "leptos", "leptos_axum", "leptos_meta", @@ -902,6 +903,7 @@ dependencies = [ "serde_json", "tokio", "wasm-bindgen", + "wasm-bindgen-futures", "web-sys", ] diff --git a/crates/chat-ui/Cargo.toml b/crates/chat-ui/Cargo.toml index 16ade35..e66dc32 100644 --- a/crates/chat-ui/Cargo.toml +++ b/crates/chat-ui/Cargo.toml @@ -15,10 +15,26 @@ leptos_axum = { version = "0.8.0", optional = true } leptos_meta = { version = "0.8.0" } tokio = { version = "1", features = ["rt-multi-thread"], optional = true } wasm-bindgen = { version = "=0.2.100", optional = true } +wasm-bindgen-futures = "0.4" +js-sys = "0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" reqwest = { version = "0.12", features = ["json"] } -web-sys = { version = "0.3", features = ["console"] } +web-sys = { version = "0.3", features = [ + "console", + "EventSource", + "MessageEvent", + "Window", + "Request", + "RequestInit", + "Response", + "Headers", + "ReadableStream", + "ReadableStreamDefaultReader", + "TextDecoder", + "TextDecoderOptions", + "HtmlInputElement" +] } gloo-net = { version = "0.6", features = ["http"] } [features] diff --git a/crates/chat-ui/LICENSE b/crates/chat-ui/LICENSE deleted file mode 100644 index fdddb29..0000000 --- a/crates/chat-ui/LICENSE +++ /dev/null @@ -1,24 +0,0 @@ 
-This is free and unencumbered software released into the public domain. - -Anyone is free to copy, modify, publish, use, compile, sell, or -distribute this software, either in source code form or as a compiled -binary, for any purpose, commercial or non-commercial, and by any -means. - -In jurisdictions that recognize copyright laws, the author or authors -of this software dedicate any and all copyright interest in the -software to the public domain. We make this dedication for the benefit -of the public at large and to the detriment of our heirs and -successors. We intend this dedication to be an overt act of -relinquishment in perpetuity of all present and future rights to this -software under copyright law. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. -IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR -OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, -ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -OTHER DEALINGS IN THE SOFTWARE. 
- -For more information, please refer to diff --git a/crates/chat-ui/src/app.rs b/crates/chat-ui/src/app.rs index 6a540a6..3d45dea 100644 --- a/crates/chat-ui/src/app.rs +++ b/crates/chat-ui/src/app.rs @@ -49,7 +49,6 @@ use leptos_router::{ }; use serde::{Deserialize, Serialize}; use web_sys::console; -// Remove spawn_local import as we'll use different approach // Data structures for OpenAI-compatible API #[derive(Debug, Clone, Serialize, Deserialize)] @@ -73,6 +72,29 @@ pub struct ChatChoice { pub finish_reason: Option, } +// Streaming response structures +#[derive(Debug, Deserialize)] +pub struct StreamDelta { + pub role: Option, + pub content: Option, +} + +#[derive(Debug, Deserialize)] +pub struct StreamChoice { + pub index: u32, + pub delta: StreamDelta, + pub finish_reason: Option, +} + +#[derive(Debug, Deserialize)] +pub struct StreamChatResponse { + pub id: String, + pub object: String, + pub created: u64, + pub model: String, + pub choices: Vec, +} + #[derive(Debug, Deserialize)] pub struct ChatResponse { pub id: String, @@ -161,6 +183,129 @@ pub async fn send_chat_completion( } } +// Streaming chat completion using EventSource +#[cfg(target_arch = "wasm32")] +pub fn send_chat_completion_stream( + messages: Vec, + model: String, + on_chunk: impl Fn(String) + 'static, + on_complete: impl Fn() + 'static, + on_error: impl Fn(String) + 'static, +) { + use wasm_bindgen::prelude::*; + use wasm_bindgen::JsCast; + + let request = ChatRequest { + model, + messages, + max_tokens: Some(1024), + stream: Some(true), + }; + + // We need to send a POST request but EventSource only supports GET + // So we'll use fetch with a readable stream instead + let window = web_sys::window().unwrap(); + let request_json = serde_json::to_string(&request).unwrap(); + + let opts = web_sys::RequestInit::new(); + opts.set_method("POST"); + opts.set_body(&JsValue::from_str(&request_json)); + + let headers = web_sys::Headers::new().unwrap(); + headers.set("Content-Type", 
"application/json").unwrap(); + headers.set("Accept", "text/event-stream").unwrap(); + opts.set_headers(&headers); + + let request = web_sys::Request::new_with_str_and_init("/v1/chat/completions", &opts).unwrap(); + + let promise = window.fetch_with_request(&request); + + wasm_bindgen_futures::spawn_local(async move { + match wasm_bindgen_futures::JsFuture::from(promise).await { + Ok(resp_value) => { + let resp: web_sys::Response = resp_value.dyn_into().unwrap(); + + if !resp.ok() { + on_error(format!("Server error: {}", resp.status())); + return; + } + + let body = resp.body(); + if body.is_none() { + on_error("No response body".to_string()); + return; + } + + let reader = body + .unwrap() + .get_reader() + .dyn_into::() + .unwrap(); + + let decoder = web_sys::TextDecoder::new().unwrap(); + let mut buffer = String::new(); + + loop { + match wasm_bindgen_futures::JsFuture::from(reader.read()).await { + Ok(result) => { + let done = js_sys::Reflect::get(&result, &JsValue::from_str("done")) + .unwrap() + .as_bool() + .unwrap_or(false); + + if done { + break; + } + + let value = js_sys::Reflect::get(&result, &JsValue::from_str("value")).unwrap(); + let array = js_sys::Uint8Array::new(&value); + let mut bytes = vec![0; array.length() as usize]; + array.copy_to(&mut bytes); + let text = decoder.decode_with_u8_array(&bytes).unwrap(); + + buffer.push_str(&text); + + // Process complete SSE events from buffer + while let Some(event_end) = buffer.find("\n\n") { + let event = buffer[..event_end].to_string(); + buffer = buffer[event_end + 2..].to_string(); + + // Parse SSE event + for line in event.lines() { + if let Some(data) = line.strip_prefix("data: ") { + if data == "[DONE]" { + on_complete(); + return; + } + + // Parse JSON chunk + if let Ok(chunk) = serde_json::from_str::(data) { + if let Some(choice) = chunk.choices.first() { + if let Some(content) = &choice.delta.content { + on_chunk(content.clone()); + } + } + } + } + } + } + } + Err(e) => { + on_error(format!("Read 
error: {:?}", e)); + break; + } + } + } + + on_complete(); + } + Err(e) => { + on_error(format!("Fetch error: {:?}", e)); + } + } + }); +} + pub fn shell(options: LeptosOptions) -> impl IntoView { view! { @@ -221,6 +366,13 @@ fn ChatPage() -> impl IntoView { // State for available models and selected model let available_models = RwSignal::new(Vec::::new()); let selected_model = RwSignal::new(String::from("gemma-3-1b-it")); // Default model + + // State for streaming response + let streaming_content = RwSignal::new(String::new()); + let is_streaming = RwSignal::new(false); + + // State for streaming mode toggle + let use_streaming = RwSignal::new(true); // Default to streaming // Client-side only: Fetch models on component mount #[cfg(target_arch = "wasm32")] @@ -265,25 +417,63 @@ fn ChatPage() -> impl IntoView { // Prepare messages for API call let current_messages = messages.get(); let current_model = selected_model.get(); + let should_stream = use_streaming.get(); - // Spawn async task to call API - spawn_local(async move { - match send_chat_completion(current_messages, current_model).await { - Ok(response_content) => { - let assistant_message = ChatMessage { - role: "assistant".to_string(), - content: response_content, - }; - messages.update(|msgs| msgs.push(assistant_message)); + if should_stream { + // Clear streaming content and set streaming flag + streaming_content.set(String::new()); + is_streaming.set(true); + + // Use streaming API + send_chat_completion_stream( + current_messages, + current_model, + move |chunk| { + // Append chunk to streaming content + streaming_content.update(|content| content.push_str(&chunk)); + }, + move || { + // On complete, move streaming content to messages + let final_content = streaming_content.get(); + if !final_content.is_empty() { + let assistant_message = ChatMessage { + role: "assistant".to_string(), + content: final_content, + }; + messages.update(|msgs| msgs.push(assistant_message)); + } + 
streaming_content.set(String::new()); + is_streaming.set(false); is_loading.set(false); - } - Err(error) => { - console::log_1(&format!("API Error: {}", error).into()); + }, + move |error| { + console::log_1(&format!("Streaming Error: {}", error).into()); error_message.set(Some(error)); + is_streaming.set(false); is_loading.set(false); + streaming_content.set(String::new()); + }, + ); + } else { + // Use non-streaming API + spawn_local(async move { + match send_chat_completion(current_messages, current_model).await { + Ok(response_content) => { + let assistant_message = ChatMessage { + role: "assistant".to_string(), + content: response_content, + }; + messages.update(|msgs| msgs.push(assistant_message)); + is_loading.set(false); + } + Err(error) => { + console::log_1(&format!("API Error: {}", error).into()); + error_message.set(Some(error)); + is_loading.set(false); + } } - } - }); + }); + } } }; @@ -329,6 +519,19 @@ fn ChatPage() -> impl IntoView { } /> +
+ +
@@ -348,7 +551,24 @@ fn ChatPage() -> impl IntoView { /> {move || { - if is_loading.get() { + if is_streaming.get() { + let content = streaming_content.get(); + if !content.is_empty() { + view! { +
+
"assistant"
+
{content}"▊"
+
+ }.into_any() + } else { + view! { +
+
"assistant"
+
"Thinking..."
+
+ }.into_any() + } + } else if is_loading.get() && !use_streaming.get() { view! {
"assistant"
diff --git a/crates/chat-ui/style/main.scss b/crates/chat-ui/style/main.scss index 297e93f..d61dd11 100644 --- a/crates/chat-ui/style/main.scss +++ b/crates/chat-ui/style/main.scss @@ -42,6 +42,7 @@ body { align-items: center; justify-content: center; gap: 0.5rem; + flex-wrap: wrap; label { font-weight: 500; @@ -69,6 +70,24 @@ body { padding: 0.5rem; } } + + .streaming-toggle { + display: flex; + align-items: center; + margin-left: 1rem; + + label { + display: flex; + align-items: center; + gap: 0.5rem; + cursor: pointer; + font-size: 0.9rem; + + input[type="checkbox"] { + cursor: pointer; + } + } + } } } @@ -134,6 +153,16 @@ body { color: #6b7280; } } + + &.streaming { + .message-content { + .cursor { + display: inline-block; + animation: blink 1s infinite; + color: #9ca3af; + } + } + } } } @@ -223,4 +252,14 @@ body { .chat-messages::-webkit-scrollbar-thumb:hover { background: #a8a8a8; +} + +/* Cursor blink animation */ +@keyframes blink { + 0%, 50% { + opacity: 1; + } + 51%, 100% { + opacity: 0; + } } \ No newline at end of file diff --git a/scripts/curl_chat_stream.sh b/scripts/curl_chat_stream.sh index 3567f23..ac379c2 100755 --- a/scripts/curl_chat_stream.sh +++ b/scripts/curl_chat_stream.sh @@ -15,7 +15,7 @@ CONNECT_TIMEOUT=${CONNECT_TIMEOUT:-10} MAX_TIME=${MAX_TIME:-30} cat <