Documentation
¶
Index ¶
- Constants
- func DefaultAgentConfig(agentKey string) map[string]any
- func ExecutionClassKey(workspaceID uint, agentID, lane *string, p RunExecutionPolicy) string
- func ExtractLeaseExecutionID(owner string) string
- func NextCronTime(expr string, ref time.Time, tz string) (time.Time, error)
- func ReconcileStaleSessionLease(ctx context.Context, backend repository.BackendRepository, ...) bool
- func ToRunExecutionResources(p RunExecutionPolicy) *types.RunExecutionResources
- func ToRunExecutionType(p RunExecutionPolicy) types.RunExecutionType
- func ValidateAgentCommandParams(v *AgentCommandParams) error
- func ValidateRunExecutionPolicy(p RunExecutionPolicy) error
- type AgentAPI
- func (a *AgentAPI) AcceptAgentCommand(ctx context.Context, workspaceID uint, params AgentCommandParams) (*types.AgentTask, bool, error)
- func (a *AgentAPI) ArchiveTask(ctx context.Context, workspaceID uint, taskID string) error
- func (a *AgentAPI) CancelRun(ctx context.Context, workspaceID uint, runID string) error
- func (a *AgentAPI) CancelTask(ctx context.Context, workspaceID uint, taskID string) error
- func (a *AgentAPI) CreateAgent(ctx context.Context, workspaceID uint, agentKey string, name string, ...) (*types.AgentProfile, error)
- func (a *AgentAPI) CreateSchedule(ctx context.Context, workspaceID uint, agentID string, cronExpr string, ...) (*types.ScheduledTask, error)
- func (a *AgentAPI) DeleteAgent(ctx context.Context, workspaceID uint, agentID string) error
- func (a *AgentAPI) DeleteChannelBinding(ctx context.Context, workspaceID uint, agentID *string, channelType string) error
- func (a *AgentAPI) DeleteSchedule(ctx context.Context, workspaceID uint, externalID string) error
- func (a *AgentAPI) EnqueueRunInput(ctx context.Context, workspaceID uint, runID string, ...) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)
- func (a *AgentAPI) GetAgent(ctx context.Context, workspaceID uint, agentID string) (*types.AgentProfile, error)
- func (a *AgentAPI) GetAgentStats(ctx context.Context, workspaceID uint, agentID string) (*types.AgentStats, error)
- func (a *AgentAPI) GetDefaultConfig(agentKey string) map[string]any
- func (a *AgentAPI) GetRun(ctx context.Context, workspaceID uint, runID string) (*types.AgentRun, error)
- func (a *AgentAPI) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)
- func (a *AgentAPI) GetSchedule(ctx context.Context, workspaceID uint, externalID string) (*types.ScheduledTask, error)
- func (a *AgentAPI) GetTask(ctx context.Context, workspaceID uint, taskID string) (*types.AgentTask, error)
- func (a *AgentAPI) GetTaskLogs(ctx context.Context, workspaceID uint, taskID string) ([]common.TaskLogEntry, error)
- func (a *AgentAPI) GetWorkspace(ctx context.Context, workspaceID uint) (*types.Workspace, error)
- func (a *AgentAPI) ListAgents(ctx context.Context, workspaceID uint) ([]*types.AgentProfile, error)
- func (a *AgentAPI) ListChannelBindings(ctx context.Context, workspaceID uint, agentID *string) ([]*types.ChannelBinding, error)
- func (a *AgentAPI) ListRunEvents(ctx context.Context, workspaceID uint, runID string) ([]map[string]any, error)
- func (a *AgentAPI) ListRunSnapshots(ctx context.Context, workspaceID uint, runID string, limit int) ([]*types.AgentRunSnapshot, error)
- func (a *AgentAPI) ListRuns(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentRun, error)
- func (a *AgentAPI) ListRunsFiltered(ctx context.Context, workspaceID uint, filter types.AgentRunListFilter) ([]*types.AgentRun, string, bool, error)
- func (a *AgentAPI) ListSchedules(ctx context.Context, workspaceID uint) ([]*types.ScheduledTask, error)
- func (a *AgentAPI) ListSchedulesByView(ctx context.Context, workspaceID uint, sourceViewID string) ([]*types.ScheduledTask, error)
- func (a *AgentAPI) ListSubtasks(ctx context.Context, parentTaskID string) ([]*types.AgentTask, error)
- func (a *AgentAPI) ListSubtasksByOutputIDs(ctx context.Context, outputIDs []string) ([]*types.AgentTask, error)
- func (a *AgentAPI) ListTaskLogs(ctx context.Context, workspaceID uint, taskID string, seqNum int64) ([]common.TaskLogEntry, int64, error)
- func (a *AgentAPI) ListTasks(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentTask, error)
- func (a *AgentAPI) ListTasksFiltered(ctx context.Context, workspaceID uint, filter types.AgentTaskListFilter) ([]*types.AgentTask, string, bool, error)
- func (a *AgentAPI) RetryTask(ctx context.Context, workspaceID uint, taskID string) error
- func (a *AgentAPI) StreamTaskEvents(ctx context.Context, workspaceID uint, taskID string, logCursor int64, ...) (*TaskEventBatch, error)
- func (a *AgentAPI) SubmitTaskInput(ctx context.Context, workspaceID uint, taskID string, kind types.InputKind, ...) (*types.AgentTask, error)
- func (a *AgentAPI) SubscribeExecutionLogs(ctx context.Context, executionID string) (<-chan []byte, func(), error)
- func (a *AgentAPI) SubscribeRunEvents(ctx context.Context, runID string) (<-chan struct{}, func(), error)
- func (a *AgentAPI) SubscribeTaskLive(ctx context.Context, taskID string) (<-chan struct{}, func(), error)
- func (a *AgentAPI) SubscribeWorkspaceLive(ctx context.Context, workspaceID uint) (<-chan struct{}, func(), error)
- func (a *AgentAPI) UpdateAgent(ctx context.Context, workspaceID uint, agentID string, name *string, ...) (*types.AgentProfile, error)
- func (a *AgentAPI) UpdateSchedule(ctx context.Context, workspaceID uint, externalID string, cronExpr *string, ...) (*types.ScheduledTask, error)
- func (a *AgentAPI) UpdateTask(ctx context.Context, workspaceID uint, taskID string, params TaskUpdateParams) (*types.AgentTask, error)
- func (a *AgentAPI) UpsertChannelBinding(ctx context.Context, binding *types.ChannelBinding) error
- func (a *AgentAPI) WorkspaceLiveBatch(ctx context.Context, workspaceID uint) (*WorkspaceLiveBatch, error)
- type AgentCommandParams
- type AgentService
- func (s *AgentService) AcceptAgentCommand(ctx context.Context, workspaceID uint, params AgentCommandParams) (*types.AgentTask, bool, error)
- func (s *AgentService) AcceptRunInput(ctx context.Context, workspaceID uint, targetRunID string, ...) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)
- func (s *AgentService) AcceptTaskInput(ctx context.Context, workspaceID uint, taskID string, kind types.InputKind, ...) (*types.AgentTask, error)
- func (s *AgentService) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)
- func (s *AgentService) SetSourceWatchRegistrar(registrar SourceWatchRegistrar)
- func (s *AgentService) SetToolExecutor(executor ToolExecutor)
- func (s *AgentService) Start(ctx context.Context)
- type AttemptEvent
- type AttemptEventManager
- type CronScheduler
- type DispatchEnvelope
- type ExecAsk
- type ExecHost
- type ExecSecurity
- type ExecutionInstance
- type ExecutionInstanceConfig
- type ExecutionInstanceController
- func (c *ExecutionInstanceController) EnsureInstance(ctx context.Context, cfg ExecutionInstanceConfig) (IExecutionInstance, error)
- func (c *ExecutionInstanceController) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error
- func (c *ExecutionInstanceController) RouteDispatchTarget(ctx context.Context, instanceKey string, target int) error
- type ExecutionInstanceState
- type ExecutionInstanceStore
- type IExecutionInstance
- type InputProvenance
- type InstanceDispatchLocker
- type OutcomeProjector
- type ResumeBarrier
- type ResumeDirective
- type RoutingContext
- type RunExecutionPolicy
- type RunFactory
- func (f *RunFactory) CreateAttemptExecutionTask(ctx context.Context, run *types.AgentRun, runPolicy RunExecutionPolicy, ...) (*types.AgentRunAttempt, error)
- func (f *RunFactory) MaterializeRun(ctx context.Context, task *types.AgentTask) (*types.AgentRun, RunExecutionPolicy, string, error)
- func (f *RunFactory) ResolveRunAgentConfig(ctx context.Context, run *types.AgentRun, payload map[string]any) map[string]any
- type RunFactoryConfig
- type RunResultEnvelope
- type RunRetryPolicy
- type RuntimeLoops
- type SettleOpts
- type SourceWatchRegistrar
- type TaskCommandPayload
- type TaskEventBatch
- type TaskFlows
- func (f *TaskFlows) AcceptAgentCommand(ctx context.Context, workspaceID uint, params AgentCommandParams) (*types.AgentTask, bool, error)
- func (f *TaskFlows) AcceptRunInput(ctx context.Context, workspaceID uint, targetRunID string, ...) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)
- func (f *TaskFlows) AcceptTaskInput(ctx context.Context, workspaceID uint, taskID string, kind types.InputKind, ...) (*types.AgentTask, error)
- func (f *TaskFlows) RunPendingSweep(ctx context.Context)
- func (f *TaskFlows) SetToolExecutor(executor ToolExecutor)
- type TaskLifecycle
- func (lc *TaskLifecycle) Cancel(ctx context.Context, taskID string) error
- func (lc *TaskLifecycle) Dispatch(ctx context.Context, taskID, runID string) error
- func (lc *TaskLifecycle) Done(ctx context.Context, taskID string, targetRunID *string) error
- func (lc *TaskLifecycle) Drop(ctx context.Context, taskID string, reason string) error
- func (lc *TaskLifecycle) Queue(ctx context.Context, taskID string, targetRunID *string) error
- func (lc *TaskLifecycle) Resume(ctx context.Context, taskID, runID string) (bool, error)
- func (lc *TaskLifecycle) Retry(ctx context.Context, taskID string) error
- func (lc *TaskLifecycle) SetOutcomeProjector(projector *OutcomeProjector)
- func (lc *TaskLifecycle) Settle(ctx context.Context, runID string, opts *SettleOpts) error
- func (lc *TaskLifecycle) TransitionLive(ctx context.Context, update types.TaskLiveUpdate) (bool, error)
- type TaskUpdateParams
- type ToolExecutor
- type WakeDirective
- type WorkspaceLiveBatch
Constants ¶
const (
AgentRunnerClaudeCode = "claude_code"
AgentRunnerAir = "air"
AgentProviderClaude = "claude"
AgentProviderAir = "air"
)
const (
ExecHostSandbox ExecHost = "sandbox"
ExecSecurityDeny ExecSecurity = "deny"
ExecSecurityAllowlist ExecSecurity = "allowlist"
ExecSecurityFull ExecSecurity = "full"
ExecAskOff ExecAsk = "off"
ExecAskOnMiss ExecAsk = "on-miss"
ExecAskAlways ExecAsk = "always"
RuntimeTypeGvisor = "gvisor"
RuntimeTypeRunc = "runc"
WorkspaceAccessNone = "none"
WorkspaceAccessRO = "ro"
WorkspaceAccessRW = "rw"
)
const (
DefaultRetryMaxAttempts = 2
DefaultRetryDelayMs = 0
)
Variables ¶
This section is empty.
Functions ¶
func DefaultAgentConfig ¶ added in v0.1.66
DefaultAgentConfig returns the default config for a new agent with the given key. This is used by the API, SDK, and frontend to preview defaults before creation.
func ExecutionClassKey ¶
func ExecutionClassKey(workspaceID uint, agentID, lane *string, p RunExecutionPolicy) string
func ExtractLeaseExecutionID ¶ added in v0.1.74
ExtractLeaseExecutionID parses the execution ID from a lease owner string formatted as "workerID:executionID".
func NextCronTime ¶ added in v0.1.89
NextCronTime parses a standard 5-field cron expression and returns the next fire time after ref, interpreted in the given IANA timezone. An empty or invalid timezone falls back to UTC.
func ReconcileStaleSessionLease ¶ added in v0.1.74
func ReconcileStaleSessionLease( ctx context.Context, backend repository.BackendRepository, terminalIO repository.TerminalIORepository, workspaceID uint, sessionID, owner string, ) bool
ReconcileStaleSessionLease checks whether the current lease owner references a terminal or missing execution. If so, it force-releases the lease and returns true. Owner format is "workerID:executionID". Exported so gateway/services can reuse the same logic.
func ToRunExecutionResources ¶
func ToRunExecutionResources(p RunExecutionPolicy) *types.RunExecutionResources
func ToRunExecutionType ¶
func ToRunExecutionType(p RunExecutionPolicy) types.RunExecutionType
func ValidateAgentCommandParams ¶
func ValidateAgentCommandParams(v *AgentCommandParams) error
func ValidateRunExecutionPolicy ¶
func ValidateRunExecutionPolicy(p RunExecutionPolicy) error
Types ¶
type AgentAPI ¶
type AgentAPI struct {
// contains filtered or unexported fields
}
AgentAPI is the shared application layer for agent/task/run flows. Transport layers (gRPC/HTTP) should call this directly.
func NewAgentAPI ¶
func NewAgentAPI( backend repository.BackendRepository, runtime *AgentService, ) *AgentAPI
func (*AgentAPI) AcceptAgentCommand ¶
func (*AgentAPI) ArchiveTask ¶ added in v0.1.71
func (*AgentAPI) CancelTask ¶
func (*AgentAPI) CreateAgent ¶
func (*AgentAPI) CreateSchedule ¶ added in v0.1.89
func (*AgentAPI) DeleteAgent ¶ added in v0.1.73
func (*AgentAPI) DeleteChannelBinding ¶ added in v0.1.96
func (*AgentAPI) DeleteSchedule ¶ added in v0.1.89
func (*AgentAPI) EnqueueRunInput ¶
func (*AgentAPI) GetAgentStats ¶ added in v0.1.96
func (*AgentAPI) GetDefaultConfig ¶ added in v0.1.66
func (*AgentAPI) GetRunInteraction ¶ added in v0.1.73
func (*AgentAPI) GetSchedule ¶ added in v0.1.89
func (*AgentAPI) GetTaskLogs ¶
func (*AgentAPI) GetWorkspace ¶ added in v0.1.96
func (*AgentAPI) ListAgents ¶
func (*AgentAPI) ListChannelBindings ¶ added in v0.1.96
func (*AgentAPI) ListRunEvents ¶
func (*AgentAPI) ListRunSnapshots ¶
func (*AgentAPI) ListRunsFiltered ¶ added in v0.1.66
func (*AgentAPI) ListSchedules ¶ added in v0.1.89
func (*AgentAPI) ListSchedulesByView ¶ added in v0.1.112
func (*AgentAPI) ListSubtasks ¶ added in v0.1.110
func (*AgentAPI) ListSubtasksByOutputIDs ¶ added in v0.1.110
func (*AgentAPI) ListTaskLogs ¶ added in v0.1.66
func (*AgentAPI) ListTasksFiltered ¶ added in v0.1.66
func (*AgentAPI) StreamTaskEvents ¶ added in v0.1.66
func (*AgentAPI) SubmitTaskInput ¶ added in v0.1.98
func (*AgentAPI) SubscribeExecutionLogs ¶ added in v0.1.98
func (*AgentAPI) SubscribeRunEvents ¶ added in v0.1.98
func (*AgentAPI) SubscribeTaskLive ¶ added in v0.1.98
func (*AgentAPI) SubscribeWorkspaceLive ¶ added in v0.1.98
func (*AgentAPI) UpdateAgent ¶ added in v0.1.66
func (*AgentAPI) UpdateSchedule ¶ added in v0.1.89
func (*AgentAPI) UpdateTask ¶ added in v0.1.96
func (*AgentAPI) UpsertChannelBinding ¶ added in v0.1.96
func (*AgentAPI) WorkspaceLiveBatch ¶ added in v0.1.98
type AgentCommandParams ¶
type AgentCommandParams struct {
Message string `json:"message"`
AgentID *string `json:"agent_id,omitempty"`
HookID *uint `json:"hook_id,omitempty"`
SessionID string `json:"session_id"`
SessionKey *string `json:"session_key,omitempty"`
Deliver *bool `json:"deliver,omitempty"`
TimeoutMs *int `json:"timeout_ms,omitempty"`
Policy *RunExecutionPolicy `json:"policy,omitempty"`
Lane *string `json:"lane,omitempty"`
ExtraSystemPrompt *string `json:"extra_system_prompt,omitempty"`
InputProvenance *InputProvenance `json:"input_provenance,omitempty"`
Routing RoutingContext `json:"routing"`
Attachments []map[string]any `json:"attachments,omitempty"`
IdempotencyKey string `json:"idempotency_key"`
Label *string `json:"label,omitempty"`
SpawnedBy *string `json:"spawned_by,omitempty"`
Priority string `json:"priority,omitempty"`
BudgetUSD *float64 `json:"budget_usd,omitempty"`
ParentTaskID *string `json:"parent_task_id,omitempty"`
SourceViewID *string `json:"source_view_id,omitempty"`
DispatchDelay time.Duration `json:"-"`
}
type AgentService ¶
type AgentService struct {
// contains filtered or unexported fields
}
func NewAgentService ¶
func NewAgentService( ctx context.Context, backend repository.BackendRepository, taskQueue repository.TaskQueue, redis *common.RedisClient, s2 *common.S2Client, defaultImage string, ) *AgentService
func (*AgentService) AcceptAgentCommand ¶
func (s *AgentService) AcceptAgentCommand( ctx context.Context, workspaceID uint, params AgentCommandParams, ) (*types.AgentTask, bool, error)
func (*AgentService) AcceptRunInput ¶
func (s *AgentService) AcceptRunInput( ctx context.Context, workspaceID uint, targetRunID string, queueMode types.AgentQueueMode, message string, idempotencyKey string, ) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)
AcceptRunInput is the legacy run-scoped entry point. It resolves the origin task from the run and delegates to AcceptTaskInput for durable delivery.
func (*AgentService) AcceptTaskInput ¶ added in v0.1.98
func (s *AgentService) AcceptTaskInput( ctx context.Context, workspaceID uint, taskID string, kind types.InputKind, action *types.TaskInputAction, message string, idempotencyKey string, items []types.ItemDecision, ) (*types.AgentTask, error)
AcceptTaskInput appends a durable follow-up input to the task's inbox and attempts to deliver it to whichever run is currently active. If no run is live (the run is terminal or its interaction is closed), the task is requeued for a new dispatch.
func (*AgentService) GetRunInteraction ¶ added in v0.1.73
func (s *AgentService) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)
func (*AgentService) SetSourceWatchRegistrar ¶ added in v0.1.110
func (s *AgentService) SetSourceWatchRegistrar(registrar SourceWatchRegistrar)
func (*AgentService) SetToolExecutor ¶ added in v0.1.131
func (s *AgentService) SetToolExecutor(executor ToolExecutor)
func (*AgentService) Start ¶
func (s *AgentService) Start(ctx context.Context)
type AttemptEvent ¶
type AttemptEventManager ¶
type AttemptEventManager struct {
// contains filtered or unexported fields
}
func NewAttemptEventManager ¶
func NewAttemptEventManager( defaultCtx context.Context, instanceFactory func(ctx context.Context, instanceKey string) (IExecutionInstance, error), ) *AttemptEventManager
func (*AttemptEventManager) RouteAttemptEvent ¶
func (m *AttemptEventManager) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error
func (*AttemptEventManager) RouteDispatchTarget ¶
type CronScheduler ¶ added in v0.1.89
type CronScheduler struct {
// contains filtered or unexported fields
}
CronScheduler polls for due scheduled tasks and fires them as agent tasks.
Multi-replica safety: AdvanceScheduledTask is a compare-and-swap on next_run_at, so only one replica can win a given tick. If the submit fails after the CAS, we revert next_run_at to allow retry on the next poll cycle rather than silently skipping the occurrence.
func NewCronScheduler ¶ added in v0.1.89
func NewCronScheduler( backend repository.BackendRepository, agents *AgentAPI, ) *CronScheduler
func (*CronScheduler) Start ¶ added in v0.1.89
func (s *CronScheduler) Start(ctx context.Context)
type DispatchEnvelope ¶ added in v0.1.110
type DispatchEnvelope struct {
TaskID string
Prompt string
Reason string
RetryDelayMs int
RetryAttempt int
Resume ResumeDirective
Wake WakeDirective
}
func (DispatchEnvelope) ToMap ¶ added in v0.1.110
func (e DispatchEnvelope) ToMap() map[string]any
type ExecSecurity ¶
type ExecSecurity string
type ExecutionInstance ¶
type ExecutionInstance struct {
Ctx context.Context
CancelFunc context.CancelFunc
AttemptEventChan chan AttemptEvent
DispatchEventChan chan int
InstanceLockKey string
FailedAttemptThreshold int
// contains filtered or unexported fields
}
func NewExecutionInstance ¶
func NewExecutionInstance( ctx context.Context, cfg ExecutionInstanceConfig, store ExecutionInstanceStore, locker InstanceDispatchLocker, ) (*ExecutionInstance, error)
func (*ExecutionInstance) ConsumeAttemptEvent ¶
func (e *ExecutionInstance) ConsumeAttemptEvent(ev AttemptEvent)
func (*ExecutionInstance) ConsumeDispatchTarget ¶
func (e *ExecutionInstance) ConsumeDispatchTarget(target int)
func (*ExecutionInstance) DesiredDispatchTarget ¶
func (e *ExecutionInstance) DesiredDispatchTarget() int
func (*ExecutionInstance) HandleDispatchEvent ¶
func (e *ExecutionInstance) HandleDispatchEvent(target int) error
func (*ExecutionInstance) Sync ¶
func (e *ExecutionInstance) Sync() error
type ExecutionInstanceConfig ¶
type ExecutionInstanceController ¶
type ExecutionInstanceController struct {
// contains filtered or unexported fields
}
func NewExecutionInstanceController ¶
func NewExecutionInstanceController( ctx context.Context, store ExecutionInstanceStore, locker InstanceDispatchLocker, instanceLockKeyFunc func(string) string, ) *ExecutionInstanceController
func (*ExecutionInstanceController) EnsureInstance ¶
func (c *ExecutionInstanceController) EnsureInstance(ctx context.Context, cfg ExecutionInstanceConfig) (IExecutionInstance, error)
func (*ExecutionInstanceController) RouteAttemptEvent ¶
func (c *ExecutionInstanceController) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error
func (*ExecutionInstanceController) RouteDispatchTarget ¶
type ExecutionInstanceState ¶
type ExecutionInstanceStore ¶
type ExecutionInstanceStore interface {
GetOrCreateExecutionInstance(ctx context.Context, instance *types.AgentExecutionInstance) (*types.AgentExecutionInstance, error)
GetExecutionInstanceByKey(ctx context.Context, instanceKey string) (*types.AgentExecutionInstance, error)
UpdateExecutionInstanceState(
ctx context.Context,
instanceKey string,
runningAttempts int,
pendingAttempts int,
stoppingAttempts int,
desiredDispatchConcurrency int,
status types.AgentExecutionInstanceStatus,
lastSyncAt *time.Time,
) error
}
type IExecutionInstance ¶
type IExecutionInstance interface {
ConsumeAttemptEvent(AttemptEvent)
ConsumeDispatchTarget(int)
HandleDispatchEvent(int) error
Sync() error
}
type InputProvenance ¶
type InstanceDispatchLocker ¶
type OutcomeProjector ¶ added in v0.1.110
type OutcomeProjector struct {
// contains filtered or unexported fields
}
func NewOutcomeProjector ¶ added in v0.1.110
func NewOutcomeProjector(backend repository.BackendRepository) *OutcomeProjector
type ResumeBarrier ¶ added in v0.1.110
type ResumeBarrier struct {
// contains filtered or unexported fields
}
func NewResumeBarrier ¶ added in v0.1.110
func NewResumeBarrier(backend repository.BackendRepository, terminalIO repository.TerminalIORepository) *ResumeBarrier
func (*ResumeBarrier) WaitForResume ¶ added in v0.1.110
type ResumeDirective ¶ added in v0.1.110
ResumeDirective captures the session-resume metadata that used to be spread across several raw payload keys.
type RoutingContext ¶
type RoutingContext struct {
To *string `json:"to,omitempty"`
ReplyTo *string `json:"reply_to,omitempty"`
Channel *string `json:"channel,omitempty"`
ReplyChannel *string `json:"reply_channel,omitempty"`
AccountID *string `json:"account_id,omitempty"`
ReplyAccountID *string `json:"reply_account_id,omitempty"`
ThreadID *string `json:"thread_id,omitempty"`
GroupID *string `json:"group_id,omitempty"`
GroupChannel *string `json:"group_channel,omitempty"`
GroupSpace *string `json:"group_space,omitempty"`
}
type RunExecutionPolicy ¶
type RunExecutionPolicy struct {
Host ExecHost `json:"host"`
Security ExecSecurity `json:"security"`
Ask ExecAsk `json:"ask"`
RuntimeType string `json:"runtime_type"`
WorkspaceAccess string `json:"workspace_access"`
NetworkEnabled bool `json:"network_enabled"`
Interactive bool `json:"interactive"`
Resources map[string]any `json:"resources,omitempty"`
Retry *RunRetryPolicy `json:"retry,omitempty"`
}
func DefaultRunExecutionPolicy ¶
func DefaultRunExecutionPolicy() RunExecutionPolicy
func NormalizeRunExecutionPolicy ¶
func NormalizeRunExecutionPolicy(p RunExecutionPolicy) RunExecutionPolicy
type RunFactory ¶ added in v0.1.110
type RunFactory struct {
// contains filtered or unexported fields
}
func NewRunFactory ¶ added in v0.1.110
func NewRunFactory(cfg RunFactoryConfig) *RunFactory
func (*RunFactory) CreateAttemptExecutionTask ¶ added in v0.1.110
func (f *RunFactory) CreateAttemptExecutionTask( ctx context.Context, run *types.AgentRun, runPolicy RunExecutionPolicy, prompt string, agentConfig map[string]any, payload map[string]any, ) (*types.AgentRunAttempt, error)
func (*RunFactory) MaterializeRun ¶ added in v0.1.110
func (f *RunFactory) MaterializeRun( ctx context.Context, task *types.AgentTask, ) (*types.AgentRun, RunExecutionPolicy, string, error)
type RunFactoryConfig ¶ added in v0.1.110
type RunFactoryConfig struct {
Backend repository.BackendRepository
TaskQueue repository.TaskQueue
TerminalIO repository.TerminalIORepository
S2 *common.S2Client
Lifecycle *TaskLifecycle
ResumeBarrier *ResumeBarrier
DefaultImage string
PublishTaskUpdate func(context.Context, uint, string)
}
type RunResultEnvelope ¶ added in v0.1.110
type RunResultEnvelope struct {
TaskID string
AttemptID string
ExitCode int
ErrorText string
ResultKey string
RetryAttempt int
PostRun *types.RunExecutionPostRun
WaitingForInput bool
Wake WakeDirective
SubtaskRequests []*types.SubtaskRequest
SourceWatchRequests []*types.SourceWatchRequest
}
func (RunResultEnvelope) ToMap ¶ added in v0.1.110
func (e RunResultEnvelope) ToMap() map[string]any
type RunRetryPolicy ¶
type RunRetryPolicy struct {
MaxAttempts int `json:"max_attempts,omitempty"`
DelayMs int `json:"delay_ms,omitempty"`
}
func NormalizeRunRetryPolicy ¶
func NormalizeRunRetryPolicy(r RunRetryPolicy) RunRetryPolicy
func RetryPolicyOrDefault ¶
func RetryPolicyOrDefault(r *RunRetryPolicy) RunRetryPolicy
type RuntimeLoops ¶ added in v0.1.110
type RuntimeLoops struct {
// contains filtered or unexported fields
}
func NewRuntimeLoops ¶ added in v0.1.110
func NewRuntimeLoops( backend repository.BackendRepository, store *repository.OrchestrationStore, terminalIO repository.TerminalIORepository, lifecycle *TaskLifecycle, instanceController *ExecutionInstanceController, runFactory *RunFactory, taskFlows *TaskFlows, dispatchConsumerID string, resultConsumerID string, publishTaskUpdate func(context.Context, uint, string), ) *RuntimeLoops
func (*RuntimeLoops) RunDispatchLoop ¶ added in v0.1.110
func (r *RuntimeLoops) RunDispatchLoop(ctx context.Context)
func (*RuntimeLoops) RunOutboxLoop ¶ added in v0.1.110
func (r *RuntimeLoops) RunOutboxLoop(ctx context.Context)
func (*RuntimeLoops) RunResultLoop ¶ added in v0.1.110
func (r *RuntimeLoops) RunResultLoop(ctx context.Context)
func (*RuntimeLoops) SetSourceWatchRegistrar ¶ added in v0.1.110
func (r *RuntimeLoops) SetSourceWatchRegistrar(registrar SourceWatchRegistrar)
type SettleOpts ¶ added in v0.1.102
type SettleOpts struct {
WaitingForInput bool
WakeSignal *types.RunExecutionWakeSignal
Blocker *types.TaskBlockerSpec
}
SettleOpts carries optional signals that only the result projector provides.
type SourceWatchRegistrar ¶ added in v0.1.110
type SourceWatchRegistrar interface {
RegisterTaskSourceWatches(ctx context.Context, task *types.AgentTask, wakeSignal *types.RunExecutionWakeSignal, requests []*types.SourceWatchRequest) (*types.TaskBlockerSpec, error)
CleanupTaskSourceWatches(ctx context.Context, task *types.AgentTask) error
HasTaskSourceWatches(ctx context.Context, task *types.AgentTask) bool
}
type TaskCommandPayload ¶ added in v0.1.110
type TaskCommandPayload struct {
Message string
Prompt string
OriginalMessage string
SessionID string
SessionKey *string
AgentID *string
HookID *uint
TimeoutMs int
Policy RunExecutionPolicy
Lane *string
ExtraSystemPrompt *string
InputProvenance *InputProvenance
Deliver *bool
Attachments []map[string]any
InstanceKey string
Label *string
SpawnedBy *string
Priority string
Provider *string
Model *string
AgentConfig map[string]any
SourceViewID *string
Resume ResumeDirective
}
func (TaskCommandPayload) PromptText ¶ added in v0.1.110
func (p TaskCommandPayload) PromptText() string
func (TaskCommandPayload) ToMap ¶ added in v0.1.110
func (p TaskCommandPayload) ToMap() map[string]any
type TaskEventBatch ¶ added in v0.1.66
type TaskEventBatch struct {
TaskID string `json:"task_id"`
RunID *string `json:"run_id,omitempty"`
Task *types.AgentTask `json:"task,omitempty"`
Run *types.AgentRun `json:"run,omitempty"`
Interaction *types.RunInteraction `json:"interaction,omitempty"`
Blocker *types.ResolvedBlocker `json:"blocker"`
Logs []common.TaskLogEntry `json:"logs"`
RunEvents []map[string]any `json:"run_events"`
Outputs []*types.TaskOutput `json:"outputs,omitempty"`
NextLogCursor int64 `json:"next_log_cursor"`
NextRunEventCursor int `json:"next_run_event_cursor"`
}
type TaskFlows ¶ added in v0.1.110
type TaskFlows struct {
// contains filtered or unexported fields
}
func NewTaskFlows ¶ added in v0.1.110
func NewTaskFlows( backend repository.BackendRepository, terminalIO repository.TerminalIORepository, s2 *common.S2Client, lifecycle *TaskLifecycle, publishTaskUpdate func(context.Context, uint, string), resolveRunState func(context.Context, *types.AgentRun) (*types.RunInteraction, error), ) *TaskFlows
func (*TaskFlows) AcceptAgentCommand ¶ added in v0.1.110
func (*TaskFlows) AcceptRunInput ¶ added in v0.1.110
func (*TaskFlows) AcceptTaskInput ¶ added in v0.1.110
func (*TaskFlows) RunPendingSweep ¶ added in v0.1.110
func (*TaskFlows) SetToolExecutor ¶ added in v0.1.131
func (f *TaskFlows) SetToolExecutor(executor ToolExecutor)
SetToolExecutor enables server-side execution of deferred tool calls on approval.
type TaskLifecycle ¶ added in v0.1.102
type TaskLifecycle struct {
// contains filtered or unexported fields
}
TaskLifecycle is the single authority for task state transitions. Every state change on agent_task must go through this struct.
func NewTaskLifecycle ¶ added in v0.1.102
func NewTaskLifecycle( backend repository.BackendRepository, store *repository.OrchestrationStore, terminalIO repository.TerminalIORepository, ) *TaskLifecycle
func (*TaskLifecycle) Cancel ¶ added in v0.1.102
func (lc *TaskLifecycle) Cancel(ctx context.Context, taskID string) error
Cancel transitions a task to cancelled. Allowed from any non-terminal state.
func (*TaskLifecycle) Dispatch ¶ added in v0.1.102
func (lc *TaskLifecycle) Dispatch(ctx context.Context, taskID, runID string) error
Dispatch transitions queued -> running when a run is materialized.
func (*TaskLifecycle) Retry ¶ added in v0.1.130
func (lc *TaskLifecycle) Retry(ctx context.Context, taskID string) error
Retry transitions a stopped task (error, dropped, or cancelled) back to queued and enqueues a fresh dispatch so the orchestration loop picks it up.
func (*TaskLifecycle) SetOutcomeProjector ¶ added in v0.1.110
func (lc *TaskLifecycle) SetOutcomeProjector(projector *OutcomeProjector)
func (*TaskLifecycle) Settle ¶ added in v0.1.102
func (lc *TaskLifecycle) Settle(ctx context.Context, runID string, opts *SettleOpts) error
Settle derives the correct task state from a completed run and applies it. This is THE single path for run-completion settlement. It is idempotent.
func (*TaskLifecycle) TransitionLive ¶ added in v0.1.102
func (lc *TaskLifecycle) TransitionLive(ctx context.Context, update types.TaskLiveUpdate) (bool, error)
TransitionLive applies a non-terminal state change during execution (running <-> waiting). Only the active worker should call this.
type TaskUpdateParams ¶ added in v0.1.96
type ToolExecutor ¶ added in v0.1.131
type ToolExecutor interface {
ExecuteDeferred(ctx context.Context, workspaceID, memberID uint, toolName string, args []string) (stdout, stderr string, exitCode int, err error)
RecordToolRejection(ctx context.Context, taskID, tool, command string) error
GrantWritePreapproval(ctx context.Context, taskID string) error
}
ToolExecutor runs a deferred tool call server-side (after user approval) and tracks rejections so the write gate can auto-fail retries.
type WakeDirective ¶ added in v0.1.110
type WakeDirective struct {
DelayMinutes int
Reason string
FollowUpPrompt string
Agenda []*types.TaskWakeAgendaItem
}
type WorkspaceLiveBatch ¶ added in v0.1.98
type WorkspaceLiveBatch struct {
Tasks []*types.AgentTask `json:"tasks"`
Outputs []*types.TaskOutput `json:"outputs"`
}