orchestration

package
v0.5.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package orchestration defines Agentic Control's native workflow orchestration layer.

This layer sits above pkg/controlplane and below higher-level workflow modules such as Court. It is the right home for generic concurrent fan-out, worker scheduling, runtime target selection, cancellation, aggregation, and reusable workflow primitives that should benefit both Court and non-Court product flows.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecuteWorker

func ExecuteWorker[RunT any, WorkerT any, IdentityT any](ctx context.Context, workerID string, hooks WorkerExecutionHooks[RunT, WorkerT, IdentityT]) error

func FlushQueuedRuntimeResponses

func FlushQueuedRuntimeResponses(
	ctx context.Context,
	controller SessionController,
	sessionID string,
	responses []QueuedRuntimeResponse,
	hooks RuntimeResponseHooks,
) error

func IsTerminalRunStatus

func IsTerminalRunStatus(status RunStatus) bool

func LaunchDetachedCommand

func LaunchDetachedCommand(ctx context.Context, request CommandLaunchRequest) error

func QueueAndLaunch

func QueueAndLaunch[T any](ctx context.Context, items []T, hooks QueueAndLaunchHooks[T]) error

Types

type ArtifactRecord

// ArtifactRecord is the JSON-serializable record of a single artifact.
// It is the value accepted by SQLiteLedgerStore.AddArtifact and the element
// type returned by SQLiteLedgerStore.ListArtifacts.
//
// Kind and Format are plain strings; their permitted values are not defined
// by this declaration.
type ArtifactRecord struct {
	ID        int64     `json:"id"`
	RunID     string    `json:"run_id"`
	// WorkerID is left out of the JSON encoding when it is the empty string.
	WorkerID  string    `json:"worker_id,omitempty"`
	Kind      string    `json:"kind"`
	Format    string    `json:"format"`
	Content   string    `json:"content"`
	CreatedAt time.Time `json:"created_at"`
}

type CommandLaunchRequest

// CommandLaunchRequest is the request value accepted by LaunchDetachedCommand.
// It carries a command, its arguments, environment entries and a log path.
// The struct has no JSON tags.
type CommandLaunchRequest struct {
	Command string
	Args    []string
	// NOTE(review): presumably "KEY=VALUE" entries as used by os/exec — confirm
	// against the LaunchDetachedCommand implementation.
	Env     []string
	// NOTE(review): presumably where the launched command's output is written —
	// confirm against the LaunchDetachedCommand implementation.
	LogPath string
}

type EventRecord

// EventRecord is the JSON-serializable record of a single event.
// It is the value accepted by SQLiteLedgerStore.AddEvent and the element type
// returned by SQLiteLedgerStore.ListEvents.
type EventRecord struct {
	ID        int64     `json:"id"`
	RunID     string    `json:"run_id"`
	// WorkerID is left out of the JSON encoding when it is the empty string.
	WorkerID  string    `json:"worker_id,omitempty"`
	Type      string    `json:"type"`
	Message   string    `json:"message"`
	// Payload is a string that is left out of the JSON encoding when empty.
	// Its internal format is not defined by this declaration.
	Payload   string    `json:"payload,omitempty"`
	CreatedAt time.Time `json:"created_at"`
}

type FanoutController

// FanoutController is the controller interface accepted by RunFanout,
// RunReduction and RunReviewedFanout. Its method set covers describing the
// system, subscribing to runtime events, starting and stopping sessions,
// sending input and looking up a tracked session.
type FanoutController interface {
	// Describe returns the system descriptor.
	Describe() contract.SystemDescriptor
	// SubscribeEvents takes a buffer size and returns a receive-only channel of
	// runtime events together with a func().
	// NOTE(review): the returned func presumably cancels the subscription — confirm.
	SubscribeEvents(buffer int) (<-chan contract.RuntimeEvent, func())
	// StartSession takes a context, a string and a start request and returns the
	// started runtime session. The meaning of the string argument is not named
	// in this signature.
	StartSession(context.Context, string, api.StartSessionRequest) (*contract.RuntimeSession, error)
	// SendInput submits an input request and returns a runtime event.
	SendInput(context.Context, api.SendInputRequest) (*contract.RuntimeEvent, error)
	// StopSession takes a context and a string and returns a runtime event.
	// NOTE(review): the string is presumably the session ID — confirm.
	StopSession(context.Context, string) (*contract.RuntimeEvent, error)
	// GetTrackedSession takes a context and two strings and returns a tracked
	// session. The meaning of the two strings is not named in this signature.
	GetTrackedSession(context.Context, string, string) (*contract.TrackedSession, error)
}

type FanoutOptions

// FanoutOptions is the options value accepted by RunFanout and embedded in
// ReviewedFanoutOptions as the Fanout field. The struct has no JSON tags.
type FanoutOptions struct {
	Targets      []FanoutTarget
	Prompt       string
	// NOTE(review): presumably the number of times each target is run — confirm
	// against the RunFanout implementation, including how zero is treated.
	Repeat       int
	ModelOptions api.ModelOptions
	// NOTE(review): presumably leaves sessions running instead of stopping them
	// after the fan-out — confirm.
	KeepSessions bool
	Metadata     map[string]any
	// NOTE(review): presumably the buffer size handed to
	// FanoutController.SubscribeEvents — confirm.
	EventBuffer  int
}

type FanoutResult

// FanoutResult is the JSON-serializable value returned by RunFanout. It is
// also an input to RunReduction and the Fanout field of ReviewedFanoutResult.
type FanoutResult struct {
	Prompt       string               `json:"prompt"`
	Targets      []FanoutTargetResult `json:"targets"`
	// NOTE(review): if contract.TokenUsage is a struct type, encoding/json's
	// omitempty never omits it; confirm the type and consider omitzero (Go 1.24+).
	TotalUsage   contract.TokenUsage  `json:"total_usage,omitempty"`
	// TotalCostUSD is left out of the JSON encoding when it is 0.
	TotalCostUSD float64              `json:"total_cost_usd,omitempty"`
	TargetCount  int                  `json:"target_count"`
}

func RunFanout

func RunFanout(ctx context.Context, controller FanoutController, options FanoutOptions) (FanoutResult, error)

type FanoutTarget

// FanoutTarget identifies one fan-out target by backend, with an optional
// model, label, model options and model selection. It is the element type of
// FanoutOptions.Targets and of the slices taken and returned by
// ResolveFanoutTargets.
type FanoutTarget struct {
	Backend   string                   `json:"backend"`
	// Model and Label are left out of the JSON encoding when empty.
	Model     string                   `json:"model,omitempty"`
	Label     string                   `json:"label,omitempty"`
	// NOTE(review): if api.ModelOptions is a struct type, omitempty has no
	// effect on this field — confirm the type.
	Options   api.ModelOptions         `json:"options,omitempty"`
	// Selection is left out of the JSON encoding when nil.
	Selection *contract.ModelSelection `json:"selection,omitempty"`
}

func ResolveFanoutTargets

func ResolveFanoutTargets(descriptors []contract.RuntimeDescriptor, requested []FanoutTarget) ([]FanoutTarget, error)

type FanoutTargetResult

// FanoutTargetResult is the JSON-serializable per-target entry of
// FanoutResult.Targets. Apart from Target, every field is tagged omitempty.
type FanoutTargetResult struct {
	Target          FanoutTarget             `json:"target"`
	Session         *contract.TrackedSession `json:"session,omitempty"`
	Text            string                   `json:"text,omitempty"`
	// Error and StopError are strings, not error values, so they encode as JSON
	// text and are left out when empty.
	Error           string                   `json:"error,omitempty"`
	Stopped         bool                     `json:"stopped,omitempty"`
	StopError       string                   `json:"stop_error,omitempty"`
	EventCount      int                      `json:"event_count,omitempty"`
	// NOTE(review): if contract.TokenUsage is a struct type, omitempty never
	// omits it — confirm the type.
	RecordedUsage   contract.TokenUsage      `json:"recorded_usage,omitempty"`
	RecordedCostUSD float64                  `json:"recorded_cost_usd,omitempty"`
}

type MonitorSnapshot

// MonitorSnapshot is a generic, JSON-serializable snapshot that groups a run
// with its workers, open requests and recent events, plus an UpdatedAt time.
// The four type parameters are unconstrained (any), so callers choose the
// concrete run, worker, request and event types.
type MonitorSnapshot[RunT any, WorkerT any, RequestT any, EventT any] struct {
	Run          RunT       `json:"run"`
	// The slice fields carry no omitempty, so nil slices encode as JSON null.
	Workers      []WorkerT  `json:"workers"`
	OpenRequests []RequestT `json:"open_requests"`
	RecentEvents []EventT   `json:"recent_events"`
	UpdatedAt    time.Time  `json:"updated_at"`
}

type PendingSessionControl

// PendingSessionControl pairs a numeric ID with the control action to apply.
// It is the element type of the controls slice taken by
// HandlePendingSessionControls and the first argument of
// SessionControlHooks.Complete.
type PendingSessionControl struct {
	ID     int64
	Action PendingSessionControlAction
}

type PendingSessionControlAction

// PendingSessionControlAction is the string-typed action carried by a
// PendingSessionControl.
type PendingSessionControlAction string
// Defined PendingSessionControlAction values. Their string forms match the
// "cancel" and "interrupt" values of WorkerControlAction.
const (
	PendingSessionControlCancel    PendingSessionControlAction = "cancel"
	PendingSessionControlInterrupt PendingSessionControlAction = "interrupt"
)

type QueueAndLaunchHooks

// QueueAndLaunchHooks bundles the callbacks passed to QueueAndLaunch for items
// of type T. The order in which QueueAndLaunch invokes the hooks, and whether
// nil hooks are tolerated, is not visible in this declaration.
type QueueAndLaunchHooks[T any] struct {
	// Persist receives an item and returns an error.
	Persist        func(context.Context, T) error
	// OnPersistError receives an item and an error and returns a skip flag plus
	// an error.
	// NOTE(review): presumably called with Persist's error; skip=true presumably
	// drops the item without failing the batch — confirm.
	OnPersistError func(context.Context, T, error) (skip bool, err error)
	// AfterPersist receives an item and returns an error.
	AfterPersist   func(context.Context, T) error
	// Launch receives an item and returns an error.
	Launch         func(context.Context, T) error
	// OnLaunchError receives an item and an error and returns an error.
	// NOTE(review): presumably called with Launch's error — confirm.
	OnLaunchError  func(context.Context, T, error) error
}

type QueuedRuntimeResponse

// QueuedRuntimeResponse describes one queued response to a runtime request.
// It is the element type of the responses slice taken by
// FlushQueuedRuntimeResponses and the first argument of
// RuntimeResponseHooks.Complete. The struct has no JSON tags.
type QueuedRuntimeResponse struct {
	ID        int64
	RequestID string
	Action    contract.RespondAction
	Text      string
	OptionID  string
	Answers   []contract.RequestAnswer
	Metadata  map[string]any
}

type ReductionMode

// ReductionMode is the string-typed mode passed to RunReduction and stored in
// ReviewedFanoutOptions.Mode and ReductionResult.Mode.
type ReductionMode string
// Defined ReductionMode values. What each mode does to a FanoutResult is
// decided by RunReduction and is not visible in this declaration.
const (
	ReductionModeCompare   ReductionMode = "compare"
	ReductionModeSummarize ReductionMode = "summarize"
	ReductionModeBestOfN   ReductionMode = "best_of_n"
)

type ReductionResult

// ReductionResult is the JSON-serializable value returned by RunReduction and
// the Reduction field of ReviewedFanoutResult. Mode and Target are always
// encoded; every other field is tagged omitempty.
type ReductionResult struct {
	Mode            ReductionMode            `json:"mode"`
	Target          FanoutTarget             `json:"target"`
	Session         *contract.TrackedSession `json:"session,omitempty"`
	Text            string                   `json:"text,omitempty"`
	// JSON is a string field encoded as a JSON string, not as embedded raw JSON.
	JSON            string                   `json:"json,omitempty"`
	Error           string                   `json:"error,omitempty"`
	Stopped         bool                     `json:"stopped,omitempty"`
	StopError       string                   `json:"stop_error,omitempty"`
	// NOTE(review): if contract.TokenUsage is a struct type, omitempty never
	// omits it — confirm the type.
	RecordedUsage   contract.TokenUsage      `json:"recorded_usage,omitempty"`
	RecordedCostUSD float64                  `json:"recorded_cost_usd,omitempty"`
}

func RunReduction

func RunReduction(ctx context.Context, controller FanoutController, mode ReductionMode, fanout FanoutResult, target FanoutTarget, keepSession bool) (ReductionResult, error)

type ReviewedFanoutOptions

// ReviewedFanoutOptions is the options value accepted by RunReviewedFanout.
// It combines fan-out options with a reduction mode and the target used for
// the reduction. The struct has no JSON tags.
type ReviewedFanoutOptions struct {
	Fanout          FanoutOptions
	Mode            ReductionMode
	ReductionTarget FanoutTarget
}

type ReviewedFanoutResult

// ReviewedFanoutResult is the JSON-serializable value returned by
// RunReviewedFanout. It holds the fan-out result, the reduction result and
// total usage and cost fields.
// NOTE(review): the totals presumably sum fan-out and reduction — confirm.
type ReviewedFanoutResult struct {
	Fanout       FanoutResult        `json:"fanout"`
	Reduction    ReductionResult     `json:"reduction"`
	// NOTE(review): if contract.TokenUsage is a struct type, omitempty never
	// omits it — confirm the type.
	TotalUsage   contract.TokenUsage `json:"total_usage,omitempty"`
	TotalCostUSD float64             `json:"total_cost_usd,omitempty"`
}

func RunReviewedFanout

func RunReviewedFanout(ctx context.Context, controller FanoutController, options ReviewedFanoutOptions) (ReviewedFanoutResult, error)

type RunStatus

// RunStatus is the string-typed status of a run. It is the argument of
// IsTerminalRunStatus and of SQLiteRunStateStore.UpdateRunStatus.
type RunStatus string
// Defined RunStatus values. Which of them IsTerminalRunStatus reports as
// terminal is decided by that function and is not visible in this declaration.
const (
	RunQueued    RunStatus = "queued"
	RunRunning   RunStatus = "running"
	RunCompleted RunStatus = "completed"
	RunFailed    RunStatus = "failed"
	RunCancelled RunStatus = "cancelled"
)

type RunStatusView

// RunStatusView is a generic, JSON-serializable pairing of a run with its
// workers. Both type parameters are unconstrained (any).
type RunStatusView[RunT any, WorkerT any] struct {
	Run     RunT      `json:"run"`
	// Workers carries no omitempty, so a nil slice encodes as JSON null.
	Workers []WorkerT `json:"workers"`
}

type RunTrace

// RunTrace is a generic, JSON-serializable grouping of a run with its worker
// traces, events and artifacts. All four type parameters are unconstrained
// (any).
type RunTrace[RunT any, WorkerTraceT any, EventT any, ArtifactT any] struct {
	Run       RunT           `json:"run"`
	// The slice fields carry no omitempty, so nil slices encode as JSON null.
	Workers   []WorkerTraceT `json:"workers"`
	Events    []EventT       `json:"events"`
	Artifacts []ArtifactT    `json:"artifacts"`
}

type RuntimeIdentity

// RuntimeIdentity groups the identifiers of a runtime: a session ID, a
// provider session ID, a transcript path and a PID. It is the value accepted
// by SQLiteLedgerStore.UpdateWorkerRuntimeIdentity. Every field is tagged
// omitempty, so a zero RuntimeIdentity encodes as an empty JSON object.
type RuntimeIdentity struct {
	SessionID         string `json:"session_id,omitempty"`
	ProviderSessionID string `json:"provider_session_id,omitempty"`
	TranscriptPath    string `json:"transcript_path,omitempty"`
	// PID is left out of the JSON encoding when it is 0.
	PID               int    `json:"pid,omitempty"`
}

type RuntimeRequestAnswer

// RuntimeRequestAnswer is one JSON-serializable answer: a question ID with an
// option ID and/or text. It is the element type of
// RuntimeRequestResponse.Answers. Every field is left out of the JSON encoding
// when empty.
type RuntimeRequestAnswer struct {
	QuestionID string `json:"question_id,omitempty"`
	OptionID   string `json:"option_id,omitempty"`
	Text       string `json:"text,omitempty"`
}

type RuntimeRequestRecord

// RuntimeRequestRecord is the JSON-serializable record of one runtime request
// together with its response state. It is accepted by
// SQLiteLedgerStore.UpsertRuntimeRequest and returned by GetRuntimeRequest,
// ListRuntimeRequests and ListQueuedRuntimeRequestResponses.
//
// The fields fall into three groups: identification of the request (IDs,
// runtime, kind), the request itself (Status, Summary, TurnID, RequestJSON),
// and the Response* fields describing the response.
type RuntimeRequestRecord struct {
	ID                       int64                 `json:"id"`
	RunID                    string                `json:"run_id"`
	WorkerID                 string                `json:"worker_id"`
	RequestID                string                `json:"request_id"`
	RuntimeSessionID         string                `json:"runtime_session_id"`
	RuntimeProviderSessionID string                `json:"runtime_provider_session_id,omitempty"`
	Runtime                  string                `json:"runtime"`
	Kind                     string                `json:"kind"`
	NativeMethod             string                `json:"native_method,omitempty"`
	Status                   RuntimeRequestStatus  `json:"status"`
	Summary                  string                `json:"summary,omitempty"`
	TurnID                   string                `json:"turn_id,omitempty"`
	// RequestJSON, ResponseAnswersJSON and ResponseJSON are string fields; they
	// encode as JSON strings, not as embedded raw JSON.
	RequestJSON              string                `json:"request_json,omitempty"`
	// ResponseStatus is left out of the JSON encoding when it is
	// RuntimeResponseNone, which is the empty string.
	ResponseStatus           RuntimeResponseStatus `json:"response_status,omitempty"`
	ResponseAction           string                `json:"response_action,omitempty"`
	ResponseText             string                `json:"response_text,omitempty"`
	ResponseOptionID         string                `json:"response_option_id,omitempty"`
	ResponseAnswersJSON      string                `json:"response_answers_json,omitempty"`
	ResponseError            string                `json:"response_error,omitempty"`
	ResponseJSON             string                `json:"response_json,omitempty"`
	CreatedAt                time.Time             `json:"created_at"`
	UpdatedAt                time.Time             `json:"updated_at"`
	// NOTE(review): omitempty has no effect on a time.Time value; encoding/json
	// never treats a struct as empty, so a zero RespondedAt is still encoded as
	// "0001-01-01T00:00:00Z". Use omitzero (Go 1.24+) or *time.Time if the field
	// is meant to be omitted.
	RespondedAt              time.Time             `json:"responded_at,omitempty"`
}

type RuntimeRequestResponse

// RuntimeRequestResponse is the JSON-serializable response value accepted by
// SQLiteLedgerStore.QueueRuntimeRequestResponse. Every field is tagged
// omitempty, so a zero value encodes as an empty JSON object.
type RuntimeRequestResponse struct {
	// Action is a plain string here, whereas QueuedRuntimeResponse.Action is a
	// contract.RespondAction.
	Action   string                 `json:"action,omitempty"`
	Text     string                 `json:"text,omitempty"`
	OptionID string                 `json:"option_id,omitempty"`
	Answers  []RuntimeRequestAnswer `json:"answers,omitempty"`
}

type RuntimeRequestStatus

// RuntimeRequestStatus is the string-typed status stored in
// RuntimeRequestRecord.Status and used as the filter argument of
// SQLiteLedgerStore.ListRuntimeRequests.
type RuntimeRequestStatus string
// Defined RuntimeRequestStatus values. The transitions between them are not
// visible in this declaration.
const (
	RuntimeRequestOpen      RuntimeRequestStatus = "open"
	RuntimeRequestResponded RuntimeRequestStatus = "responded"
	RuntimeRequestResolved  RuntimeRequestStatus = "resolved"
	RuntimeRequestClosed    RuntimeRequestStatus = "closed"
)

type RuntimeResponseHooks

// RuntimeResponseHooks bundles the callback passed to
// FlushQueuedRuntimeResponses.
type RuntimeResponseHooks struct {
	// Complete receives a queued response, a runtime event and an error, and
	// returns an error.
	// NOTE(review): presumably called once per response with the outcome of
	// delivering it — confirm against FlushQueuedRuntimeResponses.
	Complete func(QueuedRuntimeResponse, *contract.RuntimeEvent, error) error
}

type RuntimeResponseStatus

// RuntimeResponseStatus is the string-typed status stored in
// RuntimeRequestRecord.ResponseStatus and passed to
// SQLiteLedgerStore.CompleteRuntimeRequestResponse.
type RuntimeResponseStatus string
// Defined RuntimeResponseStatus values. RuntimeResponseNone is the empty
// string and therefore also the zero value of the type.
const (
	RuntimeResponseNone      RuntimeResponseStatus = ""
	RuntimeResponseQueued    RuntimeResponseStatus = "queued"
	RuntimeResponseCompleted RuntimeResponseStatus = "completed"
	RuntimeResponseFailed    RuntimeResponseStatus = "failed"
)

type SQLiteLedgerStore

// SQLiteLedgerStore is a store created by NewSQLiteLedgerStore from a *sql.DB.
// Its methods add and list artifacts and events, create, update, complete,
// archive and reset workers, manage worker control requests, and upsert,
// queue and complete runtime requests and their responses.
// All of its fields are unexported.
type SQLiteLedgerStore struct {
	// contains filtered or unexported fields
}

func NewSQLiteLedgerStore

func NewSQLiteLedgerStore(db *sql.DB) *SQLiteLedgerStore

func (*SQLiteLedgerStore) AddArtifact

func (s *SQLiteLedgerStore) AddArtifact(ctx context.Context, artifact ArtifactRecord) error

func (*SQLiteLedgerStore) AddEvent

func (s *SQLiteLedgerStore) AddEvent(ctx context.Context, event EventRecord) error

func (*SQLiteLedgerStore) AddWorkerControl

func (s *SQLiteLedgerStore) AddWorkerControl(ctx context.Context, runID string, workerID string, action WorkerControlAction) (WorkerControlRequest, error)

func (*SQLiteLedgerStore) ArchiveWorkerAttempt

func (s *SQLiteLedgerStore) ArchiveWorkerAttempt(ctx context.Context, worker WorkerRecord) error

func (*SQLiteLedgerStore) CompletePendingWorkerControls

func (s *SQLiteLedgerStore) CompletePendingWorkerControls(ctx context.Context, workerID string, status WorkerControlStatus, errText string) error

func (*SQLiteLedgerStore) CompleteRuntimeRequestResponse

func (s *SQLiteLedgerStore) CompleteRuntimeRequestResponse(ctx context.Context, id int64, status RuntimeResponseStatus, responseJSON string, errText string) error

func (*SQLiteLedgerStore) CompleteWorker

func (s *SQLiteLedgerStore) CompleteWorker(ctx context.Context, workerID string, status WorkerStatus, result string, resultJSON string, errText string) error

func (*SQLiteLedgerStore) CompleteWorkerControl

func (s *SQLiteLedgerStore) CompleteWorkerControl(ctx context.Context, id int64, status WorkerControlStatus, errText string) error

func (*SQLiteLedgerStore) CreateWorker

func (s *SQLiteLedgerStore) CreateWorker(ctx context.Context, worker WorkerRecord) error

func (*SQLiteLedgerStore) GetRuntimeRequest

func (s *SQLiteLedgerStore) GetRuntimeRequest(ctx context.Context, id int64) (RuntimeRequestRecord, error)

func (*SQLiteLedgerStore) GetWorker

func (s *SQLiteLedgerStore) GetWorker(ctx context.Context, id string) (WorkerRecord, error)

func (*SQLiteLedgerStore) ListArtifacts

func (s *SQLiteLedgerStore) ListArtifacts(ctx context.Context, runID string) ([]ArtifactRecord, error)

func (*SQLiteLedgerStore) ListEvents

func (s *SQLiteLedgerStore) ListEvents(ctx context.Context, runID string, after int64) ([]EventRecord, error)

func (*SQLiteLedgerStore) ListPendingWorkerControls

func (s *SQLiteLedgerStore) ListPendingWorkerControls(ctx context.Context, workerID string) ([]WorkerControlRequest, error)

func (*SQLiteLedgerStore) ListQueuedRuntimeRequestResponses

func (s *SQLiteLedgerStore) ListQueuedRuntimeRequestResponses(ctx context.Context, workerID string) ([]RuntimeRequestRecord, error)

func (*SQLiteLedgerStore) ListRuntimeRequests

func (s *SQLiteLedgerStore) ListRuntimeRequests(ctx context.Context, runID string, status RuntimeRequestStatus) ([]RuntimeRequestRecord, error)

func (*SQLiteLedgerStore) ListWorkerAttempts

func (s *SQLiteLedgerStore) ListWorkerAttempts(ctx context.Context, runID string) ([]WorkerAttemptRecord, error)

func (*SQLiteLedgerStore) ListWorkerControls

func (s *SQLiteLedgerStore) ListWorkerControls(ctx context.Context, runID string) ([]WorkerControlRequest, error)

func (*SQLiteLedgerStore) ListWorkers

func (s *SQLiteLedgerStore) ListWorkers(ctx context.Context, runID string) ([]WorkerRecord, error)

func (*SQLiteLedgerStore) QueueRuntimeRequestResponse

func (s *SQLiteLedgerStore) QueueRuntimeRequestResponse(ctx context.Context, id int64, response RuntimeRequestResponse) error

func (*SQLiteLedgerStore) ResetWorkerForRetry

func (s *SQLiteLedgerStore) ResetWorkerForRetry(ctx context.Context, workerID string, launchID string) error

func (*SQLiteLedgerStore) UpdateWorkerRunning

func (s *SQLiteLedgerStore) UpdateWorkerRunning(ctx context.Context, workerID string) error

func (*SQLiteLedgerStore) UpdateWorkerRuntimeIdentity

func (s *SQLiteLedgerStore) UpdateWorkerRuntimeIdentity(ctx context.Context, workerID string, identity RuntimeIdentity) error

func (*SQLiteLedgerStore) UpsertRuntimeRequest

func (s *SQLiteLedgerStore) UpsertRuntimeRequest(ctx context.Context, request RuntimeRequestRecord) error

type SQLiteRunStateStore

// SQLiteRunStateStore is a store created by NewSQLiteRunStateStore from a
// *sql.DB. Its methods are ReactivateRun, UpdateRunStage and UpdateRunStatus,
// each keyed by a run ID. All of its fields are unexported.
type SQLiteRunStateStore struct {
	// contains filtered or unexported fields
}

func NewSQLiteRunStateStore

func NewSQLiteRunStateStore(db *sql.DB) *SQLiteRunStateStore

func (*SQLiteRunStateStore) ReactivateRun

func (s *SQLiteRunStateStore) ReactivateRun(ctx context.Context, runID string, stage string) error

func (*SQLiteRunStateStore) UpdateRunStage

func (s *SQLiteRunStateStore) UpdateRunStage(ctx context.Context, runID string, stage string) error

func (*SQLiteRunStateStore) UpdateRunStatus

func (s *SQLiteRunStateStore) UpdateRunStatus(ctx context.Context, runID string, status RunStatus, output string) error

type SessionControlHooks

// SessionControlHooks bundles the callback passed to
// HandlePendingSessionControls.
type SessionControlHooks struct {
	// Complete receives a pending control, a runtime event and an error, and
	// returns an error.
	// NOTE(review): presumably called once per control with the outcome of
	// applying it — confirm against HandlePendingSessionControls.
	Complete func(PendingSessionControl, *contract.RuntimeEvent, error) error
}

type SessionControlResult

// SessionControlResult is the value returned by HandlePendingSessionControls
// alongside its error. It holds a Cancelled flag and a slice of errors.
type SessionControlResult struct {
	// NOTE(review): presumably set when a cancel control was applied — confirm.
	Cancelled bool
	// NOTE(review): presumably per-control errors that did not abort handling —
	// confirm how these relate to the function's own error return.
	Errors    []error
}

func HandlePendingSessionControls

func HandlePendingSessionControls(
	ctx context.Context,
	controller SessionController,
	sessionID string,
	controls []PendingSessionControl,
	hooks SessionControlHooks,
) (SessionControlResult, error)

type SessionController

// SessionController is the controller interface accepted by
// FlushQueuedRuntimeResponses and HandlePendingSessionControls. Each method
// returns a runtime event and an error.
type SessionController interface {
	// StopSession has the same signature as FanoutController.StopSession.
	// NOTE(review): the string is presumably the session ID — confirm.
	StopSession(context.Context, string) (*contract.RuntimeEvent, error)
	// Interrupt takes a context and a string.
	// NOTE(review): the string is presumably the session ID — confirm.
	Interrupt(context.Context, string) (*contract.RuntimeEvent, error)
	// Respond submits a respond request.
	Respond(context.Context, api.RespondRequest) (*contract.RuntimeEvent, error)
}

type WatchOptions

// WatchOptions is a generic options struct for watching a run, parameterized
// by the run and event types. No function taking it appears in the visible
// part of this package's API.
type WatchOptions[RunT any, EventT any] struct {
	// NOTE(review): presumably ends the watch once the run is terminal — confirm.
	StopOnTerminal bool
	// NOTE(review): presumably the delay between polls; zero-value handling is
	// not visible here — confirm.
	PollInterval   time.Duration
	// OnUpdate receives a WatchUpdate and returns an error.
	OnUpdate       func(WatchUpdate[RunT, EventT]) error
}

type WatchUpdate

// WatchUpdate is the generic value handed to WatchOptions.OnUpdate. Both
// fields are pointers, so either may be nil.
// NOTE(review): presumably exactly one of them is set per update — confirm.
type WatchUpdate[RunT any, EventT any] struct {
	Event       *EventT
	TerminalRun *RunT
}

type WorkerAttemptRecord

// WorkerAttemptRecord is the JSON-serializable record of one worker attempt
// and the element type returned by SQLiteLedgerStore.ListWorkerAttempts.
//
// It has every field of WorkerRecord except WorkerRecord.ID, and adds its own
// int64 ID, a WorkerID and an ArchivedAt time.
// NOTE(review): presumably written by ArchiveWorkerAttempt from a WorkerRecord
// — confirm.
type WorkerAttemptRecord struct {
	ID                       int64            `json:"id"`
	WorkerID                 string           `json:"worker_id"`
	RunID                    string           `json:"run_id"`
	Attempt                  int              `json:"attempt"`
	LaunchID                 string           `json:"launch_id"`
	RoleID                   string           `json:"role_id"`
	RoleKind                 string           `json:"role_kind"`
	RoleTitle                string           `json:"role_title"`
	Backend                  string           `json:"backend"`
	Provider                 string           `json:"provider,omitempty"`
	Model                    string           `json:"model,omitempty"`
	// NOTE(review): if api.ModelOptions is a struct type, omitempty has no
	// effect on this field — confirm the type.
	ModelOptions             api.ModelOptions `json:"model_options,omitempty"`
	Agent                    string           `json:"agent,omitempty"`
	Status                   WorkerStatus     `json:"status"`
	RuntimeSessionID         string           `json:"runtime_session_id,omitempty"`
	RuntimeProviderSessionID string           `json:"runtime_provider_session_id,omitempty"`
	RuntimeTranscriptPath    string           `json:"runtime_transcript_path,omitempty"`
	RuntimePID               int              `json:"runtime_pid,omitempty"`
	Result                   string           `json:"result,omitempty"`
	// ResultJSON is a string field encoded as a JSON string, not as raw JSON.
	ResultJSON               string           `json:"result_json,omitempty"`
	Error                    string           `json:"error,omitempty"`
	CreatedAt                time.Time        `json:"created_at"`
	UpdatedAt                time.Time        `json:"updated_at"`
	// NOTE(review): omitempty has no effect on a time.Time value; a zero
	// CompletedAt is still encoded as "0001-01-01T00:00:00Z". Use omitzero
	// (Go 1.24+) or *time.Time if the field is meant to be omitted.
	CompletedAt              time.Time        `json:"completed_at,omitempty"`
	ArchivedAt               time.Time        `json:"archived_at"`
}

type WorkerControlAction

// WorkerControlAction is the string-typed action passed to
// SQLiteLedgerStore.AddWorkerControl and stored in WorkerControlRequest.Action.
type WorkerControlAction string
// Defined WorkerControlAction values. "cancel" and "interrupt" match the
// string forms of the PendingSessionControlAction values; "retry" and "resume"
// exist only here.
const (
	WorkerControlCancel    WorkerControlAction = "cancel"
	WorkerControlInterrupt WorkerControlAction = "interrupt"
	WorkerControlRetry     WorkerControlAction = "retry"
	WorkerControlResume    WorkerControlAction = "resume"
)

type WorkerControlRequest

// WorkerControlRequest is the JSON-serializable record of one control request
// for a worker. It is returned by SQLiteLedgerStore.AddWorkerControl and is
// the element type returned by ListWorkerControls and
// ListPendingWorkerControls.
type WorkerControlRequest struct {
	ID        int64               `json:"id"`
	RunID     string              `json:"run_id"`
	WorkerID  string              `json:"worker_id"`
	Action    WorkerControlAction `json:"action"`
	Status    WorkerControlStatus `json:"status"`
	// Error is a string that is left out of the JSON encoding when empty.
	Error     string              `json:"error,omitempty"`
	CreatedAt time.Time           `json:"created_at"`
	UpdatedAt time.Time           `json:"updated_at"`
}

type WorkerControlStatus

// WorkerControlStatus is the string-typed status stored in
// WorkerControlRequest.Status and passed to
// SQLiteLedgerStore.CompleteWorkerControl and CompletePendingWorkerControls.
type WorkerControlStatus string
// Defined WorkerControlStatus values.
const (
	WorkerControlPending   WorkerControlStatus = "pending"
	WorkerControlCompleted WorkerControlStatus = "completed"
	WorkerControlFailed    WorkerControlStatus = "failed"
)

type WorkerExecutionHooks

// WorkerExecutionHooks bundles the callbacks passed to ExecuteWorker,
// parameterized by the caller's run, worker and runtime-identity types.
//
// The order in which ExecuteWorker invokes the hooks, and whether nil hooks
// are tolerated, is not visible in this declaration. The string parameters
// are unnamed, so the notes on them below are assumptions to be confirmed
// against the ExecuteWorker implementation.
type WorkerExecutionHooks[RunT any, WorkerT any, IdentityT any] struct {
	// Load takes a string and returns a worker and a run.
	// NOTE(review): the string is presumably the worker ID — confirm.
	Load             func(context.Context, string) (WorkerT, RunT, error)
	// IsTerminal reports a boolean for a worker.
	IsTerminal       func(WorkerT) bool
	MarkRunning      func(context.Context, string) error
	// OnStarted, OnCancelled, OnFailed and OnCompleted return nothing, so they
	// cannot report an error to ExecuteWorker.
	OnStarted        func(context.Context, RunT, WorkerT)
	// Execute returns two strings, an identity and an error.
	// NOTE(review): the strings are presumably result and result JSON, mirroring
	// SQLiteLedgerStore.CompleteWorker — confirm.
	Execute          func(context.Context, RunT, WorkerT) (string, string, IdentityT, error)
	HasIdentity      func(IdentityT) bool
	PersistIdentity  func(context.Context, string, IdentityT) error
	// IsCancelledError classifies an error; IsCancelled classifies a worker.
	IsCancelledError func(error) bool
	MarkCancelled    func(context.Context, string, string, string) error
	OnCancelled      func(context.Context, RunT, WorkerT, string, string)
	// MarkFailed and OnFailed take one more string than their cancelled and
	// completed counterparts.
	// NOTE(review): presumably the error text — confirm.
	MarkFailed       func(context.Context, string, string, string, string) error
	OnFailed         func(context.Context, RunT, WorkerT, string, string, string)
	// Reload takes a string and returns only a worker, unlike Load.
	Reload           func(context.Context, string) (WorkerT, error)
	IsCancelled      func(WorkerT) bool
	MarkCompleted    func(context.Context, string, string, string) error
	OnCompleted      func(context.Context, RunT, WorkerT, string, string)
	// Reconcile takes a run and returns an error.
	Reconcile        func(context.Context, RunT) error
}

type WorkerRecord

// WorkerRecord is the JSON-serializable record of one worker. It is accepted
// by SQLiteLedgerStore.CreateWorker and ArchiveWorkerAttempt, returned by
// GetWorker, and the element type returned by ListWorkers.
//
// The fields cover identification (ID, RunID, LaunchID, Attempt), the role,
// the backend/provider/model selection, the Status, the Runtime* identifiers,
// the result and error strings, and timestamps.
type WorkerRecord struct {
	// ID is a string here, whereas WorkerAttemptRecord.ID is an int64.
	ID                       string           `json:"id"`
	RunID                    string           `json:"run_id"`
	LaunchID                 string           `json:"launch_id"`
	Attempt                  int              `json:"attempt"`
	RoleID                   string           `json:"role_id"`
	RoleKind                 string           `json:"role_kind"`
	RoleTitle                string           `json:"role_title"`
	Backend                  string           `json:"backend"`
	Provider                 string           `json:"provider,omitempty"`
	Model                    string           `json:"model,omitempty"`
	// NOTE(review): if api.ModelOptions is a struct type, omitempty has no
	// effect on this field — confirm the type.
	ModelOptions             api.ModelOptions `json:"model_options,omitempty"`
	Agent                    string           `json:"agent,omitempty"`
	Status                   WorkerStatus     `json:"status"`
	// The four Runtime* fields have the same types as the fields of
	// RuntimeIdentity (three strings and an int PID).
	RuntimeSessionID         string           `json:"runtime_session_id,omitempty"`
	RuntimeProviderSessionID string           `json:"runtime_provider_session_id,omitempty"`
	RuntimeTranscriptPath    string           `json:"runtime_transcript_path,omitempty"`
	RuntimePID               int              `json:"runtime_pid,omitempty"`
	Result                   string           `json:"result,omitempty"`
	// ResultJSON is a string field encoded as a JSON string, not as raw JSON.
	ResultJSON               string           `json:"result_json,omitempty"`
	Error                    string           `json:"error,omitempty"`
	CreatedAt                time.Time        `json:"created_at"`
	UpdatedAt                time.Time        `json:"updated_at"`
	// NOTE(review): omitempty has no effect on a time.Time value; a zero
	// CompletedAt is still encoded as "0001-01-01T00:00:00Z". Use omitzero
	// (Go 1.24+) or *time.Time if the field is meant to be omitted.
	CompletedAt              time.Time        `json:"completed_at,omitempty"`
}

type WorkerStatus

// WorkerStatus is the string-typed status stored in WorkerRecord.Status and
// WorkerAttemptRecord.Status, passed to SQLiteLedgerStore.CompleteWorker, and
// produced by the accessor given to SummarizeWorkerStatuses.
type WorkerStatus string
// Defined WorkerStatus values. Their string forms are the same five strings
// used by the RunStatus values.
const (
	WorkerQueued    WorkerStatus = "queued"
	WorkerRunning   WorkerStatus = "running"
	WorkerCompleted WorkerStatus = "completed"
	WorkerFailed    WorkerStatus = "failed"
	WorkerCancelled WorkerStatus = "cancelled"
)

type WorkerStatusSummary

// WorkerStatusSummary is the value returned by SummarizeWorkerStatuses: three
// flags describing a set of workers. The struct has no JSON tags.
// NOTE(review): each flag is presumably true when at least one worker has the
// named status (WorkerFailed; WorkerCancelled; WorkerRunning or WorkerQueued)
// — confirm against SummarizeWorkerStatuses.
type WorkerStatusSummary struct {
	HasFailed          bool
	HasCancelled       bool
	HasRunningOrQueued bool
}

func SummarizeWorkerStatuses

func SummarizeWorkerStatuses[T any](workers []T, status func(T) WorkerStatus) WorkerStatusSummary

type WorkerTrace

// WorkerTrace is a generic, JSON-serializable grouping of one worker with its
// runtime session, attempts, structured result, events, artifacts, controls
// and requests. All eight type parameters are unconstrained (any).
type WorkerTrace[WorkerT any, AttemptT any, ResultT any, ControlT any, RequestT any, EventT any, ArtifactT any, SessionT any] struct {
	Worker           WorkerT     `json:"worker"`
	// RuntimeSession and StructuredResult are pointers and are left out of the
	// JSON encoding when nil.
	RuntimeSession   *SessionT   `json:"runtime_session,omitempty"`
	Attempts         []AttemptT  `json:"attempts,omitempty"`
	StructuredResult *ResultT    `json:"structured_result,omitempty"`
	// Events and Artifacts carry no omitempty, so nil slices encode as JSON
	// null; the other slices are left out when empty.
	Events           []EventT    `json:"events"`
	Artifacts        []ArtifactT `json:"artifacts"`
	Controls         []ControlT  `json:"controls,omitempty"`
	Requests         []RequestT  `json:"requests,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL