
Architecture Document: octos

Overview

octos is a 15-member Rust workspace (Edition 2024, rust-version 1.85.0) that provides both a coding-agent CLI and a multi-channel messaging gateway. TLS is pure Rust via rustls (no OpenSSL); error handling uses eyre/color-eyre.

Workspace members:

  • 6 core crates: octos-core, octos-memory, octos-llm, octos-agent, octos-bus, octos-cli
  • 1 pipeline crate: octos-pipeline
  • 7 app-skill crates: news, deep-search, deep-crawl, send-email, account-manager, time, weather
  • 1 platform-skill crate: asr

┌─────────────────────────────────────────────────────────────┐
│                         octos-cli                           │
│             (CLI: chat, gateway, init, status)              │
├──────────────────────────┬──────────────────────────────────┤
│       octos-agent        │            octos-bus             │
│  (Agent, Tools, Skills)  │    (Channels, Sessions, Cron)    │
├────────────┬─────────────┼──────────────────────────────────┤
│octos-memory│  octos-llm  │          octos-pipeline          │
│ (Episodes) │ (Providers) │    (DOT-based orchestration)     │
├────────────┴─────────────┴──────────────────────────────────┤
│                         octos-core                          │
│             (Types, Messages, Gateway Protocol)             │
└─────────────────────────────────────────────────────────────┘

octos-core — Foundation Types

Shared types with no internal dependencies. Only depends on serde, chrono, uuid, eyre.

MessageRole implements as_str() -> &'static str and Display for consistent string conversion across providers (system/user/assistant/tool).

Task Model

pub struct Task {
    pub id: TaskId,                   // UUID v7 (temporal ordering)
    pub parent_id: Option<TaskId>,    // For subtasks
    pub status: TaskStatus,
    pub kind: TaskKind,
    pub context: TaskContext,
    pub result: Option<TaskResult>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

TaskId: Newtype over Uuid. Generates UUID v7 via Uuid::now_v7(). Implements Display, FromStr, Default.

TaskStatus (tagged enum, "state" discriminant):

  • Pending — awaiting assignment
  • InProgress { agent_id: AgentId } — executing
  • Blocked { reason: String } — waiting for dependency
  • Completed — success
  • Failed { error: String } — failure with message

TaskKind (tagged enum, "type" discriminant):

  • Plan { goal: String }
  • Code { instruction: String, files: Vec<PathBuf> }
  • Review { diff: String }
  • Test { command: String }
  • Custom { name: String, params: serde_json::Value }

TaskContext:

  • working_dir: PathBuf, git_state: Option<GitState>, working_memory: Vec<Message>, episodic_refs: Vec<EpisodeRef>, files_in_scope: Vec<PathBuf>

TaskResult:

  • success: bool, output: String, files_modified: Vec<PathBuf>, subtasks: Vec<TaskId>, token_usage: TokenUsage

TokenUsage: input_tokens: u32, output_tokens: u32 (defaults to 0/0)

Message Types

pub struct Message {
    pub role: MessageRole,           // System | User | Assistant | Tool
    pub content: String,
    pub media: Vec<String>,          // File paths (images, audio)
    pub tool_calls: Option<Vec<ToolCall>>,
    pub tool_call_id: Option<String>,
    pub timestamp: DateTime<Utc>,
}

pub struct ToolCall {
    pub id: String,
    pub name: String,
    pub arguments: serde_json::Value,
}

Gateway Protocol

pub struct InboundMessage {       // channel → agent
    pub channel: String,          // "telegram", "cli", "discord", etc.
    pub sender_id: String,
    pub chat_id: String,
    pub content: String,
    pub timestamp: DateTime<Utc>,
    pub media: Vec<String>,
    pub metadata: serde_json::Value,
}

pub struct OutboundMessage {      // agent → channel
    pub channel: String,
    pub chat_id: String,
    pub content: String,
    pub reply_to: Option<String>,
    pub media: Vec<String>,
    pub metadata: serde_json::Value,
}

InboundMessage::session_key() derives SessionKey::new(channel, chat_id) — format "{channel}:{chat_id}".
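A minimal sketch of that derivation, with names assumed from the text above (not the crate's actual implementation):

```rust
// Hypothetical sketch: sessions are keyed by "{channel}:{chat_id}".
struct SessionKey(String);

impl SessionKey {
    fn new(channel: &str, chat_id: &str) -> Self {
        SessionKey(format!("{channel}:{chat_id}"))
    }
}

fn main() {
    let key = SessionKey::new("telegram", "12345");
    println!("{}", key.0); // telegram:12345
}
```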

Inter-Agent Coordination

pub enum AgentMessage {           // tagged: "type", snake_case
    TaskAssign { task: Box<Task> },
    TaskUpdate { task_id: TaskId, status: TaskStatus },
    TaskComplete { task_id: TaskId, result: TaskResult },
    ContextRequest { task_id: TaskId, query: String },
    ContextResponse { task_id: TaskId, context: Vec<Message> },
}

Error System

pub struct Error {
    pub kind: ErrorKind,
    pub context: Option<String>,      // Chained context
    pub suggestion: Option<String>,   // Actionable fix hint
}

ErrorKind variants: TaskNotFound, AgentNotFound, InvalidStateTransition, LlmError, ApiError (status-aware: 401→check key, 429→rate limit), ToolError, ConfigError, ApiKeyNotSet, UnknownProvider, Timeout, ChannelError, SessionError, IoError, SerializationError, Other(eyre::Report).

Utilities

truncate_utf8(s: &mut String, max_len: usize, suffix: &str) — in-place truncation at UTF-8 char boundaries. Appends suffix after truncation. Used across all tool outputs.
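A sketch of how such boundary-safe truncation can work (behavior assumed from the signature above; not the crate's exact code):

```rust
/// Truncate `s` in place to at most `max_len` bytes, backing up to the
/// nearest UTF-8 char boundary, then append `suffix`.
fn truncate_utf8(s: &mut String, max_len: usize, suffix: &str) {
    if s.len() <= max_len {
        return; // already short enough; no suffix needed
    }
    // Walk backwards until the cut point lands on a char boundary,
    // so String::truncate never panics mid-codepoint.
    let mut cut = max_len;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    s.truncate(cut);
    s.push_str(suffix);
}

fn main() {
    let mut s = String::from("héllo world");
    // 'é' spans bytes 1..3, so a cut at 2 backs up to 1.
    truncate_utf8(&mut s, 2, "…");
    println!("{s}");
}
```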


octos-llm — LLM Provider Abstraction

Provider Trait

#[async_trait]
pub trait LlmProvider: Send + Sync {
    async fn chat(&self, messages: &[Message], tools: &[ToolSpec], config: &ChatConfig) -> Result<ChatResponse>;
    async fn chat_stream(&self, messages: &[Message], tools: &[ToolSpec], config: &ChatConfig) -> Result<ChatStream>;  // default: falls back to chat()
    fn context_window(&self) -> u32;  // default: context_window_tokens(self.model_id())
    fn model_id(&self) -> &str;
    fn provider_name(&self) -> &str;
}

Configuration

pub struct ChatConfig {
    pub max_tokens: Option<u32>,        // default: Some(4096)
    pub temperature: Option<f32>,       // default: Some(0.0)
    pub tool_choice: ToolChoice,        // Auto | Required | None | Specific { name }
    pub stop_sequences: Vec<String>,
}

Response Types

pub struct ChatResponse {
    pub content: Option<String>,
    pub tool_calls: Vec<ToolCall>,
    pub stop_reason: StopReason,       // EndTurn | ToolUse | MaxTokens | StopSequence
    pub usage: TokenUsage,
}

pub enum StreamEvent {
    TextDelta(String),
    ToolCallDelta { index, id, name, arguments_delta },
    Usage(TokenUsage),
    Done(StopReason),
    Error(String),
}

pub type ChatStream = Pin<Box<dyn Stream<Item = StreamEvent> + Send>>;

Provider Registry (registry/)

All providers are defined in octos-llm/src/registry/ — one file per provider. Each file exports a ProviderEntry with metadata (name, aliases, default model, API key env var, base URL) and a create() factory function. Adding a new provider = one file + one line in mod.rs.

pub struct ProviderEntry {
    pub name: &'static str,              // canonical name
    pub aliases: &'static [&'static str], // e.g. ["google"] for gemini
    pub default_model: Option<&'static str>,
    pub api_key_env: Option<&'static str>,
    pub default_base_url: Option<&'static str>,
    pub requires_api_key: bool,
    pub requires_base_url: bool,          // true for vllm
    pub requires_model: bool,             // true for vllm
    pub detect_patterns: &'static [&'static str], // model→provider auto-detect
    pub create: fn(CreateParams) -> Result<Arc<dyn LlmProvider>>,
}

pub struct CreateParams {
    pub api_key: Option<String>,
    pub model: Option<String>,
    pub base_url: Option<String>,
    pub model_hints: Option<ModelHints>,  // config-level override
}

Lookup: registry::lookup(name) — case-insensitive, matches canonical name or aliases. Auto-detect: registry::detect_provider(model) — infers provider from model name patterns.

Native Providers (4 protocol implementations)

| Provider | Base URL | Auth Header | Image Format | Default Model |
|---|---|---|---|---|
| Anthropic | api.anthropic.com | x-api-key | Base64 blocks | claude-sonnet-4-20250514 |
| OpenAI | api.openai.com/v1 | Authorization: Bearer | Data URI | gpt-4o |
| Gemini | generativelanguage.googleapis.com/v1beta | x-goog-api-key | Base64 inline | gemini-2.5-flash |
| OpenRouter | openrouter.ai/api/v1 | Authorization: Bearer | Data URI | anthropic/claude-sonnet-4-20250514 |

OpenAI-Compatible Providers (via OpenAIProvider::with_base_url())

| Provider | Aliases | Base URL | Default Model | API Key Env |
|---|---|---|---|---|
| DeepSeek | (none) | api.deepseek.com/v1 | deepseek-chat | DEEPSEEK_API_KEY |
| Groq | (none) | api.groq.com/openai/v1 | llama-3.3-70b-versatile | GROQ_API_KEY |
| Moonshot | kimi | api.moonshot.ai/v1 | kimi-k2.5 | MOONSHOT_API_KEY |
| DashScope | qwen | dashscope.aliyuncs.com/compatible-mode/v1 | qwen-max | DASHSCOPE_API_KEY |
| MiniMax | (none) | api.minimax.io/v1 | MiniMax-Text-01 | MINIMAX_API_KEY |
| Zhipu | glm | open.bigmodel.cn/api/paas/v4 | glm-4-plus | ZHIPU_API_KEY |
| Nvidia | nim | integrate.api.nvidia.com/v1 | meta/llama-3.3-70b-instruct | NVIDIA_API_KEY |
| Ollama | (none) | localhost:11434/v1 | llama3.2 | (none) |
| vLLM | (none) | (user-provided) | (user-provided) | VLLM_API_KEY |

Anthropic-Compatible Provider

| Provider | Aliases | Base URL | Default Model | API Key Env |
|---|---|---|---|---|
| Z.AI | zai, z.ai | api.z.ai/api/anthropic | glm-5 | ZAI_API_KEY |

ModelHints (OpenAI provider)

Auto-detected from model name at construction, overridable via config model_hints:

pub struct ModelHints {
    pub uses_completion_tokens: bool,  // o-series, gpt-5, gpt-4.1
    pub fixed_temperature: bool,       // o-series, kimi-k2.5
    pub lacks_vision: bool,            // deepseek, minimax, mistral, yi-
    pub merge_system_messages: bool,   // default: true
}

SSE Streaming

parse_sse_response(response) -> impl Stream<Item = SseEvent> — stateful unfold-based parser. Max buffer: 1 MB. Handles \n\n and \r\n\r\n separators. Each provider maps SSE events to StreamEvent:

  • Anthropic: message_start → input tokens, content_block_start/delta → text/tool chunks, message_delta → stop reason. Custom SSE state machine.
  • OpenAI/OpenRouter: Standard OpenAI SSE with [DONE] sentinel. delta.content for text, delta.tool_calls[] for tools. Shared parser: parse_openai_sse_events().
  • Gemini: alt=sse endpoint. candidates[0].content.parts[] with function call data.

RetryProvider

Wraps any Arc<dyn LlmProvider> with exponential backoff. Wrapped by ProviderChain for multi-provider failover.

pub struct RetryConfig {
    pub max_retries: u32,           // default: 3
    pub initial_delay: Duration,    // default: 1s
    pub max_delay: Duration,        // default: 60s
    pub backoff_multiplier: f64,    // default: 2.0
}

Delay formula: initial_delay * backoff_multiplier^attempt, capped at max_delay.
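The formula can be sketched as follows (a hedged illustration of the stated defaults, not the crate's actual code):

```rust
use std::time::Duration;

// Sketch: delay = initial_delay * backoff_multiplier^attempt, capped at max_delay.
fn retry_delay(initial: Duration, multiplier: f64, max: Duration, attempt: u32) -> Duration {
    let secs = initial.as_secs_f64() * multiplier.powi(attempt as i32);
    Duration::from_secs_f64(secs.min(max.as_secs_f64()))
}

fn main() {
    let init = Duration::from_secs(1);
    let max = Duration::from_secs(60);
    // With the documented defaults: 1s, 2s, 4s, 8s, ..., capped at 60s.
    for attempt in 0..8 {
        println!("attempt {attempt}: {:?}", retry_delay(init, 2.0, max, attempt));
    }
}
```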

Retryable errors (three-tier detection):

  1. HTTP status: 429, 500, 502, 503, 504, 529
  2. reqwest: is_connect() or is_timeout()
  3. String fallback: “connection refused”, “timed out”, “overloaded”

Provider Failover Chain

ProviderChain wraps multiple Arc<dyn LlmProvider> and transparently fails over on retriable errors. Configured via fallback_models in config.

pub struct ProviderChain {
    slots: Vec<ProviderSlot>,       // provider + AtomicU32 failure count
    failure_threshold: u32,         // default: 3
}

Behavior: Tries providers in order, skipping degraded ones (failures >= threshold). On retriable error, moves to the next. On success, resets failure count. If all degraded, picks the one with fewest failures.

Failoverable: Broader than retryable — includes 401/403, timeouts, and content-format 400 errors (e.g. "must not be empty", "reasoning_content", "API key not valid", "invalid_value"). These should not retry on the same provider but should failover to a different one.

AdaptiveRouter (adaptive.rs)

Metrics-driven provider selection with three mutually exclusive modes (Off, Hedge, Lane). Tracks per-provider EMA latency (configurable ema_alpha, default 0.3), p95 latency (64-sample circular buffer), error rates, throughput (output tokens/sec EMA), and cost. Four-factor scoring: stability, quality, priority, cost (all weights configurable). Includes circuit breaker, probe requests, model catalog seeding from model_catalog.json, and QoS ranking. Scoring uses EMA blending: baseline catalog data at cold start, live metrics gradually replace it (weight ramps from 0 to 1 over 10 calls).

pub struct AdaptiveSlot {
    provider: Arc<dyn LlmProvider>,
    metrics: ProviderMetrics,
    priority: usize,
    cost_per_m: f64,
    model_type: Mutex<ModelType>,        // Strong | Fast
    cost_in: AtomicU64,
    ds_output: AtomicU64,                // deep search output quality
    baseline_stability: AtomicU64,
    baseline_tool_avg_ms: AtomicU64,
    baseline_p95_ms: AtomicU64,
    context_window: AtomicU64,
    max_output: AtomicU64,
}

Hedge mode: Races primary + cheapest alternate via tokio::select!, cancels loser. Only completed requests record metrics (cancelled loser metrics are discarded). If primary fails, alternate is tried sequentially.

Lane mode: Scores all providers, picks single best. Probe requests sent to stale providers (configurable probability, default 0.1; interval, default 60s).

FallbackProvider (fallback.rs)

Wraps primary + QoS-ranked fallbacks. On failure, records cooldown via ProviderRouter. Tries each fallback in order.

SwappableProvider (swappable.rs)

Runtime model switching via RwLock. Leaks ~50 bytes per swap (acceptable for rare user-initiated changes). cached_model_id and cached_provider_name are leaked &'static str to satisfy the &str return type.

ProviderRouter (router.rs)

Sub-agent multi-model routing with prefix-based key resolution. Supports cooldown (60s default), QoS-scored compatible_fallbacks() (sorted by model catalog score), cost info auto-derived from pricing.rs, and metadata for LLM-visible tool schemas.

pub struct ProviderRouter {
    providers: RwLock<HashMap<String, Arc<dyn LlmProvider>>>,
    active_key: RwLock<Option<String>>,
    metadata: RwLock<HashMap<String, SubProviderMeta>>,
    cooldowns: RwLock<HashMap<String, Instant>>,
    qos_scores: RwLock<HashMap<String, f64>>,
}

OminixClient (ominix.rs)

Client for local ASR/TTS via Ominix runtime.

Token Estimation

pub fn estimate_tokens(text: &str) -> u32  // ~4 chars/token ASCII, ~1.5 chars/token CJK
pub fn estimate_message_tokens(msg: &Message) -> u32  // content + tool_calls + 4 overhead
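The stated ratios suggest a heuristic along these lines (the specific CJK ranges and rounding are assumptions, not the crate's exact logic):

```rust
// Sketch: ~4 chars/token for ASCII-ish text, ~1.5 chars/token for CJK.
fn estimate_tokens(text: &str) -> u32 {
    let (mut other, mut cjk) = (0u32, 0u32);
    for c in text.chars() {
        // Assumed CJK ranges: unified ideographs, kana, hangul.
        if matches!(c, '\u{4E00}'..='\u{9FFF}' | '\u{3040}'..='\u{30FF}' | '\u{AC00}'..='\u{D7AF}') {
            cjk += 1;
        } else {
            other += 1;
        }
    }
    // ceil(other / 4) + ceil(cjk / 1.5)
    other.div_ceil(4) + ((cjk as f64) / 1.5).ceil() as u32
}

fn main() {
    println!("{}", estimate_tokens("hello world")); // 11 chars at ~4 chars/token
}
```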

Context Windows

| Model Family | Tokens |
|---|---|
| Claude 3/4 | 200,000 |
| GPT-4o/4-turbo | 128,000 |
| o1/o3/o4 | 200,000 |
| Gemini 2.0/1.5 | 1,000,000 |
| Default (unknown) | 128,000 |

Pricing

model_pricing(model_id) -> Option<ModelPricing> — case-insensitive substring match. Cost = (input/1M) * input_rate + (output/1M) * output_rate.
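The cost formula is simple enough to sketch directly (the rates below are illustrative inputs taken from the pricing table, not a reimplementation of pricing.rs):

```rust
// Sketch: cost = (input/1M) * input_rate + (output/1M) * output_rate.
fn cost_usd(input_tokens: u32, output_tokens: u32, input_rate: f64, output_rate: f64) -> f64 {
    (input_tokens as f64 / 1_000_000.0) * input_rate
        + (output_tokens as f64 / 1_000_000.0) * output_rate
}

fn main() {
    // e.g. claude-sonnet-4 at $3.00 in / $15.00 out per 1M tokens:
    // 10k input + 2k output = $0.03 + $0.03 = $0.06
    println!("${:.4}", cost_usd(10_000, 2_000, 3.0, 15.0));
}
```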

| Model | Input $/1M | Output $/1M |
|---|---|---|
| claude-opus-4 | 15.00 | 75.00 |
| claude-sonnet-4 | 3.00 | 15.00 |
| claude-haiku | 0.80 | 4.00 |
| gpt-4o | 2.50 | 10.00 |
| gpt-4o-mini | 0.15 | 0.60 |
| o3/o4 | 10.00 | 40.00 |

Embedding

pub trait EmbeddingProvider: Send + Sync {
    async fn embed(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>>;
    fn dimension(&self) -> usize;
}

OpenAIEmbedder: Default model text-embedding-3-small (1536 dims). text-embedding-3-large = 3072 dims.

Transcription

GroqTranscriber: Whisper whisper-large-v3 via https://api.groq.com/openai/v1/audio/transcriptions. Multipart form. 60s timeout. MIME detection: ogg/opus→audio/ogg, mp3→audio/mpeg, m4a→audio/mp4, wav→audio/wav.

Vision

encode_image(path) -> (mime_type, base64_data) — JPEG/PNG/GIF/WebP. is_image(path) -> bool.

Typed Error Hierarchy (error.rs)

LlmError with LlmErrorKind enum: Authentication, RateLimited, ContextOverflow, ModelNotFound, ServerError, Network, Timeout, InvalidRequest, ContentFiltered, StreamError, Provider. is_retryable() returns true for RateLimited, ServerError, Network, Timeout, StreamError. from_status(code, body) maps HTTP status codes to error kinds. Provider response bodies logged at debug level only (not exposed in error messages).

High-Level Client (high_level.rs)

LlmClient wraps Arc<dyn LlmProvider> with ergonomic APIs: generate(prompt), generate_with(messages, tools, config), generate_object(prompt, schema_name, schema), generate_typed<T>(prompt, schema_name, schema), stream(prompt), stream_with(messages, tools, config). Configurable via with_config(ChatConfig).

Middleware Pipeline (middleware.rs)

LlmMiddleware trait with before()/after()/on_error() hooks. MiddlewareStack wraps LlmProvider and runs layers in insertion order. before() can short-circuit with cached responses. Built-in: LoggingMiddleware (tracing), CostTracker (AtomicU64 counters for input/output tokens and request count). Streaming bypasses middleware (logged as debug warning).

Model Catalog (catalog.rs)

ModelCatalog with ModelInfo (id, name, provider, context_window, max_output_tokens, capabilities, cost, aliases). Lookup by ID or alias via HashMap index. with_defaults() pre-registers 4 models (Claude Sonnet 4, Claude Haiku 4.5, GPT-4o, Gemini 2.5 Flash). by_provider() and with_capability() for filtered queries.


EpisodeStore

redb database at .octos/episodes.redb with three tables:

| Table | Key | Value | Purpose |
|---|---|---|---|
| episodes | &str (episode_id) | &str (JSON) | Full episode records |
| cwd_index | &str (working_dir) | &str (JSON array of IDs) | Directory-scoped lookup |
| embeddings | &str (episode_id) | &[u8] (bincode Vec) | Vector embeddings |

pub struct Episode {
    pub id: String,                   // UUID v7
    pub task_id: TaskId,
    pub agent_id: AgentId,
    pub working_dir: PathBuf,
    pub summary: String,              // LLM-generated, truncated to 500 chars
    pub outcome: EpisodeOutcome,      // Success | Failure | Blocked | Cancelled
    pub key_decisions: Vec<String>,
    pub files_modified: Vec<PathBuf>,
    pub created_at: DateTime<Utc>,
}

Operations:

  • store(episode) — serialize to JSON, update cwd_index, insert into in-memory HybridIndex
  • get(id) — direct lookup by episode_id
  • find_relevant(cwd, query, limit) — keyword matching scoped to directory
  • recent_for_cwd(cwd, n) — N most recent by created_at descending
  • store_embedding(id, Vec<f32>) — bincode serialize, store in embeddings table, update HybridIndex
  • find_relevant_hybrid(query, query_embedding, limit) — global hybrid search across all episodes

Initialization: On open(), rebuilds in-memory HybridIndex by iterating all episodes and loading embeddings from DB.

MemoryStore

File-based persistent memory at {data_dir}/memory/:

  • MEMORY.md — long-term memory (full overwrite)
  • YYYY-MM-DD.md — daily notes (append with date header)

get_memory_context() builds system prompt injection:

  1. ## Long-term Memory — full MEMORY.md
  2. ## Recent Activity — 7-day rolling window of daily notes
  3. ## Today's Notes — current day

pub struct HybridIndex {
    inverted: HashMap<String, Vec<(usize, u32)>>,  // term → [(doc_idx, raw_tf_count)]
    doc_lengths: Vec<usize>,
    total_len: usize,                         // running total for O(1) avg_dl
    avg_dl: f64,
    ids: Vec<String>,
    hnsw: Option<Hnsw<'static, f32, DistCosine>>,
    has_embedding: Vec<bool>,
    dimension: usize,                               // default: 1536
}

BM25 scoring (constants: K1=1.2, B=0.75):

  • Tokenization: lowercase, split on non-alphanumeric, filter tokens < 2 chars
  • IDF: ln((N - df + 0.5) / (df + 0.5) + 1.0)
  • Score: IDF * (tf * (K1 + 1)) / (tf + K1 * (1 - B + B * dl/avg_dl)) — uses raw term counts (not normalized)
  • Duplicate detection: ids.contains(episode_id) skips already-indexed documents
  • Normalized to [0, 1] range (epsilon 1e-10 prevents NaN from near-zero max scores)
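The scoring bullets above can be sketched as follows (an illustration of the stated constants and formula, not the crate's code):

```rust
// BM25 constants as documented.
const K1: f64 = 1.2;
const B: f64 = 0.75;

// IDF: ln((N - df + 0.5) / (df + 0.5) + 1.0)
fn idf(n_docs: usize, df: usize) -> f64 {
    (((n_docs as f64 - df as f64 + 0.5) / (df as f64 + 0.5)) + 1.0).ln()
}

// Per-term score: IDF * (tf * (K1+1)) / (tf + K1 * (1 - B + B * dl/avg_dl)),
// using raw term counts for tf as stated above.
fn bm25_term(tf: u32, dl: usize, avg_dl: f64, n_docs: usize, df: usize) -> f64 {
    let tf = tf as f64;
    idf(n_docs, df) * (tf * (K1 + 1.0)) / (tf + K1 * (1.0 - B + B * dl as f64 / avg_dl))
}

fn main() {
    // One matching term (tf=2) in an average-length doc; 100-doc corpus, df=10.
    println!("{:.4}", bm25_term(2, 50, 50.0, 100, 10));
}
```

Note the two standard BM25 properties the formula encodes: score grows sub-linearly with term frequency, and rarer terms (lower df) contribute more.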

HNSW vector index (via hnsw_rs):

  • Named constants: HNSW_MAX_NB_CONNECTION=16, HNSW_CAPACITY=10_000, HNSW_EF_CONSTRUCTION=200, HNSW_MAX_LAYER=16, DistCosine
  • L2 normalization before insertion/search; zero vectors rejected (returns None)
  • Cosine similarity = 1 - distance (DistCosine returns 1-cos_sim)

Hybrid ranking — fetches limit * 4 candidates from each:

  • Configurable weights via with_weights(vector_weight, bm25_weight) (defaults: 0.7 / 0.3)
  • Without vectors: BM25 only (graceful fallback)

octos-agent — Agent Runtime

Agent Core

pub struct Agent {
    id: AgentId,
    llm: Arc<dyn LlmProvider>,
    tools: ToolRegistry,
    memory: Arc<EpisodeStore>,
    embedder: Option<Arc<dyn EmbeddingProvider>>,
    system_prompt: RwLock<String>,
    config: AgentConfig,
    reporter: Arc<dyn ProgressReporter>,
    shutdown: Arc<AtomicBool>,       // Acquire/Release ordering
}

pub struct AgentConfig {
    pub max_iterations: u32,          // default: 50 (CLI overrides to 20)
    pub max_tokens: Option<u32>,      // None = unlimited
    pub max_timeout: Option<Duration>,// default: 600s wall-clock timeout
    pub save_episodes: bool,          // default: true
}

Execution Loop (run_task / process_message)

1. Build messages: system prompt + history + memory context + input
2. Loop (up to max_iterations):
   a. Check shutdown flag and token budget
   b. trim_to_context_window() — compact if needed
   c. Call LLM via chat_stream()
   d. Consume stream → accumulate text, tool_calls, tokens
   e. Match stop_reason:
      - EndTurn/StopSequence → save episode, return result
      - ToolUse → execute_tools() → append results → continue
      - MaxTokens → return result

ConversationResponse: content: String, token_usage: TokenUsage, files_modified: Vec<PathBuf>, streamed: bool

Episode saving: After task completion, fires-and-forgets embedding generation if embedder present.

Wall-clock timeout: Agent aborts after max_timeout (default 600s) regardless of iteration count.

Tool Output Sanitization

Before feeding tool results back to the LLM, sanitize_tool_output() (in sanitize.rs) strips noise:

  • Base64 data URIs: data:...;base64,<payload> → [base64-data-redacted]
  • Long hex strings: 64+ contiguous hex chars (SHA-256, raw keys) → [hex-redacted]

Context Compaction

Triggered when estimated tokens exceed 80% of the context window after a 1.2 safety margin (threshold = window × 0.8 / 1.2).
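The trigger condition can be sketched as follows (threshold derivation assumed from the sentence above):

```rust
// Sketch: compact when estimated tokens exceed 80% of the context
// window divided by a 1.2 safety margin.
fn needs_compaction(estimated_tokens: u32, context_window: u32) -> bool {
    let threshold = (context_window as f64) * 0.8 / 1.2;
    (estimated_tokens as f64) > threshold
}

fn main() {
    // For a 128k window the threshold is roughly 85,333 tokens.
    println!("{}", needs_compaction(90_000, 128_000));
}
```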

Algorithm:

  1. Keep MIN_RECENT_MESSAGES (6) most recent non-system messages
  2. Don’t split inside tool call/result pairs
  3. Summarize old messages: first line (200 chars), strip tool arguments, drop media
  4. Budget: 40% of total for summary (BASE_CHUNK_RATIO = 0.4)
  5. Replace: [System, CompactionSummary, Recent1, Recent2, ...]

Format:

  • User: > User: first line [media omitted]
  • Assistant: > Assistant: content or - Called tool_name
  • Tool: -> tool_name: ok|error - first 100 chars

Bundled App Skills (bundled_app_skills.rs)

Compile-time embedded app-skill entries. Each app-skill crate (news, deep-search, deep-crawl, etc.) is registered as a bundled skill available at runtime.

Bootstrap (bootstrap.rs)

Bootstraps bundled skills at gateway startup. Ensures all bundled app-skills are registered and available.

Prompt Guard (prompt_guard.rs)

Prompt injection detection. ThreatKind enum classifies detected threats. Scans user input before passing to the agent.

Tool System

pub trait Tool: Send + Sync {
    fn name(&self) -> &str;
    fn description(&self) -> &str;
    fn tags(&self) -> &[&str];
    fn input_schema(&self) -> serde_json::Value;
    async fn execute(&self, args: &serde_json::Value) -> Result<ToolResult>;
}

pub struct ToolResult {
    pub output: String,
    pub success: bool,
    pub file_modified: Option<PathBuf>,
    pub tokens_used: Option<TokenUsage>,
}

ToolRegistry: HashMap<String, Arc<dyn Tool>> with provider_policy: Option<ToolPolicy> for soft filtering.

Built-in Tools (14)

| Tool | Parameters | Key Behavior |
|---|---|---|
| read_file | path, start_line?, end_line? | Line numbers (NNN\|), 100KB truncation, symlink rejection |
| write_file | path, content | Creates parent dirs, returns file_modified |
| edit_file | path, old_string, new_string | Exact match required, error on 0 or >1 occurrences |
| diff_edit | path, diff | Unified diff with fuzzy matching (±3 lines), reverse hunk application |
| glob | pattern, limit=100 | Rejects absolute paths and .., relative results |
| grep | pattern, file_pattern?, limit=50, context=0, ignore_case=false | .gitignore-aware via ignore::WalkBuilder, regex with (?i) flag |
| list_dir | path | Sorted, [dir]/[file] prefix |
| shell | command, timeout_secs=120 | SafePolicy check, 50KB output truncation, sandbox-wrapped, timeout clamped to [1, 600]s |
| web_search | query, count=5 | Brave Search API (BRAVE_API_KEY) |
| web_fetch | url, extract_mode="markdown", max_chars=50000 | SSRF protection, htmd HTML→markdown, 30s timeout |
| message | content, channel?, chat_id? | Cross-channel messaging via OutboundMessage. Gateway-only |
| spawn | task, label?, mode="background", allowed_tools, context? | Subagent with inherited provider policy. sync=inline, background=async. Gateway-only |
| cron | action, message, schedule params | Schedule add/list/remove/enable/disable. Gateway-only |
| browser | action, url?, selector?, text?, expression? | Headless Chrome via CDP (always compiled). Actions: navigate (SSRF + scheme check), get_text, get_html, click, type, screenshot, evaluate, close. 5min idle timeout, env sanitization, 10s JS timeout, early action validation |

Registration: Core tools registered in ToolRegistry::with_builtins() (all modes). Browser is always compiled. Message, spawn, and cron are registered only in gateway mode (gateway.rs).

Tool Policies

pub struct ToolPolicy {
    pub allow: Vec<String>,   // empty = allow all
    pub deny: Vec<String>,    // deny-wins
}

Groups: group:fs (read_file, write_file, edit_file, diff_edit), group:runtime (shell), group:web (web_search, web_fetch, browser), group:search (glob, grep, list_dir), group:sessions (spawn).

Wildcards: exec* matches prefix. Provider-specific policies via config tools.byProvider.
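The allow/deny/wildcard semantics above can be sketched as follows (a hedged illustration; group: expansion is omitted):

```rust
// Hypothetical sketch of policy evaluation: empty allow-list means
// allow-all, deny wins, and a trailing '*' matches by prefix.
struct ToolPolicy {
    allow: Vec<String>,
    deny: Vec<String>,
}

fn pattern_matches(pattern: &str, name: &str) -> bool {
    match pattern.strip_suffix('*') {
        Some(prefix) => name.starts_with(prefix), // wildcard prefix match
        None => pattern == name,                  // exact match
    }
}

impl ToolPolicy {
    fn allows(&self, tool: &str) -> bool {
        if self.deny.iter().any(|p| pattern_matches(p, tool)) {
            return false; // deny-wins
        }
        self.allow.is_empty() || self.allow.iter().any(|p| pattern_matches(p, tool))
    }
}

fn main() {
    let policy = ToolPolicy { allow: vec![], deny: vec!["exec*".into()] };
    println!("{}", policy.allows("exec_shell")); // denied by prefix wildcard
}
```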

Command Policy (ShellTool)

pub enum Decision { Allow, Deny, Ask }

SafePolicy deny patterns: rm -rf /, rm -rf /*, dd if=, mkfs, :(){:|:&};:, chmod -R 777 /. Commands are whitespace-normalized before matching to prevent evasion via extra spaces/tabs.

SafePolicy ask patterns: sudo, rm -rf, git push --force, git reset --hard
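The whitespace-normalization defense described above can be sketched as follows (simplified to deny-only; the real policy also distinguishes Ask):

```rust
// Collapse runs of spaces/tabs to single spaces so "rm   -rf  /"
// cannot evade a pattern written as "rm -rf /".
fn normalize(cmd: &str) -> String {
    cmd.split_whitespace().collect::<Vec<_>>().join(" ")
}

fn is_denied(cmd: &str, deny_patterns: &[&str]) -> bool {
    let norm = normalize(cmd);
    deny_patterns.iter().any(|p| norm.contains(p))
}

fn main() {
    let deny = ["rm -rf /"];
    println!("{}", is_denied("rm   -rf\t /", &deny)); // extra whitespace doesn't evade
}
```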

Sandbox

pub enum SandboxMode { Auto, Bwrap, Macos, Docker, None }

BLOCKED_ENV_VARS (18 vars, shared across all backends + MCP): LD_PRELOAD, LD_LIBRARY_PATH, LD_AUDIT, DYLD_INSERT_LIBRARIES, DYLD_LIBRARY_PATH, DYLD_FRAMEWORK_PATH, DYLD_FALLBACK_LIBRARY_PATH, DYLD_VERSIONED_LIBRARY_PATH, NODE_OPTIONS, PYTHONSTARTUP, PYTHONPATH, PERL5OPT, RUBYOPT, RUBYLIB, JAVA_TOOL_OPTIONS, BASH_ENV, ENV, ZDOTDIR

| Backend | Isolation | Network | Path Validation |
|---|---|---|---|
| Bwrap (Linux) | RO bind /usr, /lib, /bin, /sbin, /etc; RW bind workdir; tmpfs /tmp; unshare-pid | --unshare-net if !allow_network | N/A |
| Macos (sandbox-exec) | SBPL profile: process-exec/fork, file-read*, writes to workdir + /private/tmp | (allow network*) or (deny network*) | Rejects control chars, `(`, `)`, `\`, `"` |
| Docker | --rm --security-opt no-new-privileges --cap-drop ALL | --network none | Rejects `:`, `\0`, `\n`, `\r` |
Docker resource limits: --cpus, --memory, --pids-limit. Mount modes: None (/tmp workdir), ReadOnly, ReadWrite.

Hooks System

Lifecycle hooks run shell commands at agent events. Configured via hooks array in config.

pub enum HookEvent { BeforeToolCall, AfterToolCall, BeforeLlmCall, AfterLlmCall }

pub struct HookConfig {
    pub event: HookEvent,
    pub command: Vec<String>,       // argv array (no shell interpretation)
    pub timeout_ms: u64,            // default: 5000
    pub tool_filter: Vec<String>,   // tool events only; empty = all
}

Shell protocol: JSON payload on stdin. Exit code semantics: 0=allow, 1=deny (before-hooks only), 2+=error. Before-hooks can deny operations; after-hook exit codes only count as errors.
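The exit-code semantics can be sketched as follows (the enum names are illustrative, not the crate's):

```rust
// Hypothetical sketch of the documented semantics:
// 0 = allow, 1 = deny (before-hooks only), anything else = error.
#[derive(Debug, PartialEq)]
enum HookOutcome {
    Allow,
    Deny,
    Error(i32),
}

fn interpret_exit(code: i32, is_before_hook: bool) -> HookOutcome {
    match code {
        0 => HookOutcome::Allow,
        1 if is_before_hook => HookOutcome::Deny,
        // For after-hooks, exit code 1 is just an error, not a veto.
        other => HookOutcome::Error(other),
    }
}

fn main() {
    println!("{:?}", interpret_exit(1, true)); // Deny
}
```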

Circuit breaker: HookExecutor auto-disables a hook after 3 consecutive failures (configurable via with_threshold()). Resets on success.

Environment: Commands sanitized via BLOCKED_ENV_VARS. Tilde expansion supports ~/ and ~username/.

Integration: Wired into chat.rs, gateway.rs, serve.rs. Hook config changes trigger restart via config watcher.

MCP Integration

JSON-RPC transport for Model Context Protocol servers. Two transport modes:

Transports:

  1. Stdio: Spawns server as child process (command + args + env). Line limit: 1MB. Env sanitized via BLOCKED_ENV_VARS.
  2. HTTP/SSE: Connects to remote server via url field. POST JSON, SSE response handling.

Lifecycle (stdio):

  1. Spawn server (command + args + env, filtering BLOCKED_ENV_VARS)
  2. Initialize: protocolVersion: "2024-11-05"
  3. Discover tools: tools/list RPC
  4. Validate input schemas (max depth 10, max size 64KB); reject tools with invalid schemas
  5. Register McpTool wrappers (30s timeout, 1MB max response)

McpTool execution: tools/call with name + arguments. Extracts content[].text from response.

Skills System

Skills are markdown instruction files that extend agent capabilities. Two sources: built-in (compiled into binary) and workspace (user-installed).

Skill File Format (SKILL.md)

---
name: skill_name
description: What it does
requires_bins: binary1, binary2    # comma-separated, checked via `which`
requires_env: ENV_VAR1, ENV_VAR2   # comma-separated, checked via std::env::var()
always: true|false                 # auto-load into system prompt when available
---
Skill instructions here (markdown). This body is injected into the agent's
system prompt when the skill is activated.

Frontmatter parsing: Simple key: value line matching (not full YAML). split_frontmatter() finds content between --- delimiters. strip_frontmatter() returns body only.
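A minimal sketch of such a split (delimiter handling assumed; not the crate's exact parser):

```rust
// Return (frontmatter, body) for content delimited by leading and
// trailing `---` lines, or None if no frontmatter is present.
fn split_frontmatter(content: &str) -> Option<(&str, &str)> {
    let rest = content.strip_prefix("---\n")?;
    let end = rest.find("\n---\n")?;
    Some((&rest[..end], &rest[end + 5..]))
}

fn main() {
    let doc = "---\nname: tmux\nalways: true\n---\nInstructions here.";
    if let Some((front, body)) = split_frontmatter(doc) {
        println!("front: {front:?}");
        println!("body: {body:?}");
    }
}
```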

SkillInfo

pub struct SkillInfo {
    pub name: String,
    pub description: String,
    pub path: PathBuf,          // filesystem path or "(built-in)/name/SKILL.md"
    pub available: bool,        // bins_ok && env_ok
    pub always: bool,           // auto-load into system prompt
    pub builtin: bool,          // true if from BUILTIN_SKILLS, false if workspace
}

Availability check: available = requires_bins all found on PATH AND requires_env all set. Missing requirements make the skill unavailable but still listed.

SkillsLoader

pub struct SkillsLoader {
    skills_dir: PathBuf,        // {data_dir}/skills/
}

Methods:

  • list_skills() — scans workspace dir + built-ins. Workspace skills override built-ins with same name (checked via HashSet). Results sorted alphabetically.
  • load_skill(name) — returns body (frontmatter stripped). Checks workspace first, falls back to built-in.
  • build_skills_summary() — generates XML for system prompt injection:
    <skills>
      <skill available="true">
        <name>skill_name</name>
        <description>What it does</description>
        <location>/path/to/SKILL.md</location>
      </skill>
    </skills>
    
  • get_always_skills() — filters skills where always: true AND available: true.
  • load_skills_for_context(names) — loads multiple skills, joins with \n---\n.

Built-in Skills (compile-time include_str!())

pub struct BuiltinSkill {
    pub name: &'static str,
    pub content: &'static str,  // full SKILL.md including frontmatter
}
pub const BUILTIN_SKILLS: &[BuiltinSkill] = &[...];

| Skill | Purpose |
|---|---|
| cron | Task scheduling instructions |
| skill-store | Skill store browsing and installation |
| skill-creator | Create new skills |
| tmux | Terminal multiplexer control |
| weather | Weather information retrieval |

CLI Management (octos skills)

  • list — shows built-in skills (with override status) + workspace skills
  • install <user/repo/skill-name> — fetches SKILL.md from https://raw.githubusercontent.com/{repo}/main/SKILL.md (15s timeout), saves to .octos/skills/{name}/SKILL.md. Fails if skill already exists.
  • remove <name> — deletes .octos/skills/{name}/ directory

Integration with Gateway

In the gateway command, skills are loaded during system prompt construction:

  1. get_always_skills() — collects auto-load skill names
  2. load_skills_for_context(names) — loads and joins skill bodies
  3. build_skills_summary() — appends XML skill index to system prompt
  4. Always-on skill content is prepended to the system prompt
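The four steps above amount to simple string assembly; a minimal sketch, assuming plain concatenation (the real gateway may route this through PromptLayerBuilder):

```rust
/// Hypothetical sketch of gateway system-prompt assembly:
/// always-on skill bodies are joined with \n---\n and prepended,
/// and the XML skill index is appended.
fn build_system_prompt(base: &str, always_skills: &[&str], skills_summary_xml: &str) -> String {
    let mut prompt = String::new();
    if !always_skills.is_empty() {
        prompt.push_str(&always_skills.join("\n---\n"));
        prompt.push_str("\n\n");
    }
    prompt.push_str(base);
    prompt.push_str("\n\n");
    prompt.push_str(skills_summary_xml);
    prompt
}
```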

Plugin System

Plugins extend the agent with external tools via standalone executables. Each plugin is a directory containing a manifest.json and an executable file.

Directory Layout

.octos/plugins/           # local (project-level)
~/.octos/plugins/         # global (user-level)
  └── my-plugin/
      ├── manifest.json  # plugin metadata + tool definitions
      └── my-plugin      # executable (or "main" as fallback)

Discovery order: local .octos/plugins/ first, then global ~/.octos/plugins/. Both are scanned by Config::plugin_dirs().

PluginManifest

pub struct PluginManifest {
    pub name: String,
    pub version: String,
    pub tools: Vec<PluginToolDef>,    // default: empty vec
}

pub struct PluginToolDef {
    pub name: String,                 // must be unique across all plugins
    pub description: String,
    pub input_schema: serde_json::Value,  // default: {"type": "object"}
}

Example manifest.json:

{
  "name": "my-plugin",
  "version": "0.1.0",
  "tools": [
    {
      "name": "greet",
      "description": "Greet someone by name",
      "input_schema": {
        "type": "object",
        "properties": { "name": { "type": "string" } }
      }
    }
  ]
}

PluginLoader

pub struct PluginLoader;  // stateless, all methods are associated functions

load_into(registry, dirs):

  1. Scan each directory for subdirectories
  2. For each subdirectory, look for manifest.json
  3. Parse manifest, find executable (try directory name first, then main)
  4. Validate executable permissions (Unix: mode & 0o111 != 0; non-Unix: existence check)
  5. Wrap each tool definition as a PluginTool implementing the Tool trait
  6. Register into ToolRegistry
  7. Log warning: "loaded unverified plugin (no signature check)"
  8. Return total tool count. Failed plugins are skipped with warning, not fatal.

PluginTool — Execution Protocol

pub struct PluginTool {
    plugin_name: String,
    tool_def: PluginToolDef,
    executable: PathBuf,
}

Invocation: executable <tool_name> (tool name passed as first argument).

stdin/stdout protocol:

  1. Spawn executable with tool name as arg, piped stdin/stdout/stderr
  2. Write JSON-serialized arguments to stdin, close (EOF signals end of input)
  3. Wait for exit with 30s timeout (PLUGIN_TIMEOUT)
  4. Parse stdout as JSON:
    • Structured: {"output": "...", "success": true/false} → use parsed values
    • Fallback: raw stdout + stderr concatenated, success from exit code
  5. Return ToolResult (no file_modified tracking for plugins)
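The spawn/write/collect sequence can be sketched synchronously with std::process (the real loader runs async with the 30s PLUGIN_TIMEOUT and tries to parse a structured JSON reply first; this sketch shows only the raw-stdout fallback path):

```rust
use std::io::Write;
use std::process::{Command, Stdio};

/// Sketch of the plugin call protocol, fallback path only:
/// raw stdout as output, success taken from the exit code.
fn call_plugin(exe: &str, tool_name: &str, args_json: &str) -> (String, bool) {
    let mut child = Command::new(exe)
        .arg(tool_name)               // tool name as first argument
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("failed to spawn plugin");
    // Write JSON-serialized arguments; dropping the handle closes stdin,
    // so EOF signals end of input to the plugin.
    child
        .stdin
        .take()
        .expect("stdin piped")
        .write_all(args_json.as_bytes())
        .expect("write args");
    let out = child.wait_with_output().expect("wait for plugin");
    let stdout = String::from_utf8_lossy(&out.stdout).into_owned();
    (stdout, out.status.success())
}
```

For illustration, `grep` can stand in for a plugin executable: it takes one argument, reads stdin, and exits 0 on a match.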

Error handling:

  • Spawn failure → eyre error with plugin name and executable path
  • Timeout → eyre error with plugin name, tool name, and duration
  • JSON parse failure → graceful fallback to raw output

Progress Reporting

The agent emits structured events during execution via a trait-based observer pattern. Consumers (CLI, REST API) implement the trait to render progress in their own format.

ProgressReporter Trait

pub trait ProgressReporter: Send + Sync {
    fn report(&self, event: ProgressEvent);
}

Agent holds reporter: Arc<dyn ProgressReporter>. Events are fired synchronously during the execution loop, so implementations must return quickly and never block.
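A reporter implementation only needs to handle the events it cares about. A minimal sketch, with ProgressEvent abridged to two variants for brevity (the real enum is shown below):

```rust
use std::sync::Mutex;

// Abridged stand-in for the real ProgressEvent enum
enum ProgressEvent {
    Thinking { iteration: u32 },
    TaskCompleted { success: bool },
}

trait ProgressReporter: Send + Sync {
    fn report(&self, event: ProgressEvent);
}

/// Collects events as text lines; report() only pushes onto a Vec,
/// honoring the "must not block" contract.
struct CollectingReporter {
    lines: Mutex<Vec<String>>,
}

impl ProgressReporter for CollectingReporter {
    fn report(&self, event: ProgressEvent) {
        let line = match event {
            ProgressEvent::Thinking { iteration } => format!("thinking #{iteration}"),
            ProgressEvent::TaskCompleted { success } => format!("completed success={success}"),
        };
        self.lines.lock().unwrap().push(line);
    }
}
```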

ProgressEvent Enum

pub enum ProgressEvent {
    TaskStarted { task_id: String },
    Thinking { iteration: u32 },
    Response { content: String, iteration: u32 },
    ToolStarted { name: String, tool_id: String },
    ToolCompleted { name: String, tool_id: String, success: bool,
                    output_preview: String, duration: Duration },
    FileModified { path: String },
    TokenUsage { input_tokens: u32, output_tokens: u32 },
    TaskCompleted { success: bool, iterations: u32, duration: Duration },
    TaskInterrupted { iterations: u32 },
    MaxIterationsReached { limit: u32 },
    TokenBudgetExceeded { used: u32, limit: u32 },
    StreamChunk { text: String, iteration: u32 },
    StreamDone { iteration: u32 },
    CostUpdate { session_input_tokens: u32, session_output_tokens: u32,
                 response_cost: Option<f64>, session_cost: Option<f64> },
}

Implementations (3)

SilentReporter — no-op, used as default when no reporter is configured.

ConsoleReporter — CLI output with ANSI colors and streaming support:

pub struct ConsoleReporter {
    use_colors: bool,
    verbose: bool,
    stdout: Mutex<BufWriter<Stdout>>,  // buffered for streaming chunks
}

| Event | Output |
|-------|--------|
| Thinking | \r⟳ Thinking... (iteration N) (overwrites line, yellow) |
| Response | ◆ first 3 lines... (cyan, clears Thinking line) |
| ToolStarted | \r⚙ Running tool_name... (overwrites line, yellow) |
| ToolCompleted | ✓ tool_name (duration) green or ✗ tool_name red; verbose: 5 lines of output + ... |
| FileModified | 📝 Modified: path (green) |
| TokenUsage | Tokens: N in, N out (verbose only, dim) |
| TaskCompleted | ✓ Completed N iterations, Xs or ✗ Failed after N iterations |
| TaskInterrupted | ⚠ Interrupted after N iterations. (yellow) |
| MaxIterationsReached | ⚠ Reached max iterations limit (N). (yellow) |
| TokenBudgetExceeded | ⚠ Token budget exceeded (used, limit). (yellow) |
| StreamChunk | Write to buffered stdout; flush only on \n (reduces syscalls) |
| StreamDone | Flush + newline |
| CostUpdate | Tokens: N in / N out \| Cost: $X.XXXX |
| TaskStarted | ▶ Task: id (verbose only, dim) |

Duration formatting: >1s → {:.1}s, ≤1s → {N}ms.
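That formatting rule is a one-liner; a sketch with std::time::Duration:

```rust
use std::time::Duration;

/// ConsoleReporter's duration rule: > 1s renders as {:.1}s, otherwise ms.
fn fmt_duration(d: Duration) -> String {
    if d > Duration::from_secs(1) {
        format!("{:.1}s", d.as_secs_f64())
    } else {
        format!("{}ms", d.as_millis())
    }
}
```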

SseBroadcaster (REST API, feature: api) — converts events to JSON and broadcasts via tokio::sync::broadcast channel:

pub struct SseBroadcaster {
    tx: broadcast::Sender<String>,  // JSON-serialized events
}

| ProgressEvent | JSON type field | Additional fields |
|---------------|-----------------|-------------------|
| ToolStarted | "tool_start" | tool |
| ToolCompleted | "tool_end" | tool, success |
| StreamChunk | "token" | text |
| StreamDone | "stream_end" | — |
| CostUpdate | "cost_update" | input_tokens, output_tokens, session_cost |
| Thinking | "thinking" | iteration |
| Response | "response" | iteration |
| (other) | "other" | — (logged at debug level) |

Subscribers receive events via SseBroadcaster::subscribe() -> broadcast::Receiver<String>. Send errors (no subscribers) are silently ignored.

Execution Environments (exec_env.rs)

ExecEnvironment trait with exec(cmd, args, env), read_file(path), write_file(path, content), file_exists(path), list_dir(path). Two implementations: LocalEnvironment (tokio::process::Command) and DockerEnvironment (docker exec). Environment variables sanitized via shared BLOCKED_ENV_VARS. Docker paths validated against injection characters (\0, \n, \r, :). Docker env vars forwarded via --env flags.

Provider Toolsets (provider_tools.rs)

ToolAdjustment (prefer, demote, aliases, extras) per LLM provider. ProviderToolsets registry with with_defaults() for openai/anthropic/google. Used to optimize tool presentation per provider (e.g., OpenAI prefers shell/read_file, demotes diff_edit).

Typed Turns (turn.rs)

Turn wraps Message with TurnKind (UserInput, AgentReply, ToolCall, ToolResult, System) and iteration number. turns_to_messages() converts back to Vec<Message> for LLM calls. Enables semantic analysis of conversation history.

Event Bus (event_bus.rs)

EventBus with typed EventSubscriber for pub/sub within the agent. Decouples event producers (tool execution, LLM calls) from consumers (logging, metrics, UI updates).

Loop Detection (loop_detect.rs)

Detects repetitive agent behavior (e.g., calling the same tool with same args). Configurable threshold and window. Returns early with diagnostic message when loop detected.

Session State (session.rs)

SessionState with SessionLimits and SessionUsage tracking. SessionStateHandle for thread-safe access. Tracks token usage, iteration count, and wall-clock time against configured limits.

Steering (steering.rs)

SteeringMessage with SteeringSender/SteeringReceiver (mpsc channel). Allows external control of agent behavior mid-conversation (e.g., injecting guidance, changing strategy).

Prompt Layers (prompt_layer.rs)

PromptLayerBuilder for composing system prompts from multiple sources (base prompt, persona, user context, memory, skills). Layers are concatenated in order with configurable separators.


octos-bus — Gateway Infrastructure

Message Bus

create_bus() -> (AgentHandle, BusPublisher) linked by mpsc channels (capacity 256). AgentHandle receives InboundMessages; BusPublisher dispatches OutboundMessages.

Queue Modes (configured via gateway.queue_mode):

  • Followup (default): FIFO — process queued messages one at a time
  • Collect: Merge queued messages by session, concatenating content before processing

Channel Trait

#[async_trait]
pub trait Channel: Send + Sync {
    fn name(&self) -> &str;
    async fn start(&self, inbound_tx: mpsc::Sender<InboundMessage>) -> Result<()>;
    async fn send(&self, msg: &OutboundMessage) -> Result<()>;
    fn is_allowed(&self, sender_id: &str) -> bool;
    async fn stop(&self) -> Result<()>;
}

Channel Implementations

| Channel | Transport | Feature Flag | Auth | Dedup |
|---------|-----------|--------------|------|-------|
| CLI | stdin/stdout | (always) | N/A | N/A |
| Telegram | teloxide long-poll | telegram | Bot token (env) | teloxide built-in |
| Discord | serenity gateway | discord | Bot token (env) | serenity built-in |
| Slack | Socket Mode (tokio-tungstenite) | slack | Bot token + App token | message_ts |
| WhatsApp | WebSocket bridge (ws://localhost:3001) | whatsapp | Baileys bridge | HashSet (10K cap, clear on overflow) |
| Feishu | WebSocket (tokio-tungstenite) | feishu | App ID + Secret → tenant token (TTL 6000s) | HashSet (10K cap, clear on overflow) |
| Email | IMAP poll + SMTP send | email | Username/password, rustls TLS | IMAP UNSEEN flag |
| WeCom | WeCom/WeChat Work API | wecom | Corp ID + Agent Secret | message_id |
| Twilio | Twilio SMS/MMS | twilio | Account SID + Auth Token | message SID |

Email specifics: IMAP async-imap with rustls for inbound (poll unseen, mark \Seen). SMTP lettre for outbound (port 465=implicit TLS, other=STARTTLS). mailparse for RFC822 body extraction. Body truncated via truncate_utf8(max_body_chars).

Feishu specifics: Tenant access token with TTL cache (6000s). WebSocket gateway URL from /callback/ws/endpoint. Message type detection via header.event_type == "im.message.receive_v1". Supports oc_* (chat_id) vs ou_* (open_id) routing.

Markdown to HTML: markdown_html.rs converts Markdown to Telegram-compatible HTML for rich message formatting.

Media: download_media() helper downloads photos/voice/audio/documents to .octos/media/.

Transcription: Voice/audio auto-transcribed via GroqTranscriber before agent processing.

Message Coalescing

Splits oversized messages into channel-safe chunks:

| Channel | Max Chars |
|---------|-----------|
| Telegram | 4000 |
| Discord | 1900 |
| Slack | 3900 |

Break preference: paragraph (\n\n) > newline (\n) > sentence (. ) > space ( ) > hard cut.

MAX_CHUNKS = 50 (DoS limit). UTF-8 safe boundary detection via char_indices().
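The break-preference rule can be sketched as a char-boundary-safe splitter (a simplified sketch: the real coalesce.rs may weigh break positions differently, e.g. rejecting breaks too close to the chunk start):

```rust
/// Split text into at most MAX_CHUNKS chunks of <= max_chars characters,
/// preferring paragraph > newline > sentence > space before a hard cut.
fn split_message(text: &str, max_chars: usize) -> Vec<String> {
    const MAX_CHUNKS: usize = 50; // DoS limit
    let mut chunks = Vec::new();
    let mut rest = text;
    while !rest.is_empty() && chunks.len() < MAX_CHUNKS {
        if rest.chars().count() <= max_chars {
            chunks.push(rest.to_string());
            break;
        }
        // Byte offset of the max_chars-th character: a UTF-8-safe hard cut
        let hard = rest
            .char_indices()
            .nth(max_chars)
            .map(|(i, _)| i)
            .unwrap_or(rest.len());
        let window = &rest[..hard];
        // Prefer the latest "nice" break inside the window
        let cut = window
            .rfind("\n\n").map(|i| i + 2)
            .or_else(|| window.rfind('\n').map(|i| i + 1))
            .or_else(|| window.rfind(". ").map(|i| i + 2))
            .or_else(|| window.rfind(' ').map(|i| i + 1))
            .unwrap_or(hard);
        chunks.push(rest[..cut].to_string());
        rest = &rest[cut..];
    }
    chunks
}
```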

Session Manager

JSONL persistence at .octos/sessions/{key}.jsonl.

  • In-memory cache: LRU with disk sync on write
  • Filenames: percent-encoded SessionKey; names over 183 chars are truncated and given a _{hash:016X} suffix to prevent collisions
  • File size limit: 10MB max (MAX_SESSION_FILE_SIZE); oversized files skipped on load
  • Crash safety: Atomic write-then-rename
  • Forking: fork() creates child session with parent_key tracking, copies last N messages
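The filename rule can be sketched with std hashing (the exact safe-character set and hash function are assumptions of this sketch; the real code's percent-encoding may differ):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of session filename derivation: percent-encode the key, and on
/// truncation append a 16-hex-digit hash suffix to prevent collisions.
fn session_filename(key: &str) -> String {
    let mut enc = String::new();
    for b in key.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'.' | b'_' | b'-' => enc.push(b as char),
            _ => enc.push_str(&format!("%{b:02X}")),
        }
    }
    if enc.len() > 183 {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        enc.truncate(183); // enc is pure ASCII, so byte truncation is safe
        format!("{enc}_{:016X}", h.finish())
    } else {
        enc
    }
}
```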

Cron Service

JSON persistence at .octos/cron.json.

Schedule types:

  • Every { seconds: u64 } — recurring interval
  • Cron { expr: String } — cron expression via cron crate
  • At { timestamp_ms: i64 } — one-shot (auto-delete after run)

CronJob fields: id (8-char hex from UUIDv7), name, enabled, schedule, payload (message + deliver flag + channel + chat_id), state (next_run_at_ms, run_count), delete_after_run.
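The schedule semantics can be sketched without the cron crate (the Cron variant is omitted because expression parsing needs that crate; next_run_at_ms here is an illustrative helper, not the real API):

```rust
/// Sketch of two of the three schedule types and their next-run logic.
enum Schedule {
    Every { seconds: u64 },
    At { timestamp_ms: i64 }, // one-shot; the job auto-deletes after running
}

/// None means the schedule will never fire again.
fn next_run_at_ms(schedule: &Schedule, now_ms: i64) -> Option<i64> {
    match schedule {
        Schedule::Every { seconds } => Some(now_ms + *seconds as i64 * 1000),
        // A one-shot in the past never fires again
        Schedule::At { timestamp_ms } => (*timestamp_ms > now_ms).then_some(*timestamp_ms),
    }
}
```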

Heartbeat Service

Periodic check of HEARTBEAT.md (default: 30 min interval). Sends content to agent if non-empty.


octos-cli — CLI & Configuration

Commands

| Command | Description |
|---------|-------------|
| chat | Interactive multi-turn chat. Readline with history. Exit: exit/quit/:q |
| gateway | Persistent multi-channel daemon with session management |
| init | Initialize .octos/ with config, templates, directories |
| status | Show config, provider, API keys, bootstrap files |
| auth login/logout/status | OAuth PKCE (OpenAI), device code, paste-token |
| cron list/add/remove/enable | CLI cron job management |
| channels status/login | Channel compilation status, WhatsApp bridge setup |
| skills list/install/remove | Skill management, GitHub fetch |
| office | Office/workspace management |
| account | Account management |
| clean | Remove .redb files with dry-run support |
| completions | Shell completion generation (bash/zsh/fish) |
| docs | Generate tool + provider documentation |
| serve | REST API server (feature: api) — axum on 127.0.0.1:8080 (--host to override) |

Configuration

Loaded from .octos/config.json (local) or ~/.config/octos/config.json (global). Local takes precedence.

  • ${VAR} expansion: Environment variable substitution in string values
  • Versioned config: Version field with automatic migrate_config() framework
  • Provider auto-detect (registry::detect_provider(model)): claude→anthropic, gpt/o1/o3/o4→openai, gemini→gemini, deepseek→deepseek, kimi/moonshot→moonshot, qwen→dashscope, glm→zhipu, llama/mixtral→groq. Patterns defined per-provider in registry/.
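The detection rules read naturally as an ordered pattern table (a sketch: the real patterns live per-provider in registry/, and whether matching is prefix- or substring-based is an assumption here):

```rust
/// Sketch of registry::detect_provider's model-name rules.
fn detect_provider(model: &str) -> Option<&'static str> {
    let m = model.to_ascii_lowercase();
    let rules: &[(&[&str], &str)] = &[
        (&["claude"], "anthropic"),
        (&["gpt", "o1", "o3", "o4"], "openai"),
        (&["gemini"], "gemini"),
        (&["deepseek"], "deepseek"),
        (&["kimi", "moonshot"], "moonshot"),
        (&["qwen"], "dashscope"),
        (&["glm"], "zhipu"),
        (&["llama", "mixtral"], "groq"),
    ];
    rules
        .iter()
        .find(|(patterns, _)| patterns.iter().any(|p| m.contains(p)))
        .map(|(_, provider)| *provider)
}
```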

API key resolution order: Auth store (~/.octos/auth.json) → environment variable.

Auth Module

OAuth PKCE (OpenAI):

  1. Generate 64-char verifier (two UUIDv4 hex)
  2. SHA-256 challenge, base64-URL encode (no padding)
  3. TCP listener on port 1455
  4. Browser → auth.openai.com with PKCE + state
  5. Callback validates state (CSRF), exchanges code+verifier for tokens

Device Code Flow (OpenAI): POST deviceauth/usercode, poll deviceauth/token every 5s+.

Paste Token: Prompt for API key from stdin, store as auth_method: "paste_token".

AuthStore: ~/.octos/auth.json (mode 0600). {credentials: {provider: AuthCredential}}.

Config Watcher

Polls every 5 seconds. SHA-256 hash comparison of file contents.

Hot-reloadable: system_prompt, max_history (applied live).

Restart-required: provider, model, base_url, api_key_env, sandbox, mcp_servers, hooks, gateway.queue_mode, channels.

REST API (feature: api)

| Route | Method | Description |
|-------|--------|-------------|
| /api/chat | POST | Send message → response |
| /api/chat/stream | GET | SSE stream of ProgressEvents |
| /api/sessions | GET | List all sessions |
| /api/sessions/{id}/messages | GET | Paginated history (?limit=100&offset=0, max 500) |
| /api/status | GET | Version, model, provider, uptime |
| /metrics | GET | Prometheus text exposition format (unauthenticated) |
| /* (fallback) | GET | Embedded web UI (static files via rust-embed) |

Auth: Optional bearer token with constant-time comparison (API routes only; /metrics and static files are public). CORS: localhost:3000/8080. Max message: 1MB.

Web UI: Embedded SPA via rust-embed served as the fallback handler. Session sidebar, chat interface, SSE streaming, dark theme. Vanilla HTML/CSS/JS (no build tools).

Prometheus Metrics: octos_tool_calls_total (counter, labels: tool, success), octos_tool_call_duration_seconds (histogram, label: tool), octos_llm_tokens_total (counter, label: direction). Powered by metrics + metrics-exporter-prometheus crates.

Session Compaction (Gateway)

Triggered when message count > 40 (threshold). Keeps 10 recent messages. Summarizes older messages via LLM to <500 words. Rewrites JSONL session file.


octos-pipeline — DOT-based Pipeline Orchestration

DOT-based pipeline orchestration engine for defining and executing multi-step workflows.

  • parser.rs — DOT graph parser (parses Graphviz DOT format into pipeline definitions)
  • graph.rs — PipelineGraph with node/edge types
  • executor.rs — Async pipeline execution engine
  • handler.rs — Handler types: CodergenHandler, GateHandler, ShellHandler, NoopHandler, DynamicParallel
  • condition.rs — Conditional edge evaluation (branching logic)
  • tool.rs — RunPipelineTool integration (exposes pipeline execution as an agent tool)
  • validate.rs — Graph validation and lint diagnostics
  • human_gate.rs — Human-in-the-loop gates with HumanInputProvider trait, ChannelInputProvider (mpsc + oneshot, 5min default timeout), AutoApproveProvider. Input types: Approval, FreeText, Choice
  • fidelity.rs — FidelityMode enum (Full, Truncate, Compact, Summary) for context carryover control between nodes. Parse from config strings. Safety caps: 10MB max_chars, 100K max_lines
  • manager.rs — PipelineManager supervisor with SupervisionStrategy (AllOrNothing, BestEffort, RetryFailed). Retry capped at 10 with exponential backoff (100ms-5s). ManagerOutcome converts to NodeOutcome
  • thread.rs — ThreadRegistry for LLM session reuse across pipeline nodes. Thread stores model_id + message history. Limits: 1000 threads, 10000 messages per thread
  • server.rs — PipelineServer trait with SubmitRequest (validated: 1MB DOT, 256KB input, 64 variables, safe pipeline IDs), RunStatus lifecycle (Queued → Running → Completed/Failed/Cancelled)
  • artifact.rs — Pipeline artifact storage for intermediate outputs
  • checkpoint.rs — Pipeline checkpoint/resume for crash recovery
  • events.rs — Pipeline event system for progress tracking
  • run_dir.rs — Per-run working directories with isolation
  • stylesheet.rs — Visual styling for pipeline graph rendering

Data Flows

Chat Mode

User Input → readline → Agent.process_message(input, history)
                              │
                              ├─ Build messages (system + history + memory + input)
                              ├─ trim_to_context_window() if needed
                              ├─ Call LLM via chat_stream() with tool specs
                              ├─ Execute tools if ToolUse (loop)
                              └─ Return ConversationResponse
                                    │
                              Print response, append to history

Gateway Mode

Channel → InboundMessage → MessageBus → [transcribe audio] → [load session]
                                              │
                                    Agent.process_message()
                                              │
                                        OutboundMessage
                                              │
                                   ChannelManager.dispatch()
                                              │
                                    coalesce() → Channel.send()

System messages (cron, heartbeat, spawn results) flow through the same bus with channel: "system" and metadata routing.


Feature Flags

# octos-bus
telegram = ["teloxide"]
discord  = ["serenity"]
slack    = ["tokio-tungstenite"]
whatsapp = ["tokio-tungstenite"]
feishu   = ["tokio-tungstenite"]
email    = ["async-imap", "tokio-rustls", "rustls", "webpki-roots", "lettre", "mailparse"]

# octos-agent (browser is always compiled in, no longer feature-gated)
git      = ["gix"]                  # git operations via gitoxide
ast      = ["tree-sitter"]          # code_structure.rs AST analysis
admin-bot = [...]                   # admin/ directory tools

# octos-bus (additional)
wecom    = [...]                    # WeCom/WeChat Work channel
twilio   = [...]                    # Twilio SMS/MMS channel

# octos-cli
api      = ["axum", "tower-http", "futures"]
telegram = ["octos-bus/telegram"]
discord  = ["octos-bus/discord"]
slack    = ["octos-bus/slack"]
whatsapp = ["octos-bus/whatsapp"]
feishu   = ["octos-bus/feishu"]
email    = ["octos-bus/email"]
wecom    = ["octos-bus/wecom"]
twilio   = ["octos-bus/twilio"]

File Layout

crates/
├── octos-core/src/
│   ├── lib.rs, task.rs, types.rs, error.rs, gateway.rs, message.rs, utils.rs
├── octos-llm/src/
│   ├── lib.rs, provider.rs, config.rs, types.rs, retry.rs, failover.rs, sse.rs
│   ├── embedding.rs, pricing.rs, context.rs, transcription.rs, vision.rs
│   ├── adaptive.rs, swappable.rs, router.rs, ominix.rs
│   ├── anthropic.rs, openai.rs, gemini.rs, openrouter.rs  (protocol impls)
│   └── registry/ (mod.rs + 14 provider entries: anthropic, openai, gemini,
│                   openrouter, deepseek, groq, moonshot, dashscope, minimax,
│                   zhipu, zai, nvidia, ollama, vllm)
├── octos-memory/src/
│   ├── lib.rs, episode.rs, store.rs, memory_store.rs, hybrid_search.rs
├── octos-agent/src/
│   ├── lib.rs, agent.rs, progress.rs, policy.rs, compaction.rs, sanitize.rs, hooks.rs
│   ├── sandbox.rs, mcp.rs, skills.rs, builtin_skills.rs
│   ├── bundled_app_skills.rs, bootstrap.rs, prompt_guard.rs
│   ├── plugins/ (mod.rs, loader.rs, manifest.rs, tool.rs)
│   ├── skills/ (cron, skill-store, skill-creator SKILL.md)
│   └── tools/ (mod, policy, shell, read_file, write_file, edit_file, diff_edit,
│               list_dir, glob_tool, grep_tool, web_search, web_fetch,
│               message, spawn, browser, ssrf, tool_config,
│               deep_search, site_crawl, recall_memory, save_memory,
│               send_file, take_photo, code_structure, git,
│               deep_research_pipeline, synthesize_research, research_utils,
│               admin/ (profiles, skills, sub_accounts, system,
│                       platform_skills, update))
├── octos-bus/src/
│   ├── lib.rs, bus.rs, channel.rs, session.rs, coalesce.rs, media.rs
│   ├── cli_channel.rs, telegram_channel.rs, discord_channel.rs
│   ├── slack_channel.rs, whatsapp_channel.rs, feishu_channel.rs, email_channel.rs
│   ├── wecom_channel.rs, twilio_channel.rs, markdown_html.rs
│   ├── cron_service.rs, cron_types.rs, heartbeat.rs
├── octos-pipeline/src/
│   ├── lib.rs, parser.rs, graph.rs, executor.rs, handler.rs
│   ├── condition.rs, tool.rs, validate.rs, human_gate.rs, fidelity.rs
│   └── manager.rs, thread.rs, server.rs, artifact.rs, checkpoint.rs,
│       events.rs, run_dir.rs, stylesheet.rs
└── octos-cli/src/
    ├── main.rs, config.rs, config_watcher.rs, cron_tool.rs, compaction.rs
    ├── auth/ (mod.rs, store.rs, oauth.rs, token.rs)
    ├── api/ (mod.rs, router.rs, handlers.rs, sse.rs, metrics.rs, static_files.rs)
    └── commands/ (mod, chat, init, status, gateway, clean,
                   completions, cron, channels, auth, skills, docs, serve,
                   office, account)

Security

Workspace-Level Safety

  • #![deny(unsafe_code)] — workspace-wide lint via [workspace.lints.rust]
  • secrecy::SecretString — all provider API keys are wrapped; prevents accidental logging/display

Authentication & Credentials

  • API keys: auth store (~/.octos/auth.json, mode 0600) checked before env vars
  • OAuth PKCE with SHA-256 challenges, state parameter (CSRF protection)
  • Constant-time byte comparison for API bearer tokens (timing attack prevention)

Execution Sandbox

  • Three backends: bwrap (Linux), sandbox-exec (macOS), Docker — SandboxMode::Auto detection
  • 18 BLOCKED_ENV_VARS shared across all sandbox backends, MCP server spawning, hooks, and browser tool: LD_PRELOAD, LD_LIBRARY_PATH, LD_AUDIT, DYLD_INSERT_LIBRARIES, DYLD_LIBRARY_PATH, DYLD_FRAMEWORK_PATH, DYLD_FALLBACK_LIBRARY_PATH, DYLD_VERSIONED_LIBRARY_PATH, NODE_OPTIONS, PYTHONSTARTUP, PYTHONPATH, PERL5OPT, RUBYOPT, RUBYLIB, JAVA_TOOL_OPTIONS, BASH_ENV, ENV, ZDOTDIR
  • Path injection prevention per backend (Docker: :, \0, \n, \r; macOS: control chars, (, ), \, ")
  • Docker: --cap-drop ALL, --security-opt no-new-privileges, --network none, blocked bind mount sources (docker.sock, /proc, /sys, /dev, /etc)

Tool Safety

  • ShellTool SafePolicy: deny rm -rf /, dd, mkfs, fork bombs, chmod -R 777 /; ask for sudo, rm -rf, git push --force, git reset --hard. Whitespace-normalized before matching. Timeout clamped to [1, 600]s. SIGTERM→grace period→SIGKILL cleanup for child processes.
  • Tool policies: allow/deny with deny-wins semantics, 8 named groups (group:fs, group:runtime, group:web, group:search, group:sessions, etc.), wildcard matching, provider-specific filtering via tools.byProvider
  • Tool argument size limit: 1MB per invocation (non-allocating estimate_json_size with escape char accounting)
  • Symlink-safe file I/O via O_NOFOLLOW on Unix (atomic kernel-level check, eliminates TOCTOU races); metadata-based symlink check fallback on Windows
  • SSRF protection in shared ssrf.rs module: DNS resolution with fail-closed behavior (blocks on DNS failure), private IP blocking (10/8, 172.16/12, 192.168/16, 169.254/16), IPv6 coverage (ULA fc00::/7, link-local fe80::/10, site-local fec0::/10, IPv4-mapped ::ffff:0:0/96, IPv4-compatible ::/96), loopback blocking. Used by web_fetch and browser.
  • Browser: URL scheme allowlist (http/https only), 10s JS execution timeout, zombie process reaping, secure tempfiles for screenshots
  • MCP: input schema validation (max depth 10, max size 64KB) prevents malicious tool definitions
  • Prompt injection guard (prompt_guard.rs): 5 threat categories (SystemOverride, RoleConfusion, ToolCallInjection, SecretExtraction, InstructionInjection), 10 detection patterns. Sanitizes threats by wrapping in [injection-blocked:...].
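The private/loopback IP ranges listed for SSRF protection can be checked with std::net alone (a sketch: the real ssrf.rs also resolves DNS fail-closed before any check, which is omitted here):

```rust
use std::net::{IpAddr, Ipv4Addr};

/// Sketch of the SSRF address checks: true means the address is blocked.
fn is_blocked_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            v4.is_loopback()           // 127/8
                || v4.is_private()     // 10/8, 172.16/12, 192.168/16
                || v4.is_link_local()  // 169.254/16
        }
        IpAddr::V6(v6) => {
            let seg = v6.segments();
            if seg[..5].iter().all(|&s| s == 0) {
                if seg[5] == 0xffff {
                    // IPv4-mapped ::ffff:a.b.c.d — check the embedded IPv4
                    let v4 = Ipv4Addr::new(
                        (seg[6] >> 8) as u8, seg[6] as u8,
                        (seg[7] >> 8) as u8, seg[7] as u8,
                    );
                    return is_blocked_ip(IpAddr::V4(v4));
                }
                if seg[5] == 0 {
                    return true; // IPv4-compatible ::/96 (includes ::1 loopback)
                }
            }
            (seg[0] & 0xfe00) == 0xfc00        // ULA fc00::/7
                || (seg[0] & 0xffc0) == 0xfe80 // link-local fe80::/10
                || (seg[0] & 0xffc0) == 0xfec0 // site-local fec0::/10
        }
    }
}
```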

Data Safety

  • Tool output sanitization (sanitize.rs): strips base64 data URIs, long hex strings (64+ chars), and credential redaction with 7 regex patterns covering OpenAI (sk-...), Anthropic (sk-ant-...), AWS (AKIA...), GitHub (ghp_/gho_/ghs_/ghr_/github_pat_...), GitLab (glpat-...), Bearer tokens, and generic password/api_key assignments
  • UTF-8 safe truncation via truncate_utf8() across all tool outputs and email bodies
  • Session file collision prevention via percent-encoded filenames with hash suffix on truncation
  • Session file size limit: 10MB max prevents OOM on corrupted files
  • Atomic write-then-rename for session persistence (crash safety)
  • API server binds to 127.0.0.1 by default (not 0.0.0.0)
  • Channel access control via allowed_senders lists
  • MCP response limit: 1MB per JSON-RPC line (DoS prevention)
  • Message coalescing: MAX_CHUNKS=50 DoS limit
  • API message limit: 1MB per request

Concurrency Model

Why Rust

octos uses Rust with the tokio async runtime, which provides significant advantages over Python (OpenClaw, etc.) and Node.js (NanoCloud, etc.) agent frameworks for concurrent session handling:

True parallelism — Tokio tasks run across all CPU cores simultaneously. Python has the GIL, so even with asyncio, CPU-bound work (JSON parsing, context compaction, token counting) is single-core. Node.js is single-threaded entirely. In octos, 10 concurrent sessions doing context compaction actually execute in parallel across cores.

Memory efficiency — No garbage collector, no runtime overhead per object. Agent sessions are compact structs on the heap. A Python agent session carries interpreter overhead, GC metadata on every object, and dict-based attribute lookup. This matters with hundreds of sessions and large conversation histories in memory.

No GC pauses — Python and Node.js GC can cause latency spikes mid-response. Rust has deterministic deallocation — memory is freed exactly when the owning struct drops.

Single binary deployment — No Python/Node runtime to install, no dependency hell, predictable resource usage. The gateway is one static binary.

Tokio Tasks vs OS Threads

All concurrent session processing uses tokio tasks (green threads), not OS threads. A tokio task is a state machine on the heap (~few KB). An OS thread is ~8MB stack. Thousands of tasks multiplex across a handful of OS threads (defaults to CPU core count). Since agent sessions spend most of their time awaiting I/O (LLM API responses), they yield the thread to other tasks efficiently.

Gateway Concurrency

Inbound messages → main loop
                      │
                      ├─ tokio::spawn() per message
                      │     │
                      │     ├─ Semaphore (max_concurrent_sessions, default 10)
                      │     │     bounds total concurrent agent runs
                      │     │
                      │     └─ Per-session Mutex
                      │           serializes messages within same session
                      │
                      └─ Different sessions run concurrently
                         Same session queues sequentially
  • Cross-session: concurrent, bounded by max_concurrent_sessions semaphore (default 10)
  • Within same session: serialized via per-session mutex — prevents race conditions on conversation history
  • Per-session locks: pruned after completion (Arc strong_count == 1) to prevent unbounded HashMap growth
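The lock-map pruning via Arc strong_count can be sketched with std types (the gateway uses tokio::sync::Mutex for the per-session locks, but the Arc-counting idea is the same):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Sketch of per-session lock storage with strong_count-based pruning.
#[derive(Default)]
struct SessionLocks {
    map: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl SessionLocks {
    /// Get (or create) the lock guarding one session's history.
    fn lock_for(&self, session: &str) -> Arc<Mutex<()>> {
        self.map
            .lock()
            .unwrap()
            .entry(session.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone()
    }

    /// Drop entries no in-flight task still holds
    /// (only the map's own Arc remains: strong_count == 1).
    fn prune(&self) {
        self.map
            .lock()
            .unwrap()
            .retain(|_, lock| Arc::strong_count(lock) > 1);
    }
}
```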

Tool Execution

Within a single agent iteration, all tool calls from one LLM response execute concurrently via join_all():

LLM response: [web_search, read_file, send_email]
                   │            │           │
                   └────────────┼───────────┘
                          join_all()
                   ┌────────────┼───────────┐
                   │            │           │
                 done         done        done
                          ↓
              All results appended to messages
                          ↓
                    Next LLM call
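The fan-out/fan-in shape above can be illustrated with OS threads standing in for the async tasks (the agent actually awaits futures::future::join_all on async tool futures; this std::thread version just shows the "all results before the next LLM call" ordering):

```rust
use std::thread;

/// Run every tool call from one LLM response concurrently and collect
/// all results, in order, before returning.
fn run_tools(calls: Vec<(&'static str, fn() -> String)>) -> Vec<(String, String)> {
    let handles: Vec<_> = calls
        .into_iter()
        .map(|(name, tool)| thread::spawn(move || (name.to_string(), tool())))
        .collect();
    // Fan-in: join preserves the original call order
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```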

Sub-Agent Modes (spawn tool)

| Aspect | Sync | Background |
|--------|------|------------|
| Parent blocks? | Yes | No (tokio::spawn()) |
| Result delivery | Same conversation turn | New inbound message via gateway |
| Token accounting | Counted toward parent budget | Independent |
| Use case | Sequential pipelines | Fire-and-forget long tasks |

Sub-agents cannot spawn further sub-agents (spawn tool is always denied in sub-agent policy).

Multi-Tenant Dashboard

The dashboard (octos serve) runs each user profile as a separate gateway OS process:

Dashboard (octos serve)
  ├─ Profile "alice" → octos gateway --config alice.json  (deepseek, own semaphore)
  ├─ Profile "bob"   → octos gateway --config bob.json    (kimi, own semaphore)
  └─ Profile "carol" → octos gateway --config carol.json  (openai, own semaphore)

Each profile has its own LLM provider, API keys, channels, data directory, and max_concurrent_sessions semaphore. Profiles are fully isolated — no shared state between gateway processes.


Testing

1300+ tests across all crates. See TESTING.md for the full inventory and CI guide.

  • Unit: type serde round-trips, tool arg parsing, config validation, provider detection, tool policies, compaction, coalescing, BM25 scoring, L2 normalization, SSE parsing
  • Adaptive routing: Off/Hedge/Lane modes, circuit breaker, failover, scoring, metrics, provider racing (19 tests)
  • Responsiveness: baseline learning, degradation detection, recovery, threshold boundaries (8 tests)
  • Queue modes: Followup, Collect, Steer, Speculative overflow, auto-escalation/deescalation (9 tests)
  • Session persistence: JSONL storage, LRU eviction, fork, rewrite, timestamp sort, concurrent access (28 tests)
  • Integration: CLI commands, file tools, cron jobs, session forking, plugin loading
  • Security: sandbox path injection, env sanitization, SSRF blocking, symlink rejection (O_NOFOLLOW), private IP detection, dedup overflow, tool argument size limits, session file size limits, circuit breaker threshold edge cases, MCP schema validation
  • Channel: allowed_senders, message parsing, dedup logic, email address extraction

Local CI: ./scripts/ci.sh (mirrors GitHub Actions + focused subsystem tests). See TESTING.md.