orchestration

package
v0.1.84 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Untyped string constants naming an agent runner and an agent provider.
// NOTE(review): where these values are compared or persisted is not visible
// here; confirm against callers before changing the literals.
const (
	// AgentRunnerClaudeCode is the runner identifier string "claude_code".
	AgentRunnerClaudeCode = "claude_code"

	// AgentProviderClaude is the provider identifier string "claude".
	AgentProviderClaude = "claude"
)
View Source
// Enumerated values for execution-policy settings. The ExecHost, ExecSecurity
// and ExecAsk constants are typed; the RuntimeType* and WorkspaceAccess*
// constants are untyped strings.
// NOTE(review): the RuntimeType* / WorkspaceAccess* names suggest they are the
// values for RunExecutionPolicy.RuntimeType and RunExecutionPolicy.WorkspaceAccess
// (both plain string fields) — confirm against ValidateRunExecutionPolicy.
const (
	// ExecHost values.
	ExecHostSandbox ExecHost = "sandbox"

	// ExecSecurity values.
	ExecSecurityDeny      ExecSecurity = "deny"
	ExecSecurityAllowlist ExecSecurity = "allowlist"
	ExecSecurityFull      ExecSecurity = "full"

	// ExecAsk values.
	ExecAskOff    ExecAsk = "off"
	ExecAskOnMiss ExecAsk = "on-miss"
	ExecAskAlways ExecAsk = "always"

	// Runtime type identifiers (untyped strings).
	RuntimeTypeGvisor = "gvisor"
	RuntimeTypeRunc   = "runc"

	// Workspace access identifiers (untyped strings).
	WorkspaceAccessNone = "none"
	WorkspaceAccessRO   = "ro"
	WorkspaceAccessRW   = "rw"
)
View Source
// Default values for retry settings.
// NOTE(review): presumably these back RunRetryPolicy.MaxAttempts and
// RunRetryPolicy.DelayMs via RetryPolicyOrDefault / NormalizeRunRetryPolicy —
// confirm against those functions, whose bodies are not visible here.
const (
	// DefaultRetryMaxAttempts is 2.
	DefaultRetryMaxAttempts = 2
	// DefaultRetryDelayMs is 0; the Ms suffix suggests milliseconds (TODO confirm).
	DefaultRetryDelayMs     = 0
)

Variables

This section is empty.

Functions

func DefaultAgentConfig added in v0.1.66

func DefaultAgentConfig(agentKey string) map[string]any

DefaultAgentConfig returns the default config for a new agent with the given key. This is used by the API, SDK, and frontend to preview defaults before creation.

func ExecutionClassKey

func ExecutionClassKey(workspaceID uint, agentID, lane *string, p RunExecutionPolicy) string

func ExtractLeaseExecutionID added in v0.1.74

func ExtractLeaseExecutionID(owner string) string

ExtractLeaseExecutionID parses the execution ID from a lease owner string formatted as "workerID:executionID".

func ReconcileStaleSessionLease added in v0.1.74

func ReconcileStaleSessionLease(
	ctx context.Context,
	backend repository.BackendRepository,
	terminalIO repository.TerminalIORepository,
	workspaceID uint,
	sessionID, owner string,
) bool

ReconcileStaleSessionLease checks whether the current lease owner references a terminal or missing execution. If so, it force-releases the lease and returns true. The owner format is "workerID:executionID". The function is exported so that gateway/services can reuse the same logic.

func ValidateAgentCommandParams

func ValidateAgentCommandParams(v *AgentCommandParams) error

func ValidateRunExecutionPolicy

func ValidateRunExecutionPolicy(p RunExecutionPolicy) error

Types

type AgentAPI

// AgentAPI is the shared application layer for agent/task/run flows, meant to
// be called directly by transport layers (gRPC/HTTP). It is constructed with
// NewAgentAPI from a repository.BackendRepository and an *AgentService, and
// exposes no exported fields.
type AgentAPI struct {
	// contains filtered or unexported fields
}

AgentAPI is the shared application layer for agent/task/run flows. Transport layers (gRPC/HTTP) should call this directly.

func NewAgentAPI

func NewAgentAPI(
	backend repository.BackendRepository,
	runtime *AgentService,
) *AgentAPI

func (*AgentAPI) AcceptAgentCommand

func (a *AgentAPI) AcceptAgentCommand(
	ctx context.Context,
	workspaceID uint,
	params AgentCommandParams,
) (*types.AgentTask, bool, error)

func (*AgentAPI) ArchiveTask added in v0.1.71

func (a *AgentAPI) ArchiveTask(ctx context.Context, workspaceID uint, taskID string) error

func (*AgentAPI) CancelRun

func (a *AgentAPI) CancelRun(ctx context.Context, workspaceID uint, runID string) error

func (*AgentAPI) CancelTask

func (a *AgentAPI) CancelTask(ctx context.Context, workspaceID uint, taskID string) error

func (*AgentAPI) CreateAgent

func (a *AgentAPI) CreateAgent(
	ctx context.Context,
	workspaceID uint,
	agentKey string,
	name string,
	config map[string]any,
	active *bool,
) (*types.AgentProfile, error)

func (*AgentAPI) DeleteAgent added in v0.1.73

func (a *AgentAPI) DeleteAgent(ctx context.Context, workspaceID uint, agentID string) error

func (*AgentAPI) EnqueueRunInput

func (a *AgentAPI) EnqueueRunInput(
	ctx context.Context,
	workspaceID uint,
	runID string,
	queueMode types.AgentQueueMode,
	message string,
	idempotencyKey string,
) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)

func (*AgentAPI) GetAgent

func (a *AgentAPI) GetAgent(ctx context.Context, workspaceID uint, agentID string) (*types.AgentProfile, error)

func (*AgentAPI) GetDefaultConfig added in v0.1.66

func (a *AgentAPI) GetDefaultConfig(agentKey string) map[string]any

func (*AgentAPI) GetRun

func (a *AgentAPI) GetRun(ctx context.Context, workspaceID uint, runID string) (*types.AgentRun, error)

func (*AgentAPI) GetRunInteraction added in v0.1.73

func (a *AgentAPI) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)

func (*AgentAPI) GetTask

func (a *AgentAPI) GetTask(ctx context.Context, workspaceID uint, taskID string) (*types.AgentTask, error)

func (*AgentAPI) GetTaskLogs

func (a *AgentAPI) GetTaskLogs(ctx context.Context, workspaceID uint, taskID string) ([]common.TaskLogEntry, error)

func (*AgentAPI) ListAgents

func (a *AgentAPI) ListAgents(ctx context.Context, workspaceID uint) ([]*types.AgentProfile, error)

func (*AgentAPI) ListRunEvents

func (a *AgentAPI) ListRunEvents(ctx context.Context, workspaceID uint, runID string) ([]map[string]any, error)

func (*AgentAPI) ListRunSnapshots

func (a *AgentAPI) ListRunSnapshots(ctx context.Context, workspaceID uint, runID string, limit int) ([]*types.AgentRunSnapshot, error)

func (*AgentAPI) ListRuns

func (a *AgentAPI) ListRuns(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentRun, error)

func (*AgentAPI) ListRunsFiltered added in v0.1.66

func (a *AgentAPI) ListRunsFiltered(
	ctx context.Context,
	workspaceID uint,
	filter types.AgentRunListFilter,
) ([]*types.AgentRun, string, bool, error)

func (*AgentAPI) ListTaskLogs added in v0.1.66

func (a *AgentAPI) ListTaskLogs(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	seqNum int64,
) ([]common.TaskLogEntry, int64, error)

func (*AgentAPI) ListTasks

func (a *AgentAPI) ListTasks(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentTask, error)

func (*AgentAPI) ListTasksFiltered added in v0.1.66

func (a *AgentAPI) ListTasksFiltered(
	ctx context.Context,
	workspaceID uint,
	filter types.AgentTaskListFilter,
) ([]*types.AgentTask, string, bool, error)

func (*AgentAPI) StreamTaskEvents added in v0.1.66

func (a *AgentAPI) StreamTaskEvents(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	logCursor int64,
	runEventCursor int,
	cursorRunID string,
) (*TaskEventBatch, error)

func (*AgentAPI) UpdateAgent added in v0.1.66

func (a *AgentAPI) UpdateAgent(
	ctx context.Context,
	workspaceID uint,
	agentID string,
	name *string,
	config map[string]any,
	active *bool,
) (*types.AgentProfile, error)

type AgentCommandParams

// AgentCommandParams is the parameter struct accepted by
// AgentAPI.AcceptAgentCommand and AgentService.AcceptAgentCommand, and checked
// by ValidateAgentCommandParams.
//
// Message, SessionID, Routing and IdempotencyKey have no omitempty tag and are
// always present in the JSON encoding. Every other field is tagged omitempty
// and is dropped from the encoding when nil/empty.
// NOTE(review): which fields are actually required is decided by
// ValidateAgentCommandParams, whose body is not visible here.
type AgentCommandParams struct {
	Message           string              `json:"message"`
	AgentID           *string             `json:"agent_id,omitempty"`
	HookID            *uint               `json:"hook_id,omitempty"`
	SessionID         string              `json:"session_id"`
	SessionKey        *string             `json:"session_key,omitempty"`
	Deliver           *bool               `json:"deliver,omitempty"`
	// TimeoutMs: the Ms suffix suggests milliseconds (TODO confirm).
	TimeoutMs         *int                `json:"timeout_ms,omitempty"`
	// Policy is optional; nil omits "policy" from the encoding.
	Policy            *RunExecutionPolicy `json:"policy,omitempty"`
	Lane              *string             `json:"lane,omitempty"`
	ExtraSystemPrompt *string             `json:"extra_system_prompt,omitempty"`
	InputProvenance   *InputProvenance    `json:"input_provenance,omitempty"`
	// Routing is embedded by value, so "routing" is always encoded as an object.
	Routing           RoutingContext      `json:"routing"`
	Attachments       []map[string]any    `json:"attachments,omitempty"`
	IdempotencyKey    string              `json:"idempotency_key"`
	Label             *string             `json:"label,omitempty"`
	SpawnedBy         *string             `json:"spawned_by,omitempty"`
}

type AgentService

// AgentService is constructed with NewAgentService and is the type passed to
// NewAgentAPI as its runtime argument. Its exported methods are
// AcceptAgentCommand, AcceptRunInput, GetRunInteraction, ListRunPendingInputs
// and Start; it exposes no exported fields.
type AgentService struct {
	// contains filtered or unexported fields
}

func NewAgentService

func NewAgentService(
	ctx context.Context,
	backend repository.BackendRepository,
	taskQueue repository.TaskQueue,
	redis *common.RedisClient,
	s2 *common.S2Client,
	defaultImage string,
) *AgentService

func (*AgentService) AcceptAgentCommand

func (s *AgentService) AcceptAgentCommand(
	ctx context.Context,
	workspaceID uint,
	params AgentCommandParams,
) (*types.AgentTask, bool, error)

func (*AgentService) AcceptRunInput

func (s *AgentService) AcceptRunInput(
	ctx context.Context,
	workspaceID uint,
	targetRunID string,
	queueMode types.AgentQueueMode,
	message string,
	idempotencyKey string,
) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)

func (*AgentService) GetRunInteraction added in v0.1.73

func (s *AgentService) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)

func (*AgentService) ListRunPendingInputs added in v0.1.73

func (s *AgentService) ListRunPendingInputs(ctx context.Context, runID string) ([]types.PendingInput, error)

ListRunPendingInputs returns the user messages sitting in the input buffer for the run's active execution, as determined by the backend interaction state.

func (*AgentService) Start

func (s *AgentService) Start(ctx context.Context)

type AttemptEvent

// AttemptEvent is the event value passed to RouteAttemptEvent (on
// AttemptEventManager and ExecutionInstanceController), delivered to
// IExecutionInstance.ConsumeAttemptEvent, and carried on
// ExecutionInstance.AttemptEventChan.
type AttemptEvent struct {
	AttemptID string
	RunID     string
	// NOTE(review): the meaning and sign convention of Change is not visible here — confirm.
	Change    int
	Status    string
	// NOTE(review): Ts is presumably a timestamp; unit/epoch not visible here — confirm.
	Ts        int64
}

type AttemptEventManager

// AttemptEventManager is constructed with NewAttemptEventManager, which takes
// a default context and a factory producing an IExecutionInstance for an
// instance key. It exposes RouteAttemptEvent and RouteDispatchTarget, both
// addressed by instanceKey, and has no exported fields.
type AttemptEventManager struct {
	// contains filtered or unexported fields
}

func NewAttemptEventManager

func NewAttemptEventManager(
	defaultCtx context.Context,
	instanceFactory func(ctx context.Context, instanceKey string) (IExecutionInstance, error),
) *AttemptEventManager

func (*AttemptEventManager) RouteAttemptEvent

func (m *AttemptEventManager) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error

func (*AttemptEventManager) RouteDispatchTarget

func (m *AttemptEventManager) RouteDispatchTarget(ctx context.Context, instanceKey string, target int) error

type ExecAsk

// ExecAsk is the string type of RunExecutionPolicy.Ask. Declared values:
// ExecAskOff ("off"), ExecAskOnMiss ("on-miss"), ExecAskAlways ("always").
type ExecAsk string

type ExecHost

// ExecHost is the string type of RunExecutionPolicy.Host. The only declared
// value is ExecHostSandbox ("sandbox").
type ExecHost string

type ExecSecurity

// ExecSecurity is the string type of RunExecutionPolicy.Security. Declared
// values: ExecSecurityDeny ("deny"), ExecSecurityAllowlist ("allowlist"),
// ExecSecurityFull ("full").
type ExecSecurity string

type ExecutionInstance

// ExecutionInstance is a concrete type whose pointer methods
// (ConsumeAttemptEvent, ConsumeDispatchTarget, HandleDispatchEvent, Sync)
// cover the IExecutionInstance method set; it additionally offers
// DesiredDispatchTarget.
type ExecutionInstance struct {
	// NOTE(review): the context package documentation advises against storing
	// a Context in a struct; kept as-is because it is part of the exported API.
	Ctx        context.Context
	CancelFunc context.CancelFunc

	// Channels carrying AttemptEvent values and int dispatch values.
	// NOTE(review): who sends/receives and the buffering are not visible here — confirm.
	AttemptEventChan  chan AttemptEvent
	DispatchEventChan chan int

	InstanceLockKey        string
	FailedAttemptThreshold int
	// contains filtered or unexported fields
}

func (*ExecutionInstance) ConsumeAttemptEvent

func (e *ExecutionInstance) ConsumeAttemptEvent(ev AttemptEvent)

func (*ExecutionInstance) ConsumeDispatchTarget

func (e *ExecutionInstance) ConsumeDispatchTarget(target int)

func (*ExecutionInstance) DesiredDispatchTarget

func (e *ExecutionInstance) DesiredDispatchTarget() int

func (*ExecutionInstance) HandleDispatchEvent

func (e *ExecutionInstance) HandleDispatchEvent(target int) error

func (*ExecutionInstance) Sync

func (e *ExecutionInstance) Sync() error

type ExecutionInstanceConfig

// ExecutionInstanceConfig groups the settings describing an execution instance.
// NOTE(review): no exported function on this page accepts this type, so its
// consumer is not visible here. WorkspaceID, AgentID and Lane have the same
// types as the corresponding ExecutionClassKey parameters, and
// InstanceLockKey / FailedAttemptThreshold mirror the exported
// ExecutionInstance fields of the same names — confirm the relationship.
type ExecutionInstanceConfig struct {
	InstanceKey            string
	WorkspaceID            uint
	// AgentID and Lane are pointers and may be nil.
	AgentID                *string
	Lane                   *string
	ExecutionClassKey      string
	FailedAttemptThreshold int
	InstanceLockKey        string
}

type ExecutionInstanceController

// ExecutionInstanceController is constructed with
// NewExecutionInstanceController from an ExecutionInstanceStore, an
// InstanceDispatchLocker and a func(string) string used to derive instance
// lock keys. It exposes EnsureInstance, RouteAttemptEvent and
// RouteDispatchTarget, and has no exported fields.
type ExecutionInstanceController struct {
	// contains filtered or unexported fields
}

func NewExecutionInstanceController

func NewExecutionInstanceController(
	ctx context.Context,
	store ExecutionInstanceStore,
	locker InstanceDispatchLocker,
	instanceLockKeyFunc func(string) string,
) *ExecutionInstanceController

func (*ExecutionInstanceController) EnsureInstance

func (*ExecutionInstanceController) RouteAttemptEvent

func (c *ExecutionInstanceController) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error

func (*ExecutionInstanceController) RouteDispatchTarget

func (c *ExecutionInstanceController) RouteDispatchTarget(ctx context.Context, instanceKey string, target int) error

type ExecutionInstanceState

// ExecutionInstanceState holds per-instance attempt counters and a list of
// failed attempts.
// NOTE(review): the three counters share names with the runningAttempts /
// pendingAttempts / stoppingAttempts parameters of
// ExecutionInstanceStore.UpdateExecutionInstanceState — presumably they feed
// that call; confirm.
type ExecutionInstanceState struct {
	RunningAttempts  int
	PendingAttempts  int
	StoppingAttempts int
	// NOTE(review): presumably attempt IDs; element meaning not visible here — confirm.
	FailedAttempts   []string
}

type ExecutionInstanceStore

// ExecutionInstanceStore is the persistence interface passed to
// NewExecutionInstanceController as its store argument. All methods take a
// context as their first parameter and operate on
// types.AgentExecutionInstance records addressed by instance key.
type ExecutionInstanceStore interface {
	// GetOrCreateExecutionInstance takes an instance record and returns an
	// instance record or an error.
	// NOTE(review): presumably returns the existing record when one already
	// exists for the same key — confirm against the implementation.
	GetOrCreateExecutionInstance(ctx context.Context, instance *types.AgentExecutionInstance) (*types.AgentExecutionInstance, error)
	// GetExecutionInstanceByKey looks up an instance record by instanceKey.
	GetExecutionInstanceByKey(ctx context.Context, instanceKey string) (*types.AgentExecutionInstance, error)
	// UpdateExecutionInstanceState writes the attempt counters, desired
	// dispatch concurrency, status and last-sync time for instanceKey.
	// lastSyncAt is a pointer and may be nil.
	UpdateExecutionInstanceState(
		ctx context.Context,
		instanceKey string,
		runningAttempts int,
		pendingAttempts int,
		stoppingAttempts int,
		desiredDispatchConcurrency int,
		status types.AgentExecutionInstanceStatus,
		lastSyncAt *time.Time,
	) error
}

type IExecutionInstance

// IExecutionInstance is the interface returned by the instanceFactory passed
// to NewAttemptEventManager. *ExecutionInstance declares all four methods.
type IExecutionInstance interface {
	// ConsumeAttemptEvent receives an AttemptEvent; it returns nothing.
	ConsumeAttemptEvent(AttemptEvent)
	// ConsumeDispatchTarget receives an int dispatch target; it returns nothing.
	ConsumeDispatchTarget(int)
	// HandleDispatchEvent receives an int dispatch target and may return an error.
	HandleDispatchEvent(int) error
	// Sync takes no arguments and may return an error.
	Sync() error
}

type InputProvenance

// InputProvenance is the type of the optional AgentCommandParams.InputProvenance
// field. Every field is a *string tagged omitempty, so nil fields are dropped
// from the JSON encoding.
type InputProvenance struct {
	Source        *string `json:"source,omitempty"`
	MessageID     *string `json:"message_id,omitempty"`
	Channel       *string `json:"channel,omitempty"`
	ToolCallID    *string `json:"tool_call_id,omitempty"`
	CorrelationID *string `json:"correlation_id,omitempty"`
}

type InstanceDispatchLocker

// InstanceDispatchLocker is the locking interface passed to
// NewExecutionInstanceController as its locker argument.
type InstanceDispatchLocker interface {
	// WithInstanceLock takes a lock key and a callback and returns an error.
	// NOTE(review): presumably runs fn while holding the lock for lockKey and
	// propagates fn's error — confirm against the implementation.
	WithInstanceLock(ctx context.Context, lockKey string, fn func() error) error
}

type RoutingContext

// RoutingContext is the type of AgentCommandParams.Routing. Every field is a
// *string tagged omitempty, so a zero RoutingContext encodes as an empty JSON
// object.
type RoutingContext struct {
	To             *string `json:"to,omitempty"`
	ReplyTo        *string `json:"reply_to,omitempty"`
	Channel        *string `json:"channel,omitempty"`
	ReplyChannel   *string `json:"reply_channel,omitempty"`
	AccountID      *string `json:"account_id,omitempty"`
	ReplyAccountID *string `json:"reply_account_id,omitempty"`
	ThreadID       *string `json:"thread_id,omitempty"`
	GroupID        *string `json:"group_id,omitempty"`
	GroupChannel   *string `json:"group_channel,omitempty"`
	GroupSpace     *string `json:"group_space,omitempty"`
}

type RunExecutionPolicy

// RunExecutionPolicy is the execution policy value used by
// AgentCommandParams.Policy, ExecutionClassKey, ValidateRunExecutionPolicy,
// NormalizeRunExecutionPolicy and DefaultRunExecutionPolicy.
type RunExecutionPolicy struct {
	// Host, Security and Ask use the typed string enumerations declared in
	// this package (ExecHost*, ExecSecurity*, ExecAsk* constants).
	Host            ExecHost        `json:"host"`
	Security        ExecSecurity    `json:"security"`
	Ask             ExecAsk         `json:"ask"`
	// NOTE(review): presumably one of the RuntimeType* constants — confirm
	// against ValidateRunExecutionPolicy.
	RuntimeType     string          `json:"runtime_type"`
	// NOTE(review): presumably one of the WorkspaceAccess* constants — confirm
	// against ValidateRunExecutionPolicy.
	WorkspaceAccess string          `json:"workspace_access"`
	NetworkEnabled  bool            `json:"network_enabled"`
	Interactive     bool            `json:"interactive"`
	// Resources is a free-form map; its accepted keys are not visible here.
	Resources       map[string]any  `json:"resources,omitempty"`
	// Retry is optional; RetryPolicyOrDefault accepts a *RunRetryPolicy and
	// returns a RunRetryPolicy value.
	Retry           *RunRetryPolicy `json:"retry,omitempty"`
}

func DefaultRunExecutionPolicy

func DefaultRunExecutionPolicy() RunExecutionPolicy

func NormalizeRunExecutionPolicy

func NormalizeRunExecutionPolicy(p RunExecutionPolicy) RunExecutionPolicy

type RunRetryPolicy

// RunRetryPolicy is the type of the optional RunExecutionPolicy.Retry field
// and the input to NormalizeRunRetryPolicy and RetryPolicyOrDefault.
//
// Both fields are ints tagged omitempty, so a zero value is omitted when
// marshaling; after a JSON round trip an explicit 0 is indistinguishable from
// an absent field.
type RunRetryPolicy struct {
	MaxAttempts int `json:"max_attempts,omitempty"`
	// DelayMs: the Ms suffix suggests milliseconds (TODO confirm).
	DelayMs     int `json:"delay_ms,omitempty"`
}

func NormalizeRunRetryPolicy

func NormalizeRunRetryPolicy(r RunRetryPolicy) RunRetryPolicy

func RetryPolicyOrDefault

func RetryPolicyOrDefault(r *RunRetryPolicy) RunRetryPolicy

type TaskEventBatch added in v0.1.66

// TaskEventBatch is the result type of AgentAPI.StreamTaskEvents.
//
// Logs, RunEvents and PendingInputs carry no omitempty tag, so a nil slice is
// encoded as JSON null rather than being omitted or encoded as [].
type TaskEventBatch struct {
	TaskID             string                `json:"task_id"`
	// RunID, Task, Run and Interaction are pointers tagged omitempty and are
	// dropped from the encoding when nil.
	RunID              *string               `json:"run_id,omitempty"`
	Task               *types.AgentTask      `json:"task,omitempty"`
	Run                *types.AgentRun       `json:"run,omitempty"`
	Interaction        *types.RunInteraction `json:"interaction,omitempty"`
	Logs               []common.TaskLogEntry `json:"logs"`
	RunEvents          []map[string]any      `json:"run_events"`
	PendingInputs      []types.PendingInput  `json:"pending_inputs"`
	// NOTE(review): the Next*Cursor fields have the same types as the
	// logCursor (int64) and runEventCursor (int) parameters of
	// StreamTaskEvents — presumably fed back on the next call; confirm.
	NextLogCursor      int64                 `json:"next_log_cursor"`
	NextRunEventCursor int                   `json:"next_run_event_cursor"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL