worker

package
v0.1.79

This package is not in the latest version of its module.

Published: Mar 2, 2026 | License: AGPL-3.0, AGPL-3.0-or-later | Imports: 40 | Imported by: 0

Documentation

Constants

const (
	DefaultCachePath = "/var/lib/clip/cache"
	DefaultWorkPath  = "/var/lib/clip/work"
	DefaultMountPath = "/var/lib/clip/mnt"
)

Default paths for CLIP storage directories

const DefaultBetweenTurnsTimeout = 60 * time.Second

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentExecutionRunner added in v0.1.60

type AgentExecutionRunner interface {
	Name() string
	BuildEntrypoint(task types.RunExecution, env map[string]string) []string
}

AgentExecutionRunner builds the process entrypoint for an agent task.
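
A minimal sketch of satisfying this interface follows. Because the fields of types.RunExecution are not shown on this page, the example substitutes a hypothetical RunExecution stand-in with a single Prompt field; echoRunner is likewise invented for illustration and is not part of the package.

```go
package main

import "fmt"

// RunExecution is a hypothetical stand-in for types.RunExecution,
// whose fields are not shown in this documentation.
type RunExecution struct {
	Prompt string
}

// AgentExecutionRunner mirrors the documented interface, with the
// stand-in RunExecution substituted for types.RunExecution.
type AgentExecutionRunner interface {
	Name() string
	BuildEntrypoint(task RunExecution, env map[string]string) []string
}

// echoRunner is a hypothetical runner that simply echoes the prompt.
type echoRunner struct{}

func (echoRunner) Name() string { return "echo" }

// BuildEntrypoint returns the argv for the task process.
func (echoRunner) BuildEntrypoint(task RunExecution, env map[string]string) []string {
	return []string{"/bin/echo", task.Prompt}
}

// Compile-time check that echoRunner satisfies the interface.
var _ AgentExecutionRunner = echoRunner{}

func main() {
	var r AgentExecutionRunner = echoRunner{}
	fmt.Println(r.Name(), r.BuildEntrypoint(RunExecution{Prompt: "hello"}, nil))
}
```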

type CLIPImageManager

type CLIPImageManager struct {
	// contains filtered or unexported fields
}

CLIPImageManager implements ImageManager using CLIP for lazy-loading images from OCI registries. It manages FUSE mounts with reference counting for efficient image reuse.

func (*CLIPImageManager) Close

func (m *CLIPImageManager) Close() error

Close unmounts all FUSE filesystems and releases resources.

func (*CLIPImageManager) PrepareRootfs

func (m *CLIPImageManager) PrepareRootfs(ctx context.Context, imageRef string) (string, func(), error)

PrepareRootfs converts an OCI image to CLIP format and mounts it as a FUSE filesystem. If the image is already mounted, it increments the reference count and returns the existing mount.
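
The reference counting described above can be sketched roughly as follows. This is not the package's implementation: mountTable, acquire, and release are hypothetical names, the mount path is a placeholder, and the assumption that the returned cleanup function drops one reference is inferred from the description rather than stated by it.

```go
package main

import (
	"fmt"
	"sync"
)

// mountTable is a hypothetical stand-in for the manager's internal state.
type mountTable struct {
	mu     sync.Mutex
	refs   map[string]int    // imageRef -> reference count
	paths  map[string]string // imageRef -> mount path
	mounts int               // number of simulated mount operations
}

func newMountTable() *mountTable {
	return &mountTable{refs: map[string]int{}, paths: map[string]string{}}
}

// acquire returns the mount path for imageRef, mounting only on first use.
// The returned cleanup drops one reference and is safe to call twice.
func (t *mountTable) acquire(imageRef string) (string, func()) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.refs[imageRef] == 0 {
		t.mounts++ // a real manager would perform the FUSE mount here
		t.paths[imageRef] = fmt.Sprintf("/mnt/%d", t.mounts) // placeholder path
	}
	t.refs[imageRef]++
	path := t.paths[imageRef]
	var once sync.Once
	return path, func() { once.Do(func() { t.release(imageRef) }) }
}

// release drops one reference and forgets the mount when none remain.
func (t *mountTable) release(imageRef string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.refs[imageRef]--
	if t.refs[imageRef] == 0 {
		delete(t.refs, imageRef)
		delete(t.paths, imageRef) // a real manager would unmount here
	}
}

func main() {
	tbl := newMountTable()
	p1, done1 := tbl.acquire("example.com/app:latest")
	p2, done2 := tbl.acquire("example.com/app:latest")
	fmt.Println(p1 == p2, tbl.mounts)
	done1()
	done2()
	fmt.Println(len(tbl.refs))
}
```

Wrapping the release in sync.Once is a design choice for the sketch: callers often defer cleanup and also call it on an error path, and a double decrement would unmount an image that is still in use.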

type ClaudeCodeRunner added in v0.1.60

type ClaudeCodeRunner struct {
	// contains filtered or unexported fields
}

func NewClaudeCodeRunner added in v0.1.60

func NewClaudeCodeRunner(opts ClaudeCodeRunnerOptions) *ClaudeCodeRunner

func (*ClaudeCodeRunner) BuildEntrypoint added in v0.1.60

func (r *ClaudeCodeRunner) BuildEntrypoint(task types.RunExecution, env map[string]string) []string

func (*ClaudeCodeRunner) BuildTurnArgs added in v0.1.66

func (r *ClaudeCodeRunner) BuildTurnArgs(prompt string, env map[string]string, mode TurnArgMode) []string

BuildTurnArgs returns the argv for a single interactive turn. Each turn runs claude --print as a separate process in the sandbox; the Go worker manages the loop between turns.

func (*ClaudeCodeRunner) Name added in v0.1.60

func (r *ClaudeCodeRunner) Name() string

type ClaudeCodeRunnerOptions added in v0.1.60

type ClaudeCodeRunnerOptions struct {
	AnthropicAPIKey string
	KernelAPIKey    string
}

type Config added in v0.1.23

type Config struct {
	// Paths
	BundleDir   string
	StateDir    string
	MountDir    string
	WorkerMount string
	CLIBinary   string

	// Identity
	WorkerID string

	// Gateway
	GatewayAddr   string
	AuthToken     string
	GatewayClient *gatewayclient.GatewayClient

	// Features
	EnableFilesystem  bool
	EnableNetwork     bool
	UseHostResolvConf bool

	// Runtime
	RuntimeType   string
	RuntimeConfig runtime.Config
	ImageConfig   types.ImageConfig

	// Streaming
	S2Token string
	S2Basin string

	// API keys
	AnthropicAPIKey string
	KernelAPIKey    string
}

Config for creating a SandboxManager.

type ConsoleWriter

type ConsoleWriter struct {
	// contains filtered or unexported fields
}

ConsoleWriter writes task output to the worker's console with context.

func NewConsoleWriter

func NewConsoleWriter(taskID, stream string) *ConsoleWriter

NewConsoleWriter creates a writer that logs to the worker console.

func (*ConsoleWriter) Write

func (w *ConsoleWriter) Write(p []byte) (int, error)

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter writes to a file (useful for debugging).

func NewFileWriter

func NewFileWriter(path string) (*FileWriter, error)

NewFileWriter creates a writer that appends to a file.
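
For readers unfamiliar with append-mode writers, the behavior can be sketched with the standard library alone. appendWriter and demoAppend are hypothetical names, and the file mode 0o644 is an assumption; the package's own flags and permissions are not documented here.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// appendWriter is a hypothetical stand-in for FileWriter.
type appendWriter struct{ f *os.File }

// newAppendWriter opens path for appending, creating it if needed.
// The 0o644 mode is an assumption made for this sketch.
func newAppendWriter(path string) (*appendWriter, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return nil, err
	}
	return &appendWriter{f: f}, nil
}

func (w *appendWriter) Write(p []byte) (int, error) { return w.f.Write(p) }
func (w *appendWriter) Close() error                { return w.f.Close() }

// demoAppend opens the same file twice and returns its final contents,
// showing that the second writer appends rather than truncates.
func demoAppend() (string, error) {
	dir, err := os.MkdirTemp("", "filewriter-sketch")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "task.log")
	for _, chunk := range []string{"first\n", "second\n"} {
		w, err := newAppendWriter(path)
		if err != nil {
			return "", err
		}
		_, werr := w.Write([]byte(chunk))
		cerr := w.Close()
		if werr != nil {
			return "", werr
		}
		if cerr != nil {
			return "", cerr
		}
	}
	data, err := os.ReadFile(path)
	return string(data), err
}

func main() {
	out, err := demoAppend()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(out)
}
```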

func (*FileWriter) Close

func (w *FileWriter) Close() error

func (*FileWriter) Write

func (w *FileWriter) Write(p []byte) (int, error)

type ImageManager

type ImageManager interface {
	// PrepareRootfs prepares a rootfs from a container image.
	// Returns the path to the rootfs and a cleanup function.
	PrepareRootfs(ctx context.Context, imageRef string) (rootfsPath string, cleanup func(), err error)

	// Close cleans up all resources.
	Close() error
}

ImageManager provides container image management functionality.

func NewImageManager

func NewImageManager(config types.ImageConfig) (ImageManager, error)

NewImageManager creates a new CLIP-based image manager.

type MountConfig

type MountConfig struct {
	// MountDir is the base directory for per-task mounts
	MountDir string

	// CLIBinary is the path to the CLI binary
	CLIBinary string

	// GatewayAddr is the gateway gRPC address
	GatewayAddr string

	// MountReadyTimeout is how long to wait for a mount to be ready
	MountReadyTimeout time.Duration
}

MountConfig configures the mount manager
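
How MountReadyTimeout is applied is not documented on this page. One plausible reading, sketched below purely as an assumption, is a bounded poll that waits until the mount point reports ready. waitReady, its polling interval, and the ready callback are all hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitReady polls ready until it returns true, the timeout elapses,
// or ctx is cancelled. The polling strategy is an assumption.
func waitReady(ctx context.Context, timeout, interval time.Duration, ready func() bool) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if ready() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// demoWait exercises both outcomes: a mount that becomes ready on the
// third check, and one that never becomes ready.
func demoWait() (readyOK, timedOut bool) {
	calls := 0
	err := waitReady(context.Background(), time.Second, time.Millisecond, func() bool {
		calls++
		return calls >= 3
	})
	readyOK = err == nil

	err = waitReady(context.Background(), 20*time.Millisecond, time.Millisecond, func() bool {
		return false
	})
	timedOut = errors.Is(err, context.DeadlineExceeded)
	return readyOK, timedOut
}

func main() {
	ok, timedOut := demoWait()
	fmt.Println(ok, timedOut)
}
```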

func DefaultMountConfig

func DefaultMountConfig() MountConfig

DefaultMountConfig returns sensible defaults

type MountManager

type MountManager struct {
	// contains filtered or unexported fields
}

MountManager manages per-task FUSE mounts

func NewMountManager

func NewMountManager(config MountConfig) (*MountManager, error)

NewMountManager creates a new mount manager

func (*MountManager) ActiveMounts

func (m *MountManager) ActiveMounts() int

ActiveMounts returns the number of active mounts

func (*MountManager) CleanupAll

func (m *MountManager) CleanupAll()

CleanupAll unmounts all active mounts (call on shutdown)

func (*MountManager) CleanupStale

func (m *MountManager) CleanupStale(activeTasks map[string]bool) int

CleanupStale removes mounts for tasks that are no longer running
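
The sweep can be pictured as a set difference between tracked mounts and active tasks. The sketch below uses a plain map as a hypothetical stand-in for the manager's state and assumes that the returned int is the number of mounts removed and that a task mapped to false counts as inactive; neither detail is confirmed by the documentation.

```go
package main

import "fmt"

// cleanupStale removes entries from mounts whose task ID is not marked
// active and returns how many were removed. mounts maps task ID to
// mount path and stands in for the manager's internal table.
func cleanupStale(mounts map[string]string, activeTasks map[string]bool) int {
	removed := 0
	for taskID := range mounts {
		if !activeTasks[taskID] {
			delete(mounts, taskID) // a real manager would unmount first
			removed++
		}
	}
	return removed
}

func main() {
	mounts := map[string]string{
		"task-a": "/mnt/task-a",
		"task-b": "/mnt/task-b",
	}
	n := cleanupStale(mounts, map[string]bool{"task-b": true})
	fmt.Println(n, len(mounts))
}
```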

func (*MountManager) GetMountPath

func (m *MountManager) GetMountPath(taskID string) string

GetMountPath returns the mount path for a task, or empty if not mounted

func (*MountManager) Mount

func (m *MountManager) Mount(ctx context.Context, taskID, token string) (string, error)

Mount creates a FUSE mount for a task using the given token. It returns the mount path, which can be bind-mounted into containers.

func (*MountManager) Unmount

func (m *MountManager) Unmount(taskID string) error

Unmount stops the FUSE mount for a task and cleans up

type NetworkManager added in v0.1.23

type NetworkManager struct {
	// contains filtered or unexported fields
}

NetworkManager handles container networking with dual-stack NAT. IP allocation is coordinated via gRPC with the gateway.

func NewNetworkManager added in v0.1.23

func NewNetworkManager(ctx context.Context, workerID string, client *gatewayclient.GatewayClient) (*NetworkManager, error)

func (*NetworkManager) Setup added in v0.1.23

func (m *NetworkManager) Setup(sandboxID string, spec *specs.Spec) (string, error)

Setup creates network namespace and veth pair for a sandbox. Returns the allocated IP address.

func (*NetworkManager) TearDown added in v0.1.23

func (m *NetworkManager) TearDown(sandboxID string) error

TearDown removes all networking resources for a sandbox.

type OutputConfig

type OutputConfig struct {
	TaskID   string
	S2Client *common.S2Client
	Console  bool // Write to worker stdout
}

OutputConfig configures task output destinations.
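
Writing one stream to several destinations is commonly done with io.MultiWriter. The sketch below illustrates that idea only; sink, fanOut, and demoFanOut are hypothetical, and whether the package itself uses io.MultiWriter is not stated.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// sink is a trivial in-memory destination used only for this sketch.
// In the real package the destinations would be the console and S2.
type sink struct{ sb strings.Builder }

func (s *sink) Write(p []byte) (int, error) { return s.sb.Write(p) }

// fanOut returns a writer that duplicates each write to every destination.
func fanOut(dests ...io.Writer) io.Writer {
	return io.MultiWriter(dests...)
}

// demoFanOut writes one line through fanOut and returns what each sink saw.
func demoFanOut() (string, string) {
	console, remote := &sink{}, &sink{}
	w := fanOut(console, remote)
	fmt.Fprintln(w, "task started")
	return console.sb.String(), remote.sb.String()
}

func main() {
	a, b := demoFanOut()
	fmt.Print(a, b)
}
```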

type S2Writer

type S2Writer struct {
	// contains filtered or unexported fields
}

S2Writer writes task output to S2 streams.

func NewS2Writer

func NewS2Writer(ctx context.Context, client *common.S2Client, taskID, stream string) *S2Writer

NewS2Writer creates a writer that appends to S2.

func (*S2Writer) Write

func (w *S2Writer) Write(p []byte) (int, error)

type Sandbox added in v0.1.23

type Sandbox struct {
	Config  types.SandboxConfig
	State   types.SandboxState
	Bundle  string
	Cancel  context.CancelFunc
	Overlay *common.ContainerOverlay
	Rootfs  func() // cleanup function
	Output  io.Writer
	Flush   func()
}

Sandbox represents a running sandbox with its resources.

type SandboxManager

type SandboxManager struct {
	// contains filtered or unexported fields
}

SandboxManager manages the lifecycle of sandboxes on a worker.

func NewSandboxManager

func NewSandboxManager(ctx context.Context, cfg Config) (*SandboxManager, error)

func (*SandboxManager) AttachPTY added in v0.1.53

func (m *SandboxManager) AttachPTY(
	ctx context.Context,
	sandboxID string,
	stdin io.Reader,
	stdout io.Writer,
) error

AttachPTY starts a long-lived PTY-backed process in a running sandbox. Claude interactive tasks run their task entrypoint directly; other interactive tasks keep the existing shell PTY behavior.

func (*SandboxManager) Close

func (m *SandboxManager) Close() error

Close shuts down the sandbox manager and all sandboxes

func (*SandboxManager) Create

Create creates a new sandbox from the given config

func (*SandboxManager) Delete

func (m *SandboxManager) Delete(sandboxID string, force bool) error

Delete removes a sandbox and cleans up resources

func (*SandboxManager) ExecPTY added in v0.1.66

func (m *SandboxManager) ExecPTY(
	ctx context.Context,
	sandboxID string,
	args []string,
	env map[string]string,
	stdout io.Writer,
) error

ExecPTY runs a command inside an existing sandbox with PTY-compatible env. Unlike AttachPTY, it takes explicit args and does not attach stdin — the prompt is passed via command-line args. This is used by the turn-based session loop where each turn is a separate Exec call.

The command is wrapped in a login shell so that the user's profile (and full PATH) is available — tools like `claude` are often installed outside the minimal system PATH.
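
A rough sketch of login-shell wrapping is shown below. The shell path (/bin/bash), the -l and -c flags, and the quoting scheme are assumptions made for illustration; the documentation does not say which shell or quoting the package uses.

```go
package main

import (
	"fmt"
	"strings"
)

// shellQuote wraps s in single quotes, escaping embedded single quotes
// with the usual '\'' idiom so the shell sees the argument verbatim.
func shellQuote(s string) string {
	return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'"
}

// wrapLoginShell returns an argv that runs args through a login shell.
// The shell path and flags are assumptions, not the package's choice.
func wrapLoginShell(args []string) []string {
	quoted := make([]string, len(args))
	for i, a := range args {
		quoted[i] = shellQuote(a)
	}
	return []string{"/bin/bash", "-l", "-c", strings.Join(quoted, " ")}
}

func main() {
	argv := wrapLoginShell([]string{"claude", "--print", "what's in this repo?"})
	fmt.Printf("%q\n", argv)
}
```

Quoting every argument individually matters here because the prompt arrives as a command-line argument and may contain spaces, quotes, or shell metacharacters.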

func (*SandboxManager) Get

func (m *SandboxManager) Get(sandboxID string) (*types.SandboxState, error)

Get returns the state of a sandbox

func (*SandboxManager) List

func (m *SandboxManager) List() []types.SandboxState

List returns all managed sandboxes

func (*SandboxManager) ResolveRunner added in v0.1.66

func (m *SandboxManager) ResolveRunner(task types.RunExecution, env map[string]string) AgentExecutionRunner

ResolveRunner returns the AgentExecutionRunner for the given task.

func (*SandboxManager) RunTask

RunTask creates and runs a sandbox for a task, returning when complete

func (*SandboxManager) SetOutput

func (m *SandboxManager) SetOutput(sandboxID string, writer io.Writer, flusher func()) error

SetOutput configures the output writer and flusher for a sandbox. Must be called before Start.

func (*SandboxManager) Start

func (m *SandboxManager) Start(sandboxID string) error

Start starts a created sandbox

func (*SandboxManager) Stop

func (m *SandboxManager) Stop(sandboxID string, force bool) error

Stop stops a running sandbox by signalling container processes. Non-force mode sends SIGTERM and preserves normal runtime teardown. Force mode sends SIGKILL and immediately cancels the sandbox context.
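
The two modes can be summarized in a short sketch. The stop helper and its kill callback are hypothetical; only the signal choice and the cancel-on-force behavior are taken from the description above.

```go
package main

import (
	"context"
	"fmt"
	"syscall"
)

// stop signals the container via kill. In force mode it sends SIGKILL
// and cancels the sandbox context immediately; otherwise it sends
// SIGTERM and leaves the context alone so normal teardown can run.
func stop(force bool, kill func(sig syscall.Signal) error, cancel context.CancelFunc) error {
	if force {
		err := kill(syscall.SIGKILL)
		cancel()
		return err
	}
	return kill(syscall.SIGTERM)
}

// demoStop reports which signal was sent and whether the context was
// cancelled by the time stop returned.
func demoStop(force bool) (sentKill, cancelled bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var sent syscall.Signal
	_ = stop(force, func(sig syscall.Signal) error {
		sent = sig
		return nil
	}, cancel)
	return sent == syscall.SIGKILL, ctx.Err() != nil
}

func main() {
	fmt.Println(demoStop(false))
	fmt.Println(demoStop(true))
}
```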

type TaskOutput

type TaskOutput struct {
	// contains filtered or unexported fields
}

TaskOutput handles task stdout/stderr with multiple destinations. Zero allocation for the common case of just writing.

func NewOutputPair

func NewOutputPair(ctx context.Context, cfg OutputConfig) (stdout, stderr *TaskOutput)

NewOutputPair creates stdout and stderr writers for a task.

func NewTaskOutput

func NewTaskOutput(taskID, stream string, writers ...io.Writer) *TaskOutput

NewTaskOutput creates an output handler for a task stream.

func (*TaskOutput) Flush

func (o *TaskOutput) Flush()

Flush writes any remaining buffered content.

func (*TaskOutput) Write

func (o *TaskOutput) Write(p []byte) (int, error)

Write implements io.Writer. Buffers partial lines, flushes complete ones.
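
The line-buffering behavior can be sketched as follows. lineBuffer and its emit callback are hypothetical stand-ins, and this version makes no attempt at the zero-allocation property mentioned for TaskOutput.

```go
package main

import (
	"bytes"
	"fmt"
)

// lineBuffer is a hypothetical stand-in for TaskOutput's buffering.
type lineBuffer struct {
	pending []byte
	emit    func(line string) // receives each complete line, without "\n"
}

// Write implements io.Writer: complete lines are emitted immediately,
// and a trailing partial line is kept until more data or Flush arrives.
func (b *lineBuffer) Write(p []byte) (int, error) {
	b.pending = append(b.pending, p...)
	for {
		i := bytes.IndexByte(b.pending, '\n')
		if i < 0 {
			break
		}
		b.emit(string(b.pending[:i]))
		b.pending = b.pending[i+1:]
	}
	return len(p), nil
}

// Flush emits any remaining partial line.
func (b *lineBuffer) Flush() {
	if len(b.pending) > 0 {
		b.emit(string(b.pending))
		b.pending = nil
	}
}

func main() {
	lb := &lineBuffer{emit: func(line string) { fmt.Printf("line: %q\n", line) }}
	lb.Write([]byte("hel"))
	lb.Write([]byte("lo\nwor"))
	lb.Flush()
}
```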

type TurnArgMode added in v0.1.71

type TurnArgMode string

const (
	// TurnArgModeFirstStart is a normal first turn; it preserves explicit session ids.
	TurnArgModeFirstStart TurnArgMode = "first_start"
	// TurnArgModeFirstResumeLatest resumes from latest local/VFS state.
	TurnArgModeFirstResumeLatest TurnArgMode = "first_resume_latest"
	// TurnArgModeFirstResumeByID resumes using an explicit session id.
	TurnArgModeFirstResumeByID TurnArgMode = "first_resume_by_id"
	// TurnArgModeFollowup is used for non-first turns.
	TurnArgModeFollowup TurnArgMode = "followup"
)

type TurnRunner added in v0.1.66

type TurnRunner interface {
	AgentExecutionRunner
	BuildTurnArgs(prompt string, env map[string]string, mode TurnArgMode) []string
}

TurnRunner extends AgentExecutionRunner with per-turn execution. Runners implementing this interface execute each turn as a separate process in the sandbox, with the Go worker managing the lifecycle between turns — no shell loop, no stdin pipe.
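
A turn loop of this shape might look like the sketch below. The TurnArgMode values are copied from this page, but modeForTurn's selection rule, the runTurns helper, and the build and exec callbacks (standing in for BuildTurnArgs and SandboxManager.ExecPTY) are assumptions made for illustration.

```go
package main

import "fmt"

type TurnArgMode string

const (
	TurnArgModeFirstStart        TurnArgMode = "first_start"
	TurnArgModeFirstResumeLatest TurnArgMode = "first_resume_latest"
	TurnArgModeFirstResumeByID   TurnArgMode = "first_resume_by_id"
	TurnArgModeFollowup          TurnArgMode = "followup"
)

// modeForTurn picks the mode for a turn index. The rule used here
// (resume flag, then session id) is an assumption for this sketch.
func modeForTurn(turn int, resume bool, sessionID string) TurnArgMode {
	switch {
	case turn > 0:
		return TurnArgModeFollowup
	case resume && sessionID != "":
		return TurnArgModeFirstResumeByID
	case resume:
		return TurnArgModeFirstResumeLatest
	default:
		return TurnArgModeFirstStart
	}
}

// runTurns runs one process per prompt. build stands in for
// BuildTurnArgs and exec for SandboxManager.ExecPTY.
func runTurns(
	prompts []string,
	build func(prompt string, mode TurnArgMode) []string,
	exec func(argv []string) error,
) error {
	for i, p := range prompts {
		if err := exec(build(p, modeForTurn(i, false, ""))); err != nil {
			return fmt.Errorf("turn %d: %w", i, err)
		}
	}
	return nil
}

func main() {
	build := func(prompt string, mode TurnArgMode) []string {
		return []string{"agent", string(mode), prompt} // placeholder argv
	}
	exec := func(argv []string) error {
		fmt.Println(argv)
		return nil
	}
	if err := runTurns([]string{"list files", "now summarize"}, build, exec); err != nil {
		fmt.Println("error:", err)
	}
}
```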

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents an airstore worker that:

 1. Registers with the gateway
 2. Pulls tasks from the queue
 3. Runs tasks in gVisor sandboxes

func NewWorker

func NewWorker() (*Worker, error)

NewWorker creates a new Worker instance

func (*Worker) PoolName

func (w *Worker) PoolName() string

PoolName returns the worker's pool name

func (*Worker) Run

func (w *Worker) Run() error

Run starts the worker and blocks until shutdown

func (*Worker) WorkerId

func (w *Worker) WorkerId() string

WorkerId returns the worker's ID
