store

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Agent

type Agent struct {
	ID         string          `json:"id"`
	Name       string          `json:"name"`
	Type       string          `json:"type"` // llm, system, human, service
	Metadata   json.RawMessage `json:"metadata,omitempty"`
	CreatedAt  time.Time       `json:"created_at"`
	LastSeenAt *time.Time      `json:"last_seen_at,omitempty"`
}

Agent represents a registered agent identity.

type AuditEntry

type AuditEntry struct {
	ID           int64           `json:"id"`
	AgentID      string          `json:"agent_id"`
	Action       string          `json:"action"`
	ResourceType string          `json:"resource_type"`
	ResourceID   string          `json:"resource_id,omitempty"`
	Details      json.RawMessage `json:"details,omitempty"`
	Timestamp    time.Time       `json:"timestamp"`
}

AuditEntry is an immutable record of an agent action (flag-activated).

type DecisionFilter

type DecisionFilter struct {
	WorkflowID    string `json:"workflow_id,omitempty"`
	AgentID       string `json:"agent_id,omitempty"`
	TargetAgentID string `json:"target_agent_id,omitempty"`
	Status        string `json:"status,omitempty"`
	Limit         int    `json:"limit,omitempty"`
}

DecisionFilter specifies criteria for listing pending decisions.

type Event

type Event struct {
	ID         int64           `json:"id"`
	WorkflowID string          `json:"workflow_id"`
	StepID     string          `json:"step_id,omitempty"`
	Type       string          `json:"event_type"`
	Payload    json.RawMessage `json:"payload,omitempty"`
	AgentID    string          `json:"agent_id,omitempty"`
	Timestamp  time.Time       `json:"timestamp"`
	Sequence   int64           `json:"sequence"`
}

Event is an immutable entry in the event sourcing log.

type EventFilter

type EventFilter struct {
	WorkflowID string     `json:"workflow_id,omitempty"`
	StepID     string     `json:"step_id,omitempty"`
	EventType  string     `json:"event_type,omitempty"`
	Since      *time.Time `json:"since,omitempty"`
	Limit      int        `json:"limit,omitempty"`
}

EventFilter specifies criteria for listing events.

type EventLog

type EventLog struct {
	// contains filtered or unexported fields
}

EventLog provides event-sourcing operations on top of a LibSQLStore.

func NewEventLog

func NewEventLog(s *LibSQLStore) *EventLog

NewEventLog wraps a LibSQLStore to provide event-sourcing operations.

func (*EventLog) AppendEvent

func (el *EventLog) AppendEvent(ctx context.Context, event *Event) error

AppendEvent appends an event with a monotonically increasing per-workflow sequence. Uses BEGIN IMMEDIATE to ensure sequence correctness under concurrency.

func (*EventLog) GetEvents

func (el *EventLog) GetEvents(ctx context.Context, workflowID string, since int64) ([]*Event, error)

GetEvents returns events for a workflow with sequence > since, ordered by sequence ASC.

func (*EventLog) GetEventsByType

func (el *EventLog) GetEventsByType(ctx context.Context, eventType string, filter EventFilter) ([]*Event, error)

GetEventsByType returns events of a specific type matching the filter.

func (*EventLog) ReplayEvents

func (el *EventLog) ReplayEvents(ctx context.Context, workflowID string) (map[string]*StepState, error)

ReplayEvents replays all events for a workflow and returns the reconstructed step states. Returns an error if sequence gaps are detected.

type LibSQLStore

type LibSQLStore struct {
	// contains filtered or unexported fields
}

LibSQLStore implements the Store interface using libSQL (SQLite-compatible embedded database).

func NewLibSQLStore

func NewLibSQLStore(dbPath string) (*LibSQLStore, error)

NewLibSQLStore opens a libSQL database at the given path and returns a Store. The path can be a bare filename (e.g. "opcode.db") or a URI ("file:/path/to/db.db"). Bare paths are automatically prefixed with "file:" for go-libsql compatibility.

func (*LibSQLStore) AppendEvent

func (s *LibSQLStore) AppendEvent(ctx context.Context, event *Event) error

func (*LibSQLStore) CancelDecision

func (s *LibSQLStore) CancelDecision(ctx context.Context, id string) error

func (*LibSQLStore) Close

func (s *LibSQLStore) Close() error

Close closes the database.

func (*LibSQLStore) CreateDecision

func (s *LibSQLStore) CreateDecision(ctx context.Context, dec *PendingDecision) error

func (*LibSQLStore) CreatePlugin

func (s *LibSQLStore) CreatePlugin(ctx context.Context, plugin *Plugin) error

func (*LibSQLStore) CreateScheduledJob

func (s *LibSQLStore) CreateScheduledJob(ctx context.Context, job *ScheduledJob) error

func (*LibSQLStore) CreateWorkflow

func (s *LibSQLStore) CreateWorkflow(ctx context.Context, wf *Workflow) error

func (*LibSQLStore) DB

func (s *LibSQLStore) DB() *sql.DB

DB returns the underlying *sql.DB for advanced usage (e.g. event log).

func (*LibSQLStore) DeleteScheduledJob

func (s *LibSQLStore) DeleteScheduledJob(ctx context.Context, id string) error

func (*LibSQLStore) DeleteSecret

func (s *LibSQLStore) DeleteSecret(ctx context.Context, key string) error

func (*LibSQLStore) DeleteWorkflow

func (s *LibSQLStore) DeleteWorkflow(ctx context.Context, id string) error

func (*LibSQLStore) GetAgent

func (s *LibSQLStore) GetAgent(ctx context.Context, id string) (*Agent, error)

func (*LibSQLStore) GetEvents

func (s *LibSQLStore) GetEvents(ctx context.Context, workflowID string, since int64) ([]*Event, error)

func (*LibSQLStore) GetEventsByType

func (s *LibSQLStore) GetEventsByType(ctx context.Context, eventType string, filter EventFilter) ([]*Event, error)

func (*LibSQLStore) GetPlugin

func (s *LibSQLStore) GetPlugin(ctx context.Context, id string) (*Plugin, error)

func (*LibSQLStore) GetScheduledJob

func (s *LibSQLStore) GetScheduledJob(ctx context.Context, id string) (*ScheduledJob, error)

func (*LibSQLStore) GetSecret

func (s *LibSQLStore) GetSecret(ctx context.Context, key string) ([]byte, error)

func (*LibSQLStore) GetStepState

func (s *LibSQLStore) GetStepState(ctx context.Context, workflowID, stepID string) (*StepState, error)

func (*LibSQLStore) GetTemplate

func (s *LibSQLStore) GetTemplate(ctx context.Context, name string, version string) (*WorkflowTemplate, error)

func (*LibSQLStore) GetWorkflow

func (s *LibSQLStore) GetWorkflow(ctx context.Context, id string) (*Workflow, error)

func (*LibSQLStore) GetWorkflowContext

func (s *LibSQLStore) GetWorkflowContext(ctx context.Context, workflowID string) (*WorkflowContext, error)

func (*LibSQLStore) ListAgents

func (s *LibSQLStore) ListAgents(ctx context.Context) ([]*Agent, error)

func (*LibSQLStore) ListPendingDecisions

func (s *LibSQLStore) ListPendingDecisions(ctx context.Context, filter DecisionFilter) ([]*PendingDecision, error)

func (*LibSQLStore) ListPlugins

func (s *LibSQLStore) ListPlugins(ctx context.Context) ([]*Plugin, error)

func (*LibSQLStore) ListScheduledJobs

func (s *LibSQLStore) ListScheduledJobs(ctx context.Context, filter ScheduledJobFilter) ([]*ScheduledJob, error)

func (*LibSQLStore) ListSecrets

func (s *LibSQLStore) ListSecrets(ctx context.Context) ([]string, error)

func (*LibSQLStore) ListStepStates

func (s *LibSQLStore) ListStepStates(ctx context.Context, workflowID string) ([]*StepState, error)

func (*LibSQLStore) ListTemplates

func (s *LibSQLStore) ListTemplates(ctx context.Context, filter TemplateFilter) ([]*WorkflowTemplate, error)

func (*LibSQLStore) ListWorkflows

func (s *LibSQLStore) ListWorkflows(ctx context.Context, filter WorkflowFilter) ([]*Workflow, error)

func (*LibSQLStore) Migrate

func (s *LibSQLStore) Migrate(ctx context.Context) error

Migrate runs all pending database migrations.

func (*LibSQLStore) RegisterAgent

func (s *LibSQLStore) RegisterAgent(ctx context.Context, agent *Agent) error

func (*LibSQLStore) ResolveDecision

func (s *LibSQLStore) ResolveDecision(ctx context.Context, id string, resolution *Resolution) error

func (*LibSQLStore) StoreSecret

func (s *LibSQLStore) StoreSecret(ctx context.Context, key string, value []byte) error

func (*LibSQLStore) StoreTemplate

func (s *LibSQLStore) StoreTemplate(ctx context.Context, tpl *WorkflowTemplate) error

func (*LibSQLStore) UpdateAgentSeen

func (s *LibSQLStore) UpdateAgentSeen(ctx context.Context, id string) error

func (*LibSQLStore) UpdatePlugin

func (s *LibSQLStore) UpdatePlugin(ctx context.Context, id string, status string, errMsg string) error

func (*LibSQLStore) UpdateScheduledJob

func (s *LibSQLStore) UpdateScheduledJob(ctx context.Context, id string, update ScheduledJobUpdate) error

func (*LibSQLStore) UpdateWorkflow

func (s *LibSQLStore) UpdateWorkflow(ctx context.Context, id string, update WorkflowUpdate) error

func (*LibSQLStore) UpsertStepState

func (s *LibSQLStore) UpsertStepState(ctx context.Context, state *StepState) error

func (*LibSQLStore) UpsertWorkflowContext

func (s *LibSQLStore) UpsertWorkflowContext(ctx context.Context, wfCtx *WorkflowContext) error

func (*LibSQLStore) Vacuum

func (s *LibSQLStore) Vacuum(ctx context.Context) error

Vacuum runs VACUUM on the database.

type PendingDecision

type PendingDecision struct {
	ID            string          `json:"id"`
	WorkflowID    string          `json:"workflow_id"`
	StepID        string          `json:"step_id"`
	AgentID       string          `json:"agent_id,omitempty"`
	TargetAgentID string          `json:"target_agent_id,omitempty"`
	Context       json.RawMessage `json:"context"`
	Options       json.RawMessage `json:"options"`
	TimeoutAt     *time.Time      `json:"timeout_at,omitempty"`
	Fallback      string          `json:"fallback,omitempty"`
	Resolution    json.RawMessage `json:"resolution,omitempty"`
	ResolvedBy    string          `json:"resolved_by,omitempty"`
	ResolvedAt    *time.Time      `json:"resolved_at,omitempty"`
	Status        string          `json:"status"`
	CreatedAt     time.Time       `json:"created_at"`
}

PendingDecision represents a reasoning node awaiting agent input.

type Plugin

type Plugin struct {
	ID              string          `json:"id"`
	Name            string          `json:"name"`
	Type            string          `json:"type"` // mcp
	Config          json.RawMessage `json:"config"`
	Status          string          `json:"status"` // active, inactive, error
	LastHealthCheck *time.Time      `json:"last_health_check,omitempty"`
	ErrorMessage    string          `json:"error_message,omitempty"`
	CreatedAt       time.Time       `json:"created_at"`
}

Plugin represents a registered external action provider.

type Resolution

type Resolution struct {
	Choice     string         `json:"choice"`
	Reasoning  string         `json:"reasoning,omitempty"`
	Data       map[string]any `json:"data,omitempty"`
	ResolvedBy string         `json:"resolved_by,omitempty"`
}

Resolution is the agent's response to a pending decision.

type ScheduledJob

type ScheduledJob struct {
	ID              string          `json:"id"`
	TemplateName    string          `json:"template_name"`
	TemplateVersion string          `json:"template_version,omitempty"`
	CronExpression  string          `json:"cron_expression"`
	Params          json.RawMessage `json:"params,omitempty"`
	AgentID         string          `json:"agent_id"`
	Enabled         bool            `json:"enabled"`
	LastRunAt       *time.Time      `json:"last_run_at,omitempty"`
	NextRunAt       *time.Time      `json:"next_run_at,omitempty"`
	LastRunStatus   string          `json:"last_run_status,omitempty"`
	CreatedAt       time.Time       `json:"created_at"`
}

ScheduledJob is a cron-triggered workflow execution.

type ScheduledJobFilter

type ScheduledJobFilter struct {
	Enabled *bool  `json:"enabled,omitempty"`
	AgentID string `json:"agent_id,omitempty"`
	Limit   int    `json:"limit,omitempty"`
}

ScheduledJobFilter specifies criteria for listing scheduled jobs.

type ScheduledJobUpdate

type ScheduledJobUpdate struct {
	Enabled       *bool      `json:"enabled,omitempty"`
	LastRunAt     *time.Time `json:"last_run_at,omitempty"`
	NextRunAt     *time.Time `json:"next_run_at,omitempty"`
	LastRunStatus string     `json:"last_run_status,omitempty"`
}

ScheduledJobUpdate specifies mutable fields of a scheduled job.

type Secret

type Secret struct {
	Key       string    `json:"key"`
	Value     []byte    `json:"-"` // encrypted, never serialized
	CreatedAt time.Time `json:"created_at"`
}

Secret is an encrypted key-value entry.

type SnapshotPayload

type SnapshotPayload struct {
	Output json.RawMessage `json:"output,omitempty"`
	Error  json.RawMessage `json:"error,omitempty"`
}

SnapshotPayload is used to extract typed data from event payloads.

type StepState

type StepState struct {
	WorkflowID  string            `json:"workflow_id"`
	StepID      string            `json:"step_id"`
	Status      schema.StepStatus `json:"status"`
	Input       json.RawMessage   `json:"input,omitempty"`
	Output      json.RawMessage   `json:"output,omitempty"`
	Error       json.RawMessage   `json:"error,omitempty"`
	RetryCount  int               `json:"retry_count"`
	StartedAt   *time.Time        `json:"started_at,omitempty"`
	CompletedAt *time.Time        `json:"completed_at,omitempty"`
	DurationMs  int64             `json:"duration_ms,omitempty"`
}

StepState is the materialized view of a step's current execution state.

type Store

type Store interface {
	// Workflows
	CreateWorkflow(ctx context.Context, wf *Workflow) error
	GetWorkflow(ctx context.Context, id string) (*Workflow, error)
	UpdateWorkflow(ctx context.Context, id string, update WorkflowUpdate) error
	ListWorkflows(ctx context.Context, filter WorkflowFilter) ([]*Workflow, error)
	DeleteWorkflow(ctx context.Context, id string) error

	// Event Sourcing (append-only)
	AppendEvent(ctx context.Context, event *Event) error
	GetEvents(ctx context.Context, workflowID string, since int64) ([]*Event, error)
	GetEventsByType(ctx context.Context, eventType string, filter EventFilter) ([]*Event, error)

	// Step State (materialized view)
	UpsertStepState(ctx context.Context, state *StepState) error
	GetStepState(ctx context.Context, workflowID, stepID string) (*StepState, error)
	ListStepStates(ctx context.Context, workflowID string) ([]*StepState, error)

	// Workflow Context
	UpsertWorkflowContext(ctx context.Context, wfCtx *WorkflowContext) error
	GetWorkflowContext(ctx context.Context, workflowID string) (*WorkflowContext, error)

	// Pending Decisions
	CreateDecision(ctx context.Context, dec *PendingDecision) error
	ResolveDecision(ctx context.Context, id string, resolution *Resolution) error
	CancelDecision(ctx context.Context, id string) error
	ListPendingDecisions(ctx context.Context, filter DecisionFilter) ([]*PendingDecision, error)

	// Agents
	RegisterAgent(ctx context.Context, agent *Agent) error
	GetAgent(ctx context.Context, id string) (*Agent, error)
	UpdateAgentSeen(ctx context.Context, id string) error
	ListAgents(ctx context.Context) ([]*Agent, error)

	// Secrets
	StoreSecret(ctx context.Context, key string, value []byte) error
	GetSecret(ctx context.Context, key string) ([]byte, error)
	DeleteSecret(ctx context.Context, key string) error
	ListSecrets(ctx context.Context) ([]string, error)

	// Templates
	StoreTemplate(ctx context.Context, tpl *WorkflowTemplate) error
	GetTemplate(ctx context.Context, name string, version string) (*WorkflowTemplate, error)
	ListTemplates(ctx context.Context, filter TemplateFilter) ([]*WorkflowTemplate, error)

	// Plugins
	CreatePlugin(ctx context.Context, plugin *Plugin) error
	GetPlugin(ctx context.Context, id string) (*Plugin, error)
	UpdatePlugin(ctx context.Context, id string, status string, errMsg string) error
	ListPlugins(ctx context.Context) ([]*Plugin, error)

	// Scheduled Jobs
	CreateScheduledJob(ctx context.Context, job *ScheduledJob) error
	GetScheduledJob(ctx context.Context, id string) (*ScheduledJob, error)
	UpdateScheduledJob(ctx context.Context, id string, update ScheduledJobUpdate) error
	ListScheduledJobs(ctx context.Context, filter ScheduledJobFilter) ([]*ScheduledJob, error)
	DeleteScheduledJob(ctx context.Context, id string) error

	// Maintenance
	Migrate(ctx context.Context) error
	Vacuum(ctx context.Context) error

	// Lifecycle
	Close() error
}

Store defines the persistence layer contract. All implementations must be safe for concurrent use.

type TemplateFilter

type TemplateFilter struct {
	Name    string `json:"name,omitempty"`
	AgentID string `json:"agent_id,omitempty"`
	Limit   int    `json:"limit,omitempty"`
}

TemplateFilter specifies criteria for listing templates.

type Workflow

type Workflow struct {
	ID              string                    `json:"id"`
	Name            string                    `json:"name,omitempty"`
	TemplateName    string                    `json:"template_name,omitempty"`
	TemplateVersion string                    `json:"template_version,omitempty"`
	Definition      schema.WorkflowDefinition `json:"definition"`
	Status          schema.WorkflowStatus     `json:"status"`
	AgentID         string                    `json:"agent_id"`
	ParentID        string                    `json:"parent_workflow_id,omitempty"`
	InputParams     map[string]any            `json:"input_params,omitempty"`
	Output          json.RawMessage           `json:"output,omitempty"`
	Error           json.RawMessage           `json:"error,omitempty"`
	CreatedAt       time.Time                 `json:"created_at"`
	StartedAt       *time.Time                `json:"started_at,omitempty"`
	CompletedAt     *time.Time                `json:"completed_at,omitempty"`
	UpdatedAt       time.Time                 `json:"updated_at"`
}

Workflow is the persisted representation of a workflow execution.

type WorkflowContext

type WorkflowContext struct {
	WorkflowID      string          `json:"workflow_id"`
	AgentID         string          `json:"agent_id"`
	OriginalIntent  string          `json:"original_intent"`
	DecisionsLog    json.RawMessage `json:"decisions_log,omitempty"`
	AccumulatedData json.RawMessage `json:"accumulated_data,omitempty"`
	AgentNotes      string          `json:"agent_notes,omitempty"`
	CreatedAt       time.Time       `json:"created_at"`
	UpdatedAt       time.Time       `json:"updated_at"`
}

WorkflowContext stores metadata scoped to a workflow execution.

type WorkflowFilter

type WorkflowFilter struct {
	Status  *schema.WorkflowStatus `json:"status,omitempty"`
	AgentID string                 `json:"agent_id,omitempty"`
	Since   *time.Time             `json:"since,omitempty"`
	Limit   int                    `json:"limit,omitempty"`
	Offset  int                    `json:"offset,omitempty"`
}

WorkflowFilter specifies criteria for listing workflows.

type WorkflowTemplate

type WorkflowTemplate struct {
	Name         string                    `json:"name"`
	Version      string                    `json:"version"`
	Description  string                    `json:"description,omitempty"`
	Definition   schema.WorkflowDefinition `json:"definition"`
	InputSchema  json.RawMessage           `json:"input_schema,omitempty"`
	OutputSchema json.RawMessage           `json:"output_schema,omitempty"`
	Triggers     json.RawMessage           `json:"triggers,omitempty"`
	Permissions  json.RawMessage           `json:"permissions,omitempty"`
	AgentID      string                    `json:"agent_id"`
	CreatedAt    time.Time                 `json:"created_at"`
	UpdatedAt    time.Time                 `json:"updated_at"`
}

WorkflowTemplate is a reusable workflow definition registered via opcode.define.

type WorkflowUpdate

type WorkflowUpdate struct {
	Status      *schema.WorkflowStatus `json:"status,omitempty"`
	Output      json.RawMessage        `json:"output,omitempty"`
	Error       json.RawMessage        `json:"error,omitempty"`
	StartedAt   *time.Time             `json:"started_at,omitempty"`
	CompletedAt *time.Time             `json:"completed_at,omitempty"`
}

WorkflowUpdate specifies mutable fields of a workflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL