vega

package module
v0.2.1
Published: Feb 24, 2026 License: MIT Imports: 20 Imported by: 0

README

Vega


Fault-tolerant AI agent orchestration for Go.

Vega makes it easy to build reliable AI agent systems with Erlang-style supervision. Use the YAML DSL for rapid prototyping or the Go library for full control.

Chat

The fastest way to use Vega is the built-in chat interface. Responses stream token-by-token, and tool calls appear inline as collapsible panels with arguments, results, and execution time.

vega serve team.vega.yaml
# Open http://localhost:3001 → pick an agent → start chatting

No YAML file? vega serve starts with Mother and Hermes ready to go.

Streaming API

Chat is also available as an SSE endpoint for programmatic use:

curl -N -X POST localhost:3001/api/agents/assistant/chat/stream \
  -H 'Content-Type: application/json' \
  -d '{"message": "Search for recent Go releases"}'

event: text_delta
data: {"type":"text_delta","delta":"Let me "}

event: tool_start
data: {"type":"tool_start","tool_call_id":"tc_1","tool_name":"web_search","arguments":{"query":"Go releases 2026"}}

event: tool_end
data: {"type":"tool_end","tool_call_id":"tc_1","tool_name":"web_search","result":"Found 3 results...","duration_ms":1250}

event: text_delta
data: {"type":"text_delta","delta":"Based on the results..."}

event: done
data: {"type":"done"}
Event | Fields | Description
--- | --- | ---
text_delta | delta | Incremental text content
tool_start | tool_call_id, tool_name, arguments | Tool execution began
tool_end | tool_call_id, tool_name, result, duration_ms | Tool execution finished
error | error | An error occurred
done | (none) | Stream complete

A blocking (non-streaming) endpoint is also available at POST /api/agents/{name}/chat.

Go Library
stream, err := proc.SendStreamRich(ctx, "Search for recent Go releases")
if err != nil {
    log.Fatal(err)
}
for event := range stream.Events() {
    switch event.Type {
    case vega.ChatEventTextDelta:
        fmt.Print(event.Delta)
    case vega.ChatEventToolStart:
        fmt.Printf("\n[calling %s]\n", event.ToolName)
    case vega.ChatEventToolEnd:
        fmt.Printf("[done in %dms]\n", event.DurationMs)
    }
}

Use SendStream for simple text-only streaming, or Send for a blocking call.


Installation

CLI Tool
# Homebrew (macOS / Linux)
brew install everydev1618/tap/vega

# Go install
go install github.com/everydev1618/govega/cmd/vega@latest

# Or download a binary from GitHub Releases
# https://github.com/everydev1618/govega/releases
Go Library
go get github.com/everydev1618/govega
Configuration

Set your Anthropic API key:

export ANTHROPIC_API_KEY=your-key-here

Quick Start

Option 1: Go Library
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/everydev1618/govega"
    "github.com/everydev1618/govega/llm"
)

func main() {
    // Create LLM backend
    anthropic := llm.NewAnthropic()

    // Create orchestrator
    orch := vega.NewOrchestrator(vega.WithLLM(anthropic))

    // Define an agent
    agent := vega.Agent{
        Name:   "assistant",
        Model:  "claude-sonnet-4-20250514",
        System: vega.StaticPrompt("You are a helpful coding assistant."),
    }

    // Spawn a process
    proc, err := orch.Spawn(agent)
    if err != nil {
        log.Fatal(err)
    }

    // Send a message
    ctx := context.Background()
    response, err := proc.Send(ctx, "Write a hello world function in Go")
    if err != nil {
        proc.Fail(err)  // Mark process as failed
        log.Fatal(err)
    }

    // IMPORTANT: Always mark processes as completed when done
    proc.Complete(response)

    fmt.Println(response)

    // Cleanup
    orch.Shutdown(ctx)
}
Option 2: YAML DSL

Create assistant.vega.yaml:

name: Assistant

agents:
  helper:
    model: claude-sonnet-4-20250514
    system: You are a helpful assistant.

Run interactively:

vega repl assistant.vega.yaml
> /ask helper
> What is the capital of France?

⚠️ Important: Process Lifecycle

Every spawned process must be completed or failed. This is the most common mistake when using Vega.

// ❌ WRONG - process leaks, stays "running" forever
proc, _ := orch.Spawn(agent)
proc.SendAsync(task)
return "Started!"

// ✅ CORRECT - process properly completed
proc, _ := orch.Spawn(agent)
response, err := proc.Send(ctx, task)
if err != nil {
    proc.Fail(err)
    return err
}
proc.Complete(response)

See Best Practices for more details.


Architecture

flowchart TB
    subgraph Orchestrator["Orchestrator"]
        direction TB
        Registry["Agent Registry"]
        ProcessMgr["Process Manager"]
        Groups["Process Groups"]
        RateLimiter["Rate Limiter"]
    end

    subgraph Supervision["Supervision Tree"]
        direction TB
        Supervisor["Supervisor"]
        Strategy{{"Strategy<br/>OneForOne | OneForAll | RestForOne"}}
        Supervisor --> Strategy
    end

    subgraph Processes["Processes"]
        direction LR
        P1["Process 1"]
        P2["Process 2"]
        P3["Process 3"]
    end

    subgraph Agent["Agent (Blueprint)"]
        Model["Model"]
        System["System Prompt"]
        Tools["Tools"]
        Config["Retry / Circuit Breaker / Budget"]
    end

    subgraph LLMLoop["LLM Execution Loop"]
        direction TB
        Build["Build Messages"]
        Call["LLM Call"]
        Parse["Parse Response"]
        ToolExec["Execute Tools"]
        Build --> Call --> Parse
        Parse -->|"Tool Use"| ToolExec --> Build
        Parse -->|"Text Response"| Done["Complete"]
    end

    Agent -->|"spawn"| Orchestrator
    Orchestrator -->|"creates"| Processes
    Supervisor -->|"monitors"| Processes
    Processes -->|"run"| LLMLoop

    P1 <-->|"links"| P2
    P2 -.->|"monitors"| P3

flowchart LR
    subgraph Lifecycle["Process Lifecycle"]
        Pending["🟡 Pending"]
        Running["🟢 Running"]
        Completed["✅ Completed"]
        Failed["❌ Failed"]
        Timeout["⏱️ Timeout"]

        Pending --> Running
        Running --> Completed
        Running --> Failed
        Running --> Timeout
        Failed -->|"restart"| Pending
    end

flowchart TB
    subgraph SpawnTree["Spawn Tree"]
        Root["Orchestrator Process"]
        Child1["Worker 1<br/>depth: 1"]
        Child2["Worker 2<br/>depth: 1"]
        GrandChild["Sub-worker<br/>depth: 2"]

        Root --> Child1
        Root --> Child2
        Child1 --> GrandChild
    end

Features

Erlang-Style Supervision

Processes automatically restart on failure with configurable strategies:

proc, err := orch.Spawn(agent, vega.WithSupervision(vega.Supervision{
    Strategy:    vega.Restart,  // Restart, Stop, or Escalate
    MaxRestarts: 3,
    Window:      5 * time.Minute,
    Backoff: vega.BackoffConfig{
        Initial:    100 * time.Millisecond,
        Multiplier: 2.0,
        Max:        30 * time.Second,
    },
}))

Or in YAML:

agents:
  worker:
    model: claude-sonnet-4-20250514
    system: You process tasks reliably.
    supervision:
      strategy: restart
      max_restarts: 3
      window: 5m
Tools

Register tools for agents to use:

tools := vega.NewTools()
tools.RegisterBuiltins() // read_file, write_file, run_command

// Register custom tool
tools.Register("greet", func(name string) string {
    return "Hello, " + name + "!"
})

agent := vega.Agent{
    Name:  "greeter",
    Tools: tools,
}

Or define in YAML with full HTTP support:

tools:
  fetch_weather:
    description: Get weather for a city
    params:
      - name: city
        type: string
        required: true
    implementation:
      type: http
      method: GET
      url: "https://api.weather.com/v1/current?city={{.city}}"
      headers:
        Authorization: "Bearer {{.api_key}}"
      timeout: 10s

  create_issue:
    description: Create a GitHub issue
    params:
      - name: title
        type: string
        required: true
      - name: body
        type: string
        required: true
    implementation:
      type: http
      method: POST
      url: "https://api.github.com/repos/{{.owner}}/{{.repo}}/issues"
      headers:
        Authorization: "token {{.token}}"
        Content-Type: application/json
      body:
        title: "{{.title}}"
        body: "{{.body}}"
      timeout: 30s

  run_script:
    description: Execute a shell script
    params:
      - name: script
        type: string
        required: true
    implementation:
      type: exec
      command: "bash -c '{{.script}}'"
      timeout: 60s

Tool implementations support {{.param}} template interpolation for dynamic values.
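The {{.param}} syntax is the same shape as Go's text/template. A sketch of how a tool URL could be interpolated (the expand helper is illustrative, not Vega's API):

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// expand renders a {{.param}} template against a params map,
// the way an HTTP tool URL or body field could be interpolated.
func expand(tmpl string, params map[string]any) (string, error) {
	t, err := template.New("tool").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, params); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	url, err := expand("https://api.weather.com/v1/current?city={{.city}}",
		map[string]any{"city": "Oslo"})
	if err != nil {
		panic(err)
	}
	fmt.Println(url) // https://api.weather.com/v1/current?city=Oslo
}
```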

Streaming Responses

See the Chat section at the top for SendStreamRich (structured events with tool visibility).

For simple text-only streaming:

stream, err := proc.SendStream(ctx, "Tell me a story")
if err != nil {
    log.Fatal(err)
}

for chunk := range stream.Chunks() {
    fmt.Print(chunk)
}
fmt.Println()
Async Operations
// Fire and forget
future := proc.SendAsync("Process this in background")

// Do other work...

// Wait for result
response, err := future.Await(ctx)
Parallel Execution (DSL)
steps:
  - parallel:
      - agent1:
          send: "Task 1"
          save: result1
      - agent2:
          send: "Task 2"
          save: result2
  - combiner:
      send: "Combine: {{result1}} and {{result2}}"
Rate Limiting & Circuit Breakers
orch := vega.NewOrchestrator(
    vega.WithLLM(anthropic),
    vega.WithRateLimits(map[string]vega.RateLimitConfig{
        "claude-sonnet-4-20250514": {
            RequestsPerMinute: 60,
            TokensPerMinute:   100000,
        },
    }),
)

agent := vega.Agent{
    CircuitBreaker: &vega.CircuitBreaker{
        Threshold:  5,           // Open after 5 failures
        ResetAfter: time.Minute, // Try again after 1 minute
    },
}
Intelligent Retry with Error Classification

Vega automatically classifies errors and retries appropriately:

agent := vega.Agent{
    Name: "resilient-worker",
    Retry: &vega.RetryPolicy{
        MaxAttempts: 5,
        Backoff: vega.BackoffConfig{
            Initial:    100 * time.Millisecond,
            Multiplier: 2.0,
            Max:        30 * time.Second,
            Jitter:     0.1, // ±10% randomness
            Type:       vega.BackoffExponential,
        },
        RetryOn: []vega.ErrorClass{
            vega.ErrClassRateLimit,
            vega.ErrClassOverloaded,
            vega.ErrClassTimeout,
            vega.ErrClassTemporary,
        },
    },
}

Error classes are automatically detected:

  • ErrClassRateLimit - 429 errors, "rate limit" messages
  • ErrClassOverloaded - 503 errors, capacity issues
  • ErrClassTimeout - Deadline exceeded, connection timeouts
  • ErrClassTemporary - Transient server errors (5xx)
  • ErrClassAuthentication - 401/403 errors (not retried)
  • ErrClassInvalidRequest - 400 errors (not retried)
  • ErrClassBudgetExceeded - Cost limits (not retried)
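The detection rules above can be sketched as a mapping from HTTP status and message text to a class (the class names mirror the list, but this function is illustrative, not the library's implementation):

```go
package main

import (
	"fmt"
	"strings"
)

type errorClass string

const (
	classRateLimit      errorClass = "rate_limit"
	classOverloaded     errorClass = "overloaded"
	classTemporary      errorClass = "temporary"
	classAuthentication errorClass = "authentication"
	classInvalidRequest errorClass = "invalid_request"
	classUnknown        errorClass = "unknown"
)

// classify maps a status code and message to an error class,
// roughly following the rules described above.
func classify(status int, msg string) errorClass {
	switch {
	case status == 429 || strings.Contains(strings.ToLower(msg), "rate limit"):
		return classRateLimit
	case status == 503:
		return classOverloaded
	case status == 401 || status == 403:
		return classAuthentication
	case status == 400:
		return classInvalidRequest
	case status >= 500:
		return classTemporary
	default:
		return classUnknown
	}
}

// retryable reports whether a class should be retried with backoff.
func retryable(c errorClass) bool {
	return c == classRateLimit || c == classOverloaded || c == classTemporary
}

func main() {
	fmt.Println(classify(429, ""), retryable(classify(429, ""))) // rate_limit true
	fmt.Println(classify(401, ""), retryable(classify(401, ""))) // authentication false
}
```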
Configurable Iteration Limits

Control how many tool call loops an agent can perform:

agent := vega.Agent{
    Name:          "deep-researcher",
    MaxIterations: 100, // Default is 50
}
Conversation History

Vega provides flexible ways to manage conversation context across sessions.

Resume Conversations with WithMessages

Spawn a process with pre-existing conversation history:

// Load previous conversation (from database, file, etc.)
history := []vega.Message{
    {Role: vega.RoleUser, Content: "What's our project deadline?"},
    {Role: vega.RoleAssistant, Content: "The deadline is March 15th."},
}

// Spawn with existing history - the agent remembers the conversation
proc, err := orch.Spawn(agent, vega.WithMessages(history))
if err != nil {
    log.Fatal(err)
}

// Continue the conversation naturally
response, _ := proc.Send(ctx, "Can we extend it by a week?")
// Agent knows "it" refers to the March 15th deadline
Serialize Messages for Persistence

Save and restore conversations using JSON:

// Save conversation to database/file
messages := []vega.Message{
    {Role: vega.RoleUser, Content: "Hello"},
    {Role: vega.RoleAssistant, Content: "Hi there!"},
}
data, err := vega.MarshalMessages(messages)
// Store `data` in your database

// Later, restore the conversation
restored, err := vega.UnmarshalMessages(data)
proc, err := orch.Spawn(agent, vega.WithMessages(restored))
Token-Aware Context with TokenBudgetContext

Automatically manage context within a token budget:

// Create context with 8000 token budget (~32k chars)
ctx := vega.NewTokenBudgetContext(8000)

// Optionally load existing history
ctx.Load(previousMessages)

agent := vega.Agent{
    Name:    "assistant",
    Context: ctx, // Attach to agent
}

// As conversation grows, oldest messages are automatically trimmed
// to stay within the 8000 token budget

// Save for later
snapshot := ctx.Snapshot()
data, _ := vega.MarshalMessages(snapshot)

TokenBudgetContext features:

  • Automatic trimming of oldest messages when budget exceeded
  • Token estimation (~4 chars per token)
  • Load() to restore from persistence
  • Snapshot() to get messages for saving
  • Thread-safe for concurrent use
Context Auto-Compaction

For smarter context management with LLM-powered summarization:

// Create a sliding window context that keeps recent messages
// and summarizes older ones
ctx := vega.NewSlidingWindowContext(20) // Keep last 20 messages

agent := vega.Agent{
    Name:    "long-conversation-agent",
    Context: ctx,
}

// Check if compaction is needed
if ctx.NeedsCompaction(50000) { // 50k token threshold
    err := ctx.Compact(llm) // Uses LLM to summarize old messages
}

The SlidingWindowContext automatically:

  • Keeps a configurable number of recent messages
  • Can summarize older messages using the LLM
  • Preserves important context while reducing token usage

When to use which:

Context Manager | Best For
--- | ---
WithMessages only | Simple resume, short conversations
TokenBudgetContext | Long conversations, automatic trimming, persistence
SlidingWindowContext | Very long conversations needing intelligent summarization
Budget Control
agent := vega.Agent{
    Budget: &vega.Budget{
        Limit:    5.0,           // $5.00 max
        OnExceed: vega.BudgetBlock,
    },
}
Spawn Tree Tracking

Track parent-child relationships when agents spawn other agents:

// When spawning from a parent process, use WithParent to establish the relationship
childProc, err := orch.Spawn(childAgent,
    vega.WithParent(parentProc),           // Track parent-child relationship
    vega.WithSpawnReason("Process data"),  // Optional context for the spawn
)

// The child process will have:
// - ParentID: parent's process ID
// - ParentAgent: parent's agent name
// - SpawnDepth: parent's depth + 1

// Query the entire spawn tree
tree := orch.GetSpawnTree()
// Returns []*SpawnTreeNode with hierarchical structure

For tools that spawn processes, the parent process is automatically available via context:

func mySpawnTool(ctx context.Context, params map[string]any) (string, error) {
    // Get the calling process from context
    parent := vega.ProcessFromContext(ctx)

    // Spawn with parent tracking
    child, err := orch.Spawn(agent,
        vega.WithParent(parent),
        vega.WithSpawnReason(params["task"].(string)),
    )
    // ...
}

The spawn tree structure:

type SpawnTreeNode struct {
    ProcessID   string           `json:"process_id"`
    AgentName   string           `json:"agent_name"`
    Task        string           `json:"task"`
    Status      Status           `json:"status"`
    SpawnDepth  int              `json:"spawn_depth"`
    SpawnReason string           `json:"spawn_reason,omitempty"`
    StartedAt   time.Time        `json:"started_at"`
    Children    []*SpawnTreeNode `json:"children,omitempty"`
}
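Given that shape, the tree returned by GetSpawnTree can be walked recursively, for example to print an indented view (the node type here is a simplified stand-in for SpawnTreeNode):

```go
package main

import (
	"fmt"
	"strings"
)

// node is a trimmed-down, hypothetical stand-in for SpawnTreeNode.
type node struct {
	AgentName string
	Children  []*node
}

// render returns the tree with each agent indented by its spawn
// depth, visiting children depth-first.
func render(n *node, depth int) string {
	out := strings.Repeat("  ", depth) + n.AgentName + "\n"
	for _, c := range n.Children {
		out += render(c, depth+1)
	}
	return out
}

func main() {
	root := &node{AgentName: "orchestrator", Children: []*node{
		{AgentName: "worker-1", Children: []*node{{AgentName: "sub-worker"}}},
		{AgentName: "worker-2"},
	}}
	fmt.Print(render(root, 0))
	// orchestrator
	//   worker-1
	//     sub-worker
	//   worker-2
}
```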
MCP (Model Context Protocol) Servers

Connect to MCP servers to use external tools:

import "github.com/everydev1618/govega/mcp"

tools := vega.NewTools(
    vega.WithMCPServer(mcp.ServerConfig{
        Name:    "filesystem",
        Command: "npx",
        Args:    []string{"-y", "@modelcontextprotocol/server-filesystem", "/workspace"},
    }),
    vega.WithMCPServer(mcp.ServerConfig{
        Name:    "github",
        Command: "npx",
        Args:    []string{"-y", "@modelcontextprotocol/server-github"},
        Env:     map[string]string{"GITHUB_TOKEN": os.Getenv("GITHUB_TOKEN")},
    }),
)

// Connect to all MCP servers
ctx := context.Background()
if err := tools.ConnectMCP(ctx); err != nil {
    log.Fatal(err)
}
defer tools.DisconnectMCP()

// Tools are automatically registered with prefix: servername__toolname
// e.g., "filesystem__read_file", "github__create_issue"

Or in YAML:

settings:
  mcp:
    servers:
      - name: filesystem
        transport: stdio
        command: npx
        args: ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]
      - name: github
        command: npx
        args: ["-y", "@modelcontextprotocol/server-github"]
        env:
          GITHUB_TOKEN: "${GITHUB_TOKEN}"
        timeout: 30s

agents:
  coder:
    model: claude-sonnet-4-20250514
    system: You are a coding assistant.
    tools:
      - filesystem__*   # All tools from filesystem server
      - github__create_issue
Web Dashboard & REST API

Monitor and control your agents through a browser-based dashboard:

vega serve team.vega.yaml
# Open http://localhost:3001

The dashboard provides:

  • Chat — Real-time streaming chat with inline tool call panels (see top of README)
  • Overview — Live stats, recent events, active processes
  • Process Explorer — Sortable process list with conversation history
  • Spawn Tree — Hierarchical view of parent-child process relationships
  • Event Stream — Real-time SSE feed with filtering
  • Agent Registry — Agent definitions from your .vega.yaml
  • MCP Servers — Connection status and available tools
  • Workflow Launcher — Run workflows from the browser with auto-generated input forms
  • Cost Dashboard — Per-agent and per-process cost tracking

REST API — All data is also available via JSON endpoints:

curl localhost:3001/api/processes    # List processes
curl localhost:3001/api/agents       # List agents
curl localhost:3001/api/stats        # Aggregate metrics
curl localhost:3001/api/spawn-tree   # Spawn tree
curl localhost:3001/api/workflows    # List workflows
curl localhost:3001/api/mcp/servers  # MCP server status
curl localhost:3001/api/events       # SSE event stream

# Launch a workflow
curl -X POST localhost:3001/api/workflows/review/run \
  -H 'Content-Type: application/json' \
  -d '{"task": "Write a sort function"}'

# Chat (see Streaming API section at top of README)
curl -X POST localhost:3001/api/agents/assistant/chat \
  -H 'Content-Type: application/json' \
  -d '{"message": "Hello!"}'

Flags:

  • --addr :3001 — HTTP listen address (default :3001)
  • --db ~/.vega/vega.db — SQLite database path for persistent history

Historical process data, events, and workflow runs persist across restarts via SQLite.

Built-in Meta-Agents

vega serve injects two meta-agents automatically — no YAML required.

Mother

Mother is the agent architect. Talk to her to create, update, or delete agents through conversation. She knows all available tools, skills, and MCP servers, and designs teams rather than solo agents. She can also set up recurring schedules — cron-driven triggers that send a message to any agent automatically.

You: I need an agent that researches competitors
Mother: I'll create a researcher with web tools and a lead agent to synthesize...

You: Every morning at 9am, have hermes check Hacker News and email me a summary
Mother: [calls create_schedule with cron "0 9 * * *", agent "hermes", message "..."]

Mother is always available in the sidebar or via POST /api/agents/mother/chat.

Hermes

Hermes is the cosmic orchestrator — a boy traveling the Vega universe with unlimited reach. Give him any goal and he figures out which agents to involve, routes work across the whole population, calls on Mother when new agents are needed, and synthesizes everything into a result.

You → Hermes: "Do a competitive analysis of our top 3 competitors and write it up"

Hermes → list_agents           (surveys who's available)
Hermes → send_to_agent(mother, "create a web researcher agent")
Hermes → send_to_agent(researcher, "research competitor A...")
Hermes → send_to_agent(researcher, "research competitor B...")
Hermes → send_to_agent(writer,     "write up the analysis...")
Hermes → You: polished result

Hermes has two tools:

  • list_agents — see all agents with their purpose summaries
  • send_to_agent — route a task to any agent by name, including Mother

The mythological fit is intentional: Hermes's mother in Greek mythology was Maia.

Scheduler

Trigger agents on a cron schedule — no external cron job needed. The scheduler runs inside vega serve and persists jobs to SQLite so they survive restarts.

Set up a schedule by asking Mother:

You: Every day at 9am, send a message to hermes saying "check hacker news for AI news and email me a summary"
Mother: Done! Schedule "morning-news" created: '0 9 * * *' → hermes

Or use Mother's scheduler tools directly:

  • create_schedule(name, cron, agent, message) — add a job
  • update_schedule(name, cron?, agent?, message?, enabled?) — modify a job
  • delete_schedule(name) — remove a job
  • list_schedules() — list all active jobs

Standard 5-field cron syntax: "0 9 * * *" = 9am daily, "*/30 * * * *" = every 30 minutes.

To send email results, give agents the send_email tool (requires SMTP_HOST, SMTP_USER, SMTP_PASS env vars).

Telegram

Chat with your Vega agents from Telegram — no public URL or port forwarding needed. The bot uses long polling.

Setup:

  1. Create a bot via @BotFather and copy the token
  2. Add to ~/.vega/env:
TELEGRAM_BOT_TOKEN=your-token-here
# Optional: override which agent handles messages (defaults to hermes)
# TELEGRAM_AGENT=assistant
  3. Run vega serve — you'll see telegram bot started agent=hermes in the logs
  4. Open Telegram, find your bot, and start chatting

By default every message goes to Hermes, so you can describe any goal and he'll route it across your full agent population. Each Telegram user gets their own isolated agent clone — conversation history is stored per-user and accessible via the REST API:

curl localhost:3001/api/agents/hermes:<telegram-user-id>/chat

The env file at ~/.vega/env is loaded automatically on startup — no shell export needed.
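Loading a KEY=VALUE env file of this shape is straightforward; a sketch of the idea (our own parser, not Vega's loader):

```go
package main

import (
	"fmt"
	"strings"
)

// parseEnv reads KEY=VALUE lines, skipping blanks and # comments,
// the way a ~/.vega/env style file could be loaded.
func parseEnv(content string) map[string]string {
	env := map[string]string{}
	for _, line := range strings.Split(content, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		if key, val, ok := strings.Cut(line, "="); ok {
			env[strings.TrimSpace(key)] = strings.TrimSpace(val)
		}
	}
	return env
}

func main() {
	content := "TELEGRAM_BOT_TOKEN=abc123\n# TELEGRAM_AGENT=assistant\n"
	env := parseEnv(content)
	fmt.Println(env["TELEGRAM_BOT_TOKEN"]) // abc123
	// A real loader would then apply each pair with os.Setenv.
}
```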

Agent Skills

Skills provide dynamic prompt injection based on message context:

import "github.com/everydev1618/govega/skills"

// Load skills from directories
loader := skills.NewLoader("./skills", "~/.vega/skills")
loader.Load(ctx)

// Wrap system prompt with skills
agent := vega.Agent{
    Name:   "assistant",
    Model:  "claude-sonnet-4-20250514",
    System: vega.NewSkillsPrompt(
        vega.StaticPrompt("You are a helpful assistant."),
        loader,
        vega.WithMaxActiveSkills(3),
    ),
}

Create skill files (skills/code-review.skill.md):

---
name: code-review
description: Expert code review guidance
tags: [code, review]
triggers:
  - type: keyword
    keywords: [review, PR, pull request, code review]
  - type: pattern
    pattern: "review (this|my) (code|changes)"
---

# Code Review Expert

When reviewing code, focus on:
1. Security vulnerabilities
2. Performance issues
3. Code clarity and maintainability
4. Test coverage

Or configure in YAML:

settings:
  skills:
    directories:
      - ./skills
      - ~/.vega/skills

agents:
  reviewer:
    model: claude-sonnet-4-20250514
    system: You are a code reviewer.
    skills:
      include: [code-review, security-*]
      exclude: [deprecated-*]
      max_active: 3

Skills are automatically matched and injected based on:

  • keyword: Message contains specific words
  • pattern: Message matches a regex pattern
  • always: Always included
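Trigger evaluation can be sketched as keyword and regex checks (the trigger shape mirrors the list above; the code is illustrative, not the skills package API):

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// trigger mirrors the keyword / pattern / always trigger types.
type trigger struct {
	Type     string
	Keywords []string
	Pattern  string
}

// fires reports whether a trigger activates for a message:
// "always" always fires, "keyword" does a case-insensitive
// substring check, "pattern" tries a regexp match.
func fires(t trigger, msg string) bool {
	switch t.Type {
	case "always":
		return true
	case "keyword":
		lower := strings.ToLower(msg)
		for _, kw := range t.Keywords {
			if strings.Contains(lower, strings.ToLower(kw)) {
				return true
			}
		}
	case "pattern":
		if re, err := regexp.Compile(t.Pattern); err == nil {
			return re.MatchString(msg)
		}
	}
	return false
}

func main() {
	kw := trigger{Type: "keyword", Keywords: []string{"review", "PR"}}
	pat := trigger{Type: "pattern", Pattern: `review (this|my) (code|changes)`}
	fmt.Println(fires(kw, "Please review my code"))  // true
	fmt.Println(fires(pat, "review my code please")) // true
}
```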

CLI Commands

# Run a workflow
vega run team.vega.yaml --workflow my-workflow --task "Do something"

# Validate a file
vega validate team.vega.yaml --verbose

# Interactive REPL
vega repl team.vega.yaml

# Web dashboard & REST API
vega serve team.vega.yaml
vega serve team.vega.yaml --addr :8080 --db my-data.db

# Show help
vega help

Examples

The examples/ directory contains complete working examples:

Example | Description
--- | ---
simple-agent.vega.yaml | Basic single-agent chatbot
code-review.vega.yaml | Two-agent code review workflow
dev-team.vega.yaml | Full dev team (architect, frontend, backend, reviewer, PM)
tools-demo.vega.yaml | Custom HTTP and exec tools
mcp-demo.vega.yaml | MCP server integration
skills-demo.vega.yaml | Dynamic skill injection
supervision-demo.vega.yaml | Fault tolerance patterns
control-flow.vega.yaml | Conditionals, loops, parallel execution

Run an example:

vega run examples/code-review.vega.yaml --workflow review --task "Write a binary search function"

DSL Reference

Settings
settings:
  default_model: claude-sonnet-4-20250514
  sandbox: ./workspace              # Restrict file operations
  budget: "$100.00"                 # Global budget limit

  mcp:                              # MCP server configuration
    servers:
      - name: filesystem
        transport: stdio            # stdio (default), http, sse
        command: npx
        args: ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]
        env:
          DEBUG: "true"
        timeout: 30s

  skills:                           # Global skill directories
    directories:
      - ./skills
      - ~/.vega/skills

  rate_limit:
    requests_per_minute: 60
    tokens_per_minute: 100000
Agents
agents:
  agent-name:
    model: claude-sonnet-4-20250514    # Required
    system: |                           # Required
      Your system prompt here.
    temperature: 0.7                    # Optional (0.0-1.0)
    tools:                              # Optional
      - read_file
      - write_file
      - filesystem__*                   # MCP tools (server__pattern)
    budget: "$5.00"                     # Optional
    supervision:                        # Optional
      strategy: restart
      max_restarts: 3
    skills:                             # Optional skill configuration
      directories:                      # Agent-specific skill dirs
        - ./agent-skills
      include: [coding-*, review]       # Only these skills
      exclude: [deprecated-*]           # Never these skills
      max_active: 3                     # Max skills to inject
Workflows
workflows:
  workflow-name:
    description: What this workflow does
    inputs:
      task:
        type: string
        required: true
        default: "default value"
    steps:
      - agent-name:
          send: "Message with {{task}} interpolation"
          save: variable_name
          timeout: 30s
    output: "{{variable_name}}"
Control Flow
# Conditionals
- if: "{{approved}}"
  then:
    - agent: ...
  else:
    - agent: ...

# Loops
- for: item in items
  steps:
    - agent:
        send: "Process {{item}}"

# Parallel
- parallel:
    - agent1: ...
    - agent2: ...

# Try/Catch
- try:
    - risky-agent: ...
  catch:
    - fallback-agent: ...
Expression Filters
{{name | upper}}           # UPPERCASE
{{name | lower}}           # lowercase
{{name | trim}}            # Remove whitespace
{{name | default:anon}}    # Default value
{{text | truncate:100}}    # Limit length
{{items | join:, }}        # Join array
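Filter semantics can be sketched as a filter name plus an optional ":" argument applied to a string value (the applyFilter helper is illustrative, not the DSL engine):

```go
package main

import (
	"fmt"
	"strings"
)

// applyFilter implements a few of the filters above over a string
// value, splitting the filter into a name and optional argument.
func applyFilter(val, filter string) string {
	name, arg, _ := strings.Cut(filter, ":")
	switch name {
	case "upper":
		return strings.ToUpper(val)
	case "lower":
		return strings.ToLower(val)
	case "trim":
		return strings.TrimSpace(val)
	case "default":
		if val == "" {
			return arg
		}
		return val
	case "truncate":
		n := 0
		fmt.Sscanf(arg, "%d", &n)
		if n > 0 && len(val) > n {
			return val[:n]
		}
		return val
	}
	return val
}

func main() {
	fmt.Println(applyFilter("hello", "upper"))          // HELLO
	fmt.Println(applyFilter("", "default:anon"))        // anon
	fmt.Println(applyFilter("long text", "truncate:4")) // long
}
```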

Documentation

Document | Description
--- | ---
Quick Start | Get running in 5 minutes
Best Practices | Common pitfalls and how to avoid them
DSL Reference | Complete YAML syntax
Go Library Spec | Full API reference
Tools | Built-in and custom tools
MCP Servers | Model Context Protocol integration
Skills | Dynamic prompt injection
Supervision | Fault tolerance patterns
Architecture | Internal design
Web Dashboard | vega serve dashboard & REST API

Structured Logging

Vega uses Go's slog for structured logging. Enable debug logging to see detailed information:

import "log/slog"

// Enable debug logging
slog.SetLogLoggerLevel(slog.LevelDebug)

Log events include:

  • Process spawn/complete/fail with agent names and IDs
  • LLM call success/failure with latency, tokens, and error classification
  • Retry attempts with backoff delays
  • MCP server stderr output
Default Configuration

Vega provides sensible defaults that can be overridden:

// Default constants (defined in agent.go)
const (
    DefaultMaxIterations          = 50        // Tool call loop limit
    DefaultMaxContextTokens       = 100000    // Context window size
    DefaultLLMTimeout             = 5 * time.Minute
    DefaultStreamBufferSize       = 100
    DefaultSupervisorPollInterval = 100 * time.Millisecond
)

// Anthropic defaults (defined in llm/anthropic.go)
const (
    DefaultAnthropicTimeout = 5 * time.Minute
    DefaultAnthropicModel   = "claude-sonnet-4-20250514"
    DefaultAnthropicBaseURL = "https://api.anthropic.com"
)

Why Vega?

Erlang-Inspired Process Model

Most agent frameworks treat LLM calls as stateless functions. Vega models agents as processes with a full lifecycle (pending → running → completed/failed/timeout), managed by an Orchestrator. The key concepts ported from Erlang/OTP:

  • Process Linking — Bidirectional links between processes. If one dies, the linked one dies too (unless it traps exits). This is how you build robust agent topologies where dependent agents fail together.
  • Monitors — Unidirectional observation: watch another process and get notified when it exits, without dying yourself.
  • TrapExit — A process can opt to receive exit signals as messages instead of dying. This is exactly how supervisors survive their children crashing.
  • Supervision Trees — Strategies (Restart, Stop, Escalate, RestartAll) with configurable backoff (exponential, linear, constant with jitter) and windowed restart counting.

Just as Erlang/OTP made it possible to build telecom-grade systems by making processes and supervision cheap and composable, Vega applies the same philosophy to AI agent systems that need to run in production without falling over.

Agent = Blueprint, Process = Running Instance

The separation of Agent (a struct defining model, tools, system prompt, budget) from Process (a running instance with state, messages, metrics) means you can spawn multiple processes from the same agent definition, each with its own conversation history and lifecycle. This is the actor model applied to LLMs.

Intelligent Error Classification

Errors are automatically classified into 7 categories (RateLimit, Overloaded, Timeout, Temporary, InvalidRequest, Authentication, BudgetExceeded) with smart retry decisions. Rate limits and overloaded errors get retried with backoff; auth errors and bad requests don't. Production-grade reliability is built into the framework, not bolted on.

Team Delegation with Shared Blackboard

Agents can delegate to team members via a delegate tool with context-aware message enrichment — recent conversation history is forwarded so delegates understand the full picture. A shared blackboard (bb_read, bb_write, bb_list) gives team members structured shared state, not just text passed back and forth.

Dual Interface: Go Library + YAML DSL

The Go library gives full programmatic control, while the YAML DSL lets non-programmers define multi-agent workflows with control flow (if/then/else, for-each, parallel, try/catch), variable interpolation, and expression filters. The DSL interpreter is a proper workflow engine — not a thin config wrapper.

Everything is Observable

Built-in web dashboard with SSE streaming, process explorer, spawn tree visualization, cost tracking, and a chat UI with inline tool call panels. Processes emit structured events (text_delta, tool_start, tool_end), and the ChatStream API makes it straightforward to build rich streaming interfaces.

Feature Comparison
Feature | Raw SDK | Other Frameworks | Vega
--- | --- | --- | ---
Supervision trees | Manual | | ✅ Built-in
Process linking & monitors | | | ✅ Erlang-style
Automatic retries | Manual | Partial | ✅ Smart (error-classified)
Rate limiting | Manual | Manual | ✅ Built-in
Cost tracking | Manual | Partial | ✅ Built-in
MCP server support | Manual | Partial | ✅ Built-in
Web dashboard | | | ✅ Built-in
Team delegation & blackboard | | | ✅ Built-in
Dynamic skills | | | ✅ Built-in
Conversation history | Manual | Manual | ✅ WithMessages, persistence helpers
Context compaction | | | ✅ Auto-summarization
Structured logging | Manual | Partial | ✅ slog integration
Error classification | | | ✅ 7 error classes
Non-programmer friendly | | | ✅ YAML DSL
Parallel execution | Complex | Complex | parallel:
Config-driven | Limited | | ✅ Full YAML
Agent creation via chat | | | ✅ Mother (built-in)
Cross-agent orchestration | Manual | | ✅ Hermes (built-in)
Telegram channel | | | ✅ Built-in (long polling)
Scheduled triggers | Manual | | ✅ Built-in cron scheduler
Email delivery | Manual | | send_email built-in

Project Structure

vega/
├── agent.go           # Agent definition, context managers, defaults
├── context.go         # TokenBudgetContext, message serialization
├── process.go         # Running process with lifecycle, retry logic, spawn tree tracking
├── orchestrator.go    # Process management, spawn options, groups
├── supervision.go     # Fault tolerance, health monitoring
├── tools.go           # Tool registration, HTTP/exec executors
├── mcp_tools.go       # MCP server integration
├── skills.go          # Skills prompt wrapper
├── llm.go             # LLM interface, cost calculation
├── errors.go          # Error types, classification, retry decisions
├── llm/
│   └── anthropic.go   # Anthropic backend with streaming
├── mcp/               # Model Context Protocol client
│   ├── types.go       # MCP types and JSON-RPC
│   ├── client.go      # MCP client implementation
│   ├── transport_stdio.go  # Subprocess transport with logging
│   └── transport_http.go   # HTTP/SSE transport
├── skills/            # Agent skills system
│   ├── types.go       # Skill and trigger types
│   ├── parser.go      # SKILL.md file parser
│   ├── loader.go      # Directory scanner
│   └── matcher.go     # Keyword/pattern matching
├── container/         # Docker container management
│   ├── manager.go     # Container lifecycle
│   └── project.go     # Project isolation
├── serve/             # Web dashboard & REST API
│   ├── server.go      # HTTP server, routes, CORS
│   ├── handlers_api.go    # REST endpoint handlers
│   ├── handlers_sse.go    # Server-Sent Events stream
│   ├── broker.go      # Event pub/sub for SSE
│   ├── store.go       # Persistence interface
│   ├── store_sqlite.go    # SQLite implementation
│   ├── scheduler.go   # Built-in cron scheduler (robfig/cron)
│   ├── types.go       # API request/response types
│   ├── telegram.go    # Telegram bot via long polling
│   ├── embed.go       # Embedded SPA frontend
│   └── frontend/      # React + Vite + Tailwind dashboard
├── dsl/
│   ├── types.go           # AST types
│   ├── parser.go          # YAML parser
│   ├── interpreter.go     # Workflow execution
│   ├── mother.go          # Mother meta-agent (creates/manages agents)
│   ├── scheduler_tools.go # SchedulerBackend interface + Mother scheduler tools
│   └── hermes.go          # Hermes meta-agent (orchestrates across all agents)
├── cmd/vega/
│   ├── main.go        # CLI entry point
│   └── serve.go       # serve command
├── examples/          # Example .vega.yaml files
└── docs/              # Documentation

Contributing

Contributions welcome! Please read the code, write tests, and submit PRs.

# Run tests
go test ./...

# Build CLI
go build -o vega ./cmd/vega

License

MIT

Documentation

Overview

Package vega provides fault-tolerant AI agent orchestration with Erlang-style supervision.

Vega is a Go library for building reliable AI agent systems. It provides:

  • Agent definitions with configurable LLM backends
  • Process management with lifecycle control
  • Erlang-style supervision trees for fault tolerance
  • Tool registration with sandboxing
  • Rate limiting and circuit breakers
  • Budget management for cost control
  • A YAML-based DSL for non-programmers

Quick Start

Create an agent and spawn a process:

// Create an orchestrator with the Anthropic backend
backend := llm.NewAnthropic()
orch := vega.NewOrchestrator(vega.WithLLM(backend))

// Define an agent
agent := vega.Agent{
    Name:   "assistant",
    Model:  "claude-sonnet-4-20250514",
    System: vega.StaticPrompt("You are a helpful assistant."),
}

// Spawn a process
proc, err := orch.Spawn(agent)
if err != nil {
    log.Fatal(err)
}

// Send a message
ctx := context.Background()
response, err := proc.Send(ctx, "Hello!")
if err != nil {
    log.Fatal(err)
}
fmt.Println(response)

Supervision

Add fault tolerance with supervision strategies:

proc, err := orch.Spawn(agent, vega.WithSupervision(vega.Supervision{
    Strategy:    vega.Restart,
    MaxRestarts: 3,
    Window:      5 * time.Minute,
}))

Supervision strategies:

  • Restart: Automatically restart the process on failure
  • Stop: Stop the process permanently on failure
  • Escalate: Pass the failure to a supervisor agent

Tools

Register tools for agents to use:

t := tools.NewTools()
t.RegisterBuiltins() // read_file, write_file, run_command

t.Register("greet", func(name string) string {
    return "Hello, " + name + "!"
})

agent := vega.Agent{
    Name:  "greeter",
    Tools: t,
}

DSL

For non-programmers, use the YAML-based DSL in the dsl package:

parser := dsl.NewParser()
doc, err := parser.ParseFile("team.vega.yaml")
if err != nil {
    log.Fatal(err)
}

interp, err := dsl.NewInterpreter(doc)
if err != nil {
    log.Fatal(err)
}
result, err := interp.Execute(ctx, "workflow-name", inputs)

See the examples/ directory for complete DSL examples.

Architecture

The main components are:

  • Agent: Blueprint defining model, system prompt, tools, and configuration
  • Process: A running agent instance with state and lifecycle
  • Orchestrator: Manages processes, enforces limits, coordinates shutdown
  • Supervision: Fault tolerance configuration with restart strategies
  • Tools: Tool registration with schema generation and sandboxing
  • LLM: Interface for language model backends (Anthropic provided)

Thread Safety

All exported types are safe for concurrent use. The Orchestrator and Process types use internal synchronization to protect shared state.

Index

Constants

View Source
const (
	// DefaultMaxIterations is the default maximum tool call loop iterations
	DefaultMaxIterations = 50

	// DefaultMaxContextTokens is the default context window size
	DefaultMaxContextTokens = 100000

	// DefaultLLMTimeout is the default timeout for LLM API calls
	DefaultLLMTimeout = 5 * time.Minute

	// DefaultStreamBufferSize is the default buffer size for streaming responses
	DefaultStreamBufferSize = 100

	// DefaultSupervisorPollInterval is the default interval for supervisor health checks
	DefaultSupervisorPollInterval = 100 * time.Millisecond
)

Default configuration values

Variables

View Source
var (
	// ErrProcessNotRunning is returned when trying to send to a stopped process
	ErrProcessNotRunning = errors.New("process is not running")

	// ErrNotCompleted is returned when accessing Future result before completion
	ErrNotCompleted = errors.New("operation not completed")

	// ErrMaxIterationsExceeded is returned when tool loop exceeds safety limit
	ErrMaxIterationsExceeded = errors.New("maximum iterations exceeded")

	// ErrBudgetExceeded is returned when cost would exceed budget
	ErrBudgetExceeded = errors.New("budget exceeded")

	// ErrRateLimited is returned when rate limit is hit
	ErrRateLimited = errors.New("rate limited")

	// ErrCircuitOpen is returned when circuit breaker is open
	ErrCircuitOpen = errors.New("circuit breaker is open")

	// ErrSandboxViolation is returned when file access escapes sandbox
	ErrSandboxViolation = errors.New("sandbox violation: path escapes allowed directory")

	// ErrMaxProcessesReached is returned when orchestrator is at capacity
	ErrMaxProcessesReached = errors.New("maximum number of processes reached")

	// ErrProcessNotFound is returned when process ID is not found
	ErrProcessNotFound = errors.New("process not found")

	// ErrAgentNotFound is returned when agent name is not found
	ErrAgentNotFound = errors.New("agent not found")

	// ErrWorkflowNotFound is returned when workflow name is not found
	ErrWorkflowNotFound = errors.New("workflow not found")

	// ErrInvalidInput is returned for invalid workflow inputs
	ErrInvalidInput = errors.New("invalid input")

	// ErrTimeout is returned when an operation times out
	ErrTimeout = errors.New("operation timed out")

	// ErrLinkedProcessDied is returned when a process dies due to a linked process dying
	ErrLinkedProcessDied = errors.New("linked process died")

	// ErrNameTaken is returned when trying to register a name that's already in use
	ErrNameTaken = errors.New("name already registered")

	// ErrGroupNotFound is returned when a process group doesn't exist
	ErrGroupNotFound = errors.New("process group not found")
)

Standard errors

Functions

func ContextWithEventSink added in v0.2.0

func ContextWithEventSink(ctx context.Context, ch chan<- ChatEvent) context.Context

ContextWithEventSink returns a new context with a ChatEvent sink attached.

func ContextWithProcess

func ContextWithProcess(ctx context.Context, p *Process) context.Context

ContextWithProcess returns a new context with the process attached.

func DefaultDBPath added in v0.2.0

func DefaultDBPath() string

DefaultDBPath returns the default SQLite database path (~/.vega/vega.db).

func EnsureHome

func EnsureHome() error

EnsureHome creates the Vega home and workspace directories if they don't exist.

func EventSinkFromContext added in v0.2.0

func EventSinkFromContext(ctx context.Context) chan<- ChatEvent

EventSinkFromContext retrieves the ChatEvent sink from the context, if present.

func Home

func Home() string

Home returns the Vega home directory. It defaults to ~/.vega but can be overridden with the VEGA_HOME environment variable.

func IsRetryable

func IsRetryable(class ErrorClass) bool

IsRetryable returns true if the error class should typically be retried.

func PublishEvent

func PublishEvent(ctx context.Context, event Event, config *CallbackConfig) error

PublishEvent sends an event to the orchestrator. Used by workers to report their status.

func ShouldRetry

func ShouldRetry(err error, policy *RetryPolicy, attempt int) bool

ShouldRetry checks if an error should be retried based on the retry policy.

func WorkspacePath

func WorkspacePath() string

WorkspacePath returns the default shared workspace directory.

Types

type APIError

type APIError struct {
	StatusCode int
	Message    string
	Err        error
}

APIError represents an error from an API call with status information.

func (*APIError) Error

func (e *APIError) Error() string

func (*APIError) Unwrap

func (e *APIError) Unwrap() error

type Agent

type Agent struct {
	// Name is a human-readable identifier for this agent
	Name string

	// Model is the LLM model ID (e.g., "claude-sonnet-4-20250514")
	Model string

	// FallbackModel is used when all retries with the primary model are exhausted (optional)
	FallbackModel string

	// System is the system prompt (static or dynamic)
	System SystemPrompt

	// Tools available to this agent
	Tools *tools.Tools

	// Memory provides persistent storage (optional)
	Memory memory.Memory

	// Context manages conversation history (optional)
	Context memory.ContextManager

	// Budget sets cost limits (optional)
	Budget *Budget

	// Retry configures retry behavior for transient failures (optional)
	Retry *RetryPolicy

	// RateLimit throttles requests (optional)
	RateLimit *RateLimit

	// CircuitBreaker isolates failures (optional)
	CircuitBreaker *CircuitBreaker

	// LLM is the backend to use (optional, uses default if not set)
	LLM llm.LLM

	// Temperature for generation (0.0-1.0, optional)
	Temperature *float64

	// MaxTokens limits response length (optional)
	MaxTokens int

	// MaxIterations limits tool call loop iterations (default: DefaultMaxIterations)
	MaxIterations int
}

Agent defines an AI agent. It's a blueprint, not a running process. Spawn an Agent with an Orchestrator to get a running Process.

type Alert

type Alert struct {
	ProcessID string
	AgentName string
	Type      AlertType
	Message   string
	Timestamp time.Time
}

Alert represents a health alert.

type AlertType

type AlertType string

AlertType categorizes alerts.

const (
	AlertStaleProgress  AlertType = "stale_progress"
	AlertHighCost       AlertType = "high_cost"
	AlertErrorLoop      AlertType = "error_loop"
	AlertTimeoutWarning AlertType = "timeout_warning"
	AlertHighIterations AlertType = "high_iterations"
)

type BackoffConfig

type BackoffConfig struct {
	// Initial delay before first retry
	Initial time.Duration

	// Multiplier for exponential backoff
	Multiplier float64

	// Max delay between retries
	Max time.Duration

	// Jitter adds randomness (0.0-1.0)
	Jitter float64

	// Type of backoff (linear, exponential, constant)
	Type BackoffType
}

BackoffConfig configures retry delays.

type BackoffType

type BackoffType int

BackoffType specifies the backoff algorithm.

const (
	BackoffExponential BackoffType = iota
	BackoffLinear
	BackoffConstant
)

type Budget

type Budget struct {
	// Limit is the maximum cost in USD
	Limit float64

	// OnExceed determines behavior when budget is exceeded
	OnExceed BudgetAction
}

Budget configures cost limits for an agent.

type BudgetAction

type BudgetAction int

BudgetAction determines what happens when a budget is exceeded.

const (
	// BudgetBlock prevents the request from executing
	BudgetBlock BudgetAction = iota

	// BudgetWarn logs a warning but allows the request
	BudgetWarn

	// BudgetAllow silently allows the request
	BudgetAllow
)
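The three actions differ only in what happens when a request would push spend past the limit. A hedged sketch of the check (illustrative — vega applies this inside the process loop before each LLM call):

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

type budgetAction int

const (
	budgetBlock budgetAction = iota
	budgetWarn
	budgetAllow
)

// checkBudget decides what to do when spent + cost would exceed the limit.
func checkBudget(spentUSD, costUSD, limitUSD float64, action budgetAction) error {
	if spentUSD+costUSD <= limitUSD {
		return nil
	}
	switch action {
	case budgetBlock:
		return errors.New("budget exceeded") // request is refused
	case budgetWarn:
		log.Printf("warning: budget exceeded (%.2f > %.2f)", spentUSD+costUSD, limitUSD)
	}
	return nil // BudgetWarn logs and proceeds; BudgetAllow is silent
}

func main() {
	fmt.Println(checkBudget(9.50, 1.00, 10.00, budgetBlock)) // budget exceeded
	fmt.Println(checkBudget(5.00, 1.00, 10.00, budgetBlock)) // <nil>
}
```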

type CallMetrics

type CallMetrics struct {
	InputTokens  int
	OutputTokens int
	CostUSD      float64
	LatencyMs    int64
	ToolCalls    []string
	Retries      int
}

CallMetrics tracks a single LLM call.

type CallbackConfig

type CallbackConfig struct {
	// Dir is the directory for file-based callbacks (local mode)
	Dir string

	// URL is the endpoint for HTTP-based callbacks (distributed mode)
	URL string
	// contains filtered or unexported fields
}

CallbackConfig holds callback configuration for an orchestrator.

func NewCallbackConfig

func NewCallbackConfig(dir, url string) *CallbackConfig

NewCallbackConfig creates a callback config for use by workers. Pass this to workers so they can report events back.

type ChatEvent

type ChatEvent struct {
	Type        ChatEventType  `json:"type"`
	Delta       string         `json:"delta,omitempty"`
	ToolCallID  string         `json:"tool_call_id,omitempty"`
	ToolName    string         `json:"tool_name,omitempty"`
	Arguments   map[string]any `json:"arguments,omitempty"`
	Result      string         `json:"result,omitempty"`
	DurationMs  int64          `json:"duration_ms,omitempty"`
	Error       string         `json:"error,omitempty"`
	NestedAgent string         `json:"nested_agent,omitempty"`
}

ChatEvent is a structured event emitted during a streaming chat response. It carries text deltas alongside tool call lifecycle events so that callers can render tool activity inline with the response text.

type ChatEventType

type ChatEventType string

ChatEventType categorizes chat stream events.

const (
	ChatEventTextDelta ChatEventType = "text_delta"
	ChatEventToolStart ChatEventType = "tool_start"
	ChatEventToolEnd   ChatEventType = "tool_end"
	ChatEventError     ChatEventType = "error"
	ChatEventDone      ChatEventType = "done"
)

type ChatStream

type ChatStream struct {
	// contains filtered or unexported fields
}

ChatStream represents a streaming chat response with structured events.

func (*ChatStream) Err

func (cs *ChatStream) Err() error

Err returns any error that occurred during streaming.

func (*ChatStream) Events

func (cs *ChatStream) Events() <-chan ChatEvent

Events returns the channel of chat events.

func (*ChatStream) Response

func (cs *ChatStream) Response() string

Response returns the complete text response after the stream is done.

type ChildInfo

type ChildInfo struct {
	Name    string
	ID      string
	Status  Status
	Restart ChildRestart
	Agent   string
}

ChildInfo contains information about a supervised child.

type ChildRestart

type ChildRestart int

ChildRestart determines when a child should be restarted.

const (
	// Permanent children are always restarted
	Permanent ChildRestart = iota
	// Transient children are restarted only on abnormal exit
	Transient
	// Temporary children are never restarted
	Temporary
)

func (ChildRestart) String

func (r ChildRestart) String() string

String returns the restart type name.

type ChildSpec

type ChildSpec struct {
	// Name is the registered name for this child (optional)
	Name string
	// Agent is the agent definition to spawn
	Agent Agent
	// Restart determines when to restart this child
	Restart ChildRestart
	// Task is the initial task for the child
	Task string
	// SpawnOpts are additional options for spawning
	SpawnOpts []SpawnOption
}

ChildSpec defines how to start and supervise a child process.

type CircuitBreaker

type CircuitBreaker struct {
	// Threshold is failures before opening the circuit
	Threshold int

	// ResetAfter is time before trying again (half-open)
	ResetAfter time.Duration

	// HalfOpenMax is requests allowed in half-open state
	HalfOpenMax int

	// OnOpen is called when circuit opens
	OnOpen func()

	// OnClose is called when circuit closes
	OnClose func()
}

CircuitBreaker isolates failures to prevent cascading.

type CombinedPrompt

type CombinedPrompt struct {
	// contains filtered or unexported fields
}

CombinedPrompt combines multiple SystemPrompts into one.

func NewCombinedPrompt

func NewCombinedPrompt(prompts ...SystemPrompt) *CombinedPrompt

NewCombinedPrompt creates a prompt that combines multiple prompts.

func (*CombinedPrompt) Prompt

func (c *CombinedPrompt) Prompt() string

Prompt returns the combined prompt from all sources.

type DynamicPrompt

type DynamicPrompt func() string

DynamicPrompt is a function that generates a system prompt. It's called each turn, allowing the prompt to include current state.

func (DynamicPrompt) Prompt

func (d DynamicPrompt) Prompt() string

Prompt calls the function to generate the prompt.

type ErrorClass

type ErrorClass int

ErrorClass categorizes errors for retry decisions.

const (
	ErrClassRateLimit ErrorClass = iota
	ErrClassOverloaded
	ErrClassTimeout
	ErrClassTemporary
	ErrClassInvalidRequest
	ErrClassAuthentication
	ErrClassBudgetExceeded
)

func ClassifyError

func ClassifyError(err error) ErrorClass

ClassifyError determines the ErrorClass for an error. This enables intelligent retry decisions based on error type.

type Event

type Event struct {
	Type      EventType         `json:"type"`
	ProcessID string            `json:"process_id"`
	AgentName string            `json:"agent_name"`
	Timestamp time.Time         `json:"timestamp"`
	Data      map[string]string `json:"data,omitempty"`

	// For completion events
	Result string `json:"result,omitempty"`

	// For failure events
	Error string `json:"error,omitempty"`

	// For progress events
	Progress  float64 `json:"progress,omitempty"`
	Message   string  `json:"message,omitempty"`
	Iteration int     `json:"iteration,omitempty"`
}

Event represents a worker lifecycle event.

type EventPoller

type EventPoller struct {
	// contains filtered or unexported fields
}

EventPoller polls a directory for event files.

func (*EventPoller) Start

func (p *EventPoller) Start() <-chan Event

Start begins polling for events.

func (*EventPoller) Stop

func (p *EventPoller) Stop()

Stop stops the poller.

type EventType

type EventType string

EventType identifies the kind of event.

const (
	EventStarted   EventType = "started"
	EventProgress  EventType = "progress"
	EventCompleted EventType = "completed"
	EventFailed    EventType = "failed"
	EventHeartbeat EventType = "heartbeat"
)

type ExitReason

type ExitReason string

ExitReason describes why a process exited.

const (
	// ExitNormal means the process completed successfully
	ExitNormal ExitReason = "normal"
	// ExitError means the process failed with an error
	ExitError ExitReason = "error"
	// ExitKilled means the process was explicitly killed
	ExitKilled ExitReason = "killed"
	// ExitLinked means the process died because a linked process died
	ExitLinked ExitReason = "linked"
)

type ExitSignal

type ExitSignal struct {
	// ProcessID is the ID of the process that exited
	ProcessID string
	// AgentName is the name of the agent that was running
	AgentName string
	// Reason explains why the process exited
	Reason ExitReason
	// Error is set if Reason is ExitError
	Error error
	// Result is set if Reason is ExitNormal
	Result string
	// Timestamp is when the exit occurred
	Timestamp time.Time
}

ExitSignal is sent to linked/monitoring processes when a process exits. When trapExit is true, these are delivered via the ExitSignals channel. When trapExit is false, linked process deaths cause this process to die.

type Future

type Future struct {
	// contains filtered or unexported fields
}

Future represents an asynchronous operation result.

func (*Future) Await

func (f *Future) Await(ctx context.Context) (string, error)

Await waits for the future to complete and returns the result.

func (*Future) Cancel

func (f *Future) Cancel()

Cancel cancels the future.

func (*Future) Done

func (f *Future) Done() bool

Done returns true if the future has completed.

func (*Future) Result

func (f *Future) Result() (string, error)

Result returns the result if completed, or error if not.

type GroupMember

type GroupMember struct {
	ID     string
	Name   string // Registered name, if any
	Agent  string
	Status Status
}

GroupMember contains information about a group member.

type HealthConfig

type HealthConfig struct {
	// CheckInterval is how often to check health
	CheckInterval time.Duration

	// StaleProgressMinutes warns if no progress
	StaleProgressMinutes int

	// MaxIterationsWarning warns if iterations exceed this
	MaxIterationsWarning int

	// ErrorLoopCount is consecutive errors before alert
	ErrorLoopCount int

	// CostAlertUSD alerts when cost exceeds this
	CostAlertUSD float64
}

HealthConfig configures health monitoring.

type HealthMonitor

type HealthMonitor struct {
	// contains filtered or unexported fields
}

HealthMonitor monitors process health.

func NewHealthMonitor

func NewHealthMonitor(config HealthConfig) *HealthMonitor

NewHealthMonitor creates a new health monitor.

func (*HealthMonitor) Alerts

func (h *HealthMonitor) Alerts() <-chan Alert

Alerts returns the channel of health alerts.

func (*HealthMonitor) Start

func (h *HealthMonitor) Start(getProcesses func() []*Process)

Start begins health monitoring.

func (*HealthMonitor) Stop

func (h *HealthMonitor) Stop()

Stop stops health monitoring.

type JSONPersistence

type JSONPersistence struct {
	// contains filtered or unexported fields
}

JSONPersistence saves state to a JSON file.

func NewJSONPersistence

func NewJSONPersistence(path string) *JSONPersistence

NewJSONPersistence creates a new JSON file persistence.

func (*JSONPersistence) Load

func (p *JSONPersistence) Load() ([]ProcessState, error)

Load reads state from the file.

func (*JSONPersistence) Save

func (p *JSONPersistence) Save(states []ProcessState) error

Save writes state to the file.

type LinkedProcessError

type LinkedProcessError struct {
	LinkedID      string
	OriginalError error
}

LinkedProcessError is the error set when a process dies due to a linked process dying.

func (*LinkedProcessError) Error

func (e *LinkedProcessError) Error() string

func (*LinkedProcessError) Unwrap

func (e *LinkedProcessError) Unwrap() error

type MonitorRef

type MonitorRef struct {
	// contains filtered or unexported fields
}

MonitorRef is a reference to an active monitor, used for demonitoring.

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator manages multiple processes.

func NewOrchestrator

func NewOrchestrator(opts ...OrchestratorOption) *Orchestrator

NewOrchestrator creates a new Orchestrator.

func (*Orchestrator) BroadcastToGroup

func (o *Orchestrator) BroadcastToGroup(ctx context.Context, groupName, message string) (map[string]error, error)

BroadcastToGroup sends a message to all members of a group.

func (*Orchestrator) CallbackDir

func (o *Orchestrator) CallbackDir() string

CallbackDir returns the callback directory if configured.

func (*Orchestrator) CallbackURL

func (o *Orchestrator) CallbackURL() string

CallbackURL returns the callback URL if configured.

func (*Orchestrator) DeleteGroup

func (o *Orchestrator) DeleteGroup(name string) error

DeleteGroup removes an empty group. Returns error if the group has members.

func (*Orchestrator) Get

func (o *Orchestrator) Get(id string) *Process

Get returns a process by ID.

func (*Orchestrator) GetAgent

func (o *Orchestrator) GetAgent(name string) (Agent, bool)

GetAgent returns a registered agent by name.

func (*Orchestrator) GetByName

func (o *Orchestrator) GetByName(name string) *Process

GetByName returns a process by its registered name. Returns nil if no process is registered with that name.

func (*Orchestrator) GetContainerManager

func (o *Orchestrator) GetContainerManager() *container.Manager

GetContainerManager returns the container manager, if configured.

func (*Orchestrator) GetGroup

func (o *Orchestrator) GetGroup(name string) (*ProcessGroup, bool)

GetGroup returns a process group by name.

func (*Orchestrator) GetOrCreateGroup

func (o *Orchestrator) GetOrCreateGroup(name string) *ProcessGroup

GetOrCreateGroup returns a group, creating it if necessary.

func (*Orchestrator) GetProjectRegistry

func (o *Orchestrator) GetProjectRegistry() *container.ProjectRegistry

GetProjectRegistry returns the project registry, if configured.

func (*Orchestrator) GetSpawnTree

func (o *Orchestrator) GetSpawnTree() []*SpawnTreeNode

GetSpawnTree returns the hierarchical spawn tree of all processes. Root processes (those with no parent) are returned as top-level nodes.

func (*Orchestrator) GroupMembers

func (o *Orchestrator) GroupMembers(groupName string) ([]*Process, error)

GroupMembers returns members of a named group.

func (*Orchestrator) HandleEventCallback

func (o *Orchestrator) HandleEventCallback() http.HandlerFunc

HandleEventCallback returns an http.HandlerFunc for receiving HTTP callbacks. Mount this at your callback URL endpoint.

Example:

http.HandleFunc("/events", orch.HandleEventCallback())

func (*Orchestrator) JoinGroup

func (o *Orchestrator) JoinGroup(groupName string, p *Process)

JoinGroup adds a process to a named group. Creates the group if it doesn't exist.

func (*Orchestrator) Kill

func (o *Orchestrator) Kill(id string) error

Kill terminates a process.

func (*Orchestrator) LeaveAllGroups

func (o *Orchestrator) LeaveAllGroups(p *Process)

LeaveAllGroups removes a process from all groups. This is called automatically when a process exits.

func (*Orchestrator) LeaveGroup

func (o *Orchestrator) LeaveGroup(groupName string, p *Process) error

LeaveGroup removes a process from a named group.

func (*Orchestrator) List

func (o *Orchestrator) List() []*Process

List returns all processes.

func (*Orchestrator) ListGroups

func (o *Orchestrator) ListGroups() []string

ListGroups returns the names of all groups.

func (*Orchestrator) NewSupervisor

func (o *Orchestrator) NewSupervisor(spec SupervisorSpec) *Supervisor

NewSupervisor creates a new supervisor with the given spec.

func (*Orchestrator) OnHealthAlert

func (o *Orchestrator) OnHealthAlert(fn func(Alert))

OnHealthAlert registers a callback for health alerts.

func (*Orchestrator) OnProcessComplete

func (o *Orchestrator) OnProcessComplete(fn func(*Process, string))

OnProcessComplete registers a callback for when a process completes successfully. The callback receives the process and its final result.

func (*Orchestrator) OnProcessFailed

func (o *Orchestrator) OnProcessFailed(fn func(*Process, error))

OnProcessFailed registers a callback for when a process fails. The callback receives the process and the error.

func (*Orchestrator) OnProcessStarted

func (o *Orchestrator) OnProcessStarted(fn func(*Process))

OnProcessStarted registers a callback for when a process starts.

func (*Orchestrator) Register

func (o *Orchestrator) Register(name string, p *Process) error

Register associates a name with a process. Returns error if name is already taken. The process will be automatically unregistered when it exits.

func (*Orchestrator) RegisterAgent

func (o *Orchestrator) RegisterAgent(agent Agent)

RegisterAgent registers an agent definition for later respawning. This is required for automatic restart to work.

func (*Orchestrator) Shutdown

func (o *Orchestrator) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down all processes.

func (*Orchestrator) Spawn

func (o *Orchestrator) Spawn(agent Agent, opts ...SpawnOption) (*Process, error)

Spawn creates and starts a new process from an agent.

func (*Orchestrator) SpawnSupervised

func (o *Orchestrator) SpawnSupervised(agent Agent, restart ChildRestart, opts ...SpawnOption) (*Process, error)

SpawnSupervised spawns a process with automatic restart on failure. The agent must be registered with RegisterAgent for restart to work.

func (*Orchestrator) Unregister

func (o *Orchestrator) Unregister(name string)

Unregister removes a name association.

type OrchestratorOption

type OrchestratorOption func(*Orchestrator)

OrchestratorOption configures an Orchestrator.

func WithCallbackDir

func WithCallbackDir(dir string) OrchestratorOption

WithCallbackDir configures file-based callbacks for local workers. Events are written as JSON files to the specified directory. The orchestrator polls this directory for events from workers.

Example:

orch := vega.NewOrchestrator(
    vega.WithCallbackDir("~/.vega/events"),
)

func WithCallbackURL

func WithCallbackURL(url string) OrchestratorOption

WithCallbackURL configures HTTP-based callbacks for distributed workers. Workers POST events to this URL. The orchestrator must expose this endpoint.

Example:

orch := vega.NewOrchestrator(
    vega.WithCallbackURL("http://orchestrator:3001/events"),
)

func WithContainerManager

func WithContainerManager(cm *container.Manager, baseDir string) OrchestratorOption

WithContainerManager enables container-based project isolation. If baseDir is provided, a ProjectRegistry will also be created.

func WithHealthCheck

func WithHealthCheck(config HealthConfig) OrchestratorOption

WithHealthCheck enables health monitoring.

func WithLLM

func WithLLM(l llm.LLM) OrchestratorOption

WithLLM sets the default LLM backend.

func WithMaxProcesses

func WithMaxProcesses(n int) OrchestratorOption

WithMaxProcesses sets the maximum number of concurrent processes.

func WithPersistence

func WithPersistence(p Persistence) OrchestratorOption

WithPersistence enables process state persistence.

func WithRateLimits

func WithRateLimits(limits map[string]RateLimitConfig) OrchestratorOption

WithRateLimits configures per-model rate limiting.

func WithRecovery

func WithRecovery(enabled bool) OrchestratorOption

WithRecovery enables process recovery on startup.

type Persistence

type Persistence interface {
	Save(states []ProcessState) error
	Load() ([]ProcessState, error)
}

Persistence interface for saving process state.

type Process

type Process struct {
	// ID is the unique identifier for this process
	ID string

	// Agent is the agent definition this process is running
	Agent *Agent

	// Task describes what this process is working on
	Task string

	// WorkDir is the isolated workspace directory
	WorkDir string

	// Project is the container project name for isolated execution
	Project string

	// StartedAt is when the process was spawned
	StartedAt time.Time

	// Supervision configures fault tolerance
	Supervision *Supervision

	// Spawn tree tracking
	ParentID    string   // ID of spawning process (empty if root)
	ParentAgent string   // Agent name of parent
	ChildIDs    []string // Child process IDs

	SpawnDepth  int    // Depth in tree (0 = root)
	SpawnReason string // Task/context for spawn
	// contains filtered or unexported fields
}

Process is a running Agent with state and lifecycle.

func ProcessFromContext

func ProcessFromContext(ctx context.Context) *Process

ProcessFromContext retrieves the process from the context, if present.

func (*Process) Complete

func (p *Process) Complete(result string)

Complete marks the process as successfully completed with a result. This triggers OnProcessComplete callbacks and notifies linked/monitoring processes. Normal completion does NOT cause linked processes to die.

func (*Process) Demonitor

func (p *Process) Demonitor(ref MonitorRef)

Demonitor stops monitoring a process. The MonitorRef must be one returned by a previous Monitor call.

func (*Process) ExitSignals

func (p *Process) ExitSignals() <-chan ExitSignal

ExitSignals returns the channel for receiving exit signals. Only receives signals when trapExit is true, or for monitored processes. Returns nil if no exit signal channel has been created.

func (*Process) Fail

func (p *Process) Fail(err error)

Fail marks the process as failed with an error. This triggers OnProcessFailed callbacks and notifies linked/monitoring processes. Failed processes cause linked processes to die too (unless they trap exits).

func (*Process) Groups

func (p *Process) Groups() []string

Groups returns the names of all groups this process belongs to.

func (*Process) HydrateMessages added in v0.2.0

func (p *Process) HydrateMessages(msgs []llm.Message)

HydrateMessages loads historical messages into a process that has no conversation history yet (e.g. after a restart). This is a no-op if the process already has messages.

func (*Process) Link

func (p *Process) Link(other *Process)

Link creates a bidirectional link between this process and another. If either process dies, the other will also die (unless trapExit is set). Linking is idempotent - linking to an already-linked process is a no-op.

func (*Process) Links

func (p *Process) Links() []string

Links returns the IDs of all linked processes.

func (*Process) Messages

func (p *Process) Messages() []llm.Message

Messages returns a copy of the conversation history.

func (*Process) Metrics

func (p *Process) Metrics() ProcessMetrics

Metrics returns the current process metrics.

func (*Process) Monitor

func (p *Process) Monitor(other *Process) MonitorRef

Monitor starts monitoring another process. When the monitored process exits, this process receives an ExitSignal on its ExitSignals channel (does not cause death, unlike Link). Returns a MonitorRef that can be used to stop monitoring.

func (*Process) Name

func (p *Process) Name() string

Name returns the registered name of the process, or empty string if not named.

func (*Process) Result

func (p *Process) Result() string

Result returns the final result if the process completed.

func (*Process) Send

func (p *Process) Send(ctx context.Context, message string) (string, error)

Send sends a message and waits for a response.

func (*Process) SendAsync

func (p *Process) SendAsync(message string) *Future

SendAsync sends a message and returns a Future.

func (*Process) SendStream

func (p *Process) SendStream(ctx context.Context, message string) (*Stream, error)

SendStream sends a message and returns a streaming response.

func (*Process) SendStreamRich

func (p *Process) SendStreamRich(ctx context.Context, message string) (*ChatStream, error)

SendStreamRich sends a message and returns a ChatStream with structured events (text deltas, tool start/end) instead of raw text chunks.

func (*Process) SetExtraSystem

func (p *Process) SetExtraSystem(content string)

SetExtraSystem sets additional system prompt content that is appended after the main system prompt. Use this to inject per-process context (e.g. user memory) without modifying the agent's shared System prompt.

func (*Process) SetTrapExit

func (p *Process) SetTrapExit(trap bool)

SetTrapExit enables or disables exit trapping. When trapExit is true, linked process deaths deliver ExitSignals instead of killing this process. This is how supervisors survive their children dying.
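
The link/trap-exit rule can be sketched with a toy model. All names here (`toyProc`, `exit`, `inbox`) are illustrative, not the library's internals:

```go
package main

import "fmt"

// toyProc models just enough of a process to show exit propagation.
type toyProc struct {
	id       string
	trapExit bool
	alive    bool
	inbox    []string // exit signals received while trapping
	links    []*toyProc
}

// exit kills p and propagates to linked processes, mirroring the
// documented rule: linked processes die unless they trap exits.
func (p *toyProc) exit(reason string) {
	if !p.alive {
		return
	}
	p.alive = false
	for _, l := range p.links {
		if l.trapExit {
			l.inbox = append(l.inbox, p.id+":"+reason) // delivered as a signal
		} else {
			l.exit(reason) // cascades through the link
		}
	}
}

func main() {
	sup := &toyProc{id: "sup", trapExit: true, alive: true}
	worker := &toyProc{id: "w1", alive: true}
	sup.links = []*toyProc{worker}
	worker.links = []*toyProc{sup}

	worker.exit("boom")
	fmt.Println(sup.alive, sup.inbox) // the trapping supervisor survives and sees the signal
}
```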

func (*Process) Status

func (p *Process) Status() Status

Status returns the current process status.

func (*Process) Stop

func (p *Process) Stop()

Stop terminates the process. This is equivalent to killing the process - linked processes will be notified.

func (*Process) TrapExit

func (p *Process) TrapExit() bool

TrapExit returns whether exit trapping is enabled.

func (*Process) Unlink

func (p *Process) Unlink(other *Process)

Unlink removes the bidirectional link between this process and another. Unlinking is idempotent - unlinking from a non-linked process is a no-op.

type ProcessError

type ProcessError struct {
	ProcessID string
	AgentName string
	Err       error
}

ProcessError wraps errors with process context.

func (*ProcessError) Error

func (e *ProcessError) Error() string

func (*ProcessError) Unwrap

func (e *ProcessError) Unwrap() error

type ProcessEvent

type ProcessEvent struct {
	Type      ProcessEventType
	Process   *Process
	Result    string // For complete events
	Error     error  // For failed events
	Timestamp time.Time
}

ProcessEvent represents a process lifecycle event.

type ProcessEventType

type ProcessEventType int

ProcessEventType is the type of lifecycle event.

const (
	ProcessStarted ProcessEventType = iota
	ProcessCompleted
	ProcessFailed
)

type ProcessGroup

type ProcessGroup struct {
	// contains filtered or unexported fields
}

ProcessGroup enables multi-agent collaboration by grouping related processes. Processes can join multiple groups and groups support broadcast operations.

func NewGroup

func NewGroup(name string) *ProcessGroup

NewGroup creates a new process group. Groups are typically accessed via the orchestrator's Join/Leave methods.

func (*ProcessGroup) BBDelete

func (g *ProcessGroup) BBDelete(key string)

BBDelete removes a key from the group's shared blackboard.

func (*ProcessGroup) BBGet

func (g *ProcessGroup) BBGet(key string) (any, bool)

BBGet reads a value from the group's shared blackboard.

func (*ProcessGroup) BBKeys

func (g *ProcessGroup) BBKeys() []string

BBKeys returns all keys on the group's shared blackboard.

func (*ProcessGroup) BBSet

func (g *ProcessGroup) BBSet(key string, value any)

BBSet writes a key/value pair to the group's shared blackboard.

func (*ProcessGroup) BBSnapshot

func (g *ProcessGroup) BBSnapshot() map[string]any

BBSnapshot returns a shallow copy of the entire blackboard.
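
The blackboard semantics above — concurrent-safe key/value access plus a shallow snapshot — can be sketched with a toy model (the real implementation is unexported; this only illustrates the behavior, in particular that a snapshot copies the map but not the values it holds):

```go
package main

import (
	"fmt"
	"sync"
)

// blackboard is a toy model: a mutex-guarded map, as the BB* methods imply.
type blackboard struct {
	mu   sync.RWMutex
	data map[string]any
}

func newBlackboard() *blackboard { return &blackboard{data: map[string]any{}} }

func (b *blackboard) Set(key string, value any) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.data[key] = value
}

func (b *blackboard) Get(key string) (any, bool) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	v, ok := b.data[key]
	return v, ok
}

// Snapshot copies the map itself but not the values inside it - a shallow
// copy, so mutating a value reachable from the snapshot is still shared.
func (b *blackboard) Snapshot() map[string]any {
	b.mu.RLock()
	defer b.mu.RUnlock()
	out := make(map[string]any, len(b.data))
	for k, v := range b.data {
		out[k] = v
	}
	return out
}

func main() {
	bb := newBlackboard()
	bb.Set("plan", "research then draft")
	snap := bb.Snapshot()
	bb.Set("plan", "replaced")
	fmt.Println(snap["plan"]) // the snapshot keeps the old top-level entry
}
```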

func (*ProcessGroup) Broadcast

func (g *ProcessGroup) Broadcast(ctx context.Context, message string) map[string]error

Broadcast sends a message to all group members. Returns a map of process ID to result/error.

func (*ProcessGroup) Count

func (g *ProcessGroup) Count() int

Count returns the number of members in the group.

func (*ProcessGroup) Has

func (g *ProcessGroup) Has(p *Process) bool

Has checks if a process is a member of this group.

func (*ProcessGroup) Join

func (g *ProcessGroup) Join(p *Process) bool

Join adds a process to this group. Returns true if the process was added, false if already a member.

func (*ProcessGroup) Leave

func (g *ProcessGroup) Leave(p *Process) bool

Leave removes a process from this group. Returns true if the process was removed, false if not a member.

func (*ProcessGroup) MemberInfo

func (g *ProcessGroup) MemberInfo() []GroupMember

MemberInfo returns information about all members.

func (*ProcessGroup) Members

func (g *ProcessGroup) Members() []*Process

Members returns all processes in this group.

func (*ProcessGroup) Name

func (g *ProcessGroup) Name() string

Name returns the group name.

func (*ProcessGroup) OnJoin

func (g *ProcessGroup) OnJoin(fn func(*Process))

OnJoin registers a callback for when processes join.

func (*ProcessGroup) OnLeave

func (g *ProcessGroup) OnLeave(fn func(*Process))

OnLeave registers a callback for when processes leave.

type ProcessMetrics

type ProcessMetrics struct {
	Iterations   int
	InputTokens  int
	OutputTokens int
	CostUSD      float64
	StartedAt    time.Time
	CompletedAt  time.Time
	LastActiveAt time.Time
	ToolCalls    int
	Errors       int
}

ProcessMetrics tracks process usage.

type ProcessState

type ProcessState struct {
	ID        string         `json:"id"`
	AgentName string         `json:"agent_name"`
	Task      string         `json:"task"`
	WorkDir   string         `json:"work_dir"`
	Status    Status         `json:"status"`
	StartedAt time.Time      `json:"started_at"`
	Metrics   ProcessMetrics `json:"metrics"`
}

ProcessState is the persisted state of a process.

type RateLimit

type RateLimit struct {
	// RequestsPerMinute limits request rate
	RequestsPerMinute int

	// TokensPerMinute limits token throughput
	TokensPerMinute int
}

RateLimit configures request throttling.

type RateLimitConfig

type RateLimitConfig struct {
	RequestsPerMinute int
	TokensPerMinute   int
	Strategy          RateLimitStrategy
}

RateLimitConfig configures rate limiting for a model.

type RateLimitStrategy

type RateLimitStrategy int

RateLimitStrategy determines rate limit behavior.

const (
	RateLimitQueue RateLimitStrategy = iota
	RateLimitReject
	RateLimitBackpressure
)

type RetryPolicy

type RetryPolicy struct {
	// MaxAttempts is the maximum number of retry attempts
	MaxAttempts int

	// Backoff configures delay between retries
	Backoff BackoffConfig

	// RetryOn specifies which error classes to retry
	RetryOn []ErrorClass
}

RetryPolicy configures retry behavior for transient failures.

type SendResult

type SendResult struct {
	Response string
	Error    error
	Metrics  CallMetrics
}

SendResult is the result of a Send operation.

type SkillMatch

type SkillMatch = skills.SkillMatch

SkillMatch is a type alias for skills.SkillMatch, keeping it in the public API.

type SkillSummary

type SkillSummary struct {
	Name        string
	Description string
	Tags        []string
}

SkillSummary provides a brief summary of a skill.

type SkillsConfig

type SkillsConfig struct {
	// Directories to load skills from.
	Directories []string

	// Include filters skills by name pattern.
	Include []string

	// Exclude filters out skills by name pattern.
	Exclude []string

	// MaxActive is the maximum number of skills to inject.
	MaxActive int
}

SkillsConfig configures skills for an agent.

type SkillsPrompt

type SkillsPrompt struct {
	// contains filtered or unexported fields
}

SkillsPrompt wraps a base SystemPrompt and dynamically injects relevant skills.

func NewSkillsPrompt

func NewSkillsPrompt(base SystemPrompt, loader *skills.Loader, opts ...SkillsPromptOption) *SkillsPrompt

NewSkillsPrompt creates a new SkillsPrompt that wraps a base prompt.

func SkillsPromptFromConfig

func SkillsPromptFromConfig(base SystemPrompt, config SkillsConfig) (*SkillsPrompt, error)

SkillsPromptFromConfig creates a SkillsPrompt from configuration.

func (*SkillsPrompt) AvailableSkills

func (s *SkillsPrompt) AvailableSkills() []string

AvailableSkills returns a list of all available skills.

func (*SkillsPrompt) GetMatchedSkills

func (s *SkillsPrompt) GetMatchedSkills() []skills.SkillMatch

GetMatchedSkills returns the skills that would be matched for the current context.

func (*SkillsPrompt) ListSkillSummaries

func (s *SkillsPrompt) ListSkillSummaries() []SkillSummary

ListSkillSummaries returns summaries of all available skills.

func (*SkillsPrompt) Loader

func (s *SkillsPrompt) Loader() *skills.Loader

Loader returns the underlying skills loader.

func (*SkillsPrompt) Prompt

func (s *SkillsPrompt) Prompt() string

Prompt generates the system prompt with injected skills.

func (*SkillsPrompt) SetContext

func (s *SkillsPrompt) SetContext(message string)

SetContext sets the context message for skill matching. This should be called with the user's message before Prompt() is called.

type SkillsPromptOption

type SkillsPromptOption func(*SkillsPrompt)

SkillsPromptOption configures a SkillsPrompt.

func WithMaxActiveSkills

func WithMaxActiveSkills(n int) SkillsPromptOption

WithMaxActiveSkills sets the maximum number of skills to inject.

type SpawnOption

type SpawnOption func(*Process)

SpawnOption configures a spawned process.

func WithMaxIterations

func WithMaxIterations(n int) SpawnOption

WithMaxIterations sets the maximum iteration count.

func WithMessages

func WithMessages(messages []llm.Message) SpawnOption

WithMessages initializes the process with existing conversation history. This is useful for resuming conversations or providing context from previous interactions.

func WithParent

func WithParent(parent *Process) SpawnOption

WithParent sets the parent process for spawn tree tracking. This establishes the parent-child relationship for visualization.

func WithProcessContext

func WithProcessContext(ctx context.Context) SpawnOption

WithProcessContext sets a parent context.

func WithProject

func WithProject(name string) SpawnOption

WithProject sets the container project for isolated execution.

func WithSpawnReason

func WithSpawnReason(reason string) SpawnOption

WithSpawnReason sets the reason/task for spawning this process. This provides context for why the process was created.

func WithSupervision

func WithSupervision(s Supervision) SpawnOption

WithSupervision sets the supervision configuration.

func WithTask

func WithTask(task string) SpawnOption

WithTask sets the task description.

func WithTimeout

func WithTimeout(d time.Duration) SpawnOption

WithTimeout sets a timeout for the process.

func WithWorkDir

func WithWorkDir(dir string) SpawnOption

WithWorkDir sets the working directory.

type SpawnTreeNode

type SpawnTreeNode struct {
	ProcessID   string           `json:"process_id"`
	AgentName   string           `json:"agent_name"`
	Task        string           `json:"task"`
	Status      Status           `json:"status"`
	SpawnDepth  int              `json:"spawn_depth"`
	SpawnReason string           `json:"spawn_reason,omitempty"`
	StartedAt   time.Time        `json:"started_at"`
	Children    []*SpawnTreeNode `json:"children,omitempty"`
}

SpawnTreeNode represents a node in the process spawn tree.
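
Assembling such a tree from the flat parent/child data that processes carry (`ParentID`, `SpawnDepth`) can be sketched as follows; `buildTree` and `setDepth` are illustrative helpers, not library functions:

```go
package main

import "fmt"

// Trimmed-down node for the sketch; the real SpawnTreeNode has more fields.
type SpawnTreeNode struct {
	ProcessID  string
	AgentName  string
	SpawnDepth int
	Children   []*SpawnTreeNode
}

// buildTree assembles a spawn tree from flat id -> parent-id records,
// the same shape the Process fields ParentID/ChildIDs describe.
// A process with an empty parent is the root.
func buildTree(parents, agents map[string]string) *SpawnTreeNode {
	nodes := map[string]*SpawnTreeNode{}
	for id := range parents {
		nodes[id] = &SpawnTreeNode{ProcessID: id, AgentName: agents[id]}
	}
	var root *SpawnTreeNode
	for id, parent := range parents {
		if parent == "" {
			root = nodes[id]
		} else {
			nodes[parent].Children = append(nodes[parent].Children, nodes[id])
		}
	}
	return root
}

// setDepth fills SpawnDepth recursively (0 = root).
func setDepth(n *SpawnTreeNode, d int) {
	n.SpawnDepth = d
	for _, c := range n.Children {
		setDepth(c, d+1)
	}
}

func main() {
	parents := map[string]string{"root": "", "a": "root", "b": "root", "c": "a"}
	agents := map[string]string{"root": "mother", "a": "researcher", "b": "writer", "c": "browser"}
	tree := buildTree(parents, agents)
	setDepth(tree, 0)
	fmt.Println(tree.ProcessID, len(tree.Children)) // root 2
}
```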

type StaticPrompt

type StaticPrompt string

StaticPrompt is a fixed system prompt string.

func (StaticPrompt) Prompt

func (s StaticPrompt) Prompt() string

Prompt returns the static prompt string.

type Status

type Status string

Status represents the process lifecycle state.

const (
	StatusPending   Status = "pending"
	StatusRunning   Status = "running"
	StatusCompleted Status = "completed"
	StatusFailed    Status = "failed"
	StatusTimeout   Status = "timeout"
)

type Strategy

type Strategy int

Strategy determines restart behavior.

const (
	// Restart restarts the failed process
	Restart Strategy = iota

	// Stop lets the process stay dead
	Stop

	// Escalate propagates failure to parent
	Escalate

	// RestartAll restarts all sibling processes (for interdependent processes)
	RestartAll
)

func (Strategy) String

func (s Strategy) String() string

String returns the strategy name.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a streaming response.

func (*Stream) Chunks

func (s *Stream) Chunks() <-chan string

Chunks returns the channel of response chunks.

func (*Stream) Err

func (s *Stream) Err() error

Err returns any error that occurred during streaming.

func (*Stream) Response

func (s *Stream) Response() string

Response returns the complete response after streaming is done.

type Supervision

type Supervision struct {
	// Strategy determines what happens when a process fails
	Strategy Strategy

	// MaxRestarts is the maximum restart count within Window (-1 for unlimited)
	MaxRestarts int

	// Window is the time window for counting restarts
	Window time.Duration

	// Backoff configures delay between restarts
	Backoff BackoffConfig

	// OnFailure is called when the process fails
	OnFailure func(p *Process, err error)

	// OnRestart is called before restarting
	OnRestart func(p *Process, attempt int)

	// OnGiveUp is called when max restarts exceeded
	OnGiveUp func(p *Process, err error)
	// contains filtered or unexported fields
}

Supervision configures fault tolerance for a process.

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor manages a group of child processes with automatic restart.

func (*Supervisor) Children

func (s *Supervisor) Children() []*Process

Children returns the current supervised processes.

func (*Supervisor) CountChildren

func (s *Supervisor) CountChildren() (total, running, failed int)

CountChildren returns the number of children (total, running, failed).

func (*Supervisor) DeleteChild

func (s *Supervisor) DeleteChild(name string) error

DeleteChild removes a child from the supervisor entirely. The child is stopped if running and will not be restarted.

func (*Supervisor) GetChild

func (s *Supervisor) GetChild(name string) *Process

GetChild returns the process for a specific child by name.

func (*Supervisor) RestartChild

func (s *Supervisor) RestartChild(name string) error

RestartChild forces a restart of a specific child by name.

func (*Supervisor) Start

func (s *Supervisor) Start() error

Start spawns all children and begins supervision.

func (*Supervisor) StartChild

func (s *Supervisor) StartChild(spec ChildSpec) (*Process, error)

StartChild dynamically adds and starts a new child to the supervisor. Returns the new process or an error if the child couldn't be started.

func (*Supervisor) Stop

func (s *Supervisor) Stop()

Stop stops the supervisor and all its children.

func (*Supervisor) TerminateChild

func (s *Supervisor) TerminateChild(name string) error

TerminateChild stops a specific child by name. The child will be restarted according to its restart policy unless DeleteChild is called.

func (*Supervisor) WhichChildren

func (s *Supervisor) WhichChildren() []ChildInfo

WhichChildren returns information about all current children.

type SupervisorSpec

type SupervisorSpec struct {
	// Strategy determines how failures affect siblings
	Strategy SupervisorStrategy
	// MaxRestarts is the maximum restarts within Window (0 = unlimited)
	MaxRestarts int
	// Window is the time window for counting restarts
	Window time.Duration
	// Children are the child specifications
	Children []ChildSpec
	// Backoff configures delay between restarts
	Backoff BackoffConfig
}

SupervisorSpec defines a supervision tree configuration.

type SupervisorStrategy

type SupervisorStrategy int

SupervisorStrategy determines how failures affect siblings.

const (
	// OneForOne restarts only the failed child
	OneForOne SupervisorStrategy = iota
	// OneForAll restarts all children when one fails
	OneForAll
	// RestForOne restarts the failed child and all children started after it
	RestForOne
)
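
Which children each strategy restarts (by start order) can be written out directly; `restartSet` is an illustrative helper over a local copy of the constants, not a library function:

```go
package main

import "fmt"

type SupervisorStrategy int

const (
	OneForOne SupervisorStrategy = iota
	OneForAll
	RestForOne
)

// restartSet returns the indices of children to restart when the child
// at index failed (in start order, out of n children) dies.
func restartSet(s SupervisorStrategy, failed, n int) []int {
	switch s {
	case OneForOne:
		return []int{failed} // only the failed child
	case OneForAll:
		all := make([]int, n)
		for i := range all {
			all[i] = i // every child
		}
		return all
	case RestForOne:
		var rest []int
		for i := failed; i < n; i++ {
			rest = append(rest, i) // the failed child and everyone started after it
		}
		return rest
	}
	return nil
}

func main() {
	fmt.Println(restartSet(RestForOne, 1, 4)) // [1 2 3]
}
```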

func (SupervisorStrategy) String

func (s SupervisorStrategy) String() string

String returns the strategy name.

type SystemPrompt

type SystemPrompt interface {
	Prompt() string
}

SystemPrompt provides the system prompt for an agent. It can be static (StaticPrompt) or dynamic (DynamicPrompt).
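
Any type with a `Prompt() string` method satisfies the interface. A sketch with a local mirror of the interface: the static case matches `StaticPrompt` above, while `clockPrompt` is a hypothetical dynamic prompt invented for this example:

```go
package main

import (
	"fmt"
	"time"
)

// SystemPrompt mirrors the one-method interface above.
type SystemPrompt interface{ Prompt() string }

// StaticPrompt returns a fixed string, as documented.
type StaticPrompt string

func (s StaticPrompt) Prompt() string { return string(s) }

// clockPrompt is a hypothetical dynamic prompt that changes per call -
// the kind of thing a dynamic implementation enables.
type clockPrompt struct{ base string }

func (c clockPrompt) Prompt() string {
	return c.base + "\nCurrent time: " + time.Now().UTC().Format(time.RFC3339)
}

func main() {
	var p SystemPrompt = StaticPrompt("You are a careful research assistant.")
	fmt.Println(p.Prompt())

	p = clockPrompt{base: "You are a careful research assistant."}
	fmt.Println(p.Prompt()) // same base, plus per-call context
}
```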

type ValidationError

type ValidationError struct {
	Field   string
	Message string
	Line    int
	Column  int
}

ValidationError provides detailed validation failure information.

func (*ValidationError) Error

func (e *ValidationError) Error() string

Directories

Path Synopsis
cmd
vega command
Package main provides the Vega CLI.
Package dsl provides a YAML-based domain-specific language for defining AI agent teams and workflows without writing Go code.
internal
container
Package container provides Docker container management for project isolation.
skills
Package skills provides skill loading and matching for agents.
Package llm provides LLM backend implementations.
Package mcp provides a client for the Model Context Protocol.
Package serve provides an HTTP server with a web dashboard and REST API for monitoring and controlling Vega agent orchestration.
