orchestration

package
v0.1.147 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AgentRunnerClaudeCode = "claude_code"
	AgentRunnerAir        = "air"

	AgentProviderClaude = "claude"
	AgentProviderAir    = "air"
)
View Source
const (
	ExecHostSandbox ExecHost = "sandbox"

	ExecSecurityDeny      ExecSecurity = "deny"
	ExecSecurityAllowlist ExecSecurity = "allowlist"
	ExecSecurityFull      ExecSecurity = "full"

	ExecAskOff    ExecAsk = "off"
	ExecAskOnMiss ExecAsk = "on-miss"
	ExecAskAlways ExecAsk = "always"

	RuntimeTypeGvisor = "gvisor"
	RuntimeTypeRunc   = "runc"

	WorkspaceAccessNone = "none"
	WorkspaceAccessRO   = "ro"
	WorkspaceAccessRW   = "rw"
)
View Source
const (
	DefaultRetryMaxAttempts = 2
	DefaultRetryDelayMs     = 0
)

Variables

This section is empty.

Functions

func DefaultAgentConfig added in v0.1.66

func DefaultAgentConfig(agentKey string) map[string]any

DefaultAgentConfig returns the default config for a new agent with the given key. This is used by the API, SDK, and frontend to preview defaults before creation.

func ExecutionClassKey

func ExecutionClassKey(workspaceID uint, agentID, lane *string, p RunExecutionPolicy) string

func ExtractLeaseExecutionID added in v0.1.74

func ExtractLeaseExecutionID(owner string) string

ExtractLeaseExecutionID parses the execution ID from a lease owner string formatted as "workerID:executionID".

func NextCronTime added in v0.1.89

func NextCronTime(expr string, ref time.Time, tz string) (time.Time, error)

NextCronTime parses a standard 5-field cron expression and returns the next fire time after ref, interpreted in the given IANA timezone. An empty or invalid timezone falls back to UTC.

func ReconcileStaleSessionLease added in v0.1.74

func ReconcileStaleSessionLease(
	ctx context.Context,
	backend repository.BackendRepository,
	terminalIO repository.TerminalIORepository,
	workspaceID uint,
	sessionID, owner string,
) bool

ReconcileStaleSessionLease checks whether the current lease owner references a terminal/missing execution. If so it force-releases the lease and returns true. Owner format is "workerID:executionID". Exported so gateway/services can reuse the same logic.

func ValidateAgentCommandParams

func ValidateAgentCommandParams(v *AgentCommandParams) error

func ValidateRunExecutionPolicy

func ValidateRunExecutionPolicy(p RunExecutionPolicy) error

Types

type AgentAPI

type AgentAPI struct {
	// contains filtered or unexported fields
}

AgentAPI is the shared application layer for agent/task/run flows. Transport layers (gRPC/HTTP) should call this directly.

func NewAgentAPI

func NewAgentAPI(
	backend repository.BackendRepository,
	runtime *AgentService,
) *AgentAPI

func (*AgentAPI) AcceptAgentCommand

func (a *AgentAPI) AcceptAgentCommand(
	ctx context.Context,
	workspaceID uint,
	params AgentCommandParams,
) (*types.AgentTask, bool, error)

func (*AgentAPI) ArchiveTask added in v0.1.71

func (a *AgentAPI) ArchiveTask(ctx context.Context, workspaceID uint, taskID string) error

func (*AgentAPI) CancelRun

func (a *AgentAPI) CancelRun(ctx context.Context, workspaceID uint, runID string) error

func (*AgentAPI) CancelTask

func (a *AgentAPI) CancelTask(ctx context.Context, workspaceID uint, taskID string) error

func (*AgentAPI) CreateAgent

func (a *AgentAPI) CreateAgent(
	ctx context.Context,
	workspaceID uint,
	agentKey string,
	name string,
	config map[string]any,
	active *bool,
) (*types.AgentProfile, error)

func (*AgentAPI) CreateSchedule added in v0.1.89

func (a *AgentAPI) CreateSchedule(
	ctx context.Context,
	workspaceID uint,
	agentID string,
	cronExpr string,
	timezone string,
	prompt string,
	skillPaths []string,
	memberID *uint,
	tokenID *uint,
	encryptedToken []byte,
	sourceViewID *string,
) (*types.ScheduledTask, error)

func (*AgentAPI) DeleteAgent added in v0.1.73

func (a *AgentAPI) DeleteAgent(ctx context.Context, workspaceID uint, agentID string) error

func (*AgentAPI) DeleteChannelBinding added in v0.1.96

func (a *AgentAPI) DeleteChannelBinding(ctx context.Context, workspaceID uint, agentID *string, channelType string) error

func (*AgentAPI) DeleteSchedule added in v0.1.89

func (a *AgentAPI) DeleteSchedule(ctx context.Context, workspaceID uint, externalID string) error

func (*AgentAPI) EnqueueRunInput

func (a *AgentAPI) EnqueueRunInput(
	ctx context.Context,
	workspaceID uint,
	runID string,
	queueMode types.AgentQueueMode,
	message string,
	idempotencyKey string,
) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)

func (*AgentAPI) GetAgent

func (a *AgentAPI) GetAgent(ctx context.Context, workspaceID uint, agentID string) (*types.AgentProfile, error)

func (*AgentAPI) GetAgentStats added in v0.1.96

func (a *AgentAPI) GetAgentStats(ctx context.Context, workspaceID uint, agentID string) (*types.AgentStats, error)

func (*AgentAPI) GetDefaultConfig added in v0.1.66

func (a *AgentAPI) GetDefaultConfig(agentKey string) map[string]any

func (*AgentAPI) GetRun

func (a *AgentAPI) GetRun(ctx context.Context, workspaceID uint, runID string) (*types.AgentRun, error)

func (*AgentAPI) GetRunInteraction added in v0.1.73

func (a *AgentAPI) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)

func (*AgentAPI) GetSchedule added in v0.1.89

func (a *AgentAPI) GetSchedule(ctx context.Context, workspaceID uint, externalID string) (*types.ScheduledTask, error)

func (*AgentAPI) GetTask

func (a *AgentAPI) GetTask(ctx context.Context, workspaceID uint, taskID string) (*types.AgentTask, error)

func (*AgentAPI) GetTaskLogs

func (a *AgentAPI) GetTaskLogs(ctx context.Context, workspaceID uint, taskID string) ([]common.TaskLogEntry, error)

func (*AgentAPI) GetWorkspace added in v0.1.96

func (a *AgentAPI) GetWorkspace(ctx context.Context, workspaceID uint) (*types.Workspace, error)

func (*AgentAPI) ListAgents

func (a *AgentAPI) ListAgents(ctx context.Context, workspaceID uint) ([]*types.AgentProfile, error)

func (*AgentAPI) ListChannelBindings added in v0.1.96

func (a *AgentAPI) ListChannelBindings(ctx context.Context, workspaceID uint, agentID *string) ([]*types.ChannelBinding, error)

func (*AgentAPI) ListRunEvents

func (a *AgentAPI) ListRunEvents(ctx context.Context, workspaceID uint, runID string) ([]map[string]any, error)

func (*AgentAPI) ListRunSnapshots

func (a *AgentAPI) ListRunSnapshots(ctx context.Context, workspaceID uint, runID string, limit int) ([]*types.AgentRunSnapshot, error)

func (*AgentAPI) ListRuns

func (a *AgentAPI) ListRuns(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentRun, error)

func (*AgentAPI) ListRunsFiltered added in v0.1.66

func (a *AgentAPI) ListRunsFiltered(
	ctx context.Context,
	workspaceID uint,
	filter types.AgentRunListFilter,
) ([]*types.AgentRun, string, bool, error)

func (*AgentAPI) ListSchedules added in v0.1.89

func (a *AgentAPI) ListSchedules(ctx context.Context, workspaceID uint) ([]*types.ScheduledTask, error)

func (*AgentAPI) ListSchedulesByView added in v0.1.112

func (a *AgentAPI) ListSchedulesByView(ctx context.Context, workspaceID uint, sourceViewID string) ([]*types.ScheduledTask, error)

func (*AgentAPI) ListSubtasks added in v0.1.110

func (a *AgentAPI) ListSubtasks(ctx context.Context, parentTaskID string) ([]*types.AgentTask, error)

func (*AgentAPI) ListSubtasksByOutputIDs added in v0.1.110

func (a *AgentAPI) ListSubtasksByOutputIDs(ctx context.Context, outputIDs []string) ([]*types.AgentTask, error)

func (*AgentAPI) ListTaskLogs added in v0.1.66

func (a *AgentAPI) ListTaskLogs(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	seqNum int64,
) ([]common.TaskLogEntry, int64, error)

func (*AgentAPI) ListTasks

func (a *AgentAPI) ListTasks(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentTask, error)

func (*AgentAPI) ListTasksFiltered added in v0.1.66

func (a *AgentAPI) ListTasksFiltered(
	ctx context.Context,
	workspaceID uint,
	filter types.AgentTaskListFilter,
) ([]*types.AgentTask, string, bool, error)

func (*AgentAPI) RetryTask added in v0.1.130

func (a *AgentAPI) RetryTask(ctx context.Context, workspaceID uint, taskID string) error

func (*AgentAPI) StreamTaskEvents added in v0.1.66

func (a *AgentAPI) StreamTaskEvents(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	logCursor int64,
	runEventCursor int,
	cursorRunID string,
) (*TaskEventBatch, error)

func (*AgentAPI) SubmitTaskInput added in v0.1.98

func (a *AgentAPI) SubmitTaskInput(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	kind types.InputKind,
	action *types.TaskInputAction,
	message string,
	idempotencyKey string,
	items []types.ItemDecision,
) (*types.AgentTask, error)

func (*AgentAPI) SubscribeExecutionLogs added in v0.1.98

func (a *AgentAPI) SubscribeExecutionLogs(ctx context.Context, executionID string) (<-chan []byte, func(), error)

func (*AgentAPI) SubscribeRunEvents added in v0.1.98

func (a *AgentAPI) SubscribeRunEvents(ctx context.Context, runID string) (<-chan struct{}, func(), error)

func (*AgentAPI) SubscribeTaskLive added in v0.1.98

func (a *AgentAPI) SubscribeTaskLive(ctx context.Context, taskID string) (<-chan struct{}, func(), error)

func (*AgentAPI) SubscribeWorkspaceLive added in v0.1.98

func (a *AgentAPI) SubscribeWorkspaceLive(ctx context.Context, workspaceID uint) (<-chan struct{}, func(), error)

func (*AgentAPI) UpdateAgent added in v0.1.66

func (a *AgentAPI) UpdateAgent(
	ctx context.Context,
	workspaceID uint,
	agentID string,
	name *string,
	role *string,
	memoryScope *string,
	qualityScore *float64,
	costBudgetUSD *float64,
	config map[string]any,
	active *bool,
) (*types.AgentProfile, error)

func (*AgentAPI) UpdateSchedule added in v0.1.89

func (a *AgentAPI) UpdateSchedule(
	ctx context.Context,
	workspaceID uint,
	externalID string,
	cronExpr *string,
	timezone *string,
	prompt *string,
	skillPaths *[]string,
	active *bool,
) (*types.ScheduledTask, error)

func (*AgentAPI) UpdateTask added in v0.1.96

func (a *AgentAPI) UpdateTask(ctx context.Context, workspaceID uint, taskID string, params TaskUpdateParams) (*types.AgentTask, error)

func (*AgentAPI) UpsertChannelBinding added in v0.1.96

func (a *AgentAPI) UpsertChannelBinding(ctx context.Context, binding *types.ChannelBinding) error

func (*AgentAPI) WorkspaceLiveBatch added in v0.1.98

func (a *AgentAPI) WorkspaceLiveBatch(ctx context.Context, workspaceID uint) (*WorkspaceLiveBatch, error)

type AgentCommandParams

type AgentCommandParams struct {
	Message           string              `json:"message"`
	AgentID           *string             `json:"agent_id,omitempty"`
	HookID            *uint               `json:"hook_id,omitempty"`
	SessionID         string              `json:"session_id"`
	SessionKey        *string             `json:"session_key,omitempty"`
	Deliver           *bool               `json:"deliver,omitempty"`
	TimeoutMs         *int                `json:"timeout_ms,omitempty"`
	Policy            *RunExecutionPolicy `json:"policy,omitempty"`
	Lane              *string             `json:"lane,omitempty"`
	ExtraSystemPrompt *string             `json:"extra_system_prompt,omitempty"`
	InputProvenance   *InputProvenance    `json:"input_provenance,omitempty"`
	Routing           RoutingContext      `json:"routing"`
	Attachments       []map[string]any    `json:"attachments,omitempty"`
	IdempotencyKey    string              `json:"idempotency_key"`
	Label             *string             `json:"label,omitempty"`
	SpawnedBy         *string             `json:"spawned_by,omitempty"`
	Priority          string              `json:"priority,omitempty"`
	BudgetUSD         *float64            `json:"budget_usd,omitempty"`
	ParentTaskID      *string             `json:"parent_task_id,omitempty"`
	SourceViewID      *string             `json:"source_view_id,omitempty"`
	DispatchDelay     time.Duration       `json:"-"`
}

type AgentService

type AgentService struct {
	// contains filtered or unexported fields
}

func NewAgentService

func NewAgentService(
	ctx context.Context,
	backend repository.BackendRepository,
	taskQueue repository.TaskQueue,
	redis *common.RedisClient,
	s2 *common.S2Client,
	defaultImage string,
) *AgentService

func (*AgentService) AcceptAgentCommand

func (s *AgentService) AcceptAgentCommand(
	ctx context.Context,
	workspaceID uint,
	params AgentCommandParams,
) (*types.AgentTask, bool, error)

func (*AgentService) AcceptRunInput

func (s *AgentService) AcceptRunInput(
	ctx context.Context,
	workspaceID uint,
	targetRunID string,
	queueMode types.AgentQueueMode,
	message string,
	idempotencyKey string,
) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)

AcceptRunInput is the legacy run-scoped entry point. It resolves the origin task from the run and delegates to AcceptTaskInput for durable delivery.

func (*AgentService) AcceptTaskInput added in v0.1.98

func (s *AgentService) AcceptTaskInput(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	kind types.InputKind,
	action *types.TaskInputAction,
	message string,
	idempotencyKey string,
	items []types.ItemDecision,
) (*types.AgentTask, error)

AcceptTaskInput appends a durable follow-up input to the task's inbox and attempts to deliver it to whichever run is currently active. If no run is live (terminal or interaction closed), the task is requeued for a new dispatch.

func (*AgentService) GetRunInteraction added in v0.1.73

func (s *AgentService) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)

func (*AgentService) SetSourceWatchRegistrar added in v0.1.110

func (s *AgentService) SetSourceWatchRegistrar(registrar SourceWatchRegistrar)

func (*AgentService) SetToolExecutor added in v0.1.131

func (s *AgentService) SetToolExecutor(executor ToolExecutor)

func (*AgentService) Start

func (s *AgentService) Start(ctx context.Context)

type AttemptEvent

type AttemptEvent struct {
	AttemptID string
	RunID     string
	Change    int
	Status    string
	Ts        int64
}

type AttemptEventManager

type AttemptEventManager struct {
	// contains filtered or unexported fields
}

func NewAttemptEventManager

func NewAttemptEventManager(
	defaultCtx context.Context,
	instanceFactory func(ctx context.Context, instanceKey string) (IExecutionInstance, error),
) *AttemptEventManager

func (*AttemptEventManager) RouteAttemptEvent

func (m *AttemptEventManager) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error

func (*AttemptEventManager) RouteDispatchTarget

func (m *AttemptEventManager) RouteDispatchTarget(ctx context.Context, instanceKey string, target int) error

type CronScheduler added in v0.1.89

type CronScheduler struct {
	// contains filtered or unexported fields
}

CronScheduler polls for due scheduled tasks and fires them as agent tasks.

Multi-replica safety: AdvanceScheduledTask is a compare-and-swap on next_run_at, so only one replica can win a given tick. If the submit fails after the CAS, we revert next_run_at to allow retry on the next poll cycle rather than silently skipping the occurrence.

func NewCronScheduler added in v0.1.89

func NewCronScheduler(
	backend repository.BackendRepository,
	agents *AgentAPI,
) *CronScheduler

func (*CronScheduler) Start added in v0.1.89

func (s *CronScheduler) Start(ctx context.Context)

type DispatchEnvelope added in v0.1.110

type DispatchEnvelope struct {
	TaskID       string
	Prompt       string
	Reason       string
	RetryDelayMs int
	RetryAttempt int
	Resume       ResumeDirective
	Wake         WakeDirective
}

func (DispatchEnvelope) ToMap added in v0.1.110

func (e DispatchEnvelope) ToMap() map[string]any

type ExecAsk

type ExecAsk string

type ExecHost

type ExecHost string

type ExecSecurity

type ExecSecurity string

type ExecutionInstance

type ExecutionInstance struct {
	Ctx        context.Context
	CancelFunc context.CancelFunc

	AttemptEventChan  chan AttemptEvent
	DispatchEventChan chan int

	InstanceLockKey        string
	FailedAttemptThreshold int
	// contains filtered or unexported fields
}

func (*ExecutionInstance) ConsumeAttemptEvent

func (e *ExecutionInstance) ConsumeAttemptEvent(ev AttemptEvent)

func (*ExecutionInstance) ConsumeDispatchTarget

func (e *ExecutionInstance) ConsumeDispatchTarget(target int)

func (*ExecutionInstance) DesiredDispatchTarget

func (e *ExecutionInstance) DesiredDispatchTarget() int

func (*ExecutionInstance) HandleDispatchEvent

func (e *ExecutionInstance) HandleDispatchEvent(target int) error

func (*ExecutionInstance) Sync

func (e *ExecutionInstance) Sync() error

type ExecutionInstanceConfig

type ExecutionInstanceConfig struct {
	InstanceKey            string
	WorkspaceID            uint
	AgentID                *string
	Lane                   *string
	ExecutionClassKey      string
	FailedAttemptThreshold int
	InstanceLockKey        string
}

type ExecutionInstanceController

type ExecutionInstanceController struct {
	// contains filtered or unexported fields
}

func NewExecutionInstanceController

func NewExecutionInstanceController(
	ctx context.Context,
	store ExecutionInstanceStore,
	locker InstanceDispatchLocker,
	instanceLockKeyFunc func(string) string,
) *ExecutionInstanceController

func (*ExecutionInstanceController) EnsureInstance

func (*ExecutionInstanceController) RouteAttemptEvent

func (c *ExecutionInstanceController) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error

func (*ExecutionInstanceController) RouteDispatchTarget

func (c *ExecutionInstanceController) RouteDispatchTarget(ctx context.Context, instanceKey string, target int) error

type ExecutionInstanceState

type ExecutionInstanceState struct {
	RunningAttempts  int
	PendingAttempts  int
	StoppingAttempts int
	FailedAttempts   []string
}

type ExecutionInstanceStore

type ExecutionInstanceStore interface {
	GetOrCreateExecutionInstance(ctx context.Context, instance *types.AgentExecutionInstance) (*types.AgentExecutionInstance, error)
	GetExecutionInstanceByKey(ctx context.Context, instanceKey string) (*types.AgentExecutionInstance, error)
	UpdateExecutionInstanceState(
		ctx context.Context,
		instanceKey string,
		runningAttempts int,
		pendingAttempts int,
		stoppingAttempts int,
		desiredDispatchConcurrency int,
		status types.AgentExecutionInstanceStatus,
		lastSyncAt *time.Time,
	) error
}

type IExecutionInstance

type IExecutionInstance interface {
	ConsumeAttemptEvent(AttemptEvent)
	ConsumeDispatchTarget(int)
	HandleDispatchEvent(int) error
	Sync() error
}

type InputProvenance

type InputProvenance struct {
	Source        *string `json:"source,omitempty"`
	MessageID     *string `json:"message_id,omitempty"`
	Channel       *string `json:"channel,omitempty"`
	ToolCallID    *string `json:"tool_call_id,omitempty"`
	CorrelationID *string `json:"correlation_id,omitempty"`
}

type InstanceDispatchLocker

type InstanceDispatchLocker interface {
	WithInstanceLock(ctx context.Context, lockKey string, fn func() error) error
}

type OutcomeProjector added in v0.1.110

type OutcomeProjector struct {
	// contains filtered or unexported fields
}

func NewOutcomeProjector added in v0.1.110

func NewOutcomeProjector(backend repository.BackendRepository) *OutcomeProjector

func (*OutcomeProjector) Sync added in v0.1.110

func (p *OutcomeProjector) Sync(ctx context.Context, task *types.AgentTask, run *types.AgentRun) error

type ResumeBarrier added in v0.1.110

type ResumeBarrier struct {
	// contains filtered or unexported fields
}

func NewResumeBarrier added in v0.1.110

func NewResumeBarrier(backend repository.BackendRepository, terminalIO repository.TerminalIORepository) *ResumeBarrier

func (*ResumeBarrier) WaitForResume added in v0.1.110

func (b *ResumeBarrier) WaitForResume(
	ctx context.Context,
	workspaceID uint,
	sessionID string,
	expectedCheckpointRunID string,
	excludeRunIDs ...string,
) error

type ResumeDirective added in v0.1.110

type ResumeDirective struct {
	Enabled         bool
	ExcludeRunID    string
	CheckpointRunID string
}

ResumeDirective captures the session-resume metadata that used to be spread across several raw payload keys.

type RoutingContext

type RoutingContext struct {
	To             *string `json:"to,omitempty"`
	ReplyTo        *string `json:"reply_to,omitempty"`
	Channel        *string `json:"channel,omitempty"`
	ReplyChannel   *string `json:"reply_channel,omitempty"`
	AccountID      *string `json:"account_id,omitempty"`
	ReplyAccountID *string `json:"reply_account_id,omitempty"`
	ThreadID       *string `json:"thread_id,omitempty"`
	GroupID        *string `json:"group_id,omitempty"`
	GroupChannel   *string `json:"group_channel,omitempty"`
	GroupSpace     *string `json:"group_space,omitempty"`
}

type RunExecutionPolicy

type RunExecutionPolicy struct {
	Host            ExecHost        `json:"host"`
	Security        ExecSecurity    `json:"security"`
	Ask             ExecAsk         `json:"ask"`
	RuntimeType     string          `json:"runtime_type"`
	WorkspaceAccess string          `json:"workspace_access"`
	NetworkEnabled  bool            `json:"network_enabled"`
	Interactive     bool            `json:"interactive"`
	Resources       map[string]any  `json:"resources,omitempty"`
	Retry           *RunRetryPolicy `json:"retry,omitempty"`
}

func DefaultRunExecutionPolicy

func DefaultRunExecutionPolicy() RunExecutionPolicy

func NormalizeRunExecutionPolicy

func NormalizeRunExecutionPolicy(p RunExecutionPolicy) RunExecutionPolicy

type RunFactory added in v0.1.110

type RunFactory struct {
	// contains filtered or unexported fields
}

func NewRunFactory added in v0.1.110

func NewRunFactory(cfg RunFactoryConfig) *RunFactory

func (*RunFactory) CreateAttemptExecutionTask added in v0.1.110

func (f *RunFactory) CreateAttemptExecutionTask(
	ctx context.Context,
	run *types.AgentRun,
	runPolicy RunExecutionPolicy,
	prompt string,
	agentConfig map[string]any,
	payload map[string]any,
) (*types.AgentRunAttempt, error)

func (*RunFactory) MaterializeRun added in v0.1.110

func (f *RunFactory) MaterializeRun(
	ctx context.Context,
	task *types.AgentTask,
) (*types.AgentRun, RunExecutionPolicy, string, error)

func (*RunFactory) ResolveRunAgentConfig added in v0.1.110

func (f *RunFactory) ResolveRunAgentConfig(
	ctx context.Context,
	run *types.AgentRun,
	payload map[string]any,
) map[string]any

type RunFactoryConfig added in v0.1.110

type RunFactoryConfig struct {
	Backend           repository.BackendRepository
	TaskQueue         repository.TaskQueue
	TerminalIO        repository.TerminalIORepository
	S2                *common.S2Client
	Lifecycle         *TaskLifecycle
	ResumeBarrier     *ResumeBarrier
	DefaultImage      string
	PublishTaskUpdate func(context.Context, uint, string)
}

type RunResultEnvelope added in v0.1.110

type RunResultEnvelope struct {
	TaskID              string
	AttemptID           string
	ExitCode            int
	ErrorText           string
	ResultKey           string
	RetryAttempt        int
	PostRun             *types.RunExecutionPostRun
	WaitingForInput     bool
	Wake                WakeDirective
	SubtaskRequests     []*types.SubtaskRequest
	SourceWatchRequests []*types.SourceWatchRequest
}

func (RunResultEnvelope) ToMap added in v0.1.110

func (e RunResultEnvelope) ToMap() map[string]any

type RunRetryPolicy

type RunRetryPolicy struct {
	MaxAttempts int `json:"max_attempts,omitempty"`
	DelayMs     int `json:"delay_ms,omitempty"`
}

func NormalizeRunRetryPolicy

func NormalizeRunRetryPolicy(r RunRetryPolicy) RunRetryPolicy

func RetryPolicyOrDefault

func RetryPolicyOrDefault(r *RunRetryPolicy) RunRetryPolicy

type RuntimeLoops added in v0.1.110

type RuntimeLoops struct {
	// contains filtered or unexported fields
}

func NewRuntimeLoops added in v0.1.110

func NewRuntimeLoops(
	backend repository.BackendRepository,
	store *repository.OrchestrationStore,
	terminalIO repository.TerminalIORepository,
	lifecycle *TaskLifecycle,
	instanceController *ExecutionInstanceController,
	runFactory *RunFactory,
	taskFlows *TaskFlows,
	dispatchConsumerID string,
	resultConsumerID string,
	publishTaskUpdate func(context.Context, uint, string),
) *RuntimeLoops

func (*RuntimeLoops) RunDispatchLoop added in v0.1.110

func (r *RuntimeLoops) RunDispatchLoop(ctx context.Context)

func (*RuntimeLoops) RunOutboxLoop added in v0.1.110

func (r *RuntimeLoops) RunOutboxLoop(ctx context.Context)

func (*RuntimeLoops) RunResultLoop added in v0.1.110

func (r *RuntimeLoops) RunResultLoop(ctx context.Context)

func (*RuntimeLoops) SetSourceWatchRegistrar added in v0.1.110

func (r *RuntimeLoops) SetSourceWatchRegistrar(registrar SourceWatchRegistrar)

type SettleOpts added in v0.1.102

type SettleOpts struct {
	WaitingForInput bool
	WakeSignal      *types.RunExecutionWakeSignal
	Blocker         *types.TaskBlockerSpec
}

SettleOpts carries optional signals that only the result projector provides.

type SourceWatchRegistrar added in v0.1.110

type SourceWatchRegistrar interface {
	RegisterTaskSourceWatches(ctx context.Context, task *types.AgentTask, wakeSignal *types.RunExecutionWakeSignal, requests []*types.SourceWatchRequest) (*types.TaskBlockerSpec, error)
	CleanupTaskSourceWatches(ctx context.Context, task *types.AgentTask) error
	HasTaskSourceWatches(ctx context.Context, task *types.AgentTask) bool
}

type TaskCommandPayload added in v0.1.110

type TaskCommandPayload struct {
	Message           string
	Prompt            string
	OriginalMessage   string
	SessionID         string
	SessionKey        *string
	AgentID           *string
	HookID            *uint
	TimeoutMs         int
	Policy            RunExecutionPolicy
	Lane              *string
	ExtraSystemPrompt *string
	InputProvenance   *InputProvenance
	Deliver           *bool
	Attachments       []map[string]any
	InstanceKey       string
	Label             *string
	SpawnedBy         *string
	Priority          string
	Provider          *string
	Model             *string
	AgentConfig       map[string]any
	SourceViewID      *string
	Resume            ResumeDirective
}

func (TaskCommandPayload) PromptText added in v0.1.110

func (p TaskCommandPayload) PromptText() string

func (TaskCommandPayload) ToMap added in v0.1.110

func (p TaskCommandPayload) ToMap() map[string]any

type TaskEventBatch added in v0.1.66

type TaskEventBatch struct {
	TaskID             string                 `json:"task_id"`
	RunID              *string                `json:"run_id,omitempty"`
	Task               *types.AgentTask       `json:"task,omitempty"`
	Run                *types.AgentRun        `json:"run,omitempty"`
	Interaction        *types.RunInteraction  `json:"interaction,omitempty"`
	Blocker            *types.ResolvedBlocker `json:"blocker"`
	Logs               []common.TaskLogEntry  `json:"logs"`
	RunEvents          []map[string]any       `json:"run_events"`
	Outputs            []*types.TaskOutput    `json:"outputs,omitempty"`
	NextLogCursor      int64                  `json:"next_log_cursor"`
	NextRunEventCursor int                    `json:"next_run_event_cursor"`
}

type TaskFlows added in v0.1.110

type TaskFlows struct {
	// contains filtered or unexported fields
}

func NewTaskFlows added in v0.1.110

func NewTaskFlows(
	backend repository.BackendRepository,
	terminalIO repository.TerminalIORepository,
	s2 *common.S2Client,
	lifecycle *TaskLifecycle,
	publishTaskUpdate func(context.Context, uint, string),
	resolveRunState func(context.Context, *types.AgentRun) (*types.RunInteraction, error),
) *TaskFlows

func (*TaskFlows) AcceptAgentCommand added in v0.1.110

func (f *TaskFlows) AcceptAgentCommand(
	ctx context.Context,
	workspaceID uint,
	params AgentCommandParams,
) (*types.AgentTask, bool, error)

func (*TaskFlows) AcceptRunInput added in v0.1.110

func (f *TaskFlows) AcceptRunInput(
	ctx context.Context,
	workspaceID uint,
	targetRunID string,
	queueMode types.AgentQueueMode,
	message string,
	idempotencyKey string,
) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)

func (*TaskFlows) AcceptTaskInput added in v0.1.110

func (f *TaskFlows) AcceptTaskInput(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	kind types.InputKind,
	action *types.TaskInputAction,
	message string,
	idempotencyKey string,
	items []types.ItemDecision,
) (*types.AgentTask, error)

func (*TaskFlows) RunPendingSweep added in v0.1.110

func (f *TaskFlows) RunPendingSweep(ctx context.Context)

func (*TaskFlows) SetToolExecutor added in v0.1.131

func (f *TaskFlows) SetToolExecutor(executor ToolExecutor)

SetToolExecutor enables server-side execution of deferred tool calls on approval.

type TaskLifecycle added in v0.1.102

type TaskLifecycle struct {
	// contains filtered or unexported fields
}

TaskLifecycle is the single authority for task state transitions. Every state change on agent_task must go through this struct.

func NewTaskLifecycle added in v0.1.102

func (*TaskLifecycle) Cancel added in v0.1.102

func (lc *TaskLifecycle) Cancel(ctx context.Context, taskID string) error

Cancel transitions a task to cancelled. Allowed from any non-terminal state.

func (*TaskLifecycle) Dispatch added in v0.1.102

func (lc *TaskLifecycle) Dispatch(ctx context.Context, taskID, runID string) error

Dispatch transitions queued -> running when a run is materialized.

func (*TaskLifecycle) Done added in v0.1.110

func (lc *TaskLifecycle) Done(ctx context.Context, taskID string, targetRunID *string) error

func (*TaskLifecycle) Drop added in v0.1.102

func (lc *TaskLifecycle) Drop(ctx context.Context, taskID string, reason string) error

Drop transitions a task to dropped with a reason.

func (*TaskLifecycle) Queue added in v0.1.110

func (lc *TaskLifecycle) Queue(ctx context.Context, taskID string, targetRunID *string) error

func (*TaskLifecycle) Resume added in v0.1.110

func (lc *TaskLifecycle) Resume(ctx context.Context, taskID, runID string) (bool, error)

func (*TaskLifecycle) Retry added in v0.1.130

func (lc *TaskLifecycle) Retry(ctx context.Context, taskID string) error

Retry transitions a stopped task (error, dropped, or cancelled) back to queued and enqueues a fresh dispatch so the orchestration loop picks it up.

func (*TaskLifecycle) SetOutcomeProjector added in v0.1.110

func (lc *TaskLifecycle) SetOutcomeProjector(projector *OutcomeProjector)

func (*TaskLifecycle) Settle added in v0.1.102

func (lc *TaskLifecycle) Settle(ctx context.Context, runID string, opts *SettleOpts) error

Settle derives the correct task state from a completed run and applies it. This is THE single path for run-completion settlement. It is idempotent.

func (*TaskLifecycle) TransitionLive added in v0.1.102

func (lc *TaskLifecycle) TransitionLive(ctx context.Context, update types.TaskLiveUpdate) (bool, error)

TransitionLive applies a non-terminal state change during execution (running <-> waiting). Only the active worker should call this.

type TaskUpdateParams added in v0.1.96

type TaskUpdateParams struct {
	Priority    *string        `json:"priority,omitempty"`
	BudgetUSD   *float64       `json:"budget_usd,omitempty"`
	PayloadJSON map[string]any `json:"payload_json,omitempty"`
	RoutingJSON map[string]any `json:"routing_json,omitempty"`
}

type ToolExecutor added in v0.1.131

type ToolExecutor interface {
	ExecuteDeferred(ctx context.Context, workspaceID, memberID uint, toolName string, args []string) (stdout, stderr string, exitCode int, err error)
	RecordToolRejection(ctx context.Context, taskID, tool, command string) error
	GrantWritePreapproval(ctx context.Context, taskID string) error
}

ToolExecutor runs a deferred tool call server-side (after user approval) and tracks rejections so the write gate can auto-fail retries.

type WakeDirective added in v0.1.110

type WakeDirective struct {
	DelayMinutes   int
	Reason         string
	FollowUpPrompt string
	Agenda         []*types.TaskWakeAgendaItem
}

type WorkspaceLiveBatch added in v0.1.98

type WorkspaceLiveBatch struct {
	Tasks   []*types.AgentTask  `json:"tasks"`
	Outputs []*types.TaskOutput `json:"outputs"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL