worker

package
v0.1.74 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultCachePath = "/var/lib/clip/cache"
	DefaultWorkPath  = "/var/lib/clip/work"
	DefaultMountPath = "/var/lib/clip/mnt"
)

Default paths for CLIP storage directories

View Source
const DefaultBetweenTurnsTimeout = 60 * time.Second

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentExecutionRunner added in v0.1.60

type AgentExecutionRunner interface {
	Name() string
	BuildEntrypoint(task types.RunExecution, env map[string]string) []string
}

AgentExecutionRunner builds the process entrypoint for an agent task.

type CLIPImageManager

type CLIPImageManager struct {
	// contains filtered or unexported fields
}

CLIPImageManager implements ImageManager using CLIP for lazy-loading images from OCI registries. It manages FUSE mounts with reference counting for efficient image reuse.

func (*CLIPImageManager) Close

func (m *CLIPImageManager) Close() error

Close unmounts all FUSE filesystems and releases resources.

func (*CLIPImageManager) PrepareRootfs

func (m *CLIPImageManager) PrepareRootfs(ctx context.Context, imageRef string) (string, func(), error)

PrepareRootfs converts an OCI image to CLIP format and mounts it as a FUSE filesystem. If the image is already mounted, it increments the reference count and returns the existing mount.

type ClaudeCodeRunner added in v0.1.60

type ClaudeCodeRunner struct {
	// contains filtered or unexported fields
}

func NewClaudeCodeRunner added in v0.1.60

func NewClaudeCodeRunner(opts ClaudeCodeRunnerOptions) *ClaudeCodeRunner

func (*ClaudeCodeRunner) BuildEntrypoint added in v0.1.60

func (r *ClaudeCodeRunner) BuildEntrypoint(task types.RunExecution, env map[string]string) []string

func (*ClaudeCodeRunner) BuildTurnArgs added in v0.1.66

func (r *ClaudeCodeRunner) BuildTurnArgs(prompt string, env map[string]string, mode TurnArgMode) []string

BuildTurnArgs returns the argv for a single interactive turn. Each turn runs claude --print as a separate process in the sandbox; the Go worker manages the loop between turns.

func (*ClaudeCodeRunner) Name added in v0.1.60

func (r *ClaudeCodeRunner) Name() string

type ClaudeCodeRunnerOptions added in v0.1.60

type ClaudeCodeRunnerOptions struct {
	AnthropicAPIKey string
	KernelAPIKey    string
}

type Config added in v0.1.23

type Config struct {
	// Paths
	BundleDir   string
	StateDir    string
	MountDir    string
	WorkerMount string
	CLIBinary   string

	// Identity
	WorkerID string

	// Gateway
	GatewayAddr   string
	AuthToken     string
	GatewayClient *gatewayclient.GatewayClient

	// Features
	EnableFilesystem  bool
	EnableNetwork     bool
	UseHostResolvConf bool

	// Runtime
	RuntimeType   string
	RuntimeConfig runtime.Config
	ImageConfig   types.ImageConfig

	// Streaming
	S2Token string
	S2Basin string

	// API keys
	AnthropicAPIKey string
	KernelAPIKey    string
}

Config for creating a SandboxManager.

type ConsoleWriter

type ConsoleWriter struct {
	// contains filtered or unexported fields
}

ConsoleWriter writes task output to the worker's console with context.

func NewConsoleWriter

func NewConsoleWriter(taskID, stream string) *ConsoleWriter

NewConsoleWriter creates a writer that logs to the worker console.

func (*ConsoleWriter) Write

func (w *ConsoleWriter) Write(p []byte) (int, error)

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter writes to a file (useful for debugging).

func NewFileWriter

func NewFileWriter(path string) (*FileWriter, error)

NewFileWriter creates a writer that appends to a file.

func (*FileWriter) Close

func (w *FileWriter) Close() error

func (*FileWriter) Write

func (w *FileWriter) Write(p []byte) (int, error)

type ImageManager

type ImageManager interface {
	// PrepareRootfs prepares a rootfs from a container image.
	// Returns the path to the rootfs and a cleanup function.
	PrepareRootfs(ctx context.Context, imageRef string) (rootfsPath string, cleanup func(), err error)

	// Close cleans up all resources.
	Close() error
}

ImageManager provides container image management functionality.

func NewImageManager

func NewImageManager(config types.ImageConfig) (ImageManager, error)

NewImageManager creates a new CLIP-based image manager.

type MountConfig

type MountConfig struct {
	// MountDir is the base directory for per-task mounts
	MountDir string

	// CLIBinary is the path to the CLI binary
	CLIBinary string

	// GatewayAddr is the gateway gRPC address
	GatewayAddr string

	// MountReadyTimeout is how long to wait for a mount to be ready
	MountReadyTimeout time.Duration
}

MountConfig configures the mount manager

func DefaultMountConfig

func DefaultMountConfig() MountConfig

DefaultMountConfig returns sensible defaults

type MountManager

type MountManager struct {
	// contains filtered or unexported fields
}

MountManager manages per-task FUSE mounts

func NewMountManager

func NewMountManager(config MountConfig) (*MountManager, error)

NewMountManager creates a new mount manager

func (*MountManager) ActiveMounts

func (m *MountManager) ActiveMounts() int

ActiveMounts returns the number of active mounts

func (*MountManager) CleanupAll

func (m *MountManager) CleanupAll()

CleanupAll unmounts all active mounts (call on shutdown)

func (*MountManager) CleanupStale

func (m *MountManager) CleanupStale(activeTasks map[string]bool) int

CleanupStale removes mounts for tasks that are no longer running

func (*MountManager) GetMountPath

func (m *MountManager) GetMountPath(taskID string) string

GetMountPath returns the mount path for a task, or empty if not mounted

func (*MountManager) Mount

func (m *MountManager) Mount(ctx context.Context, taskID, token string) (string, error)

Mount creates a FUSE mount for a task using the given token Returns the mount path that can be bind-mounted into containers

func (*MountManager) Unmount

func (m *MountManager) Unmount(taskID string) error

Unmount stops the FUSE mount for a task and cleans up

type NetworkManager added in v0.1.23

type NetworkManager struct {
	// contains filtered or unexported fields
}

NetworkManager handles container networking with dual-stack NAT. IP allocation is coordinated via gRPC with the gateway.

func NewNetworkManager added in v0.1.23

func NewNetworkManager(ctx context.Context, workerID string, client *gatewayclient.GatewayClient) (*NetworkManager, error)

func (*NetworkManager) Setup added in v0.1.23

func (m *NetworkManager) Setup(sandboxID string, spec *specs.Spec) (string, error)

Setup creates network namespace and veth pair for a sandbox. Returns the allocated IP address.

func (*NetworkManager) TearDown added in v0.1.23

func (m *NetworkManager) TearDown(sandboxID string) error

TearDown removes all networking resources for a sandbox.

type OutputConfig

type OutputConfig struct {
	TaskID   string
	S2Client *common.S2Client
	Console  bool // Write to worker stdout
}

OutputConfig configures task output destinations.

type S2Writer

type S2Writer struct {
	// contains filtered or unexported fields
}

S2Writer writes task output to S2 streams.

func NewS2Writer

func NewS2Writer(ctx context.Context, client *common.S2Client, taskID, stream string) *S2Writer

NewS2Writer creates a writer that appends to S2.

func (*S2Writer) Write

func (w *S2Writer) Write(p []byte) (int, error)

type Sandbox added in v0.1.23

type Sandbox struct {
	Config  types.SandboxConfig
	State   types.SandboxState
	Bundle  string
	Cancel  context.CancelFunc
	Overlay *common.ContainerOverlay
	Rootfs  func() // cleanup function
	Output  io.Writer
	Flush   func()
}

Sandbox represents a running sandbox with its resources.

type SandboxManager

type SandboxManager struct {
	// contains filtered or unexported fields
}

SandboxManager manages the lifecycle of sandboxes on a worker.

func NewSandboxManager

func NewSandboxManager(ctx context.Context, cfg Config) (*SandboxManager, error)

func (*SandboxManager) AttachPTY added in v0.1.53

func (m *SandboxManager) AttachPTY(
	ctx context.Context,
	sandboxID string,
	stdin io.Reader,
	stdout io.Writer,
) error

AttachPTY starts a long-lived PTY-backed process in a running sandbox. Claude interactive tasks run their task entrypoint directly; other interactive tasks keep the existing shell PTY behavior.

func (*SandboxManager) Close

func (m *SandboxManager) Close() error

Close shuts down the sandbox manager and all sandboxes

func (*SandboxManager) Create

Create creates a new sandbox from the given config

func (*SandboxManager) Delete

func (m *SandboxManager) Delete(sandboxID string, force bool) error

Delete removes a sandbox and cleans up resources

func (*SandboxManager) ExecPTY added in v0.1.66

func (m *SandboxManager) ExecPTY(
	ctx context.Context,
	sandboxID string,
	args []string,
	env map[string]string,
	stdout io.Writer,
) error

ExecPTY runs a command inside an existing sandbox with PTY-compatible env. Unlike AttachPTY, it takes explicit args and does not attach stdin — the prompt is passed via command-line args. This is used by the turn-based session loop where each turn is a separate Exec call.

The command is wrapped in a login shell so that the user's profile (and full PATH) is available — tools like `claude` are often installed outside the minimal system PATH.

func (*SandboxManager) Get

func (m *SandboxManager) Get(sandboxID string) (*types.SandboxState, error)

Get returns the state of a sandbox

func (*SandboxManager) List

func (m *SandboxManager) List() []types.SandboxState

List returns all managed sandboxes

func (*SandboxManager) ResolveRunner added in v0.1.66

func (m *SandboxManager) ResolveRunner(task types.RunExecution, env map[string]string) AgentExecutionRunner

ResolveRunner returns the AgentExecutionRunner for the given task.

func (*SandboxManager) RunTask

RunTask creates and runs a sandbox for a task, returning when complete

func (*SandboxManager) SetOutput

func (m *SandboxManager) SetOutput(sandboxID string, writer io.Writer, flusher func()) error

SetOutput configures the output writer and flusher for a sandbox. Must be called before Start.

func (*SandboxManager) Start

func (m *SandboxManager) Start(sandboxID string) error

Start starts a created sandbox

func (*SandboxManager) Stop

func (m *SandboxManager) Stop(sandboxID string, force bool) error

Stop stops a running sandbox by sending a signal to the container processes. The sandbox context is NOT cancelled here so that runsc can exit naturally and report the real exit code. The context is cancelled later in Delete() as a final cleanup step.

type TaskOutput

type TaskOutput struct {
	// contains filtered or unexported fields
}

TaskOutput handles task stdout/stderr with multiple destinations. Zero allocation for the common case of just writing.

func NewOutputPair

func NewOutputPair(ctx context.Context, cfg OutputConfig) (stdout, stderr *TaskOutput)

NewOutputPair creates stdout and stderr writers for a task.

func NewTaskOutput

func NewTaskOutput(taskID, stream string, writers ...io.Writer) *TaskOutput

NewTaskOutput creates an output handler for a task stream.

func (*TaskOutput) Flush

func (o *TaskOutput) Flush()

Flush writes any remaining buffered content.

func (*TaskOutput) Write

func (o *TaskOutput) Write(p []byte) (int, error)

Write implements io.Writer. Buffers partial lines, flushes complete ones.

type TurnArgMode added in v0.1.71

type TurnArgMode string
const (
	// TurnArgModeFirstStart is a normal first turn; it preserves explicit session ids.
	TurnArgModeFirstStart TurnArgMode = "first_start"
	// TurnArgModeFirstResumeLatest resumes from latest local/VFS state.
	TurnArgModeFirstResumeLatest TurnArgMode = "first_resume_latest"
	// TurnArgModeFirstResumeByID resumes using an explicit session id.
	TurnArgModeFirstResumeByID TurnArgMode = "first_resume_by_id"
	// TurnArgModeFirstFreshNoSession starts a new session without an explicit session id.
	TurnArgModeFirstFreshNoSession TurnArgMode = "first_fresh_no_session"
	// TurnArgModeFollowup is used for non-first turns.
	TurnArgModeFollowup TurnArgMode = "followup"
)

type TurnRunner added in v0.1.66

type TurnRunner interface {
	AgentExecutionRunner
	BuildTurnArgs(prompt string, env map[string]string, mode TurnArgMode) []string
}

TurnRunner extends AgentExecutionRunner with per-turn execution. Runners implementing this interface execute each turn as a separate process in the sandbox, with the Go worker managing the lifecycle between turns — no shell loop, no stdin pipe.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a airstore worker that: 1. Registers with the gateway 2. Pulls tasks from the queue 3. Runs tasks in gVisor sandboxes

func NewWorker

func NewWorker() (*Worker, error)

NewWorker creates a new Worker instance

func (*Worker) PoolName

func (w *Worker) PoolName() string

PoolName returns the worker's pool name

func (*Worker) Run

func (w *Worker) Run() error

Run starts the worker and blocks until shutdown

func (*Worker) WorkerId

func (w *Worker) WorkerId() string

WorkerId returns the worker's ID

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL