streaming implementaion re-added to UI

This commit is contained in:
geoffsee
2025-09-02 14:45:16 -04:00
parent bcbc6c4693
commit 400c70f17d
6 changed files with 295 additions and 42 deletions

View File

@@ -49,7 +49,6 @@ use leptos_router::{
};
use serde::{Deserialize, Serialize};
use web_sys::console;
// Remove spawn_local import as we'll use different approach
// Data structures for OpenAI-compatible API
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -73,6 +72,29 @@ pub struct ChatChoice {
pub finish_reason: Option<String>,
}
// Streaming response structures
#[derive(Debug, Deserialize)]
pub struct StreamDelta {
pub role: Option<String>,
pub content: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct StreamChoice {
pub index: u32,
pub delta: StreamDelta,
pub finish_reason: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct StreamChatResponse {
pub id: String,
pub object: String,
pub created: u64,
pub model: String,
pub choices: Vec<StreamChoice>,
}
#[derive(Debug, Deserialize)]
pub struct ChatResponse {
pub id: String,
@@ -161,6 +183,129 @@ pub async fn send_chat_completion(
}
}
// Streaming chat completion using EventSource
#[cfg(target_arch = "wasm32")]
pub fn send_chat_completion_stream(
messages: Vec<ChatMessage>,
model: String,
on_chunk: impl Fn(String) + 'static,
on_complete: impl Fn() + 'static,
on_error: impl Fn(String) + 'static,
) {
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
let request = ChatRequest {
model,
messages,
max_tokens: Some(1024),
stream: Some(true),
};
// We need to send a POST request but EventSource only supports GET
// So we'll use fetch with a readable stream instead
let window = web_sys::window().unwrap();
let request_json = serde_json::to_string(&request).unwrap();
let opts = web_sys::RequestInit::new();
opts.set_method("POST");
opts.set_body(&JsValue::from_str(&request_json));
let headers = web_sys::Headers::new().unwrap();
headers.set("Content-Type", "application/json").unwrap();
headers.set("Accept", "text/event-stream").unwrap();
opts.set_headers(&headers);
let request = web_sys::Request::new_with_str_and_init("/v1/chat/completions", &opts).unwrap();
let promise = window.fetch_with_request(&request);
wasm_bindgen_futures::spawn_local(async move {
match wasm_bindgen_futures::JsFuture::from(promise).await {
Ok(resp_value) => {
let resp: web_sys::Response = resp_value.dyn_into().unwrap();
if !resp.ok() {
on_error(format!("Server error: {}", resp.status()));
return;
}
let body = resp.body();
if body.is_none() {
on_error("No response body".to_string());
return;
}
let reader = body
.unwrap()
.get_reader()
.dyn_into::<web_sys::ReadableStreamDefaultReader>()
.unwrap();
let decoder = web_sys::TextDecoder::new().unwrap();
let mut buffer = String::new();
loop {
match wasm_bindgen_futures::JsFuture::from(reader.read()).await {
Ok(result) => {
let done = js_sys::Reflect::get(&result, &JsValue::from_str("done"))
.unwrap()
.as_bool()
.unwrap_or(false);
if done {
break;
}
let value = js_sys::Reflect::get(&result, &JsValue::from_str("value")).unwrap();
let array = js_sys::Uint8Array::new(&value);
let mut bytes = vec![0; array.length() as usize];
array.copy_to(&mut bytes);
let text = decoder.decode_with_u8_array(&bytes).unwrap();
buffer.push_str(&text);
// Process complete SSE events from buffer
while let Some(event_end) = buffer.find("\n\n") {
let event = buffer[..event_end].to_string();
buffer = buffer[event_end + 2..].to_string();
// Parse SSE event
for line in event.lines() {
if let Some(data) = line.strip_prefix("data: ") {
if data == "[DONE]" {
on_complete();
return;
}
// Parse JSON chunk
if let Ok(chunk) = serde_json::from_str::<StreamChatResponse>(data) {
if let Some(choice) = chunk.choices.first() {
if let Some(content) = &choice.delta.content {
on_chunk(content.clone());
}
}
}
}
}
}
}
Err(e) => {
on_error(format!("Read error: {:?}", e));
break;
}
}
}
on_complete();
}
Err(e) => {
on_error(format!("Fetch error: {:?}", e));
}
}
});
}
pub fn shell(options: LeptosOptions) -> impl IntoView {
view! {
<!DOCTYPE html>
@@ -221,6 +366,13 @@ fn ChatPage() -> impl IntoView {
// State for available models and selected model
let available_models = RwSignal::new(Vec::<ModelInfo>::new());
let selected_model = RwSignal::new(String::from("gemma-3-1b-it")); // Default model
// State for streaming response
let streaming_content = RwSignal::new(String::new());
let is_streaming = RwSignal::new(false);
// State for streaming mode toggle
let use_streaming = RwSignal::new(true); // Default to streaming
// Client-side only: Fetch models on component mount
#[cfg(target_arch = "wasm32")]
@@ -265,25 +417,63 @@ fn ChatPage() -> impl IntoView {
// Prepare messages for API call
let current_messages = messages.get();
let current_model = selected_model.get();
let should_stream = use_streaming.get();
// Spawn async task to call API
spawn_local(async move {
match send_chat_completion(current_messages, current_model).await {
Ok(response_content) => {
let assistant_message = ChatMessage {
role: "assistant".to_string(),
content: response_content,
};
messages.update(|msgs| msgs.push(assistant_message));
if should_stream {
// Clear streaming content and set streaming flag
streaming_content.set(String::new());
is_streaming.set(true);
// Use streaming API
send_chat_completion_stream(
current_messages,
current_model,
move |chunk| {
// Append chunk to streaming content
streaming_content.update(|content| content.push_str(&chunk));
},
move || {
// On complete, move streaming content to messages
let final_content = streaming_content.get();
if !final_content.is_empty() {
let assistant_message = ChatMessage {
role: "assistant".to_string(),
content: final_content,
};
messages.update(|msgs| msgs.push(assistant_message));
}
streaming_content.set(String::new());
is_streaming.set(false);
is_loading.set(false);
}
Err(error) => {
console::log_1(&format!("API Error: {}", error).into());
},
move |error| {
console::log_1(&format!("Streaming Error: {}", error).into());
error_message.set(Some(error));
is_streaming.set(false);
is_loading.set(false);
streaming_content.set(String::new());
},
);
} else {
// Use non-streaming API
spawn_local(async move {
match send_chat_completion(current_messages, current_model).await {
Ok(response_content) => {
let assistant_message = ChatMessage {
role: "assistant".to_string(),
content: response_content,
};
messages.update(|msgs| msgs.push(assistant_message));
is_loading.set(false);
}
Err(error) => {
console::log_1(&format!("API Error: {}", error).into());
error_message.set(Some(error));
is_loading.set(false);
}
}
}
});
});
}
}
};
@@ -329,6 +519,19 @@ fn ChatPage() -> impl IntoView {
}
/>
</select>
<div class="streaming-toggle">
<label>
<input
type="checkbox"
prop:checked=move || use_streaming.get()
on:change=move |ev| {
let target = event_target::<web_sys::HtmlInputElement>(&ev);
use_streaming.set(target.checked());
}
/>
" Use streaming"
</label>
</div>
</div>
</div>
@@ -348,7 +551,24 @@ fn ChatPage() -> impl IntoView {
/>
{move || {
if is_loading.get() {
if is_streaming.get() {
let content = streaming_content.get();
if !content.is_empty() {
view! {
<div class="message assistant-message streaming">
<div class="message-role">"assistant"</div>
<div class="message-content">{content}<span class="cursor">""</span></div>
</div>
}.into_any()
} else {
view! {
<div class="message assistant-message loading">
<div class="message-role">"assistant"</div>
<div class="message-content">"Thinking..."</div>
</div>
}.into_any()
}
} else if is_loading.get() && !use_streaming.get() {
view! {
<div class="message assistant-message loading">
<div class="message-role">"assistant"</div>