worker

package
v0.1.13
Warning

This package is not in the latest version of its module.

Published: Feb 3, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 27 Imported by: 0

Documentation

Index

Constants

const (
	DefaultCachePath = "/var/lib/clip/cache"
	DefaultWorkPath  = "/var/lib/clip/work"
	DefaultMountPath = "/var/lib/clip/mnt"
)

Default paths for CLIP storage directories

Variables

This section is empty.

Functions

This section is empty.

Types

type CLIPImageManager

type CLIPImageManager struct {
	// contains filtered or unexported fields
}

CLIPImageManager implements ImageManager using CLIP for lazy-loading images from OCI registries. It manages FUSE mounts with reference counting for efficient image reuse.

func (*CLIPImageManager) Close

func (m *CLIPImageManager) Close() error

Close unmounts all FUSE filesystems and releases resources.

func (*CLIPImageManager) PrepareRootfs

func (m *CLIPImageManager) PrepareRootfs(ctx context.Context, imageRef string) (string, func(), error)

PrepareRootfs converts an OCI image to CLIP format and mounts it as a FUSE filesystem. If the image is already mounted, it increments the reference count and returns the existing mount.
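The reference counting described above can be illustrated with a small, self-contained sketch. It does not use the package's real implementation (whose fields are unexported); `refMounts`, `acquire`, and the `/mnt/sketch/...` paths are hypothetical names chosen for illustration, and the assumption that the mount is dropped once the last user releases it is not confirmed by the documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// refMounts is a hypothetical stand-in for a manager's internal state.
type refMounts struct {
	mu     sync.Mutex
	counts map[string]int    // imageRef -> number of active users
	paths  map[string]string // imageRef -> simulated mount path
	mounts int               // how many simulated mount operations happened
}

func newRefMounts() *refMounts {
	return &refMounts{counts: map[string]int{}, paths: map[string]string{}}
}

// acquire returns the mount path for imageRef plus a cleanup function.
// The first caller triggers the simulated mount; later callers reuse it.
func (r *refMounts) acquire(imageRef string) (string, func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.counts[imageRef] == 0 {
		r.mounts++
		r.paths[imageRef] = fmt.Sprintf("/mnt/sketch/%d", r.mounts)
	}
	r.counts[imageRef]++
	path := r.paths[imageRef]

	// sync.Once makes the cleanup safe to call more than once.
	var once sync.Once
	cleanup := func() {
		once.Do(func() {
			r.mu.Lock()
			defer r.mu.Unlock()
			r.counts[imageRef]--
			if r.counts[imageRef] == 0 {
				// Assumption: the last release drops the mount.
				delete(r.counts, imageRef)
				delete(r.paths, imageRef)
			}
		})
	}
	return path, cleanup
}

// active reports how many distinct images are currently mounted.
func (r *refMounts) active() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.counts)
}

func main() {
	r := newRefMounts()
	p1, c1 := r.acquire("docker.io/library/alpine:3.19")
	p2, c2 := r.acquire("docker.io/library/alpine:3.19")
	fmt.Println("same path:", p1 == p2, "active:", r.active())
	c1()
	c2()
	fmt.Println("active after cleanup:", r.active())
}
```

Guarding the cleanup with `sync.Once` is a design choice of this sketch: it keeps a double call from decrementing the count twice.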

type ConsoleWriter

type ConsoleWriter struct {
	// contains filtered or unexported fields
}

ConsoleWriter writes task output to the worker's console with context.

func NewConsoleWriter

func NewConsoleWriter(taskID, stream string) *ConsoleWriter

NewConsoleWriter creates a writer that logs to the worker console.

func (*ConsoleWriter) Write

func (w *ConsoleWriter) Write(p []byte) (int, error)

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter writes to a file (useful for debugging).

func NewFileWriter

func NewFileWriter(path string) (*FileWriter, error)

NewFileWriter creates a writer that appends to a file.

func (*FileWriter) Close

func (w *FileWriter) Close() error

func (*FileWriter) Write

func (w *FileWriter) Write(p []byte) (int, error)

type ImageManager

type ImageManager interface {
	// PrepareRootfs prepares a rootfs from a container image.
	// Returns the path to the rootfs and a cleanup function.
	PrepareRootfs(ctx context.Context, imageRef string) (rootfsPath string, cleanup func(), err error)

	// Close cleans up all resources.
	Close() error
}

ImageManager provides container image management functionality.
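A typical way to consume an interface of this shape is to prepare the rootfs, defer the returned cleanup function, and only then do the work. The sketch below copies the interface locally so it compiles on its own; `fakeManager`, `withRootfs`, and `demo` are hypothetical helpers for illustration and are not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ImageManager mirrors the documented interface, copied locally
// so that this sketch is self-contained.
type ImageManager interface {
	PrepareRootfs(ctx context.Context, imageRef string) (rootfsPath string, cleanup func(), err error)
	Close() error
}

// fakeManager is a hypothetical in-memory implementation for illustration.
type fakeManager struct {
	released int // number of times a cleanup function ran
}

func (f *fakeManager) PrepareRootfs(ctx context.Context, imageRef string) (string, func(), error) {
	if err := ctx.Err(); err != nil {
		return "", nil, err
	}
	if imageRef == "" {
		return "", nil, errors.New("empty image reference")
	}
	return "/tmp/rootfs/" + imageRef, func() { f.released++ }, nil
}

func (f *fakeManager) Close() error { return nil }

// withRootfs prepares a rootfs, runs fn against it, and always releases it.
func withRootfs(ctx context.Context, im ImageManager, imageRef string, fn func(rootfs string) error) error {
	rootfs, cleanup, err := im.PrepareRootfs(ctx, imageRef)
	if err != nil {
		return fmt.Errorf("prepare rootfs for %q: %w", imageRef, err)
	}
	defer cleanup() // only deferred after a successful prepare
	return fn(rootfs)
}

// demo runs one prepare/use/release cycle and reports the path it saw.
func demo(im ImageManager, imageRef string) (string, error) {
	var seen string
	err := withRootfs(context.Background(), im, imageRef, func(rootfs string) error {
		seen = rootfs
		return nil
	})
	return seen, err
}

func main() {
	im := &fakeManager{}
	defer im.Close()
	rootfs, err := demo(im, "alpine")
	fmt.Println(rootfs, err, im.released)
}
```

Deferring the cleanup only after the error check matters because the cleanup function may be nil when PrepareRootfs fails, as it is in this fake.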

func NewImageManager

func NewImageManager(config types.ImageConfig) (ImageManager, error)

NewImageManager creates a new CLIP-based image manager.

type ManagedSandbox

type ManagedSandbox struct {
	Config        types.SandboxConfig
	State         types.SandboxState
	BundlePath    string
	Cancel        context.CancelFunc
	RootfsCleanup func()                   // Cleanup function for the CLIP rootfs mount
	Overlay       *common.ContainerOverlay // Overlay filesystem for writable layer
	OutputWriter  io.Writer                // Output destination (stdout/stderr combined)
	OutputFlusher func()                   // Flush pending output on completion
}

ManagedSandbox represents a sandbox being managed

type MountConfig

type MountConfig struct {
	// MountDir is the base directory for per-task mounts
	MountDir string

	// CLIBinary is the path to the CLI binary
	CLIBinary string

	// GatewayAddr is the gateway gRPC address
	GatewayAddr string

	// MountReadyTimeout is how long to wait for a mount to be ready
	MountReadyTimeout time.Duration
}

MountConfig configures the mount manager
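How MountReadyTimeout is applied internally is not documented. One common way to honor such a timeout is to poll a readiness probe under a context deadline, sketched below. All names here (`waitReady`, `readyAfter`, `tryWait`, `errMountNotReady`) and the polling interval are assumptions made for illustration, not the package's API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errMountNotReady = errors.New("mount not ready")

// waitReady polls ready() until it returns true, the timeout elapses,
// or the parent context is cancelled.
func waitReady(ctx context.Context, timeout, interval time.Duration, ready func() bool) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if ready() {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("%w: %v", errMountNotReady, ctx.Err())
		case <-ticker.C:
			// Next probe.
		}
	}
}

// readyAfter simulates a mount that becomes ready on the n-th probe
// (and never becomes ready if n <= 0).
func readyAfter(n int) func() bool {
	calls := 0
	return func() bool {
		calls++
		return n > 0 && calls >= n
	}
}

// tryWait wires the pieces together with a 2ms probe interval.
func tryWait(probes, timeoutMs int) error {
	timeout := time.Duration(timeoutMs) * time.Millisecond
	return waitReady(context.Background(), timeout, 2*time.Millisecond, readyAfter(probes))
}

func main() {
	fmt.Println("ready on third probe:", tryWait(3, 500))
	fmt.Println("never ready:", tryWait(0, 20))
}
```

In real code the probe would check something observable about the mount point; what the package actually checks is not stated in the documentation.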

func DefaultMountConfig

func DefaultMountConfig() MountConfig

DefaultMountConfig returns sensible defaults

type MountManager

type MountManager struct {
	// contains filtered or unexported fields
}

MountManager manages per-task FUSE mounts

func NewMountManager

func NewMountManager(config MountConfig) (*MountManager, error)

NewMountManager creates a new mount manager

func (*MountManager) ActiveMounts

func (m *MountManager) ActiveMounts() int

ActiveMounts returns the number of active mounts

func (*MountManager) CleanupAll

func (m *MountManager) CleanupAll()

CleanupAll unmounts all active mounts (call on shutdown)

func (*MountManager) CleanupStale

func (m *MountManager) CleanupStale(activeTasks map[string]bool) int

CleanupStale removes mounts for tasks that are no longer running

func (*MountManager) GetMountPath

func (m *MountManager) GetMountPath(taskID string) string

GetMountPath returns the mount path for a task, or empty if not mounted

func (*MountManager) Mount

func (m *MountManager) Mount(ctx context.Context, taskID, token string) (string, error)

Mount creates a FUSE mount for a task using the given token. It returns the mount path, which can be bind-mounted into containers.

func (*MountManager) Unmount

func (m *MountManager) Unmount(taskID string) error

Unmount stops the FUSE mount for a task and cleans up

type OutputConfig

type OutputConfig struct {
	TaskID   string
	S2Client *streams.S2Client
	Console  bool // Write to worker stdout
}

OutputConfig configures task output destinations.

type S2Writer

type S2Writer struct {
	// contains filtered or unexported fields
}

S2Writer writes task output to S2 streams.

func NewS2Writer

func NewS2Writer(ctx context.Context, client *streams.S2Client, taskID, stream string) *S2Writer

NewS2Writer creates a writer that appends to S2.

func (*S2Writer) Write

func (w *S2Writer) Write(p []byte) (int, error)

type SandboxConfig

type SandboxConfig struct {
	// Paths configuration
	Paths types.WorkerPaths

	// WorkerID is the unique ID of this worker
	WorkerID string

	// GatewayGRPCAddr is the gateway gRPC address
	GatewayGRPCAddr string

	// AuthToken is the worker's authentication token (fallback)
	AuthToken string

	// EnableFilesystem enables FUSE filesystem mounts
	EnableFilesystem bool

	// AnthropicAPIKey for Claude Code tasks (from config, not env var)
	AnthropicAPIKey string
}

SandboxConfig holds configuration for the sandbox manager

type SandboxManager

type SandboxManager struct {
	// contains filtered or unexported fields
}

SandboxManager manages the lifecycle of sandboxes on a worker

func NewSandboxManager

func NewSandboxManager(ctx context.Context, cfg SandboxManagerConfig) (*SandboxManager, error)

NewSandboxManager creates a new SandboxManager

func (*SandboxManager) Close

func (m *SandboxManager) Close() error

Close shuts down the sandbox manager and all sandboxes

func (*SandboxManager) Create

Create creates a new sandbox from the given config

func (*SandboxManager) Delete

func (m *SandboxManager) Delete(sandboxID string, force bool) error

Delete removes a sandbox and cleans up resources

func (*SandboxManager) Get

func (m *SandboxManager) Get(sandboxID string) (*types.SandboxState, error)

Get returns the state of a sandbox

func (*SandboxManager) List

func (m *SandboxManager) List() []types.SandboxState

List returns all managed sandboxes

func (*SandboxManager) RunTask

func (m *SandboxManager) RunTask(ctx context.Context, task types.Task) (*types.TaskResult, error)

RunTask creates and runs a sandbox for a task, returning when complete

func (*SandboxManager) SetOutput

func (m *SandboxManager) SetOutput(sandboxID string, writer io.Writer, flusher func()) error

SetOutput configures the output writer and flusher for a sandbox. Must be called before Start.

func (*SandboxManager) Start

func (m *SandboxManager) Start(sandboxID string) error

Start starts a created sandbox

func (*SandboxManager) Stop

func (m *SandboxManager) Stop(sandboxID string, force bool) error

Stop stops a running sandbox

type SandboxManagerConfig

type SandboxManagerConfig struct {
	RuntimeType      string // "runc" or "gvisor"
	BundleDir        string
	StateDir         string
	MountDir         string // Directory for per-task FUSE mounts
	WorkerMount      string // Global worker FUSE mount path
	WorkerID         string
	GatewayGRPCAddr  string            // gRPC address for gateway (e.g., "airstore-gateway:1993")
	AuthToken        string            // Token for authenticating with gateway
	FilesystemBinary string            // Path to filesystem binary on host
	EnableFilesystem bool              // Whether to mount the airstore filesystem
	ImageConfig      types.ImageConfig // Image management configuration (CLIP + S3)
	RuntimeConfig    runtime.Config

	// S2 configuration for log streaming
	S2Token string
	S2Basin string

	// Anthropic API key for Claude Code tasks (from config)
	AnthropicAPIKey string
}

SandboxManagerConfig configures the SandboxManager

type TaskOutput

type TaskOutput struct {
	// contains filtered or unexported fields
}

TaskOutput handles task stdout/stderr with multiple destinations. Zero allocation for the common case of just writing.

func NewOutputPair

func NewOutputPair(ctx context.Context, cfg OutputConfig) (stdout, stderr *TaskOutput)

NewOutputPair creates stdout and stderr writers for a task.

func NewTaskOutput

func NewTaskOutput(taskID, stream string, writers ...io.Writer) *TaskOutput

NewTaskOutput creates an output handler for a task stream.

func (*TaskOutput) Flush

func (o *TaskOutput) Flush()

Flush writes any remaining buffered content.

func (*TaskOutput) Write

func (o *TaskOutput) Write(p []byte) (int, error)

Write implements io.Writer. Buffers partial lines, flushes complete ones.
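The buffering behavior described for Write and Flush can be sketched with a minimal line-buffering writer. This is not the package's TaskOutput (its fields are unexported and it supports multiple destinations); `lineWriter` and `feed` are hypothetical names used only to illustrate the technique.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// lineWriter is a hypothetical line-buffering writer.
type lineWriter struct {
	dst io.Writer
	buf []byte // trailing partial line carried between writes
}

// Write forwards every complete line (including its '\n') to dst
// and keeps any trailing partial line buffered.
func (w *lineWriter) Write(p []byte) (int, error) {
	w.buf = append(w.buf, p...)
	for {
		i := bytes.IndexByte(w.buf, '\n')
		if i < 0 {
			break
		}
		if _, err := w.dst.Write(w.buf[:i+1]); err != nil {
			return 0, err
		}
		w.buf = w.buf[i+1:]
	}
	return len(p), nil
}

// Flush emits whatever partial line is still buffered.
func (w *lineWriter) Flush() {
	if len(w.buf) > 0 {
		w.dst.Write(w.buf)
		w.buf = nil
	}
}

// feed writes chunks through a lineWriter and reports what reached
// the destination before and after Flush.
func feed(chunks ...string) (before, after string) {
	var out strings.Builder
	w := &lineWriter{dst: &out}
	for _, c := range chunks {
		io.WriteString(w, c)
	}
	before = out.String()
	w.Flush()
	return before, out.String()
}

func main() {
	before, after := feed("hello ", "world\npartial")
	fmt.Printf("before flush: %q\n", before)
	fmt.Printf("after flush:  %q\n", after)
}
```

Calling Flush when the task completes ensures that a final line without a trailing newline is not lost, which is presumably why ManagedSandbox carries an OutputFlusher.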

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents an airstore worker that:

1. Registers with the gateway
2. Pulls tasks from the queue
3. Runs tasks in gVisor sandboxes

func NewWorker

func NewWorker() (*Worker, error)

NewWorker creates a new Worker instance

func (*Worker) PoolName

func (w *Worker) PoolName() string

PoolName returns the worker's pool name

func (*Worker) Run

func (w *Worker) Run() error

Run starts the worker and blocks until shutdown

func (*Worker) WorkerId

func (w *Worker) WorkerId() string

WorkerId returns the worker's ID
