storage

package
v0.0.0-...-8cc4447 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2026 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccessPolicyModel

type AccessPolicyModel struct {
	ID             int64     `gorm:"column:id;primaryKey;autoIncrement"`
	Name           string    `gorm:"column:name;not null;uniqueIndex"`
	CallerTags     string    `gorm:"column:caller_tags;type:text;not null"` // JSON array
	TargetTags     string    `gorm:"column:target_tags;type:text;not null"` // JSON array
	AllowFunctions string    `gorm:"column:allow_functions;type:text"`      // JSON array
	DenyFunctions  string    `gorm:"column:deny_functions;type:text"`       // JSON array
	Constraints    string    `gorm:"column:constraints;type:text"`          // JSON object
	Action         string    `gorm:"column:action;not null;default:'allow'"`
	Priority       int       `gorm:"column:priority;not null;default:0;index"`
	Enabled        bool      `gorm:"column:enabled;not null;default:true;index"`
	Description    *string   `gorm:"column:description"`
	CreatedAt      time.Time `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt      time.Time `gorm:"column:updated_at;autoUpdateTime"`
}

AccessPolicyModel represents a tag-based access policy for cross-agent calls.

func (AccessPolicyModel) TableName

func (AccessPolicyModel) TableName() string

type AgentConfigurationModel

type AgentConfigurationModel struct {
	ID              int64     `gorm:"column:id;primaryKey;autoIncrement"`
	AgentID         string    `gorm:"column:agent_id;not null;index:idx_agent_config_agent_package,priority:1"`
	PackageID       string    `gorm:"column:package_id;not null;index:idx_agent_config_agent_package,priority:2"`
	Configuration   []byte    `gorm:"column:configuration;not null"`
	EncryptedFields []byte    `gorm:"column:encrypted_fields"`
	Status          string    `gorm:"column:status;not null"`
	Version         int       `gorm:"column:version;not null;default:1"`
	CreatedAt       time.Time `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt       time.Time `gorm:"column:updated_at;autoUpdateTime"`
	CreatedBy       *string   `gorm:"column:created_by"`
	UpdatedBy       *string   `gorm:"column:updated_by"`
}

func (AgentConfigurationModel) TableName

func (AgentConfigurationModel) TableName() string

type AgentDIDModel

type AgentDIDModel struct {
	DID                string    `gorm:"column:did;primaryKey"`
	AgentNodeID        string    `gorm:"column:agent_node_id;not null;index"`
	AgentFieldServerID string    `gorm:"column:agentfield_server_id;not null;index"`
	PublicKeyJWK       string    `gorm:"column:public_key_jwk;not null"`
	DerivationPath     string    `gorm:"column:derivation_path;not null"`
	Reasoners          string    `gorm:"column:reasoners;default:'{}'"`
	Skills             string    `gorm:"column:skills;default:'{}'"`
	Status             string    `gorm:"column:status;not null;default:'active'"`
	RegisteredAt       time.Time `gorm:"column:registered_at;autoCreateTime"`
	CreatedAt          time.Time `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt          time.Time `gorm:"column:updated_at;autoUpdateTime"`
}

func (AgentDIDModel) TableName

func (AgentDIDModel) TableName() string

type AgentExecutionModel

type AgentExecutionModel struct {
	ID           int64     `gorm:"column:id;primaryKey;autoIncrement"`
	WorkflowID   string    `gorm:"column:workflow_id;not null;index"`
	SessionID    *string   `gorm:"column:session_id;index"`
	AgentNodeID  string    `gorm:"column:agent_node_id;not null;index"`
	ReasonerID   string    `gorm:"column:reasoner_id;not null;index"`
	InputData    []byte    `gorm:"column:input_data"`
	OutputData   []byte    `gorm:"column:output_data"`
	InputSize    int       `gorm:"column:input_size"`
	OutputSize   int       `gorm:"column:output_size"`
	DurationMS   int       `gorm:"column:duration_ms;not null"`
	Status       string    `gorm:"column:status;not null;index"`
	ErrorMessage *string   `gorm:"column:error_message"`
	UserID       *string   `gorm:"column:user_id"`
	TeamID       *string   `gorm:"column:team_id"`
	Metadata     []byte    `gorm:"column:metadata"`
	CreatedAt    time.Time `gorm:"column:created_at;autoCreateTime"`
}

func (AgentExecutionModel) TableName

func (AgentExecutionModel) TableName() string

type AgentNodeModel

type AgentNodeModel struct {
	ID                  string     `gorm:"column:id;primaryKey"`
	Version             string     `gorm:"column:version;primaryKey;not null;default:''"`
	GroupID             string     `gorm:"column:group_id;not null;default:'';index"`
	TeamID              string     `gorm:"column:team_id;not null;index"`
	BaseURL             string     `gorm:"column:base_url;not null"`
	TrafficWeight       int        `gorm:"column:traffic_weight;not null;default:100"`
	DeploymentType      string     `gorm:"column:deployment_type;default:'long_running';index"`
	InvocationURL       *string    `gorm:"column:invocation_url"`
	Reasoners           []byte     `gorm:"column:reasoners"`
	Skills              []byte     `gorm:"column:skills"`
	CommunicationConfig []byte     `gorm:"column:communication_config"`
	HealthStatus        string     `gorm:"column:health_status;not null;index"`
	LifecycleStatus     string     `gorm:"column:lifecycle_status;default:'starting';index"`
	LastHeartbeat       *time.Time `gorm:"column:last_heartbeat"`
	RegisteredAt        time.Time  `gorm:"column:registered_at;autoCreateTime"`
	Features            []byte     `gorm:"column:features"`
	Metadata            []byte     `gorm:"column:metadata"`
	ProposedTags        []byte     `gorm:"column:proposed_tags"`
	ApprovedTags        []byte     `gorm:"column:approved_tags"`
}

func (AgentNodeModel) TableName

func (AgentNodeModel) TableName() string

type AgentPackageModel

type AgentPackageModel struct {
	ID                  string    `gorm:"column:id;primaryKey"`
	Name                string    `gorm:"column:name;not null"`
	Version             string    `gorm:"column:version;not null"`
	Description         *string   `gorm:"column:description"`
	Author              *string   `gorm:"column:author"`
	Repository          *string   `gorm:"column:repository"`
	InstallPath         string    `gorm:"column:install_path;not null"`
	ConfigurationSchema []byte    `gorm:"column:configuration_schema"`
	Status              string    `gorm:"column:status;not null"`
	ConfigurationStatus string    `gorm:"column:configuration_status;not null"`
	InstalledAt         time.Time `gorm:"column:installed_at;autoCreateTime"`
	UpdatedAt           time.Time `gorm:"column:updated_at;autoUpdateTime"`
	Metadata            []byte    `gorm:"column:metadata"`
}

func (AgentPackageModel) TableName

func (AgentPackageModel) TableName() string

type AgentTagVCModel

type AgentTagVCModel struct {
	ID         int64      `gorm:"column:id;primaryKey;autoIncrement"`
	AgentID    string     `gorm:"column:agent_id;uniqueIndex;not null"`
	AgentDID   string     `gorm:"column:agent_did;not null;index"`
	VCID       string     `gorm:"column:vc_id;uniqueIndex;not null"`
	VCDocument string     `gorm:"column:vc_document;type:text;not null"`
	Signature  string     `gorm:"column:signature;type:text"`
	IssuedAt   time.Time  `gorm:"column:issued_at;not null"`
	ExpiresAt  *time.Time `gorm:"column:expires_at"`
	RevokedAt  *time.Time `gorm:"column:revoked_at"`
	CreatedAt  time.Time  `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt  time.Time  `gorm:"column:updated_at;autoUpdateTime"`
}

AgentTagVCModel stores signed Agent Tag VCs issued on tag approval.

func (AgentTagVCModel) TableName

func (AgentTagVCModel) TableName() string

type CacheMessage

type CacheMessage struct {
	Channel string
	Payload []byte
}

CacheMessage represents a message received from the cache's pub/sub.

type CacheProvider

type CacheProvider interface {
	Set(key string, value interface{}, ttl time.Duration) error
	Get(key string, dest interface{}) error
	Delete(key string) error
	Exists(key string) bool

	// Pub/Sub for real-time features
	Subscribe(channel string) (<-chan CacheMessage, error)
	Publish(channel string, message interface{}) error
}

CacheProvider is the interface for the high-performance caching layer.

type Change

type Change struct {
	Entity    interface{}
	Table     string
	Type      ChangeType
	Operation func(DBTX) error
	Timestamp time.Time
}

Change represents a single database operation within the unit of work

type ChangeType

type ChangeType int

ChangeType represents the type of change being tracked

const (
	ChangeTypeNew ChangeType = iota
	ChangeTypeDirty
	ChangeTypeDeleted
)

type ComponentDIDModel

type ComponentDIDModel struct {
	DID            string    `gorm:"column:did;primaryKey"`
	AgentDID       string    `gorm:"column:agent_did;not null;index"`
	ComponentType  string    `gorm:"column:component_type;not null;index"`
	FunctionName   string    `gorm:"column:function_name;not null"`
	PublicKeyJWK   string    `gorm:"column:public_key_jwk;not null"`
	DerivationPath string    `gorm:"column:derivation_path;not null"`
	Capabilities   string    `gorm:"column:capabilities;default:'[]'"`
	Tags           string    `gorm:"column:tags;default:'[]'"`
	ExposureLevel  string    `gorm:"column:exposure_level;not null;default:'private'"`
	CreatedAt      time.Time `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt      time.Time `gorm:"column:updated_at;autoUpdateTime"`
}

func (ComponentDIDModel) TableName

func (ComponentDIDModel) TableName() string

type ComponentDIDRequest

type ComponentDIDRequest struct {
	ComponentDID    string
	ComponentType   string
	ComponentName   string
	PublicKeyJWK    string
	DerivationIndex int
}

ComponentDIDRequest represents a component DID to be stored

type ConfigEntry

type ConfigEntry struct {
	Key       string    `json:"key"`
	Value     string    `json:"value"`
	Version   int       `json:"version"`
	CreatedBy string    `json:"created_by,omitempty"`
	UpdatedBy string    `json:"updated_by,omitempty"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

ConfigEntry represents a database-stored configuration file.

type ConfigStorageModel

type ConfigStorageModel struct {
	ID        int64     `gorm:"column:id;primaryKey;autoIncrement"`
	Key       string    `gorm:"column:key;not null;uniqueIndex"`
	Value     string    `gorm:"column:value;type:text;not null"`
	Version   int       `gorm:"column:version;not null;default:1"`
	CreatedBy *string   `gorm:"column:created_by"`
	UpdatedBy *string   `gorm:"column:updated_by"`
	CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}

ConfigStorageModel stores configuration files in the database. Each record represents a named configuration (e.g. "agentfield.yaml") with versioning for audit trail.

func (ConfigStorageModel) TableName

func (ConfigStorageModel) TableName() string

type DBTX

type DBTX interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	Exec(query string, args ...interface{}) (sql.Result, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	Query(query string, args ...interface{}) (*sql.Rows, error)
	QueryRow(query string, args ...interface{}) *sql.Row
}

DBTX interface for operations that can run on a db or tx

type DIDDocumentModel

type DIDDocumentModel struct {
	DID          string     `gorm:"column:did;primaryKey"`
	AgentID      string     `gorm:"column:agent_id;not null;index"`
	DIDDocument  []byte     `gorm:"column:did_document;type:jsonb;not null"` // JSONB in PostgreSQL, TEXT in SQLite
	PublicKeyJWK string     `gorm:"column:public_key_jwk;not null"`
	RevokedAt    *time.Time `gorm:"column:revoked_at;index"`
	CreatedAt    time.Time  `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt    time.Time  `gorm:"column:updated_at;autoUpdateTime"`
}

DIDDocumentModel represents a DID document record for did:web resolution.

func (DIDDocumentModel) TableName

func (DIDDocumentModel) TableName() string

type DIDRegistryModel

type DIDRegistryModel struct {
	AgentFieldServerID  string    `gorm:"column:agentfield_server_id;primaryKey"`
	MasterSeedEncrypted []byte    `gorm:"column:master_seed_encrypted;not null"`
	RootDID             string    `gorm:"column:root_did;not null;unique"`
	AgentNodes          string    `gorm:"column:agent_nodes;default:'{}'"`
	TotalDIDs           int       `gorm:"column:total_dids;default:0"`
	CreatedAt           time.Time `gorm:"column:created_at;autoCreateTime"`
	LastKeyRotation     time.Time `gorm:"column:last_key_rotation;autoCreateTime"`
}

func (DIDRegistryModel) TableName

func (DIDRegistryModel) TableName() string

type DuplicateDIDError

type DuplicateDIDError struct {
	DID  string
	Type string // "registry", "agent", or "component"
}

Custom error types for data integrity issues

func (*DuplicateDIDError) Error

func (e *DuplicateDIDError) Error() string

type ExecutionLogEntryModel

type ExecutionLogEntryModel struct {
	EventID           int64     `gorm:"column:event_id;primaryKey;autoIncrement"`
	ExecutionID       string    `gorm:"column:execution_id;not null;index:idx_execution_logs_execution,priority:1"`
	WorkflowID        string    `gorm:"column:workflow_id;not null;index"`
	RunID             *string   `gorm:"column:run_id;index"`
	RootWorkflowID    *string   `gorm:"column:root_workflow_id;index"`
	ParentExecutionID *string   `gorm:"column:parent_execution_id;index"`
	Sequence          int64     `gorm:"column:sequence;not null;index:idx_execution_logs_execution,priority:2"`
	AgentNodeID       string    `gorm:"column:agent_node_id;not null;index"`
	ReasonerID        *string   `gorm:"column:reasoner_id;index"`
	Level             string    `gorm:"column:level;not null;index"`
	Source            string    `gorm:"column:source;not null;index"`
	EventType         *string   `gorm:"column:event_type;index"`
	Message           string    `gorm:"column:message;not null"`
	Attributes        string    `gorm:"column:attributes;default:'{}'"`
	SystemGenerated   bool      `gorm:"column:system_generated;not null;default:false"`
	SDKLanguage       *string   `gorm:"column:sdk_language;index"`
	Attempt           *int      `gorm:"column:attempt"`
	SpanID            *string   `gorm:"column:span_id;index"`
	StepID            *string   `gorm:"column:step_id;index"`
	ErrorCategory     *string   `gorm:"column:error_category;index"`
	EmittedAt         time.Time `gorm:"column:emitted_at;not null;index"`
	RecordedAt        time.Time `gorm:"column:recorded_at;autoCreateTime"`
}

func (ExecutionLogEntryModel) TableName

func (ExecutionLogEntryModel) TableName() string

type ExecutionRecordModel

type ExecutionRecordModel struct {
	ID                int64      `gorm:"column:id;primaryKey;autoIncrement"`
	ExecutionID       string     `gorm:"column:execution_id;not null;uniqueIndex"`
	RunID             string     `gorm:"column:run_id;not null;index"`
	ParentExecutionID *string    `gorm:"column:parent_execution_id;index"`
	AgentNodeID       string     `gorm:"column:agent_node_id;not null;index"`
	ReasonerID        string     `gorm:"column:reasoner_id;not null;index"`
	NodeID            string     `gorm:"column:node_id;not null;index"`
	Status            string     `gorm:"column:status;not null;index"`
	StatusReason      *string    `gorm:"column:status_reason"`
	InputPayload      []byte     `gorm:"column:input_payload"`
	ResultPayload     []byte     `gorm:"column:result_payload"`
	ErrorMessage      *string    `gorm:"column:error_message"`
	InputURI          *string    `gorm:"column:input_uri"`
	ResultURI         *string    `gorm:"column:result_uri"`
	SessionID         *string    `gorm:"column:session_id;index"`
	ActorID           *string    `gorm:"column:actor_id;index"`
	StartedAt         time.Time  `gorm:"column:started_at;not null;index"`
	CompletedAt       *time.Time `gorm:"column:completed_at"`
	DurationMS        *int64     `gorm:"column:duration_ms"`
	Notes             string     `gorm:"column:notes;default:'[]'"`
	CreatedAt         time.Time  `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt         time.Time  `gorm:"column:updated_at;autoUpdateTime"`
}

func (ExecutionRecordModel) TableName

func (ExecutionRecordModel) TableName() string

type ExecutionVCModel

type ExecutionVCModel struct {
	VCID              string    `gorm:"column:vc_id;primaryKey"`
	ExecutionID       string    `gorm:"column:execution_id;not null;index;index:idx_execution_vcs_execution_unique,priority:1"`
	WorkflowID        string    `gorm:"column:workflow_id;not null;index"`
	SessionID         string    `gorm:"column:session_id;not null;index"`
	IssuerDID         string    `gorm:"column:issuer_did;not null;index;index:idx_execution_vcs_execution_unique,priority:2"`
	TargetDID         *string   `gorm:"column:target_did;index;index:idx_execution_vcs_execution_unique,priority:3"`
	CallerDID         string    `gorm:"column:caller_did;not null;index"`
	VCDocument        string    `gorm:"column:vc_document;not null"`
	Signature         string    `gorm:"column:signature;not null"`
	StorageURI        string    `gorm:"column:storage_uri;default:''"`
	DocumentSizeBytes int64     `gorm:"column:document_size_bytes;default:0"`
	InputHash         string    `gorm:"column:input_hash;not null"`
	OutputHash        string    `gorm:"column:output_hash;not null"`
	Status            string    `gorm:"column:status;not null;default:'pending';index"`
	ParentVCID        *string   `gorm:"column:parent_vc_id;index"`
	ChildVCIDs        string    `gorm:"column:child_vc_ids;default:'[]'"`
	CreatedAt         time.Time `gorm:"column:created_at;autoCreateTime;index"`
	UpdatedAt         time.Time `gorm:"column:updated_at;autoUpdateTime"`
}

func (ExecutionVCModel) TableName

func (ExecutionVCModel) TableName() string

type ExecutionWebhookEventModel

type ExecutionWebhookEventModel struct {
	ID           int64     `gorm:"column:id;primaryKey;autoIncrement"`
	ExecutionID  string    `gorm:"column:execution_id;not null;index"`
	EventType    string    `gorm:"column:event_type;not null"`
	Status       string    `gorm:"column:status;not null"`
	HTTPStatus   *int      `gorm:"column:http_status"`
	Payload      *string   `gorm:"column:payload"`
	ResponseBody *string   `gorm:"column:response_body"`
	ErrorMessage *string   `gorm:"column:error_message"`
	CreatedAt    time.Time `gorm:"column:created_at;autoCreateTime"`
}

func (ExecutionWebhookEventModel) TableName

func (ExecutionWebhookEventModel) TableName() string

type ExecutionWebhookModel

type ExecutionWebhookModel struct {
	ExecutionID   string     `gorm:"column:execution_id;primaryKey"`
	URL           string     `gorm:"column:url;not null"`
	Secret        *string    `gorm:"column:secret"`
	Headers       string     `gorm:"column:headers;default:'{}'"`
	Status        string     `gorm:"column:status;not null;default:'pending'"`
	AttemptCount  int        `gorm:"column:attempt_count;not null;default:0"`
	NextAttemptAt *time.Time `gorm:"column:next_attempt_at"`
	LastAttemptAt *time.Time `gorm:"column:last_attempt_at"`
	LastError     *string    `gorm:"column:last_error"`
	CreatedAt     time.Time  `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt     time.Time  `gorm:"column:updated_at;autoUpdateTime"`
}

func (ExecutionWebhookModel) TableName

func (ExecutionWebhookModel) TableName() string

type ForeignKeyConstraintError

type ForeignKeyConstraintError struct {
	Table           string
	Column          string
	ReferencedTable string
	ReferencedValue string
	Operation       string
}

ForeignKeyConstraintError represents a foreign key constraint violation

func (*ForeignKeyConstraintError) Error

func (e *ForeignKeyConstraintError) Error() string

type InvalidExecutionStateTransitionError

type InvalidExecutionStateTransitionError struct {
	ExecutionID  string
	CurrentState string
	NewState     string
	Reason       string
}

func (*InvalidExecutionStateTransitionError) Error

type LocalStorage

type LocalStorage struct {
	// contains filtered or unexported fields
}

LocalStorage implements the StorageProvider and CacheProvider interfaces using SQLite for structured data and BoltDB for key-value data (memory).

CONCURRENCY MODEL: - SQLite is configured with WAL (Write-Ahead Logging) mode for optimal concurrency - Read-only operations (SELECT queries) do NOT acquire writeMutex - they run concurrently - Write operations (INSERT/UPDATE/DELETE) acquire writeMutex for serialization - WAL mode allows multiple concurrent readers with a single writer without blocking - This eliminates the performance bottleneck where analytics queries blocked all writes

func NewLocalStorage

func NewLocalStorage(config LocalStorageConfig) *LocalStorage

NewLocalStorage creates a new instance of LocalStorage.

func NewPostgresStorage

func NewPostgresStorage(config PostgresStorageConfig) *LocalStorage

NewPostgresStorage creates a new instance configured for PostgreSQL.

func (*LocalStorage) AcquireLock

func (ls *LocalStorage) AcquireLock(ctx context.Context, key string, timeout time.Duration) (*types.DistributedLock, error)

AcquireLock attempts to acquire a distributed lock.

func (*LocalStorage) AddToDeadLetterQueue

func (ls *LocalStorage) AddToDeadLetterQueue(ctx context.Context, event *types.ObservabilityEvent, errorMessage string, retryCount int) error

AddToDeadLetterQueue adds a failed event to the dead letter queue.

func (*LocalStorage) BeginTransaction

func (ls *LocalStorage) BeginTransaction() (Transaction, error)

TransactionalStorage methods (not fully implemented for local storage yet)

func (*LocalStorage) CleanupOldExecutions

func (ls *LocalStorage) CleanupOldExecutions(ctx context.Context, retentionPeriod time.Duration, batchSize int) (int, error)

CleanupOldExecutions removes old completed workflow executions based on retention period

func (*LocalStorage) CleanupWorkflow

func (ls *LocalStorage) CleanupWorkflow(ctx context.Context, identifier string, dryRun bool) (*types.WorkflowCleanupResult, error)

CleanupWorkflow deletes all data related to a specific workflow ID or workflow run identifier

func (*LocalStorage) ClearDeadLetterQueue

func (ls *LocalStorage) ClearDeadLetterQueue(ctx context.Context) error

ClearDeadLetterQueue removes all entries from the dead letter queue.

func (*LocalStorage) Close

func (ls *LocalStorage) Close(ctx context.Context) error

Close closes the SQLite and BoltDB connections.

func (*LocalStorage) CountExecutionVCs

func (ls *LocalStorage) CountExecutionVCs(ctx context.Context, filters types.VCFilters) (int, error)

func (*LocalStorage) CreateAccessPolicy

func (ls *LocalStorage) CreateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error

CreateAccessPolicy creates a new access policy.

func (*LocalStorage) CreateExecutionRecord

func (ls *LocalStorage) CreateExecutionRecord(ctx context.Context, exec *types.Execution) error

CreateExecutionRecord inserts a new execution row using the simplified schema.

func (*LocalStorage) CreateOrUpdateSession

func (ls *LocalStorage) CreateOrUpdateSession(ctx context.Context, session *types.Session) error

CreateOrUpdateSession creates or updates a session record in SQLite

func (*LocalStorage) CreateOrUpdateWorkflow

func (ls *LocalStorage) CreateOrUpdateWorkflow(ctx context.Context, workflow *types.Workflow) error

CreateOrUpdateWorkflow creates or updates a workflow record in SQLite

func (*LocalStorage) Delete

func (ls *LocalStorage) Delete(key string) error

Delete implements the CacheProvider Delete method using the in-memory cache.

func (*LocalStorage) DeleteAccessPolicy

func (ls *LocalStorage) DeleteAccessPolicy(ctx context.Context, id int64) error

DeleteAccessPolicy deletes an access policy by ID.

func (*LocalStorage) DeleteAgentConfiguration

func (ls *LocalStorage) DeleteAgentConfiguration(ctx context.Context, agentID, packageID string) error

DeleteAgentConfiguration deletes an agent configuration record

func (*LocalStorage) DeleteAgentPackage

func (ls *LocalStorage) DeleteAgentPackage(ctx context.Context, packageID string) error

DeleteAgentPackage deletes an agent package record

func (*LocalStorage) DeleteAgentVersion

func (ls *LocalStorage) DeleteAgentVersion(ctx context.Context, id string, version string) error

DeleteAgentVersion deletes a specific agent version row from the agent_nodes table.

func (*LocalStorage) DeleteConfig

func (ls *LocalStorage) DeleteConfig(ctx context.Context, key string) error

DeleteConfig removes a configuration entry by key.

func (*LocalStorage) DeleteFromDeadLetterQueue

func (ls *LocalStorage) DeleteFromDeadLetterQueue(ctx context.Context, ids []int64) error

DeleteFromDeadLetterQueue removes specific entries from the dead letter queue.

func (*LocalStorage) DeleteMemory

func (ls *LocalStorage) DeleteMemory(ctx context.Context, scope, scopeID, key string) error

DeleteMemory deletes a memory record from BoltDB and cache.

func (*LocalStorage) DeleteObservabilityWebhook

func (ls *LocalStorage) DeleteObservabilityWebhook(ctx context.Context) error

DeleteObservabilityWebhook removes the global observability webhook configuration.

func (*LocalStorage) DeleteVector

func (ls *LocalStorage) DeleteVector(ctx context.Context, scope, scopeID, key string) error

DeleteVector removes a stored vector embedding.

func (*LocalStorage) DeleteVectorsByPrefix

func (ls *LocalStorage) DeleteVectorsByPrefix(ctx context.Context, scope, scopeID, prefix string) (int, error)

DeleteVectorsByPrefix deletes all vectors whose key starts with the given prefix.

func (*LocalStorage) Exists

func (ls *LocalStorage) Exists(key string) bool

Exists implements the CacheProvider Exists method using the in-memory cache.

func (*LocalStorage) Get

func (ls *LocalStorage) Get(key string, dest interface{}) error

Get implements the CacheProvider Get method using the in-memory cache.

func (*LocalStorage) GetAccessPolicies

func (ls *LocalStorage) GetAccessPolicies(ctx context.Context) ([]*types.AccessPolicy, error)

GetAccessPolicies retrieves all enabled access policies, sorted by priority descending.

func (*LocalStorage) GetAccessPolicyByID

func (ls *LocalStorage) GetAccessPolicyByID(ctx context.Context, id int64) (*types.AccessPolicy, error)

GetAccessPolicyByID retrieves a single access policy by its ID.

func (*LocalStorage) GetAgent

func (ls *LocalStorage) GetAgent(ctx context.Context, id string) (*types.AgentNode, error)

GetAgent retrieves the default (unversioned) agent node record by ID. It filters for version = ” to return only the default agent. Use GetAgentVersion for a specific version, or ListAgentVersions for all versions.

func (*LocalStorage) GetAgentConfiguration

func (ls *LocalStorage) GetAgentConfiguration(ctx context.Context, agentID, packageID string) (*types.AgentConfiguration, error)

GetAgentConfiguration retrieves an agent configuration record from SQLite

func (*LocalStorage) GetAgentDID

func (ls *LocalStorage) GetAgentDID(ctx context.Context, agentID string) (*types.AgentDIDInfo, error)

func (*LocalStorage) GetAgentFieldServerDID

func (ls *LocalStorage) GetAgentFieldServerDID(ctx context.Context, agentfieldServerID string) (*types.AgentFieldServerDIDInfo, error)

func (*LocalStorage) GetAgentPackage

func (ls *LocalStorage) GetAgentPackage(ctx context.Context, packageID string) (*types.AgentPackage, error)

GetAgentPackage retrieves an agent package record from SQLite

func (*LocalStorage) GetAgentTagVC

func (ls *LocalStorage) GetAgentTagVC(ctx context.Context, agentID string) (*types.AgentTagVCRecord, error)

GetAgentTagVC retrieves an agent's tag VC record.

func (*LocalStorage) GetAgentVersion

func (ls *LocalStorage) GetAgentVersion(ctx context.Context, id string, version string) (*types.AgentNode, error)

GetAgentVersion retrieves a specific (id, version) agent node.

func (*LocalStorage) GetComponentDID

func (ls *LocalStorage) GetComponentDID(ctx context.Context, componentID string) (*types.ComponentDIDInfo, error)

func (*LocalStorage) GetConfig

func (ls *LocalStorage) GetConfig(ctx context.Context, key string) (*ConfigEntry, error)

GetConfig retrieves a configuration entry by key.

func (*LocalStorage) GetDID

func (ls *LocalStorage) GetDID(ctx context.Context, did string) (*types.DIDRegistryEntry, error)

func (*LocalStorage) GetDIDDocument

func (ls *LocalStorage) GetDIDDocument(ctx context.Context, did string) (*types.DIDDocumentRecord, error)

GetDIDDocument retrieves a DID document by its DID.

func (*LocalStorage) GetDIDDocumentByAgentID

func (ls *LocalStorage) GetDIDDocumentByAgentID(ctx context.Context, agentID string) (*types.DIDDocumentRecord, error)

GetDIDDocumentByAgentID retrieves a DID document by agent ID.

func (*LocalStorage) GetDeadLetterQueue

func (ls *LocalStorage) GetDeadLetterQueue(ctx context.Context, limit, offset int) ([]types.ObservabilityDeadLetterEntry, error)

GetDeadLetterQueue returns entries from the dead letter queue with pagination.

func (*LocalStorage) GetDeadLetterQueueCount

func (ls *LocalStorage) GetDeadLetterQueueCount(ctx context.Context) (int64, error)

GetDeadLetterQueueCount returns the number of entries in the dead letter queue.

func (*LocalStorage) GetEventHistory

func (ls *LocalStorage) GetEventHistory(ctx context.Context, filter types.EventFilter) ([]*types.MemoryChangeEvent, error)

GetEventHistory retrieves a list of memory change events based on a filter.

func (*LocalStorage) GetExecution

func (ls *LocalStorage) GetExecution(ctx context.Context, id int64) (*types.AgentExecution, error)

GetExecution retrieves an agent execution record from SQLite by ID.

func (*LocalStorage) GetExecutionEventBus

func (ls *LocalStorage) GetExecutionEventBus() *events.ExecutionEventBus

GetExecutionEventBus returns the execution event bus for real-time updates

func (*LocalStorage) GetExecutionLogEventBus

func (ls *LocalStorage) GetExecutionLogEventBus() *events.EventBus[*types.ExecutionLogEntry]

GetExecutionLogEventBus returns the bus for structured execution logs.

func (*LocalStorage) GetExecutionRecord

func (ls *LocalStorage) GetExecutionRecord(ctx context.Context, executionID string) (*types.Execution, error)

GetExecutionRecord fetches a single execution row by execution_id.

func (*LocalStorage) GetExecutionVC

func (ls *LocalStorage) GetExecutionVC(ctx context.Context, vcID string) (*types.ExecutionVCInfo, error)

func (*LocalStorage) GetExecutionWebhook

func (ls *LocalStorage) GetExecutionWebhook(ctx context.Context, executionID string) (*types.ExecutionWebhook, error)

GetExecutionWebhook fetches the webhook registration for the given execution.

func (*LocalStorage) GetFullExecutionVC

func (ls *LocalStorage) GetFullExecutionVC(vcID string) (json.RawMessage, string, error)

GetFullExecutionVC retrieves the full execution VC including the VC document and signature

func (*LocalStorage) GetLockStatus

func (ls *LocalStorage) GetLockStatus(ctx context.Context, key string) (*types.DistributedLock, error)

GetLockStatus retrieves the status of a distributed lock.

func (*LocalStorage) GetMemory

func (ls *LocalStorage) GetMemory(ctx context.Context, scope, scopeID, key string) (*types.Memory, error)

GetMemory retrieves a memory record from BoltDB or cache.

func (*LocalStorage) GetObservabilityWebhook

func (ls *LocalStorage) GetObservabilityWebhook(ctx context.Context) (*types.ObservabilityWebhookConfig, error)

GetObservabilityWebhook retrieves the global observability webhook configuration. Returns nil if no webhook is configured.

func (*LocalStorage) GetReasonerExecutionHistory

func (ls *LocalStorage) GetReasonerExecutionHistory(ctx context.Context, reasonerID string, page, limit int) (*types.ReasonerExecutionHistory, error)

GetReasonerExecutionHistory retrieves paginated execution history for a specific reasoner This is a read-only operation that leverages SQLite WAL mode for concurrent access

func (*LocalStorage) GetReasonerPerformanceMetrics

func (ls *LocalStorage) GetReasonerPerformanceMetrics(ctx context.Context, reasonerID string) (*types.ReasonerPerformanceMetrics, error)

GetReasonerPerformanceMetrics retrieves performance metrics for a specific reasoner This is a read-only operation that leverages SQLite WAL mode for concurrent access

func (*LocalStorage) GetSession

func (ls *LocalStorage) GetSession(ctx context.Context, sessionID string) (*types.Session, error)

GetSession retrieves a session record from SQLite by ID

func (*LocalStorage) GetVector

func (ls *LocalStorage) GetVector(ctx context.Context, scope, scopeID, key string) (*types.VectorRecord, error)

GetVector retrieves a vector embedding by key.

func (*LocalStorage) GetWorkflow

func (ls *LocalStorage) GetWorkflow(ctx context.Context, workflowID string) (*types.Workflow, error)

GetWorkflow retrieves a workflow record from SQLite by ID

func (*LocalStorage) GetWorkflowExecution

func (ls *LocalStorage) GetWorkflowExecution(ctx context.Context, executionID string) (*types.WorkflowExecution, error)

GetWorkflowExecution retrieves a workflow execution record from SQLite by ID

func (*LocalStorage) GetWorkflowExecutionEventBus

func (ls *LocalStorage) GetWorkflowExecutionEventBus() *events.EventBus[*types.WorkflowExecutionEvent]

GetWorkflowExecutionEventBus returns the bus for workflow execution events.

func (*LocalStorage) GetWorkflowRun

func (ls *LocalStorage) GetWorkflowRun(ctx context.Context, runID string) (*types.WorkflowRun, error)

func (*LocalStorage) GetWorkflowVC

func (ls *LocalStorage) GetWorkflowVC(ctx context.Context, workflowVCID string) (*types.WorkflowVCInfo, error)

func (*LocalStorage) HasExecutionWebhook

func (ls *LocalStorage) HasExecutionWebhook(ctx context.Context, executionID string) (bool, error)

HasExecutionWebhook indicates whether an execution has a registered webhook.

func (*LocalStorage) HealthCheck

func (ls *LocalStorage) HealthCheck(ctx context.Context) error

HealthCheck checks the health of the local storage including database integrity.

func (*LocalStorage) Initialize

func (ls *LocalStorage) Initialize(ctx context.Context, config StorageConfig) error

Initialize sets up the SQLite and BoltDB databases.

func (*LocalStorage) ListAgentDIDs

func (ls *LocalStorage) ListAgentDIDs(ctx context.Context) ([]*types.AgentDIDInfo, error)

func (*LocalStorage) ListAgentFieldServerDIDs

func (ls *LocalStorage) ListAgentFieldServerDIDs(ctx context.Context) ([]*types.AgentFieldServerDIDInfo, error)

func (*LocalStorage) ListAgentGroups

func (ls *LocalStorage) ListAgentGroups(ctx context.Context, teamID string) ([]types.AgentGroupSummary, error)

ListAgentGroups returns distinct agent groups with summary info for a team.

func (*LocalStorage) ListAgentTagVCs

func (ls *LocalStorage) ListAgentTagVCs(ctx context.Context) ([]*types.AgentTagVCRecord, error)

ListAgentTagVCs returns all non-revoked agent tag VCs.

func (*LocalStorage) ListAgentVersions

func (ls *LocalStorage) ListAgentVersions(ctx context.Context, id string) ([]*types.AgentNode, error)

ListAgentVersions returns all versioned agents with the given ID (version != ”).

func (*LocalStorage) ListAgents

func (ls *LocalStorage) ListAgents(ctx context.Context, filters types.AgentFilters) ([]*types.AgentNode, error)

ListAgents retrieves agent node records from SQLite based on filters.

func (*LocalStorage) ListAgentsByGroup

func (ls *LocalStorage) ListAgentsByGroup(ctx context.Context, groupID string) ([]*types.AgentNode, error)

ListAgentsByGroup returns all agents belonging to a specific group.

func (*LocalStorage) ListAgentsByLifecycleStatus

func (ls *LocalStorage) ListAgentsByLifecycleStatus(ctx context.Context, status types.AgentLifecycleStatus) ([]*types.AgentNode, error)

ListAgentsByLifecycleStatus lists agents filtered by lifecycle status.

func (*LocalStorage) ListComponentDIDs

func (ls *LocalStorage) ListComponentDIDs(ctx context.Context, agentDID string) ([]*types.ComponentDIDInfo, error)

func (*LocalStorage) ListConfigs

func (ls *LocalStorage) ListConfigs(ctx context.Context) ([]*ConfigEntry, error)

ListConfigs returns all stored configuration entries.

func (*LocalStorage) ListDIDDocuments

func (ls *LocalStorage) ListDIDDocuments(ctx context.Context) ([]*types.DIDDocumentRecord, error)

ListDIDDocuments lists all DID documents.

func (*LocalStorage) ListDIDs

func (ls *LocalStorage) ListDIDs(ctx context.Context) ([]*types.DIDRegistryEntry, error)

func (*LocalStorage) ListDueExecutionWebhooks

func (ls *LocalStorage) ListDueExecutionWebhooks(ctx context.Context, limit int) ([]*types.ExecutionWebhook, error)

ListDueExecutionWebhooks returns webhook registrations that are ready for delivery.

func (*LocalStorage) ListExecutionLogEntries

func (ls *LocalStorage) ListExecutionLogEntries(ctx context.Context, executionID string, afterSeq *int64, limit int, levels []string, nodeIDs []string, sources []string, query string) ([]*types.ExecutionLogEntry, error)

ListExecutionLogEntries retrieves structured execution logs ordered by sequence.

func (*LocalStorage) ListExecutionVCs

func (ls *LocalStorage) ListExecutionVCs(ctx context.Context, filters types.VCFilters) ([]*types.ExecutionVCInfo, error)

func (*LocalStorage) ListExecutionWebhookEvents

func (ls *LocalStorage) ListExecutionWebhookEvents(ctx context.Context, executionID string) ([]*types.ExecutionWebhookEvent, error)

ListExecutionWebhookEvents returns webhook attempts ordered by creation time.

func (*LocalStorage) ListExecutionWebhookEventsBatch

func (ls *LocalStorage) ListExecutionWebhookEventsBatch(ctx context.Context, executionIDs []string) (map[string][]*types.ExecutionWebhookEvent, error)

ListExecutionWebhookEventsBatch fetches webhook events for multiple executions in a single query.

func (*LocalStorage) ListExecutionWebhooksRegistered

func (ls *LocalStorage) ListExecutionWebhooksRegistered(ctx context.Context, executionIDs []string) (map[string]bool, error)

ListExecutionWebhooksRegistered returns a map of execution IDs that have webhook registrations.

func (*LocalStorage) ListMemory

func (ls *LocalStorage) ListMemory(ctx context.Context, scope, scopeID string) ([]*types.Memory, error)

ListMemory retrieves all memory records for a given scope and scope ID from BoltDB.

func (*LocalStorage) ListWorkflowExecutionEvents

func (ls *LocalStorage) ListWorkflowExecutionEvents(ctx context.Context, executionID string, afterSeq *int64, limit int) ([]*types.WorkflowExecutionEvent, error)

ListWorkflowExecutionEvents retrieves execution events ordered by sequence.

func (*LocalStorage) ListWorkflowVCStatusSummaries

func (ls *LocalStorage) ListWorkflowVCStatusSummaries(ctx context.Context, workflowIDs []string) ([]*types.WorkflowVCStatusAggregation, error)

func (*LocalStorage) ListWorkflowVCs

func (ls *LocalStorage) ListWorkflowVCs(ctx context.Context, workflowID string) ([]*types.WorkflowVCInfo, error)

func (*LocalStorage) MarkStaleExecutions

func (ls *LocalStorage) MarkStaleExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)

MarkStaleExecutions updates executions stuck in non-terminal states beyond the provided timeout. Staleness is determined by updated_at (last activity) rather than started_at, so legitimately long-running executions that are still making progress are not incorrectly timed out.

INVARIANT: callers must ensure updated_at is bumped on every meaningful execution activity. If updated_at is not maintained, active executions may be incorrectly reaped. Uses COALESCE(updated_at, created_at, started_at) to handle rows where updated_at may be NULL.

func (*LocalStorage) MarkStaleWorkflowExecutions

func (ls *LocalStorage) MarkStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)

MarkStaleWorkflowExecutions updates workflow executions stuck in non-terminal states when their updated_at timestamp exceeds the staleAfter threshold. This catches orphaned child executions whose parent failed without cascading cancellation.

See MarkStaleExecutions for the updated_at invariant and COALESCE fallback rationale.

func (*LocalStorage) NewUnitOfWork

func (ls *LocalStorage) NewUnitOfWork() UnitOfWork

NewUnitOfWork creates a new unit of work instance for this storage

func (*LocalStorage) NewWorkflowUnitOfWork

func (ls *LocalStorage) NewWorkflowUnitOfWork() WorkflowUnitOfWork

NewWorkflowUnitOfWork creates a new workflow-specific unit of work instance for this storage

func (*LocalStorage) PruneExecutionLogEntries

func (ls *LocalStorage) PruneExecutionLogEntries(ctx context.Context, executionID string, maxEntries int, olderThan time.Time) error

PruneExecutionLogEntries trims old or excessive execution logs for a single execution.

func (*LocalStorage) Publish

func (ls *LocalStorage) Publish(channel string, message interface{}) error

Publish implements the CacheProvider Publish method using local pub/sub.

func (*LocalStorage) PublishMemoryChange

func (ls *LocalStorage) PublishMemoryChange(ctx context.Context, event types.MemoryChangeEvent) error

PublishMemoryChange implements the StorageProvider PublishMemoryChange method using local pub/sub.

func (*LocalStorage) QueryAgentConfigurations

func (ls *LocalStorage) QueryAgentConfigurations(ctx context.Context, filters types.ConfigurationFilters) ([]*types.AgentConfiguration, error)

QueryAgentConfigurations retrieves agent configuration records from SQLite based on filters

func (*LocalStorage) QueryAgentPackages

func (ls *LocalStorage) QueryAgentPackages(ctx context.Context, filters types.PackageFilters) ([]*types.AgentPackage, error)

QueryAgentPackages retrieves agent package records from SQLite based on filters

func (*LocalStorage) QueryExecutionRecords

func (ls *LocalStorage) QueryExecutionRecords(ctx context.Context, filter types.ExecutionFilter) ([]*types.Execution, error)

QueryExecutionRecords runs a filtered query returning all matching executions.

func (*LocalStorage) QueryExecutions

func (ls *LocalStorage) QueryExecutions(ctx context.Context, filters types.ExecutionFilters) ([]*types.AgentExecution, error)

QueryExecutions retrieves agent execution records based on filters using GORM.

func (*LocalStorage) QueryRunSummaries

func (ls *LocalStorage) QueryRunSummaries(ctx context.Context, filter types.ExecutionFilter) ([]*RunSummaryAggregation, int, error)

QueryRunSummaries returns aggregated statistics for workflow runs without fetching all execution records. The implementation uses a single GROUP BY query plus a lightweight COUNT for total runs to stay fast even when page_size is large.

func (*LocalStorage) QuerySessions

func (ls *LocalStorage) QuerySessions(ctx context.Context, filters types.SessionFilters) ([]*types.Session, error)

QuerySessions retrieves session records from SQLite based on filters

func (*LocalStorage) QueryWorkflowDAG

func (ls *LocalStorage) QueryWorkflowDAG(ctx context.Context, rootWorkflowID string) ([]*types.WorkflowExecution, error)

QueryWorkflowDAG retrieves a complete workflow DAG using recursive CTE for optimal performance

func (*LocalStorage) QueryWorkflowExecutions

func (ls *LocalStorage) QueryWorkflowExecutions(ctx context.Context, filters types.WorkflowExecutionFilters) ([]*types.WorkflowExecution, error)

QueryWorkflowExecutions retrieves workflow execution records from SQLite based on filters

func (*LocalStorage) QueryWorkflows

func (ls *LocalStorage) QueryWorkflows(ctx context.Context, filters types.WorkflowFilters) ([]*types.Workflow, error)

QueryWorkflows retrieves workflow records from SQLite based on filters

func (*LocalStorage) RegisterAgent

func (ls *LocalStorage) RegisterAgent(ctx context.Context, agent *types.AgentNode) error

RegisterAgent stores an agent node record in SQLite.

func (*LocalStorage) RegisterExecutionWebhook

func (ls *LocalStorage) RegisterExecutionWebhook(ctx context.Context, webhook *types.ExecutionWebhook) error

RegisterExecutionWebhook stores or updates the webhook registration for an execution.

func (*LocalStorage) ReleaseLock

func (ls *LocalStorage) ReleaseLock(ctx context.Context, lockID string) error

ReleaseLock releases a distributed lock.

func (*LocalStorage) RenewLock

func (ls *LocalStorage) RenewLock(ctx context.Context, lockID string) (*types.DistributedLock, error)

RenewLock renews a distributed lock to extend its TTL.

func (*LocalStorage) RetryStaleWorkflowExecutions

func (ls *LocalStorage) RetryStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, maxRetries int, limit int) ([]string, error)

RetryStaleWorkflowExecutions finds stale workflow executions that haven't exceeded maxRetries and resets both workflow_executions and executions back to "pending" so the paired records stay in sync for the retry path.

func (*LocalStorage) RevokeAgentTagVC

func (ls *LocalStorage) RevokeAgentTagVC(ctx context.Context, agentID string) error

RevokeAgentTagVC marks an agent's tag VC as revoked.

func (*LocalStorage) RevokeDIDDocument

func (ls *LocalStorage) RevokeDIDDocument(ctx context.Context, did string) error

RevokeDIDDocument revokes a DID document by setting its revoked_at timestamp.

func (*LocalStorage) Set

func (ls *LocalStorage) Set(key string, value interface{}, ttl time.Duration) error

Set implements the CacheProvider Set method using the in-memory cache.

func (*LocalStorage) SetConfig

func (ls *LocalStorage) SetConfig(ctx context.Context, key string, value string, updatedBy string) error

SetConfig upserts a configuration entry in the database. On conflict (duplicate key), it increments the version and updates the value.

func (*LocalStorage) SetMemory

func (ls *LocalStorage) SetMemory(ctx context.Context, memory *types.Memory) error

SetMemory stores a memory record in BoltDB.

func (*LocalStorage) SetObservabilityWebhook

func (ls *LocalStorage) SetObservabilityWebhook(ctx context.Context, config *types.ObservabilityWebhookConfig) error

SetObservabilityWebhook stores or updates the global observability webhook configuration. Uses upsert pattern to handle both insert and update.

func (*LocalStorage) SetVector

func (ls *LocalStorage) SetVector(ctx context.Context, record *types.VectorRecord) error

SetVector stores or updates a vector embedding for the specified scope/key.

func (*LocalStorage) SimilaritySearch

func (ls *LocalStorage) SimilaritySearch(ctx context.Context, scope, scopeID string, queryEmbedding []float32, topK int, filters map[string]interface{}) ([]*types.VectorSearchResult, error)

SimilaritySearch performs a similarity search within a scope using the configured vector backend.

func (*LocalStorage) StoreAgentConfiguration

func (ls *LocalStorage) StoreAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error

StoreAgentConfiguration stores an agent configuration record in SQLite

func (*LocalStorage) StoreAgentDID

func (ls *LocalStorage) StoreAgentDID(ctx context.Context, agentID, agentDID, agentfieldServerDID, publicKeyJWK string, derivationIndex int) error

Agent DID operations

func (*LocalStorage) StoreAgentDIDWithComponents

func (ls *LocalStorage) StoreAgentDIDWithComponents(ctx context.Context, agentID, agentDID, agentfieldServerDID, publicKeyJWK string, derivationIndex int, components []ComponentDIDRequest) error

StoreAgentDIDWithComponents stores an agent DID along with its component DIDs in a single transaction

func (*LocalStorage) StoreAgentFieldServerDID

func (ls *LocalStorage) StoreAgentFieldServerDID(ctx context.Context, agentfieldServerID, rootDID string, masterSeed []byte, createdAt, lastKeyRotation time.Time) error

AgentField Server DID operations

func (*LocalStorage) StoreAgentPackage

func (ls *LocalStorage) StoreAgentPackage(ctx context.Context, pkg *types.AgentPackage) error

StoreAgentPackage stores an agent package record in SQLite

func (*LocalStorage) StoreAgentTagVC

func (ls *LocalStorage) StoreAgentTagVC(ctx context.Context, agentID, agentDID, vcID, vcDocument, signature string, issuedAt time.Time, expiresAt *time.Time) error

StoreAgentTagVC stores or replaces an agent's tag VC.

func (*LocalStorage) StoreComponentDID

func (ls *LocalStorage) StoreComponentDID(ctx context.Context, componentID, componentDID, agentDID, componentType, componentName string, derivationIndex int) error

Component DID operations

func (*LocalStorage) StoreDID

func (ls *LocalStorage) StoreDID(ctx context.Context, did string, didDocument, publicKey, privateKeyRef, derivationPath string) error

DID Registry operations

func (*LocalStorage) StoreDIDDocument

func (ls *LocalStorage) StoreDIDDocument(ctx context.Context, record *types.DIDDocumentRecord) error

StoreDIDDocument stores a DID document record.

func (*LocalStorage) StoreEvent

func (ls *LocalStorage) StoreEvent(ctx context.Context, event *types.MemoryChangeEvent) error

StoreEvent saves a memory change event to the database.

func (*LocalStorage) StoreExecution

func (ls *LocalStorage) StoreExecution(ctx context.Context, execution *types.AgentExecution) error

StoreExecution stores an agent execution record in SQLite.

func (*LocalStorage) StoreExecutionLogEntries

func (ls *LocalStorage) StoreExecutionLogEntries(ctx context.Context, executionID string, entries []*types.ExecutionLogEntry) error

StoreExecutionLogEntries atomically stores a batch of structured execution logs for one execution.

func (*LocalStorage) StoreExecutionLogEntry

func (ls *LocalStorage) StoreExecutionLogEntry(ctx context.Context, entry *types.ExecutionLogEntry) error

StoreExecutionLogEntry inserts a structured execution log entry and publishes it to subscribers.

func (*LocalStorage) StoreExecutionVC

func (ls *LocalStorage) StoreExecutionVC(ctx context.Context, vcID, executionID, workflowID, sessionID, issuerDID, targetDID, callerDID, inputHash, outputHash, status string, vcDocument []byte, signature string, storageURI string, documentSizeBytes int64) error

Execution VC operations

func (*LocalStorage) StoreExecutionWebhookEvent

func (ls *LocalStorage) StoreExecutionWebhookEvent(ctx context.Context, event *types.ExecutionWebhookEvent) error

StoreExecutionWebhookEvent records webhook delivery attempts for SQLite deployments.

func (*LocalStorage) StoreWorkflowExecution

func (ls *LocalStorage) StoreWorkflowExecution(ctx context.Context, execution *types.WorkflowExecution) error

StoreWorkflowExecution stores a workflow execution record in SQLite with UPSERT capability Uses transactions to prevent database corruption - SQLite WAL mode handles write coordination

func (*LocalStorage) StoreWorkflowExecutionEvent

func (ls *LocalStorage) StoreWorkflowExecutionEvent(ctx context.Context, event *types.WorkflowExecutionEvent) error

StoreWorkflowExecutionEvent inserts an immutable execution event into SQLite.

func (*LocalStorage) StoreWorkflowExecutionWithUnitOfWork

func (ls *LocalStorage) StoreWorkflowExecutionWithUnitOfWork(ctx context.Context, execution *types.WorkflowExecution) error

StoreWorkflowExecutionWithUnitOfWork demonstrates using Unit of Work for atomic operations

func (*LocalStorage) StoreWorkflowRun

func (ls *LocalStorage) StoreWorkflowRun(ctx context.Context, run *types.WorkflowRun) error

func (*LocalStorage) StoreWorkflowRunEvent

func (ls *LocalStorage) StoreWorkflowRunEvent(ctx context.Context, event *types.WorkflowRunEvent) error

func (*LocalStorage) StoreWorkflowStep

func (ls *LocalStorage) StoreWorkflowStep(ctx context.Context, step *types.WorkflowStep) error

func (*LocalStorage) StoreWorkflowVC

func (ls *LocalStorage) StoreWorkflowVC(ctx context.Context, workflowVCID, workflowID, sessionID string, componentVCIDs []string, status string, startTime, endTime *time.Time, totalSteps, completedSteps int, storageURI string, documentSizeBytes int64) error

Workflow VC operations

func (*LocalStorage) Subscribe

func (ls *LocalStorage) Subscribe(channel string) (<-chan CacheMessage, error)

Subscribe implements the CacheProvider Subscribe method using local pub/sub.

func (*LocalStorage) SubscribeToMemoryChanges

func (ls *LocalStorage) SubscribeToMemoryChanges(ctx context.Context, scope, scopeID string) (<-chan types.MemoryChangeEvent, error)

SubscribeToMemoryChanges implements the StorageProvider SubscribeToMemoryChanges method using local pub/sub.

func (*LocalStorage) TryMarkExecutionWebhookInFlight

func (ls *LocalStorage) TryMarkExecutionWebhookInFlight(ctx context.Context, executionID string, now time.Time) (bool, error)

TryMarkExecutionWebhookInFlight atomically marks a webhook registration as delivering.

func (*LocalStorage) UpdateAccessPolicy

func (ls *LocalStorage) UpdateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error

UpdateAccessPolicy updates an existing access policy.

func (*LocalStorage) UpdateAgentConfiguration

func (ls *LocalStorage) UpdateAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error

UpdateAgentConfiguration updates an existing agent configuration record

func (*LocalStorage) UpdateAgentHealth

func (ls *LocalStorage) UpdateAgentHealth(ctx context.Context, id string, status types.HealthStatus) error

UpdateAgentHealth updates the health status of an agent node in SQLite. IMPORTANT: This method ONLY updates health_status, never last_heartbeat (only heartbeat endpoint should do that)

func (*LocalStorage) UpdateAgentHealthAtomic

func (ls *LocalStorage) UpdateAgentHealthAtomic(ctx context.Context, id string, status types.HealthStatus, expectedLastHeartbeat *time.Time) error

UpdateAgentHealthAtomic updates the health status of an agent node atomically with optimistic locking. If expectedLastHeartbeat is provided, the update will only succeed if the current last_heartbeat matches. This prevents race conditions between health monitor and heartbeat updates. IMPORTANT: This method ONLY updates health_status, never last_heartbeat (only heartbeat endpoint should do that)

func (*LocalStorage) UpdateAgentHeartbeat

func (ls *LocalStorage) UpdateAgentHeartbeat(ctx context.Context, id string, version string, heartbeatTime time.Time) error

UpdateAgentHeartbeat updates only the heartbeat timestamp of an agent node in SQLite. If version is empty, it updates the default (unversioned) agent.

func (*LocalStorage) UpdateAgentLifecycleStatus

func (ls *LocalStorage) UpdateAgentLifecycleStatus(ctx context.Context, id string, status types.AgentLifecycleStatus) error

UpdateAgentLifecycleStatus updates the lifecycle status of an agent node in SQLite.

func (*LocalStorage) UpdateAgentPackage

func (ls *LocalStorage) UpdateAgentPackage(ctx context.Context, pkg *types.AgentPackage) error

UpdateAgentPackage updates an existing agent package record

func (*LocalStorage) UpdateAgentTrafficWeight

func (ls *LocalStorage) UpdateAgentTrafficWeight(ctx context.Context, id string, version string, weight int) error

UpdateAgentTrafficWeight sets the traffic_weight for a specific (id, version) pair.

func (*LocalStorage) UpdateAgentVersion

func (ls *LocalStorage) UpdateAgentVersion(ctx context.Context, id string, version string) error

UpdateAgentVersion updates only the version field for an agent node.

func (*LocalStorage) UpdateExecutionRecord

func (ls *LocalStorage) UpdateExecutionRecord(ctx context.Context, executionID string, updater func(*types.Execution) (*types.Execution, error)) (*types.Execution, error)

UpdateExecutionRecord applies an update callback atomically. The callback mutates a types.Execution copy and the result gets persisted.

func (*LocalStorage) UpdateExecutionWebhookState

func (ls *LocalStorage) UpdateExecutionWebhookState(ctx context.Context, executionID string, update types.ExecutionWebhookStateUpdate) error

UpdateExecutionWebhookState persists the latest delivery state for a webhook registration.

func (*LocalStorage) UpdateWorkflowExecution

func (ls *LocalStorage) UpdateWorkflowExecution(ctx context.Context, executionID string, updateFunc func(execution *types.WorkflowExecution) (*types.WorkflowExecution, error)) error

UpdateWorkflowExecution atomically updates a workflow execution using a user-provided update function This eliminates the read-modify-write race condition by performing the entire operation within a single transaction

func (*LocalStorage) ValidateAgentConfiguration

func (ls *LocalStorage) ValidateAgentConfiguration(ctx context.Context, agentID, packageID string, config map[string]interface{}) (*types.ConfigurationValidationResult, error)

ValidateAgentConfiguration validates a configuration against the package schema

type LocalStorageConfig

type LocalStorageConfig struct {
	DatabasePath string `yaml:"database_path" mapstructure:"database_path"`
	KVStorePath  string `yaml:"kv_store_path" mapstructure:"kv_store_path"`
}

LocalStorageConfig holds configuration for the local storage provider.

type ObservabilityDeadLetterQueueModel

type ObservabilityDeadLetterQueueModel struct {
	ID             int64     `gorm:"column:id;primaryKey;autoIncrement"`
	EventType      string    `gorm:"column:event_type;not null"`
	EventSource    string    `gorm:"column:event_source;not null"`
	EventTimestamp time.Time `gorm:"column:event_timestamp;not null"`
	Payload        string    `gorm:"column:payload;not null"`
	ErrorMessage   string    `gorm:"column:error_message;not null"`
	RetryCount     int       `gorm:"column:retry_count;not null;default:0"`
	CreatedAt      time.Time `gorm:"column:created_at;autoCreateTime"`
}

ObservabilityDeadLetterQueueModel represents failed observability events for retry.

func (ObservabilityDeadLetterQueueModel) TableName

type ObservabilityWebhookModel

type ObservabilityWebhookModel struct {
	ID        string    `gorm:"column:id;primaryKey;default:'global'"`
	URL       string    `gorm:"column:url;not null"`
	Secret    *string   `gorm:"column:secret"`
	Headers   string    `gorm:"column:headers;default:'{}'"`
	Enabled   bool      `gorm:"column:enabled;not null;default:true"`
	CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}

ObservabilityWebhookModel represents the global observability webhook configuration. This is a singleton table with only one row (id='global').

func (ObservabilityWebhookModel) TableName

func (ObservabilityWebhookModel) TableName() string

type PostgresStorageConfig

type PostgresStorageConfig struct {
	DSN             string        `yaml:"dsn" mapstructure:"dsn"`
	URL             string        `yaml:"url" mapstructure:"url"`
	Host            string        `yaml:"host" mapstructure:"host"`
	Port            int           `yaml:"port" mapstructure:"port"`
	Database        string        `yaml:"database" mapstructure:"database"`
	User            string        `yaml:"user" mapstructure:"user"`
	Password        string        `yaml:"password" mapstructure:"password"`
	SSLMode         string        `yaml:"sslmode" mapstructure:"sslmode"`
	AdminDatabase   string        `yaml:"admin_database" mapstructure:"admin_database"`
	ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime" mapstructure:"conn_max_lifetime"`
	MaxOpenConns    int           `yaml:"max_open_conns" mapstructure:"max_open_conns"`
	MaxIdleConns    int           `yaml:"max_idle_conns" mapstructure:"max_idle_conns"`
}

PostgresStorageConfig holds configuration for the PostgreSQL storage provider.

type RunSummaryAggregation

type RunSummaryAggregation struct {
	RunID            string
	TotalExecutions  int
	StatusCounts     map[string]int
	EarliestStarted  time.Time
	LatestStarted    time.Time
	RootExecutionID  *string
	RootStatus       *string
	RootAgentNodeID  *string
	RootReasonerID   *string
	SessionID        *string
	ActorID          *string
	MaxDepth         int
	ActiveExecutions int
}

RunSummaryAggregation holds aggregated statistics for a single workflow run

type SchemaMigrationModel

type SchemaMigrationModel struct {
	Version     string    `gorm:"column:version;primaryKey"`
	AppliedAt   time.Time `gorm:"column:applied_at;autoCreateTime"`
	Description string    `gorm:"column:description"`
}

func (SchemaMigrationModel) TableName

func (SchemaMigrationModel) TableName() string

type SessionModel

type SessionModel struct {
	SessionID       string    `gorm:"column:session_id;primaryKey"`
	ActorID         *string   `gorm:"column:actor_id;index"`
	SessionName     *string   `gorm:"column:session_name"`
	ParentSessionID *string   `gorm:"column:parent_session_id"`
	RootSessionID   *string   `gorm:"column:root_session_id;index"`
	TotalWorkflows  int       `gorm:"column:total_workflows;default:0"`
	TotalExecutions int       `gorm:"column:total_executions;default:0"`
	TotalDurationMS int       `gorm:"column:total_duration_ms;default:0"`
	StartedAt       time.Time `gorm:"column:started_at;not null"`
	LastActivityAt  time.Time `gorm:"column:last_activity_at;not null"`
	CreatedAt       time.Time `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt       time.Time `gorm:"column:updated_at;autoUpdateTime"`
}

func (SessionModel) TableName

func (SessionModel) TableName() string

type StorageConfig

type StorageConfig struct {
	Mode     string                `yaml:"mode" mapstructure:"mode"`
	Local    LocalStorageConfig    `yaml:"local" mapstructure:"local"`
	Postgres PostgresStorageConfig `yaml:"postgres" mapstructure:"postgres"`
	Vector   VectorStoreConfig     `yaml:"vector" mapstructure:"vector"`
}

StorageConfig holds the configuration for the storage provider.

type StorageFactory

type StorageFactory struct{}

StorageFactory is responsible for creating the appropriate storage backend.

func (*StorageFactory) CreateStorage

func (sf *StorageFactory) CreateStorage(config StorageConfig) (StorageProvider, CacheProvider, error)

CreateStorage creates a StorageProvider and CacheProvider based on the configuration.

type StorageProvider

type StorageProvider interface {
	// Lifecycle
	// Initialize prepares the storage backend using the provided configuration.
	// The ctx controls setup lifetime, and config supplies backend settings.
	// Returns an error if initialization fails.
	Initialize(ctx context.Context, config StorageConfig) error
	// Close releases storage resources associated with the provider.
	// The ctx bounds shutdown and cleanup work for the backend.
	// Returns an error if resources cannot be closed cleanly.
	Close(ctx context.Context) error
	// HealthCheck verifies that the storage backend is reachable and operational.
	// The ctx controls how long the health probe may run.
	// Returns an error when the backend is unhealthy or unreachable.
	HealthCheck(ctx context.Context) error

	// Execution operations
	// StoreExecution persists an agent execution record.
	// The ctx scopes the write, and execution contains the execution data to store.
	// Returns an error if the execution cannot be saved.
	StoreExecution(ctx context.Context, execution *types.AgentExecution) error
	// GetExecution fetches an agent execution by its numeric identifier.
	// The ctx scopes the read, and id selects the execution to load.
	// Returns the execution record or an error if it cannot be found or read.
	GetExecution(ctx context.Context, id int64) (*types.AgentExecution, error)
	// QueryExecutions lists agent executions matching the provided filters.
	// The ctx scopes the query, and filters define the selection criteria.
	// Returns matching executions or an error if the query fails.
	QueryExecutions(ctx context.Context, filters types.ExecutionFilters) ([]*types.AgentExecution, error)

	// Workflow execution operations
	// StoreWorkflowExecution persists a workflow execution record.
	// The ctx scopes the write, and execution contains the workflow execution data.
	// Returns an error if the workflow execution cannot be saved.
	StoreWorkflowExecution(ctx context.Context, execution *types.WorkflowExecution) error
	// GetWorkflowExecution retrieves a workflow execution by its identifier.
	// The ctx scopes the read, and executionID identifies the workflow execution.
	// Returns the workflow execution or an error if it is missing or unreadable.
	GetWorkflowExecution(ctx context.Context, executionID string) (*types.WorkflowExecution, error)
	// QueryWorkflowExecutions lists workflow executions that match the given filters.
	// The ctx scopes the query, and filters specify which executions to include.
	// Returns matching workflow executions or an error if the query fails.
	QueryWorkflowExecutions(ctx context.Context, filters types.WorkflowExecutionFilters) ([]*types.WorkflowExecution, error)
	// UpdateWorkflowExecution applies an update function to a workflow execution record.
	// The ctx scopes the operation, executionID selects the record, and updateFunc mutates it.
	// Returns an error if the execution cannot be loaded, updated, or saved.
	UpdateWorkflowExecution(ctx context.Context, executionID string, updateFunc func(execution *types.WorkflowExecution) (*types.WorkflowExecution, error)) error
	// CreateExecutionRecord creates a primary execution record.
	// The ctx scopes the write, and execution contains the record to insert.
	// Returns an error if the record cannot be created.
	CreateExecutionRecord(ctx context.Context, execution *types.Execution) error
	// GetExecutionRecord retrieves a primary execution record by identifier.
	// The ctx scopes the read, and executionID selects the record to fetch.
	// Returns the execution record or an error if it is missing or unreadable.
	GetExecutionRecord(ctx context.Context, executionID string) (*types.Execution, error)
	// UpdateExecutionRecord updates a primary execution record through a callback.
	// The ctx scopes the operation, executionID selects the record, and update transforms it.
	// Returns the updated execution or an error if the update cannot be completed.
	UpdateExecutionRecord(ctx context.Context, executionID string, update func(*types.Execution) (*types.Execution, error)) (*types.Execution, error)
	// QueryExecutionRecords lists execution records that satisfy the provided filter.
	// The ctx scopes the query, and filter describes the records to include.
	// Returns matching execution records or an error if the query fails.
	QueryExecutionRecords(ctx context.Context, filter types.ExecutionFilter) ([]*types.Execution, error)
	// QueryRunSummaries aggregates execution records into run-level summaries.
	// The ctx scopes the query, and filter limits which executions are aggregated.
	// Returns summaries, a total count, and an error if aggregation fails.
	QueryRunSummaries(ctx context.Context, filter types.ExecutionFilter) ([]*RunSummaryAggregation, int, error)
	// RegisterExecutionWebhook stores webhook metadata for an execution.
	// The ctx scopes the write, and webhook contains the registration details.
	// Returns an error if the webhook cannot be registered.
	RegisterExecutionWebhook(ctx context.Context, webhook *types.ExecutionWebhook) error
	// GetExecutionWebhook retrieves webhook metadata for an execution.
	// The ctx scopes the read, and executionID identifies the webhook to load.
	// Returns the webhook record or an error if it is missing or unreadable.
	GetExecutionWebhook(ctx context.Context, executionID string) (*types.ExecutionWebhook, error)
	// ListDueExecutionWebhooks returns pending execution webhooks ready for delivery.
	// The ctx scopes the query, and limit caps how many due webhooks to return.
	// Returns due webhooks or an error if the lookup fails.
	ListDueExecutionWebhooks(ctx context.Context, limit int) ([]*types.ExecutionWebhook, error)
	// TryMarkExecutionWebhookInFlight marks a due webhook as being processed.
	// The ctx scopes the update, executionID selects the webhook, and now timestamps the attempt.
	// Returns true if the webhook was claimed, or false with an error on failure.
	TryMarkExecutionWebhookInFlight(ctx context.Context, executionID string, now time.Time) (bool, error)
	// UpdateExecutionWebhookState updates stored state for an execution webhook.
	// The ctx scopes the write, executionID selects the webhook, and update carries new state.
	// Returns an error if the webhook state cannot be updated.
	UpdateExecutionWebhookState(ctx context.Context, executionID string, update types.ExecutionWebhookStateUpdate) error
	// HasExecutionWebhook reports whether an execution has a registered webhook.
	// The ctx scopes the lookup, and executionID identifies the execution to check.
	// Returns a boolean result or an error if the check fails.
	HasExecutionWebhook(ctx context.Context, executionID string) (bool, error)
	// ListExecutionWebhooksRegistered reports webhook registration status for executions.
	// The ctx scopes the lookup, and executionIDs lists the executions to inspect.
	// Returns a map keyed by execution ID or an error if the query fails.
	ListExecutionWebhooksRegistered(ctx context.Context, executionIDs []string) (map[string]bool, error)
	// StoreExecutionWebhookEvent persists a webhook delivery event for an execution.
	// The ctx scopes the write, and event contains the webhook event payload.
	// Returns an error if the event cannot be stored.
	StoreExecutionWebhookEvent(ctx context.Context, event *types.ExecutionWebhookEvent) error
	// ListExecutionWebhookEvents retrieves webhook events for one execution.
	// The ctx scopes the query, and executionID identifies which events to return.
	// Returns the event list or an error if the lookup fails.
	ListExecutionWebhookEvents(ctx context.Context, executionID string) ([]*types.ExecutionWebhookEvent, error)
	// ListExecutionWebhookEventsBatch retrieves webhook events for multiple executions.
	// The ctx scopes the query, and executionIDs identifies the executions to load.
	// Returns events keyed by execution ID or an error if the lookup fails.
	ListExecutionWebhookEventsBatch(ctx context.Context, executionIDs []string) (map[string][]*types.ExecutionWebhookEvent, error)
	// StoreWorkflowExecutionEvent persists an event associated with a workflow execution.
	// The ctx scopes the write, and event contains the workflow execution event data.
	// Returns an error if the event cannot be stored.
	StoreWorkflowExecutionEvent(ctx context.Context, event *types.WorkflowExecutionEvent) error
	// ListWorkflowExecutionEvents lists events for a workflow execution in sequence order.
	// The ctx scopes the query, executionID selects the execution, and afterSeq and limit page results.
	// Returns matching workflow events or an error if the query fails.
	ListWorkflowExecutionEvents(ctx context.Context, executionID string, afterSeq *int64, limit int) ([]*types.WorkflowExecutionEvent, error)
	StoreExecutionLogEntry(ctx context.Context, entry *types.ExecutionLogEntry) error
	ListExecutionLogEntries(ctx context.Context, executionID string, afterSeq *int64, limit int, levels []string, nodeIDs []string, sources []string, query string) ([]*types.ExecutionLogEntry, error)
	PruneExecutionLogEntries(ctx context.Context, executionID string, maxEntries int, olderThan time.Time) error

	// Execution cleanup operations
	// CleanupOldExecutions deletes execution data older than the retention period.
	// The ctx scopes the cleanup, retentionPeriod sets the age threshold, and batchSize limits each pass.
	// Returns the number of cleaned executions or an error if cleanup fails.
	CleanupOldExecutions(ctx context.Context, retentionPeriod time.Duration, batchSize int) (int, error)
	// MarkStaleExecutions marks long-running executions as stale.
	// The ctx scopes the update, staleAfter sets the cutoff age, and limit bounds affected rows.
	// Returns the number marked stale or an error if the update fails.
	MarkStaleExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)
	// MarkStaleWorkflowExecutions marks long-running workflow executions as stale.
	// The ctx scopes the update, staleAfter sets the cutoff age, and limit bounds affected rows.
	// Returns the number marked stale or an error if the update fails.
	MarkStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)
	// RetryStaleWorkflowExecutions resets stale workflow executions with retry_count < maxRetries
	// back to "pending" status with incremented retry_count. Returns IDs of retried executions.
	RetryStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, maxRetries int, limit int) ([]string, error)

	// Workflow cleanup operations - deletes all data related to a workflow ID
	// CleanupWorkflow removes all stored data associated with a workflow.
	// The ctx scopes the operation, workflowID selects the workflow, and dryRun skips destructive changes.
	// Returns a cleanup summary or an error if the operation fails.
	CleanupWorkflow(ctx context.Context, workflowID string, dryRun bool) (*types.WorkflowCleanupResult, error)

	// DAG operations - optimized single-query DAG building
	// QueryWorkflowDAG loads workflow executions needed to assemble a workflow DAG.
	// The ctx scopes the query, and rootWorkflowID identifies the DAG root workflow.
	// Returns workflow execution nodes or an error if the query fails.
	QueryWorkflowDAG(ctx context.Context, rootWorkflowID string) ([]*types.WorkflowExecution, error)

	// Workflow operations
	// CreateOrUpdateWorkflow stores a workflow, creating it or replacing existing state.
	// The ctx scopes the write, and workflow contains the workflow definition to persist.
	// Returns an error if the workflow cannot be saved.
	CreateOrUpdateWorkflow(ctx context.Context, workflow *types.Workflow) error
	// GetWorkflow retrieves a workflow by its identifier.
	// The ctx scopes the read, and workflowID selects the workflow to load.
	// Returns the workflow or an error if it is missing or unreadable.
	GetWorkflow(ctx context.Context, workflowID string) (*types.Workflow, error)
	// QueryWorkflows lists workflows that satisfy the provided filters.
	// The ctx scopes the query, and filters define which workflows to include.
	// Returns matching workflows or an error if the query fails.
	QueryWorkflows(ctx context.Context, filters types.WorkflowFilters) ([]*types.Workflow, error)

	// Session operations
	// CreateOrUpdateSession stores a session, creating it or replacing existing state.
	// The ctx scopes the write, and session contains the session data to persist.
	// Returns an error if the session cannot be saved.
	CreateOrUpdateSession(ctx context.Context, session *types.Session) error
	// GetSession retrieves a session by its identifier.
	// The ctx scopes the read, and sessionID selects the session to load.
	// Returns the session or an error if it is missing or unreadable.
	GetSession(ctx context.Context, sessionID string) (*types.Session, error)
	// QuerySessions lists sessions matching the provided filters.
	// The ctx scopes the query, and filters define which sessions to return.
	// Returns matching sessions or an error if the query fails.
	QuerySessions(ctx context.Context, filters types.SessionFilters) ([]*types.Session, error)

	// Memory operations
	// SetMemory stores or replaces a memory record.
	// The ctx scopes the write, and memory contains the scoped memory data to persist.
	// Returns an error if the memory record cannot be saved.
	SetMemory(ctx context.Context, memory *types.Memory) error
	// GetMemory retrieves a memory record by scope, scope ID, and key.
	// The ctx scopes the read, and scope, scopeID, and key identify the memory entry.
	// Returns the memory record or an error if it is missing or unreadable.
	GetMemory(ctx context.Context, scope, scopeID, key string) (*types.Memory, error)
	// DeleteMemory removes a memory record by scope, scope ID, and key.
	// The ctx scopes the delete, and scope, scopeID, and key identify the record.
	// Returns an error if the memory record cannot be deleted.
	DeleteMemory(ctx context.Context, scope, scopeID, key string) error
	// ListMemory lists memory records within a scope and scope identifier.
	// The ctx scopes the query, and scope and scopeID identify the memory namespace.
	// Returns matching memory records or an error if the query fails.
	ListMemory(ctx context.Context, scope, scopeID string) ([]*types.Memory, error)
	// SetVector stores or replaces a vector record.
	// The ctx scopes the write, and record contains the vector data to persist.
	// Returns an error if the vector record cannot be saved.
	SetVector(ctx context.Context, record *types.VectorRecord) error
	// GetVector retrieves a vector record by scope, scope ID, and key.
	// The ctx scopes the read, and scope, scopeID, and key identify the vector entry.
	// Returns the vector record or an error if it is missing or unreadable.
	GetVector(ctx context.Context, scope, scopeID, key string) (*types.VectorRecord, error)
	// DeleteVector removes a vector record by scope, scope ID, and key.
	// The ctx scopes the delete, and scope, scopeID, and key identify the vector entry.
	// Returns an error if the vector record cannot be deleted.
	DeleteVector(ctx context.Context, scope, scopeID, key string) error
	// DeleteVectorsByPrefix removes vector records that share a key prefix.
	// The ctx scopes the delete, and scope, scopeID, and prefix define the target set.
	// Returns the number deleted or an error if the operation fails.
	DeleteVectorsByPrefix(ctx context.Context, scope, scopeID, prefix string) (int, error)
	// SimilaritySearch finds the closest vector matches for an embedding query.
	// The ctx scopes the search, scope and scopeID choose the namespace, and queryEmbedding, topK, and filters shape results.
	// Returns ranked matches or an error if the search fails.
	SimilaritySearch(ctx context.Context, scope, scopeID string, queryEmbedding []float32, topK int, filters map[string]interface{}) ([]*types.VectorSearchResult, error)

	// Event operations
	// StoreEvent persists a memory change event.
	// The ctx scopes the write, and event contains the change event data to record.
	// Returns an error if the event cannot be stored.
	StoreEvent(ctx context.Context, event *types.MemoryChangeEvent) error
	// GetEventHistory retrieves memory change events matching a filter.
	// The ctx scopes the query, and filter defines which events to include.
	// Returns matching events or an error if the query fails.
	GetEventHistory(ctx context.Context, filter types.EventFilter) ([]*types.MemoryChangeEvent, error)

	// Distributed Lock operations
	// AcquireLock attempts to create a distributed lock for a key.
	// The ctx scopes the request, key identifies the lock, and timeout sets its lease duration.
	// Returns the acquired lock or an error if the lock cannot be obtained.
	AcquireLock(ctx context.Context, key string, timeout time.Duration) (*types.DistributedLock, error)
	// ReleaseLock releases a distributed lock by lock identifier.
	// The ctx scopes the request, and lockID identifies the held lock to release.
	// Returns an error if the lock cannot be released.
	ReleaseLock(ctx context.Context, lockID string) error
	// RenewLock extends the lease for an existing distributed lock.
	// The ctx scopes the request, and lockID identifies the lock to renew.
	// Returns the renewed lock state or an error if renewal fails.
	RenewLock(ctx context.Context, lockID string) (*types.DistributedLock, error)
	// GetLockStatus retrieves the current state of a distributed lock key.
	// The ctx scopes the lookup, and key identifies the lock to inspect.
	// Returns the lock status or an error if it cannot be read.
	GetLockStatus(ctx context.Context, key string) (*types.DistributedLock, error)

	// Agent registry
	// RegisterAgent stores agent metadata in the registry.
	// The ctx scopes the write, and agent contains the agent node information to persist.
	// Returns an error if the agent cannot be registered.
	RegisterAgent(ctx context.Context, agent *types.AgentNode) error
	// GetAgent retrieves the current agent record by identifier.
	// The ctx scopes the read, and id identifies the agent to load.
	// Returns the agent record or an error if it is missing or unreadable.
	GetAgent(ctx context.Context, id string) (*types.AgentNode, error)
	// GetAgentVersion retrieves a specific version of an agent record.
	// The ctx scopes the read, and id and version identify the versioned agent entry.
	// Returns the agent version or an error if it is missing or unreadable.
	GetAgentVersion(ctx context.Context, id string, version string) (*types.AgentNode, error)
	// DeleteAgentVersion removes a specific agent version from the registry.
	// The ctx scopes the delete, and id and version identify the versioned agent entry.
	// Returns an error if the agent version cannot be deleted.
	DeleteAgentVersion(ctx context.Context, id string, version string) error
	// ListAgentVersions lists all stored versions for an agent.
	// The ctx scopes the query, and id identifies the agent whose versions to return.
	// Returns version records or an error if the query fails.
	ListAgentVersions(ctx context.Context, id string) ([]*types.AgentNode, error)
	// ListAgents lists registered agents matching the provided filters.
	// The ctx scopes the query, and filters define which agents to include.
	// Returns matching agents or an error if the query fails.
	ListAgents(ctx context.Context, filters types.AgentFilters) ([]*types.AgentNode, error)
	// ListAgentsByGroup lists agents assigned to a group.
	// The ctx scopes the query, and groupID identifies the group to inspect.
	// Returns matching agents or an error if the query fails.
	ListAgentsByGroup(ctx context.Context, groupID string) ([]*types.AgentNode, error)
	// ListAgentGroups lists agent group summaries for a team.
	// The ctx scopes the query, and teamID identifies the team to inspect.
	// Returns group summaries or an error if the query fails.
	ListAgentGroups(ctx context.Context, teamID string) ([]types.AgentGroupSummary, error)
	// UpdateAgentHealth records a new health status for an agent.
	// The ctx scopes the update, and id and status identify the agent and new health state.
	// Returns an error if the health status cannot be updated.
	UpdateAgentHealth(ctx context.Context, id string, status types.HealthStatus) error
	// UpdateAgentHealthAtomic updates agent health when the last heartbeat matches expectations.
	// The ctx scopes the update, and id, status, and expectedLastHeartbeat control the conditional write.
	// Returns an error if the conditional health update fails.
	UpdateAgentHealthAtomic(ctx context.Context, id string, status types.HealthStatus, expectedLastHeartbeat *time.Time) error
	// UpdateAgentHeartbeat records a heartbeat timestamp for an agent version.
	// The ctx scopes the update, and id, version, and heartbeatTime identify the heartbeat to save.
	// Returns an error if the heartbeat cannot be updated.
	UpdateAgentHeartbeat(ctx context.Context, id string, version string, heartbeatTime time.Time) error
	// UpdateAgentLifecycleStatus updates the lifecycle status of an agent.
	// The ctx scopes the update, and id and status identify the agent and new lifecycle state.
	// Returns an error if the lifecycle status cannot be updated.
	UpdateAgentLifecycleStatus(ctx context.Context, id string, status types.AgentLifecycleStatus) error
	// UpdateAgentVersion marks which version is active for an agent.
	// The ctx scopes the update, and id and version identify the agent and selected version.
	// Returns an error if the active version cannot be updated.
	UpdateAgentVersion(ctx context.Context, id string, version string) error
	// UpdateAgentTrafficWeight changes the traffic weight assigned to an agent version.
	// The ctx scopes the update, and id, version, and weight identify the target routing change.
	// Returns an error if the traffic weight cannot be updated.
	UpdateAgentTrafficWeight(ctx context.Context, id string, version string, weight int) error

	// Configuration Storage (database-backed config files)
	// SetConfig stores a configuration value under a key.
	// The ctx scopes the write, and key, value, and updatedBy describe the config change.
	// Returns an error if the configuration cannot be saved.
	SetConfig(ctx context.Context, key string, value string, updatedBy string) error
	// GetConfig retrieves a stored configuration entry by key.
	// The ctx scopes the read, and key identifies the configuration entry to load.
	// Returns the configuration entry or an error if it is missing or unreadable.
	GetConfig(ctx context.Context, key string) (*ConfigEntry, error)
	// ListConfigs returns all stored configuration entries.
	// The ctx scopes the query and does not require additional parameters.
	// Returns configuration entries or an error if the query fails.
	ListConfigs(ctx context.Context) ([]*ConfigEntry, error)
	// DeleteConfig removes a stored configuration entry by key.
	// The ctx scopes the delete, and key identifies the configuration entry to remove.
	// Returns an error if the configuration cannot be deleted.
	DeleteConfig(ctx context.Context, key string) error

	// Reasoner Performance and History
	// GetReasonerPerformanceMetrics retrieves aggregated performance metrics for a reasoner.
	// The ctx scopes the read, and reasonerID identifies the reasoner to inspect.
	// Returns performance metrics or an error if they cannot be loaded.
	GetReasonerPerformanceMetrics(ctx context.Context, reasonerID string) (*types.ReasonerPerformanceMetrics, error)
	// GetReasonerExecutionHistory retrieves paginated execution history for a reasoner.
	// The ctx scopes the query, and reasonerID, page, and limit define which history page to load.
	// Returns execution history or an error if the query fails.
	GetReasonerExecutionHistory(ctx context.Context, reasonerID string, page, limit int) (*types.ReasonerExecutionHistory, error)

	// Agent Configuration Management
	// StoreAgentConfiguration persists an agent configuration record.
	// The ctx scopes the write, and config contains the configuration to store.
	// Returns an error if the configuration cannot be saved.
	StoreAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error
	// GetAgentConfiguration retrieves configuration for an agent and package pair.
	// The ctx scopes the read, and agentID and packageID identify the configuration to load.
	// Returns the agent configuration or an error if it is missing or unreadable.
	GetAgentConfiguration(ctx context.Context, agentID, packageID string) (*types.AgentConfiguration, error)
	// QueryAgentConfigurations lists agent configurations that match provided filters.
	// The ctx scopes the query, and filters define which configurations to include.
	// Returns matching configurations or an error if the query fails.
	QueryAgentConfigurations(ctx context.Context, filters types.ConfigurationFilters) ([]*types.AgentConfiguration, error)
	// UpdateAgentConfiguration replaces stored state for an agent configuration.
	// The ctx scopes the write, and config contains the updated configuration values.
	// Returns an error if the configuration cannot be updated.
	UpdateAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error
	// DeleteAgentConfiguration removes configuration for an agent and package pair.
	// The ctx scopes the delete, and agentID and packageID identify the configuration to remove.
	// Returns an error if the configuration cannot be deleted.
	DeleteAgentConfiguration(ctx context.Context, agentID, packageID string) error
	// ValidateAgentConfiguration validates configuration data for an agent package.
	// The ctx scopes the check, and agentID, packageID, and config identify the configuration to validate.
	// Returns validation results or an error if validation cannot be performed.
	ValidateAgentConfiguration(ctx context.Context, agentID, packageID string, config map[string]interface{}) (*types.ConfigurationValidationResult, error)

	// Agent Package Management
	// StoreAgentPackage persists an agent package record.
	// The ctx scopes the write, and pkg contains the package metadata to store.
	// Returns an error if the package cannot be saved.
	StoreAgentPackage(ctx context.Context, pkg *types.AgentPackage) error
	// GetAgentPackage retrieves an agent package by identifier.
	// The ctx scopes the read, and packageID identifies the package to load.
	// Returns the package record or an error if it is missing or unreadable.
	GetAgentPackage(ctx context.Context, packageID string) (*types.AgentPackage, error)
	// QueryAgentPackages lists agent packages matching the provided filters.
	// The ctx scopes the query, and filters define which packages to include.
	// Returns matching packages or an error if the query fails.
	QueryAgentPackages(ctx context.Context, filters types.PackageFilters) ([]*types.AgentPackage, error)
	// UpdateAgentPackage replaces stored state for an agent package.
	// The ctx scopes the write, and pkg contains the updated package metadata.
	// Returns an error if the package cannot be updated.
	UpdateAgentPackage(ctx context.Context, pkg *types.AgentPackage) error
	// DeleteAgentPackage removes an agent package by identifier.
	// The ctx scopes the delete, and packageID identifies the package to remove.
	// Returns an error if the package cannot be deleted.
	DeleteAgentPackage(ctx context.Context, packageID string) error

	// Real-time features (optional, may be handled by CacheProvider)
	// SubscribeToMemoryChanges opens a stream of memory change events for a scope.
	// The ctx scopes the subscription, and scope and scopeID identify the memory namespace to watch.
	// Returns an event channel or an error if subscription setup fails.
	SubscribeToMemoryChanges(ctx context.Context, scope, scopeID string) (<-chan types.MemoryChangeEvent, error)
	// PublishMemoryChange broadcasts a memory change event to subscribers.
	// The ctx scopes the publish, and event contains the change notification to send.
	// Returns an error if the event cannot be published.
	PublishMemoryChange(ctx context.Context, event types.MemoryChangeEvent) error

	// Execution event bus for real-time updates
	// GetExecutionEventBus returns the in-process event bus for execution updates.
	// This method takes no parameters and exposes the shared execution event bus.
	// Returns the execution event bus pointer.
	GetExecutionEventBus() *events.ExecutionEventBus
	// GetWorkflowExecutionEventBus returns the in-process event bus for workflow execution updates.
	// This method takes no parameters and exposes the shared workflow execution event bus.
	// Returns the workflow execution event bus pointer.
	GetWorkflowExecutionEventBus() *events.EventBus[*types.WorkflowExecutionEvent]
	GetExecutionLogEventBus() *events.EventBus[*types.ExecutionLogEntry]

	// DID Registry operations
	// StoreDID persists DID registry data for a decentralized identifier.
	// The ctx scopes the write, and the DID, document, key, and derivation parameters describe the record to store.
	// Returns an error if the DID record cannot be saved.
	StoreDID(ctx context.Context, did string, didDocument, publicKey, privateKeyRef, derivationPath string) error
	// GetDID retrieves a DID registry entry by decentralized identifier.
	// The ctx scopes the read, and did identifies the registry entry to load.
	// Returns the DID registry entry or an error if it is missing or unreadable.
	GetDID(ctx context.Context, did string) (*types.DIDRegistryEntry, error)
	// ListDIDs lists all stored DID registry entries.
	// The ctx scopes the query and does not require additional parameters.
	// Returns DID registry entries or an error if the query fails.
	ListDIDs(ctx context.Context) ([]*types.DIDRegistryEntry, error)

	// AgentField Server DID operations
	// StoreAgentFieldServerDID stores DID material for an AgentField server.
	// The ctx scopes the write, and the server ID, DID, seed, and rotation timestamps describe the stored record.
	// Returns an error if the server DID cannot be saved.
	StoreAgentFieldServerDID(ctx context.Context, agentfieldServerID, rootDID string, masterSeed []byte, createdAt, lastKeyRotation time.Time) error
	// GetAgentFieldServerDID retrieves DID information for an AgentField server.
	// The ctx scopes the read, and agentfieldServerID identifies the server record to load.
	// Returns the server DID info or an error if it is missing or unreadable.
	GetAgentFieldServerDID(ctx context.Context, agentfieldServerID string) (*types.AgentFieldServerDIDInfo, error)
	// ListAgentFieldServerDIDs lists stored AgentField server DID records.
	// The ctx scopes the query and does not require additional parameters.
	// Returns server DID records or an error if the query fails.
	ListAgentFieldServerDIDs(ctx context.Context) ([]*types.AgentFieldServerDIDInfo, error)

	// Agent DID operations
	// StoreAgentDID stores DID information for an agent.
	// The ctx scopes the write, and the agent, server, key, and derivation parameters describe the DID record.
	// Returns an error if the agent DID cannot be saved.
	StoreAgentDID(ctx context.Context, agentID, agentDID, agentfieldServerDID, publicKeyJWK string, derivationIndex int) error
	// GetAgentDID retrieves DID information for an agent.
	// The ctx scopes the read, and agentID identifies the agent DID record to load.
	// Returns the agent DID info or an error if it is missing or unreadable.
	GetAgentDID(ctx context.Context, agentID string) (*types.AgentDIDInfo, error)
	// ListAgentDIDs lists stored DID records for agents.
	// The ctx scopes the query and does not require additional parameters.
	// Returns agent DID records or an error if the query fails.
	ListAgentDIDs(ctx context.Context) ([]*types.AgentDIDInfo, error)

	// Component DID operations
	// StoreComponentDID stores DID information for a component.
	// The ctx scopes the write, and the component, agent, naming, and derivation parameters describe the DID record.
	// Returns an error if the component DID cannot be saved.
	StoreComponentDID(ctx context.Context, componentID, componentDID, agentDID, componentType, componentName string, derivationIndex int) error
	// GetComponentDID retrieves DID information for a component.
	// The ctx scopes the read, and componentID identifies the component DID record to load.
	// Returns the component DID info or an error if it is missing or unreadable.
	GetComponentDID(ctx context.Context, componentID string) (*types.ComponentDIDInfo, error)
	// ListComponentDIDs lists component DID records associated with an agent DID.
	// The ctx scopes the query, and agentDID identifies the component DID owner to inspect.
	// Returns component DID records or an error if the query fails.
	ListComponentDIDs(ctx context.Context, agentDID string) ([]*types.ComponentDIDInfo, error)

	// Multi-step DID operations with transaction safety
	// StoreAgentDIDWithComponents stores an agent DID and its component DIDs atomically.
	// The ctx scopes the transaction, and the agent, server, key, derivation, and components parameters define the stored records.
	// Returns an error if any part of the transaction fails.
	StoreAgentDIDWithComponents(ctx context.Context, agentID, agentDID, agentfieldServerDID, publicKeyJWK string, derivationIndex int, components []ComponentDIDRequest) error

	// Execution VC operations
	// StoreExecutionVC stores verifiable credential data for an execution.
	// The ctx scopes the write, and the identifiers, hashes, document, signature, and storage fields describe the VC record.
	// Returns an error if the execution VC cannot be saved.
	StoreExecutionVC(ctx context.Context, vcID, executionID, workflowID, sessionID, issuerDID, targetDID, callerDID, inputHash, outputHash, status string, vcDocument []byte, signature string, storageURI string, documentSizeBytes int64) error
	// GetExecutionVC retrieves execution verifiable credential data by VC identifier.
	// The ctx scopes the read, and vcID identifies the execution VC record to load.
	// Returns the execution VC info or an error if it is missing or unreadable.
	GetExecutionVC(ctx context.Context, vcID string) (*types.ExecutionVCInfo, error)
	// ListExecutionVCs lists execution verifiable credentials matching the provided filters.
	// The ctx scopes the query, and filters define which execution VC records to include.
	// Returns matching execution VC records or an error if the query fails.
	ListExecutionVCs(ctx context.Context, filters types.VCFilters) ([]*types.ExecutionVCInfo, error)
	// ListWorkflowVCStatusSummaries aggregates workflow VC status for workflow identifiers.
	// The ctx scopes the query, and workflowIDs identifies the workflows to summarize.
	// Returns workflow VC status summaries or an error if aggregation fails.
	ListWorkflowVCStatusSummaries(ctx context.Context, workflowIDs []string) ([]*types.WorkflowVCStatusAggregation, error)
	// CountExecutionVCs counts execution VC records that match the provided filters.
	// The ctx scopes the query, and filters define which execution VC records to count.
	// Returns the count or an error if the query fails.
	CountExecutionVCs(ctx context.Context, filters types.VCFilters) (int, error)

	// Workflow VC operations
	// StoreWorkflowVC stores verifiable credential data for a workflow.
	// The ctx scopes the write, and the workflow, status, timing, component, and storage parameters describe the VC record.
	// Returns an error if the workflow VC cannot be saved.
	StoreWorkflowVC(ctx context.Context, workflowVCID, workflowID, sessionID string, componentVCIDs []string, status string, startTime, endTime *time.Time, totalSteps, completedSteps int, storageURI string, documentSizeBytes int64) error
	// GetWorkflowVC retrieves workflow verifiable credential data by VC identifier.
	// The ctx scopes the read, and workflowVCID identifies the workflow VC record to load.
	// Returns the workflow VC info or an error if it is missing or unreadable.
	GetWorkflowVC(ctx context.Context, workflowVCID string) (*types.WorkflowVCInfo, error)
	// ListWorkflowVCs lists workflow verifiable credentials for a workflow.
	// The ctx scopes the query, and workflowID identifies the workflow whose VC records to return.
	// Returns matching workflow VC records or an error if the query fails.
	ListWorkflowVCs(ctx context.Context, workflowID string) ([]*types.WorkflowVCInfo, error)

	// Observability Webhook configuration (singleton pattern)
	// GetObservabilityWebhook retrieves the singleton observability webhook configuration.
	// The ctx scopes the read and does not require additional parameters.
	// Returns the webhook configuration or an error if it is missing or unreadable.
	GetObservabilityWebhook(ctx context.Context) (*types.ObservabilityWebhookConfig, error)
	// SetObservabilityWebhook stores the singleton observability webhook configuration.
	// The ctx scopes the write, and config contains the webhook configuration to persist.
	// Returns an error if the configuration cannot be saved.
	SetObservabilityWebhook(ctx context.Context, config *types.ObservabilityWebhookConfig) error
	// DeleteObservabilityWebhook removes the singleton observability webhook configuration.
	// The ctx scopes the delete and does not require additional parameters.
	// Returns an error if the configuration cannot be deleted.
	DeleteObservabilityWebhook(ctx context.Context) error

	// Observability Dead Letter Queue
	// AddToDeadLetterQueue stores a failed observability event for later inspection.
	// The ctx scopes the write, and event, errorMessage, and retryCount describe the failed delivery.
	// Returns an error if the dead-letter entry cannot be saved.
	AddToDeadLetterQueue(ctx context.Context, event *types.ObservabilityEvent, errorMessage string, retryCount int) error
	// GetDeadLetterQueueCount returns the number of stored dead-letter entries.
	// The ctx scopes the read and does not require additional parameters.
	// Returns the entry count or an error if it cannot be determined.
	GetDeadLetterQueueCount(ctx context.Context) (int64, error)
	// GetDeadLetterQueue retrieves paginated dead-letter entries.
	// The ctx scopes the query, and limit and offset define which entries to return.
	// Returns dead-letter entries or an error if the query fails.
	GetDeadLetterQueue(ctx context.Context, limit, offset int) ([]types.ObservabilityDeadLetterEntry, error)
	// DeleteFromDeadLetterQueue removes selected dead-letter entries by identifier.
	// The ctx scopes the delete, and ids lists the entries to remove.
	// Returns an error if the entries cannot be deleted.
	DeleteFromDeadLetterQueue(ctx context.Context, ids []int64) error
	// ClearDeadLetterQueue removes all stored dead-letter entries.
	// The ctx scopes the delete and does not require additional parameters.
	// Returns an error if the queue cannot be cleared.
	ClearDeadLetterQueue(ctx context.Context) error

	// Access policy operations (tag-based authorization)
	// GetAccessPolicies lists all stored access policies.
	// The ctx scopes the query and does not require additional parameters.
	// Returns access policies or an error if the query fails.
	GetAccessPolicies(ctx context.Context) ([]*types.AccessPolicy, error)
	// GetAccessPolicyByID retrieves an access policy by identifier.
	// The ctx scopes the read, and id identifies the access policy to load.
	// Returns the access policy or an error if it is missing or unreadable.
	GetAccessPolicyByID(ctx context.Context, id int64) (*types.AccessPolicy, error)
	// CreateAccessPolicy stores a new access policy.
	// The ctx scopes the write, and policy contains the access policy data to persist.
	// Returns an error if the policy cannot be created.
	CreateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error
	// UpdateAccessPolicy replaces stored state for an access policy.
	// The ctx scopes the write, and policy contains the updated policy data.
	// Returns an error if the policy cannot be updated.
	UpdateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error
	// DeleteAccessPolicy removes an access policy by identifier.
	// The ctx scopes the delete, and id identifies the access policy to remove.
	// Returns an error if the policy cannot be deleted.
	DeleteAccessPolicy(ctx context.Context, id int64) error

	// Agent Tag VC operations (tag-based PermissionVC)
	// StoreAgentTagVC stores a tag-based verifiable credential for an agent.
	// The ctx scopes the write, and the agent, credential, signature, and timing parameters describe the VC record.
	// Returns an error if the agent tag VC cannot be saved.
	StoreAgentTagVC(ctx context.Context, agentID, agentDID, vcID, vcDocument, signature string, issuedAt time.Time, expiresAt *time.Time) error
	// GetAgentTagVC retrieves the tag-based verifiable credential for an agent.
	// The ctx scopes the read, and agentID identifies the agent VC record to load.
	// Returns the agent tag VC record or an error if it is missing or unreadable.
	GetAgentTagVC(ctx context.Context, agentID string) (*types.AgentTagVCRecord, error)
	// ListAgentTagVCs lists all stored agent tag verifiable credentials.
	// The ctx scopes the query and does not require additional parameters.
	// Returns agent tag VC records or an error if the query fails.
	ListAgentTagVCs(ctx context.Context) ([]*types.AgentTagVCRecord, error)
	// RevokeAgentTagVC revokes the tag-based verifiable credential for an agent.
	// The ctx scopes the update, and agentID identifies the agent VC record to revoke.
	// Returns an error if the credential cannot be revoked.
	RevokeAgentTagVC(ctx context.Context, agentID string) error

	// DID Document operations (did:web resolution)
	// StoreDIDDocument persists a DID document record.
	// The ctx scopes the write, and record contains the DID document data to store.
	// Returns an error if the DID document cannot be saved.
	StoreDIDDocument(ctx context.Context, record *types.DIDDocumentRecord) error
	// GetDIDDocument retrieves a DID document by DID string.
	// The ctx scopes the read, and did identifies the DID document to load.
	// Returns the DID document record or an error if it is missing or unreadable.
	GetDIDDocument(ctx context.Context, did string) (*types.DIDDocumentRecord, error)
	// GetDIDDocumentByAgentID retrieves a DID document associated with an agent.
	// The ctx scopes the read, and agentID identifies the agent-linked DID document to load.
	// Returns the DID document record or an error if it is missing or unreadable.
	GetDIDDocumentByAgentID(ctx context.Context, agentID string) (*types.DIDDocumentRecord, error)
	// RevokeDIDDocument revokes a DID document by DID string.
	// The ctx scopes the update, and did identifies the DID document to revoke.
	// Returns an error if the DID document cannot be revoked.
	RevokeDIDDocument(ctx context.Context, did string) error
	// ListDIDDocuments lists all stored DID document records.
	// The ctx scopes the query and does not require additional parameters.
	// Returns DID document records or an error if the query fails.
	ListDIDDocuments(ctx context.Context) ([]*types.DIDDocumentRecord, error)

	// Agent lifecycle queries (tag approval workflow)
	// ListAgentsByLifecycleStatus lists agents currently in a lifecycle status.
	// The ctx scopes the query, and status identifies which lifecycle state to filter by.
	// Returns matching agents or an error if the query fails.
	ListAgentsByLifecycleStatus(ctx context.Context, status types.AgentLifecycleStatus) ([]*types.AgentNode, error)
}

StorageProvider is the interface for the primary data storage backend.

type Transaction

type Transaction interface {
	StorageProvider
	Commit() error
	Rollback() error
}

Transaction represents a database transaction.

type UnitOfWork

type UnitOfWork interface {
	// Entity registration
	RegisterNew(entity interface{}, table string, operation func(DBTX) error)
	RegisterDirty(entity interface{}, table string, operation func(DBTX) error)
	RegisterDeleted(id interface{}, table string, operation func(DBTX) error)

	// Transaction management
	Commit() error
	Rollback() error

	// State inspection
	HasChanges() bool
	GetChangeCount() int
	IsActive() bool
}

UnitOfWork manages a collection of changes as a single transaction

func NewUnitOfWork

func NewUnitOfWork(db *sqlDatabase, backend unitOfWorkBackend) UnitOfWork

NewUnitOfWork creates a new unit of work instance

type ValidationError

type ValidationError struct {
	Field   string
	Value   string
	Reason  string
	Context string
}

ValidationError represents a pre-storage validation failure

func (*ValidationError) Error

func (e *ValidationError) Error() string

type VectorDistanceMetric

type VectorDistanceMetric string
const (
	VectorDistanceCosine VectorDistanceMetric = "cosine"
	VectorDistanceDot    VectorDistanceMetric = "dot"
	VectorDistanceL2     VectorDistanceMetric = "l2"
)

type VectorStoreConfig

type VectorStoreConfig struct {
	Enabled  *bool  `yaml:"enabled" mapstructure:"enabled"`
	Distance string `yaml:"distance" mapstructure:"distance"`
}

VectorStoreConfig controls vector storage behavior.

type WorkflowExecutionEventModel

type WorkflowExecutionEventModel struct {
	EventID           int64     `gorm:"column:event_id;primaryKey;autoIncrement"`
	ExecutionID       string    `gorm:"column:execution_id;not null;index:idx_workflow_exec_events_execution,priority:1"`
	WorkflowID        string    `gorm:"column:workflow_id;not null"`
	RunID             *string   `gorm:"column:run_id;index:idx_workflow_exec_events_run,priority:1"`
	ParentExecutionID *string   `gorm:"column:parent_execution_id"`
	Sequence          int64     `gorm:"column:sequence;not null;index:idx_workflow_exec_events_execution,priority:2"`
	PreviousSequence  int64     `gorm:"column:previous_sequence;not null;default:0"`
	EventType         string    `gorm:"column:event_type;not null"`
	Status            *string   `gorm:"column:status"`
	StatusReason      *string   `gorm:"column:status_reason"`
	Payload           string    `gorm:"column:payload;default:'{}'"`
	EmittedAt         time.Time `gorm:"column:emitted_at;not null"`
	RecordedAt        time.Time `gorm:"column:recorded_at;autoCreateTime"`
}

func (WorkflowExecutionEventModel) TableName

func (WorkflowExecutionEventModel) TableName() string

type WorkflowExecutionModel

type WorkflowExecutionModel struct {
	ID                    int64      `gorm:"column:id;primaryKey;autoIncrement"`
	WorkflowID            string     `gorm:"column:workflow_id;not null;index;index:idx_workflow_executions_workflow_status,priority:1"`
	ExecutionID           string     `gorm:"column:execution_id;not null;uniqueIndex"`
	AgentFieldRequestID   string     `gorm:"column:agentfield_request_id;not null;index"`
	RunID                 *string    `gorm:"column:run_id;index"`
	SessionID             *string    `` /* 201-byte string literal not displayed */
	ActorID               *string    `` /* 193-byte string literal not displayed */
	AgentNodeID           string     `` /* 160-byte string literal not displayed */
	ParentWorkflowID      *string    `gorm:"column:parent_workflow_id;index"`
	ParentExecutionID     *string    `gorm:"column:parent_execution_id;index"`
	RootWorkflowID        *string    `gorm:"column:root_workflow_id;index"`
	WorkflowDepth         int        `gorm:"column:workflow_depth;default:0"`
	ReasonerID            string     `gorm:"column:reasoner_id;not null"`
	InputData             []byte     `gorm:"column:input_data"`
	OutputData            []byte     `gorm:"column:output_data"`
	InputSize             int        `gorm:"column:input_size"`
	OutputSize            int        `gorm:"column:output_size"`
	WorkflowName          *string    `gorm:"column:workflow_name"`
	WorkflowTags          string     `gorm:"column:workflow_tags"`
	Status                string     `` /* 493-byte string literal not displayed */
	StartedAt             time.Time  `` /* 377-byte string literal not displayed */
	CompletedAt           *time.Time `gorm:"column:completed_at"`
	DurationMS            int        `gorm:"column:duration_ms"`
	StateVersion          int        `gorm:"column:state_version;not null;default:0"`
	LastEventSequence     int        `gorm:"column:last_event_sequence;not null;default:0"`
	ActiveChildren        int        `gorm:"column:active_children;not null;default:0"`
	PendingChildren       int        `gorm:"column:pending_children;not null;default:0"`
	PendingTerminalStatus *string    `gorm:"column:pending_terminal_status"`
	StatusReason          *string    `gorm:"column:status_reason"`
	LeaseOwner            *string    `gorm:"column:lease_owner"`
	LeaseExpiresAt        *time.Time `gorm:"column:lease_expires_at"`
	ErrorMessage          *string    `gorm:"column:error_message"`
	RetryCount            int        `gorm:"column:retry_count;default:0"`
	ApprovalRequestID     *string    `gorm:"column:approval_request_id;index:idx_workflow_executions_approval_request_id"`
	ApprovalRequestURL    *string    `gorm:"column:approval_request_url"`
	ApprovalStatus        *string    `gorm:"column:approval_status"`
	ApprovalResponse      *string    `gorm:"column:approval_response"`
	ApprovalRequestedAt   *time.Time `gorm:"column:approval_requested_at"`
	ApprovalRespondedAt   *time.Time `gorm:"column:approval_responded_at"`
	ApprovalCallbackURL   *string    `gorm:"column:approval_callback_url"`
	ApprovalExpiresAt     *time.Time `gorm:"column:approval_expires_at"`
	Notes                 string     `gorm:"column:notes;default:'[]'"`
	CreatedAt             time.Time  `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt             time.Time  `gorm:"column:updated_at;autoUpdateTime"`
}

func (WorkflowExecutionModel) TableName

func (WorkflowExecutionModel) TableName() string

type WorkflowModel

type WorkflowModel struct {
	WorkflowID           string     `gorm:"column:workflow_id;primaryKey"`
	WorkflowName         *string    `gorm:"column:workflow_name"`
	WorkflowTags         string     `gorm:"column:workflow_tags"`
	SessionID            *string    `gorm:"column:session_id;index"`
	ActorID              *string    `gorm:"column:actor_id;index"`
	ParentWorkflowID     *string    `gorm:"column:parent_workflow_id"`
	ParentExecutionID    *string    `gorm:"column:parent_execution_id"`
	RootWorkflowID       *string    `gorm:"column:root_workflow_id"`
	WorkflowDepth        int        `gorm:"column:workflow_depth;default:0"`
	TotalExecutions      int        `gorm:"column:total_executions;default:0"`
	SuccessfulExecutions int        `gorm:"column:successful_executions;default:0"`
	FailedExecutions     int        `gorm:"column:failed_executions;default:0"`
	TotalDurationMS      int        `gorm:"column:total_duration_ms;default:0"`
	Status               string     `gorm:"column:status;not null"`
	StartedAt            time.Time  `gorm:"column:started_at;not null"`
	CompletedAt          *time.Time `gorm:"column:completed_at"`
	CreatedAt            time.Time  `gorm:"column:created_at;autoCreateTime"`
	UpdatedAt            time.Time  `gorm:"column:updated_at;autoUpdateTime"`
}

func (WorkflowModel) TableName

func (WorkflowModel) TableName() string

type WorkflowRunEventModel

type WorkflowRunEventModel struct {
	EventID          int64     `gorm:"column:event_id;primaryKey;autoIncrement"`
	RunID            string    `gorm:"column:run_id;not null;index:idx_workflow_run_events_run,priority:1"`
	Sequence         int64     `gorm:"column:sequence;not null;index:idx_workflow_run_events_run,priority:2"`
	PreviousSequence int64     `gorm:"column:previous_sequence;not null;default:0"`
	EventType        string    `gorm:"column:event_type;not null"`
	Status           *string   `gorm:"column:status"`
	StatusReason     *string   `gorm:"column:status_reason"`
	Payload          string    `gorm:"column:payload;default:'{}'"`
	EmittedAt        time.Time `gorm:"column:emitted_at;not null"`
	RecordedAt       time.Time `gorm:"column:recorded_at;autoCreateTime"`
}

func (WorkflowRunEventModel) TableName

func (WorkflowRunEventModel) TableName() string

type WorkflowRunModel

type WorkflowRunModel struct {
	RunID             string     `gorm:"column:run_id;primaryKey"`
	RootWorkflowID    string     `gorm:"column:root_workflow_id;not null;index"`
	RootExecutionID   *string    `gorm:"column:root_execution_id"`
	Status            string     `gorm:"column:status;not null;default:'pending';index"`
	TotalSteps        int        `gorm:"column:total_steps;not null;default:0"`
	CompletedSteps    int        `gorm:"column:completed_steps;not null;default:0"`
	FailedSteps       int        `gorm:"column:failed_steps;not null;default:0"`
	StateVersion      int64      `gorm:"column:state_version;not null;default:0"`
	LastEventSequence int64      `gorm:"column:last_event_sequence;not null;default:0"`
	Metadata          []byte     `gorm:"column:metadata;default:'{}'"`
	CreatedAt         time.Time  `gorm:"column:created_at;autoCreateTime;index"`
	UpdatedAt         time.Time  `gorm:"column:updated_at;autoUpdateTime;index"`
	CompletedAt       *time.Time `gorm:"column:completed_at;index"`
}

func (WorkflowRunModel) TableName

func (WorkflowRunModel) TableName() string

type WorkflowStepModel

type WorkflowStepModel struct {
	StepID       string     `gorm:"column:step_id;primaryKey"`
	RunID        string     `` /* 181-byte string literal not displayed */
	ParentStepID *string    `gorm:"column:parent_step_id;index"`
	ExecutionID  *string    `gorm:"column:execution_id;index:idx_workflow_steps_run_execution,priority:2"`
	AgentNodeID  *string    `gorm:"column:agent_node_id;index;index:idx_workflow_steps_agent_not_before,priority:1"`
	Target       *string    `gorm:"column:target"`
	Status       string     `` /* 207-byte string literal not displayed */
	Attempt      int        `gorm:"column:attempt;not null;default:0"`
	Priority     int        `gorm:"column:priority;not null;default:0;index:idx_workflow_steps_run_priority,priority:2"`
	NotBefore    time.Time  `` /* 189-byte string literal not displayed */
	InputURI     *string    `gorm:"column:input_uri"`
	ResultURI    *string    `gorm:"column:result_uri"`
	ErrorMessage *string    `gorm:"column:error_message"`
	Metadata     []byte     `gorm:"column:metadata;default:'{}'"`
	StartedAt    *time.Time `gorm:"column:started_at"`
	CompletedAt  *time.Time `gorm:"column:completed_at"`
	LeasedAt     *time.Time `gorm:"column:leased_at"`
	LeaseTimeout *time.Time `gorm:"column:lease_timeout"`
	CreatedAt    time.Time  `gorm:"column:created_at;autoCreateTime;index"`
	UpdatedAt    time.Time  `gorm:"column:updated_at;autoUpdateTime;index"`
}

func (WorkflowStepModel) TableName

func (WorkflowStepModel) TableName() string

type WorkflowUnitOfWork

type WorkflowUnitOfWork interface {
	UnitOfWork

	// High-level workflow operations
	StoreWorkflowWithExecution(ctx context.Context, workflow *types.Workflow, execution *types.WorkflowExecution) error
	UpdateWorkflowStatus(ctx context.Context, workflowID string, status string, execution *types.WorkflowExecution) error
	CompleteWorkflowWithResults(workflowID string, results map[string]interface{}) error
	StoreWorkflowWithSession(ctx context.Context, workflow *types.Workflow, session *types.Session) error
}

WorkflowUnitOfWork provides specialized operations for workflow management

func NewWorkflowUnitOfWork

func NewWorkflowUnitOfWork(db *sqlDatabase, backend unitOfWorkBackend) WorkflowUnitOfWork

NewWorkflowUnitOfWork creates a new workflow-specific unit of work instance

type WorkflowVCModel

type WorkflowVCModel struct {
	WorkflowVCID      string     `gorm:"column:workflow_vc_id;primaryKey"`
	WorkflowID        string     `gorm:"column:workflow_id;not null;index"`
	SessionID         string     `gorm:"column:session_id;not null;index"`
	ComponentVCIDs    string     `gorm:"column:component_vc_ids;default:'[]'"`
	Status            string     `gorm:"column:status;not null;default:'pending';index"`
	StartTime         time.Time  `gorm:"column:start_time;autoCreateTime;index"`
	EndTime           *time.Time `gorm:"column:end_time;index"`
	TotalSteps        int        `gorm:"column:total_steps;default:0"`
	CompletedSteps    int        `gorm:"column:completed_steps;default:0"`
	StorageURI        string     `gorm:"column:storage_uri;default:''"`
	DocumentSizeBytes int64      `gorm:"column:document_size_bytes;default:0"`
	CreatedAt         time.Time  `gorm:"column:created_at;autoCreateTime;index"`
	UpdatedAt         time.Time  `gorm:"column:updated_at;autoUpdateTime"`
}

func (WorkflowVCModel) TableName

func (WorkflowVCModel) TableName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL