worker

package
v0.1.105 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 16, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 45 Imported by: 0

Documentation

Constants

const (
	DefaultCachePath = "/var/lib/clip/cache"
	DefaultWorkPath  = "/var/lib/clip/work"
	DefaultMountPath = "/var/lib/clip/mnt"
)

Default paths for CLIP storage directories

const (
	DefaultBetweenTurnsTimeout = 60 * time.Second
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentExecutionRunner added in v0.1.60

type AgentExecutionRunner interface {
	Name() string
	BuildEntrypoint(task types.RunExecution, env map[string]string) []string
}

AgentExecutionRunner builds the process entrypoint for an agent task.
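
As an illustration, a minimal runner satisfying this interface might look like the sketch below. RunExecution here is a hypothetical local stand-in for types.RunExecution (whose fields are not shown on this page), and echoRunner is an invented example, not a runner shipped by this package.

```go
package main

import "fmt"

// RunExecution is a hypothetical stand-in for types.RunExecution.
// The real struct's fields are not documented here.
type RunExecution struct {
	Prompt string
}

// AgentExecutionRunner mirrors the documented interface, using the stand-in
// task type above.
type AgentExecutionRunner interface {
	Name() string
	BuildEntrypoint(task RunExecution, env map[string]string) []string
}

// echoRunner is an invented runner that echoes the task prompt.
type echoRunner struct{}

func (echoRunner) Name() string { return "echo" }

// BuildEntrypoint returns the argv the sandbox would execute for the task.
func (echoRunner) BuildEntrypoint(task RunExecution, env map[string]string) []string {
	return []string{"/bin/echo", task.Prompt}
}

func main() {
	var r AgentExecutionRunner = echoRunner{}
	fmt.Println(r.Name(), r.BuildEntrypoint(RunExecution{Prompt: "hello"}, nil))
}
```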

type AnalyzerWriter added in v0.1.96

type AnalyzerWriter struct {
	// contains filtered or unexported fields
}

AnalyzerWriter is an io.Writer that inspects agent stdout lines, runs qualifying tool completions through the BAML ExtractOutputs classifier, and creates TaskOutput records via the gateway gRPC.

It rate-limits to one in-flight BAML call per task and queues additional candidates so we don't overwhelm the classifier.
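
The "one in flight, queue the rest" behaviour can be sketched as follows. This is not the package's implementation: serialQueue and its methods are hypothetical names, and the jobs stand in for classifier calls.

```go
package main

import (
	"fmt"
	"sync"
)

// serialQueue (hypothetical) runs at most one job at a time and queues the
// rest in FIFO order.
type serialQueue struct {
	mu      sync.Mutex
	pending []func()
	running bool
	wg      sync.WaitGroup
}

// Submit enqueues a job and starts a drain goroutine if none is active.
func (q *serialQueue) Submit(job func()) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.pending = append(q.pending, job)
	q.wg.Add(1)
	if !q.running {
		q.running = true
		go q.drain()
	}
}

// drain runs queued jobs one by one until the queue is empty.
func (q *serialQueue) drain() {
	for {
		q.mu.Lock()
		if len(q.pending) == 0 {
			q.running = false
			q.mu.Unlock()
			return
		}
		job := q.pending[0]
		q.pending = q.pending[1:]
		q.mu.Unlock()

		job() // only one job is ever executing here
		q.wg.Done()
	}
}

// Wait blocks until every submitted job has finished.
func (q *serialQueue) Wait() { q.wg.Wait() }

func main() {
	q := &serialQueue{}
	for i := 0; i < 3; i++ {
		i := i
		q.Submit(func() { fmt.Println("classifying candidate", i) })
	}
	q.Wait()
}
```

Submit never blocks on the job itself, so a writer built this way can return from Write promptly while candidates are processed in the background.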

func NewAnalyzerWriter added in v0.1.96

func NewAnalyzerWriter(
	ctx context.Context,
	analyzer OutputAnalyzer,
	client *gatewayclient.GatewayClient,
	task types.RunExecution,
	bamlEnv map[string]string,
) *AnalyzerWriter

func (*AnalyzerWriter) Wait added in v0.1.103

func (w *AnalyzerWriter) Wait()

func (*AnalyzerWriter) Write added in v0.1.96

func (w *AnalyzerWriter) Write(p []byte) (int, error)

type CLIPImageManager

type CLIPImageManager struct {
	// contains filtered or unexported fields
}

CLIPImageManager implements ImageManager using CLIP for lazy-loading images from OCI registries. It manages FUSE mounts with reference counting for efficient image reuse.

func (*CLIPImageManager) Close

func (m *CLIPImageManager) Close() error

Close unmounts all FUSE filesystems and releases resources.

func (*CLIPImageManager) PrepareRootfs

func (m *CLIPImageManager) PrepareRootfs(ctx context.Context, imageRef string) (string, func(), error)

PrepareRootfs converts an OCI image to CLIP format and mounts it as a FUSE filesystem. If the image is already mounted, it increments the reference count and returns the existing mount.
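
A rough sketch of the reference-counting idea follows. All names (refMounts, mountFn, unmountFn) are hypothetical, and unmounting when the count reaches zero is an assumption; the real manager may keep mounts alive until Close.

```go
package main

import (
	"fmt"
	"sync"
)

type mountEntry struct {
	path string
	refs int
}

// refMounts (hypothetical) shares one mount per image reference.
type refMounts struct {
	mu        sync.Mutex
	entries   map[string]*mountEntry
	mountFn   func(imageRef string) (string, error) // stand-in for the real mount step
	unmountFn func(path string)                     // stand-in for the real unmount step
}

// Prepare mounts the image on first use and bumps the reference count on
// every call. The returned cleanup function drops exactly one reference.
func (m *refMounts) Prepare(imageRef string) (string, func(), error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	e, ok := m.entries[imageRef]
	if !ok {
		path, err := m.mountFn(imageRef)
		if err != nil {
			return "", nil, err
		}
		e = &mountEntry{path: path}
		m.entries[imageRef] = e
	}
	e.refs++
	var once sync.Once // makes the cleanup safe to call more than once
	return e.path, func() { once.Do(func() { m.release(imageRef) }) }, nil
}

// release drops one reference and unmounts when none remain (assumption).
func (m *refMounts) release(imageRef string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	e, ok := m.entries[imageRef]
	if !ok {
		return
	}
	e.refs--
	if e.refs == 0 {
		m.unmountFn(e.path)
		delete(m.entries, imageRef)
	}
}

func main() {
	m := &refMounts{
		entries:   map[string]*mountEntry{},
		mountFn:   func(ref string) (string, error) { fmt.Println("mount", ref); return "/mnt/" + ref, nil },
		unmountFn: func(path string) { fmt.Println("unmount", path) },
	}
	p1, release1, _ := m.Prepare("alpine")
	p2, release2, _ := m.Prepare("alpine")
	fmt.Println(p1 == p2)
	release1()
	release2()
}
```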

type ClaudeCodeAnalyzer added in v0.1.96

type ClaudeCodeAnalyzer struct {
	// contains filtered or unexported fields
}

ClaudeCodeAnalyzer implements OutputAnalyzer for the Claude Code stream-json format. It tracks tool_use blocks (from "assistant" messages) and matches them with tool_result blocks (from "user" messages) to decide which completions are worth classifying via the BAML ExtractOutputs function.
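
The matching step can be sketched roughly as below. The payload shape (message.content blocks carrying type, id, name, input, tool_use_id and content) is an assumption about the stream-json format, and matcher, Observe and parse are hypothetical names rather than this package's API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type toolCall struct{ Name, Input string }

// matcher (hypothetical) remembers tool_use blocks until their result arrives.
type matcher struct{ pending map[string]toolCall }

// contentBlocks extracts message.content as a slice of objects, if present.
func contentBlocks(payload map[string]any) []map[string]any {
	msg, _ := payload["message"].(map[string]any)
	items, _ := msg["content"].([]any)
	var out []map[string]any
	for _, it := range items {
		if b, ok := it.(map[string]any); ok {
			out = append(out, b)
		}
	}
	return out
}

// Observe records tool_use blocks and returns the completed call once a
// tool_result with a matching tool_use_id is seen.
func (m *matcher) Observe(payload map[string]any) (name, input, result string, ok bool) {
	for _, b := range contentBlocks(payload) {
		switch b["type"] {
		case "tool_use":
			id, _ := b["id"].(string)
			n, _ := b["name"].(string)
			raw, _ := json.Marshal(b["input"])
			m.pending[id] = toolCall{Name: n, Input: string(raw)}
		case "tool_result":
			id, _ := b["tool_use_id"].(string)
			call, found := m.pending[id]
			if !found {
				continue
			}
			delete(m.pending, id)
			res, _ := b["content"].(string)
			return call.Name, call.Input, res, true
		}
	}
	return "", "", "", false
}

// parse decodes one JSON log line; malformed lines yield a nil payload.
func parse(line string) map[string]any {
	var payload map[string]any
	_ = json.Unmarshal([]byte(line), &payload)
	return payload
}

func main() {
	m := &matcher{pending: map[string]toolCall{}}
	m.Observe(parse(`{"type":"assistant","message":{"content":[{"type":"tool_use","id":"t1","name":"Write","input":{"file_path":"a.txt"}}]}}`))
	fmt.Println(m.Observe(parse(`{"type":"user","message":{"content":[{"type":"tool_result","tool_use_id":"t1","content":"ok"}]}}`)))
}
```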

func NewClaudeCodeAnalyzer added in v0.1.96

func NewClaudeCodeAnalyzer() *ClaudeCodeAnalyzer

func (*ClaudeCodeAnalyzer) PrepareInput added in v0.1.96

func (a *ClaudeCodeAnalyzer) PrepareInput(payload map[string]any) (toolName, toolInput, toolResult string, ok bool)

func (*ClaudeCodeAnalyzer) ShouldAnalyze added in v0.1.96

func (a *ClaudeCodeAnalyzer) ShouldAnalyze(payload map[string]any) bool

type ClaudeCodeRunner added in v0.1.60

type ClaudeCodeRunner struct {
	// contains filtered or unexported fields
}

func NewClaudeCodeRunner added in v0.1.60

func NewClaudeCodeRunner(opts ClaudeCodeRunnerOptions) *ClaudeCodeRunner

func (*ClaudeCodeRunner) BuildEntrypoint added in v0.1.60

func (r *ClaudeCodeRunner) BuildEntrypoint(task types.RunExecution, env map[string]string) []string

func (*ClaudeCodeRunner) BuildTurnArgs added in v0.1.66

func (r *ClaudeCodeRunner) BuildTurnArgs(prompt string, env map[string]string, mode TurnArgMode) []string

BuildTurnArgs returns the argv for a single interactive turn. Each turn runs claude --print as a separate process in the sandbox; the Go worker manages the loop between turns.

func (*ClaudeCodeRunner) CheckHeartbeat added in v0.1.88

func (r *ClaudeCodeRunner) CheckHeartbeat(heartbeatPath string) bool

func (*ClaudeCodeRunner) Name added in v0.1.60

func (r *ClaudeCodeRunner) Name() string

func (*ClaudeCodeRunner) ReadLastMessage added in v0.1.96

func (r *ClaudeCodeRunner) ReadLastMessage(markerPath string) string

func (*ClaudeCodeRunner) SetupHeartbeat added in v0.1.88

func (r *ClaudeCodeRunner) SetupHeartbeat(mountSource string, env map[string]string) (string, error)

func (*ClaudeCodeRunner) SetupNeedsInput added in v0.1.96

func (r *ClaudeCodeRunner) SetupNeedsInput(mountSource string, env map[string]string) (string, error)

type ClaudeCodeRunnerOptions added in v0.1.60

type ClaudeCodeRunnerOptions struct {
	AnthropicAPIKey string
	KernelAPIKey    string
}

type Config added in v0.1.23

type Config struct {
	// Paths
	BundleDir   string
	StateDir    string
	MountDir    string
	WorkerMount string
	CLIBinary   string

	// Identity
	WorkerID string

	// Gateway
	GatewayAddr   string
	AuthToken     string
	GatewayClient *gatewayclient.GatewayClient

	// Features
	EnableFilesystem  bool
	EnableNetwork     bool
	UseHostResolvConf bool

	// Runtime
	RuntimeType   string
	RuntimeConfig runtime.Config
	ImageConfig   types.ImageConfig

	// Streaming
	S2Token string
	S2Basin string

	// API keys
	AnthropicAPIKey string
	KernelAPIKey    string
}

Config for creating a SandboxManager.

type ConsoleWriter

type ConsoleWriter struct {
	// contains filtered or unexported fields
}

ConsoleWriter writes task output to the worker's console with context.

func NewConsoleWriter

func NewConsoleWriter(taskID, stream string) *ConsoleWriter

NewConsoleWriter creates a writer that logs to the worker console.

func (*ConsoleWriter) Write

func (w *ConsoleWriter) Write(p []byte) (int, error)

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter writes to a file (useful for debugging).

func NewFileWriter

func NewFileWriter(path string) (*FileWriter, error)

NewFileWriter creates a writer that appends to a file.

func (*FileWriter) Close

func (w *FileWriter) Close() error

func (*FileWriter) Write

func (w *FileWriter) Write(p []byte) (int, error)

type HeartbeatRunner added in v0.1.88

type HeartbeatRunner interface {
	AgentExecutionRunner
	// SetupHeartbeat writes hook configuration to the VFS so the runner
	// touches a heartbeat file on each tool use / stop. mountSource is
	// the host-side VFS FUSE mount path. env is the task env used to
	// derive CLAUDE_CONFIG_DIR. Returns the host-side heartbeat path.
	SetupHeartbeat(mountSource string, env map[string]string) (string, error)
	// CheckHeartbeat returns true if the heartbeat file at the given
	// host-side path was recently modified.
	CheckHeartbeat(heartbeatPath string) bool
}

HeartbeatRunner extends AgentExecutionRunner with liveness tracking. Runners install hooks inside the sandbox that touch a heartbeat file on each lifecycle event (tool use, stop). The worker also touches the file on observed output as a belt-and-suspenders fallback.
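
A freshness check of this kind can be sketched with the file's modification time. The helper names and the one-minute window are assumptions for illustration; the threshold the package actually uses is not stated here.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// heartbeatFresh reports whether the file at path was modified within maxAge.
// A missing or unreadable file counts as not fresh.
func heartbeatFresh(path string, maxAge time.Duration) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false
	}
	return time.Since(info.ModTime()) <= maxAge
}

// touch creates the file if needed and sets its mtime to now.
func touch(path string) error {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	now := time.Now()
	return os.Chtimes(path, now, now)
}

// demo probes a heartbeat file before creation, right after a touch, and
// after its mtime has been pushed an hour into the past.
func demo() (fresh, stale, missing bool, err error) {
	dir, err := os.MkdirTemp("", "heartbeat")
	if err != nil {
		return false, false, false, err
	}
	defer os.RemoveAll(dir)
	hb := filepath.Join(dir, "heartbeat")

	missing = heartbeatFresh(hb, time.Minute)
	if err := touch(hb); err != nil {
		return false, false, false, err
	}
	fresh = heartbeatFresh(hb, time.Minute)

	old := time.Now().Add(-time.Hour)
	if err := os.Chtimes(hb, old, old); err != nil {
		return false, false, false, err
	}
	stale = heartbeatFresh(hb, time.Minute)
	return fresh, stale, missing, nil
}

func main() {
	fmt.Println(demo())
}
```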

type ImageManager

type ImageManager interface {
	// PrepareRootfs prepares a rootfs from a container image.
	// Returns the path to the rootfs and a cleanup function.
	PrepareRootfs(ctx context.Context, imageRef string) (rootfsPath string, cleanup func(), err error)

	// Close cleans up all resources.
	Close() error
}

ImageManager provides container image management functionality.

func NewImageManager

func NewImageManager(config types.ImageConfig) (ImageManager, error)

NewImageManager creates a new CLIP-based image manager.

type MountConfig

type MountConfig struct {
	// MountDir is the base directory for per-task mounts
	MountDir string

	// CLIBinary is the path to the CLI binary
	CLIBinary string

	// GatewayAddr is the gateway gRPC address
	GatewayAddr string

	// MountReadyTimeout is how long to wait for a mount to be ready
	MountReadyTimeout time.Duration
}

MountConfig configures the mount manager

func DefaultMountConfig

func DefaultMountConfig() MountConfig

DefaultMountConfig returns sensible defaults

type MountManager

type MountManager struct {
	// contains filtered or unexported fields
}

MountManager manages per-task FUSE mounts

func NewMountManager

func NewMountManager(config MountConfig) (*MountManager, error)

NewMountManager creates a new mount manager

func (*MountManager) ActiveMounts

func (m *MountManager) ActiveMounts() int

ActiveMounts returns the number of active mounts

func (*MountManager) CleanupAll

func (m *MountManager) CleanupAll()

CleanupAll unmounts all active mounts (call on shutdown)

func (*MountManager) CleanupStale

func (m *MountManager) CleanupStale(activeTasks map[string]bool) int

CleanupStale removes mounts for tasks that are no longer running
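
The sweep can be pictured as in the sketch below, assuming the returned int is the number of mounts removed. cleanupStale, the mounts map and the unmount callback are hypothetical; keeping an entry when its unmount fails is also an assumption.

```go
package main

import "fmt"

// cleanupStale (hypothetical) unmounts every task mount whose task ID is not
// marked active and returns how many mounts were removed.
func cleanupStale(mounts map[string]string, active map[string]bool, unmount func(taskID string) error) int {
	removed := 0
	for taskID := range mounts {
		if active[taskID] {
			continue
		}
		if err := unmount(taskID); err != nil {
			continue // keep the entry so a later sweep can retry
		}
		delete(mounts, taskID) // deleting during range is safe in Go
		removed++
	}
	return removed
}

func main() {
	mounts := map[string]string{"a": "/mnt/a", "b": "/mnt/b", "c": "/mnt/c"}
	active := map[string]bool{"b": true}
	n := cleanupStale(mounts, active, func(string) error { return nil })
	fmt.Println(n, len(mounts))
}
```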

func (*MountManager) GetMountPath

func (m *MountManager) GetMountPath(taskID string) string

GetMountPath returns the mount path for a task, or empty if not mounted

func (*MountManager) Mount

func (m *MountManager) Mount(ctx context.Context, taskID, token string) (string, error)

Mount creates a FUSE mount for a task using the given token. Returns the mount path that can be bind-mounted into containers.

func (*MountManager) Unmount

func (m *MountManager) Unmount(taskID string) error

Unmount stops the FUSE mount for a task and cleans up

type NeedsInputRunner added in v0.1.96

type NeedsInputRunner interface {
	AgentExecutionRunner
	SetupNeedsInput(mountSource string, env map[string]string) (string, error)
	ReadLastMessage(markerPath string) string
}

NeedsInputRunner extends AgentExecutionRunner with input-detection. A Stop hook dumps the agent's last message to a marker file. The worker reads it and calls BAML to classify whether the agent is blocked on user input or done.

type NetworkManager added in v0.1.23

type NetworkManager struct {
	// contains filtered or unexported fields
}

NetworkManager handles container networking with dual-stack NAT. IP allocation is coordinated via gRPC with the gateway.

func NewNetworkManager added in v0.1.23

func NewNetworkManager(ctx context.Context, workerID string, client *gatewayclient.GatewayClient) (*NetworkManager, error)

func (*NetworkManager) Setup added in v0.1.23

func (m *NetworkManager) Setup(sandboxID string, spec *specs.Spec) (string, error)

Setup creates network namespace and veth pair for a sandbox. Returns the allocated IP address.

func (*NetworkManager) TearDown added in v0.1.23

func (m *NetworkManager) TearDown(sandboxID string) error

TearDown removes all networking resources for a sandbox.

type OutputAnalyzer added in v0.1.96

type OutputAnalyzer interface {
	// ShouldAnalyze returns true if the parsed JSON log line might
	// contain an extractable output (file write, API call, etc.).
	// This must be fast — no LLM calls, just field inspection.
	ShouldAnalyze(payload map[string]any) bool

	// PrepareInput extracts classifier inputs from a qualifying payload.
	// Returns false if the payload can't be prepared after all.
	PrepareInput(payload map[string]any) (toolName, toolInput, toolResult string, ok bool)
}

OutputAnalyzer decides which log lines may contain extractable outputs and prepares them for the BAML classifier. Each runner provides its own implementation since log formats differ across providers.

type OutputConfig

type OutputConfig struct {
	TaskID   string
	S2Client *common.S2Client
	Console  bool // Write to worker stdout
}

OutputConfig configures task output destinations.

type OutputWriter added in v0.1.96

type OutputWriter struct {
	// contains filtered or unexported fields
}

OutputWriter intercepts structured output messages from agent stdout (type=output, output_append, output_done) and sends them to the gateway via gRPC.
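
Recognizing these messages in a line of stdout might look like the following sketch. Only the three type values come from the description above; outputMessageType is a hypothetical helper, and the rest of the message schema is not assumed.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// outputMessageType returns the message type if line is a JSON object whose
// "type" field is one of the structured output kinds, and false otherwise.
func outputMessageType(line []byte) (string, bool) {
	var msg struct {
		Type string `json:"type"`
	}
	if err := json.Unmarshal(line, &msg); err != nil {
		return "", false // not JSON: ordinary log output
	}
	switch msg.Type {
	case "output", "output_append", "output_done":
		return msg.Type, true
	}
	return "", false
}

func main() {
	fmt.Println(outputMessageType([]byte(`{"type":"output_append","data":"x"}`)))
	fmt.Println(outputMessageType([]byte(`plain text line`)))
}
```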

func NewOutputWriter added in v0.1.96

func NewOutputWriter(ctx context.Context, client *gatewayclient.GatewayClient, task types.RunExecution) *OutputWriter

func (*OutputWriter) Wait added in v0.1.103

func (w *OutputWriter) Wait()

func (*OutputWriter) Write added in v0.1.96

func (w *OutputWriter) Write(p []byte) (int, error)

type S2Writer

type S2Writer struct {
	// contains filtered or unexported fields
}

S2Writer writes task output to S2 streams.

func NewS2Writer

func NewS2Writer(ctx context.Context, client *common.S2Client, taskID, stream string) *S2Writer

NewS2Writer creates a writer that appends to S2.

func (*S2Writer) Write

func (w *S2Writer) Write(p []byte) (int, error)

type Sandbox added in v0.1.23

type Sandbox struct {
	Config  types.SandboxConfig
	State   types.SandboxState
	Bundle  string
	Cancel  context.CancelFunc
	Overlay *common.ContainerOverlay
	Rootfs  func() // cleanup function
	Output  io.Writer
	Flush   func()
	// contains filtered or unexported fields
}

Sandbox represents a running sandbox with its resources.

type SandboxManager

type SandboxManager struct {
	// contains filtered or unexported fields
}

SandboxManager manages the lifecycle of sandboxes on a worker.

func NewSandboxManager

func NewSandboxManager(ctx context.Context, cfg Config) (*SandboxManager, error)

func (*SandboxManager) AttachPTY added in v0.1.53

func (m *SandboxManager) AttachPTY(
	ctx context.Context,
	sandboxID string,
	stdin io.Reader,
	stdout io.Writer,
) error

AttachPTY starts a long-lived PTY-backed process in a running sandbox. Claude interactive tasks run their task entrypoint directly; other interactive tasks keep the existing shell PTY behavior.

func (*SandboxManager) BamlEnv added in v0.1.96

func (m *SandboxManager) BamlEnv() map[string]string

BamlEnv returns the environment variables needed for BAML classifier calls (e.g. ExtractOutputs, ClassifyTurn). The worker's own config is the source of truth — no extraction from task env needed.

func (*SandboxManager) Close

func (m *SandboxManager) Close() error

Close shuts down the sandbox manager and all sandboxes

func (*SandboxManager) Create

Create creates a new sandbox from the given config

func (*SandboxManager) Delete

func (m *SandboxManager) Delete(sandboxID string, force bool) error

Delete removes a sandbox and cleans up resources.

The teardown sequence avoids racing with the runtime supervisor:

  1. Stop container processes (SIGTERM or SIGKILL depending on force)
  2. Wait for the Run() goroutine to finish so runsc exits cleanly
  3. Only cancel the sandbox context as a fallback if Run() doesn't drain
  4. runtime.Delete to clean up any residual state
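
The ordering above can be sketched as follows. This is an illustration only: teardown, simulate, the callbacks and the 50 ms grace period are hypothetical, and the real method works against the container runtime rather than channels.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// teardown follows the four steps: stop, wait for the run goroutine, cancel
// only as a fallback, then delete residual state. It reports whether the
// fallback was needed. It assumes done is closed once the context is cancelled.
func teardown(stop func(), done <-chan struct{}, cancel context.CancelFunc, deleteState func(), grace time.Duration) (usedFallback bool) {
	stop() // step 1: signal the container processes
	select {
	case <-done: // step 2: the run goroutine drained on its own
	case <-time.After(grace):
		cancel() // step 3: fallback when the run goroutine does not drain
		<-done
		usedFallback = true
	}
	deleteState() // step 4: clean up whatever is left
	return usedFallback
}

// simulate runs teardown against a fake run goroutine that either honours
// the stop signal or only exits when its context is cancelled.
func simulate(respondsToStop bool) (usedFallback, deleted bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stopCh := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		if respondsToStop {
			select {
			case <-stopCh:
			case <-ctx.Done():
			}
			return
		}
		<-ctx.Done()
	}()
	usedFallback = teardown(
		func() { close(stopCh) },
		done, cancel,
		func() { deleted = true },
		50*time.Millisecond,
	)
	return usedFallback, deleted
}

func main() {
	fmt.Println(simulate(true))
	fmt.Println(simulate(false))
}
```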

func (*SandboxManager) ExecCheck added in v0.1.102

func (m *SandboxManager) ExecCheck(ctx context.Context, sandboxID string, args []string) error

ExecCheck runs a short-lived command inside a sandbox and returns nil if the command exits 0. This is useful for probing container state (e.g. checking whether specific processes are still running via pgrep).

func (*SandboxManager) ExecPTY added in v0.1.66

func (m *SandboxManager) ExecPTY(
	ctx context.Context,
	sandboxID string,
	args []string,
	env map[string]string,
	stdout io.Writer,
) error

ExecPTY runs a command inside an existing sandbox with PTY-compatible env. Unlike AttachPTY, it takes explicit args and does not attach stdin — the prompt is passed via command-line args. This is used by the turn-based session loop where each turn is a separate Exec call.

The command is wrapped in a login shell so that the user's profile (and full PATH) is available — tools like `claude` are often installed outside the minimal system PATH.
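
One way to build such a wrapper is sketched below. The choice of /bin/bash with -lc and the quoting scheme are assumptions made for illustration; the shell and flags this package actually uses are not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// shellQuote wraps s in single quotes, escaping embedded single quotes so
// the shell sees the argument verbatim.
func shellQuote(s string) string {
	return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'"
}

// wrapInLoginShell (hypothetical) turns an argv into a login-shell
// invocation so the user's profile and PATH are loaded first.
func wrapInLoginShell(args []string) []string {
	quoted := make([]string, len(args))
	for i, a := range args {
		quoted[i] = shellQuote(a)
	}
	return []string{"/bin/bash", "-lc", strings.Join(quoted, " ")}
}

func main() {
	fmt.Println(wrapInLoginShell([]string{"claude", "--print", "what's next?"}))
}
```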

func (*SandboxManager) Get

func (m *SandboxManager) Get(sandboxID string) (*types.SandboxState, error)

Get returns the state of a sandbox

func (*SandboxManager) List

func (m *SandboxManager) List() []types.SandboxState

List returns all managed sandboxes

func (*SandboxManager) ResolveRunner added in v0.1.66

func (m *SandboxManager) ResolveRunner(task types.RunExecution, env map[string]string) AgentExecutionRunner

ResolveRunner returns the AgentExecutionRunner for the given task.

func (*SandboxManager) RunTask

RunTask creates and runs a sandbox for a task, returning when complete

func (*SandboxManager) SetOutput

func (m *SandboxManager) SetOutput(sandboxID string, writer io.Writer, flusher func()) error

SetOutput configures the output writer and flusher for a sandbox. Must be called before Start.

func (*SandboxManager) Start

func (m *SandboxManager) Start(sandboxID string) error

Start starts a created sandbox

func (*SandboxManager) Stop

func (m *SandboxManager) Stop(sandboxID string, force bool) error

Stop stops a running sandbox by signalling container processes. Non-force sends SIGTERM; force sends SIGKILL. Neither cancels the sandbox context — that is handled by Delete after waiting for the Run() goroutine to drain.

type TaskOutput

type TaskOutput struct {
	// contains filtered or unexported fields
}

TaskOutput handles task stdout/stderr with multiple destinations. Zero allocation for the common case of just writing.

func NewOutputPair

func NewOutputPair(ctx context.Context, cfg OutputConfig) (stdout, stderr *TaskOutput)

NewOutputPair creates stdout and stderr writers for a task.

func NewTaskOutput

func NewTaskOutput(taskID, stream string, writers ...io.Writer) *TaskOutput

NewTaskOutput creates an output handler for a task stream.

func (*TaskOutput) Flush

func (o *TaskOutput) Flush()

Flush writes any remaining buffered content.

func (*TaskOutput) Write

func (o *TaskOutput) Write(p []byte) (int, error)

Write implements io.Writer. Buffers partial lines, flushes complete ones.
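
The buffering behaviour can be illustrated with the sketch below. lineBuffer and its emit callback are hypothetical, and the sketch makes no attempt at the zero-allocation property mentioned for TaskOutput.

```go
package main

import (
	"bytes"
	"fmt"
)

// lineBuffer (hypothetical) holds back partial lines and emits each line
// once its trailing newline arrives.
type lineBuffer struct {
	buf  []byte
	emit func(line string)
}

// Write appends p to the buffer and emits every complete line found.
func (l *lineBuffer) Write(p []byte) (int, error) {
	l.buf = append(l.buf, p...)
	for {
		i := bytes.IndexByte(l.buf, '\n')
		if i < 0 {
			break
		}
		l.emit(string(l.buf[:i]))
		l.buf = l.buf[i+1:]
	}
	return len(p), nil
}

// Flush emits any trailing partial line.
func (l *lineBuffer) Flush() {
	if len(l.buf) > 0 {
		l.emit(string(l.buf))
		l.buf = nil
	}
}

func main() {
	lb := &lineBuffer{emit: func(line string) { fmt.Printf("line: %q\n", line) }}
	lb.Write([]byte("hel"))
	lb.Write([]byte("lo\nwor"))
	lb.Write([]byte("ld"))
	lb.Flush()
}
```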

type TurnArgMode added in v0.1.71

type TurnArgMode string

const (
	// TurnArgModeFirstStart is a normal first turn; it preserves explicit session ids.
	TurnArgModeFirstStart TurnArgMode = "first_start"
	// TurnArgModeFirstResumeLatest resumes from latest local/VFS state.
	TurnArgModeFirstResumeLatest TurnArgMode = "first_resume_latest"
	// TurnArgModeFirstResumeByID resumes using an explicit session id.
	TurnArgModeFirstResumeByID TurnArgMode = "first_resume_by_id"
	// TurnArgModeFollowup is used for non-first turns.
	TurnArgModeFollowup TurnArgMode = "followup"
)
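
How a caller might pick a mode for a given turn is sketched below. The session struct, modeFor, and the precedence of an explicit session id over "resume latest" are assumptions for illustration; only the constant names and values come from the declarations above.

```go
package main

import "fmt"

// TurnArgMode and its values are copied from the documented declarations.
type TurnArgMode string

const (
	TurnArgModeFirstStart        TurnArgMode = "first_start"
	TurnArgModeFirstResumeLatest TurnArgMode = "first_resume_latest"
	TurnArgModeFirstResumeByID   TurnArgMode = "first_resume_by_id"
	TurnArgModeFollowup          TurnArgMode = "followup"
)

// session is hypothetical bookkeeping for a turn-based run.
type session struct {
	Turn            int    // zero-based turn counter
	ResumeSessionID string // explicit session id to resume, if any
	ResumeLatest    bool   // resume from the latest local/VFS state
}

// modeFor picks the mode for the next turn. Preferring an explicit id over
// "latest" is an assumption of this sketch.
func modeFor(s session) TurnArgMode {
	switch {
	case s.Turn > 0:
		return TurnArgModeFollowup
	case s.ResumeSessionID != "":
		return TurnArgModeFirstResumeByID
	case s.ResumeLatest:
		return TurnArgModeFirstResumeLatest
	default:
		return TurnArgModeFirstStart
	}
}

func main() {
	fmt.Println(modeFor(session{}))
	fmt.Println(modeFor(session{ResumeLatest: true}))
	fmt.Println(modeFor(session{Turn: 2, ResumeSessionID: "abc"}))
}
```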

type TurnRunner added in v0.1.66

type TurnRunner interface {
	AgentExecutionRunner
	BuildTurnArgs(prompt string, env map[string]string, mode TurnArgMode) []string
}

TurnRunner extends AgentExecutionRunner with per-turn execution. Runners implementing this interface execute each turn as a separate process in the sandbox, with the Go worker managing the lifecycle between turns — no shell loop, no stdin pipe.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents an airstore worker that:

  1. Registers with the gateway
  2. Pulls tasks from the queue
  3. Runs tasks in gVisor sandboxes

func NewWorker

func NewWorker() (*Worker, error)

NewWorker creates a new Worker instance

func (*Worker) PoolName

func (w *Worker) PoolName() string

PoolName returns the worker's pool name

func (*Worker) Run

func (w *Worker) Run() error

Run starts the worker and blocks until shutdown

func (*Worker) WorkerId

func (w *Worker) WorkerId() string

WorkerId returns the worker's ID

Directories

Path Synopsis
agentsignal
