Documentation
¶
Index ¶
- Constants
- func DefaultAgentConfig(agentKey string) map[string]any
- func ExecutionClassKey(workspaceID uint, agentID, lane *string, p RunExecutionPolicy) string
- func ExtractLeaseExecutionID(owner string) string
- func NextCronTime(expr string, ref time.Time, tz string) (time.Time, error)
- func ReconcileStaleSessionLease(ctx context.Context, backend repository.BackendRepository, ...) bool
- func ToRunExecutionResources(p RunExecutionPolicy) *types.RunExecutionResources
- func ToRunExecutionType(p RunExecutionPolicy) types.RunExecutionType
- func ValidateAgentCommandParams(v *AgentCommandParams) error
- func ValidateRunExecutionPolicy(p RunExecutionPolicy) error
- type AgentAPI
- func (a *AgentAPI) AcceptAgentCommand(ctx context.Context, workspaceID uint, params AgentCommandParams) (*types.AgentTask, bool, error)
- func (a *AgentAPI) ArchiveTask(ctx context.Context, workspaceID uint, taskID string) error
- func (a *AgentAPI) CancelRun(ctx context.Context, workspaceID uint, runID string) error
- func (a *AgentAPI) CancelTask(ctx context.Context, workspaceID uint, taskID string) error
- func (a *AgentAPI) CreateAgent(ctx context.Context, workspaceID uint, agentKey string, name string, ...) (*types.AgentProfile, error)
- func (a *AgentAPI) CreateSchedule(ctx context.Context, workspaceID uint, agentID string, cronExpr string, ...) (*types.ScheduledTask, error)
- func (a *AgentAPI) DeleteAgent(ctx context.Context, workspaceID uint, agentID string) error
- func (a *AgentAPI) DeleteSchedule(ctx context.Context, workspaceID uint, externalID string) error
- func (a *AgentAPI) EnqueueRunInput(ctx context.Context, workspaceID uint, runID string, ...) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)
- func (a *AgentAPI) GetAgent(ctx context.Context, workspaceID uint, agentID string) (*types.AgentProfile, error)
- func (a *AgentAPI) GetDefaultConfig(agentKey string) map[string]any
- func (a *AgentAPI) GetRun(ctx context.Context, workspaceID uint, runID string) (*types.AgentRun, error)
- func (a *AgentAPI) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)
- func (a *AgentAPI) GetSchedule(ctx context.Context, workspaceID uint, externalID string) (*types.ScheduledTask, error)
- func (a *AgentAPI) GetTask(ctx context.Context, workspaceID uint, taskID string) (*types.AgentTask, error)
- func (a *AgentAPI) GetTaskLogs(ctx context.Context, workspaceID uint, taskID string) ([]common.TaskLogEntry, error)
- func (a *AgentAPI) ListAgents(ctx context.Context, workspaceID uint) ([]*types.AgentProfile, error)
- func (a *AgentAPI) ListRunEvents(ctx context.Context, workspaceID uint, runID string) ([]map[string]any, error)
- func (a *AgentAPI) ListRunSnapshots(ctx context.Context, workspaceID uint, runID string, limit int) ([]*types.AgentRunSnapshot, error)
- func (a *AgentAPI) ListRuns(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentRun, error)
- func (a *AgentAPI) ListRunsFiltered(ctx context.Context, workspaceID uint, filter types.AgentRunListFilter) ([]*types.AgentRun, string, bool, error)
- func (a *AgentAPI) ListSchedules(ctx context.Context, workspaceID uint) ([]*types.ScheduledTask, error)
- func (a *AgentAPI) ListTaskLogs(ctx context.Context, workspaceID uint, taskID string, seqNum int64) ([]common.TaskLogEntry, int64, error)
- func (a *AgentAPI) ListTasks(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentTask, error)
- func (a *AgentAPI) ListTasksFiltered(ctx context.Context, workspaceID uint, filter types.AgentTaskListFilter) ([]*types.AgentTask, string, bool, error)
- func (a *AgentAPI) StreamTaskEvents(ctx context.Context, workspaceID uint, taskID string, logCursor int64, ...) (*TaskEventBatch, error)
- func (a *AgentAPI) UpdateAgent(ctx context.Context, workspaceID uint, agentID string, name *string, ...) (*types.AgentProfile, error)
- func (a *AgentAPI) UpdateSchedule(ctx context.Context, workspaceID uint, externalID string, cronExpr *string, ...) (*types.ScheduledTask, error)
- type AgentCommandParams
- type AgentService
- func (s *AgentService) AcceptAgentCommand(ctx context.Context, workspaceID uint, params AgentCommandParams) (*types.AgentTask, bool, error)
- func (s *AgentService) AcceptRunInput(ctx context.Context, workspaceID uint, targetRunID string, ...) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)
- func (s *AgentService) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)
- func (s *AgentService) ListRunPendingInputs(ctx context.Context, runID string) ([]types.PendingInput, error)
- func (s *AgentService) Start(ctx context.Context)
- type AttemptEvent
- type AttemptEventManager
- type CronScheduler
- type ExecAsk
- type ExecHost
- type ExecSecurity
- type ExecutionInstance
- type ExecutionInstanceConfig
- type ExecutionInstanceController
- func (c *ExecutionInstanceController) EnsureInstance(ctx context.Context, cfg ExecutionInstanceConfig) (IExecutionInstance, error)
- func (c *ExecutionInstanceController) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error
- func (c *ExecutionInstanceController) RouteDispatchTarget(ctx context.Context, instanceKey string, target int) error
- type ExecutionInstanceState
- type ExecutionInstanceStore
- type IExecutionInstance
- type InputProvenance
- type InstanceDispatchLocker
- type RoutingContext
- type RunExecutionPolicy
- type RunRetryPolicy
- type TaskEventBatch
Constants ¶
const (
	AgentRunnerClaudeCode = "claude_code"
	AgentProviderClaude   = "claude"
)
const (
	ExecHostSandbox ExecHost = "sandbox"

	ExecSecurityDeny      ExecSecurity = "deny"
	ExecSecurityAllowlist ExecSecurity = "allowlist"
	ExecSecurityFull      ExecSecurity = "full"

	ExecAskOff    ExecAsk = "off"
	ExecAskOnMiss ExecAsk = "on-miss"
	ExecAskAlways ExecAsk = "always"

	RuntimeTypeGvisor = "gvisor"
	RuntimeTypeRunc   = "runc"

	WorkspaceAccessNone = "none"
	WorkspaceAccessRO   = "ro"
	WorkspaceAccessRW   = "rw"
)
const (
	DefaultRetryMaxAttempts = 2
	DefaultRetryDelayMs     = 0
)
Variables ¶
This section is empty.
Functions ¶
func DefaultAgentConfig ¶ added in v0.1.66
DefaultAgentConfig returns the default config for a new agent with the given key. This is used by the API, SDK, and frontend to preview defaults before creation.
func ExecutionClassKey ¶
func ExecutionClassKey(workspaceID uint, agentID, lane *string, p RunExecutionPolicy) string
func ExtractLeaseExecutionID ¶ added in v0.1.74
ExtractLeaseExecutionID parses the execution ID from a lease owner string formatted as "workerID:executionID".
func NextCronTime ¶ added in v0.1.89
NextCronTime parses a standard 5-field cron expression and returns the next fire time after ref, interpreted in the given IANA timezone. An empty or invalid timezone falls back to UTC.
func ReconcileStaleSessionLease ¶ added in v0.1.74
func ReconcileStaleSessionLease( ctx context.Context, backend repository.BackendRepository, terminalIO repository.TerminalIORepository, workspaceID uint, sessionID, owner string, ) bool
ReconcileStaleSessionLease checks whether the current lease owner references an execution that is terminal or missing. If so, it force-releases the lease and returns true. The owner format is "workerID:executionID". Exported so gateway/services can reuse the same logic.
func ToRunExecutionResources ¶
func ToRunExecutionResources(p RunExecutionPolicy) *types.RunExecutionResources
func ToRunExecutionType ¶
func ToRunExecutionType(p RunExecutionPolicy) types.RunExecutionType
func ValidateAgentCommandParams ¶
func ValidateAgentCommandParams(v *AgentCommandParams) error
func ValidateRunExecutionPolicy ¶
func ValidateRunExecutionPolicy(p RunExecutionPolicy) error
Types ¶
type AgentAPI ¶
type AgentAPI struct {
// contains filtered or unexported fields
}
AgentAPI is the shared application layer for agent/task/run flows. Transport layers (gRPC/HTTP) should call this directly.
func NewAgentAPI ¶
func NewAgentAPI( backend repository.BackendRepository, runtime *AgentService, ) *AgentAPI
func (*AgentAPI) AcceptAgentCommand ¶
func (*AgentAPI) ArchiveTask ¶ added in v0.1.71
func (*AgentAPI) CancelTask ¶
func (*AgentAPI) CreateAgent ¶
func (*AgentAPI) CreateSchedule ¶ added in v0.1.89
func (*AgentAPI) DeleteAgent ¶ added in v0.1.73
func (*AgentAPI) DeleteSchedule ¶ added in v0.1.89
func (*AgentAPI) EnqueueRunInput ¶
func (*AgentAPI) GetDefaultConfig ¶ added in v0.1.66
func (*AgentAPI) GetRunInteraction ¶ added in v0.1.73
func (*AgentAPI) GetSchedule ¶ added in v0.1.89
func (*AgentAPI) GetTaskLogs ¶
func (*AgentAPI) ListAgents ¶
func (*AgentAPI) ListRunEvents ¶
func (*AgentAPI) ListRunSnapshots ¶
func (*AgentAPI) ListRunsFiltered ¶ added in v0.1.66
func (*AgentAPI) ListSchedules ¶ added in v0.1.89
func (*AgentAPI) ListTaskLogs ¶ added in v0.1.66
func (*AgentAPI) ListTasksFiltered ¶ added in v0.1.66
func (*AgentAPI) StreamTaskEvents ¶ added in v0.1.66
func (*AgentAPI) UpdateAgent ¶ added in v0.1.66
type AgentCommandParams ¶
type AgentCommandParams struct {
Message string `json:"message"`
AgentID *string `json:"agent_id,omitempty"`
HookID *uint `json:"hook_id,omitempty"`
SessionID string `json:"session_id"`
SessionKey *string `json:"session_key,omitempty"`
Deliver *bool `json:"deliver,omitempty"`
TimeoutMs *int `json:"timeout_ms,omitempty"`
Policy *RunExecutionPolicy `json:"policy,omitempty"`
Lane *string `json:"lane,omitempty"`
ExtraSystemPrompt *string `json:"extra_system_prompt,omitempty"`
InputProvenance *InputProvenance `json:"input_provenance,omitempty"`
Routing RoutingContext `json:"routing"`
Attachments []map[string]any `json:"attachments,omitempty"`
IdempotencyKey string `json:"idempotency_key"`
Label *string `json:"label,omitempty"`
SpawnedBy *string `json:"spawned_by,omitempty"`
}
type AgentService ¶
type AgentService struct {
// contains filtered or unexported fields
}
func NewAgentService ¶
func NewAgentService( ctx context.Context, backend repository.BackendRepository, taskQueue repository.TaskQueue, redis *common.RedisClient, s2 *common.S2Client, defaultImage string, ) *AgentService
func (*AgentService) AcceptAgentCommand ¶
func (s *AgentService) AcceptAgentCommand( ctx context.Context, workspaceID uint, params AgentCommandParams, ) (*types.AgentTask, bool, error)
func (*AgentService) AcceptRunInput ¶
func (s *AgentService) AcceptRunInput( ctx context.Context, workspaceID uint, targetRunID string, queueMode types.AgentQueueMode, message string, idempotencyKey string, ) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)
func (*AgentService) GetRunInteraction ¶ added in v0.1.73
func (s *AgentService) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)
func (*AgentService) ListRunPendingInputs ¶ added in v0.1.73
func (s *AgentService) ListRunPendingInputs(ctx context.Context, runID string) ([]types.PendingInput, error)
ListRunPendingInputs returns the user messages sitting in the input buffer for the run's active execution, as determined by the backend interaction state.
func (*AgentService) Start ¶
func (s *AgentService) Start(ctx context.Context)
type AttemptEvent ¶
type AttemptEventManager ¶
type AttemptEventManager struct {
// contains filtered or unexported fields
}
func NewAttemptEventManager ¶
func NewAttemptEventManager( defaultCtx context.Context, instanceFactory func(ctx context.Context, instanceKey string) (IExecutionInstance, error), ) *AttemptEventManager
func (*AttemptEventManager) RouteAttemptEvent ¶
func (m *AttemptEventManager) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error
func (*AttemptEventManager) RouteDispatchTarget ¶
type CronScheduler ¶ added in v0.1.89
type CronScheduler struct {
// contains filtered or unexported fields
}
CronScheduler polls for due scheduled tasks and fires them as agent tasks.
Multi-replica safety: AdvanceScheduledTask is a compare-and-swap on next_run_at, so only one replica can win a given tick. If submitting the task fails after the CAS has succeeded, next_run_at is reverted so that the occurrence is retried on the next poll cycle rather than silently skipped.
func NewCronScheduler ¶ added in v0.1.89
func NewCronScheduler( backend repository.BackendRepository, agents *AgentAPI, ) *CronScheduler
func (*CronScheduler) Start ¶ added in v0.1.89
func (s *CronScheduler) Start(ctx context.Context)
type ExecSecurity ¶
type ExecSecurity string
type ExecutionInstance ¶
type ExecutionInstance struct {
Ctx context.Context
CancelFunc context.CancelFunc
AttemptEventChan chan AttemptEvent
DispatchEventChan chan int
InstanceLockKey string
FailedAttemptThreshold int
// contains filtered or unexported fields
}
func NewExecutionInstance ¶
func NewExecutionInstance( ctx context.Context, cfg ExecutionInstanceConfig, store ExecutionInstanceStore, locker InstanceDispatchLocker, ) (*ExecutionInstance, error)
func (*ExecutionInstance) ConsumeAttemptEvent ¶
func (e *ExecutionInstance) ConsumeAttemptEvent(ev AttemptEvent)
func (*ExecutionInstance) ConsumeDispatchTarget ¶
func (e *ExecutionInstance) ConsumeDispatchTarget(target int)
func (*ExecutionInstance) DesiredDispatchTarget ¶
func (e *ExecutionInstance) DesiredDispatchTarget() int
func (*ExecutionInstance) HandleDispatchEvent ¶
func (e *ExecutionInstance) HandleDispatchEvent(target int) error
func (*ExecutionInstance) Sync ¶
func (e *ExecutionInstance) Sync() error
type ExecutionInstanceConfig ¶
type ExecutionInstanceController ¶
type ExecutionInstanceController struct {
// contains filtered or unexported fields
}
func NewExecutionInstanceController ¶
func NewExecutionInstanceController( ctx context.Context, store ExecutionInstanceStore, locker InstanceDispatchLocker, instanceLockKeyFunc func(string) string, ) *ExecutionInstanceController
func (*ExecutionInstanceController) EnsureInstance ¶
func (c *ExecutionInstanceController) EnsureInstance(ctx context.Context, cfg ExecutionInstanceConfig) (IExecutionInstance, error)
func (*ExecutionInstanceController) RouteAttemptEvent ¶
func (c *ExecutionInstanceController) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error
func (*ExecutionInstanceController) RouteDispatchTarget ¶
type ExecutionInstanceState ¶
type ExecutionInstanceStore ¶
type ExecutionInstanceStore interface {
GetOrCreateExecutionInstance(ctx context.Context, instance *types.AgentExecutionInstance) (*types.AgentExecutionInstance, error)
GetExecutionInstanceByKey(ctx context.Context, instanceKey string) (*types.AgentExecutionInstance, error)
UpdateExecutionInstanceState(
ctx context.Context,
instanceKey string,
runningAttempts int,
pendingAttempts int,
stoppingAttempts int,
desiredDispatchConcurrency int,
status types.AgentExecutionInstanceStatus,
lastSyncAt *time.Time,
) error
}
type IExecutionInstance ¶
type IExecutionInstance interface {
ConsumeAttemptEvent(AttemptEvent)
ConsumeDispatchTarget(int)
HandleDispatchEvent(int) error
Sync() error
}
type InputProvenance ¶
type InstanceDispatchLocker ¶
type RoutingContext ¶
type RoutingContext struct {
To *string `json:"to,omitempty"`
ReplyTo *string `json:"reply_to,omitempty"`
Channel *string `json:"channel,omitempty"`
ReplyChannel *string `json:"reply_channel,omitempty"`
AccountID *string `json:"account_id,omitempty"`
ReplyAccountID *string `json:"reply_account_id,omitempty"`
ThreadID *string `json:"thread_id,omitempty"`
GroupID *string `json:"group_id,omitempty"`
GroupChannel *string `json:"group_channel,omitempty"`
GroupSpace *string `json:"group_space,omitempty"`
}
type RunExecutionPolicy ¶
type RunExecutionPolicy struct {
Host ExecHost `json:"host"`
Security ExecSecurity `json:"security"`
Ask ExecAsk `json:"ask"`
RuntimeType string `json:"runtime_type"`
WorkspaceAccess string `json:"workspace_access"`
NetworkEnabled bool `json:"network_enabled"`
Interactive bool `json:"interactive"`
Resources map[string]any `json:"resources,omitempty"`
Retry *RunRetryPolicy `json:"retry,omitempty"`
}
func DefaultRunExecutionPolicy ¶
func DefaultRunExecutionPolicy() RunExecutionPolicy
func NormalizeRunExecutionPolicy ¶
func NormalizeRunExecutionPolicy(p RunExecutionPolicy) RunExecutionPolicy
type RunRetryPolicy ¶
type RunRetryPolicy struct {
MaxAttempts int `json:"max_attempts,omitempty"`
DelayMs int `json:"delay_ms,omitempty"`
}
func NormalizeRunRetryPolicy ¶
func NormalizeRunRetryPolicy(r RunRetryPolicy) RunRetryPolicy
func RetryPolicyOrDefault ¶
func RetryPolicyOrDefault(r *RunRetryPolicy) RunRetryPolicy
type TaskEventBatch ¶ added in v0.1.66
type TaskEventBatch struct {
TaskID string `json:"task_id"`
RunID *string `json:"run_id,omitempty"`
Task *types.AgentTask `json:"task,omitempty"`
Run *types.AgentRun `json:"run,omitempty"`
Interaction *types.RunInteraction `json:"interaction,omitempty"`
Logs []common.TaskLogEntry `json:"logs"`
RunEvents []map[string]any `json:"run_events"`
PendingInputs []types.PendingInput `json:"pending_inputs"`
NextLogCursor int64 `json:"next_log_cursor"`
NextRunEventCursor int `json:"next_run_event_cursor"`
}