diff --git a/crates/chat-cli/src/cli/chat/mod.rs b/crates/chat-cli/src/cli/chat/mod.rs index 099c0c8761..ff013912e8 100644 --- a/crates/chat-cli/src/cli/chat/mod.rs +++ b/crates/chat-cli/src/cli/chat/mod.rs @@ -130,6 +130,7 @@ use tool_manager::{ }; use tools::delegate::{ AgentExecution, + completion_channel, save_agent_execution, status_all_agents, }; @@ -680,6 +681,10 @@ pub struct ChatSession { prompt_ack_rx: std::sync::mpsc::Receiver<()>, /// Additional context to be added to the next user message (e.g., delegate task summaries) pending_additional_context: Option, + /// Sender for subagent completion notifications (cloned into each spawned agent) + subagent_completion_tx: tokio::sync::mpsc::UnboundedSender, + /// Receiver for subagent completion notifications + subagent_completion_rx: tokio::sync::mpsc::UnboundedReceiver, } impl ChatSession { @@ -794,6 +799,8 @@ impl ChatSession { } }); + let (subagent_completion_tx, subagent_completion_rx) = completion_channel(); + Ok(Self { stdout: control_end_stdout, stderr: control_end_stderr, @@ -817,6 +824,8 @@ impl ChatSession { wrap, prompt_ack_rx, pending_additional_context: None, + subagent_completion_tx, + subagent_completion_rx, }) } @@ -2503,6 +2512,7 @@ impl ChatSession { &mut self.stdout, &mut self.conversation.file_line_tracker, &self.conversation.agents, + Some(self.subagent_completion_tx.clone()), ) .await; @@ -2853,228 +2863,270 @@ impl ChatSession { } loop { - match rx.recv().await { - Some(Ok(msg_event)) => { - trace!("Consumed: {:?}", msg_event); - - match msg_event { - parser::ResponseEvent::ToolUseStart { name } => { - // We need to flush the buffer here, otherwise text will not be - // printed while we are receiving tool use events. - buf.push('\n'); - tool_name_being_recvd = Some(name); - }, - parser::ResponseEvent::AssistantText(text) => { - if self.stdout.should_send_structured_event { - if !response_prefix_printed && !text.trim().is_empty() { - let msg_start = TextMessageStart { - message_id: request_id.clone().unwrap_or_default(), - role: MessageRole::Assistant, - }; - - self.stdout.send(Event::TextMessageStart(msg_start))?; - response_prefix_printed = true; - } - } else { - // Add Q response prefix before the first assistant text. - if !response_prefix_printed && !text.trim().is_empty() { - queue!( - self.stdout, - StyledText::success_fg(), - style::Print("> "), - StyledText::reset(), - )?; - response_prefix_printed = true; - } - } - buf.push_str(&text); - }, - parser::ResponseEvent::ToolUse(tool_use) => { - if self.spinner.is_some() { - drop(self.spinner.take()); - queue!( - self.stderr, - terminal::Clear(terminal::ClearType::CurrentLine), - cursor::MoveToColumn(0), - )?; - } - tool_uses.push(tool_use); - tool_name_being_recvd = None; - }, - parser::ResponseEvent::EndStream { - message, - request_metadata: rm, - } => { - // This log is attempting to help debug instances where users encounter - // the response timeout message. - if message.content() == RESPONSE_TIMEOUT_CONTENT { - error!(?request_id, ?message, "Encountered an unexpected model response"); - } - self.conversation.push_assistant_message(os, message, Some(rm.clone())); - self.user_turn_request_metadata.push(rm); - ended = true; + tokio::select! { + biased; + // Check for subagent completions (non-blocking priority) + Some(execution) = self.subagent_completion_rx.recv() => { + let status = if execution.status == tools::delegate::AgentStatus::Completed { + "✓" + } else { + "✗" + }; + // Brief inline notification on stderr (doesn't disrupt model stream) + let _ = execute!( + self.stderr, + style::Print(format!( + "\n {} Background task '{}' finished {}\n", + status, execution.agent, status + )), + ); + // Play bell to alert user + if os.database.settings.get_bool(Setting::ChatEnableNotifications).unwrap_or(false) { + play_notification_bell(true); + } + // Mark as notified and save + let mut exec = execution; + exec.user_notified = true; + if let Err(e) = save_agent_execution(os, &exec).await { + eprintln!("Failed to mark agent execution as notified: {}", e); + } + // Queue the full details for the next prompt context + let summary = exec.summary.as_deref().unwrap_or("No summary available"); + let ctx = format!( + "Background agent '{}' completed. Summary: {}", + exec.agent, summary + ); + match &mut self.pending_additional_context { + Some(existing) => { + existing.push('\n'); + existing.push_str(&ctx); }, + None => self.pending_additional_context = Some(ctx), } - }, - Some(Err(recv_error)) => { - if let Some(request_id) = &recv_error.request_metadata.request_id { - self.failed_request_ids.push(request_id.clone()); - }; - - self.user_turn_request_metadata - .push(recv_error.request_metadata.clone()); - let (reason, reason_desc) = get_error_reason(&recv_error); - let status_code = recv_error.status_code(); - - match recv_error.source { - RecvErrorKind::StreamTimeout { source, duration } => { - self.send_chat_telemetry( - os, - TelemetryResult::Failed, - Some(reason), - Some(reason_desc), - status_code, - false, // We retry the request, so don't end the current turn yet. - ) - .await; - - error!( - recv_error.request_metadata.request_id, - ?source, - "Encountered a stream timeout after waiting for {}s", - duration.as_secs() - ); - - execute!(self.stderr, cursor::Hide)?; - self.spinner = Some(Spinner::new(Spinners::Dots, "Dividing up the work...".to_string())); - - // For stream timeouts, we'll tell the model to try and split its response into - // smaller chunks. - self.conversation.push_assistant_message( - os, - AssistantMessage::new_response(None, RESPONSE_TIMEOUT_CONTENT.to_string()), - None, - ); - self.conversation - .set_next_user_message( - "You took too long to respond - try to split up the work into smaller steps." - .to_string(), - ) - .await; - self.send_tool_use_telemetry(os).await; - return Ok(ChatState::HandleResponseStream( - self.conversation - .as_sendable_conversation_state(os, &mut self.stderr, false) - .await?, - )); + continue; + } + msg = rx.recv() => { + match msg { + Some(Ok(msg_event)) => { + trace!("Consumed: {:?}", msg_event); + + match msg_event { + parser::ResponseEvent::ToolUseStart { name } => { + // We need to flush the buffer here, otherwise text will not be + // printed while we are receiving tool use events. + buf.push('\n'); + tool_name_being_recvd = Some(name); + }, + parser::ResponseEvent::AssistantText(text) => { + if self.stdout.should_send_structured_event { + if !response_prefix_printed && !text.trim().is_empty() { + let msg_start = TextMessageStart { + message_id: request_id.clone().unwrap_or_default(), + role: MessageRole::Assistant, + }; + + self.stdout.send(Event::TextMessageStart(msg_start))?; + response_prefix_printed = true; + } + } else { + // Add Q response prefix before the first assistant text. + if !response_prefix_printed && !text.trim().is_empty() { + queue!( + self.stdout, + StyledText::success_fg(), + style::Print("> "), + StyledText::reset(), + )?; + response_prefix_printed = true; + } + } + buf.push_str(&text); + }, + parser::ResponseEvent::ToolUse(tool_use) => { + if self.spinner.is_some() { + drop(self.spinner.take()); + queue!( + self.stderr, + terminal::Clear(terminal::ClearType::CurrentLine), + cursor::MoveToColumn(0), + )?; + } + tool_uses.push(tool_use); + tool_name_being_recvd = None; + }, + parser::ResponseEvent::EndStream { + message, + request_metadata: rm, + } => { + // This log is attempting to help debug instances where users encounter + // the response timeout message. + if message.content() == RESPONSE_TIMEOUT_CONTENT { + error!(?request_id, ?message, "Encountered an unexpected model response"); + } + self.conversation.push_assistant_message(os, message, Some(rm.clone())); + self.user_turn_request_metadata.push(rm); + ended = true; + }, + } }, - RecvErrorKind::UnexpectedToolUseEos { - tool_use_id, - name, - message, - .. - } => { - self.send_chat_telemetry( - os, - TelemetryResult::Failed, - Some(reason), - Some(reason_desc), - status_code, - false, // We retry the request, so don't end the current turn yet. - ) - .await; + Some(Err(recv_error)) => { + if let Some(request_id) = &recv_error.request_metadata.request_id { + self.failed_request_ids.push(request_id.clone()); + }; - error!( - recv_error.request_metadata.request_id, - tool_use_id, name, "The response stream ended before the entire tool use was received" - ); - self.conversation - .push_assistant_message(os, *message, Some(recv_error.request_metadata)); - let tool_results = vec![ToolUseResult { + self.user_turn_request_metadata + .push(recv_error.request_metadata.clone()); + let (reason, reason_desc) = get_error_reason(&recv_error); + let status_code = recv_error.status_code(); + + match recv_error.source { + RecvErrorKind::StreamTimeout { source, duration } => { + self.send_chat_telemetry( + os, + TelemetryResult::Failed, + Some(reason), + Some(reason_desc), + status_code, + false, + ) + .await; + + error!( + recv_error.request_metadata.request_id, + ?source, + "Encountered a stream timeout after waiting for {}s", + duration.as_secs() + ); + + execute!(self.stderr, cursor::Hide)?; + self.spinner = Some(Spinner::new(Spinners::Dots, "Dividing up the work...".to_string())); + + self.conversation.push_assistant_message( + os, + AssistantMessage::new_response(None, RESPONSE_TIMEOUT_CONTENT.to_string()), + None, + ); + self.conversation + .set_next_user_message( + "You took too long to respond - try to split up the work into smaller steps." + .to_string(), + ) + .await; + self.send_tool_use_telemetry(os).await; + return Ok(ChatState::HandleResponseStream( + self.conversation + .as_sendable_conversation_state(os, &mut self.stderr, false) + .await?, + )); + }, + RecvErrorKind::UnexpectedToolUseEos { tool_use_id, - content: vec![ToolUseResultBlock::Text( - "The generated tool was too large, try again but this time split up the work between multiple tool uses".to_string(), - )], - status: ToolResultStatus::Error, - }]; - self.conversation.add_tool_results(tool_results); - self.send_tool_use_telemetry(os).await; - return Ok(ChatState::HandleResponseStream( - self.conversation - .as_sendable_conversation_state(os, &mut self.stderr, false) - .await?, - )); - }, - RecvErrorKind::ToolValidationError { - tool_use_id, - name, - message, - error_message, - } => { - self.send_chat_telemetry( - os, - TelemetryResult::Failed, - Some(reason), - Some(reason_desc), - status_code, - false, // We retry the request, so don't end the current turn yet. - ) - .await; - - error!( - recv_error.request_metadata.request_id, - tool_use_id, name, error_message, "Tool validation failed" - ); - self.conversation - .push_assistant_message(os, *message, Some(recv_error.request_metadata)); - let tool_results = vec![ToolUseResult { - tool_use_id, - content: vec![ToolUseResultBlock::Text(format!( - "Tool validation failed: {}. Please ensure tool arguments are provided as a valid JSON object.", - error_message - ))], - status: ToolResultStatus::Error, - }]; - // User hint of what happened - let _ = queue!( - self.stdout, - style::Print("\n\n"), - StyledText::warning_fg(), - style::Print(format!( - "Tool validation failed: {}\n Retrying the request...", - error_message - )), - StyledText::reset(), - style::Print("\n"), - ); - self.conversation.add_tool_results(tool_results); - self.send_tool_use_telemetry(os).await; - return Ok(ChatState::HandleResponseStream( - self.conversation - .as_sendable_conversation_state(os, &mut self.stderr, false) - .await?, - )); + name, + message, + .. + } => { + self.send_chat_telemetry( + os, + TelemetryResult::Failed, + Some(reason), + Some(reason_desc), + status_code, + false, + ) + .await; + + error!( + recv_error.request_metadata.request_id, + tool_use_id, name, "The response stream ended before the entire tool use was received" + ); + self.conversation + .push_assistant_message(os, *message, Some(recv_error.request_metadata)); + let tool_results = vec![ToolUseResult { + tool_use_id, + content: vec![ToolUseResultBlock::Text( + "The generated tool was too large, try again but this time split up the work between multiple tool uses".to_string(), + )], + status: ToolResultStatus::Error, + }]; + self.conversation.add_tool_results(tool_results); + self.send_tool_use_telemetry(os).await; + return Ok(ChatState::HandleResponseStream( + self.conversation + .as_sendable_conversation_state(os, &mut self.stderr, false) + .await?, + )); + }, + RecvErrorKind::ToolValidationError { + tool_use_id, + name, + message, + error_message, + } => { + self.send_chat_telemetry( + os, + TelemetryResult::Failed, + Some(reason), + Some(reason_desc), + status_code, + false, + ) + .await; + + error!( + recv_error.request_metadata.request_id, + tool_use_id, name, error_message, "Tool validation failed" + ); + self.conversation + .push_assistant_message(os, *message, Some(recv_error.request_metadata)); + let tool_results = vec![ToolUseResult { + tool_use_id, + content: vec![ToolUseResultBlock::Text(format!( + "Tool validation failed: {}. Please ensure tool arguments are provided as a valid JSON object.", + error_message + ))], + status: ToolResultStatus::Error, + }]; + let _ = queue!( + self.stdout, + style::Print("\n\n"), + StyledText::warning_fg(), + style::Print(format!( + "Tool validation failed: {}\n Retrying the request...", + error_message + )), + StyledText::reset(), + style::Print("\n"), + ); + self.conversation.add_tool_results(tool_results); + self.send_tool_use_telemetry(os).await; + return Ok(ChatState::HandleResponseStream( + self.conversation + .as_sendable_conversation_state(os, &mut self.stderr, false) + .await?, + )); + }, + _ => { + self.send_chat_telemetry( + os, + TelemetryResult::Failed, + Some(reason), + Some(reason_desc), + status_code, + true, + ) + .await; + + return Err(recv_error.into()); + }, + } }, - _ => { - self.send_chat_telemetry( - os, - TelemetryResult::Failed, - Some(reason), - Some(reason_desc), - status_code, - true, // Hard fail -> end the current user turn. - ) - .await; - - return Err(recv_error.into()); + None => { + warn!("response stream receiver closed before receiving a stop event"); + ended = true; }, } - }, - None => { - warn!("response stream receiver closed before receiving a stop event"); - ended = true; - }, - } + } + } // tokio::select! // Fix for the markdown parser copied over from q chat: // this is a hack since otherwise the parser might report Incomplete with useful data @@ -3608,7 +3660,21 @@ impl ChatSession { prompt::generate_prompt(profile.as_deref(), all_trusted, tangent_mode, usage_percentage); if ExperimentManager::is_enabled(os, ExperimentName::Delegate) { + // Drain any channel-based completions from this session's spawned agents + let mut channel_executions = Vec::new(); + while let Ok(execution) = self.subagent_completion_rx.try_recv() { + channel_executions.push(execution); + } + + // Also check filesystem for completions from other sessions if let Ok(mut executions) = status_all_agents(os).await { + // Merge: filesystem results + channel results (dedup by agent name) + for ch_exec in channel_executions { + if !executions.iter().any(|e| e.agent == ch_exec.agent) { + executions.push(ch_exec); + } + } + if !executions.is_empty() { let rich_notification = format_rich_notification(&executions); generated_prompt = format!("{}\n{}", rich_notification, generated_prompt); diff --git a/crates/chat-cli/src/cli/chat/tools/delegate.rs b/crates/chat-cli/src/cli/chat/tools/delegate.rs index fd7a1e31ce..acf4d48b9f 100644 --- a/crates/chat-cli/src/cli/chat/tools/delegate.rs +++ b/crates/chat-cli/src/cli/chat/tools/delegate.rs @@ -23,6 +23,8 @@ use strum::{ EnumString, }; +use tokio::sync::mpsc; + use crate::cli::agent::Agents; use crate::cli::chat::tools::{ InvokeOutput, @@ -41,6 +43,12 @@ use crate::theme::StyledText; use crate::util::env_var::get_all_env_vars; use crate::util::paths::PathResolver; +/// Creates a channel pair for subagent completion notifications. +/// The sender is passed to `spawn_agent_process`; the receiver is held by `ChatSession`. +pub fn completion_channel() -> (mpsc::UnboundedSender, mpsc::UnboundedReceiver) { + mpsc::unbounded_channel() +} + /// Launch and manage async agent processes. Delegate tasks to agents that run independently in /// background. /// @@ -49,7 +57,8 @@ use crate::util::paths::PathResolver; /// - status: Check agent status (agent optional - defaults to 'all') /// - list: Show available agents /// -/// Only one task per agent. Files stored in the workspace subagents directory +/// Multiple concurrent tasks per agent are supported. Files stored in the workspace subagents +/// directory. /// /// Examples: /// - Launch: {"operation": "launch", "agent": "rust-agent", "task": "Create snake game"} @@ -83,7 +92,13 @@ impl Delegate { ExperimentManager::is_enabled(os, ExperimentName::Delegate) } - pub async fn invoke(&self, os: &Os, _output: &mut impl Write, agents: &Agents) -> Result { + pub async fn invoke( + &self, + os: &Os, + _output: &mut impl Write, + agents: &Agents, + completion_tx: Option>, + ) -> Result { if !Self::is_enabled(os) { return Ok(InvokeOutput { output: OutputKind::Text( @@ -101,7 +116,7 @@ impl Delegate { let agent_name = self.agent.as_deref().unwrap_or(DEFAULT_AGENT_NAME); - launch_agent(os, agent_name, agents, task).await? + launch_agent(os, agent_name, agents, task, completion_tx).await? }, Operation::Status => match &self.agent { Some(agent_name) => status_agent(os, agent_name).await?, @@ -142,19 +157,15 @@ impl Delegate { } } -pub async fn launch_agent(os: &Os, agent: &str, agents: &Agents, task: &str) -> Result { +pub async fn launch_agent( + os: &Os, + agent: &str, + agents: &Agents, + task: &str, + completion_tx: Option>, +) -> Result { validate_agent_availability(os, agent).await?; - // Check if agent is already running - if let Some((execution, _)) = load_agent_execution(os, agent).await? { - if execution.status == AgentStatus::Running { - return Err(eyre::eyre!( - "Agent '{}' is already running. Use status operation to check progress or wait for completion.", - agent - )); - } - } - if agent == DEFAULT_AGENT_NAME { // Show warning for default agent but no approval needed display_default_agent_warning()?; @@ -163,7 +174,7 @@ pub async fn launch_agent(os: &Os, agent: &str, agents: &Agents, task: &str) -> request_user_approval(agent, agents, task).await?; } - spawn_agent_process(os, agent, task).await?; + spawn_agent_process(os, agent, task, completion_tx).await?; Ok(format_launch_success(agent, task)) } @@ -268,6 +279,9 @@ impl AgentStatus { pub struct AgentExecution { #[serde(default)] pub agent: String, + /// Unique identifier for this task execution, enabling multiple concurrent tasks per agent. + #[serde(default)] + pub task_id: String, #[serde(default)] pub task: String, #[serde(default)] @@ -334,7 +348,12 @@ impl From<&Agent> for AgentConfig { } } -pub async fn spawn_agent_process(os: &Os, agent: &str, task: &str) -> Result { +pub async fn spawn_agent_process( + os: &Os, + agent: &str, + task: &str, + completion_tx: Option>, +) -> Result { let now = Utc::now(); // Run Q chat with specific agent in background, non-interactive @@ -357,8 +376,11 @@ pub async fn spawn_agent_process(os: &Os, agent: &str, task: &str) -> Result Result Result { } } -async fn monitor_child_process(child: tokio::process::Child, mut execution: AgentExecution, os: Os) { +async fn monitor_child_process( + child: tokio::process::Child, + mut execution: AgentExecution, + os: Os, + completion_tx: Option>, +) { match child.wait_with_output().await { Ok(output) => { execution.status = if output.status.success() { @@ -452,6 +479,11 @@ async fn monitor_child_process(child: tokio::process::Child, mut execution: Agen if let Err(e) = save_agent_execution(&os, &execution).await { eprintln!("Failed to save agent execution: {}", e); } + + // Notify via channel + if let Some(tx) = completion_tx { + let _ = tx.send(execution); + } }, Err(e) => { execution.status = AgentStatus::Failed; @@ -464,29 +496,33 @@ async fn monitor_child_process(child: tokio::process::Child, mut execution: Agen if let Err(e) = save_agent_execution(&os, &execution).await { eprintln!("Failed to save agent execution: {}", e); } + + // Notify via channel + if let Some(tx) = completion_tx { + let _ = tx.send(execution); + } }, } } pub async fn status_agent(os: &Os, agent: &str) -> Result { - match load_agent_execution(os, agent).await? { - Some((mut execution, _path)) => { - // If status is running, check if PID is still alive - if execution.status == AgentStatus::Running && execution.pid != 0 && !is_process_alive(execution.pid) { - // Process died, mark as failed - execution.status = AgentStatus::Failed; - execution.completed_at = Some(chrono::Utc::now()); - execution.exit_code = Some(-1); - execution.output = "Process terminated unexpectedly (PID not found)".to_string(); - - // Save the updated status - save_agent_execution(os, &execution).await?; - } + let mut executions = load_all_agent_executions(os, agent).await?; + if executions.is_empty() { + return Ok(format!("No execution found for agent '{}'", agent)); + } - Ok(execution.format_status()) - }, - None => Ok(format!("No execution found for agent '{}'", agent)), + // Check PIDs and update dead processes + for execution in &mut executions { + if execution.status == AgentStatus::Running && execution.pid != 0 && !is_process_alive(execution.pid) { + execution.status = AgentStatus::Failed; + execution.completed_at = Some(chrono::Utc::now()); + execution.exit_code = Some(-1); + execution.output = "Process terminated unexpectedly (PID not found)".to_string(); + let _ = save_agent_execution(os, execution).await; + } } + + Ok(executions.iter().map(|e| e.format_status()).collect::>().join("\n---\n")) } pub async fn status_all_agents(os: &Os) -> Result> { @@ -562,20 +598,42 @@ pub async fn request_user_approval(agent: &str, agents: &Agents, task: &str) -> Ok(()) } -pub async fn load_agent_execution(os: &Os, agent: &str) -> Result> { - let file_path = agent_file_path(os, agent).await?; +/// Load all executions for a given agent (for concurrent task support). +pub async fn load_all_agent_executions(os: &Os, agent: &str) -> Result> { + let dir = subagents_dir(os).await?; + let prefix = format!("{}_", agent); + let mut executions = Vec::new(); + + if let Ok(mut dir_walker) = os.fs.read_dir(&dir).await { + while let Ok(Some(file)) = dir_walker.next_entry().await { + let name = file.file_name().to_string_lossy().to_string(); + if name.starts_with(&prefix) && name.ends_with(".json") { + if let Ok(bytes) = os.fs.read(file.path()).await { + if let Ok(exec) = serde_json::from_slice::(&bytes) { + executions.push(exec); + } + } + } + } + } + // Also check legacy path + let file_path = agent_file_path(os, agent).await?; if file_path.exists() { - let content = os.fs.read_to_string(&file_path).await?; - let execution: AgentExecution = serde_json::from_str(&content)?; - Ok(Some((execution, file_path))) - } else { - Ok(None) + if let Ok(content) = os.fs.read_to_string(&file_path).await { + if let Ok(exec) = serde_json::from_str::(&content) { + if !executions.iter().any(|e| e.task_id == exec.task_id) { + executions.push(exec); + } + } + } } + + Ok(executions) } pub async fn save_agent_execution(os: &Os, execution: &AgentExecution) -> Result<()> { - let file_path = agent_file_path(os, &execution.agent).await?; + let file_path = execution_file_path(os, &execution.agent, &execution.task_id).await?; let content = serde_json::to_string_pretty(execution)?; os.fs.write(&file_path, content).await?; Ok(()) @@ -586,6 +644,12 @@ pub async fn agent_file_path(os: &Os, agent: &str) -> Result { Ok(subagents_dir.join(format!("{}.json", agent))) } +/// File path for a specific task execution (concurrent-safe). +pub async fn execution_file_path(os: &Os, agent: &str, task_id: &str) -> Result { + let subagents_dir = subagents_dir(os).await?; + Ok(subagents_dir.join(format!("{}_{}.json", agent, task_id))) +} + pub async fn subagents_dir(os: &Os) -> Result { Ok(PathResolver::new(os).workspace().ensure_subagents_dir().await?) } diff --git a/crates/chat-cli/src/cli/chat/tools/mod.rs b/crates/chat-cli/src/cli/chat/tools/mod.rs index 4859fbf210..1d80d45a14 100644 --- a/crates/chat-cli/src/cli/chat/tools/mod.rs +++ b/crates/chat-cli/src/cli/chat/tools/mod.rs @@ -149,6 +149,7 @@ impl Tool { stdout: &mut impl Write, line_tracker: &mut HashMap, agents: &crate::cli::agent::Agents, + completion_tx: Option>, ) -> Result { let active_agent = agents.get_active(); match self { @@ -162,7 +163,7 @@ impl Tool { Tool::Knowledge(knowledge) => knowledge.invoke(os, stdout, active_agent).await, Tool::Thinking(think) => think.invoke(stdout).await, Tool::Todo(todo) => todo.invoke(os, stdout).await, - Tool::Delegate(delegate) => delegate.invoke(os, stdout, agents).await, + Tool::Delegate(delegate) => delegate.invoke(os, stdout, agents, completion_tx).await, } }