worker

package
v0.1.147 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 51 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultCachePath = "/var/lib/clip/cache"
	DefaultWorkPath  = "/var/lib/clip/work"
	DefaultMountPath = "/var/lib/clip/mnt"
)

Default paths for CLIP storage directories

View Source
const (
	DefaultBetweenTurnsTimeout = 60 * time.Second
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentExecutionRunner added in v0.1.60

type AgentExecutionRunner interface {
	Name() string
	BuildEntrypoint(task types.RunExecution, env map[string]string) []string
}

AgentExecutionRunner builds the process entrypoint for an agent task.

type AirAnalyzer added in v0.1.110

type AirAnalyzer struct {
	// contains filtered or unexported fields
}

AirAnalyzer implements OutputAnalyzer for the air runner's JSONL event format. air emits structured events (tool_call, tool_result, output) — we only need to match tool_result events that follow write-capable tools.

func NewAirAnalyzer added in v0.1.110

func NewAirAnalyzer() *AirAnalyzer

func (*AirAnalyzer) PrepareInput added in v0.1.110

func (a *AirAnalyzer) PrepareInput(payload map[string]any) (toolName, toolInput, toolResult string, ok bool)

func (*AirAnalyzer) ShouldAnalyze added in v0.1.110

func (a *AirAnalyzer) ShouldAnalyze(payload map[string]any) bool

type AirRunner added in v0.1.110

type AirRunner struct {
	// contains filtered or unexported fields
}

func NewAirRunner added in v0.1.110

func NewAirRunner(opts AirRunnerOptions) *AirRunner

func (*AirRunner) BuildEntrypoint added in v0.1.110

func (r *AirRunner) BuildEntrypoint(task types.RunExecution, env map[string]string) []string

func (*AirRunner) BuildTurnArgs added in v0.1.110

func (r *AirRunner) BuildTurnArgs(prompt string, env map[string]string, mode TurnArgMode) []string

func (*AirRunner) ClassifierEnv added in v0.1.110

func (r *AirRunner) ClassifierEnv() map[string]string

func (*AirRunner) ExtractResponseText added in v0.1.110

func (r *AirRunner) ExtractResponseText(raw []byte, limit int) string

ExtractResponseText implements ResponseExtractor for air's JSONL output. It scans for the last "response" event and returns its message.

func (*AirRunner) Name added in v0.1.110

func (r *AirRunner) Name() string

func (*AirRunner) OutputAnalyzer added in v0.1.110

func (r *AirRunner) OutputAnalyzer() OutputAnalyzer

func (*AirRunner) ParseTurnOutput added in v0.1.110

func (r *AirRunner) ParseTurnOutput(output []byte) (TurnParseResult, error)

ParseTurnOutput extracts turn state, assistant response text, and any structured artifacts from air's JSON trace. air emits JSONL events (with an "event" field) to stderr and a final JSON trace summary (without an "event" field) to stdout; in a PTY they're interleaved. We skip any line that has an "event" key and only consider the trace summary.

type AirRunnerOptions added in v0.1.110

type AirRunnerOptions struct {
	AnthropicAPIKey string
	CerebrasAPIKey  string
	KernelAPIKey    string
	S2Key           string
	S2Basin         string
}

type AnalyzerProvider added in v0.1.110

type AnalyzerProvider interface {
	AgentExecutionRunner
	OutputAnalyzer() OutputAnalyzer
}

AnalyzerProvider exposes the structured-output analyzer that matches a runner's stdout format.

type AnalyzerWriter added in v0.1.96

type AnalyzerWriter struct {
	// contains filtered or unexported fields
}

AnalyzerWriter is an io.Writer that inspects agent stdout lines, runs qualifying tool completions through the BAML ExtractOutputs classifier, and creates TaskOutput records via the gateway gRPC.

It rate-limits to one in-flight BAML call per task and queues additional candidates so we don't overwhelm the classifier.

func (*AnalyzerWriter) Wait added in v0.1.103

func (w *AnalyzerWriter) Wait()

func (*AnalyzerWriter) Write added in v0.1.96

func (w *AnalyzerWriter) Write(p []byte) (int, error)

type CLIPImageManager

type CLIPImageManager struct {
	// contains filtered or unexported fields
}

CLIPImageManager implements ImageManager using CLIP for lazy-loading images from OCI registries. It manages FUSE mounts with reference counting for efficient image reuse.

func (*CLIPImageManager) Close

func (m *CLIPImageManager) Close() error

Close unmounts all FUSE filesystems and releases resources.

func (*CLIPImageManager) PrepareRootfs

func (m *CLIPImageManager) PrepareRootfs(ctx context.Context, imageRef string) (string, func(), error)

PrepareRootfs converts an OCI image to CLIP format and mounts it as a FUSE filesystem. If the image is already mounted, it increments the reference count and returns the existing mount.

type ClassifierEnvProvider added in v0.1.110

type ClassifierEnvProvider interface {
	AgentExecutionRunner
	ClassifierEnv() map[string]string
}

ClassifierEnvProvider exposes the environment needed for worker-side BAML classifier calls associated with a runner.

type ClaudeCodeAnalyzer added in v0.1.96

type ClaudeCodeAnalyzer struct {
	// contains filtered or unexported fields
}

ClaudeCodeAnalyzer implements OutputAnalyzer for the Claude Code stream-json format. It tracks tool_use blocks (from "assistant" messages) and matches them with tool_result blocks (from "user" messages) to decide which completions are worth classifying via the BAML ExtractOutputs function.

func NewClaudeCodeAnalyzer added in v0.1.96

func NewClaudeCodeAnalyzer() *ClaudeCodeAnalyzer

func (*ClaudeCodeAnalyzer) PrepareInput added in v0.1.96

func (a *ClaudeCodeAnalyzer) PrepareInput(payload map[string]any) (toolName, toolInput, toolResult string, ok bool)

func (*ClaudeCodeAnalyzer) ShouldAnalyze added in v0.1.96

func (a *ClaudeCodeAnalyzer) ShouldAnalyze(payload map[string]any) bool

type ClaudeCodeRunner added in v0.1.60

type ClaudeCodeRunner struct {
	// contains filtered or unexported fields
}

func NewClaudeCodeRunner added in v0.1.60

func NewClaudeCodeRunner(opts ClaudeCodeRunnerOptions) *ClaudeCodeRunner

func (*ClaudeCodeRunner) BuildEntrypoint added in v0.1.60

func (r *ClaudeCodeRunner) BuildEntrypoint(task types.RunExecution, env map[string]string) []string

func (*ClaudeCodeRunner) BuildTurnArgs added in v0.1.66

func (r *ClaudeCodeRunner) BuildTurnArgs(prompt string, env map[string]string, mode TurnArgMode) []string

BuildTurnArgs returns the argv for a single interactive turn. Each turn runs claude --print as a separate process in the sandbox; the Go worker manages the loop between turns.

func (*ClaudeCodeRunner) CheckHeartbeat added in v0.1.88

func (r *ClaudeCodeRunner) CheckHeartbeat(heartbeatPath string) bool

func (*ClaudeCodeRunner) ClassifierEnv added in v0.1.110

func (r *ClaudeCodeRunner) ClassifierEnv() map[string]string

func (*ClaudeCodeRunner) Name added in v0.1.60

func (r *ClaudeCodeRunner) Name() string

func (*ClaudeCodeRunner) OutputAnalyzer added in v0.1.110

func (r *ClaudeCodeRunner) OutputAnalyzer() OutputAnalyzer

func (*ClaudeCodeRunner) ReadLastMessage added in v0.1.96

func (r *ClaudeCodeRunner) ReadLastMessage(markerPath string) string

func (*ClaudeCodeRunner) SetupHeartbeat added in v0.1.88

func (r *ClaudeCodeRunner) SetupHeartbeat(mountSource string, env map[string]string) (string, error)

func (*ClaudeCodeRunner) SetupNeedsInput added in v0.1.96

func (r *ClaudeCodeRunner) SetupNeedsInput(mountSource string, env map[string]string) (string, error)

type ClaudeCodeRunnerOptions added in v0.1.60

type ClaudeCodeRunnerOptions struct {
	AnthropicAPIKey string
	KernelAPIKey    string
}

type ClaudeStreamUsageParser added in v0.1.110

type ClaudeStreamUsageParser struct {
	// contains filtered or unexported fields
}

ClaudeStreamUsageParser parses Claude stream-json output and captures the latest usage snapshot emitted by the process.

func NewClaudeStreamUsageParser added in v0.1.110

func NewClaudeStreamUsageParser() *ClaudeStreamUsageParser

func (*ClaudeStreamUsageParser) Snapshot added in v0.1.110

func (p *ClaudeStreamUsageParser) Snapshot() *types.LLMUsage

func (*ClaudeStreamUsageParser) Write added in v0.1.110

func (p *ClaudeStreamUsageParser) Write(chunk []byte) (int, error)

type Config added in v0.1.23

type Config struct {
	// Paths
	BundleDir   string
	StateDir    string
	MountDir    string
	WorkerMount string
	CLIBinary   string

	// Identity
	WorkerID string

	// Gateway
	GatewayAddr   string
	AuthToken     string
	GatewayClient *gatewayclient.GatewayClient

	// Features
	EnableFilesystem  bool
	EnableNetwork     bool
	UseHostResolvConf bool

	// Runtime
	RuntimeType   string
	RuntimeConfig runtime.Config
	ImageConfig   types.ImageConfig

	// Streaming
	S2Token string
	S2Basin string

	// API keys
	AnthropicAPIKey string
	KernelAPIKey    string
	CerebrasAPIKey  string
}

Config for creating a SandboxManager.

type ConsoleWriter

type ConsoleWriter struct {
	// contains filtered or unexported fields
}

ConsoleWriter writes task output to the worker's console with context.

func NewConsoleWriter

func NewConsoleWriter(taskID, stream string) *ConsoleWriter

NewConsoleWriter creates a writer that logs to the worker console.

func (*ConsoleWriter) Write

func (w *ConsoleWriter) Write(p []byte) (int, error)

type HeartbeatRunner added in v0.1.88

type HeartbeatRunner interface {
	AgentExecutionRunner
	// SetupHeartbeat writes hook configuration to the VFS so the runner
	// touches a heartbeat file on each tool use / stop. mountSource is
	// the host-side VFS FUSE mount path. env is the task env used to
	// derive CLAUDE_CONFIG_DIR. Returns the host-side heartbeat path.
	SetupHeartbeat(mountSource string, env map[string]string) (string, error)
	// CheckHeartbeat returns true if the heartbeat file at the given
	// host-side path was recently modified.
	CheckHeartbeat(heartbeatPath string) bool
}

HeartbeatRunner extends AgentExecutionRunner with liveness tracking. Runners install hooks inside the sandbox that touch a heartbeat file on each lifecycle event (tool use, stop). The worker also touches the file on observed output as a belt-and-suspenders fallback.

type ImageManager

type ImageManager interface {
	// PrepareRootfs prepares a rootfs from a container image.
	// Returns the path to the rootfs and a cleanup function.
	PrepareRootfs(ctx context.Context, imageRef string) (rootfsPath string, cleanup func(), err error)

	// Close cleans up all resources.
	Close() error
}

ImageManager provides container image management functionality.

func NewImageManager

func NewImageManager(config types.ImageConfig) (ImageManager, error)

NewImageManager creates a new CLIP-based image manager.

type MountConfig

type MountConfig struct {
	// MountDir is the base directory for per-task mounts
	MountDir string

	// CLIBinary is the path to the CLI binary
	CLIBinary string

	// GatewayAddr is the gateway gRPC address
	GatewayAddr string

	// MountReadyTimeout is how long to wait for a mount to be ready
	MountReadyTimeout time.Duration
}

MountConfig configures the mount manager

func DefaultMountConfig

func DefaultMountConfig() MountConfig

DefaultMountConfig returns sensible defaults

type MountManager

type MountManager struct {
	// contains filtered or unexported fields
}

MountManager manages per-task FUSE mounts

func NewMountManager

func NewMountManager(config MountConfig) (*MountManager, error)

NewMountManager creates a new mount manager

func (*MountManager) ActiveMounts

func (m *MountManager) ActiveMounts() int

ActiveMounts returns the number of active mounts

func (*MountManager) CleanupAll

func (m *MountManager) CleanupAll()

CleanupAll unmounts all active mounts (call on shutdown)

func (*MountManager) CleanupStale

func (m *MountManager) CleanupStale(activeTasks map[string]bool) int

CleanupStale removes mounts for tasks that are no longer running

func (*MountManager) GetMountPath

func (m *MountManager) GetMountPath(taskID string) string

GetMountPath returns the mount path for a task, or empty if not mounted

func (*MountManager) Mount

func (m *MountManager) Mount(ctx context.Context, taskID, token string) (string, error)

Mount creates a FUSE mount for a task using the given token. Returns the mount path that can be bind-mounted into containers. Retries the initial mount up to maxInitialMountAttempts times to tolerate transient gateway unavailability (e.g. during rollouts).

func (*MountManager) Unmount

func (m *MountManager) Unmount(taskID string) error

Unmount stops the FUSE mount for a task and cleans up

type NeedsInputRunner added in v0.1.96

type NeedsInputRunner interface {
	AgentExecutionRunner
	SetupNeedsInput(mountSource string, env map[string]string) (string, error)
	ReadLastMessage(markerPath string) string
}

NeedsInputRunner extends AgentExecutionRunner with input-detection. A Stop hook dumps the agent's last message to a marker file. The worker reads it and calls BAML to classify whether the agent is blocked on user input or done.

type NetworkManager added in v0.1.23

type NetworkManager struct {
	// contains filtered or unexported fields
}

NetworkManager handles container networking with dual-stack NAT. IP allocation is coordinated via gRPC with the gateway.

func NewNetworkManager added in v0.1.23

func NewNetworkManager(ctx context.Context, workerID string, client *gatewayclient.GatewayClient) (*NetworkManager, error)

func (*NetworkManager) Setup added in v0.1.23

func (m *NetworkManager) Setup(sandboxID string, spec *specs.Spec) (string, error)

Setup creates network namespace and veth pair for a sandbox. Returns the allocated IP address.

func (*NetworkManager) TearDown added in v0.1.23

func (m *NetworkManager) TearDown(sandboxID string) error

TearDown removes all networking resources for a sandbox.

type OutputAnalyzer added in v0.1.96

type OutputAnalyzer interface {
	ShouldAnalyze(payload map[string]any) bool
	PrepareInput(payload map[string]any) (toolName, toolInput, toolResult string, ok bool)
}

OutputAnalyzer decides which log lines may contain extractable outputs and prepares them for the BAML classifier. Each runner provides its own implementation because log formats differ across providers.

type OutputParsingRunner added in v0.1.110

type OutputParsingRunner interface {
	TurnRunner
	ParseTurnOutput(output []byte) (TurnParseResult, error)
}

OutputParsingRunner extends TurnRunner for runners whose stdout carries structured turn-outcome signals directly in the process output. The worker calls ParseTurnOutput after each turn instead of relying solely on file-based markers or a single needs-input bit from the runner.

type OutputWriter added in v0.1.96

type OutputWriter struct {
	// contains filtered or unexported fields
}

OutputWriter intercepts structured output messages from agent stdout (type=output, output_append, output_done) and sends them to the gateway via gRPC.

func (*OutputWriter) Wait added in v0.1.103

func (w *OutputWriter) Wait()

func (*OutputWriter) Write added in v0.1.96

func (w *OutputWriter) Write(p []byte) (int, error)

type ResponseExtractor added in v0.1.110

type ResponseExtractor interface {
	ExtractResponseText(raw []byte, limit int) string
}

ResponseExtractor extends AgentExecutionRunner with response text extraction from raw PTY output. The worker calls this after the session completes to extract the assistant's final response for output persistence. Runners that don't implement this fall back to the default stream-json parser (Claude Code format).

type S2Writer

type S2Writer struct {
	// contains filtered or unexported fields
}

S2Writer writes task output to S2 streams.

func NewS2Writer

func NewS2Writer(ctx context.Context, client *common.S2Client, taskID, stream string) *S2Writer

NewS2Writer creates a writer that appends to S2.

func (*S2Writer) Write

func (w *S2Writer) Write(p []byte) (int, error)

type Sandbox added in v0.1.23

type Sandbox struct {
	Config  types.SandboxConfig
	State   types.SandboxState
	Bundle  string
	Cancel  context.CancelFunc
	Overlay *common.ContainerOverlay
	Rootfs  func() // cleanup function
	Output  io.Writer
	Flush   func()
	// contains filtered or unexported fields
}

Sandbox represents a running sandbox with its resources.

type SandboxManager

type SandboxManager struct {
	// contains filtered or unexported fields
}

SandboxManager manages the lifecycle of sandboxes on a worker.

func NewSandboxManager

func NewSandboxManager(ctx context.Context, cfg Config) (*SandboxManager, error)

func (*SandboxManager) AttachPTY added in v0.1.53

func (m *SandboxManager) AttachPTY(
	ctx context.Context,
	sandboxID string,
	stdin io.Reader,
	stdout io.Writer,
) error

AttachPTY starts a long-lived PTY-backed process in a running sandbox. Claude interactive tasks run their task entrypoint directly; other interactive tasks keep the existing shell PTY behavior.

func (*SandboxManager) BamlEnvForRunner added in v0.1.110

func (m *SandboxManager) BamlEnvForRunner(runner AgentExecutionRunner) map[string]string

func (*SandboxManager) Close

func (m *SandboxManager) Close() error

Close shuts down the sandbox manager and all sandboxes

func (*SandboxManager) Create

Create creates a new sandbox from the given config

func (*SandboxManager) Delete

func (m *SandboxManager) Delete(sandboxID string, force bool) error

Delete removes a sandbox and cleans up resources.

The teardown sequence avoids racing with the runtime supervisor:

  1. Stop container processes (SIGTERM or SIGKILL depending on force)
  2. Wait for the Run() goroutine to finish so runsc exits cleanly
  3. Only cancel the sandbox context as a fallback if Run() doesn't drain
  4. runtime.Delete to clean up any residual state

func (*SandboxManager) ExecCheck added in v0.1.102

func (m *SandboxManager) ExecCheck(ctx context.Context, sandboxID string, args []string) error

ExecCheck runs a short-lived command inside a sandbox and returns nil if the command exits 0. This is useful for probing container state (e.g. checking whether specific processes are still running via pgrep).

func (*SandboxManager) ExecPTY added in v0.1.66

func (m *SandboxManager) ExecPTY(
	ctx context.Context,
	sandboxID string,
	args []string,
	env map[string]string,
	stdout io.Writer,
) error

ExecPTY runs a command inside an existing sandbox with PTY-compatible env. Unlike AttachPTY, it takes explicit args and does not attach stdin — the prompt is passed via command-line args. This is used by the turn-based session loop where each turn is a separate Exec call.

The command is wrapped in a login shell so that the user's profile (and full PATH) is available — tools like `claude` are often installed outside the minimal system PATH.

func (*SandboxManager) Get

func (m *SandboxManager) Get(sandboxID string) (*types.SandboxState, error)

Get returns the state of a sandbox

func (*SandboxManager) List

func (m *SandboxManager) List() []types.SandboxState

List returns all managed sandboxes

func (*SandboxManager) ResolveRunner added in v0.1.66

func (m *SandboxManager) ResolveRunner(task types.RunExecution, env map[string]string) AgentExecutionRunner

ResolveRunner returns the AgentExecutionRunner for the given task.

func (*SandboxManager) RunTask

RunTask creates and runs a sandbox for a task, returning when complete

func (*SandboxManager) SetOutput

func (m *SandboxManager) SetOutput(sandboxID string, writer io.Writer, flusher func()) error

SetOutput configures the output writer and flusher for a sandbox. Must be called before Start.

func (*SandboxManager) Start

func (m *SandboxManager) Start(sandboxID string) error

Start starts a created sandbox

func (*SandboxManager) Stop

func (m *SandboxManager) Stop(sandboxID string, force bool) error

Stop stops a running sandbox by signalling container processes. Non-force sends SIGTERM; force sends SIGKILL. Neither cancels the sandbox context — that is handled by Delete after waiting for the Run() goroutine to drain.

type TaskStreamOutput added in v0.1.110

type TaskStreamOutput struct {
	// contains filtered or unexported fields
}

TaskStreamOutput handles task stdout/stderr with multiple destinations. Zero allocation for the common case of just writing.

func NewTaskStreamOutput added in v0.1.110

func NewTaskStreamOutput(taskID, stream string, writers ...io.Writer) *TaskStreamOutput

NewTaskStreamOutput creates an output handler for a task stream.

func (*TaskStreamOutput) Flush added in v0.1.110

func (o *TaskStreamOutput) Flush()

Flush writes any remaining buffered content.

func (*TaskStreamOutput) Write added in v0.1.110

func (o *TaskStreamOutput) Write(p []byte) (int, error)

Write implements io.Writer. Buffers partial lines, flushes complete ones.

type TurnArgMode added in v0.1.71

type TurnArgMode string
const (
	// TurnArgModeFirstStart is a normal first turn; it preserves explicit session ids.
	TurnArgModeFirstStart TurnArgMode = "first_start"
	// TurnArgModeFirstResumeLatest resumes from latest local/VFS state.
	TurnArgModeFirstResumeLatest TurnArgMode = "first_resume_latest"
	// TurnArgModeFirstResumeByID resumes using an explicit session id.
	TurnArgModeFirstResumeByID TurnArgMode = "first_resume_by_id"
	// TurnArgModeFollowup is used for non-first turns.
	TurnArgModeFollowup TurnArgMode = "followup"
)

type TurnArtifact added in v0.1.110

type TurnArtifact struct {
	OutputType string
	Title      string
	Summary    string
	Content    string
	URI        string
	Path       string
	Data       map[string]any
	Metadata   map[string]any
	Role       string
	Status     string
	Blocking   *types.TaskOutputBlockingMetadata
}

TurnArtifact is a first-class structured output emitted directly by a runner as part of its turn result. Artifacts are persisted independently of control flow so the worker can keep durable outputs and task-state transitions as separate concerns.

type TurnBlockerDirective added in v0.1.110

type TurnBlockerDirective struct {
	InputKind types.InputKind
	Summary   string
}

TurnBlockerDirective is an explicit instruction that the current turn should pause for user input. The worker projects it into blocker/task state after persisting any first-class outputs emitted by the turn.

type TurnControl added in v0.1.110

type TurnControl struct {
	Blocker    *TurnBlockerDirective
	WakeSignal *types.RunExecutionWakeSignal
}

TurnControl carries the turn's control-flow directive independently from any artifacts that were emitted. A turn can either block for input, schedule a wake, or complete normally.

func (*TurnControl) IsZero added in v0.1.110

func (c *TurnControl) IsZero() bool

type TurnParseResult added in v0.1.110

type TurnParseResult struct {
	Response  string
	Artifacts []TurnArtifact
	Control   *TurnControl
}

TurnParseResult is the normalized worker-side view of a runner's structured turn output. Control flow is explicit via Control; artifacts remain durable outputs rather than implicitly driving task state.

type TurnRunner added in v0.1.66

type TurnRunner interface {
	AgentExecutionRunner
	BuildTurnArgs(prompt string, env map[string]string, mode TurnArgMode) []string
}

TurnRunner extends AgentExecutionRunner with per-turn execution. Runners implementing this interface execute each turn as a separate process in the sandbox, with the Go worker managing the lifecycle between turns — no shell loop, no stdin pipe.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a airstore worker that: 1. Registers with the gateway 2. Pulls tasks from the queue 3. Runs tasks in gVisor sandboxes

func NewWorker

func NewWorker() (*Worker, error)

NewWorker creates a new Worker instance

func (*Worker) Run

func (w *Worker) Run() error

Run starts the worker and blocks until shutdown

Directories

Path Synopsis
agentsignal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL