orchestration

package
v0.1.95 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 | License: AGPL-3.0, AGPL-3.0-or-later | Imports: 21 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	AgentRunnerClaudeCode = "claude_code"

	AgentProviderClaude = "claude"
)
View Source
const (
	ExecHostSandbox ExecHost = "sandbox"

	ExecSecurityDeny      ExecSecurity = "deny"
	ExecSecurityAllowlist ExecSecurity = "allowlist"
	ExecSecurityFull      ExecSecurity = "full"

	ExecAskOff    ExecAsk = "off"
	ExecAskOnMiss ExecAsk = "on-miss"
	ExecAskAlways ExecAsk = "always"

	RuntimeTypeGvisor = "gvisor"
	RuntimeTypeRunc   = "runc"

	WorkspaceAccessNone = "none"
	WorkspaceAccessRO   = "ro"
	WorkspaceAccessRW   = "rw"
)
View Source
const (
	DefaultRetryMaxAttempts = 2
	DefaultRetryDelayMs     = 0
)

Variables

This section is empty.

Functions

func DefaultAgentConfig added in v0.1.66

func DefaultAgentConfig(agentKey string) map[string]any

DefaultAgentConfig returns the default config for a new agent with the given key. This is used by the API, SDK, and frontend to preview defaults before creation.

func ExecutionClassKey

func ExecutionClassKey(workspaceID uint, agentID, lane *string, p RunExecutionPolicy) string

func ExtractLeaseExecutionID added in v0.1.74

func ExtractLeaseExecutionID(owner string) string

ExtractLeaseExecutionID parses the execution ID from a lease owner string formatted as "workerID:executionID".

func NextCronTime added in v0.1.89

func NextCronTime(expr string, ref time.Time, tz string) (time.Time, error)

NextCronTime parses a standard 5-field cron expression and returns the next fire time after ref, interpreted in the given IANA timezone. An empty or invalid timezone falls back to UTC.

func ReconcileStaleSessionLease added in v0.1.74

func ReconcileStaleSessionLease(
	ctx context.Context,
	backend repository.BackendRepository,
	terminalIO repository.TerminalIORepository,
	workspaceID uint,
	sessionID, owner string,
) bool

ReconcileStaleSessionLease checks whether the current lease owner references a terminal/missing execution. If so, it force-releases the lease and returns true. Owner format is "workerID:executionID". Exported so gateway/services can reuse the same logic.

func ValidateAgentCommandParams

func ValidateAgentCommandParams(v *AgentCommandParams) error

func ValidateRunExecutionPolicy

func ValidateRunExecutionPolicy(p RunExecutionPolicy) error

Types

type AgentAPI

type AgentAPI struct {
	// contains filtered or unexported fields
}

AgentAPI is the shared application layer for agent/task/run flows. Transport layers (gRPC/HTTP) should call this directly.

func NewAgentAPI

func NewAgentAPI(
	backend repository.BackendRepository,
	runtime *AgentService,
) *AgentAPI

func (*AgentAPI) AcceptAgentCommand

func (a *AgentAPI) AcceptAgentCommand(
	ctx context.Context,
	workspaceID uint,
	params AgentCommandParams,
) (*types.AgentTask, bool, error)

func (*AgentAPI) ArchiveTask added in v0.1.71

func (a *AgentAPI) ArchiveTask(ctx context.Context, workspaceID uint, taskID string) error

func (*AgentAPI) CancelRun

func (a *AgentAPI) CancelRun(ctx context.Context, workspaceID uint, runID string) error

func (*AgentAPI) CancelTask

func (a *AgentAPI) CancelTask(ctx context.Context, workspaceID uint, taskID string) error

func (*AgentAPI) CreateAgent

func (a *AgentAPI) CreateAgent(
	ctx context.Context,
	workspaceID uint,
	agentKey string,
	name string,
	config map[string]any,
	active *bool,
) (*types.AgentProfile, error)

func (*AgentAPI) CreateSchedule added in v0.1.89

func (a *AgentAPI) CreateSchedule(
	ctx context.Context,
	workspaceID uint,
	agentID string,
	cronExpr string,
	timezone string,
	prompt string,
	skillPaths []string,
	memberID *uint,
	tokenID *uint,
	encryptedToken []byte,
) (*types.ScheduledTask, error)

func (*AgentAPI) DeleteAgent added in v0.1.73

func (a *AgentAPI) DeleteAgent(ctx context.Context, workspaceID uint, agentID string) error

func (*AgentAPI) DeleteSchedule added in v0.1.89

func (a *AgentAPI) DeleteSchedule(ctx context.Context, workspaceID uint, externalID string) error

func (*AgentAPI) EnqueueRunInput

func (a *AgentAPI) EnqueueRunInput(
	ctx context.Context,
	workspaceID uint,
	runID string,
	queueMode types.AgentQueueMode,
	message string,
	idempotencyKey string,
) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)

func (*AgentAPI) GetAgent

func (a *AgentAPI) GetAgent(ctx context.Context, workspaceID uint, agentID string) (*types.AgentProfile, error)

func (*AgentAPI) GetDefaultConfig added in v0.1.66

func (a *AgentAPI) GetDefaultConfig(agentKey string) map[string]any

func (*AgentAPI) GetRun

func (a *AgentAPI) GetRun(ctx context.Context, workspaceID uint, runID string) (*types.AgentRun, error)

func (*AgentAPI) GetRunInteraction added in v0.1.73

func (a *AgentAPI) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)

func (*AgentAPI) GetSchedule added in v0.1.89

func (a *AgentAPI) GetSchedule(ctx context.Context, workspaceID uint, externalID string) (*types.ScheduledTask, error)

func (*AgentAPI) GetTask

func (a *AgentAPI) GetTask(ctx context.Context, workspaceID uint, taskID string) (*types.AgentTask, error)

func (*AgentAPI) GetTaskLogs

func (a *AgentAPI) GetTaskLogs(ctx context.Context, workspaceID uint, taskID string) ([]common.TaskLogEntry, error)

func (*AgentAPI) ListAgents

func (a *AgentAPI) ListAgents(ctx context.Context, workspaceID uint) ([]*types.AgentProfile, error)

func (*AgentAPI) ListRunEvents

func (a *AgentAPI) ListRunEvents(ctx context.Context, workspaceID uint, runID string) ([]map[string]any, error)

func (*AgentAPI) ListRunSnapshots

func (a *AgentAPI) ListRunSnapshots(ctx context.Context, workspaceID uint, runID string, limit int) ([]*types.AgentRunSnapshot, error)

func (*AgentAPI) ListRuns

func (a *AgentAPI) ListRuns(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentRun, error)

func (*AgentAPI) ListRunsFiltered added in v0.1.66

func (a *AgentAPI) ListRunsFiltered(
	ctx context.Context,
	workspaceID uint,
	filter types.AgentRunListFilter,
) ([]*types.AgentRun, string, bool, error)

func (*AgentAPI) ListSchedules added in v0.1.89

func (a *AgentAPI) ListSchedules(ctx context.Context, workspaceID uint) ([]*types.ScheduledTask, error)

func (*AgentAPI) ListTaskLogs added in v0.1.66

func (a *AgentAPI) ListTaskLogs(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	seqNum int64,
) ([]common.TaskLogEntry, int64, error)

func (*AgentAPI) ListTasks

func (a *AgentAPI) ListTasks(ctx context.Context, workspaceID uint, limit int) ([]*types.AgentTask, error)

func (*AgentAPI) ListTasksFiltered added in v0.1.66

func (a *AgentAPI) ListTasksFiltered(
	ctx context.Context,
	workspaceID uint,
	filter types.AgentTaskListFilter,
) ([]*types.AgentTask, string, bool, error)

func (*AgentAPI) StreamTaskEvents added in v0.1.66

func (a *AgentAPI) StreamTaskEvents(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	logCursor int64,
	runEventCursor int,
	cursorRunID string,
) (*TaskEventBatch, error)

func (*AgentAPI) UpdateAgent added in v0.1.66

func (a *AgentAPI) UpdateAgent(
	ctx context.Context,
	workspaceID uint,
	agentID string,
	name *string,
	config map[string]any,
	active *bool,
) (*types.AgentProfile, error)

func (*AgentAPI) UpdateSchedule added in v0.1.89

func (a *AgentAPI) UpdateSchedule(
	ctx context.Context,
	workspaceID uint,
	externalID string,
	cronExpr *string,
	timezone *string,
	prompt *string,
	skillPaths *[]string,
	active *bool,
) (*types.ScheduledTask, error)

type AgentCommandParams

type AgentCommandParams struct {
	Message           string              `json:"message"`
	AgentID           *string             `json:"agent_id,omitempty"`
	HookID            *uint               `json:"hook_id,omitempty"`
	SessionID         string              `json:"session_id"`
	SessionKey        *string             `json:"session_key,omitempty"`
	Deliver           *bool               `json:"deliver,omitempty"`
	TimeoutMs         *int                `json:"timeout_ms,omitempty"`
	Policy            *RunExecutionPolicy `json:"policy,omitempty"`
	Lane              *string             `json:"lane,omitempty"`
	ExtraSystemPrompt *string             `json:"extra_system_prompt,omitempty"`
	InputProvenance   *InputProvenance    `json:"input_provenance,omitempty"`
	Routing           RoutingContext      `json:"routing"`
	Attachments       []map[string]any    `json:"attachments,omitempty"`
	IdempotencyKey    string              `json:"idempotency_key"`
	Label             *string             `json:"label,omitempty"`
	SpawnedBy         *string             `json:"spawned_by,omitempty"`
}

type AgentService

type AgentService struct {
	// contains filtered or unexported fields
}

func NewAgentService

func NewAgentService(
	ctx context.Context,
	backend repository.BackendRepository,
	taskQueue repository.TaskQueue,
	redis *common.RedisClient,
	s2 *common.S2Client,
	defaultImage string,
) *AgentService

func (*AgentService) AcceptAgentCommand

func (s *AgentService) AcceptAgentCommand(
	ctx context.Context,
	workspaceID uint,
	params AgentCommandParams,
) (*types.AgentTask, bool, error)

func (*AgentService) AcceptRunInput

func (s *AgentService) AcceptRunInput(
	ctx context.Context,
	workspaceID uint,
	targetRunID string,
	queueMode types.AgentQueueMode,
	message string,
	idempotencyKey string,
) (*types.AgentTask, bool, types.RunInputDeliveryOutcome, error)

func (*AgentService) GetRunInteraction added in v0.1.73

func (s *AgentService) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)

func (*AgentService) ListRunPendingInputs added in v0.1.73

func (s *AgentService) ListRunPendingInputs(ctx context.Context, runID string) ([]types.PendingInput, error)

ListRunPendingInputs returns user messages sitting in the input buffer for the run's active execution, as determined by backend interaction state.

func (*AgentService) Start

func (s *AgentService) Start(ctx context.Context)

type AttemptEvent

type AttemptEvent struct {
	AttemptID string
	RunID     string
	Change    int
	Status    string
	Ts        int64
}

type AttemptEventManager

type AttemptEventManager struct {
	// contains filtered or unexported fields
}

func NewAttemptEventManager

func NewAttemptEventManager(
	defaultCtx context.Context,
	instanceFactory func(ctx context.Context, instanceKey string) (IExecutionInstance, error),
) *AttemptEventManager

func (*AttemptEventManager) RouteAttemptEvent

func (m *AttemptEventManager) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error

func (*AttemptEventManager) RouteDispatchTarget

func (m *AttemptEventManager) RouteDispatchTarget(ctx context.Context, instanceKey string, target int) error

type CronScheduler added in v0.1.89

type CronScheduler struct {
	// contains filtered or unexported fields
}

CronScheduler polls for due scheduled tasks and fires them as agent tasks.

Multi-replica safety: AdvanceScheduledTask is a compare-and-swap on next_run_at, so only one replica can win a given tick. If the submit fails after the CAS, we revert next_run_at to allow retry on the next poll cycle rather than silently skipping the occurrence.

func NewCronScheduler added in v0.1.89

func NewCronScheduler(
	backend repository.BackendRepository,
	agents *AgentAPI,
) *CronScheduler

func (*CronScheduler) Start added in v0.1.89

func (s *CronScheduler) Start(ctx context.Context)

type ExecAsk

type ExecAsk string

type ExecHost

type ExecHost string

type ExecSecurity

type ExecSecurity string

type ExecutionInstance

type ExecutionInstance struct {
	Ctx        context.Context
	CancelFunc context.CancelFunc

	AttemptEventChan  chan AttemptEvent
	DispatchEventChan chan int

	InstanceLockKey        string
	FailedAttemptThreshold int
	// contains filtered or unexported fields
}

func (*ExecutionInstance) ConsumeAttemptEvent

func (e *ExecutionInstance) ConsumeAttemptEvent(ev AttemptEvent)

func (*ExecutionInstance) ConsumeDispatchTarget

func (e *ExecutionInstance) ConsumeDispatchTarget(target int)

func (*ExecutionInstance) DesiredDispatchTarget

func (e *ExecutionInstance) DesiredDispatchTarget() int

func (*ExecutionInstance) HandleDispatchEvent

func (e *ExecutionInstance) HandleDispatchEvent(target int) error

func (*ExecutionInstance) Sync

func (e *ExecutionInstance) Sync() error

type ExecutionInstanceConfig

type ExecutionInstanceConfig struct {
	InstanceKey            string
	WorkspaceID            uint
	AgentID                *string
	Lane                   *string
	ExecutionClassKey      string
	FailedAttemptThreshold int
	InstanceLockKey        string
}

type ExecutionInstanceController

type ExecutionInstanceController struct {
	// contains filtered or unexported fields
}

func NewExecutionInstanceController

func NewExecutionInstanceController(
	ctx context.Context,
	store ExecutionInstanceStore,
	locker InstanceDispatchLocker,
	instanceLockKeyFunc func(string) string,
) *ExecutionInstanceController

func (*ExecutionInstanceController) EnsureInstance

func (*ExecutionInstanceController) RouteAttemptEvent

func (c *ExecutionInstanceController) RouteAttemptEvent(ctx context.Context, instanceKey string, event AttemptEvent) error

func (*ExecutionInstanceController) RouteDispatchTarget

func (c *ExecutionInstanceController) RouteDispatchTarget(ctx context.Context, instanceKey string, target int) error

type ExecutionInstanceState

type ExecutionInstanceState struct {
	RunningAttempts  int
	PendingAttempts  int
	StoppingAttempts int
	FailedAttempts   []string
}

type ExecutionInstanceStore

type ExecutionInstanceStore interface {
	GetOrCreateExecutionInstance(ctx context.Context, instance *types.AgentExecutionInstance) (*types.AgentExecutionInstance, error)
	GetExecutionInstanceByKey(ctx context.Context, instanceKey string) (*types.AgentExecutionInstance, error)
	UpdateExecutionInstanceState(
		ctx context.Context,
		instanceKey string,
		runningAttempts int,
		pendingAttempts int,
		stoppingAttempts int,
		desiredDispatchConcurrency int,
		status types.AgentExecutionInstanceStatus,
		lastSyncAt *time.Time,
	) error
}

type IExecutionInstance

type IExecutionInstance interface {
	ConsumeAttemptEvent(AttemptEvent)
	ConsumeDispatchTarget(int)
	HandleDispatchEvent(int) error
	Sync() error
}

type InputProvenance

type InputProvenance struct {
	Source        *string `json:"source,omitempty"`
	MessageID     *string `json:"message_id,omitempty"`
	Channel       *string `json:"channel,omitempty"`
	ToolCallID    *string `json:"tool_call_id,omitempty"`
	CorrelationID *string `json:"correlation_id,omitempty"`
}

type InstanceDispatchLocker

type InstanceDispatchLocker interface {
	WithInstanceLock(ctx context.Context, lockKey string, fn func() error) error
}

type RoutingContext

type RoutingContext struct {
	To             *string `json:"to,omitempty"`
	ReplyTo        *string `json:"reply_to,omitempty"`
	Channel        *string `json:"channel,omitempty"`
	ReplyChannel   *string `json:"reply_channel,omitempty"`
	AccountID      *string `json:"account_id,omitempty"`
	ReplyAccountID *string `json:"reply_account_id,omitempty"`
	ThreadID       *string `json:"thread_id,omitempty"`
	GroupID        *string `json:"group_id,omitempty"`
	GroupChannel   *string `json:"group_channel,omitempty"`
	GroupSpace     *string `json:"group_space,omitempty"`
}

type RunExecutionPolicy

type RunExecutionPolicy struct {
	Host            ExecHost        `json:"host"`
	Security        ExecSecurity    `json:"security"`
	Ask             ExecAsk         `json:"ask"`
	RuntimeType     string          `json:"runtime_type"`
	WorkspaceAccess string          `json:"workspace_access"`
	NetworkEnabled  bool            `json:"network_enabled"`
	Interactive     bool            `json:"interactive"`
	Resources       map[string]any  `json:"resources,omitempty"`
	Retry           *RunRetryPolicy `json:"retry,omitempty"`
}

func DefaultRunExecutionPolicy

func DefaultRunExecutionPolicy() RunExecutionPolicy

func NormalizeRunExecutionPolicy

func NormalizeRunExecutionPolicy(p RunExecutionPolicy) RunExecutionPolicy

type RunRetryPolicy

type RunRetryPolicy struct {
	MaxAttempts int `json:"max_attempts,omitempty"`
	DelayMs     int `json:"delay_ms,omitempty"`
}

func NormalizeRunRetryPolicy

func NormalizeRunRetryPolicy(r RunRetryPolicy) RunRetryPolicy

func RetryPolicyOrDefault

func RetryPolicyOrDefault(r *RunRetryPolicy) RunRetryPolicy

type TaskEventBatch added in v0.1.66

type TaskEventBatch struct {
	TaskID             string                `json:"task_id"`
	RunID              *string               `json:"run_id,omitempty"`
	Task               *types.AgentTask      `json:"task,omitempty"`
	Run                *types.AgentRun       `json:"run,omitempty"`
	Interaction        *types.RunInteraction `json:"interaction,omitempty"`
	Logs               []common.TaskLogEntry `json:"logs"`
	RunEvents          []map[string]any      `json:"run_events"`
	PendingInputs      []types.PendingInput  `json:"pending_inputs"`
	NextLogCursor      int64                 `json:"next_log_cursor"`
	NextRunEventCursor int                   `json:"next_run_event_cursor"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL