Documentation
¶
Index ¶
- type Agent
- type AuditEntry
- type DecisionFilter
- type Event
- type EventFilter
- type EventLog
- func NewEventLog(s *LibSQLStore) *EventLog
- func (el *EventLog) AppendEvent(ctx context.Context, event *Event) error
- func (el *EventLog) GetEvents(ctx context.Context, workflowID string, since int64) ([]*Event, error)
- func (el *EventLog) GetEventsByType(ctx context.Context, eventType string, filter EventFilter) ([]*Event, error)
- func (el *EventLog) ReplayEvents(ctx context.Context, workflowID string) (map[string]*StepState, error)
- type LibSQLStore
- func NewLibSQLStore(dbPath string) (*LibSQLStore, error)
- func (s *LibSQLStore) AppendEvent(ctx context.Context, event *Event) error
- func (s *LibSQLStore) CancelDecision(ctx context.Context, id string) error
- func (s *LibSQLStore) Close() error
- func (s *LibSQLStore) CreateDecision(ctx context.Context, dec *PendingDecision) error
- func (s *LibSQLStore) CreatePlugin(ctx context.Context, plugin *Plugin) error
- func (s *LibSQLStore) CreateScheduledJob(ctx context.Context, job *ScheduledJob) error
- func (s *LibSQLStore) CreateWorkflow(ctx context.Context, wf *Workflow) error
- func (s *LibSQLStore) DB() *sql.DB
- func (s *LibSQLStore) DeleteScheduledJob(ctx context.Context, id string) error
- func (s *LibSQLStore) DeleteSecret(ctx context.Context, key string) error
- func (s *LibSQLStore) DeleteWorkflow(ctx context.Context, id string) error
- func (s *LibSQLStore) GetAgent(ctx context.Context, id string) (*Agent, error)
- func (s *LibSQLStore) GetEvents(ctx context.Context, workflowID string, since int64) ([]*Event, error)
- func (s *LibSQLStore) GetEventsByType(ctx context.Context, eventType string, filter EventFilter) ([]*Event, error)
- func (s *LibSQLStore) GetPlugin(ctx context.Context, id string) (*Plugin, error)
- func (s *LibSQLStore) GetScheduledJob(ctx context.Context, id string) (*ScheduledJob, error)
- func (s *LibSQLStore) GetSecret(ctx context.Context, key string) ([]byte, error)
- func (s *LibSQLStore) GetStepState(ctx context.Context, workflowID, stepID string) (*StepState, error)
- func (s *LibSQLStore) GetTemplate(ctx context.Context, name string, version string) (*WorkflowTemplate, error)
- func (s *LibSQLStore) GetWorkflow(ctx context.Context, id string) (*Workflow, error)
- func (s *LibSQLStore) GetWorkflowContext(ctx context.Context, workflowID string) (*WorkflowContext, error)
- func (s *LibSQLStore) ListAgents(ctx context.Context) ([]*Agent, error)
- func (s *LibSQLStore) ListPendingDecisions(ctx context.Context, filter DecisionFilter) ([]*PendingDecision, error)
- func (s *LibSQLStore) ListPlugins(ctx context.Context) ([]*Plugin, error)
- func (s *LibSQLStore) ListScheduledJobs(ctx context.Context, filter ScheduledJobFilter) ([]*ScheduledJob, error)
- func (s *LibSQLStore) ListSecrets(ctx context.Context) ([]string, error)
- func (s *LibSQLStore) ListStepStates(ctx context.Context, workflowID string) ([]*StepState, error)
- func (s *LibSQLStore) ListTemplates(ctx context.Context, filter TemplateFilter) ([]*WorkflowTemplate, error)
- func (s *LibSQLStore) ListWorkflows(ctx context.Context, filter WorkflowFilter) ([]*Workflow, error)
- func (s *LibSQLStore) Migrate(ctx context.Context) error
- func (s *LibSQLStore) RegisterAgent(ctx context.Context, agent *Agent) error
- func (s *LibSQLStore) ResolveDecision(ctx context.Context, id string, resolution *Resolution) error
- func (s *LibSQLStore) StoreSecret(ctx context.Context, key string, value []byte) error
- func (s *LibSQLStore) StoreTemplate(ctx context.Context, tpl *WorkflowTemplate) error
- func (s *LibSQLStore) UpdateAgentSeen(ctx context.Context, id string) error
- func (s *LibSQLStore) UpdatePlugin(ctx context.Context, id string, status string, errMsg string) error
- func (s *LibSQLStore) UpdateScheduledJob(ctx context.Context, id string, update ScheduledJobUpdate) error
- func (s *LibSQLStore) UpdateWorkflow(ctx context.Context, id string, update WorkflowUpdate) error
- func (s *LibSQLStore) UpsertStepState(ctx context.Context, state *StepState) error
- func (s *LibSQLStore) UpsertWorkflowContext(ctx context.Context, wfCtx *WorkflowContext) error
- func (s *LibSQLStore) Vacuum(ctx context.Context) error
- type PendingDecision
- type Plugin
- type Resolution
- type ScheduledJob
- type ScheduledJobFilter
- type ScheduledJobUpdate
- type Secret
- type SnapshotPayload
- type StepState
- type Store
- type TemplateFilter
- type Workflow
- type WorkflowContext
- type WorkflowFilter
- type WorkflowTemplate
- type WorkflowUpdate
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Agent ¶
type Agent struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"` // llm, system, human, service
Metadata json.RawMessage `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
LastSeenAt *time.Time `json:"last_seen_at,omitempty"`
}
Agent represents a registered agent identity.
type AuditEntry ¶
type AuditEntry struct {
ID int64 `json:"id"`
AgentID string `json:"agent_id"`
Action string `json:"action"`
ResourceType string `json:"resource_type"`
ResourceID string `json:"resource_id,omitempty"`
Details json.RawMessage `json:"details,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
AuditEntry is an immutable record of an agent action (flag-activated).
type DecisionFilter ¶
type DecisionFilter struct {
WorkflowID string `json:"workflow_id,omitempty"`
AgentID string `json:"agent_id,omitempty"`
TargetAgentID string `json:"target_agent_id,omitempty"`
Status string `json:"status,omitempty"`
Limit int `json:"limit,omitempty"`
}
DecisionFilter specifies criteria for listing pending decisions.
type Event ¶
type Event struct {
ID int64 `json:"id"`
WorkflowID string `json:"workflow_id"`
StepID string `json:"step_id,omitempty"`
Type string `json:"event_type"`
Payload json.RawMessage `json:"payload,omitempty"`
AgentID string `json:"agent_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
Sequence int64 `json:"sequence"`
}
Event is an immutable entry in the event sourcing log.
type EventFilter ¶
type EventFilter struct {
WorkflowID string `json:"workflow_id,omitempty"`
StepID string `json:"step_id,omitempty"`
EventType string `json:"event_type,omitempty"`
Since *time.Time `json:"since,omitempty"`
Limit int `json:"limit,omitempty"`
}
EventFilter specifies criteria for listing events.
type EventLog ¶
type EventLog struct {
// contains filtered or unexported fields
}
EventLog provides event-sourcing operations on top of a LibSQLStore.
func NewEventLog ¶
func NewEventLog(s *LibSQLStore) *EventLog
NewEventLog wraps a LibSQLStore to provide event-sourcing operations.
func (*EventLog) AppendEvent ¶
func (el *EventLog) AppendEvent(ctx context.Context, event *Event) error
AppendEvent appends an event with a monotonically increasing per-workflow sequence. Uses BEGIN IMMEDIATE to ensure sequence correctness under concurrency.
func (*EventLog) GetEvents ¶
func (el *EventLog) GetEvents(ctx context.Context, workflowID string, since int64) ([]*Event, error)
GetEvents returns events for a workflow with sequence > since, ordered by sequence ASC.
func (*EventLog) GetEventsByType ¶
func (el *EventLog) GetEventsByType(ctx context.Context, eventType string, filter EventFilter) ([]*Event, error)
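GetEventsByType returns events of a specific type matching the filter.
func (*EventLog) ReplayEvents ¶
func (el *EventLog) ReplayEvents(ctx context.Context, workflowID string) (map[string]*StepState, error)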
type LibSQLStore ¶
type LibSQLStore struct {
// contains filtered or unexported fields
}
LibSQLStore implements the Store interface using libSQL (SQLite-compatible embedded database).
func NewLibSQLStore ¶
func NewLibSQLStore(dbPath string) (*LibSQLStore, error)
NewLibSQLStore opens a libSQL database at the given path and returns a Store. The path can be a bare filename (e.g. "opcode.db") or a URI ("file:/path/to/db.db"). Bare paths are automatically prefixed with "file:" for go-libsql compatibility.
func (*LibSQLStore) AppendEvent ¶
func (s *LibSQLStore) AppendEvent(ctx context.Context, event *Event) error
func (*LibSQLStore) CancelDecision ¶
func (s *LibSQLStore) CancelDecision(ctx context.Context, id string) error
func (*LibSQLStore) Close ¶
func (s *LibSQLStore) Close() error
func (*LibSQLStore) CreateDecision ¶
func (s *LibSQLStore) CreateDecision(ctx context.Context, dec *PendingDecision) error
func (*LibSQLStore) CreatePlugin ¶
func (s *LibSQLStore) CreatePlugin(ctx context.Context, plugin *Plugin) error
func (*LibSQLStore) CreateScheduledJob ¶
func (s *LibSQLStore) CreateScheduledJob(ctx context.Context, job *ScheduledJob) error
func (*LibSQLStore) CreateWorkflow ¶
func (s *LibSQLStore) CreateWorkflow(ctx context.Context, wf *Workflow) error
func (*LibSQLStore) DB ¶
func (s *LibSQLStore) DB() *sql.DB
DB returns the underlying *sql.DB for advanced usage (e.g. event log).
func (*LibSQLStore) DeleteScheduledJob ¶
func (s *LibSQLStore) DeleteScheduledJob(ctx context.Context, id string) error
func (*LibSQLStore) DeleteSecret ¶
func (s *LibSQLStore) DeleteSecret(ctx context.Context, key string) error
func (*LibSQLStore) DeleteWorkflow ¶
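func (s *LibSQLStore) DeleteWorkflow(ctx context.Context, id string) error
func (*LibSQLStore) GetAgent ¶
func (s *LibSQLStore) GetAgent(ctx context.Context, id string) (*Agent, error)
func (*LibSQLStore) GetEvents ¶
func (s *LibSQLStore) GetEvents(ctx context.Context, workflowID string, since int64) ([]*Event, error)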
func (*LibSQLStore) GetEventsByType ¶
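func (s *LibSQLStore) GetEventsByType(ctx context.Context, eventType string, filter EventFilter) ([]*Event, error)
func (*LibSQLStore) GetPlugin ¶
func (s *LibSQLStore) GetPlugin(ctx context.Context, id string) (*Plugin, error)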
func (*LibSQLStore) GetScheduledJob ¶
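func (s *LibSQLStore) GetScheduledJob(ctx context.Context, id string) (*ScheduledJob, error)
func (*LibSQLStore) GetSecret ¶
func (s *LibSQLStore) GetSecret(ctx context.Context, key string) ([]byte, error)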
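func (*LibSQLStore) GetStepState ¶
func (s *LibSQLStore) GetStepState(ctx context.Context, workflowID, stepID string) (*StepState, error)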
func (*LibSQLStore) GetTemplate ¶
func (s *LibSQLStore) GetTemplate(ctx context.Context, name string, version string) (*WorkflowTemplate, error)
func (*LibSQLStore) GetWorkflow ¶
func (s *LibSQLStore) GetWorkflow(ctx context.Context, id string) (*Workflow, error)
func (*LibSQLStore) GetWorkflowContext ¶
func (s *LibSQLStore) GetWorkflowContext(ctx context.Context, workflowID string) (*WorkflowContext, error)
func (*LibSQLStore) ListAgents ¶
func (s *LibSQLStore) ListAgents(ctx context.Context) ([]*Agent, error)
func (*LibSQLStore) ListPendingDecisions ¶
func (s *LibSQLStore) ListPendingDecisions(ctx context.Context, filter DecisionFilter) ([]*PendingDecision, error)
func (*LibSQLStore) ListPlugins ¶
func (s *LibSQLStore) ListPlugins(ctx context.Context) ([]*Plugin, error)
func (*LibSQLStore) ListScheduledJobs ¶
func (s *LibSQLStore) ListScheduledJobs(ctx context.Context, filter ScheduledJobFilter) ([]*ScheduledJob, error)
func (*LibSQLStore) ListSecrets ¶
func (s *LibSQLStore) ListSecrets(ctx context.Context) ([]string, error)
func (*LibSQLStore) ListStepStates ¶
func (s *LibSQLStore) ListStepStates(ctx context.Context, workflowID string) ([]*StepState, error)
func (*LibSQLStore) ListTemplates ¶
func (s *LibSQLStore) ListTemplates(ctx context.Context, filter TemplateFilter) ([]*WorkflowTemplate, error)
func (*LibSQLStore) ListWorkflows ¶
func (s *LibSQLStore) ListWorkflows(ctx context.Context, filter WorkflowFilter) ([]*Workflow, error)
func (*LibSQLStore) Migrate ¶
func (s *LibSQLStore) Migrate(ctx context.Context) error
Migrate runs all pending database migrations.
func (*LibSQLStore) RegisterAgent ¶
func (s *LibSQLStore) RegisterAgent(ctx context.Context, agent *Agent) error
func (*LibSQLStore) ResolveDecision ¶
func (s *LibSQLStore) ResolveDecision(ctx context.Context, id string, resolution *Resolution) error
func (*LibSQLStore) StoreSecret ¶
func (s *LibSQLStore) StoreSecret(ctx context.Context, key string, value []byte) error
func (*LibSQLStore) StoreTemplate ¶
func (s *LibSQLStore) StoreTemplate(ctx context.Context, tpl *WorkflowTemplate) error
func (*LibSQLStore) UpdateAgentSeen ¶
func (s *LibSQLStore) UpdateAgentSeen(ctx context.Context, id string) error
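func (*LibSQLStore) UpdatePlugin ¶
func (s *LibSQLStore) UpdatePlugin(ctx context.Context, id string, status string, errMsg string) error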
func (*LibSQLStore) UpdateScheduledJob ¶
func (s *LibSQLStore) UpdateScheduledJob(ctx context.Context, id string, update ScheduledJobUpdate) error
func (*LibSQLStore) UpdateWorkflow ¶
func (s *LibSQLStore) UpdateWorkflow(ctx context.Context, id string, update WorkflowUpdate) error
func (*LibSQLStore) UpsertStepState ¶
func (s *LibSQLStore) UpsertStepState(ctx context.Context, state *StepState) error
func (*LibSQLStore) UpsertWorkflowContext ¶
func (s *LibSQLStore) UpsertWorkflowContext(ctx context.Context, wfCtx *WorkflowContext) error
func (*LibSQLStore) Vacuum ¶
func (s *LibSQLStore) Vacuum(ctx context.Context) error
type PendingDecision ¶
type PendingDecision struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
StepID string `json:"step_id"`
AgentID string `json:"agent_id,omitempty"`
TargetAgentID string `json:"target_agent_id,omitempty"`
Context json.RawMessage `json:"context"`
Options json.RawMessage `json:"options"`
TimeoutAt *time.Time `json:"timeout_at,omitempty"`
Fallback string `json:"fallback,omitempty"`
Resolution json.RawMessage `json:"resolution,omitempty"`
ResolvedBy string `json:"resolved_by,omitempty"`
ResolvedAt *time.Time `json:"resolved_at,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
PendingDecision represents a reasoning node awaiting agent input.
type Plugin ¶
type Plugin struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"` // mcp
Config json.RawMessage `json:"config"`
Status string `json:"status"` // active, inactive, error
LastHealthCheck *time.Time `json:"last_health_check,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
Plugin represents a registered external action provider.
type Resolution ¶
type Resolution struct {
Choice string `json:"choice"`
Reasoning string `json:"reasoning,omitempty"`
Data map[string]any `json:"data,omitempty"`
ResolvedBy string `json:"resolved_by,omitempty"`
}
Resolution is the agent's response to a pending decision.
type ScheduledJob ¶
type ScheduledJob struct {
ID string `json:"id"`
TemplateName string `json:"template_name"`
TemplateVersion string `json:"template_version,omitempty"`
CronExpression string `json:"cron_expression"`
Params json.RawMessage `json:"params,omitempty"`
AgentID string `json:"agent_id"`
Enabled bool `json:"enabled"`
LastRunAt *time.Time `json:"last_run_at,omitempty"`
NextRunAt *time.Time `json:"next_run_at,omitempty"`
LastRunStatus string `json:"last_run_status,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
ScheduledJob is a cron-triggered workflow execution.
type ScheduledJobFilter ¶
type ScheduledJobFilter struct {
Enabled *bool `json:"enabled,omitempty"`
AgentID string `json:"agent_id,omitempty"`
Limit int `json:"limit,omitempty"`
}
ScheduledJobFilter specifies criteria for listing scheduled jobs.
type ScheduledJobUpdate ¶
type ScheduledJobUpdate struct {
Enabled *bool `json:"enabled,omitempty"`
LastRunAt *time.Time `json:"last_run_at,omitempty"`
NextRunAt *time.Time `json:"next_run_at,omitempty"`
LastRunStatus string `json:"last_run_status,omitempty"`
}
ScheduledJobUpdate specifies mutable fields of a scheduled job.
type Secret ¶
type Secret struct {
Key string `json:"key"`
Value []byte `json:"-"` // encrypted, never serialized
CreatedAt time.Time `json:"created_at"`
}
Secret is an encrypted key-value entry.
type SnapshotPayload ¶
type SnapshotPayload struct {
Output json.RawMessage `json:"output,omitempty"`
Error json.RawMessage `json:"error,omitempty"`
}
SnapshotPayload is used to extract typed data from event payloads.
type StepState ¶
type StepState struct {
WorkflowID string `json:"workflow_id"`
StepID string `json:"step_id"`
Status schema.StepStatus `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
Error json.RawMessage `json:"error,omitempty"`
RetryCount int `json:"retry_count"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
}
StepState is the materialized view of a step's current execution state.
type Store ¶
type Store interface {
// Workflows
CreateWorkflow(ctx context.Context, wf *Workflow) error
GetWorkflow(ctx context.Context, id string) (*Workflow, error)
UpdateWorkflow(ctx context.Context, id string, update WorkflowUpdate) error
ListWorkflows(ctx context.Context, filter WorkflowFilter) ([]*Workflow, error)
DeleteWorkflow(ctx context.Context, id string) error
// Event Sourcing (append-only)
AppendEvent(ctx context.Context, event *Event) error
GetEvents(ctx context.Context, workflowID string, since int64) ([]*Event, error)
GetEventsByType(ctx context.Context, eventType string, filter EventFilter) ([]*Event, error)
// Step State (materialized view)
UpsertStepState(ctx context.Context, state *StepState) error
GetStepState(ctx context.Context, workflowID, stepID string) (*StepState, error)
ListStepStates(ctx context.Context, workflowID string) ([]*StepState, error)
// Workflow Context
UpsertWorkflowContext(ctx context.Context, wfCtx *WorkflowContext) error
GetWorkflowContext(ctx context.Context, workflowID string) (*WorkflowContext, error)
// Pending Decisions
CreateDecision(ctx context.Context, dec *PendingDecision) error
ResolveDecision(ctx context.Context, id string, resolution *Resolution) error
CancelDecision(ctx context.Context, id string) error
ListPendingDecisions(ctx context.Context, filter DecisionFilter) ([]*PendingDecision, error)
// Agents
RegisterAgent(ctx context.Context, agent *Agent) error
GetAgent(ctx context.Context, id string) (*Agent, error)
UpdateAgentSeen(ctx context.Context, id string) error
ListAgents(ctx context.Context) ([]*Agent, error)
// Secrets
StoreSecret(ctx context.Context, key string, value []byte) error
GetSecret(ctx context.Context, key string) ([]byte, error)
DeleteSecret(ctx context.Context, key string) error
ListSecrets(ctx context.Context) ([]string, error)
// Templates
StoreTemplate(ctx context.Context, tpl *WorkflowTemplate) error
GetTemplate(ctx context.Context, name string, version string) (*WorkflowTemplate, error)
ListTemplates(ctx context.Context, filter TemplateFilter) ([]*WorkflowTemplate, error)
// Plugins
CreatePlugin(ctx context.Context, plugin *Plugin) error
GetPlugin(ctx context.Context, id string) (*Plugin, error)
UpdatePlugin(ctx context.Context, id string, status string, errMsg string) error
ListPlugins(ctx context.Context) ([]*Plugin, error)
// Scheduled Jobs
CreateScheduledJob(ctx context.Context, job *ScheduledJob) error
GetScheduledJob(ctx context.Context, id string) (*ScheduledJob, error)
UpdateScheduledJob(ctx context.Context, id string, update ScheduledJobUpdate) error
ListScheduledJobs(ctx context.Context, filter ScheduledJobFilter) ([]*ScheduledJob, error)
DeleteScheduledJob(ctx context.Context, id string) error
// Maintenance
Migrate(ctx context.Context) error
Vacuum(ctx context.Context) error
// Lifecycle
Close() error
}
Store defines the persistence layer contract. All implementations must be safe for concurrent use.
type TemplateFilter ¶
type TemplateFilter struct {
Name string `json:"name,omitempty"`
AgentID string `json:"agent_id,omitempty"`
Limit int `json:"limit,omitempty"`
}
TemplateFilter specifies criteria for listing templates.
type Workflow ¶
type Workflow struct {
ID string `json:"id"`
Name string `json:"name,omitempty"`
TemplateName string `json:"template_name,omitempty"`
TemplateVersion string `json:"template_version,omitempty"`
Definition schema.WorkflowDefinition `json:"definition"`
Status schema.WorkflowStatus `json:"status"`
AgentID string `json:"agent_id"`
ParentID string `json:"parent_workflow_id,omitempty"`
InputParams map[string]any `json:"input_params,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
Error json.RawMessage `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
}
Workflow is the persisted representation of a workflow execution.
type WorkflowContext ¶
type WorkflowContext struct {
WorkflowID string `json:"workflow_id"`
AgentID string `json:"agent_id"`
OriginalIntent string `json:"original_intent"`
DecisionsLog json.RawMessage `json:"decisions_log,omitempty"`
AccumulatedData json.RawMessage `json:"accumulated_data,omitempty"`
AgentNotes string `json:"agent_notes,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
WorkflowContext stores metadata scoped to a workflow execution.
type WorkflowFilter ¶
type WorkflowFilter struct {
Status *schema.WorkflowStatus `json:"status,omitempty"`
AgentID string `json:"agent_id,omitempty"`
Since *time.Time `json:"since,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
}
WorkflowFilter specifies criteria for listing workflows.
type WorkflowTemplate ¶
type WorkflowTemplate struct {
Name string `json:"name"`
Version string `json:"version"`
Description string `json:"description,omitempty"`
Definition schema.WorkflowDefinition `json:"definition"`
InputSchema json.RawMessage `json:"input_schema,omitempty"`
OutputSchema json.RawMessage `json:"output_schema,omitempty"`
Triggers json.RawMessage `json:"triggers,omitempty"`
Permissions json.RawMessage `json:"permissions,omitempty"`
AgentID string `json:"agent_id"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
WorkflowTemplate is a reusable workflow definition registered via opcode.define.
type WorkflowUpdate ¶
type WorkflowUpdate struct {
Status *schema.WorkflowStatus `json:"status,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
Error json.RawMessage `json:"error,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
WorkflowUpdate specifies mutable fields of a workflow.