Documentation
¶
Index ¶
- type AccessPolicyModel
- type AgentConfigurationModel
- type AgentDIDModel
- type AgentExecutionModel
- type AgentNodeModel
- type AgentPackageModel
- type AgentTagVCModel
- type CacheMessage
- type CacheProvider
- type Change
- type ChangeType
- type ComponentDIDModel
- type ComponentDIDRequest
- type ConfigEntry
- type ConfigStorageModel
- type DBTX
- type DIDDocumentModel
- type DIDRegistryModel
- type DuplicateDIDError
- type ExecutionLogEntryModel
- type ExecutionRecordModel
- type ExecutionVCModel
- type ExecutionWebhookEventModel
- type ExecutionWebhookModel
- type ForeignKeyConstraintError
- type InvalidExecutionStateTransitionError
- type LocalStorage
- func (ls *LocalStorage) AcquireLock(ctx context.Context, key string, timeout time.Duration) (*types.DistributedLock, error)
- func (ls *LocalStorage) AddToDeadLetterQueue(ctx context.Context, event *types.ObservabilityEvent, errorMessage string, ...) error
- func (ls *LocalStorage) BeginTransaction() (Transaction, error)
- func (ls *LocalStorage) CleanupOldExecutions(ctx context.Context, retentionPeriod time.Duration, batchSize int) (int, error)
- func (ls *LocalStorage) CleanupWorkflow(ctx context.Context, identifier string, dryRun bool) (*types.WorkflowCleanupResult, error)
- func (ls *LocalStorage) ClearDeadLetterQueue(ctx context.Context) error
- func (ls *LocalStorage) Close(ctx context.Context) error
- func (ls *LocalStorage) CountExecutionVCs(ctx context.Context, filters types.VCFilters) (int, error)
- func (ls *LocalStorage) CreateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error
- func (ls *LocalStorage) CreateExecutionRecord(ctx context.Context, exec *types.Execution) error
- func (ls *LocalStorage) CreateOrUpdateSession(ctx context.Context, session *types.Session) error
- func (ls *LocalStorage) CreateOrUpdateWorkflow(ctx context.Context, workflow *types.Workflow) error
- func (ls *LocalStorage) Delete(key string) error
- func (ls *LocalStorage) DeleteAccessPolicy(ctx context.Context, id int64) error
- func (ls *LocalStorage) DeleteAgentConfiguration(ctx context.Context, agentID, packageID string) error
- func (ls *LocalStorage) DeleteAgentPackage(ctx context.Context, packageID string) error
- func (ls *LocalStorage) DeleteAgentVersion(ctx context.Context, id string, version string) error
- func (ls *LocalStorage) DeleteConfig(ctx context.Context, key string) error
- func (ls *LocalStorage) DeleteFromDeadLetterQueue(ctx context.Context, ids []int64) error
- func (ls *LocalStorage) DeleteMemory(ctx context.Context, scope, scopeID, key string) error
- func (ls *LocalStorage) DeleteObservabilityWebhook(ctx context.Context) error
- func (ls *LocalStorage) DeleteVector(ctx context.Context, scope, scopeID, key string) error
- func (ls *LocalStorage) DeleteVectorsByPrefix(ctx context.Context, scope, scopeID, prefix string) (int, error)
- func (ls *LocalStorage) Exists(key string) bool
- func (ls *LocalStorage) Get(key string, dest interface{}) error
- func (ls *LocalStorage) GetAccessPolicies(ctx context.Context) ([]*types.AccessPolicy, error)
- func (ls *LocalStorage) GetAccessPolicyByID(ctx context.Context, id int64) (*types.AccessPolicy, error)
- func (ls *LocalStorage) GetAgent(ctx context.Context, id string) (*types.AgentNode, error)
- func (ls *LocalStorage) GetAgentConfiguration(ctx context.Context, agentID, packageID string) (*types.AgentConfiguration, error)
- func (ls *LocalStorage) GetAgentDID(ctx context.Context, agentID string) (*types.AgentDIDInfo, error)
- func (ls *LocalStorage) GetAgentFieldServerDID(ctx context.Context, agentfieldServerID string) (*types.AgentFieldServerDIDInfo, error)
- func (ls *LocalStorage) GetAgentPackage(ctx context.Context, packageID string) (*types.AgentPackage, error)
- func (ls *LocalStorage) GetAgentTagVC(ctx context.Context, agentID string) (*types.AgentTagVCRecord, error)
- func (ls *LocalStorage) GetAgentVersion(ctx context.Context, id string, version string) (*types.AgentNode, error)
- func (ls *LocalStorage) GetComponentDID(ctx context.Context, componentID string) (*types.ComponentDIDInfo, error)
- func (ls *LocalStorage) GetConfig(ctx context.Context, key string) (*ConfigEntry, error)
- func (ls *LocalStorage) GetDID(ctx context.Context, did string) (*types.DIDRegistryEntry, error)
- func (ls *LocalStorage) GetDIDDocument(ctx context.Context, did string) (*types.DIDDocumentRecord, error)
- func (ls *LocalStorage) GetDIDDocumentByAgentID(ctx context.Context, agentID string) (*types.DIDDocumentRecord, error)
- func (ls *LocalStorage) GetDeadLetterQueue(ctx context.Context, limit, offset int) ([]types.ObservabilityDeadLetterEntry, error)
- func (ls *LocalStorage) GetDeadLetterQueueCount(ctx context.Context) (int64, error)
- func (ls *LocalStorage) GetEventHistory(ctx context.Context, filter types.EventFilter) ([]*types.MemoryChangeEvent, error)
- func (ls *LocalStorage) GetExecution(ctx context.Context, id int64) (*types.AgentExecution, error)
- func (ls *LocalStorage) GetExecutionEventBus() *events.ExecutionEventBus
- func (ls *LocalStorage) GetExecutionLogEventBus() *events.EventBus[*types.ExecutionLogEntry]
- func (ls *LocalStorage) GetExecutionRecord(ctx context.Context, executionID string) (*types.Execution, error)
- func (ls *LocalStorage) GetExecutionVC(ctx context.Context, vcID string) (*types.ExecutionVCInfo, error)
- func (ls *LocalStorage) GetExecutionWebhook(ctx context.Context, executionID string) (*types.ExecutionWebhook, error)
- func (ls *LocalStorage) GetFullExecutionVC(vcID string) (json.RawMessage, string, error)
- func (ls *LocalStorage) GetLockStatus(ctx context.Context, key string) (*types.DistributedLock, error)
- func (ls *LocalStorage) GetMemory(ctx context.Context, scope, scopeID, key string) (*types.Memory, error)
- func (ls *LocalStorage) GetObservabilityWebhook(ctx context.Context) (*types.ObservabilityWebhookConfig, error)
- func (ls *LocalStorage) GetReasonerExecutionHistory(ctx context.Context, reasonerID string, page, limit int) (*types.ReasonerExecutionHistory, error)
- func (ls *LocalStorage) GetReasonerPerformanceMetrics(ctx context.Context, reasonerID string) (*types.ReasonerPerformanceMetrics, error)
- func (ls *LocalStorage) GetSession(ctx context.Context, sessionID string) (*types.Session, error)
- func (ls *LocalStorage) GetVector(ctx context.Context, scope, scopeID, key string) (*types.VectorRecord, error)
- func (ls *LocalStorage) GetWorkflow(ctx context.Context, workflowID string) (*types.Workflow, error)
- func (ls *LocalStorage) GetWorkflowExecution(ctx context.Context, executionID string) (*types.WorkflowExecution, error)
- func (ls *LocalStorage) GetWorkflowExecutionEventBus() *events.EventBus[*types.WorkflowExecutionEvent]
- func (ls *LocalStorage) GetWorkflowRun(ctx context.Context, runID string) (*types.WorkflowRun, error)
- func (ls *LocalStorage) GetWorkflowVC(ctx context.Context, workflowVCID string) (*types.WorkflowVCInfo, error)
- func (ls *LocalStorage) HasExecutionWebhook(ctx context.Context, executionID string) (bool, error)
- func (ls *LocalStorage) HealthCheck(ctx context.Context) error
- func (ls *LocalStorage) Initialize(ctx context.Context, config StorageConfig) error
- func (ls *LocalStorage) ListAgentDIDs(ctx context.Context) ([]*types.AgentDIDInfo, error)
- func (ls *LocalStorage) ListAgentFieldServerDIDs(ctx context.Context) ([]*types.AgentFieldServerDIDInfo, error)
- func (ls *LocalStorage) ListAgentGroups(ctx context.Context, teamID string) ([]types.AgentGroupSummary, error)
- func (ls *LocalStorage) ListAgentTagVCs(ctx context.Context) ([]*types.AgentTagVCRecord, error)
- func (ls *LocalStorage) ListAgentVersions(ctx context.Context, id string) ([]*types.AgentNode, error)
- func (ls *LocalStorage) ListAgents(ctx context.Context, filters types.AgentFilters) ([]*types.AgentNode, error)
- func (ls *LocalStorage) ListAgentsByGroup(ctx context.Context, groupID string) ([]*types.AgentNode, error)
- func (ls *LocalStorage) ListAgentsByLifecycleStatus(ctx context.Context, status types.AgentLifecycleStatus) ([]*types.AgentNode, error)
- func (ls *LocalStorage) ListComponentDIDs(ctx context.Context, agentDID string) ([]*types.ComponentDIDInfo, error)
- func (ls *LocalStorage) ListConfigs(ctx context.Context) ([]*ConfigEntry, error)
- func (ls *LocalStorage) ListDIDDocuments(ctx context.Context) ([]*types.DIDDocumentRecord, error)
- func (ls *LocalStorage) ListDIDs(ctx context.Context) ([]*types.DIDRegistryEntry, error)
- func (ls *LocalStorage) ListDueExecutionWebhooks(ctx context.Context, limit int) ([]*types.ExecutionWebhook, error)
- func (ls *LocalStorage) ListExecutionLogEntries(ctx context.Context, executionID string, afterSeq *int64, limit int, ...) ([]*types.ExecutionLogEntry, error)
- func (ls *LocalStorage) ListExecutionVCs(ctx context.Context, filters types.VCFilters) ([]*types.ExecutionVCInfo, error)
- func (ls *LocalStorage) ListExecutionWebhookEvents(ctx context.Context, executionID string) ([]*types.ExecutionWebhookEvent, error)
- func (ls *LocalStorage) ListExecutionWebhookEventsBatch(ctx context.Context, executionIDs []string) (map[string][]*types.ExecutionWebhookEvent, error)
- func (ls *LocalStorage) ListExecutionWebhooksRegistered(ctx context.Context, executionIDs []string) (map[string]bool, error)
- func (ls *LocalStorage) ListMemory(ctx context.Context, scope, scopeID string) ([]*types.Memory, error)
- func (ls *LocalStorage) ListWorkflowExecutionEvents(ctx context.Context, executionID string, afterSeq *int64, limit int) ([]*types.WorkflowExecutionEvent, error)
- func (ls *LocalStorage) ListWorkflowVCStatusSummaries(ctx context.Context, workflowIDs []string) ([]*types.WorkflowVCStatusAggregation, error)
- func (ls *LocalStorage) ListWorkflowVCs(ctx context.Context, workflowID string) ([]*types.WorkflowVCInfo, error)
- func (ls *LocalStorage) MarkStaleExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)
- func (ls *LocalStorage) MarkStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)
- func (ls *LocalStorage) NewUnitOfWork() UnitOfWork
- func (ls *LocalStorage) NewWorkflowUnitOfWork() WorkflowUnitOfWork
- func (ls *LocalStorage) PruneExecutionLogEntries(ctx context.Context, executionID string, maxEntries int, olderThan time.Time) error
- func (ls *LocalStorage) Publish(channel string, message interface{}) error
- func (ls *LocalStorage) PublishMemoryChange(ctx context.Context, event types.MemoryChangeEvent) error
- func (ls *LocalStorage) QueryAgentConfigurations(ctx context.Context, filters types.ConfigurationFilters) ([]*types.AgentConfiguration, error)
- func (ls *LocalStorage) QueryAgentPackages(ctx context.Context, filters types.PackageFilters) ([]*types.AgentPackage, error)
- func (ls *LocalStorage) QueryExecutionRecords(ctx context.Context, filter types.ExecutionFilter) ([]*types.Execution, error)
- func (ls *LocalStorage) QueryExecutions(ctx context.Context, filters types.ExecutionFilters) ([]*types.AgentExecution, error)
- func (ls *LocalStorage) QueryRunSummaries(ctx context.Context, filter types.ExecutionFilter) ([]*RunSummaryAggregation, int, error)
- func (ls *LocalStorage) QuerySessions(ctx context.Context, filters types.SessionFilters) ([]*types.Session, error)
- func (ls *LocalStorage) QueryWorkflowDAG(ctx context.Context, rootWorkflowID string) ([]*types.WorkflowExecution, error)
- func (ls *LocalStorage) QueryWorkflowExecutions(ctx context.Context, filters types.WorkflowExecutionFilters) ([]*types.WorkflowExecution, error)
- func (ls *LocalStorage) QueryWorkflows(ctx context.Context, filters types.WorkflowFilters) ([]*types.Workflow, error)
- func (ls *LocalStorage) RegisterAgent(ctx context.Context, agent *types.AgentNode) error
- func (ls *LocalStorage) RegisterExecutionWebhook(ctx context.Context, webhook *types.ExecutionWebhook) error
- func (ls *LocalStorage) ReleaseLock(ctx context.Context, lockID string) error
- func (ls *LocalStorage) RenewLock(ctx context.Context, lockID string) (*types.DistributedLock, error)
- func (ls *LocalStorage) RetryStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, maxRetries int, limit int) ([]string, error)
- func (ls *LocalStorage) RevokeAgentTagVC(ctx context.Context, agentID string) error
- func (ls *LocalStorage) RevokeDIDDocument(ctx context.Context, did string) error
- func (ls *LocalStorage) Set(key string, value interface{}, ttl time.Duration) error
- func (ls *LocalStorage) SetConfig(ctx context.Context, key string, value string, updatedBy string) error
- func (ls *LocalStorage) SetMemory(ctx context.Context, memory *types.Memory) error
- func (ls *LocalStorage) SetObservabilityWebhook(ctx context.Context, config *types.ObservabilityWebhookConfig) error
- func (ls *LocalStorage) SetVector(ctx context.Context, record *types.VectorRecord) error
- func (ls *LocalStorage) SimilaritySearch(ctx context.Context, scope, scopeID string, queryEmbedding []float32, topK int, ...) ([]*types.VectorSearchResult, error)
- func (ls *LocalStorage) StoreAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error
- func (ls *LocalStorage) StoreAgentDID(ctx context.Context, ...) error
- func (ls *LocalStorage) StoreAgentDIDWithComponents(ctx context.Context, ...) error
- func (ls *LocalStorage) StoreAgentFieldServerDID(ctx context.Context, agentfieldServerID, rootDID string, masterSeed []byte, ...) error
- func (ls *LocalStorage) StoreAgentPackage(ctx context.Context, pkg *types.AgentPackage) error
- func (ls *LocalStorage) StoreAgentTagVC(ctx context.Context, agentID, agentDID, vcID, vcDocument, signature string, ...) error
- func (ls *LocalStorage) StoreComponentDID(ctx context.Context, ...) error
- func (ls *LocalStorage) StoreDID(ctx context.Context, did string, ...) error
- func (ls *LocalStorage) StoreDIDDocument(ctx context.Context, record *types.DIDDocumentRecord) error
- func (ls *LocalStorage) StoreEvent(ctx context.Context, event *types.MemoryChangeEvent) error
- func (ls *LocalStorage) StoreExecution(ctx context.Context, execution *types.AgentExecution) error
- func (ls *LocalStorage) StoreExecutionLogEntries(ctx context.Context, executionID string, entries []*types.ExecutionLogEntry) error
- func (ls *LocalStorage) StoreExecutionLogEntry(ctx context.Context, entry *types.ExecutionLogEntry) error
- func (ls *LocalStorage) StoreExecutionVC(ctx context.Context, ...) error
- func (ls *LocalStorage) StoreExecutionWebhookEvent(ctx context.Context, event *types.ExecutionWebhookEvent) error
- func (ls *LocalStorage) StoreWorkflowExecution(ctx context.Context, execution *types.WorkflowExecution) error
- func (ls *LocalStorage) StoreWorkflowExecutionEvent(ctx context.Context, event *types.WorkflowExecutionEvent) error
- func (ls *LocalStorage) StoreWorkflowExecutionWithUnitOfWork(ctx context.Context, execution *types.WorkflowExecution) error
- func (ls *LocalStorage) StoreWorkflowRun(ctx context.Context, run *types.WorkflowRun) error
- func (ls *LocalStorage) StoreWorkflowRunEvent(ctx context.Context, event *types.WorkflowRunEvent) error
- func (ls *LocalStorage) StoreWorkflowStep(ctx context.Context, step *types.WorkflowStep) error
- func (ls *LocalStorage) StoreWorkflowVC(ctx context.Context, workflowVCID, workflowID, sessionID string, ...) error
- func (ls *LocalStorage) Subscribe(channel string) (<-chan CacheMessage, error)
- func (ls *LocalStorage) SubscribeToMemoryChanges(ctx context.Context, scope, scopeID string) (<-chan types.MemoryChangeEvent, error)
- func (ls *LocalStorage) TryMarkExecutionWebhookInFlight(ctx context.Context, executionID string, now time.Time) (bool, error)
- func (ls *LocalStorage) UpdateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error
- func (ls *LocalStorage) UpdateAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error
- func (ls *LocalStorage) UpdateAgentHealth(ctx context.Context, id string, status types.HealthStatus) error
- func (ls *LocalStorage) UpdateAgentHealthAtomic(ctx context.Context, id string, status types.HealthStatus, ...) error
- func (ls *LocalStorage) UpdateAgentHeartbeat(ctx context.Context, id string, version string, heartbeatTime time.Time) error
- func (ls *LocalStorage) UpdateAgentLifecycleStatus(ctx context.Context, id string, status types.AgentLifecycleStatus) error
- func (ls *LocalStorage) UpdateAgentPackage(ctx context.Context, pkg *types.AgentPackage) error
- func (ls *LocalStorage) UpdateAgentTrafficWeight(ctx context.Context, id string, version string, weight int) error
- func (ls *LocalStorage) UpdateAgentVersion(ctx context.Context, id string, version string) error
- func (ls *LocalStorage) UpdateExecutionRecord(ctx context.Context, executionID string, ...) (*types.Execution, error)
- func (ls *LocalStorage) UpdateExecutionWebhookState(ctx context.Context, executionID string, ...) error
- func (ls *LocalStorage) UpdateWorkflowExecution(ctx context.Context, executionID string, ...) error
- func (ls *LocalStorage) ValidateAgentConfiguration(ctx context.Context, agentID, packageID string, config map[string]interface{}) (*types.ConfigurationValidationResult, error)
- type LocalStorageConfig
- type ObservabilityDeadLetterQueueModel
- type ObservabilityWebhookModel
- type PostgresStorageConfig
- type RunSummaryAggregation
- type SchemaMigrationModel
- type SessionModel
- type StorageConfig
- type StorageFactory
- type StorageProvider
- type Transaction
- type UnitOfWork
- type ValidationError
- type VectorDistanceMetric
- type VectorStoreConfig
- type WorkflowExecutionEventModel
- type WorkflowExecutionModel
- type WorkflowModel
- type WorkflowRunEventModel
- type WorkflowRunModel
- type WorkflowStepModel
- type WorkflowUnitOfWork
- type WorkflowVCModel
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccessPolicyModel ¶
// AccessPolicyModel is the GORM column mapping for one access-policy row; the
// table name is supplied by its TableName method. Per the tags: name carries a
// unique index, priority and enabled are indexed, and action defaults to
// 'allow'.
type AccessPolicyModel struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
Name string `gorm:"column:name;not null;uniqueIndex"`
// The tag lists, function lists and constraints are persisted as JSON text
// in plain text columns (see the trailing note on each field).
CallerTags string `gorm:"column:caller_tags;type:text;not null"` // JSON array
TargetTags string `gorm:"column:target_tags;type:text;not null"` // JSON array
AllowFunctions string `gorm:"column:allow_functions;type:text"` // JSON array
DenyFunctions string `gorm:"column:deny_functions;type:text"` // JSON array
Constraints string `gorm:"column:constraints;type:text"` // JSON object
Action string `gorm:"column:action;not null;default:'allow'"`
Priority int `gorm:"column:priority;not null;default:0;index"`
Enabled bool `gorm:"column:enabled;not null;default:true;index"`
// Pointer type: a nil value is stored as SQL NULL.
Description *string `gorm:"column:description"`
// Populated by GORM on insert and on each update respectively.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
AccessPolicyModel represents a tag-based access policy for cross-agent calls.
func (AccessPolicyModel) TableName ¶
func (AccessPolicyModel) TableName() string
type AgentConfigurationModel ¶
// AgentConfigurationModel is the GORM column mapping for a stored agent
// configuration; the table name is supplied by its TableName method.
type AgentConfigurationModel struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
// agent_id and package_id together form the composite index
// idx_agent_config_agent_package (in that order). The tags declare it as a
// plain index, so they do not by themselves enforce one row per pair.
AgentID string `gorm:"column:agent_id;not null;index:idx_agent_config_agent_package,priority:1"`
PackageID string `gorm:"column:package_id;not null;index:idx_agent_config_agent_package,priority:2"`
// Stored as raw bytes; presumably serialized JSON — confirm against the
// code that writes these columns.
Configuration []byte `gorm:"column:configuration;not null"`
EncryptedFields []byte `gorm:"column:encrypted_fields"`
Status string `gorm:"column:status;not null"`
Version int `gorm:"column:version;not null;default:1"`
// Populated by GORM on insert and on each update respectively.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
// Pointer types: nil values are stored as SQL NULL.
CreatedBy *string `gorm:"column:created_by"`
UpdatedBy *string `gorm:"column:updated_by"`
}
func (AgentConfigurationModel) TableName ¶
func (AgentConfigurationModel) TableName() string
type AgentDIDModel ¶
// AgentDIDModel is the GORM column mapping for an agent DID record, keyed by
// the DID string itself; the table name is supplied by its TableName method.
type AgentDIDModel struct {
DID string `gorm:"column:did;primaryKey"`
AgentNodeID string `gorm:"column:agent_node_id;not null;index"`
AgentFieldServerID string `gorm:"column:agentfield_server_id;not null;index"`
PublicKeyJWK string `gorm:"column:public_key_jwk;not null"`
DerivationPath string `gorm:"column:derivation_path;not null"`
// Both default to '{}'; presumably JSON objects serialized as text —
// confirm against the code that reads them.
Reasoners string `gorm:"column:reasoners;default:'{}'"`
Skills string `gorm:"column:skills;default:'{}'"`
Status string `gorm:"column:status;not null;default:'active'"`
// RegisteredAt and CreatedAt are both set by GORM at insert time.
RegisteredAt time.Time `gorm:"column:registered_at;autoCreateTime"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (AgentDIDModel) TableName ¶
func (AgentDIDModel) TableName() string
type AgentExecutionModel ¶
// AgentExecutionModel is the GORM column mapping for a single agent execution
// row; the table name is supplied by its TableName method. The struct has a
// creation timestamp only — there is no updated_at column.
type AgentExecutionModel struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
// Indexed lookup columns.
WorkflowID string `gorm:"column:workflow_id;not null;index"`
SessionID *string `gorm:"column:session_id;index"`
AgentNodeID string `gorm:"column:agent_node_id;not null;index"`
ReasonerID string `gorm:"column:reasoner_id;not null;index"`
// Payloads are kept as raw bytes alongside separately stored sizes.
// NOTE(review): the unit of the size columns is presumably bytes — confirm.
InputData []byte `gorm:"column:input_data"`
OutputData []byte `gorm:"column:output_data"`
InputSize int `gorm:"column:input_size"`
OutputSize int `gorm:"column:output_size"`
DurationMS int `gorm:"column:duration_ms;not null"`
Status string `gorm:"column:status;not null;index"`
// Pointer types: nil values are stored as SQL NULL.
ErrorMessage *string `gorm:"column:error_message"`
UserID *string `gorm:"column:user_id"`
TeamID *string `gorm:"column:team_id"`
Metadata []byte `gorm:"column:metadata"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
}
func (AgentExecutionModel) TableName ¶
func (AgentExecutionModel) TableName() string
type AgentNodeModel ¶
// AgentNodeModel is the GORM column mapping for a registered agent node; the
// table name is supplied by its TableName method.
type AgentNodeModel struct {
// id and version are both tagged primaryKey, so together they form a
// composite primary key; version defaults to the empty string.
ID string `gorm:"column:id;primaryKey"`
Version string `gorm:"column:version;primaryKey;not null;default:''"`
GroupID string `gorm:"column:group_id;not null;default:'';index"`
TeamID string `gorm:"column:team_id;not null;index"`
BaseURL string `gorm:"column:base_url;not null"`
TrafficWeight int `gorm:"column:traffic_weight;not null;default:100"`
DeploymentType string `gorm:"column:deployment_type;default:'long_running';index"`
InvocationURL *string `gorm:"column:invocation_url"`
// Stored as raw bytes; presumably serialized JSON — confirm against the
// code that writes these columns.
Reasoners []byte `gorm:"column:reasoners"`
Skills []byte `gorm:"column:skills"`
CommunicationConfig []byte `gorm:"column:communication_config"`
HealthStatus string `gorm:"column:health_status;not null;index"`
LifecycleStatus string `gorm:"column:lifecycle_status;default:'starting';index"`
// Nil until a heartbeat has been recorded (nullable column).
LastHeartbeat *time.Time `gorm:"column:last_heartbeat"`
RegisteredAt time.Time `gorm:"column:registered_at;autoCreateTime"`
Features []byte `gorm:"column:features"`
Metadata []byte `gorm:"column:metadata"`
ProposedTags []byte `gorm:"column:proposed_tags"`
ApprovedTags []byte `gorm:"column:approved_tags"`
}
func (AgentNodeModel) TableName ¶
func (AgentNodeModel) TableName() string
type AgentPackageModel ¶
// AgentPackageModel is the GORM column mapping for an installed agent package,
// keyed by its string ID; the table name is supplied by its TableName method.
type AgentPackageModel struct {
ID string `gorm:"column:id;primaryKey"`
Name string `gorm:"column:name;not null"`
Version string `gorm:"column:version;not null"`
// Optional descriptive fields: nil values are stored as SQL NULL.
Description *string `gorm:"column:description"`
Author *string `gorm:"column:author"`
Repository *string `gorm:"column:repository"`
InstallPath string `gorm:"column:install_path;not null"`
// Stored as raw bytes; presumably a serialized schema document — confirm.
ConfigurationSchema []byte `gorm:"column:configuration_schema"`
// Two independent required status columns.
Status string `gorm:"column:status;not null"`
ConfigurationStatus string `gorm:"column:configuration_status;not null"`
// Populated by GORM on insert and on each update respectively.
InstalledAt time.Time `gorm:"column:installed_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
Metadata []byte `gorm:"column:metadata"`
}
func (AgentPackageModel) TableName ¶
func (AgentPackageModel) TableName() string
type AgentTagVCModel ¶
// AgentTagVCModel is the GORM column mapping for a stored agent tag VC; the
// table name is supplied by its TableName method.
type AgentTagVCModel struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
// agent_id and vc_id each carry their own unique index, so a given agent
// ID and a given VC ID can each appear in at most one row.
AgentID string `gorm:"column:agent_id;uniqueIndex;not null"`
AgentDID string `gorm:"column:agent_did;not null;index"`
VCID string `gorm:"column:vc_id;uniqueIndex;not null"`
VCDocument string `gorm:"column:vc_document;type:text;not null"`
Signature string `gorm:"column:signature;type:text"`
IssuedAt time.Time `gorm:"column:issued_at;not null"`
// Nullable timestamps: nil is stored as SQL NULL.
ExpiresAt *time.Time `gorm:"column:expires_at"`
RevokedAt *time.Time `gorm:"column:revoked_at"`
// Populated by GORM on insert and on each update respectively.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
AgentTagVCModel stores signed Agent Tag VCs issued on tag approval.
func (AgentTagVCModel) TableName ¶
func (AgentTagVCModel) TableName() string
type CacheMessage ¶
CacheMessage represents a message received from the cache's pub/sub.
type CacheProvider ¶
// CacheProvider describes the caching layer: keyed value access plus
// channel-based publish/subscribe.
type CacheProvider interface {
	// Keyed value access. Set takes a time-to-live alongside the value; Get
	// fills dest (presumably a pointer the stored value is decoded into —
	// confirm against the implementation).
	Set(key string, value any, ttl time.Duration) error
	Get(key string, dest any) error
	Exists(key string) bool
	Delete(key string) error

	// Publish/subscribe used for real-time features. Subscribe hands back a
	// receive-only channel of CacheMessage values.
	Publish(channel string, message any) error
	Subscribe(channel string) (<-chan CacheMessage, error)
}
CacheProvider is the interface for the high-performance caching layer.
type Change ¶
// Change is one tracked database operation belonging to a unit of work.
type Change struct {
	// Entity is the value the change refers to; its concrete type is not
	// constrained here.
	Entity any
	// Table names the table involved (presumably the target of Operation —
	// confirm against the unit-of-work implementation).
	Table string
	// Type classifies the change (see ChangeType).
	Type ChangeType
	// Operation performs the change against the supplied DBTX.
	Operation func(DBTX) error
	// Timestamp is a time associated with the change (presumably when it was
	// registered — confirm).
	Timestamp time.Time
}
Change represents a single database operation within the unit of work.
type ChangeType ¶
// ChangeType represents the type of change being tracked.
type ChangeType int

// The change kinds are numbered consecutively from zero via iota, so their
// declaration order is part of the contract. Judging by the names they
// presumably correspond to insert, update and delete operations — confirm
// against the unit-of-work implementation.
const (
	ChangeTypeNew     ChangeType = iota // 0
	ChangeTypeDirty                     // 1
	ChangeTypeDeleted                   // 2
)
type ComponentDIDModel ¶
// ComponentDIDModel is the GORM column mapping for a component DID record,
// keyed by the DID string; the table name is supplied by its TableName method.
type ComponentDIDModel struct {
DID string `gorm:"column:did;primaryKey"`
// Indexed reference to an agent DID. No foreign-key constraint is declared
// in these tags.
AgentDID string `gorm:"column:agent_did;not null;index"`
ComponentType string `gorm:"column:component_type;not null;index"`
FunctionName string `gorm:"column:function_name;not null"`
PublicKeyJWK string `gorm:"column:public_key_jwk;not null"`
DerivationPath string `gorm:"column:derivation_path;not null"`
// Both default to '[]'; presumably JSON arrays serialized as text — confirm.
Capabilities string `gorm:"column:capabilities;default:'[]'"`
Tags string `gorm:"column:tags;default:'[]'"`
ExposureLevel string `gorm:"column:exposure_level;not null;default:'private'"`
// Populated by GORM on insert and on each update respectively.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (ComponentDIDModel) TableName ¶
func (ComponentDIDModel) TableName() string
type ComponentDIDRequest ¶
// ComponentDIDRequest carries the values needed to store one component DID.
// Field order is unchanged, so positional composite literals keep working.
type ComponentDIDRequest struct {
	// Identifying strings for the component, followed by its public key in
	// JWK form.
	ComponentDID, ComponentType, ComponentName, PublicKeyJWK string

	// DerivationIndex is an integer index associated with the component's key
	// (presumably its position in a key-derivation path — confirm).
	DerivationIndex int
}
ComponentDIDRequest represents a component DID to be stored.
type ConfigEntry ¶
// ConfigEntry is the JSON-serializable view of a stored configuration value.
// It carries json tags only (no gorm tags).
type ConfigEntry struct {
Key string `json:"key"`
Value string `json:"value"`
Version int `json:"version"`
// Omitted from the JSON output when empty.
CreatedBy string `json:"created_by,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
ConfigEntry represents a database-stored configuration file.
type ConfigStorageModel ¶
// ConfigStorageModel is the GORM column mapping for a stored configuration
// record; the table name is supplied by its TableName method.
type ConfigStorageModel struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
// Unique index: at most one row per key.
Key string `gorm:"column:key;not null;uniqueIndex"`
Value string `gorm:"column:value;type:text;not null"`
// Defaults to 1. NOTE(review): presumably incremented on each update by the
// code that writes this row — nothing in the tags does so automatically.
Version int `gorm:"column:version;not null;default:1"`
// Pointer types: nil values are stored as SQL NULL.
CreatedBy *string `gorm:"column:created_by"`
UpdatedBy *string `gorm:"column:updated_by"`
// Populated by GORM on insert and on each update respectively.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
ConfigStorageModel stores configuration files in the database. Each record represents a named configuration (e.g. "agentfield.yaml") with versioning for audit trail.
func (ConfigStorageModel) TableName ¶
func (ConfigStorageModel) TableName() string
type DBTX ¶
// DBTX is the set of query and exec methods needed by code that must run
// against either a database handle or a transaction. Each operation is
// listed in its plain form followed by its context-aware form.
type DBTX interface {
	// Statements that do not return rows.
	Exec(query string, args ...any) (sql.Result, error)
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

	// Queries returning any number of rows.
	Query(query string, args ...any) (*sql.Rows, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

	// Queries returning a single *sql.Row.
	QueryRow(query string, args ...any) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}
DBTX is the interface for operations that can run on either a database handle or a transaction.
type DIDDocumentModel ¶
// DIDDocumentModel is the GORM column mapping for a stored DID document,
// keyed by the DID string; the table name is supplied by its TableName method.
type DIDDocumentModel struct {
DID string `gorm:"column:did;primaryKey"`
AgentID string `gorm:"column:agent_id;not null;index"`
// Held as raw bytes in Go; the column type is declared as jsonb.
DIDDocument []byte `gorm:"column:did_document;type:jsonb;not null"` // JSONB in PostgreSQL, TEXT in SQLite
PublicKeyJWK string `gorm:"column:public_key_jwk;not null"`
// Nullable and indexed; presumably nil means the document has not been
// revoked — confirm against RevokeDIDDocument.
RevokedAt *time.Time `gorm:"column:revoked_at;index"`
// Populated by GORM on insert and on each update respectively.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
DIDDocumentModel represents a DID document record for did:web resolution.
func (DIDDocumentModel) TableName ¶
func (DIDDocumentModel) TableName() string
type DIDRegistryModel ¶
// DIDRegistryModel is the GORM column mapping for a DID registry row, keyed
// by the AgentField server ID; the table name is supplied by its TableName
// method.
type DIDRegistryModel struct {
AgentFieldServerID string `gorm:"column:agentfield_server_id;primaryKey"`
// Required raw bytes. The field and column names say "encrypted";
// NOTE(review): the encryption itself happens outside this struct — confirm
// the writer never stores a plaintext seed here.
MasterSeedEncrypted []byte `gorm:"column:master_seed_encrypted;not null"`
RootDID string `gorm:"column:root_did;not null;unique"`
// Defaults to '{}'; presumably a JSON object serialized as text — confirm.
AgentNodes string `gorm:"column:agent_nodes;default:'{}'"`
TotalDIDs int `gorm:"column:total_dids;default:0"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
// Initialised at insert time like CreatedAt (autoCreateTime); nothing in
// these tags updates it afterwards.
LastKeyRotation time.Time `gorm:"column:last_key_rotation;autoCreateTime"`
}
func (DIDRegistryModel) TableName ¶
func (DIDRegistryModel) TableName() string
type DuplicateDIDError ¶
DuplicateDIDError is one of the custom error types used to report data integrity issues.
func (*DuplicateDIDError) Error ¶
func (e *DuplicateDIDError) Error() string
type ExecutionLogEntryModel ¶
// ExecutionLogEntryModel is the GORM column mapping for one execution log
// entry; the table name is supplied by its TableName method.
type ExecutionLogEntryModel struct {
EventID int64 `gorm:"column:event_id;primaryKey;autoIncrement"`
// execution_id (priority 1) and sequence (priority 2, further down) form
// the composite index idx_execution_logs_execution. It is declared as a
// plain index, so the tags do not enforce unique sequence numbers.
ExecutionID string `gorm:"column:execution_id;not null;index:idx_execution_logs_execution,priority:1"`
WorkflowID string `gorm:"column:workflow_id;not null;index"`
// Optional, individually indexed correlation IDs (nil is stored as NULL).
RunID *string `gorm:"column:run_id;index"`
RootWorkflowID *string `gorm:"column:root_workflow_id;index"`
ParentExecutionID *string `gorm:"column:parent_execution_id;index"`
Sequence int64 `gorm:"column:sequence;not null;index:idx_execution_logs_execution,priority:2"`
AgentNodeID string `gorm:"column:agent_node_id;not null;index"`
ReasonerID *string `gorm:"column:reasoner_id;index"`
Level string `gorm:"column:level;not null;index"`
Source string `gorm:"column:source;not null;index"`
EventType *string `gorm:"column:event_type;index"`
Message string `gorm:"column:message;not null"`
// Defaults to '{}'; presumably a JSON object serialized as text — confirm.
Attributes string `gorm:"column:attributes;default:'{}'"`
SystemGenerated bool `gorm:"column:system_generated;not null;default:false"`
SDKLanguage *string `gorm:"column:sdk_language;index"`
Attempt *int `gorm:"column:attempt"`
SpanID *string `gorm:"column:span_id;index"`
StepID *string `gorm:"column:step_id;index"`
ErrorCategory *string `gorm:"column:error_category;index"`
// EmittedAt must be supplied by the writer (not null, no auto tag);
// RecordedAt is filled by GORM at insert time.
EmittedAt time.Time `gorm:"column:emitted_at;not null;index"`
RecordedAt time.Time `gorm:"column:recorded_at;autoCreateTime"`
}
func (ExecutionLogEntryModel) TableName ¶
func (ExecutionLogEntryModel) TableName() string
type ExecutionRecordModel ¶
// ExecutionRecordModel is the GORM column mapping for an execution record;
// the table name is supplied by its TableName method.
type ExecutionRecordModel struct {
// Auto-increment surrogate key; execution_id below is the uniquely indexed
// external identifier.
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
ExecutionID string `gorm:"column:execution_id;not null;uniqueIndex"`
RunID string `gorm:"column:run_id;not null;index"`
ParentExecutionID *string `gorm:"column:parent_execution_id;index"`
AgentNodeID string `gorm:"column:agent_node_id;not null;index"`
ReasonerID string `gorm:"column:reasoner_id;not null;index"`
NodeID string `gorm:"column:node_id;not null;index"`
Status string `gorm:"column:status;not null;index"`
StatusReason *string `gorm:"column:status_reason"`
// Input and result each have an inline payload column and a URI column.
// NOTE(review): presumably the URI points at externally stored data when
// the payload is not kept inline — confirm against the writer.
InputPayload []byte `gorm:"column:input_payload"`
ResultPayload []byte `gorm:"column:result_payload"`
ErrorMessage *string `gorm:"column:error_message"`
InputURI *string `gorm:"column:input_uri"`
ResultURI *string `gorm:"column:result_uri"`
SessionID *string `gorm:"column:session_id;index"`
ActorID *string `gorm:"column:actor_id;index"`
// StartedAt is required; completion time and duration are nullable.
StartedAt time.Time `gorm:"column:started_at;not null;index"`
CompletedAt *time.Time `gorm:"column:completed_at"`
DurationMS *int64 `gorm:"column:duration_ms"`
// Defaults to '[]'; presumably a JSON array serialized as text — confirm.
Notes string `gorm:"column:notes;default:'[]'"`
// Populated by GORM on insert and on each update respectively.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (ExecutionRecordModel) TableName ¶
func (ExecutionRecordModel) TableName() string
type ExecutionVCModel ¶
// ExecutionVCModel is the GORM model for a stored execution VC: the VC
// document, its signature, input/output hashes and the DIDs involved.
// Column names, constraints, defaults and indexes are declared in the gorm tags.
//
// ExecutionID, IssuerDID and TargetDID each have their own single-column index
// and additionally form the composite index idx_execution_vcs_execution_unique
// (priorities 1, 2, 3 in that order).
// NOTE(review): despite the "_unique" suffix, the composite index is declared
// with `index:` rather than `uniqueIndex:`, so these tags alone do not enforce
// uniqueness — confirm whether a migration enforces it elsewhere.
type ExecutionVCModel struct {
// Primary key; a caller-supplied string ID (no autoIncrement).
VCID string `gorm:"column:vc_id;primaryKey"`
ExecutionID string `gorm:"column:execution_id;not null;index;index:idx_execution_vcs_execution_unique,priority:1"`
WorkflowID string `gorm:"column:workflow_id;not null;index"`
SessionID string `gorm:"column:session_id;not null;index"`
IssuerDID string `gorm:"column:issuer_did;not null;index;index:idx_execution_vcs_execution_unique,priority:2"`
// Optional (nullable) target DID; still part of the composite index.
TargetDID *string `gorm:"column:target_did;index;index:idx_execution_vcs_execution_unique,priority:3"`
CallerDID string `gorm:"column:caller_did;not null;index"`
// Required VC document and signature, stored as strings.
VCDocument string `gorm:"column:vc_document;not null"`
Signature string `gorm:"column:signature;not null"`
// Column defaults are the empty string and 0 respectively.
StorageURI string `gorm:"column:storage_uri;default:''"`
DocumentSizeBytes int64 `gorm:"column:document_size_bytes;default:0"`
InputHash string `gorm:"column:input_hash;not null"`
OutputHash string `gorm:"column:output_hash;not null"`
// Required, indexed; column default is 'pending'.
Status string `gorm:"column:status;not null;default:'pending';index"`
// Optional, indexed reference to a parent VC.
ParentVCID *string `gorm:"column:parent_vc_id;index"`
// Column default is '[]'. NOTE(review): presumably a JSON-encoded array of VC IDs — confirm.
ChildVCIDs string `gorm:"column:child_vc_ids;default:'[]'"`
// Timestamps maintained by GORM; created_at is also indexed.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime;index"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (ExecutionVCModel) TableName ¶
func (ExecutionVCModel) TableName() string
type ExecutionWebhookEventModel ¶
// ExecutionWebhookEventModel is the GORM model for one recorded webhook event
// belonging to an execution. Column names and constraints are declared in the
// gorm tags; pointer-typed fields are nullable.
type ExecutionWebhookEventModel struct {
// Auto-incrementing surrogate primary key.
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
// Required, indexed (non-unique, so one execution may have many events).
ExecutionID string `gorm:"column:execution_id;not null;index"`
EventType string `gorm:"column:event_type;not null"`
Status string `gorm:"column:status;not null"`
// Optional details of the attempt. NOTE(review): presumably the HTTP status
// code and body returned by the webhook endpoint — confirm.
HTTPStatus *int `gorm:"column:http_status"`
Payload *string `gorm:"column:payload"`
ResponseBody *string `gorm:"column:response_body"`
ErrorMessage *string `gorm:"column:error_message"`
// Set by GORM on insert (autoCreateTime); there is no updated_at column.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
}
func (ExecutionWebhookEventModel) TableName ¶
func (ExecutionWebhookEventModel) TableName() string
type ExecutionWebhookModel ¶
// ExecutionWebhookModel is the GORM model for a webhook registration.
// ExecutionID is the primary key, so there is at most one row per execution.
// Column names, constraints and defaults are declared in the gorm tags.
type ExecutionWebhookModel struct {
// Primary key (the execution's ID, not a surrogate key).
ExecutionID string `gorm:"column:execution_id;primaryKey"`
// Required target URL.
URL string `gorm:"column:url;not null"`
// Optional secret. NOTE(review): presumably used to sign deliveries — confirm.
Secret *string `gorm:"column:secret"`
// Column default is '{}'. NOTE(review): presumably a JSON-encoded object — confirm.
Headers string `gorm:"column:headers;default:'{}'"`
// Required; column default is 'pending'.
Status string `gorm:"column:status;not null;default:'pending'"`
// Required; column default is 0.
AttemptCount int `gorm:"column:attempt_count;not null;default:0"`
// Optional scheduling and bookkeeping fields for delivery attempts.
NextAttemptAt *time.Time `gorm:"column:next_attempt_at"`
LastAttemptAt *time.Time `gorm:"column:last_attempt_at"`
LastError *string `gorm:"column:last_error"`
// Timestamps maintained by GORM (autoCreateTime / autoUpdateTime).
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (ExecutionWebhookModel) TableName ¶
func (ExecutionWebhookModel) TableName() string
type ForeignKeyConstraintError ¶
// ForeignKeyConstraintError represents a foreign key constraint violation.
// It satisfies the error interface through its pointer-receiver Error method.
type ForeignKeyConstraintError struct {
// NOTE(review): presumably the table and column holding the offending
// reference — confirm against where the error is constructed.
Table string
Column string
// NOTE(review): presumably the table that was referenced and the key value
// that could not be resolved — confirm.
ReferencedTable string
ReferencedValue string
// NOTE(review): presumably the name of the operation that triggered the
// violation (e.g. an insert or delete) — confirm.
Operation string
}
ForeignKeyConstraintError represents a foreign key constraint violation
func (*ForeignKeyConstraintError) Error ¶
func (e *ForeignKeyConstraintError) Error() string
type InvalidExecutionStateTransitionError ¶
// InvalidExecutionStateTransitionError describes a rejected execution state
// change: which execution, the state it was in, the state that was requested,
// and a reason. It satisfies the error interface through its pointer-receiver
// Error method.
type InvalidExecutionStateTransitionError struct {
// ID of the execution whose transition was rejected.
ExecutionID string
// State before the attempted transition and the state that was requested.
CurrentState string
NewState string
// Explanation for the rejection. NOTE(review): format is not visible here.
Reason string
}
func (*InvalidExecutionStateTransitionError) Error ¶
func (e *InvalidExecutionStateTransitionError) Error() string
type LocalStorage ¶
// LocalStorage implements the StorageProvider and CacheProvider interfaces.
// It is an opaque type: all fields are unexported, so instances are obtained
// from the package constructors (NewLocalStorage, NewPostgresStorage).
type LocalStorage struct {
// contains filtered or unexported fields
}
LocalStorage implements the StorageProvider and CacheProvider interfaces using SQLite for structured data and BoltDB for key-value data (memory).
CONCURRENCY MODEL:
- SQLite is configured with WAL (Write-Ahead Logging) mode for optimal concurrency.
- Read-only operations (SELECT queries) do NOT acquire writeMutex; they run concurrently.
- Write operations (INSERT/UPDATE/DELETE) acquire writeMutex for serialization.
- WAL mode allows multiple concurrent readers alongside a single writer without blocking.
- This eliminates the performance bottleneck where analytics queries blocked all writes.
func NewLocalStorage ¶
func NewLocalStorage(config LocalStorageConfig) *LocalStorage
NewLocalStorage creates a new instance of LocalStorage.
func NewPostgresStorage ¶
func NewPostgresStorage(config PostgresStorageConfig) *LocalStorage
NewPostgresStorage creates a new instance configured for PostgreSQL.
func (*LocalStorage) AcquireLock ¶
func (ls *LocalStorage) AcquireLock(ctx context.Context, key string, timeout time.Duration) (*types.DistributedLock, error)
AcquireLock attempts to acquire a distributed lock.
func (*LocalStorage) AddToDeadLetterQueue ¶
func (ls *LocalStorage) AddToDeadLetterQueue(ctx context.Context, event *types.ObservabilityEvent, errorMessage string, retryCount int) error
AddToDeadLetterQueue adds a failed event to the dead letter queue.
func (*LocalStorage) BeginTransaction ¶
func (ls *LocalStorage) BeginTransaction() (Transaction, error)
BeginTransaction is one of the TransactionalStorage methods, which are not fully implemented for local storage yet.
func (*LocalStorage) CleanupOldExecutions ¶
func (ls *LocalStorage) CleanupOldExecutions(ctx context.Context, retentionPeriod time.Duration, batchSize int) (int, error)
CleanupOldExecutions removes old completed workflow executions based on retention period
func (*LocalStorage) CleanupWorkflow ¶
func (ls *LocalStorage) CleanupWorkflow(ctx context.Context, identifier string, dryRun bool) (*types.WorkflowCleanupResult, error)
CleanupWorkflow deletes all data related to a specific workflow ID or workflow run identifier
func (*LocalStorage) ClearDeadLetterQueue ¶
func (ls *LocalStorage) ClearDeadLetterQueue(ctx context.Context) error
ClearDeadLetterQueue removes all entries from the dead letter queue.
func (*LocalStorage) Close ¶
func (ls *LocalStorage) Close(ctx context.Context) error
Close closes the SQLite and BoltDB connections.
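func (*LocalStorage) CountExecutionVCs ¶
func (ls *LocalStorage) CountExecutionVCs(ctx context.Context, filters types.VCFilters) (int, error)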
func (*LocalStorage) CreateAccessPolicy ¶
func (ls *LocalStorage) CreateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error
CreateAccessPolicy creates a new access policy.
func (*LocalStorage) CreateExecutionRecord ¶
func (ls *LocalStorage) CreateExecutionRecord(ctx context.Context, exec *types.Execution) error
CreateExecutionRecord inserts a new execution row using the simplified schema.
func (*LocalStorage) CreateOrUpdateSession ¶
func (ls *LocalStorage) CreateOrUpdateSession(ctx context.Context, session *types.Session) error
CreateOrUpdateSession creates or updates a session record in SQLite
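func (*LocalStorage) CreateOrUpdateWorkflow ¶
func (ls *LocalStorage) CreateOrUpdateWorkflow(ctx context.Context, workflow *types.Workflow) error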
CreateOrUpdateWorkflow creates or updates a workflow record in SQLite
func (*LocalStorage) Delete ¶
func (ls *LocalStorage) Delete(key string) error
Delete implements the CacheProvider Delete method using the in-memory cache.
func (*LocalStorage) DeleteAccessPolicy ¶
func (ls *LocalStorage) DeleteAccessPolicy(ctx context.Context, id int64) error
DeleteAccessPolicy deletes an access policy by ID.
func (*LocalStorage) DeleteAgentConfiguration ¶
func (ls *LocalStorage) DeleteAgentConfiguration(ctx context.Context, agentID, packageID string) error
DeleteAgentConfiguration deletes an agent configuration record
func (*LocalStorage) DeleteAgentPackage ¶
func (ls *LocalStorage) DeleteAgentPackage(ctx context.Context, packageID string) error
DeleteAgentPackage deletes an agent package record
func (*LocalStorage) DeleteAgentVersion ¶
DeleteAgentVersion deletes a specific agent version row from the agent_nodes table.
func (*LocalStorage) DeleteConfig ¶
func (ls *LocalStorage) DeleteConfig(ctx context.Context, key string) error
DeleteConfig removes a configuration entry by key.
func (*LocalStorage) DeleteFromDeadLetterQueue ¶
func (ls *LocalStorage) DeleteFromDeadLetterQueue(ctx context.Context, ids []int64) error
DeleteFromDeadLetterQueue removes specific entries from the dead letter queue.
func (*LocalStorage) DeleteMemory ¶
func (ls *LocalStorage) DeleteMemory(ctx context.Context, scope, scopeID, key string) error
DeleteMemory deletes a memory record from BoltDB and cache.
func (*LocalStorage) DeleteObservabilityWebhook ¶
func (ls *LocalStorage) DeleteObservabilityWebhook(ctx context.Context) error
DeleteObservabilityWebhook removes the global observability webhook configuration.
func (*LocalStorage) DeleteVector ¶
func (ls *LocalStorage) DeleteVector(ctx context.Context, scope, scopeID, key string) error
DeleteVector removes a stored vector embedding.
func (*LocalStorage) DeleteVectorsByPrefix ¶
func (ls *LocalStorage) DeleteVectorsByPrefix(ctx context.Context, scope, scopeID, prefix string) (int, error)
DeleteVectorsByPrefix deletes all vectors whose key starts with the given prefix.
func (*LocalStorage) Exists ¶
func (ls *LocalStorage) Exists(key string) bool
Exists implements the CacheProvider Exists method using the in-memory cache.
func (*LocalStorage) Get ¶
func (ls *LocalStorage) Get(key string, dest interface{}) error
Get implements the CacheProvider Get method using the in-memory cache.
func (*LocalStorage) GetAccessPolicies ¶
func (ls *LocalStorage) GetAccessPolicies(ctx context.Context) ([]*types.AccessPolicy, error)
GetAccessPolicies retrieves all enabled access policies, sorted by priority descending.
func (*LocalStorage) GetAccessPolicyByID ¶
func (ls *LocalStorage) GetAccessPolicyByID(ctx context.Context, id int64) (*types.AccessPolicy, error)
GetAccessPolicyByID retrieves a single access policy by its ID.
func (*LocalStorage) GetAgent ¶
GetAgent retrieves the default (unversioned) agent node record by ID. It filters for version = '' to return only the default agent. Use GetAgentVersion for a specific version, or ListAgentVersions for all versions.
func (*LocalStorage) GetAgentConfiguration ¶
func (ls *LocalStorage) GetAgentConfiguration(ctx context.Context, agentID, packageID string) (*types.AgentConfiguration, error)
GetAgentConfiguration retrieves an agent configuration record from SQLite
func (*LocalStorage) GetAgentDID ¶
func (ls *LocalStorage) GetAgentDID(ctx context.Context, agentID string) (*types.AgentDIDInfo, error)
func (*LocalStorage) GetAgentFieldServerDID ¶
func (ls *LocalStorage) GetAgentFieldServerDID(ctx context.Context, agentfieldServerID string) (*types.AgentFieldServerDIDInfo, error)
func (*LocalStorage) GetAgentPackage ¶
func (ls *LocalStorage) GetAgentPackage(ctx context.Context, packageID string) (*types.AgentPackage, error)
GetAgentPackage retrieves an agent package record from SQLite
func (*LocalStorage) GetAgentTagVC ¶
func (ls *LocalStorage) GetAgentTagVC(ctx context.Context, agentID string) (*types.AgentTagVCRecord, error)
GetAgentTagVC retrieves an agent's tag VC record.
func (*LocalStorage) GetAgentVersion ¶
func (ls *LocalStorage) GetAgentVersion(ctx context.Context, id string, version string) (*types.AgentNode, error)
GetAgentVersion retrieves a specific (id, version) agent node.
func (*LocalStorage) GetComponentDID ¶
func (ls *LocalStorage) GetComponentDID(ctx context.Context, componentID string) (*types.ComponentDIDInfo, error)
func (*LocalStorage) GetConfig ¶
func (ls *LocalStorage) GetConfig(ctx context.Context, key string) (*ConfigEntry, error)
GetConfig retrieves a configuration entry by key.
func (*LocalStorage) GetDID ¶
func (ls *LocalStorage) GetDID(ctx context.Context, did string) (*types.DIDRegistryEntry, error)
func (*LocalStorage) GetDIDDocument ¶
func (ls *LocalStorage) GetDIDDocument(ctx context.Context, did string) (*types.DIDDocumentRecord, error)
GetDIDDocument retrieves a DID document by its DID.
func (*LocalStorage) GetDIDDocumentByAgentID ¶
func (ls *LocalStorage) GetDIDDocumentByAgentID(ctx context.Context, agentID string) (*types.DIDDocumentRecord, error)
GetDIDDocumentByAgentID retrieves a DID document by agent ID.
func (*LocalStorage) GetDeadLetterQueue ¶
func (ls *LocalStorage) GetDeadLetterQueue(ctx context.Context, limit, offset int) ([]types.ObservabilityDeadLetterEntry, error)
GetDeadLetterQueue returns entries from the dead letter queue with pagination.
func (*LocalStorage) GetDeadLetterQueueCount ¶
func (ls *LocalStorage) GetDeadLetterQueueCount(ctx context.Context) (int64, error)
GetDeadLetterQueueCount returns the number of entries in the dead letter queue.
func (*LocalStorage) GetEventHistory ¶
func (ls *LocalStorage) GetEventHistory(ctx context.Context, filter types.EventFilter) ([]*types.MemoryChangeEvent, error)
GetEventHistory retrieves a list of memory change events based on a filter.
func (*LocalStorage) GetExecution ¶
func (ls *LocalStorage) GetExecution(ctx context.Context, id int64) (*types.AgentExecution, error)
GetExecution retrieves an agent execution record from SQLite by ID.
func (*LocalStorage) GetExecutionEventBus ¶
func (ls *LocalStorage) GetExecutionEventBus() *events.ExecutionEventBus
GetExecutionEventBus returns the execution event bus for real-time updates
func (*LocalStorage) GetExecutionLogEventBus ¶
func (ls *LocalStorage) GetExecutionLogEventBus() *events.EventBus[*types.ExecutionLogEntry]
GetExecutionLogEventBus returns the bus for structured execution logs.
func (*LocalStorage) GetExecutionRecord ¶
func (ls *LocalStorage) GetExecutionRecord(ctx context.Context, executionID string) (*types.Execution, error)
GetExecutionRecord fetches a single execution row by execution_id.
func (*LocalStorage) GetExecutionVC ¶
func (ls *LocalStorage) GetExecutionVC(ctx context.Context, vcID string) (*types.ExecutionVCInfo, error)
func (*LocalStorage) GetExecutionWebhook ¶
func (ls *LocalStorage) GetExecutionWebhook(ctx context.Context, executionID string) (*types.ExecutionWebhook, error)
GetExecutionWebhook fetches the webhook registration for the given execution.
func (*LocalStorage) GetFullExecutionVC ¶
func (ls *LocalStorage) GetFullExecutionVC(vcID string) (json.RawMessage, string, error)
GetFullExecutionVC retrieves the full execution VC including the VC document and signature
func (*LocalStorage) GetLockStatus ¶
func (ls *LocalStorage) GetLockStatus(ctx context.Context, key string) (*types.DistributedLock, error)
GetLockStatus retrieves the status of a distributed lock.
func (*LocalStorage) GetMemory ¶
func (ls *LocalStorage) GetMemory(ctx context.Context, scope, scopeID, key string) (*types.Memory, error)
GetMemory retrieves a memory record from BoltDB or cache.
func (*LocalStorage) GetObservabilityWebhook ¶
func (ls *LocalStorage) GetObservabilityWebhook(ctx context.Context) (*types.ObservabilityWebhookConfig, error)
GetObservabilityWebhook retrieves the global observability webhook configuration. Returns nil if no webhook is configured.
func (*LocalStorage) GetReasonerExecutionHistory ¶
func (ls *LocalStorage) GetReasonerExecutionHistory(ctx context.Context, reasonerID string, page, limit int) (*types.ReasonerExecutionHistory, error)
GetReasonerExecutionHistory retrieves paginated execution history for a specific reasoner. This is a read-only operation that leverages SQLite WAL mode for concurrent access.
func (*LocalStorage) GetReasonerPerformanceMetrics ¶
func (ls *LocalStorage) GetReasonerPerformanceMetrics(ctx context.Context, reasonerID string) (*types.ReasonerPerformanceMetrics, error)
GetReasonerPerformanceMetrics retrieves performance metrics for a specific reasoner. This is a read-only operation that leverages SQLite WAL mode for concurrent access.
func (*LocalStorage) GetSession ¶
GetSession retrieves a session record from SQLite by ID
func (*LocalStorage) GetVector ¶
func (ls *LocalStorage) GetVector(ctx context.Context, scope, scopeID, key string) (*types.VectorRecord, error)
GetVector retrieves a vector embedding by key.
func (*LocalStorage) GetWorkflow ¶
func (ls *LocalStorage) GetWorkflow(ctx context.Context, workflowID string) (*types.Workflow, error)
GetWorkflow retrieves a workflow record from SQLite by ID
func (*LocalStorage) GetWorkflowExecution ¶
func (ls *LocalStorage) GetWorkflowExecution(ctx context.Context, executionID string) (*types.WorkflowExecution, error)
GetWorkflowExecution retrieves a workflow execution record from SQLite by ID
func (*LocalStorage) GetWorkflowExecutionEventBus ¶
func (ls *LocalStorage) GetWorkflowExecutionEventBus() *events.EventBus[*types.WorkflowExecutionEvent]
GetWorkflowExecutionEventBus returns the bus for workflow execution events.
func (*LocalStorage) GetWorkflowRun ¶
func (ls *LocalStorage) GetWorkflowRun(ctx context.Context, runID string) (*types.WorkflowRun, error)
func (*LocalStorage) GetWorkflowVC ¶
func (ls *LocalStorage) GetWorkflowVC(ctx context.Context, workflowVCID string) (*types.WorkflowVCInfo, error)
func (*LocalStorage) HasExecutionWebhook ¶
HasExecutionWebhook indicates whether an execution has a registered webhook.
func (*LocalStorage) HealthCheck ¶
func (ls *LocalStorage) HealthCheck(ctx context.Context) error
HealthCheck checks the health of the local storage including database integrity.
func (*LocalStorage) Initialize ¶
func (ls *LocalStorage) Initialize(ctx context.Context, config StorageConfig) error
Initialize sets up the SQLite and BoltDB databases.
func (*LocalStorage) ListAgentDIDs ¶
func (ls *LocalStorage) ListAgentDIDs(ctx context.Context) ([]*types.AgentDIDInfo, error)
func (*LocalStorage) ListAgentFieldServerDIDs ¶
func (ls *LocalStorage) ListAgentFieldServerDIDs(ctx context.Context) ([]*types.AgentFieldServerDIDInfo, error)
func (*LocalStorage) ListAgentGroups ¶
func (ls *LocalStorage) ListAgentGroups(ctx context.Context, teamID string) ([]types.AgentGroupSummary, error)
ListAgentGroups returns distinct agent groups with summary info for a team.
func (*LocalStorage) ListAgentTagVCs ¶
func (ls *LocalStorage) ListAgentTagVCs(ctx context.Context) ([]*types.AgentTagVCRecord, error)
ListAgentTagVCs returns all non-revoked agent tag VCs.
func (*LocalStorage) ListAgentVersions ¶
func (ls *LocalStorage) ListAgentVersions(ctx context.Context, id string) ([]*types.AgentNode, error)
ListAgentVersions returns all versioned agents with the given ID (version != '').
func (*LocalStorage) ListAgents ¶
func (ls *LocalStorage) ListAgents(ctx context.Context, filters types.AgentFilters) ([]*types.AgentNode, error)
ListAgents retrieves agent node records from SQLite based on filters.
func (*LocalStorage) ListAgentsByGroup ¶
func (ls *LocalStorage) ListAgentsByGroup(ctx context.Context, groupID string) ([]*types.AgentNode, error)
ListAgentsByGroup returns all agents belonging to a specific group.
func (*LocalStorage) ListAgentsByLifecycleStatus ¶
func (ls *LocalStorage) ListAgentsByLifecycleStatus(ctx context.Context, status types.AgentLifecycleStatus) ([]*types.AgentNode, error)
ListAgentsByLifecycleStatus lists agents filtered by lifecycle status.
func (*LocalStorage) ListComponentDIDs ¶
func (ls *LocalStorage) ListComponentDIDs(ctx context.Context, agentDID string) ([]*types.ComponentDIDInfo, error)
func (*LocalStorage) ListConfigs ¶
func (ls *LocalStorage) ListConfigs(ctx context.Context) ([]*ConfigEntry, error)
ListConfigs returns all stored configuration entries.
func (*LocalStorage) ListDIDDocuments ¶
func (ls *LocalStorage) ListDIDDocuments(ctx context.Context) ([]*types.DIDDocumentRecord, error)
ListDIDDocuments lists all DID documents.
func (*LocalStorage) ListDIDs ¶
func (ls *LocalStorage) ListDIDs(ctx context.Context) ([]*types.DIDRegistryEntry, error)
func (*LocalStorage) ListDueExecutionWebhooks ¶
func (ls *LocalStorage) ListDueExecutionWebhooks(ctx context.Context, limit int) ([]*types.ExecutionWebhook, error)
ListDueExecutionWebhooks returns webhook registrations that are ready for delivery.
func (*LocalStorage) ListExecutionLogEntries ¶
func (ls *LocalStorage) ListExecutionLogEntries(ctx context.Context, executionID string, afterSeq *int64, limit int, levels []string, nodeIDs []string, sources []string, query string) ([]*types.ExecutionLogEntry, error)
ListExecutionLogEntries retrieves structured execution logs ordered by sequence.
func (*LocalStorage) ListExecutionVCs ¶
func (ls *LocalStorage) ListExecutionVCs(ctx context.Context, filters types.VCFilters) ([]*types.ExecutionVCInfo, error)
func (*LocalStorage) ListExecutionWebhookEvents ¶
func (ls *LocalStorage) ListExecutionWebhookEvents(ctx context.Context, executionID string) ([]*types.ExecutionWebhookEvent, error)
ListExecutionWebhookEvents returns webhook attempts ordered by creation time.
func (*LocalStorage) ListExecutionWebhookEventsBatch ¶
func (ls *LocalStorage) ListExecutionWebhookEventsBatch(ctx context.Context, executionIDs []string) (map[string][]*types.ExecutionWebhookEvent, error)
ListExecutionWebhookEventsBatch fetches webhook events for multiple executions in a single query.
func (*LocalStorage) ListExecutionWebhooksRegistered ¶
func (ls *LocalStorage) ListExecutionWebhooksRegistered(ctx context.Context, executionIDs []string) (map[string]bool, error)
ListExecutionWebhooksRegistered returns a map of execution IDs that have webhook registrations.
func (*LocalStorage) ListMemory ¶
func (ls *LocalStorage) ListMemory(ctx context.Context, scope, scopeID string) ([]*types.Memory, error)
ListMemory retrieves all memory records for a given scope and scope ID from BoltDB.
func (*LocalStorage) ListWorkflowExecutionEvents ¶
func (ls *LocalStorage) ListWorkflowExecutionEvents(ctx context.Context, executionID string, afterSeq *int64, limit int) ([]*types.WorkflowExecutionEvent, error)
ListWorkflowExecutionEvents retrieves execution events ordered by sequence.
func (*LocalStorage) ListWorkflowVCStatusSummaries ¶
func (ls *LocalStorage) ListWorkflowVCStatusSummaries(ctx context.Context, workflowIDs []string) ([]*types.WorkflowVCStatusAggregation, error)
func (*LocalStorage) ListWorkflowVCs ¶
func (ls *LocalStorage) ListWorkflowVCs(ctx context.Context, workflowID string) ([]*types.WorkflowVCInfo, error)
func (*LocalStorage) MarkStaleExecutions ¶
func (ls *LocalStorage) MarkStaleExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)
MarkStaleExecutions updates executions stuck in non-terminal states beyond the provided timeout. Staleness is determined by updated_at (last activity) rather than started_at, so legitimately long-running executions that are still making progress are not incorrectly timed out.
INVARIANT: callers must ensure updated_at is bumped on every meaningful execution activity. If updated_at is not maintained, active executions may be incorrectly reaped. Uses COALESCE(updated_at, created_at, started_at) to handle rows where updated_at may be NULL.
func (*LocalStorage) MarkStaleWorkflowExecutions ¶
func (ls *LocalStorage) MarkStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)
MarkStaleWorkflowExecutions updates workflow executions stuck in non-terminal states when their updated_at timestamp exceeds the staleAfter threshold. This catches orphaned child executions whose parent failed without cascading cancellation.
See MarkStaleExecutions for the updated_at invariant and COALESCE fallback rationale.
func (*LocalStorage) NewUnitOfWork ¶
func (ls *LocalStorage) NewUnitOfWork() UnitOfWork
NewUnitOfWork creates a new unit of work instance for this storage
func (*LocalStorage) NewWorkflowUnitOfWork ¶
func (ls *LocalStorage) NewWorkflowUnitOfWork() WorkflowUnitOfWork
NewWorkflowUnitOfWork creates a new workflow-specific unit of work instance for this storage
func (*LocalStorage) PruneExecutionLogEntries ¶
func (ls *LocalStorage) PruneExecutionLogEntries(ctx context.Context, executionID string, maxEntries int, olderThan time.Time) error
PruneExecutionLogEntries trims old or excessive execution logs for a single execution.
func (*LocalStorage) Publish ¶
func (ls *LocalStorage) Publish(channel string, message interface{}) error
Publish implements the CacheProvider Publish method using local pub/sub.
func (*LocalStorage) PublishMemoryChange ¶
func (ls *LocalStorage) PublishMemoryChange(ctx context.Context, event types.MemoryChangeEvent) error
PublishMemoryChange implements the StorageProvider PublishMemoryChange method using local pub/sub.
func (*LocalStorage) QueryAgentConfigurations ¶
func (ls *LocalStorage) QueryAgentConfigurations(ctx context.Context, filters types.ConfigurationFilters) ([]*types.AgentConfiguration, error)
QueryAgentConfigurations retrieves agent configuration records from SQLite based on filters
func (*LocalStorage) QueryAgentPackages ¶
func (ls *LocalStorage) QueryAgentPackages(ctx context.Context, filters types.PackageFilters) ([]*types.AgentPackage, error)
QueryAgentPackages retrieves agent package records from SQLite based on filters
func (*LocalStorage) QueryExecutionRecords ¶
func (ls *LocalStorage) QueryExecutionRecords(ctx context.Context, filter types.ExecutionFilter) ([]*types.Execution, error)
QueryExecutionRecords runs a filtered query returning all matching executions.
func (*LocalStorage) QueryExecutions ¶
func (ls *LocalStorage) QueryExecutions(ctx context.Context, filters types.ExecutionFilters) ([]*types.AgentExecution, error)
QueryExecutions retrieves agent execution records based on filters using GORM.
func (*LocalStorage) QueryRunSummaries ¶
func (ls *LocalStorage) QueryRunSummaries(ctx context.Context, filter types.ExecutionFilter) ([]*RunSummaryAggregation, int, error)
QueryRunSummaries returns aggregated statistics for workflow runs without fetching all execution records. The implementation uses a single GROUP BY query plus a lightweight COUNT for total runs to stay fast even when page_size is large.
func (*LocalStorage) QuerySessions ¶
func (ls *LocalStorage) QuerySessions(ctx context.Context, filters types.SessionFilters) ([]*types.Session, error)
QuerySessions retrieves session records from SQLite based on filters
func (*LocalStorage) QueryWorkflowDAG ¶
func (ls *LocalStorage) QueryWorkflowDAG(ctx context.Context, rootWorkflowID string) ([]*types.WorkflowExecution, error)
QueryWorkflowDAG retrieves a complete workflow DAG using recursive CTE for optimal performance
func (*LocalStorage) QueryWorkflowExecutions ¶
func (ls *LocalStorage) QueryWorkflowExecutions(ctx context.Context, filters types.WorkflowExecutionFilters) ([]*types.WorkflowExecution, error)
QueryWorkflowExecutions retrieves workflow execution records from SQLite based on filters
func (*LocalStorage) QueryWorkflows ¶
func (ls *LocalStorage) QueryWorkflows(ctx context.Context, filters types.WorkflowFilters) ([]*types.Workflow, error)
QueryWorkflows retrieves workflow records from SQLite based on filters
func (*LocalStorage) RegisterAgent ¶
RegisterAgent stores an agent node record in SQLite.
func (*LocalStorage) RegisterExecutionWebhook ¶
func (ls *LocalStorage) RegisterExecutionWebhook(ctx context.Context, webhook *types.ExecutionWebhook) error
RegisterExecutionWebhook stores or updates the webhook registration for an execution.
func (*LocalStorage) ReleaseLock ¶
func (ls *LocalStorage) ReleaseLock(ctx context.Context, lockID string) error
ReleaseLock releases a distributed lock.
func (*LocalStorage) RenewLock ¶
func (ls *LocalStorage) RenewLock(ctx context.Context, lockID string) (*types.DistributedLock, error)
RenewLock renews a distributed lock to extend its TTL.
func (*LocalStorage) RetryStaleWorkflowExecutions ¶
func (ls *LocalStorage) RetryStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, maxRetries int, limit int) ([]string, error)
RetryStaleWorkflowExecutions finds stale workflow executions that haven't exceeded maxRetries and resets both workflow_executions and executions back to "pending" so the paired records stay in sync for the retry path.
func (*LocalStorage) RevokeAgentTagVC ¶
func (ls *LocalStorage) RevokeAgentTagVC(ctx context.Context, agentID string) error
RevokeAgentTagVC marks an agent's tag VC as revoked.
func (*LocalStorage) RevokeDIDDocument ¶
func (ls *LocalStorage) RevokeDIDDocument(ctx context.Context, did string) error
RevokeDIDDocument revokes a DID document by setting its revoked_at timestamp.
func (*LocalStorage) Set ¶
func (ls *LocalStorage) Set(key string, value interface{}, ttl time.Duration) error
Set implements the CacheProvider Set method using the in-memory cache.
func (*LocalStorage) SetConfig ¶
func (ls *LocalStorage) SetConfig(ctx context.Context, key string, value string, updatedBy string) error
SetConfig upserts a configuration entry in the database. On conflict (duplicate key), it increments the version and updates the value.
func (*LocalStorage) SetObservabilityWebhook ¶
func (ls *LocalStorage) SetObservabilityWebhook(ctx context.Context, config *types.ObservabilityWebhookConfig) error
SetObservabilityWebhook stores or updates the global observability webhook configuration. Uses upsert pattern to handle both insert and update.
func (*LocalStorage) SetVector ¶
func (ls *LocalStorage) SetVector(ctx context.Context, record *types.VectorRecord) error
SetVector stores or updates a vector embedding for the specified scope/key.
func (*LocalStorage) SimilaritySearch ¶
func (ls *LocalStorage) SimilaritySearch(ctx context.Context, scope, scopeID string, queryEmbedding []float32, topK int, filters map[string]interface{}) ([]*types.VectorSearchResult, error)
SimilaritySearch performs a similarity search within a scope using the configured vector backend.
func (*LocalStorage) StoreAgentConfiguration ¶
func (ls *LocalStorage) StoreAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error
StoreAgentConfiguration stores an agent configuration record in SQLite
func (*LocalStorage) StoreAgentDID ¶
func (ls *LocalStorage) StoreAgentDID(ctx context.Context, agentID, agentDID, agentfieldServerDID, publicKeyJWK string, derivationIndex int) error
Agent DID operations
func (*LocalStorage) StoreAgentDIDWithComponents ¶
func (ls *LocalStorage) StoreAgentDIDWithComponents(ctx context.Context, agentID, agentDID, agentfieldServerDID, publicKeyJWK string, derivationIndex int, components []ComponentDIDRequest) error
StoreAgentDIDWithComponents stores an agent DID along with its component DIDs in a single transaction
func (*LocalStorage) StoreAgentFieldServerDID ¶
func (ls *LocalStorage) StoreAgentFieldServerDID(ctx context.Context, agentfieldServerID, rootDID string, masterSeed []byte, createdAt, lastKeyRotation time.Time) error
AgentField Server DID operations
func (*LocalStorage) StoreAgentPackage ¶
func (ls *LocalStorage) StoreAgentPackage(ctx context.Context, pkg *types.AgentPackage) error
StoreAgentPackage stores an agent package record in SQLite
func (*LocalStorage) StoreAgentTagVC ¶
func (ls *LocalStorage) StoreAgentTagVC(ctx context.Context, agentID, agentDID, vcID, vcDocument, signature string, issuedAt time.Time, expiresAt *time.Time) error
StoreAgentTagVC stores or replaces an agent's tag VC.
func (*LocalStorage) StoreComponentDID ¶
func (ls *LocalStorage) StoreComponentDID(ctx context.Context, componentID, componentDID, agentDID, componentType, componentName string, derivationIndex int) error
Component DID operations
func (*LocalStorage) StoreDID ¶
func (ls *LocalStorage) StoreDID(ctx context.Context, did string, didDocument, publicKey, privateKeyRef, derivationPath string) error
DID Registry operations
func (*LocalStorage) StoreDIDDocument ¶
func (ls *LocalStorage) StoreDIDDocument(ctx context.Context, record *types.DIDDocumentRecord) error
StoreDIDDocument stores a DID document record.
func (*LocalStorage) StoreEvent ¶
func (ls *LocalStorage) StoreEvent(ctx context.Context, event *types.MemoryChangeEvent) error
StoreEvent saves a memory change event to the database.
func (*LocalStorage) StoreExecution ¶
func (ls *LocalStorage) StoreExecution(ctx context.Context, execution *types.AgentExecution) error
StoreExecution stores an agent execution record in SQLite.
func (*LocalStorage) StoreExecutionLogEntries ¶
func (ls *LocalStorage) StoreExecutionLogEntries(ctx context.Context, executionID string, entries []*types.ExecutionLogEntry) error
StoreExecutionLogEntries atomically stores a batch of structured execution logs for one execution.
func (*LocalStorage) StoreExecutionLogEntry ¶
func (ls *LocalStorage) StoreExecutionLogEntry(ctx context.Context, entry *types.ExecutionLogEntry) error
StoreExecutionLogEntry inserts a structured execution log entry and publishes it to subscribers.
func (*LocalStorage) StoreExecutionVC ¶
func (ls *LocalStorage) StoreExecutionVC(ctx context.Context, vcID, executionID, workflowID, sessionID, issuerDID, targetDID, callerDID, inputHash, outputHash, status string, vcDocument []byte, signature string, storageURI string, documentSizeBytes int64) error
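StoreExecutionVC stores an execution VC record, including its VC document, signature, storage URI, and document size. Part of the Execution VC operations.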
func (*LocalStorage) StoreExecutionWebhookEvent ¶
func (ls *LocalStorage) StoreExecutionWebhookEvent(ctx context.Context, event *types.ExecutionWebhookEvent) error
StoreExecutionWebhookEvent records webhook delivery attempts for SQLite deployments.
func (*LocalStorage) StoreWorkflowExecution ¶
func (ls *LocalStorage) StoreWorkflowExecution(ctx context.Context, execution *types.WorkflowExecution) error
StoreWorkflowExecution stores a workflow execution record in SQLite with UPSERT capability. It uses transactions to prevent database corruption; SQLite WAL mode handles write coordination.
func (*LocalStorage) StoreWorkflowExecutionEvent ¶
func (ls *LocalStorage) StoreWorkflowExecutionEvent(ctx context.Context, event *types.WorkflowExecutionEvent) error
StoreWorkflowExecutionEvent inserts an immutable execution event into SQLite.
func (*LocalStorage) StoreWorkflowExecutionWithUnitOfWork ¶
func (ls *LocalStorage) StoreWorkflowExecutionWithUnitOfWork(ctx context.Context, execution *types.WorkflowExecution) error
StoreWorkflowExecutionWithUnitOfWork demonstrates using Unit of Work for atomic operations
func (*LocalStorage) StoreWorkflowRun ¶
func (ls *LocalStorage) StoreWorkflowRun(ctx context.Context, run *types.WorkflowRun) error
func (*LocalStorage) StoreWorkflowRunEvent ¶
func (ls *LocalStorage) StoreWorkflowRunEvent(ctx context.Context, event *types.WorkflowRunEvent) error
func (*LocalStorage) StoreWorkflowStep ¶
func (ls *LocalStorage) StoreWorkflowStep(ctx context.Context, step *types.WorkflowStep) error
func (*LocalStorage) StoreWorkflowVC ¶
func (ls *LocalStorage) StoreWorkflowVC(ctx context.Context, workflowVCID, workflowID, sessionID string, componentVCIDs []string, status string, startTime, endTime *time.Time, totalSteps, completedSteps int, storageURI string, documentSizeBytes int64) error
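StoreWorkflowVC stores a workflow VC record, including its component VC IDs, status, start and end times, step counts, storage URI, and document size. Part of the Workflow VC operations.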
func (*LocalStorage) Subscribe ¶
func (ls *LocalStorage) Subscribe(channel string) (<-chan CacheMessage, error)
Subscribe implements the CacheProvider Subscribe method using local pub/sub.
func (*LocalStorage) SubscribeToMemoryChanges ¶
func (ls *LocalStorage) SubscribeToMemoryChanges(ctx context.Context, scope, scopeID string) (<-chan types.MemoryChangeEvent, error)
SubscribeToMemoryChanges implements the StorageProvider SubscribeToMemoryChanges method using local pub/sub.
func (*LocalStorage) TryMarkExecutionWebhookInFlight ¶
func (ls *LocalStorage) TryMarkExecutionWebhookInFlight(ctx context.Context, executionID string, now time.Time) (bool, error)
TryMarkExecutionWebhookInFlight atomically marks a webhook registration as delivering.
func (*LocalStorage) UpdateAccessPolicy ¶
func (ls *LocalStorage) UpdateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error
UpdateAccessPolicy updates an existing access policy.
func (*LocalStorage) UpdateAgentConfiguration ¶
func (ls *LocalStorage) UpdateAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error
UpdateAgentConfiguration updates an existing agent configuration record
func (*LocalStorage) UpdateAgentHealth ¶
func (ls *LocalStorage) UpdateAgentHealth(ctx context.Context, id string, status types.HealthStatus) error
UpdateAgentHealth updates the health status of an agent node in SQLite. IMPORTANT: This method ONLY updates health_status, never last_heartbeat (only the heartbeat endpoint should do that).
func (*LocalStorage) UpdateAgentHealthAtomic ¶
func (ls *LocalStorage) UpdateAgentHealthAtomic(ctx context.Context, id string, status types.HealthStatus, expectedLastHeartbeat *time.Time) error
UpdateAgentHealthAtomic updates the health status of an agent node atomically with optimistic locking. If expectedLastHeartbeat is provided, the update will only succeed if the current last_heartbeat matches. This prevents race conditions between health monitor and heartbeat updates. IMPORTANT: This method ONLY updates health_status, never last_heartbeat (only the heartbeat endpoint should do that).
func (*LocalStorage) UpdateAgentHeartbeat ¶
func (ls *LocalStorage) UpdateAgentHeartbeat(ctx context.Context, id string, version string, heartbeatTime time.Time) error
UpdateAgentHeartbeat updates only the heartbeat timestamp of an agent node in SQLite. If version is empty, it updates the default (unversioned) agent.
func (*LocalStorage) UpdateAgentLifecycleStatus ¶
func (ls *LocalStorage) UpdateAgentLifecycleStatus(ctx context.Context, id string, status types.AgentLifecycleStatus) error
UpdateAgentLifecycleStatus updates the lifecycle status of an agent node in SQLite.
func (*LocalStorage) UpdateAgentPackage ¶
func (ls *LocalStorage) UpdateAgentPackage(ctx context.Context, pkg *types.AgentPackage) error
UpdateAgentPackage updates an existing agent package record
func (*LocalStorage) UpdateAgentTrafficWeight ¶
func (ls *LocalStorage) UpdateAgentTrafficWeight(ctx context.Context, id string, version string, weight int) error
UpdateAgentTrafficWeight sets the traffic_weight for a specific (id, version) pair.
func (*LocalStorage) UpdateAgentVersion ¶
func (ls *LocalStorage) UpdateAgentVersion(ctx context.Context, id string, version string) error
UpdateAgentVersion updates only the version field for an agent node.
func (*LocalStorage) UpdateExecutionRecord ¶
func (ls *LocalStorage) UpdateExecutionRecord(ctx context.Context, executionID string, updater func(*types.Execution) (*types.Execution, error)) (*types.Execution, error)
UpdateExecutionRecord applies an update callback atomically. The callback mutates a types.Execution copy and the result gets persisted.
func (*LocalStorage) UpdateExecutionWebhookState ¶
func (ls *LocalStorage) UpdateExecutionWebhookState(ctx context.Context, executionID string, update types.ExecutionWebhookStateUpdate) error
UpdateExecutionWebhookState persists the latest delivery state for a webhook registration.
func (*LocalStorage) UpdateWorkflowExecution ¶
func (ls *LocalStorage) UpdateWorkflowExecution(ctx context.Context, executionID string, updateFunc func(execution *types.WorkflowExecution) (*types.WorkflowExecution, error)) error
UpdateWorkflowExecution atomically updates a workflow execution using a user-provided update function. This eliminates the read-modify-write race condition by performing the entire operation within a single transaction.
func (*LocalStorage) ValidateAgentConfiguration ¶
func (ls *LocalStorage) ValidateAgentConfiguration(ctx context.Context, agentID, packageID string, config map[string]interface{}) (*types.ConfigurationValidationResult, error)
ValidateAgentConfiguration validates a configuration against the package schema
type LocalStorageConfig ¶
type LocalStorageConfig struct {
// DatabasePath is loaded from the "database_path" key (YAML and mapstructure).
// NOTE(review): presumably the filesystem path of the SQLite database file — confirm.
DatabasePath string `yaml:"database_path" mapstructure:"database_path"`
// KVStorePath is loaded from the "kv_store_path" key (YAML and mapstructure).
// NOTE(review): presumably the filesystem path of the key-value store — confirm.
KVStorePath string `yaml:"kv_store_path" mapstructure:"kv_store_path"`
}
LocalStorageConfig holds configuration for the local storage provider.
type ObservabilityDeadLetterQueueModel ¶
type ObservabilityDeadLetterQueueModel struct {
// ID is the auto-incrementing primary key (column "id").
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
// EventType maps to the NOT NULL column "event_type".
EventType string `gorm:"column:event_type;not null"`
// EventSource maps to the NOT NULL column "event_source".
EventSource string `gorm:"column:event_source;not null"`
// EventTimestamp maps to the NOT NULL column "event_timestamp".
EventTimestamp time.Time `gorm:"column:event_timestamp;not null"`
// Payload maps to the NOT NULL column "payload" and is held as a string.
// NOTE(review): presumably the serialized event body — confirm the encoding.
Payload string `gorm:"column:payload;not null"`
// ErrorMessage maps to the NOT NULL column "error_message".
ErrorMessage string `gorm:"column:error_message;not null"`
// RetryCount maps to the NOT NULL column "retry_count" with a default of 0.
RetryCount int `gorm:"column:retry_count;not null;default:0"`
// CreatedAt maps to "created_at"; the autoCreateTime tag asks GORM to set it on insert.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
}
ObservabilityDeadLetterQueueModel represents failed observability events for retry.
func (ObservabilityDeadLetterQueueModel) TableName ¶
func (ObservabilityDeadLetterQueueModel) TableName() string
type ObservabilityWebhookModel ¶
type ObservabilityWebhookModel struct {
// ID is the primary key (column "id") and defaults to the literal 'global'.
ID string `gorm:"column:id;primaryKey;default:'global'"`
// URL maps to the NOT NULL column "url".
URL string `gorm:"column:url;not null"`
// Secret maps to column "secret"; it is a pointer and the tag carries no
// NOT NULL constraint, so the value may be absent.
Secret *string `gorm:"column:secret"`
// Headers maps to column "headers" and defaults to '{}'.
// NOTE(review): presumably a JSON-encoded object of header names to values — confirm.
Headers string `gorm:"column:headers;default:'{}'"`
// Enabled maps to the NOT NULL column "enabled" and defaults to true.
Enabled bool `gorm:"column:enabled;not null;default:true"`
// CreatedAt maps to "created_at"; the autoCreateTime tag asks GORM to set it on insert.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
// UpdatedAt maps to "updated_at"; the autoUpdateTime tag asks GORM to refresh it on update.
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
ObservabilityWebhookModel represents the global observability webhook configuration. This is a singleton table with only one row (id='global').
func (ObservabilityWebhookModel) TableName ¶
func (ObservabilityWebhookModel) TableName() string
type PostgresStorageConfig ¶
type PostgresStorageConfig struct {
// DSN and URL are loaded from the "dsn" and "url" keys.
// NOTE(review): presumably alternative full connection strings; which one takes
// precedence over the discrete fields below is not visible here — confirm.
DSN string `yaml:"dsn" mapstructure:"dsn"`
URL string `yaml:"url" mapstructure:"url"`
// Host, Port, Database, User, Password and SSLMode are loaded from the
// "host", "port", "database", "user", "password" and "sslmode" keys.
Host string `yaml:"host" mapstructure:"host"`
Port int `yaml:"port" mapstructure:"port"`
Database string `yaml:"database" mapstructure:"database"`
User string `yaml:"user" mapstructure:"user"`
Password string `yaml:"password" mapstructure:"password"`
SSLMode string `yaml:"sslmode" mapstructure:"sslmode"`
// AdminDatabase is loaded from the "admin_database" key.
// NOTE(review): presumably the database used for administrative statements — confirm.
AdminDatabase string `yaml:"admin_database" mapstructure:"admin_database"`
// ConnMaxLifetime, MaxOpenConns and MaxIdleConns are loaded from the
// "conn_max_lifetime", "max_open_conns" and "max_idle_conns" keys.
// NOTE(review): presumably forwarded to the connection pool settings — confirm.
ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime" mapstructure:"conn_max_lifetime"`
MaxOpenConns int `yaml:"max_open_conns" mapstructure:"max_open_conns"`
MaxIdleConns int `yaml:"max_idle_conns" mapstructure:"max_idle_conns"`
}
PostgresStorageConfig holds configuration for the PostgreSQL storage provider.
type RunSummaryAggregation ¶
type RunSummaryAggregation struct {
// RunID is the identifier of the run this summary belongs to.
RunID string
// TotalExecutions is an integer execution count.
// NOTE(review): presumably all executions in the run — confirm.
TotalExecutions int
// StatusCounts maps a string key to an integer count.
// NOTE(review): presumably keyed by execution status — confirm.
StatusCounts map[string]int
// EarliestStarted and LatestStarted are timestamps.
// NOTE(review): presumably the min and max start times across the run — confirm.
EarliestStarted time.Time
LatestStarted time.Time
// The Root* fields are pointers and may therefore be nil.
// NOTE(review): presumably describe the run's root execution — confirm.
RootExecutionID *string
RootStatus *string
RootAgentNodeID *string
RootReasonerID *string
// SessionID and ActorID are pointers and may therefore be nil.
SessionID *string
ActorID *string
// MaxDepth is an integer.
// NOTE(review): presumably the deepest nesting level in the run — confirm.
MaxDepth int
// ActiveExecutions is an integer count.
// NOTE(review): presumably executions not yet in a terminal state — confirm.
ActiveExecutions int
}
RunSummaryAggregation holds aggregated statistics for a single workflow run
type SchemaMigrationModel ¶
// SchemaMigrationModel is a GORM model with "version", "applied_at" and
// "description" columns.
// NOTE(review): presumably one row per applied schema migration — confirm.
type SchemaMigrationModel struct {
// Version is the primary key (column "version").
Version string `gorm:"column:version;primaryKey"`
// AppliedAt maps to "applied_at"; the autoCreateTime tag asks GORM to set it on insert.
AppliedAt time.Time `gorm:"column:applied_at;autoCreateTime"`
// Description maps to column "description".
Description string `gorm:"column:description"`
}
func (SchemaMigrationModel) TableName ¶
func (SchemaMigrationModel) TableName() string
type SessionModel ¶
// SessionModel is a GORM model keyed by "session_id".
// NOTE(review): presumably the persisted form of types.Session — confirm.
type SessionModel struct {
// SessionID is the primary key (column "session_id").
SessionID string `gorm:"column:session_id;primaryKey"`
// ActorID maps to the indexed column "actor_id"; it is a pointer and may be nil.
ActorID *string `gorm:"column:actor_id;index"`
// SessionName maps to column "session_name"; it is a pointer and may be nil.
SessionName *string `gorm:"column:session_name"`
// ParentSessionID maps to column "parent_session_id"; it is a pointer and may be nil.
ParentSessionID *string `gorm:"column:parent_session_id"`
// RootSessionID maps to the indexed column "root_session_id"; it is a pointer and may be nil.
RootSessionID *string `gorm:"column:root_session_id;index"`
// TotalWorkflows, TotalExecutions and TotalDurationMS are counters that default to 0.
// NOTE(review): TotalDurationMS is presumably in milliseconds, per the column suffix — confirm.
TotalWorkflows int `gorm:"column:total_workflows;default:0"`
TotalExecutions int `gorm:"column:total_executions;default:0"`
TotalDurationMS int `gorm:"column:total_duration_ms;default:0"`
// StartedAt maps to the NOT NULL column "started_at".
StartedAt time.Time `gorm:"column:started_at;not null"`
// LastActivityAt maps to the NOT NULL column "last_activity_at".
LastActivityAt time.Time `gorm:"column:last_activity_at;not null"`
// CreatedAt maps to "created_at"; the autoCreateTime tag asks GORM to set it on insert.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
// UpdatedAt maps to "updated_at"; the autoUpdateTime tag asks GORM to refresh it on update.
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (SessionModel) TableName ¶
func (SessionModel) TableName() string
type StorageConfig ¶
type StorageConfig struct {
// Mode is loaded from the "mode" key.
// NOTE(review): presumably selects between the local and postgres backends;
// the accepted values are not visible here — confirm.
Mode string `yaml:"mode" mapstructure:"mode"`
// Local holds the settings nested under the "local" key.
Local LocalStorageConfig `yaml:"local" mapstructure:"local"`
// Postgres holds the settings nested under the "postgres" key.
Postgres PostgresStorageConfig `yaml:"postgres" mapstructure:"postgres"`
// Vector holds the settings nested under the "vector" key.
Vector VectorStoreConfig `yaml:"vector" mapstructure:"vector"`
}
StorageConfig holds the configuration for the storage provider.
type StorageFactory ¶
// StorageFactory is an empty struct: it carries no fields and therefore no state.
type StorageFactory struct{}
StorageFactory is responsible for creating the appropriate storage backend.
func (*StorageFactory) CreateStorage ¶
func (sf *StorageFactory) CreateStorage(config StorageConfig) (StorageProvider, CacheProvider, error)
CreateStorage creates a StorageProvider and CacheProvider based on the configuration.
type StorageProvider ¶
type StorageProvider interface {
// Lifecycle
// Initialize prepares the storage backend using the provided configuration.
// The ctx controls setup lifetime, and config supplies backend settings.
// Returns an error if initialization fails.
Initialize(ctx context.Context, config StorageConfig) error
// Close releases storage resources associated with the provider.
// The ctx bounds shutdown and cleanup work for the backend.
// Returns an error if resources cannot be closed cleanly.
Close(ctx context.Context) error
// HealthCheck verifies that the storage backend is reachable and operational.
// The ctx controls how long the health probe may run.
// Returns an error when the backend is unhealthy or unreachable.
HealthCheck(ctx context.Context) error
// Execution operations
// StoreExecution persists an agent execution record.
// The ctx scopes the write, and execution contains the execution data to store.
// Returns an error if the execution cannot be saved.
StoreExecution(ctx context.Context, execution *types.AgentExecution) error
// GetExecution fetches an agent execution by its numeric identifier.
// The ctx scopes the read, and id selects the execution to load.
// Returns the execution record or an error if it cannot be found or read.
GetExecution(ctx context.Context, id int64) (*types.AgentExecution, error)
// QueryExecutions lists agent executions matching the provided filters.
// The ctx scopes the query, and filters define the selection criteria.
// Returns matching executions or an error if the query fails.
QueryExecutions(ctx context.Context, filters types.ExecutionFilters) ([]*types.AgentExecution, error)
// Workflow execution operations
// StoreWorkflowExecution persists a workflow execution record.
// The ctx scopes the write, and execution contains the workflow execution data.
// Returns an error if the workflow execution cannot be saved.
StoreWorkflowExecution(ctx context.Context, execution *types.WorkflowExecution) error
// GetWorkflowExecution retrieves a workflow execution by its identifier.
// The ctx scopes the read, and executionID identifies the workflow execution.
// Returns the workflow execution or an error if it is missing or unreadable.
GetWorkflowExecution(ctx context.Context, executionID string) (*types.WorkflowExecution, error)
// QueryWorkflowExecutions lists workflow executions that match the given filters.
// The ctx scopes the query, and filters specify which executions to include.
// Returns matching workflow executions or an error if the query fails.
QueryWorkflowExecutions(ctx context.Context, filters types.WorkflowExecutionFilters) ([]*types.WorkflowExecution, error)
// UpdateWorkflowExecution applies an update function to a workflow execution record.
// The ctx scopes the operation, executionID selects the record, and updateFunc mutates it.
// Returns an error if the execution cannot be loaded, updated, or saved.
UpdateWorkflowExecution(ctx context.Context, executionID string, updateFunc func(execution *types.WorkflowExecution) (*types.WorkflowExecution, error)) error
// CreateExecutionRecord creates a primary execution record.
// The ctx scopes the write, and execution contains the record to insert.
// Returns an error if the record cannot be created.
CreateExecutionRecord(ctx context.Context, execution *types.Execution) error
// GetExecutionRecord retrieves a primary execution record by identifier.
// The ctx scopes the read, and executionID selects the record to fetch.
// Returns the execution record or an error if it is missing or unreadable.
GetExecutionRecord(ctx context.Context, executionID string) (*types.Execution, error)
// UpdateExecutionRecord updates a primary execution record through a callback.
// The ctx scopes the operation, executionID selects the record, and update transforms it.
// Returns the updated execution or an error if the update cannot be completed.
UpdateExecutionRecord(ctx context.Context, executionID string, update func(*types.Execution) (*types.Execution, error)) (*types.Execution, error)
// QueryExecutionRecords lists execution records that satisfy the provided filter.
// The ctx scopes the query, and filter describes the records to include.
// Returns matching execution records or an error if the query fails.
QueryExecutionRecords(ctx context.Context, filter types.ExecutionFilter) ([]*types.Execution, error)
// QueryRunSummaries aggregates execution records into run-level summaries.
// The ctx scopes the query, and filter limits which executions are aggregated.
// Returns summaries, a total count, and an error if aggregation fails.
QueryRunSummaries(ctx context.Context, filter types.ExecutionFilter) ([]*RunSummaryAggregation, int, error)
// RegisterExecutionWebhook stores webhook metadata for an execution.
// The ctx scopes the write, and webhook contains the registration details.
// Returns an error if the webhook cannot be registered.
RegisterExecutionWebhook(ctx context.Context, webhook *types.ExecutionWebhook) error
// GetExecutionWebhook retrieves webhook metadata for an execution.
// The ctx scopes the read, and executionID identifies the webhook to load.
// Returns the webhook record or an error if it is missing or unreadable.
GetExecutionWebhook(ctx context.Context, executionID string) (*types.ExecutionWebhook, error)
// ListDueExecutionWebhooks returns pending execution webhooks ready for delivery.
// The ctx scopes the query, and limit caps how many due webhooks to return.
// Returns due webhooks or an error if the lookup fails.
ListDueExecutionWebhooks(ctx context.Context, limit int) ([]*types.ExecutionWebhook, error)
// TryMarkExecutionWebhookInFlight marks a due webhook as being processed.
// The ctx scopes the update, executionID selects the webhook, and now timestamps the attempt.
// Returns true if the webhook was claimed, or false with an error on failure.
TryMarkExecutionWebhookInFlight(ctx context.Context, executionID string, now time.Time) (bool, error)
// UpdateExecutionWebhookState updates stored state for an execution webhook.
// The ctx scopes the write, executionID selects the webhook, and update carries new state.
// Returns an error if the webhook state cannot be updated.
UpdateExecutionWebhookState(ctx context.Context, executionID string, update types.ExecutionWebhookStateUpdate) error
// HasExecutionWebhook reports whether an execution has a registered webhook.
// The ctx scopes the lookup, and executionID identifies the execution to check.
// Returns a boolean result or an error if the check fails.
HasExecutionWebhook(ctx context.Context, executionID string) (bool, error)
// ListExecutionWebhooksRegistered reports webhook registration status for executions.
// The ctx scopes the lookup, and executionIDs lists the executions to inspect.
// Returns a map keyed by execution ID or an error if the query fails.
ListExecutionWebhooksRegistered(ctx context.Context, executionIDs []string) (map[string]bool, error)
// StoreExecutionWebhookEvent persists a webhook delivery event for an execution.
// The ctx scopes the write, and event contains the webhook event payload.
// Returns an error if the event cannot be stored.
StoreExecutionWebhookEvent(ctx context.Context, event *types.ExecutionWebhookEvent) error
// ListExecutionWebhookEvents retrieves webhook events for one execution.
// The ctx scopes the query, and executionID identifies which events to return.
// Returns the event list or an error if the lookup fails.
ListExecutionWebhookEvents(ctx context.Context, executionID string) ([]*types.ExecutionWebhookEvent, error)
// ListExecutionWebhookEventsBatch retrieves webhook events for multiple executions.
// The ctx scopes the query, and executionIDs identifies the executions to load.
// Returns events keyed by execution ID or an error if the lookup fails.
ListExecutionWebhookEventsBatch(ctx context.Context, executionIDs []string) (map[string][]*types.ExecutionWebhookEvent, error)
// StoreWorkflowExecutionEvent persists an event associated with a workflow execution.
// The ctx scopes the write, and event contains the workflow execution event data.
// Returns an error if the event cannot be stored.
StoreWorkflowExecutionEvent(ctx context.Context, event *types.WorkflowExecutionEvent) error
// ListWorkflowExecutionEvents lists events for a workflow execution in sequence order.
// The ctx scopes the query, executionID selects the execution, and afterSeq and limit page results.
// Returns matching workflow events or an error if the query fails.
ListWorkflowExecutionEvents(ctx context.Context, executionID string, afterSeq *int64, limit int) ([]*types.WorkflowExecutionEvent, error)
StoreExecutionLogEntry(ctx context.Context, entry *types.ExecutionLogEntry) error
ListExecutionLogEntries(ctx context.Context, executionID string, afterSeq *int64, limit int, levels []string, nodeIDs []string, sources []string, query string) ([]*types.ExecutionLogEntry, error)
PruneExecutionLogEntries(ctx context.Context, executionID string, maxEntries int, olderThan time.Time) error
// Execution cleanup operations
// CleanupOldExecutions deletes execution data older than the retention period.
// The ctx scopes the cleanup, retentionPeriod sets the age threshold, and batchSize limits each pass.
// Returns the number of cleaned executions or an error if cleanup fails.
CleanupOldExecutions(ctx context.Context, retentionPeriod time.Duration, batchSize int) (int, error)
// MarkStaleExecutions marks long-running executions as stale.
// The ctx scopes the update, staleAfter sets the cutoff age, and limit bounds affected rows.
// Returns the number marked stale or an error if the update fails.
MarkStaleExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)
// MarkStaleWorkflowExecutions marks long-running workflow executions as stale.
// The ctx scopes the update, staleAfter sets the cutoff age, and limit bounds affected rows.
// Returns the number marked stale or an error if the update fails.
MarkStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, limit int) (int, error)
// RetryStaleWorkflowExecutions resets stale workflow executions with retry_count < maxRetries
// back to "pending" status with incremented retry_count. Returns IDs of retried executions.
RetryStaleWorkflowExecutions(ctx context.Context, staleAfter time.Duration, maxRetries int, limit int) ([]string, error)
// Workflow cleanup operations - deletes all data related to a workflow ID
// CleanupWorkflow removes all stored data associated with a workflow.
// The ctx scopes the operation, workflowID selects the workflow, and dryRun skips destructive changes.
// Returns a cleanup summary or an error if the operation fails.
CleanupWorkflow(ctx context.Context, workflowID string, dryRun bool) (*types.WorkflowCleanupResult, error)
// DAG operations - optimized single-query DAG building
// QueryWorkflowDAG loads workflow executions needed to assemble a workflow DAG.
// The ctx scopes the query, and rootWorkflowID identifies the DAG root workflow.
// Returns workflow execution nodes or an error if the query fails.
QueryWorkflowDAG(ctx context.Context, rootWorkflowID string) ([]*types.WorkflowExecution, error)
// Workflow operations
// CreateOrUpdateWorkflow stores a workflow, creating it or replacing existing state.
// The ctx scopes the write, and workflow contains the workflow definition to persist.
// Returns an error if the workflow cannot be saved.
CreateOrUpdateWorkflow(ctx context.Context, workflow *types.Workflow) error
// GetWorkflow retrieves a workflow by its identifier.
// The ctx scopes the read, and workflowID selects the workflow to load.
// Returns the workflow or an error if it is missing or unreadable.
GetWorkflow(ctx context.Context, workflowID string) (*types.Workflow, error)
// QueryWorkflows lists workflows that satisfy the provided filters.
// The ctx scopes the query, and filters define which workflows to include.
// Returns matching workflows or an error if the query fails.
QueryWorkflows(ctx context.Context, filters types.WorkflowFilters) ([]*types.Workflow, error)
// Session operations
// CreateOrUpdateSession stores a session, creating it or replacing existing state.
// The ctx scopes the write, and session contains the session data to persist.
// Returns an error if the session cannot be saved.
CreateOrUpdateSession(ctx context.Context, session *types.Session) error
// GetSession retrieves a session by its identifier.
// The ctx scopes the read, and sessionID selects the session to load.
// Returns the session or an error if it is missing or unreadable.
GetSession(ctx context.Context, sessionID string) (*types.Session, error)
// QuerySessions lists sessions matching the provided filters.
// The ctx scopes the query, and filters define which sessions to return.
// Returns matching sessions or an error if the query fails.
QuerySessions(ctx context.Context, filters types.SessionFilters) ([]*types.Session, error)
// Memory operations
// SetMemory stores or replaces a memory record.
// The ctx scopes the write, and memory contains the scoped memory data to persist.
// Returns an error if the memory record cannot be saved.
SetMemory(ctx context.Context, memory *types.Memory) error
// GetMemory retrieves a memory record by scope, scope ID, and key.
// The ctx scopes the read, and scope, scopeID, and key identify the memory entry.
// Returns the memory record or an error if it is missing or unreadable.
GetMemory(ctx context.Context, scope, scopeID, key string) (*types.Memory, error)
// DeleteMemory removes a memory record by scope, scope ID, and key.
// The ctx scopes the delete, and scope, scopeID, and key identify the record.
// Returns an error if the memory record cannot be deleted.
DeleteMemory(ctx context.Context, scope, scopeID, key string) error
// ListMemory lists memory records within a scope and scope identifier.
// The ctx scopes the query, and scope and scopeID identify the memory namespace.
// Returns matching memory records or an error if the query fails.
ListMemory(ctx context.Context, scope, scopeID string) ([]*types.Memory, error)
// SetVector stores or replaces a vector record.
// The ctx scopes the write, and record contains the vector data to persist.
// Returns an error if the vector record cannot be saved.
SetVector(ctx context.Context, record *types.VectorRecord) error
// GetVector retrieves a vector record by scope, scope ID, and key.
// The ctx scopes the read, and scope, scopeID, and key identify the vector entry.
// Returns the vector record or an error if it is missing or unreadable.
GetVector(ctx context.Context, scope, scopeID, key string) (*types.VectorRecord, error)
// DeleteVector removes a vector record by scope, scope ID, and key.
// The ctx scopes the delete, and scope, scopeID, and key identify the vector entry.
// Returns an error if the vector record cannot be deleted.
DeleteVector(ctx context.Context, scope, scopeID, key string) error
// DeleteVectorsByPrefix removes vector records that share a key prefix.
// The ctx scopes the delete, and scope, scopeID, and prefix define the target set.
// Returns the number deleted or an error if the operation fails.
DeleteVectorsByPrefix(ctx context.Context, scope, scopeID, prefix string) (int, error)
// SimilaritySearch finds the closest vector matches for an embedding query.
// The ctx scopes the search, scope and scopeID choose the namespace, and queryEmbedding, topK, and filters shape results.
// Returns ranked matches or an error if the search fails.
SimilaritySearch(ctx context.Context, scope, scopeID string, queryEmbedding []float32, topK int, filters map[string]interface{}) ([]*types.VectorSearchResult, error)
// Event operations
// StoreEvent persists a memory change event.
// The ctx scopes the write, and event contains the change event data to record.
// Returns an error if the event cannot be stored.
StoreEvent(ctx context.Context, event *types.MemoryChangeEvent) error
// GetEventHistory retrieves memory change events matching a filter.
// The ctx scopes the query, and filter defines which events to include.
// Returns matching events or an error if the query fails.
GetEventHistory(ctx context.Context, filter types.EventFilter) ([]*types.MemoryChangeEvent, error)
// Distributed Lock operations
// AcquireLock attempts to create a distributed lock for a key.
// The ctx scopes the request, key identifies the lock, and timeout sets its lease duration.
// Returns the acquired lock or an error if the lock cannot be obtained.
AcquireLock(ctx context.Context, key string, timeout time.Duration) (*types.DistributedLock, error)
// ReleaseLock releases a distributed lock by lock identifier.
// The ctx scopes the request, and lockID identifies the held lock to release.
// Returns an error if the lock cannot be released.
ReleaseLock(ctx context.Context, lockID string) error
// RenewLock extends the lease for an existing distributed lock.
// The ctx scopes the request, and lockID identifies the lock to renew.
// Returns the renewed lock state or an error if renewal fails.
RenewLock(ctx context.Context, lockID string) (*types.DistributedLock, error)
// GetLockStatus retrieves the current state of a distributed lock key.
// The ctx scopes the lookup, and key identifies the lock to inspect.
// Returns the lock status or an error if it cannot be read.
GetLockStatus(ctx context.Context, key string) (*types.DistributedLock, error)
// Agent registry
// RegisterAgent stores agent metadata in the registry.
// The ctx scopes the write, and agent contains the agent node information to persist.
// Returns an error if the agent cannot be registered.
RegisterAgent(ctx context.Context, agent *types.AgentNode) error
// GetAgent retrieves the current agent record by identifier.
// The ctx scopes the read, and id identifies the agent to load.
// Returns the agent record or an error if it is missing or unreadable.
GetAgent(ctx context.Context, id string) (*types.AgentNode, error)
// GetAgentVersion retrieves a specific version of an agent record.
// The ctx scopes the read, and id and version identify the versioned agent entry.
// Returns the agent version or an error if it is missing or unreadable.
GetAgentVersion(ctx context.Context, id string, version string) (*types.AgentNode, error)
// DeleteAgentVersion removes a specific agent version from the registry.
// The ctx scopes the delete, and id and version identify the versioned agent entry.
// Returns an error if the agent version cannot be deleted.
DeleteAgentVersion(ctx context.Context, id string, version string) error
// ListAgentVersions lists all stored versions for an agent.
// The ctx scopes the query, and id identifies the agent whose versions to return.
// Returns version records or an error if the query fails.
ListAgentVersions(ctx context.Context, id string) ([]*types.AgentNode, error)
// ListAgents lists registered agents matching the provided filters.
// The ctx scopes the query, and filters define which agents to include.
// Returns matching agents or an error if the query fails.
ListAgents(ctx context.Context, filters types.AgentFilters) ([]*types.AgentNode, error)
// ListAgentsByGroup lists agents assigned to a group.
// The ctx scopes the query, and groupID identifies the group to inspect.
// Returns matching agents or an error if the query fails.
ListAgentsByGroup(ctx context.Context, groupID string) ([]*types.AgentNode, error)
// ListAgentGroups lists agent group summaries for a team.
// The ctx scopes the query, and teamID identifies the team to inspect.
// Returns group summaries or an error if the query fails.
ListAgentGroups(ctx context.Context, teamID string) ([]types.AgentGroupSummary, error)
// UpdateAgentHealth records a new health status for an agent.
// The ctx scopes the update, and id and status identify the agent and new health state.
// Returns an error if the health status cannot be updated.
UpdateAgentHealth(ctx context.Context, id string, status types.HealthStatus) error
// UpdateAgentHealthAtomic updates agent health when the last heartbeat matches expectations.
// The ctx scopes the update, and id, status, and expectedLastHeartbeat control the conditional write.
// Returns an error if the conditional health update fails.
UpdateAgentHealthAtomic(ctx context.Context, id string, status types.HealthStatus, expectedLastHeartbeat *time.Time) error
// UpdateAgentHeartbeat records a heartbeat timestamp for an agent version.
// The ctx scopes the update, and id, version, and heartbeatTime identify the heartbeat to save.
// Returns an error if the heartbeat cannot be updated.
UpdateAgentHeartbeat(ctx context.Context, id string, version string, heartbeatTime time.Time) error
// UpdateAgentLifecycleStatus updates the lifecycle status of an agent.
// The ctx scopes the update, and id and status identify the agent and new lifecycle state.
// Returns an error if the lifecycle status cannot be updated.
UpdateAgentLifecycleStatus(ctx context.Context, id string, status types.AgentLifecycleStatus) error
// UpdateAgentVersion marks which version is active for an agent.
// The ctx scopes the update, and id and version identify the agent and selected version.
// Returns an error if the active version cannot be updated.
UpdateAgentVersion(ctx context.Context, id string, version string) error
// UpdateAgentTrafficWeight changes the traffic weight assigned to an agent version.
// The ctx scopes the update, and id, version, and weight identify the target routing change.
// Returns an error if the traffic weight cannot be updated.
UpdateAgentTrafficWeight(ctx context.Context, id string, version string, weight int) error
// Configuration Storage (database-backed config files)
// SetConfig stores a configuration value under a key.
// The ctx scopes the write, and key, value, and updatedBy describe the config change.
// Returns an error if the configuration cannot be saved.
SetConfig(ctx context.Context, key string, value string, updatedBy string) error
// GetConfig retrieves a stored configuration entry by key.
// The ctx scopes the read, and key identifies the configuration entry to load.
// Returns the configuration entry or an error if it is missing or unreadable.
GetConfig(ctx context.Context, key string) (*ConfigEntry, error)
// ListConfigs returns all stored configuration entries.
// The ctx scopes the query and does not require additional parameters.
// Returns configuration entries or an error if the query fails.
ListConfigs(ctx context.Context) ([]*ConfigEntry, error)
// DeleteConfig removes a stored configuration entry by key.
// The ctx scopes the delete, and key identifies the configuration entry to remove.
// Returns an error if the configuration cannot be deleted.
DeleteConfig(ctx context.Context, key string) error
// Reasoner Performance and History
// GetReasonerPerformanceMetrics retrieves aggregated performance metrics for a reasoner.
// The ctx scopes the read, and reasonerID identifies the reasoner to inspect.
// Returns performance metrics or an error if they cannot be loaded.
GetReasonerPerformanceMetrics(ctx context.Context, reasonerID string) (*types.ReasonerPerformanceMetrics, error)
// GetReasonerExecutionHistory retrieves paginated execution history for a reasoner.
// The ctx scopes the query, and reasonerID, page, and limit define which history page to load.
// Returns execution history or an error if the query fails.
GetReasonerExecutionHistory(ctx context.Context, reasonerID string, page, limit int) (*types.ReasonerExecutionHistory, error)
// Agent Configuration Management
// StoreAgentConfiguration persists an agent configuration record.
// The ctx scopes the write, and config contains the configuration to store.
// Returns an error if the configuration cannot be saved.
StoreAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error
// GetAgentConfiguration retrieves configuration for an agent and package pair.
// The ctx scopes the read, and agentID and packageID identify the configuration to load.
// Returns the agent configuration or an error if it is missing or unreadable.
GetAgentConfiguration(ctx context.Context, agentID, packageID string) (*types.AgentConfiguration, error)
// QueryAgentConfigurations lists agent configurations that match provided filters.
// The ctx scopes the query, and filters define which configurations to include.
// Returns matching configurations or an error if the query fails.
QueryAgentConfigurations(ctx context.Context, filters types.ConfigurationFilters) ([]*types.AgentConfiguration, error)
// UpdateAgentConfiguration replaces stored state for an agent configuration.
// The ctx scopes the write, and config contains the updated configuration values.
// Returns an error if the configuration cannot be updated.
UpdateAgentConfiguration(ctx context.Context, config *types.AgentConfiguration) error
// DeleteAgentConfiguration removes configuration for an agent and package pair.
// The ctx scopes the delete, and agentID and packageID identify the configuration to remove.
// Returns an error if the configuration cannot be deleted.
DeleteAgentConfiguration(ctx context.Context, agentID, packageID string) error
// ValidateAgentConfiguration validates configuration data for an agent package.
// The ctx scopes the check, and agentID, packageID, and config identify the configuration to validate.
// Returns validation results or an error if validation cannot be performed.
ValidateAgentConfiguration(ctx context.Context, agentID, packageID string, config map[string]interface{}) (*types.ConfigurationValidationResult, error)
// Agent Package Management
// StoreAgentPackage persists an agent package record.
// The ctx scopes the write, and pkg contains the package metadata to store.
// Returns an error if the package cannot be saved.
StoreAgentPackage(ctx context.Context, pkg *types.AgentPackage) error
// GetAgentPackage retrieves an agent package by identifier.
// The ctx scopes the read, and packageID identifies the package to load.
// Returns the package record or an error if it is missing or unreadable.
GetAgentPackage(ctx context.Context, packageID string) (*types.AgentPackage, error)
// QueryAgentPackages lists agent packages matching the provided filters.
// The ctx scopes the query, and filters define which packages to include.
// Returns matching packages or an error if the query fails.
QueryAgentPackages(ctx context.Context, filters types.PackageFilters) ([]*types.AgentPackage, error)
// UpdateAgentPackage replaces stored state for an agent package.
// The ctx scopes the write, and pkg contains the updated package metadata.
// Returns an error if the package cannot be updated.
UpdateAgentPackage(ctx context.Context, pkg *types.AgentPackage) error
// DeleteAgentPackage removes an agent package by identifier.
// The ctx scopes the delete, and packageID identifies the package to remove.
// Returns an error if the package cannot be deleted.
DeleteAgentPackage(ctx context.Context, packageID string) error
// Real-time features (optional, may be handled by CacheProvider)
// SubscribeToMemoryChanges opens a stream of memory change events for a scope.
// The ctx scopes the subscription, and scope and scopeID identify the memory namespace to watch.
// Returns an event channel or an error if subscription setup fails.
SubscribeToMemoryChanges(ctx context.Context, scope, scopeID string) (<-chan types.MemoryChangeEvent, error)
// PublishMemoryChange broadcasts a memory change event to subscribers.
// The ctx scopes the publish, and event contains the change notification to send.
// Returns an error if the event cannot be published.
PublishMemoryChange(ctx context.Context, event types.MemoryChangeEvent) error
// Execution event bus for real-time updates
// GetExecutionEventBus returns the in-process event bus for execution updates.
// This method takes no parameters and exposes the shared execution event bus.
// Returns the execution event bus pointer.
GetExecutionEventBus() *events.ExecutionEventBus
// GetWorkflowExecutionEventBus returns the in-process event bus for workflow execution updates.
// This method takes no parameters and exposes the shared workflow execution event bus.
// Returns the workflow execution event bus pointer.
GetWorkflowExecutionEventBus() *events.EventBus[*types.WorkflowExecutionEvent]
// GetExecutionLogEventBus returns the event bus that carries execution log entries.
// This method takes no parameters.
// Returns the execution log event bus pointer.
GetExecutionLogEventBus() *events.EventBus[*types.ExecutionLogEntry]
// DID Registry operations
// StoreDID persists DID registry data for a decentralized identifier.
// The ctx scopes the write, and the DID, document, key, and derivation parameters describe the record to store.
// Returns an error if the DID record cannot be saved.
StoreDID(ctx context.Context, did string, didDocument, publicKey, privateKeyRef, derivationPath string) error
// GetDID retrieves a DID registry entry by decentralized identifier.
// The ctx scopes the read, and did identifies the registry entry to load.
// Returns the DID registry entry or an error if it is missing or unreadable.
GetDID(ctx context.Context, did string) (*types.DIDRegistryEntry, error)
// ListDIDs lists all stored DID registry entries.
// The ctx scopes the query and does not require additional parameters.
// Returns DID registry entries or an error if the query fails.
ListDIDs(ctx context.Context) ([]*types.DIDRegistryEntry, error)
// AgentField Server DID operations
// StoreAgentFieldServerDID stores DID material for an AgentField server.
// The ctx scopes the write, and the server ID, DID, seed, and rotation timestamps describe the stored record.
// Returns an error if the server DID cannot be saved.
StoreAgentFieldServerDID(ctx context.Context, agentfieldServerID, rootDID string, masterSeed []byte, createdAt, lastKeyRotation time.Time) error
// GetAgentFieldServerDID retrieves DID information for an AgentField server.
// The ctx scopes the read, and agentfieldServerID identifies the server record to load.
// Returns the server DID info or an error if it is missing or unreadable.
GetAgentFieldServerDID(ctx context.Context, agentfieldServerID string) (*types.AgentFieldServerDIDInfo, error)
// ListAgentFieldServerDIDs lists stored AgentField server DID records.
// The ctx scopes the query and does not require additional parameters.
// Returns server DID records or an error if the query fails.
ListAgentFieldServerDIDs(ctx context.Context) ([]*types.AgentFieldServerDIDInfo, error)
// Agent DID operations
// StoreAgentDID stores DID information for an agent.
// The ctx scopes the write, and the agent, server, key, and derivation parameters describe the DID record.
// Returns an error if the agent DID cannot be saved.
StoreAgentDID(ctx context.Context, agentID, agentDID, agentfieldServerDID, publicKeyJWK string, derivationIndex int) error
// GetAgentDID retrieves DID information for an agent.
// The ctx scopes the read, and agentID identifies the agent DID record to load.
// Returns the agent DID info or an error if it is missing or unreadable.
GetAgentDID(ctx context.Context, agentID string) (*types.AgentDIDInfo, error)
// ListAgentDIDs lists stored DID records for agents.
// The ctx scopes the query and does not require additional parameters.
// Returns agent DID records or an error if the query fails.
ListAgentDIDs(ctx context.Context) ([]*types.AgentDIDInfo, error)
// Component DID operations
// StoreComponentDID stores DID information for a component.
// The ctx scopes the write, and the component, agent, naming, and derivation parameters describe the DID record.
// Returns an error if the component DID cannot be saved.
StoreComponentDID(ctx context.Context, componentID, componentDID, agentDID, componentType, componentName string, derivationIndex int) error
// GetComponentDID retrieves DID information for a component.
// The ctx scopes the read, and componentID identifies the component DID record to load.
// Returns the component DID info or an error if it is missing or unreadable.
GetComponentDID(ctx context.Context, componentID string) (*types.ComponentDIDInfo, error)
// ListComponentDIDs lists component DID records associated with an agent DID.
// The ctx scopes the query, and agentDID identifies the component DID owner to inspect.
// Returns component DID records or an error if the query fails.
ListComponentDIDs(ctx context.Context, agentDID string) ([]*types.ComponentDIDInfo, error)
// Multi-step DID operations with transaction safety
// StoreAgentDIDWithComponents stores an agent DID and its component DIDs atomically.
// The ctx scopes the transaction, and the agent, server, key, derivation, and components parameters define the stored records.
// Returns an error if any part of the transaction fails.
StoreAgentDIDWithComponents(ctx context.Context, agentID, agentDID, agentfieldServerDID, publicKeyJWK string, derivationIndex int, components []ComponentDIDRequest) error
// Execution VC operations
// StoreExecutionVC stores verifiable credential data for an execution.
// The ctx scopes the write, and the identifiers, hashes, document, signature, and storage fields describe the VC record.
// Returns an error if the execution VC cannot be saved.
StoreExecutionVC(ctx context.Context, vcID, executionID, workflowID, sessionID, issuerDID, targetDID, callerDID, inputHash, outputHash, status string, vcDocument []byte, signature string, storageURI string, documentSizeBytes int64) error
// GetExecutionVC retrieves execution verifiable credential data by VC identifier.
// The ctx scopes the read, and vcID identifies the execution VC record to load.
// Returns the execution VC info or an error if it is missing or unreadable.
GetExecutionVC(ctx context.Context, vcID string) (*types.ExecutionVCInfo, error)
// ListExecutionVCs lists execution verifiable credentials matching the provided filters.
// The ctx scopes the query, and filters define which execution VC records to include.
// Returns matching execution VC records or an error if the query fails.
ListExecutionVCs(ctx context.Context, filters types.VCFilters) ([]*types.ExecutionVCInfo, error)
// ListWorkflowVCStatusSummaries aggregates workflow VC status for workflow identifiers.
// The ctx scopes the query, and workflowIDs identifies the workflows to summarize.
// Returns workflow VC status summaries or an error if aggregation fails.
ListWorkflowVCStatusSummaries(ctx context.Context, workflowIDs []string) ([]*types.WorkflowVCStatusAggregation, error)
// CountExecutionVCs counts execution VC records that match the provided filters.
// The ctx scopes the query, and filters define which execution VC records to count.
// Returns the count or an error if the query fails.
CountExecutionVCs(ctx context.Context, filters types.VCFilters) (int, error)
// Workflow VC operations
// StoreWorkflowVC stores verifiable credential data for a workflow.
// The ctx scopes the write, and the workflow, status, timing, component, and storage parameters describe the VC record.
// Returns an error if the workflow VC cannot be saved.
StoreWorkflowVC(ctx context.Context, workflowVCID, workflowID, sessionID string, componentVCIDs []string, status string, startTime, endTime *time.Time, totalSteps, completedSteps int, storageURI string, documentSizeBytes int64) error
// GetWorkflowVC retrieves workflow verifiable credential data by VC identifier.
// The ctx scopes the read, and workflowVCID identifies the workflow VC record to load.
// Returns the workflow VC info or an error if it is missing or unreadable.
GetWorkflowVC(ctx context.Context, workflowVCID string) (*types.WorkflowVCInfo, error)
// ListWorkflowVCs lists workflow verifiable credentials for a workflow.
// The ctx scopes the query, and workflowID identifies the workflow whose VC records to return.
// Returns matching workflow VC records or an error if the query fails.
ListWorkflowVCs(ctx context.Context, workflowID string) ([]*types.WorkflowVCInfo, error)
// Observability Webhook configuration (singleton pattern)
// GetObservabilityWebhook retrieves the singleton observability webhook configuration.
// The ctx scopes the read and does not require additional parameters.
// Returns the webhook configuration or an error if it is missing or unreadable.
GetObservabilityWebhook(ctx context.Context) (*types.ObservabilityWebhookConfig, error)
// SetObservabilityWebhook stores the singleton observability webhook configuration.
// The ctx scopes the write, and config contains the webhook configuration to persist.
// Returns an error if the configuration cannot be saved.
SetObservabilityWebhook(ctx context.Context, config *types.ObservabilityWebhookConfig) error
// DeleteObservabilityWebhook removes the singleton observability webhook configuration.
// The ctx scopes the delete and does not require additional parameters.
// Returns an error if the configuration cannot be deleted.
DeleteObservabilityWebhook(ctx context.Context) error
// Observability Dead Letter Queue
// AddToDeadLetterQueue stores a failed observability event for later inspection.
// The ctx scopes the write, and event, errorMessage, and retryCount describe the failed delivery.
// Returns an error if the dead-letter entry cannot be saved.
AddToDeadLetterQueue(ctx context.Context, event *types.ObservabilityEvent, errorMessage string, retryCount int) error
// GetDeadLetterQueueCount returns the number of stored dead-letter entries.
// The ctx scopes the read and does not require additional parameters.
// Returns the entry count or an error if it cannot be determined.
GetDeadLetterQueueCount(ctx context.Context) (int64, error)
// GetDeadLetterQueue retrieves paginated dead-letter entries.
// The ctx scopes the query, and limit and offset define which entries to return.
// Returns dead-letter entries or an error if the query fails.
GetDeadLetterQueue(ctx context.Context, limit, offset int) ([]types.ObservabilityDeadLetterEntry, error)
// DeleteFromDeadLetterQueue removes selected dead-letter entries by identifier.
// The ctx scopes the delete, and ids lists the entries to remove.
// Returns an error if the entries cannot be deleted.
DeleteFromDeadLetterQueue(ctx context.Context, ids []int64) error
// ClearDeadLetterQueue removes all stored dead-letter entries.
// The ctx scopes the delete and does not require additional parameters.
// Returns an error if the queue cannot be cleared.
ClearDeadLetterQueue(ctx context.Context) error
// Access policy operations (tag-based authorization)
// GetAccessPolicies lists all stored access policies.
// The ctx scopes the query and does not require additional parameters.
// Returns access policies or an error if the query fails.
GetAccessPolicies(ctx context.Context) ([]*types.AccessPolicy, error)
// GetAccessPolicyByID retrieves an access policy by identifier.
// The ctx scopes the read, and id identifies the access policy to load.
// Returns the access policy or an error if it is missing or unreadable.
GetAccessPolicyByID(ctx context.Context, id int64) (*types.AccessPolicy, error)
// CreateAccessPolicy stores a new access policy.
// The ctx scopes the write, and policy contains the access policy data to persist.
// Returns an error if the policy cannot be created.
CreateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error
// UpdateAccessPolicy replaces stored state for an access policy.
// The ctx scopes the write, and policy contains the updated policy data.
// Returns an error if the policy cannot be updated.
UpdateAccessPolicy(ctx context.Context, policy *types.AccessPolicy) error
// DeleteAccessPolicy removes an access policy by identifier.
// The ctx scopes the delete, and id identifies the access policy to remove.
// Returns an error if the policy cannot be deleted.
DeleteAccessPolicy(ctx context.Context, id int64) error
// Agent Tag VC operations (tag-based PermissionVC)
// StoreAgentTagVC stores a tag-based verifiable credential for an agent.
// The ctx scopes the write, and the agent, credential, signature, and timing parameters describe the VC record.
// Returns an error if the agent tag VC cannot be saved.
StoreAgentTagVC(ctx context.Context, agentID, agentDID, vcID, vcDocument, signature string, issuedAt time.Time, expiresAt *time.Time) error
// GetAgentTagVC retrieves the tag-based verifiable credential for an agent.
// The ctx scopes the read, and agentID identifies the agent VC record to load.
// Returns the agent tag VC record or an error if it is missing or unreadable.
GetAgentTagVC(ctx context.Context, agentID string) (*types.AgentTagVCRecord, error)
// ListAgentTagVCs lists all stored agent tag verifiable credentials.
// The ctx scopes the query and does not require additional parameters.
// Returns agent tag VC records or an error if the query fails.
ListAgentTagVCs(ctx context.Context) ([]*types.AgentTagVCRecord, error)
// RevokeAgentTagVC revokes the tag-based verifiable credential for an agent.
// The ctx scopes the update, and agentID identifies the agent VC record to revoke.
// Returns an error if the credential cannot be revoked.
RevokeAgentTagVC(ctx context.Context, agentID string) error
// DID Document operations (did:web resolution)
// StoreDIDDocument persists a DID document record.
// The ctx scopes the write, and record contains the DID document data to store.
// Returns an error if the DID document cannot be saved.
StoreDIDDocument(ctx context.Context, record *types.DIDDocumentRecord) error
// GetDIDDocument retrieves a DID document by DID string.
// The ctx scopes the read, and did identifies the DID document to load.
// Returns the DID document record or an error if it is missing or unreadable.
GetDIDDocument(ctx context.Context, did string) (*types.DIDDocumentRecord, error)
// GetDIDDocumentByAgentID retrieves a DID document associated with an agent.
// The ctx scopes the read, and agentID identifies the agent-linked DID document to load.
// Returns the DID document record or an error if it is missing or unreadable.
GetDIDDocumentByAgentID(ctx context.Context, agentID string) (*types.DIDDocumentRecord, error)
// RevokeDIDDocument revokes a DID document by DID string.
// The ctx scopes the update, and did identifies the DID document to revoke.
// Returns an error if the DID document cannot be revoked.
RevokeDIDDocument(ctx context.Context, did string) error
// ListDIDDocuments lists all stored DID document records.
// The ctx scopes the query and does not require additional parameters.
// Returns DID document records or an error if the query fails.
ListDIDDocuments(ctx context.Context) ([]*types.DIDDocumentRecord, error)
// Agent lifecycle queries (tag approval workflow)
// ListAgentsByLifecycleStatus lists agents currently in a lifecycle status.
// The ctx scopes the query, and status identifies which lifecycle state to filter by.
// Returns matching agents or an error if the query fails.
ListAgentsByLifecycleStatus(ctx context.Context, status types.AgentLifecycleStatus) ([]*types.AgentNode, error)
}
StorageProvider is the interface for the primary data storage backend.
type Transaction ¶
// Transaction is a StorageProvider that can additionally be committed or rolled back.
type Transaction interface {
// StorageProvider is embedded, so every storage method is also part of Transaction.
StorageProvider
// Commit ends the transaction; presumably it makes the work done through it durable (confirm against the implementation).
// Returns an error if the commit fails.
Commit() error
// Rollback ends the transaction; presumably it discards the work done through it (confirm against the implementation).
// Returns an error if the rollback fails.
Rollback() error
}
Transaction represents a database transaction.
type UnitOfWork ¶
// UnitOfWork collects entity changes and exposes commit/rollback and inspection methods for them.
// Each registration pairs an entity (or id), a table name, and a callback that receives a DBTX.
// NOTE(review): when the registered callbacks run (presumably during Commit) is not visible here — confirm.
type UnitOfWork interface {
// Entity registration
// RegisterNew registers entity as a new record for table, with the operation to run against a DBTX.
RegisterNew(entity interface{}, table string, operation func(DBTX) error)
// RegisterDirty registers entity as a modified record for table, with the operation to run against a DBTX.
RegisterDirty(entity interface{}, table string, operation func(DBTX) error)
// RegisterDeleted registers the record identified by id as deleted from table, with the operation to run against a DBTX.
RegisterDeleted(id interface{}, table string, operation func(DBTX) error)
// Transaction management
// Commit finishes the unit of work and returns an error if that fails.
Commit() error
// Rollback abandons the unit of work and returns an error if that fails.
Rollback() error
// State inspection
// HasChanges reports whether any changes are registered.
HasChanges() bool
// GetChangeCount returns the number of registered changes.
GetChangeCount() int
// IsActive reports whether the unit of work is active.
// NOTE(review): the exact definition of "active" (e.g. not yet committed or rolled back) is not visible here.
IsActive() bool
}
UnitOfWork manages a collection of changes as a single transaction.
func NewUnitOfWork ¶
func NewUnitOfWork(db *sqlDatabase, backend unitOfWorkBackend) UnitOfWork
NewUnitOfWork creates a new unit of work instance.
type ValidationError ¶
ValidationError represents a pre-storage validation failure.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
type VectorDistanceMetric ¶
// VectorDistanceMetric is the string name of a vector distance metric.
type VectorDistanceMetric string

// Vector distance metric names defined by this package.
const (
	// VectorDistanceCosine is the metric named "cosine".
	VectorDistanceCosine VectorDistanceMetric = "cosine"
	// VectorDistanceDot is the metric named "dot".
	VectorDistanceDot VectorDistanceMetric = "dot"
	// VectorDistanceL2 is the metric named "l2".
	VectorDistanceL2 VectorDistanceMetric = "l2"
)
type VectorStoreConfig ¶
// VectorStoreConfig holds the vector storage settings read from the "enabled" and "distance"
// keys (YAML and mapstructure).
type VectorStoreConfig struct {
// Enabled is a pointer; presumably nil means "not set" so a default can apply — confirm against the loader.
Enabled *bool `yaml:"enabled" mapstructure:"enabled"`
// Distance is a plain string; presumably one of the VectorDistanceMetric names ("cosine", "dot", "l2") — confirm.
Distance string `yaml:"distance" mapstructure:"distance"`
}
VectorStoreConfig controls vector storage behavior.
type WorkflowExecutionEventModel ¶
// WorkflowExecutionEventModel is the GORM model for a single workflow execution event row.
// EventID is the auto-incrementing primary key. ExecutionID and Sequence form the composite
// index idx_workflow_exec_events_execution (priorities 1 and 2); RunID is the first column of
// idx_workflow_exec_events_run. The table name comes from TableName, whose body is not shown here.
type WorkflowExecutionEventModel struct {
EventID int64 `gorm:"column:event_id;primaryKey;autoIncrement"`
ExecutionID string `gorm:"column:execution_id;not null;index:idx_workflow_exec_events_execution,priority:1"`
WorkflowID string `gorm:"column:workflow_id;not null"`
// Pointer-typed fields below map to nullable columns (no "not null" in their tags).
RunID *string `gorm:"column:run_id;index:idx_workflow_exec_events_run,priority:1"`
ParentExecutionID *string `gorm:"column:parent_execution_id"`
// NOTE(review): Sequence/PreviousSequence are int64, while WorkflowExecutionModel.LastEventSequence is int — confirm the width difference is intended.
Sequence int64 `gorm:"column:sequence;not null;index:idx_workflow_exec_events_execution,priority:2"`
PreviousSequence int64 `gorm:"column:previous_sequence;not null;default:0"`
EventType string `gorm:"column:event_type;not null"`
Status *string `gorm:"column:status"`
StatusReason *string `gorm:"column:status_reason"`
// Payload defaults to '{}'; presumably JSON-encoded text — confirm against the writer.
Payload string `gorm:"column:payload;default:'{}'"`
// EmittedAt must be supplied by the caller (not null); RecordedAt is filled by GORM on insert (autoCreateTime).
EmittedAt time.Time `gorm:"column:emitted_at;not null"`
RecordedAt time.Time `gorm:"column:recorded_at;autoCreateTime"`
}
func (WorkflowExecutionEventModel) TableName ¶
func (WorkflowExecutionEventModel) TableName() string
type WorkflowExecutionModel ¶
// WorkflowExecutionModel is the GORM model for a workflow execution row.
// ID is the auto-incrementing primary key and ExecutionID carries a unique index.
// Several tags are elided in this rendering ("string literal not displayed"), so the
// indexes and defaults on SessionID, ActorID, AgentNodeID, Status, and StartedAt cannot be read here.
type WorkflowExecutionModel struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
WorkflowID string `gorm:"column:workflow_id;not null;index;index:idx_workflow_executions_workflow_status,priority:1"`
ExecutionID string `gorm:"column:execution_id;not null;uniqueIndex"`
AgentFieldRequestID string `gorm:"column:agentfield_request_id;not null;index"`
RunID *string `gorm:"column:run_id;index"`
SessionID *string `` /* 201-byte string literal not displayed */
ActorID *string `` /* 193-byte string literal not displayed */
AgentNodeID string `` /* 160-byte string literal not displayed */
// Workflow hierarchy columns; all three parent/root references are nullable and indexed.
ParentWorkflowID *string `gorm:"column:parent_workflow_id;index"`
ParentExecutionID *string `gorm:"column:parent_execution_id;index"`
RootWorkflowID *string `gorm:"column:root_workflow_id;index"`
WorkflowDepth int `gorm:"column:workflow_depth;default:0"`
ReasonerID string `gorm:"column:reasoner_id;not null"`
// Input/output payloads are stored as raw bytes alongside separate size columns.
InputData []byte `gorm:"column:input_data"`
OutputData []byte `gorm:"column:output_data"`
InputSize int `gorm:"column:input_size"`
OutputSize int `gorm:"column:output_size"`
WorkflowName *string `gorm:"column:workflow_name"`
// WorkflowTags is a single string column; its encoding is not visible here.
WorkflowTags string `gorm:"column:workflow_tags"`
Status string `` /* 493-byte string literal not displayed */
StartedAt time.Time `` /* 377-byte string literal not displayed */
CompletedAt *time.Time `gorm:"column:completed_at"`
DurationMS int `gorm:"column:duration_ms"`
// NOTE(review): StateVersion and LastEventSequence are int here but int64 on WorkflowRunModel — confirm the width difference is intended.
StateVersion int `gorm:"column:state_version;not null;default:0"`
LastEventSequence int `gorm:"column:last_event_sequence;not null;default:0"`
ActiveChildren int `gorm:"column:active_children;not null;default:0"`
PendingChildren int `gorm:"column:pending_children;not null;default:0"`
PendingTerminalStatus *string `gorm:"column:pending_terminal_status"`
StatusReason *string `gorm:"column:status_reason"`
// Lease columns are nullable.
LeaseOwner *string `gorm:"column:lease_owner"`
LeaseExpiresAt *time.Time `gorm:"column:lease_expires_at"`
ErrorMessage *string `gorm:"column:error_message"`
RetryCount int `gorm:"column:retry_count;default:0"`
// Approval columns are all nullable; only ApprovalRequestID is indexed (idx_workflow_executions_approval_request_id).
ApprovalRequestID *string `gorm:"column:approval_request_id;index:idx_workflow_executions_approval_request_id"`
ApprovalRequestURL *string `gorm:"column:approval_request_url"`
ApprovalStatus *string `gorm:"column:approval_status"`
ApprovalResponse *string `gorm:"column:approval_response"`
ApprovalRequestedAt *time.Time `gorm:"column:approval_requested_at"`
ApprovalRespondedAt *time.Time `gorm:"column:approval_responded_at"`
ApprovalCallbackURL *string `gorm:"column:approval_callback_url"`
ApprovalExpiresAt *time.Time `gorm:"column:approval_expires_at"`
// Notes defaults to '[]'; presumably a JSON-encoded array — confirm against the writer.
Notes string `gorm:"column:notes;default:'[]'"`
// CreatedAt/UpdatedAt are maintained by GORM (autoCreateTime / autoUpdateTime).
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (WorkflowExecutionModel) TableName ¶
func (WorkflowExecutionModel) TableName() string
type WorkflowModel ¶
// WorkflowModel is the GORM model for a workflow row, keyed by WorkflowID.
// It stores workflow identity, hierarchy references, execution counters, and timestamps.
type WorkflowModel struct {
WorkflowID string `gorm:"column:workflow_id;primaryKey"`
WorkflowName *string `gorm:"column:workflow_name"`
// WorkflowTags is a single string column; its encoding is not visible here.
WorkflowTags string `gorm:"column:workflow_tags"`
SessionID *string `gorm:"column:session_id;index"`
ActorID *string `gorm:"column:actor_id;index"`
// Hierarchy references are nullable and, unlike on WorkflowExecutionModel, carry no index tag.
ParentWorkflowID *string `gorm:"column:parent_workflow_id"`
ParentExecutionID *string `gorm:"column:parent_execution_id"`
RootWorkflowID *string `gorm:"column:root_workflow_id"`
WorkflowDepth int `gorm:"column:workflow_depth;default:0"`
// Counter columns all default to 0.
TotalExecutions int `gorm:"column:total_executions;default:0"`
SuccessfulExecutions int `gorm:"column:successful_executions;default:0"`
FailedExecutions int `gorm:"column:failed_executions;default:0"`
TotalDurationMS int `gorm:"column:total_duration_ms;default:0"`
// Status and StartedAt are required (not null) and have no default in their tags.
Status string `gorm:"column:status;not null"`
StartedAt time.Time `gorm:"column:started_at;not null"`
CompletedAt *time.Time `gorm:"column:completed_at"`
// CreatedAt/UpdatedAt are maintained by GORM (autoCreateTime / autoUpdateTime).
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (WorkflowModel) TableName ¶
func (WorkflowModel) TableName() string
type WorkflowRunEventModel ¶
// WorkflowRunEventModel is the GORM model for a single workflow run event row.
// EventID is the auto-incrementing primary key; RunID and Sequence form the composite
// index idx_workflow_run_events_run (priorities 1 and 2).
type WorkflowRunEventModel struct {
EventID int64 `gorm:"column:event_id;primaryKey;autoIncrement"`
RunID string `gorm:"column:run_id;not null;index:idx_workflow_run_events_run,priority:1"`
Sequence int64 `gorm:"column:sequence;not null;index:idx_workflow_run_events_run,priority:2"`
PreviousSequence int64 `gorm:"column:previous_sequence;not null;default:0"`
EventType string `gorm:"column:event_type;not null"`
// Status and StatusReason are nullable.
Status *string `gorm:"column:status"`
StatusReason *string `gorm:"column:status_reason"`
// Payload defaults to '{}'; presumably JSON-encoded text — confirm against the writer.
Payload string `gorm:"column:payload;default:'{}'"`
// EmittedAt must be supplied by the caller (not null); RecordedAt is filled by GORM on insert (autoCreateTime).
EmittedAt time.Time `gorm:"column:emitted_at;not null"`
RecordedAt time.Time `gorm:"column:recorded_at;autoCreateTime"`
}
func (WorkflowRunEventModel) TableName ¶
func (WorkflowRunEventModel) TableName() string
type WorkflowRunModel ¶
// WorkflowRunModel is the GORM model for a workflow run row, keyed by RunID.
// It stores the run's root references, status, step counters, versioning fields, and timestamps.
type WorkflowRunModel struct {
RunID string `gorm:"column:run_id;primaryKey"`
RootWorkflowID string `gorm:"column:root_workflow_id;not null;index"`
RootExecutionID *string `gorm:"column:root_execution_id"`
// Status is indexed and defaults to 'pending'.
Status string `gorm:"column:status;not null;default:'pending';index"`
// Step counters are required and default to 0.
TotalSteps int `gorm:"column:total_steps;not null;default:0"`
CompletedSteps int `gorm:"column:completed_steps;not null;default:0"`
FailedSteps int `gorm:"column:failed_steps;not null;default:0"`
// NOTE(review): StateVersion and LastEventSequence are int64 here but int on WorkflowExecutionModel — confirm the width difference is intended.
StateVersion int64 `gorm:"column:state_version;not null;default:0"`
LastEventSequence int64 `gorm:"column:last_event_sequence;not null;default:0"`
// Metadata is stored as bytes with default '{}'; presumably a JSON object — confirm against the writer.
Metadata []byte `gorm:"column:metadata;default:'{}'"`
// All three timestamps are indexed; CreatedAt/UpdatedAt are maintained by GORM.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime;index"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime;index"`
CompletedAt *time.Time `gorm:"column:completed_at;index"`
}
func (WorkflowRunModel) TableName ¶
func (WorkflowRunModel) TableName() string
type WorkflowStepModel ¶
// WorkflowStepModel is the GORM model for a workflow step row, keyed by StepID.
// The tags on RunID, Status, and NotBefore are elided in this rendering
// ("string literal not displayed"), so their indexes and defaults cannot be read here.
type WorkflowStepModel struct {
StepID string `gorm:"column:step_id;primaryKey"`
RunID string `` /* 181-byte string literal not displayed */
ParentStepID *string `gorm:"column:parent_step_id;index"`
// ExecutionID is the second column of idx_workflow_steps_run_execution.
ExecutionID *string `gorm:"column:execution_id;index:idx_workflow_steps_run_execution,priority:2"`
// AgentNodeID has its own index and is the first column of idx_workflow_steps_agent_not_before.
AgentNodeID *string `gorm:"column:agent_node_id;index;index:idx_workflow_steps_agent_not_before,priority:1"`
Target *string `gorm:"column:target"`
Status string `` /* 207-byte string literal not displayed */
Attempt int `gorm:"column:attempt;not null;default:0"`
// Priority is the second column of idx_workflow_steps_run_priority.
Priority int `gorm:"column:priority;not null;default:0;index:idx_workflow_steps_run_priority,priority:2"`
NotBefore time.Time `` /* 189-byte string literal not displayed */
// InputURI and ResultURI are nullable string references; what they point at is not visible here.
InputURI *string `gorm:"column:input_uri"`
ResultURI *string `gorm:"column:result_uri"`
ErrorMessage *string `gorm:"column:error_message"`
// Metadata is stored as bytes with default '{}'; presumably a JSON object — confirm against the writer.
Metadata []byte `gorm:"column:metadata;default:'{}'"`
// Lifecycle and lease timestamps are all nullable.
StartedAt *time.Time `gorm:"column:started_at"`
CompletedAt *time.Time `gorm:"column:completed_at"`
LeasedAt *time.Time `gorm:"column:leased_at"`
// NOTE(review): LeaseTimeout is a *time.Time (a point in time), not a duration — confirm it holds the lease expiry instant.
LeaseTimeout *time.Time `gorm:"column:lease_timeout"`
// CreatedAt/UpdatedAt are indexed and maintained by GORM.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime;index"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime;index"`
}
func (WorkflowStepModel) TableName ¶
func (WorkflowStepModel) TableName() string
type WorkflowUnitOfWork ¶
// WorkflowUnitOfWork extends UnitOfWork with workflow-level operations.
type WorkflowUnitOfWork interface {
// UnitOfWork is embedded, so registration, commit/rollback, and inspection methods are included.
UnitOfWork
// High-level workflow operations
// StoreWorkflowWithExecution takes a workflow and one of its executions to be stored together.
// Returns an error if the operation fails.
StoreWorkflowWithExecution(ctx context.Context, workflow *types.Workflow, execution *types.WorkflowExecution) error
// UpdateWorkflowStatus takes the workflow identifier, the new status, and an accompanying execution.
// Returns an error if the operation fails.
UpdateWorkflowStatus(ctx context.Context, workflowID string, status string, execution *types.WorkflowExecution) error
// CompleteWorkflowWithResults takes the workflow identifier and a results map.
// Returns an error if the operation fails.
// NOTE(review): unlike the other three methods, this one takes no context.Context — confirm whether that is intentional.
CompleteWorkflowWithResults(workflowID string, results map[string]interface{}) error
// StoreWorkflowWithSession takes a workflow and a session to be stored together.
// Returns an error if the operation fails.
StoreWorkflowWithSession(ctx context.Context, workflow *types.Workflow, session *types.Session) error
}
WorkflowUnitOfWork provides specialized operations for workflow management.
func NewWorkflowUnitOfWork ¶
func NewWorkflowUnitOfWork(db *sqlDatabase, backend unitOfWorkBackend) WorkflowUnitOfWork
NewWorkflowUnitOfWork creates a new workflow-specific unit of work instance.
type WorkflowVCModel ¶
// WorkflowVCModel is the GORM model for a workflow verifiable-credential row, keyed by WorkflowVCID.
type WorkflowVCModel struct {
WorkflowVCID string `gorm:"column:workflow_vc_id;primaryKey"`
WorkflowID string `gorm:"column:workflow_id;not null;index"`
// SessionID is required here (not null), whereas WorkflowModel.SessionID is nullable.
SessionID string `gorm:"column:session_id;not null;index"`
// ComponentVCIDs is a string column defaulting to '[]'; presumably a JSON-encoded list of VC identifiers — confirm against the writer.
ComponentVCIDs string `gorm:"column:component_vc_ids;default:'[]'"`
// Status is indexed and defaults to 'pending'.
Status string `gorm:"column:status;not null;default:'pending';index"`
// NOTE(review): StartTime is tagged autoCreateTime, so GORM fills it on insert when it is zero — confirm this is the intended behavior.
StartTime time.Time `gorm:"column:start_time;autoCreateTime;index"`
EndTime *time.Time `gorm:"column:end_time;index"`
TotalSteps int `gorm:"column:total_steps;default:0"`
CompletedSteps int `gorm:"column:completed_steps;default:0"`
// StorageURI defaults to the empty string and DocumentSizeBytes to 0.
StorageURI string `gorm:"column:storage_uri;default:''"`
DocumentSizeBytes int64 `gorm:"column:document_size_bytes;default:0"`
// CreatedAt/UpdatedAt are maintained by GORM; only CreatedAt is indexed.
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime;index"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (WorkflowVCModel) TableName ¶
func (WorkflowVCModel) TableName() string