store

package
v0.0.0-...-dac86b4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
// Event type names, written as "<subject>.<action>" strings. They cover
// execution, step, conditional-routing, retry, and saga events.
// NOTE(review): presumably these are the eventType values passed to
// EventStore.Append and EventRecorderAdapter.RecordEvent — confirm against callers.
const (
	EventExecutionStarted   = "execution.started"
	EventStepStarted        = "step.started"
	EventStepInputRecorded  = "step.input_recorded"
	EventStepOutputRecorded = "step.output_recorded"
	EventStepCompleted      = "step.completed"
	EventStepFailed         = "step.failed"
	EventStepSkipped        = "step.skipped"
	EventStepCompensated    = "step.compensated"
	EventConditionalRouted  = "conditional.routed"
	EventRetryAttempted     = "retry.attempted"
	EventExecutionCompleted = "execution.completed"
	EventExecutionFailed    = "execution.failed"
	EventExecutionCancelled = "execution.cancelled"
	EventSagaCompensating   = "saga.compensating"
	EventSagaCompensated    = "saga.compensated"
)

Variables

View Source
// Sentinel errors for API key operations. APIKeyStore.Validate is documented
// to return them for keys that exist but cannot be used: ErrKeyExpired for an
// expired key and ErrKeyInactive for an inactive one. Compare with errors.Is.
//
// They are built with errors.New (not fmt.Errorf) because the messages contain
// no format verbs, matching the store sentinel errors declared alongside them.
var (
	ErrKeyExpired  = errors.New("api key expired")
	ErrKeyInactive = errors.New("api key inactive")
)

Sentinel errors for API key operations.

View Source
// Sentinel errors for store operations. Compare with errors.Is.
// APIKeyStore.Validate is documented to return ErrNotFound when no matching
// key exists.
// NOTE(review): which store methods return ErrDuplicate, ErrConflict, and
// ErrForbidden is not visible here — confirm against the implementations.
var (
	ErrNotFound  = errors.New("not found")
	ErrDuplicate = errors.New("duplicate entry")
	ErrConflict  = errors.New("conflict")
	ErrForbidden = errors.New("forbidden")
)

Sentinel errors for store operations.

ValidRoles is the set of valid role values.

ValidWorkflowStatuses is the set of valid workflow status values.

Functions

This section is empty.

Types

type APIKey

// APIKey represents an API key for programmatic access to the platform.
//
// The struct has no field for the raw key: it carries only KeyHash, which is
// excluded from JSON output via the "-" tag, and KeyPrefix. OrgID, ProjectID,
// ExpiresAt, and LastUsedAt are pointers and are omitted from JSON when nil.
type APIKey struct {
	ID          uuid.UUID  `json:"id"`
	Name        string     `json:"name"`
	KeyHash     string     `json:"-"`          // SHA-256 hash, never exposed
	KeyPrefix   string     `json:"key_prefix"` // first 8 chars for identification
	CompanyID   uuid.UUID  `json:"company_id"`
	OrgID       *uuid.UUID `json:"org_id,omitempty"`     // optional scoping
	ProjectID   *uuid.UUID `json:"project_id,omitempty"` // optional scoping
	Permissions []string   `json:"permissions"`          // e.g., ["read", "write", "admin"]
	CreatedBy   uuid.UUID  `json:"created_by"`
	CreatedAt   time.Time  `json:"created_at"`
	ExpiresAt   *time.Time `json:"expires_at,omitempty"`
	LastUsedAt  *time.Time `json:"last_used_at,omitempty"`
	IsActive    bool       `json:"is_active"`
}

APIKey represents an API key for programmatic access to the platform.

type APIKeyStore

// APIKeyStore defines persistence operations for API keys.
//
// Every method takes a context.Context as its first argument. Lookups come in
// three forms: by ID (Get), by stored hash (GetByHash), and by raw key
// (Validate, which hashes the key before looking it up).
type APIKeyStore interface {
	// Create creates a new API key and returns the raw key (only available at creation time).
	Create(ctx context.Context, key *APIKey) (rawKey string, err error)
	// Get retrieves an API key by ID.
	Get(ctx context.Context, id uuid.UUID) (*APIKey, error)
	// GetByHash retrieves an API key by its hash.
	GetByHash(ctx context.Context, keyHash string) (*APIKey, error)
	// List returns all API keys for a company.
	List(ctx context.Context, companyID uuid.UUID) ([]*APIKey, error)
	// Delete removes an API key by ID.
	Delete(ctx context.Context, id uuid.UUID) error
	// UpdateLastUsed updates the last_used_at timestamp for an API key.
	UpdateLastUsed(ctx context.Context, id uuid.UUID) error
	// Validate hashes the raw key and looks up the corresponding API key.
	// Returns ErrNotFound if no matching key exists, or ErrKeyExpired / ErrKeyInactive
	// for keys that exist but cannot be used.
	Validate(ctx context.Context, rawKey string) (*APIKey, error)
}

APIKeyStore defines persistence operations for API keys.

type AuditEntry

// AuditEntry represents an entry in the audit log.
//
// ID is an int64, unlike the uuid.UUID identifiers used by most other records
// in this package. UserID, ResourceID, Details, IPAddress, and UserAgent are
// tagged omitempty and are left out of the JSON encoding when unset.
type AuditEntry struct {
	ID           int64           `json:"id"`
	UserID       *uuid.UUID      `json:"user_id,omitempty"`
	Action       string          `json:"action"`
	ResourceType string          `json:"resource_type"`
	ResourceID   *uuid.UUID      `json:"resource_id,omitempty"`
	Details      json.RawMessage `json:"details,omitempty"`
	IPAddress    string          `json:"ip_address,omitempty"`
	UserAgent    string          `json:"user_agent,omitempty"`
	CreatedAt    time.Time       `json:"created_at"`
}

AuditEntry represents an entry in the audit log.

type AuditFilter

// AuditFilter specifies criteria for querying audit entries.
//
// The fields mirror AuditEntry (user, action, resource) plus a Since/Until
// time window and a Pagination value.
// NOTE(review): zero-valued fields (nil pointers, empty strings) presumably
// mean "no constraint" — confirm against the AuditStore implementation.
type AuditFilter struct {
	UserID       *uuid.UUID
	Action       string
	ResourceType string
	ResourceID   *uuid.UUID
	Since        *time.Time
	Until        *time.Time
	Pagination   Pagination
}

AuditFilter specifies criteria for querying audit entries.

type AuditStore

// AuditStore defines persistence operations for audit log entries.
// It exposes only Record and Query; no update or delete method is declared.
type AuditStore interface {
	// Record adds an audit entry.
	Record(ctx context.Context, e *AuditEntry) error
	// Query returns audit entries matching the filter.
	Query(ctx context.Context, f AuditFilter) ([]*AuditEntry, error)
}

AuditStore defines persistence operations for audit log entries.

type AuthSessionStoreAdapter

type AuthSessionStoreAdapter struct {
	// contains filtered or unexported fields
}

AuthSessionStoreAdapter adapts a store.SessionStore to the auth.SessionStore interface.

func NewAuthSessionStoreAdapter

func NewAuthSessionStoreAdapter(store SessionStore) *AuthSessionStoreAdapter

NewAuthSessionStoreAdapter creates a new AuthSessionStoreAdapter that wraps the given SessionStore.

func (*AuthSessionStoreAdapter) Cleanup

func (a *AuthSessionStoreAdapter) Cleanup(ctx context.Context) error

func (*AuthSessionStoreAdapter) Delete

func (a *AuthSessionStoreAdapter) Delete(ctx context.Context, sessionID string) error

func (*AuthSessionStoreAdapter) Get

func (a *AuthSessionStoreAdapter) Get(ctx context.Context, sessionID string) (*auth.Session, error)

func (*AuthSessionStoreAdapter) Store

func (a *AuthSessionStoreAdapter) Store(ctx context.Context, session *auth.Session) error

type AuthUserStoreAdapter

type AuthUserStoreAdapter struct {
	// contains filtered or unexported fields
}

AuthUserStoreAdapter adapts a store.UserStore to the auth.UserStore interface.

func NewAuthUserStoreAdapter

func NewAuthUserStoreAdapter(store UserStore) *AuthUserStoreAdapter

NewAuthUserStoreAdapter creates a new AuthUserStoreAdapter that wraps the given UserStore.

func (*AuthUserStoreAdapter) CreateUser

func (a *AuthUserStoreAdapter) CreateUser(ctx context.Context, user *auth.User) error

func (*AuthUserStoreAdapter) DeleteUser

func (a *AuthUserStoreAdapter) DeleteUser(ctx context.Context, userID string) error

func (*AuthUserStoreAdapter) GetUser

func (a *AuthUserStoreAdapter) GetUser(ctx context.Context, userID string) (*auth.User, error)

func (*AuthUserStoreAdapter) GetUserByEmail

func (a *AuthUserStoreAdapter) GetUserByEmail(ctx context.Context, email string) (*auth.User, error)

func (*AuthUserStoreAdapter) UpdateUser

func (a *AuthUserStoreAdapter) UpdateUser(ctx context.Context, user *auth.User) error

type BackfillMockDiffHandler

type BackfillMockDiffHandler struct {
	// contains filtered or unexported fields
}

BackfillMockDiffHandler provides HTTP endpoints for backfill, step mock, and execution diff management.

func NewBackfillMockDiffHandler

func NewBackfillMockDiffHandler(
	backfillStore BackfillStore,
	mockStore StepMockStore,
	diffCalc *DiffCalculator,
	logger *slog.Logger,
) *BackfillMockDiffHandler

NewBackfillMockDiffHandler creates a new handler with the given stores and calculator.

func (*BackfillMockDiffHandler) RegisterRoutes

func (h *BackfillMockDiffHandler) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers all backfill, mock, and diff API routes on the given mux.

type BackfillRequest

// BackfillRequest defines a request to replay historical events through a pipeline.
//
// It carries both the request parameters (PipelineName, SourceQuery, and an
// optional StartTime/EndTime window) and progress state (Status, TotalEvents,
// Processed, Failed, CompletedAt, ErrorMsg). ErrorMsg is serialized under the
// JSON key "error_message" and omitted when empty.
type BackfillRequest struct {
	ID           uuid.UUID      `json:"id"`
	PipelineName string         `json:"pipeline_name"`
	SourceQuery  string         `json:"source_query"`
	StartTime    *time.Time     `json:"start_time,omitempty"`
	EndTime      *time.Time     `json:"end_time,omitempty"`
	Status       BackfillStatus `json:"status"`
	CreatedAt    time.Time      `json:"created_at"`
	CompletedAt  *time.Time     `json:"completed_at,omitempty"`
	TotalEvents  int64          `json:"total_events"`
	Processed    int64          `json:"processed"`
	Failed       int64          `json:"failed"`
	ErrorMsg     string         `json:"error_message,omitempty"`
}

BackfillRequest defines a request to replay historical events through a pipeline.

type BackfillStatus

// BackfillStatus represents the status of a backfill operation.
// It is a string type; the defined values are the BackfillStatus* constants.
type BackfillStatus string

BackfillStatus represents the status of a backfill operation.

// Defined BackfillStatus values. BackfillStore.Cancel is documented to apply
// to requests that are pending or running.
const (
	BackfillStatusPending   BackfillStatus = "pending"
	BackfillStatusRunning   BackfillStatus = "running"
	BackfillStatusCompleted BackfillStatus = "completed"
	BackfillStatusFailed    BackfillStatus = "failed"
	BackfillStatusCancelled BackfillStatus = "cancelled"
)

type BackfillStore

// BackfillStore defines persistence operations for backfill requests:
// creation, lookup, listing, progress and status updates, and cancellation.
type BackfillStore interface {
	// Create inserts a new backfill request.
	Create(ctx context.Context, req *BackfillRequest) error
	// Get retrieves a backfill request by ID.
	Get(ctx context.Context, id uuid.UUID) (*BackfillRequest, error)
	// List returns all backfill requests, ordered by creation time descending.
	List(ctx context.Context) ([]*BackfillRequest, error)
	// UpdateProgress updates the processed and failed counts for a backfill request.
	UpdateProgress(ctx context.Context, id uuid.UUID, processed, failed int64) error
	// UpdateStatus sets the status and optional error message for a backfill request.
	UpdateStatus(ctx context.Context, id uuid.UUID, status BackfillStatus, errMsg string) error
	// Cancel cancels a pending or running backfill request.
	Cancel(ctx context.Context, id uuid.UUID) error
}

BackfillStore defines persistence operations for backfill requests.

type Company

// Company represents a top-level organization or company.
//
// Besides its UUID, a company has a Slug that CompanyStore.GetBySlug accepts
// as a lookup key. Metadata is raw JSON and is omitted from the encoding when
// empty.
type Company struct {
	ID        uuid.UUID       `json:"id"`
	Name      string          `json:"name"`
	Slug      string          `json:"slug"`
	OwnerID   uuid.UUID       `json:"owner_id"`
	Metadata  json.RawMessage `json:"metadata,omitempty"`
	CreatedAt time.Time       `json:"created_at"`
	UpdatedAt time.Time       `json:"updated_at"`
}

Company represents a top-level organization or company.

type CompanyFilter

// CompanyFilter specifies criteria for listing companies.
// NOTE(review): a nil OwnerID or empty Slug presumably means "do not filter
// on this field" — confirm against the CompanyStore implementation.
type CompanyFilter struct {
	OwnerID    *uuid.UUID
	Slug       string
	Pagination Pagination
}

CompanyFilter specifies criteria for listing companies.

type CompanyStore

// CompanyStore defines persistence operations for companies.
//
// It offers create/read/update/delete by ID, lookup by slug, filtered listing
// via CompanyFilter, and a per-user listing.
// NOTE(review): error semantics (for example ErrNotFound or ErrDuplicate) and
// the membership rule behind ListForUser are not visible here — confirm
// against the implementations.
type CompanyStore interface {
	Create(ctx context.Context, c *Company) error
	Get(ctx context.Context, id uuid.UUID) (*Company, error)
	GetBySlug(ctx context.Context, slug string) (*Company, error)
	Update(ctx context.Context, c *Company) error
	Delete(ctx context.Context, id uuid.UUID) error
	List(ctx context.Context, f CompanyFilter) ([]*Company, error)
	ListForUser(ctx context.Context, userID uuid.UUID) ([]*Company, error)
}

CompanyStore defines persistence operations for companies.

// CrossWorkflowLink represents a directed link between two workflows, from
// SourceWorkflowID to TargetWorkflowID.
//
// LinkType is a free-form string and Config is raw JSON (omitted from the
// encoding when empty).
// NOTE(review): the set of accepted LinkType values is not visible here.
type CrossWorkflowLink struct {
	ID               uuid.UUID       `json:"id"`
	SourceWorkflowID uuid.UUID       `json:"source_workflow_id"`
	TargetWorkflowID uuid.UUID       `json:"target_workflow_id"`
	LinkType         string          `json:"link_type"`
	Config           json.RawMessage `json:"config,omitempty"`
	CreatedBy        uuid.UUID       `json:"created_by"`
	CreatedAt        time.Time       `json:"created_at"`
}

CrossWorkflowLink represents a directed link between two workflows.

type CrossWorkflowLinkFilter

// CrossWorkflowLinkFilter specifies criteria for listing cross-workflow links
// by source workflow, target workflow, and/or link type.
// NOTE(review): nil pointers and an empty LinkType presumably mean "no
// constraint" — confirm against the CrossWorkflowLinkStore implementation.
type CrossWorkflowLinkFilter struct {
	SourceWorkflowID *uuid.UUID
	TargetWorkflowID *uuid.UUID
	LinkType         string
	Pagination       Pagination
}

CrossWorkflowLinkFilter specifies criteria for listing cross-workflow links.

type CrossWorkflowLinkStore

// CrossWorkflowLinkStore defines persistence operations for cross-workflow links.
// The interface declares create, get-by-ID, delete, and filtered list; it has
// no update method.
type CrossWorkflowLinkStore interface {
	Create(ctx context.Context, l *CrossWorkflowLink) error
	Get(ctx context.Context, id uuid.UUID) (*CrossWorkflowLink, error)
	Delete(ctx context.Context, id uuid.UUID) error
	List(ctx context.Context, f CrossWorkflowLinkFilter) ([]*CrossWorkflowLink, error)
}

CrossWorkflowLinkStore defines persistence operations for cross-workflow links.

type DLQEntry

// DLQEntry represents a failed event/message in the dead letter queue.
//
// OriginalEvent holds the event as raw JSON. PipelineName and StepName say
// where it failed, and ErrorMessage/ErrorType describe the failure.
// RetryCount and MaxRetries track retry state (DLQStore.Retry is documented
// to increment retry_count). ResolvedAt and Metadata are omitted from JSON
// when unset.
type DLQEntry struct {
	ID            uuid.UUID       `json:"id"`
	OriginalEvent json.RawMessage `json:"original_event"`
	PipelineName  string          `json:"pipeline_name"`
	StepName      string          `json:"step_name"`
	ErrorMessage  string          `json:"error_message"`
	ErrorType     string          `json:"error_type"`
	RetryCount    int             `json:"retry_count"`
	MaxRetries    int             `json:"max_retries"`
	Status        DLQStatus       `json:"status"`
	CreatedAt     time.Time       `json:"created_at"`
	UpdatedAt     time.Time       `json:"updated_at"`
	ResolvedAt    *time.Time      `json:"resolved_at,omitempty"`
	Metadata      map[string]any  `json:"metadata,omitempty"`
}

DLQEntry represents a failed event/message in the dead letter queue.

type DLQFilter

// DLQFilter specifies criteria for listing DLQ entries.
//
// Paging uses explicit Limit and Offset fields rather than the Pagination
// type embedded in several other filters in this package.
// NOTE(review): empty strings and a zero Limit presumably mean "no
// constraint" / "default page size" — confirm against the DLQStore implementation.
type DLQFilter struct {
	PipelineName string
	StepName     string
	Status       DLQStatus
	ErrorType    string
	Limit        int
	Offset       int
}

DLQFilter specifies criteria for listing DLQ entries.

type DLQHandler

type DLQHandler struct {
	// contains filtered or unexported fields
}

DLQHandler provides HTTP endpoints for dead letter queue management.

func NewDLQHandler

func NewDLQHandler(store DLQStore, logger *slog.Logger) *DLQHandler

NewDLQHandler creates a new DLQHandler.

func (*DLQHandler) RegisterRoutes

func (h *DLQHandler) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers the DLQ API routes on the given mux.

type DLQStatus

// DLQStatus represents the status of a dead letter queue entry.
// It is a string type; the defined values are the DLQStatus* constants.
type DLQStatus string

DLQStatus represents the status of a dead letter queue entry.

// Defined DLQStatus values. DLQStore.Retry, Resolve, and Discard are
// documented to set "retrying", "resolved", and "discarded" respectively.
const (
	DLQStatusPending   DLQStatus = "pending"
	DLQStatusRetrying  DLQStatus = "retrying"
	DLQStatusResolved  DLQStatus = "resolved"
	DLQStatusDiscarded DLQStatus = "discarded"
)

type DLQStore

// DLQStore defines persistence operations for dead letter queue entries.
//
// Beyond add/get/list/count, it provides the generic UpdateStatus, three
// named status transitions (Retry, Discard, Resolve), and Purge for removing
// old finished entries.
type DLQStore interface {
	// Add inserts a new DLQ entry.
	Add(ctx context.Context, entry *DLQEntry) error
	// Get retrieves a DLQ entry by ID.
	Get(ctx context.Context, id uuid.UUID) (*DLQEntry, error)
	// List returns DLQ entries matching the given filter.
	List(ctx context.Context, filter DLQFilter) ([]*DLQEntry, error)
	// Count returns the number of DLQ entries matching the given filter.
	Count(ctx context.Context, filter DLQFilter) (int64, error)
	// UpdateStatus sets the status of a DLQ entry.
	UpdateStatus(ctx context.Context, id uuid.UUID, status DLQStatus) error
	// Retry increments retry_count and sets status to "retrying".
	Retry(ctx context.Context, id uuid.UUID) error
	// Discard sets the status to "discarded".
	Discard(ctx context.Context, id uuid.UUID) error
	// Resolve sets the status to "resolved" and sets resolved_at.
	Resolve(ctx context.Context, id uuid.UUID) error
	// Purge removes resolved/discarded entries older than the given duration.
	// The int64 result is presumably the number of entries removed — confirm
	// against the implementation.
	Purge(ctx context.Context, olderThan time.Duration) (int64, error)
}

DLQStore defines persistence operations for dead letter queue entries.

type DiffCalculator

type DiffCalculator struct {
	// contains filtered or unexported fields
}

DiffCalculator computes diffs between executions.

func NewDiffCalculator

func NewDiffCalculator(eventStore EventStore) *DiffCalculator

NewDiffCalculator creates a new DiffCalculator using the given EventStore.

func (*DiffCalculator) Compare

func (d *DiffCalculator) Compare(ctx context.Context, execA, execB uuid.UUID) (*ExecutionDiff, error)

Compare computes a structured diff between two executions.

type DiffSummary

// DiffSummary provides aggregate counts for an execution diff.
// Note that DiffSteps is serialized under the JSON key "different_steps".
// NOTE(review): presumably TotalSteps equals the sum of the other four
// counts — confirm against DiffCalculator.Compare.
type DiffSummary struct {
	TotalSteps   int `json:"total_steps"`
	SameSteps    int `json:"same_steps"`
	DiffSteps    int `json:"different_steps"`
	AddedSteps   int `json:"added_steps"`
	RemovedSteps int `json:"removed_steps"`
}

DiffSummary provides aggregate counts for an execution diff.

type EventRecorderAdapter

type EventRecorderAdapter struct {
	// contains filtered or unexported fields
}

EventRecorderAdapter wraps an EventStore to satisfy the module.EventRecorder interface used by pipeline execution. The adapter converts string execution IDs to uuid.UUID before delegating to the underlying EventStore.Append method.

func NewEventRecorderAdapter

func NewEventRecorderAdapter(store EventStore) *EventRecorderAdapter

NewEventRecorderAdapter creates an adapter that bridges EventStore to module.EventRecorder.

func (*EventRecorderAdapter) RecordEvent

func (a *EventRecorderAdapter) RecordEvent(ctx context.Context, executionID string, eventType string, data map[string]any) error

RecordEvent implements module.EventRecorder.

type EventStore

// EventStore defines persistence operations for execution events using an
// append-only event sourcing pattern.
//
// Append is the only write method. GetEvents returns the raw events, while
// GetTimeline and ListExecutions return MaterializedExecution views.
type EventStore interface {
	// Append adds a new event to the log for a given execution.
	Append(ctx context.Context, executionID uuid.UUID, eventType string, data map[string]any) error
	// GetEvents returns all events for an execution ordered by sequence number.
	GetEvents(ctx context.Context, executionID uuid.UUID) ([]ExecutionEvent, error)
	// GetTimeline materializes a complete execution view from its event stream.
	GetTimeline(ctx context.Context, executionID uuid.UUID) (*MaterializedExecution, error)
	// ListExecutions returns materialized executions matching the given filter.
	ListExecutions(ctx context.Context, filter ExecutionEventFilter) ([]MaterializedExecution, error)
}

EventStore defines persistence operations for execution events using an append-only event sourcing pattern.

type ExecutionDiff

// ExecutionDiff compares two executions step by step.
// ExecutionA and ExecutionB identify the two executions, StepDiffs holds the
// per-step results, and Summary holds the aggregate counts.
type ExecutionDiff struct {
	ExecutionA uuid.UUID   `json:"execution_a"`
	ExecutionB uuid.UUID   `json:"execution_b"`
	StepDiffs  []StepDiff  `json:"step_diffs"`
	Summary    DiffSummary `json:"summary"`
}

ExecutionDiff compares two executions step by step.

type ExecutionEvent

// ExecutionEvent represents a single immutable event in the execution log.
//
// SequenceNum is the ordering key that EventStore.GetEvents is documented to
// sort by. EventType is a plain string and EventData is the payload as raw JSON.
type ExecutionEvent struct {
	ID          uuid.UUID       `json:"id"`
	ExecutionID uuid.UUID       `json:"execution_id"`
	SequenceNum int64           `json:"sequence_num"`
	EventType   string          `json:"event_type"`
	EventData   json.RawMessage `json:"event_data"`
	CreatedAt   time.Time       `json:"created_at"`
}

ExecutionEvent represents a single immutable event in the execution log.

type ExecutionEventFilter

// ExecutionEventFilter specifies criteria for listing materialized executions
// (it is the filter type taken by EventStore.ListExecutions).
//
// Status is a plain string here, not ExecutionStatus, and paging uses
// explicit Limit and Offset fields.
// NOTE(review): zero values presumably mean "no constraint" — confirm
// against the EventStore implementation.
type ExecutionEventFilter struct {
	Pipeline string
	TenantID string
	Status   string
	Since    *time.Time
	Until    *time.Time
	Limit    int
	Offset   int
}

ExecutionEventFilter specifies criteria for listing materialized executions.

type ExecutionFilter

// ExecutionFilter specifies criteria for listing executions (it is the filter
// type taken by ExecutionStore.ListExecutions).
// NOTE(review): a nil WorkflowID, empty Status, or nil Since/Until presumably
// means "no constraint" — confirm against the ExecutionStore implementation.
type ExecutionFilter struct {
	WorkflowID *uuid.UUID
	Status     ExecutionStatus
	Since      *time.Time
	Until      *time.Time
	Pagination Pagination
}

ExecutionFilter specifies criteria for listing executions.

type ExecutionLog

// ExecutionLog represents a log entry for a workflow execution.
//
// WorkflowID is always present, while ExecutionID is an optional pointer
// (omitted from JSON when nil). ModuleName and Fields are also omitted when
// empty. ID is an int64 rather than a UUID.
type ExecutionLog struct {
	ID          int64           `json:"id"`
	WorkflowID  uuid.UUID       `json:"workflow_id"`
	ExecutionID *uuid.UUID      `json:"execution_id,omitempty"`
	Level       LogLevel        `json:"level"`
	Message     string          `json:"message"`
	ModuleName  string          `json:"module_name,omitempty"`
	Fields      json.RawMessage `json:"fields,omitempty"`
	CreatedAt   time.Time       `json:"created_at"`
}

ExecutionLog represents a log entry for a workflow execution.

type ExecutionStatus

// ExecutionStatus represents the status of a workflow execution.
// It is a string type; the defined values are the ExecutionStatus* constants.
type ExecutionStatus string

ExecutionStatus represents the status of a workflow execution.

// Defined ExecutionStatus values. The string forms are the same five used by
// the BackfillStatus constants.
const (
	ExecutionStatusPending   ExecutionStatus = "pending"
	ExecutionStatusRunning   ExecutionStatus = "running"
	ExecutionStatusCompleted ExecutionStatus = "completed"
	ExecutionStatusFailed    ExecutionStatus = "failed"
	ExecutionStatusCancelled ExecutionStatus = "cancelled"
)

type ExecutionStep

// ExecutionStep represents a single step within a workflow execution.
//
// InputData, OutputData, and Metadata are raw JSON. StartedAt, CompletedAt,
// and DurationMs are pointers, so an unset value is omitted from JSON rather
// than encoded as zero.
// NOTE(review): SequenceNum is int here but int64 on ExecutionEvent — confirm
// the difference is intentional.
type ExecutionStep struct {
	ID           uuid.UUID       `json:"id"`
	ExecutionID  uuid.UUID       `json:"execution_id"`
	StepName     string          `json:"step_name"`
	StepType     string          `json:"step_type"`
	InputData    json.RawMessage `json:"input_data,omitempty"`
	OutputData   json.RawMessage `json:"output_data,omitempty"`
	Status       StepStatus      `json:"status"`
	ErrorMessage string          `json:"error_message,omitempty"`
	StartedAt    *time.Time      `json:"started_at,omitempty"`
	CompletedAt  *time.Time      `json:"completed_at,omitempty"`
	DurationMs   *int64          `json:"duration_ms,omitempty"`
	SequenceNum  int             `json:"sequence_num"`
	Metadata     json.RawMessage `json:"metadata,omitempty"`
}

ExecutionStep represents a single step within a workflow execution.

type ExecutionStore

// ExecutionStore defines persistence operations for workflow executions.
//
// The methods fall into three groups: execution records, the steps belonging
// to an execution, and an aggregate count by status. No delete method is
// declared for either executions or steps.
type ExecutionStore interface {
	// CreateExecution creates a new workflow execution record.
	CreateExecution(ctx context.Context, e *WorkflowExecution) error
	// GetExecution retrieves an execution by ID.
	GetExecution(ctx context.Context, id uuid.UUID) (*WorkflowExecution, error)
	// UpdateExecution updates an execution record.
	UpdateExecution(ctx context.Context, e *WorkflowExecution) error
	// ListExecutions lists executions matching the filter.
	ListExecutions(ctx context.Context, f ExecutionFilter) ([]*WorkflowExecution, error)

	// CreateStep creates a new execution step.
	CreateStep(ctx context.Context, s *ExecutionStep) error
	// UpdateStep updates an execution step.
	UpdateStep(ctx context.Context, s *ExecutionStep) error
	// ListSteps lists steps for an execution.
	ListSteps(ctx context.Context, executionID uuid.UUID) ([]*ExecutionStep, error)

	// CountByStatus returns execution counts grouped by status for a workflow.
	CountByStatus(ctx context.Context, workflowID uuid.UUID) (map[ExecutionStatus]int, error)
}

ExecutionStore defines persistence operations for workflow executions.

type FieldChange

// FieldChange represents a single field difference between two maps.
// Path locates the field (DiffMaps documents paths as dot-separated for
// nested keys); ValueA and ValueB hold the value from each side.
// NOTE(review): how a key missing from one side is represented (presumably a
// nil value) is not visible here — confirm against DiffMaps.
type FieldChange struct {
	Path   string `json:"path"`
	ValueA any    `json:"value_a"`
	ValueB any    `json:"value_b"`
}

FieldChange represents a single field difference between two maps.

func DiffMaps

func DiffMaps(a, b map[string]any) []FieldChange

DiffMaps recursively compares two maps and returns a list of field changes. Paths are dot-separated for nested keys.

type FileInfo

// FileInfo describes metadata about a file in a storage provider.
//
// Its JSON keys are camelCase (modTime, isDir, contentType), unlike the
// snake_case keys used by the other types in this package. ContentType is
// omitted from JSON when empty.
// NOTE(review): Size is presumably in bytes — confirm against the provider.
type FileInfo struct {
	Name        string    `json:"name"`
	Path        string    `json:"path"`
	Size        int64     `json:"size"`
	ModTime     time.Time `json:"modTime"`
	IsDir       bool      `json:"isDir"`
	ContentType string    `json:"contentType,omitempty"`
}

FileInfo describes metadata about a file in a storage provider.

type IAMProviderConfig

// IAMProviderConfig represents a configured IAM provider for a company.
//
// ProviderType selects the kind of provider (see the IAMProvider* constants).
// Config is raw JSON and, unlike most raw JSON fields in this package, is
// always encoded (no omitempty).
// NOTE(review): the Config schema is presumably provider-specific — it is
// not visible here.
type IAMProviderConfig struct {
	ID           uuid.UUID       `json:"id"`
	CompanyID    uuid.UUID       `json:"company_id"`
	ProviderType IAMProviderType `json:"provider_type"`
	Name         string          `json:"name"`
	Config       json.RawMessage `json:"config"`
	Enabled      bool            `json:"enabled"`
	CreatedAt    time.Time       `json:"created_at"`
	UpdatedAt    time.Time       `json:"updated_at"`
}

IAMProviderConfig represents a configured IAM provider for a company.

type IAMProviderFilter

// IAMProviderFilter specifies criteria for listing IAM providers.
//
// Enabled is a *bool, which lets a caller distinguish "unset" (nil) from an
// explicit true or false.
// NOTE(review): nil/empty fields presumably mean "no constraint" — confirm
// against the IAMStore implementation.
type IAMProviderFilter struct {
	CompanyID    *uuid.UUID
	ProviderType IAMProviderType
	Enabled      *bool
	Pagination   Pagination
}

IAMProviderFilter specifies criteria for listing IAM providers.

type IAMProviderType

// IAMProviderType represents the type of an IAM provider.
// It is a string type; the defined values are the IAMProvider* constants.
type IAMProviderType string

IAMProviderType represents the type of an IAM provider.

// Defined IAMProviderType values. Note that IAMProviderAWS serializes as
// "aws_iam", not "aws".
const (
	IAMProviderAWS        IAMProviderType = "aws_iam"
	IAMProviderKubernetes IAMProviderType = "kubernetes"
	IAMProviderOIDC       IAMProviderType = "oidc"
	IAMProviderSAML       IAMProviderType = "saml"
	IAMProviderLDAP       IAMProviderType = "ldap"
	IAMProviderCustom     IAMProviderType = "custom"
)

type IAMRoleMapping

// IAMRoleMapping maps an external identity to a role on a resource.
//
// The external identity is the pair (ProviderID, ExternalIdentifier); the
// resource is the pair (ResourceType, ResourceID). These are the same four
// values IAMStore.ResolveRole takes as input.
type IAMRoleMapping struct {
	ID                 uuid.UUID `json:"id"`
	ProviderID         uuid.UUID `json:"provider_id"`
	ExternalIdentifier string    `json:"external_identifier"`
	ResourceType       string    `json:"resource_type"`
	ResourceID         uuid.UUID `json:"resource_id"`
	Role               Role      `json:"role"`
	CreatedAt          time.Time `json:"created_at"`
}

IAMRoleMapping maps an external identity to a role on a resource.

type IAMRoleMappingFilter

// IAMRoleMappingFilter specifies criteria for listing IAM role mappings.
// NOTE(review): nil pointers and empty strings presumably mean "no
// constraint" — confirm against the IAMStore implementation.
type IAMRoleMappingFilter struct {
	ProviderID         *uuid.UUID
	ExternalIdentifier string
	ResourceType       string
	ResourceID         *uuid.UUID
	Pagination         Pagination
}

IAMRoleMappingFilter specifies criteria for listing IAM role mappings.

type IAMStore

// IAMStore defines persistence operations for IAM providers and role mappings.
//
// Providers support create/get/update/delete/list. Mappings support
// create/get/delete/list but declare no update method. ResolveRole looks up
// the Role for an external identity on a specific resource.
type IAMStore interface {
	// CreateProvider creates a new IAM provider config.
	CreateProvider(ctx context.Context, p *IAMProviderConfig) error
	// GetProvider retrieves an IAM provider by ID.
	GetProvider(ctx context.Context, id uuid.UUID) (*IAMProviderConfig, error)
	// UpdateProvider updates an IAM provider config.
	UpdateProvider(ctx context.Context, p *IAMProviderConfig) error
	// DeleteProvider deletes an IAM provider config.
	DeleteProvider(ctx context.Context, id uuid.UUID) error
	// ListProviders lists IAM providers matching the filter.
	ListProviders(ctx context.Context, f IAMProviderFilter) ([]*IAMProviderConfig, error)

	// CreateMapping creates a new IAM role mapping.
	CreateMapping(ctx context.Context, m *IAMRoleMapping) error
	// GetMapping retrieves an IAM role mapping by ID.
	GetMapping(ctx context.Context, id uuid.UUID) (*IAMRoleMapping, error)
	// DeleteMapping deletes an IAM role mapping.
	DeleteMapping(ctx context.Context, id uuid.UUID) error
	// ListMappings lists IAM role mappings matching the filter.
	ListMappings(ctx context.Context, f IAMRoleMappingFilter) ([]*IAMRoleMapping, error)

	// ResolveRole resolves the role for an external identifier on a resource.
	// NOTE(review): the result when no mapping matches (presumably
	// ErrNotFound) is not visible here — confirm against the implementation.
	ResolveRole(ctx context.Context, providerID uuid.UUID, externalID string, resourceType string, resourceID uuid.UUID) (Role, error)
}

IAMStore defines persistence operations for IAM providers and role mappings.

type IdempotencyRecord

// IdempotencyRecord represents a stored result for an idempotency key.
//
// Key is the idempotency key itself (a string, not a UUID). Result is the
// stored outcome as raw JSON. ExpiresAt is a non-pointer time.Time, so every
// record carries an expiry value.
type IdempotencyRecord struct {
	Key         string          `json:"key"`
	ExecutionID uuid.UUID       `json:"execution_id"`
	StepName    string          `json:"step_name"`
	Result      json.RawMessage `json:"result"`
	CreatedAt   time.Time       `json:"created_at"`
	ExpiresAt   time.Time       `json:"expires_at"`
}

IdempotencyRecord represents a stored result for an idempotency key.

type IdempotencyStore

// IdempotencyStore defines persistence operations for idempotency keys.
//
// Callers must handle the (nil, nil) result from Check: a missing key is not
// reported as an error.
type IdempotencyStore interface {
	// Check returns the stored result if the key exists and hasn't expired.
	// Returns nil, nil if the key doesn't exist.
	Check(ctx context.Context, key string) (*IdempotencyRecord, error)
	// Store saves a result for an idempotency key with an expiration time.
	Store(ctx context.Context, record *IdempotencyRecord) error
	// Cleanup removes expired keys.
	// The int64 result is presumably the number of keys removed — confirm
	// against the implementation.
	Cleanup(ctx context.Context) (int64, error)
}

IdempotencyStore defines persistence operations for idempotency keys.

type InMemoryAPIKeyStore

type InMemoryAPIKeyStore struct {
	// contains filtered or unexported fields
}

InMemoryAPIKeyStore is a thread-safe in-memory implementation of APIKeyStore.

func NewInMemoryAPIKeyStore

func NewInMemoryAPIKeyStore() *InMemoryAPIKeyStore

NewInMemoryAPIKeyStore creates a new InMemoryAPIKeyStore.

func (*InMemoryAPIKeyStore) Create

func (s *InMemoryAPIKeyStore) Create(_ context.Context, key *APIKey) (string, error)

func (*InMemoryAPIKeyStore) Delete

func (s *InMemoryAPIKeyStore) Delete(_ context.Context, id uuid.UUID) error

func (*InMemoryAPIKeyStore) Get

func (*InMemoryAPIKeyStore) GetByHash

func (s *InMemoryAPIKeyStore) GetByHash(_ context.Context, keyHash string) (*APIKey, error)

func (*InMemoryAPIKeyStore) List

func (s *InMemoryAPIKeyStore) List(_ context.Context, companyID uuid.UUID) ([]*APIKey, error)

func (*InMemoryAPIKeyStore) UpdateLastUsed

func (s *InMemoryAPIKeyStore) UpdateLastUsed(_ context.Context, id uuid.UUID) error

func (*InMemoryAPIKeyStore) Validate

func (s *InMemoryAPIKeyStore) Validate(_ context.Context, rawKey string) (*APIKey, error)

type InMemoryBackfillStore

type InMemoryBackfillStore struct {
	// contains filtered or unexported fields
}

InMemoryBackfillStore is a thread-safe in-memory implementation of BackfillStore.

func NewInMemoryBackfillStore

func NewInMemoryBackfillStore() *InMemoryBackfillStore

NewInMemoryBackfillStore creates a new InMemoryBackfillStore.

func (*InMemoryBackfillStore) Cancel

func (*InMemoryBackfillStore) Create

func (*InMemoryBackfillStore) Get

func (*InMemoryBackfillStore) List

func (*InMemoryBackfillStore) UpdateProgress

func (s *InMemoryBackfillStore) UpdateProgress(_ context.Context, id uuid.UUID, processed, failed int64) error

func (*InMemoryBackfillStore) UpdateStatus

func (s *InMemoryBackfillStore) UpdateStatus(_ context.Context, id uuid.UUID, status BackfillStatus, errMsg string) error

type InMemoryDLQStore

type InMemoryDLQStore struct {
	// contains filtered or unexported fields
}

InMemoryDLQStore is a thread-safe in-memory implementation of DLQStore.

func NewInMemoryDLQStore

func NewInMemoryDLQStore() *InMemoryDLQStore

NewInMemoryDLQStore creates a new InMemoryDLQStore.

func (*InMemoryDLQStore) Add

func (s *InMemoryDLQStore) Add(_ context.Context, entry *DLQEntry) error

func (*InMemoryDLQStore) Count

func (s *InMemoryDLQStore) Count(_ context.Context, filter DLQFilter) (int64, error)

func (*InMemoryDLQStore) Discard

func (s *InMemoryDLQStore) Discard(_ context.Context, id uuid.UUID) error

func (*InMemoryDLQStore) Get

func (*InMemoryDLQStore) List

func (s *InMemoryDLQStore) List(_ context.Context, filter DLQFilter) ([]*DLQEntry, error)

func (*InMemoryDLQStore) Purge

func (s *InMemoryDLQStore) Purge(_ context.Context, olderThan time.Duration) (int64, error)

func (*InMemoryDLQStore) Resolve

func (s *InMemoryDLQStore) Resolve(_ context.Context, id uuid.UUID) error

func (*InMemoryDLQStore) Retry

func (s *InMemoryDLQStore) Retry(_ context.Context, id uuid.UUID) error

func (*InMemoryDLQStore) UpdateStatus

func (s *InMemoryDLQStore) UpdateStatus(_ context.Context, id uuid.UUID, status DLQStatus) error

type InMemoryEventStore

type InMemoryEventStore struct {
	// contains filtered or unexported fields
}

InMemoryEventStore is a thread-safe in-memory implementation of EventStore. Suitable for testing and single-server use.

func NewInMemoryEventStore

func NewInMemoryEventStore() *InMemoryEventStore

NewInMemoryEventStore creates a new InMemoryEventStore.

func (*InMemoryEventStore) Append

func (s *InMemoryEventStore) Append(_ context.Context, executionID uuid.UUID, eventType string, data map[string]any) error

func (*InMemoryEventStore) GetEvents

func (s *InMemoryEventStore) GetEvents(_ context.Context, executionID uuid.UUID) ([]ExecutionEvent, error)

func (*InMemoryEventStore) GetTimeline

func (s *InMemoryEventStore) GetTimeline(_ context.Context, executionID uuid.UUID) (*MaterializedExecution, error)

func (*InMemoryEventStore) ListExecutions

type InMemoryIdempotencyStore

type InMemoryIdempotencyStore struct {
	// contains filtered or unexported fields
}

InMemoryIdempotencyStore is a thread-safe in-memory implementation of IdempotencyStore for testing and single-server use.

func NewInMemoryIdempotencyStore

func NewInMemoryIdempotencyStore() *InMemoryIdempotencyStore

NewInMemoryIdempotencyStore creates a new InMemoryIdempotencyStore.

func (*InMemoryIdempotencyStore) Check

func (*InMemoryIdempotencyStore) Cleanup

func (*InMemoryIdempotencyStore) Store

type InMemoryStepMockStore

type InMemoryStepMockStore struct {
	// contains filtered or unexported fields
}

InMemoryStepMockStore is a thread-safe in-memory implementation of StepMockStore.

func NewInMemoryStepMockStore

func NewInMemoryStepMockStore() *InMemoryStepMockStore

NewInMemoryStepMockStore creates a new InMemoryStepMockStore.

func (*InMemoryStepMockStore) ClearAll

func (s *InMemoryStepMockStore) ClearAll(_ context.Context) error

func (*InMemoryStepMockStore) Get

func (s *InMemoryStepMockStore) Get(_ context.Context, pipeline, step string) (*StepMock, error)

func (*InMemoryStepMockStore) IncrementHitCount

func (s *InMemoryStepMockStore) IncrementHitCount(_ context.Context, pipeline, step string) error

func (*InMemoryStepMockStore) List

func (s *InMemoryStepMockStore) List(_ context.Context, pipeline string) ([]*StepMock, error)

func (*InMemoryStepMockStore) Remove

func (s *InMemoryStepMockStore) Remove(_ context.Context, pipeline, step string) error

func (*InMemoryStepMockStore) Set

type LocalStorage

type LocalStorage struct {
	// contains filtered or unexported fields
}

LocalStorage implements StorageProvider backed by the local filesystem.

func NewLocalStorage

func NewLocalStorage(root string) (*LocalStorage, error)

NewLocalStorage creates a new LocalStorage rooted at the given directory. The directory is created if it does not exist.

func (*LocalStorage) Delete

func (l *LocalStorage) Delete(_ context.Context, path string) error

func (*LocalStorage) Get

func (l *LocalStorage) Get(_ context.Context, path string) (io.ReadCloser, error)

func (*LocalStorage) List

func (l *LocalStorage) List(_ context.Context, prefix string) ([]FileInfo, error)

func (*LocalStorage) MkdirAll

func (l *LocalStorage) MkdirAll(_ context.Context, path string) error

MkdirAll creates a directory path and all parents that do not exist.

func (*LocalStorage) Put

func (l *LocalStorage) Put(_ context.Context, path string, reader io.Reader) error

func (*LocalStorage) Root

func (l *LocalStorage) Root() string

Root returns the absolute root path.

func (*LocalStorage) Stat

func (l *LocalStorage) Stat(_ context.Context, path string) (FileInfo, error)

type LogFilter

type LogFilter struct {
	WorkflowID  *uuid.UUID
	ExecutionID *uuid.UUID
	Level       LogLevel
	ModuleName  string
	Since       *time.Time
	Until       *time.Time
	Pagination  Pagination
}

LogFilter specifies criteria for querying logs.

type LogLevel

type LogLevel string

LogLevel represents a log severity level.

const (
	LogLevelDebug LogLevel = "debug"
	LogLevelInfo  LogLevel = "info"
	LogLevelWarn  LogLevel = "warn"
	LogLevelError LogLevel = "error"
	LogLevelFatal LogLevel = "fatal"
)

type LogStore

type LogStore interface {
	// Append adds a log entry.
	Append(ctx context.Context, l *ExecutionLog) error
	// Query returns log entries matching the filter.
	Query(ctx context.Context, f LogFilter) ([]*ExecutionLog, error)
	// CountByLevel returns log counts grouped by level for a workflow.
	CountByLevel(ctx context.Context, workflowID uuid.UUID) (map[LogLevel]int, error)
}

LogStore defines persistence operations for execution logs.

type MaterializedExecution

type MaterializedExecution struct {
	ExecutionID uuid.UUID          `json:"execution_id"`
	Pipeline    string             `json:"pipeline,omitempty"`
	TenantID    string             `json:"tenant_id,omitempty"`
	Status      string             `json:"status"`
	Steps       []MaterializedStep `json:"steps,omitempty"`
	Error       string             `json:"error,omitempty"`
	StartedAt   *time.Time         `json:"started_at,omitempty"`
	CompletedAt *time.Time         `json:"completed_at,omitempty"`
	EventCount  int                `json:"event_count"`
}

MaterializedExecution is a read-optimized view of a complete execution, materialized from the event stream.

type MaterializedStep

type MaterializedStep struct {
	StepName    string          `json:"step_name"`
	StepType    string          `json:"step_type,omitempty"`
	Status      string          `json:"status"`
	InputData   json.RawMessage `json:"input_data,omitempty"`
	OutputData  json.RawMessage `json:"output_data,omitempty"`
	Error       string          `json:"error,omitempty"`
	Route       string          `json:"route,omitempty"`
	Retries     int             `json:"retries,omitempty"`
	StartedAt   *time.Time      `json:"started_at,omitempty"`
	CompletedAt *time.Time      `json:"completed_at,omitempty"`
}

MaterializedStep is a read-optimized view of a single step within an execution.

type Membership

type Membership struct {
	ID        uuid.UUID  `json:"id"`
	UserID    uuid.UUID  `json:"user_id"`
	CompanyID uuid.UUID  `json:"company_id"`
	ProjectID *uuid.UUID `json:"project_id,omitempty"` // nil means company-level membership
	Role      Role       `json:"role"`
	CreatedAt time.Time  `json:"created_at"`
	UpdatedAt time.Time  `json:"updated_at"`
}

Membership represents a user's role within a company or project.

type MembershipFilter

type MembershipFilter struct {
	UserID     *uuid.UUID
	CompanyID  *uuid.UUID
	ProjectID  *uuid.UUID
	Role       Role
	Pagination Pagination
}

MembershipFilter specifies criteria for listing memberships.

type MembershipStore

type MembershipStore interface {
	Create(ctx context.Context, m *Membership) error
	Get(ctx context.Context, id uuid.UUID) (*Membership, error)
	Update(ctx context.Context, m *Membership) error
	Delete(ctx context.Context, id uuid.UUID) error
	List(ctx context.Context, f MembershipFilter) ([]*Membership, error)
	// GetEffectiveRole resolves the effective role for a user in a project,
	// cascading from company-level if no project-level membership exists.
	GetEffectiveRole(ctx context.Context, userID, companyID uuid.UUID, projectID *uuid.UUID) (Role, error)
}

MembershipStore defines persistence operations for memberships.

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

Migrator applies SQL migrations from the embedded filesystem.

func NewMigrator

func NewMigrator(pool *pgxpool.Pool) *Migrator

NewMigrator creates a new Migrator.

func (*Migrator) Migrate

func (m *Migrator) Migrate(ctx context.Context) error

Migrate applies all pending migrations in order. It uses an advisory lock to prevent concurrent migration runs and tracks applied migrations in a schema_migrations table.

type MockAuditStore

type MockAuditStore struct {
	// contains filtered or unexported fields
}

MockAuditStore is an in-memory implementation of AuditStore for testing.

func NewMockAuditStore

func NewMockAuditStore() *MockAuditStore

NewMockAuditStore creates a new MockAuditStore.

func (*MockAuditStore) Query

func (*MockAuditStore) Record

func (s *MockAuditStore) Record(_ context.Context, e *AuditEntry) error

type MockCompanyStore

type MockCompanyStore struct {
	// contains filtered or unexported fields
}

MockCompanyStore is an in-memory implementation of CompanyStore for testing.

func NewMockCompanyStore

func NewMockCompanyStore() *MockCompanyStore

NewMockCompanyStore creates a new MockCompanyStore.

func (*MockCompanyStore) Create

func (s *MockCompanyStore) Create(_ context.Context, c *Company) error

func (*MockCompanyStore) Delete

func (s *MockCompanyStore) Delete(_ context.Context, id uuid.UUID) error

func (*MockCompanyStore) Get

func (*MockCompanyStore) GetBySlug

func (s *MockCompanyStore) GetBySlug(_ context.Context, slug string) (*Company, error)

func (*MockCompanyStore) List

func (*MockCompanyStore) ListForUser

func (s *MockCompanyStore) ListForUser(_ context.Context, userID uuid.UUID) ([]*Company, error)

func (*MockCompanyStore) SetMembershipStore

func (s *MockCompanyStore) SetMembershipStore(ms *MockMembershipStore)

SetMembershipStore links a MockMembershipStore for ListForUser support.

func (*MockCompanyStore) Update

func (s *MockCompanyStore) Update(_ context.Context, c *Company) error

type MockCrossWorkflowLinkStore

type MockCrossWorkflowLinkStore struct {
	// contains filtered or unexported fields
}

MockCrossWorkflowLinkStore is an in-memory implementation of CrossWorkflowLinkStore for testing.

func NewMockCrossWorkflowLinkStore

func NewMockCrossWorkflowLinkStore() *MockCrossWorkflowLinkStore

NewMockCrossWorkflowLinkStore creates a new MockCrossWorkflowLinkStore.

func (*MockCrossWorkflowLinkStore) Create

func (*MockCrossWorkflowLinkStore) Delete

func (*MockCrossWorkflowLinkStore) Get

func (*MockCrossWorkflowLinkStore) List

type MockExecutionStore

type MockExecutionStore struct {
	// contains filtered or unexported fields
}

MockExecutionStore is an in-memory implementation of ExecutionStore for testing.

func NewMockExecutionStore

func NewMockExecutionStore() *MockExecutionStore

NewMockExecutionStore creates a new MockExecutionStore.

func (*MockExecutionStore) CountByStatus

func (s *MockExecutionStore) CountByStatus(_ context.Context, workflowID uuid.UUID) (map[ExecutionStatus]int, error)

func (*MockExecutionStore) CreateExecution

func (s *MockExecutionStore) CreateExecution(_ context.Context, e *WorkflowExecution) error

func (*MockExecutionStore) CreateStep

func (s *MockExecutionStore) CreateStep(_ context.Context, step *ExecutionStep) error

func (*MockExecutionStore) GetExecution

func (s *MockExecutionStore) GetExecution(_ context.Context, id uuid.UUID) (*WorkflowExecution, error)

func (*MockExecutionStore) ListExecutions

func (*MockExecutionStore) ListSteps

func (s *MockExecutionStore) ListSteps(_ context.Context, executionID uuid.UUID) ([]*ExecutionStep, error)

func (*MockExecutionStore) UpdateExecution

func (s *MockExecutionStore) UpdateExecution(_ context.Context, e *WorkflowExecution) error

func (*MockExecutionStore) UpdateStep

func (s *MockExecutionStore) UpdateStep(_ context.Context, step *ExecutionStep) error

type MockIAMStore

type MockIAMStore struct {
	// contains filtered or unexported fields
}

MockIAMStore is an in-memory implementation of IAMStore for testing.

func NewMockIAMStore

func NewMockIAMStore() *MockIAMStore

NewMockIAMStore creates a new MockIAMStore.

func (*MockIAMStore) CreateMapping

func (s *MockIAMStore) CreateMapping(_ context.Context, m *IAMRoleMapping) error

func (*MockIAMStore) CreateProvider

func (s *MockIAMStore) CreateProvider(_ context.Context, p *IAMProviderConfig) error

func (*MockIAMStore) DeleteMapping

func (s *MockIAMStore) DeleteMapping(_ context.Context, id uuid.UUID) error

func (*MockIAMStore) DeleteProvider

func (s *MockIAMStore) DeleteProvider(_ context.Context, id uuid.UUID) error

func (*MockIAMStore) GetMapping

func (s *MockIAMStore) GetMapping(_ context.Context, id uuid.UUID) (*IAMRoleMapping, error)

func (*MockIAMStore) GetProvider

func (s *MockIAMStore) GetProvider(_ context.Context, id uuid.UUID) (*IAMProviderConfig, error)

func (*MockIAMStore) ListMappings

func (*MockIAMStore) ListProviders

func (*MockIAMStore) ResolveRole

func (s *MockIAMStore) ResolveRole(_ context.Context, providerID uuid.UUID, externalID string, resourceType string, resourceID uuid.UUID) (Role, error)

func (*MockIAMStore) UpdateProvider

func (s *MockIAMStore) UpdateProvider(_ context.Context, p *IAMProviderConfig) error

type MockLogStore

type MockLogStore struct {
	// contains filtered or unexported fields
}

MockLogStore is an in-memory implementation of LogStore for testing.

func NewMockLogStore

func NewMockLogStore() *MockLogStore

NewMockLogStore creates a new MockLogStore.

func (*MockLogStore) Append

func (s *MockLogStore) Append(_ context.Context, l *ExecutionLog) error

func (*MockLogStore) CountByLevel

func (s *MockLogStore) CountByLevel(_ context.Context, workflowID uuid.UUID) (map[LogLevel]int, error)

func (*MockLogStore) Query

func (s *MockLogStore) Query(_ context.Context, f LogFilter) ([]*ExecutionLog, error)

type MockMembershipStore

type MockMembershipStore struct {
	// contains filtered or unexported fields
}

MockMembershipStore is an in-memory implementation of MembershipStore for testing.

func NewMockMembershipStore

func NewMockMembershipStore() *MockMembershipStore

NewMockMembershipStore creates a new MockMembershipStore.

func (*MockMembershipStore) Create

func (*MockMembershipStore) Delete

func (s *MockMembershipStore) Delete(_ context.Context, id uuid.UUID) error

func (*MockMembershipStore) Get

func (*MockMembershipStore) GetEffectiveRole

func (s *MockMembershipStore) GetEffectiveRole(_ context.Context, userID, companyID uuid.UUID, projectID *uuid.UUID) (Role, error)

func (*MockMembershipStore) List

func (*MockMembershipStore) Update

type MockProjectStore

type MockProjectStore struct {
	// contains filtered or unexported fields
}

MockProjectStore is an in-memory implementation of ProjectStore for testing.

func NewMockProjectStore

func NewMockProjectStore() *MockProjectStore

NewMockProjectStore creates a new MockProjectStore.

func (*MockProjectStore) Create

func (s *MockProjectStore) Create(_ context.Context, p *Project) error

func (*MockProjectStore) Delete

func (s *MockProjectStore) Delete(_ context.Context, id uuid.UUID) error

func (*MockProjectStore) Get

func (*MockProjectStore) GetBySlug

func (s *MockProjectStore) GetBySlug(_ context.Context, companyID uuid.UUID, slug string) (*Project, error)

func (*MockProjectStore) List

func (*MockProjectStore) ListForUser

func (s *MockProjectStore) ListForUser(_ context.Context, userID uuid.UUID) ([]*Project, error)

func (*MockProjectStore) SetMembershipStore

func (s *MockProjectStore) SetMembershipStore(ms *MockMembershipStore)

SetMembershipStore links a MockMembershipStore for ListForUser support.

func (*MockProjectStore) Update

func (s *MockProjectStore) Update(_ context.Context, p *Project) error

type MockSessionStore

type MockSessionStore struct {
	// contains filtered or unexported fields
}

MockSessionStore is an in-memory implementation of SessionStore for testing.

func NewMockSessionStore

func NewMockSessionStore() *MockSessionStore

NewMockSessionStore creates a new MockSessionStore.

func (*MockSessionStore) Create

func (s *MockSessionStore) Create(_ context.Context, sess *Session) error

func (*MockSessionStore) Delete

func (s *MockSessionStore) Delete(_ context.Context, id uuid.UUID) error

func (*MockSessionStore) DeleteExpired

func (s *MockSessionStore) DeleteExpired(_ context.Context) (int64, error)

func (*MockSessionStore) Get

func (*MockSessionStore) GetByToken

func (s *MockSessionStore) GetByToken(_ context.Context, token string) (*Session, error)

func (*MockSessionStore) List

func (*MockSessionStore) Update

func (s *MockSessionStore) Update(_ context.Context, sess *Session) error

type MockUserStore

type MockUserStore struct {
	// contains filtered or unexported fields
}

MockUserStore is an in-memory implementation of UserStore for testing.

func NewMockUserStore

func NewMockUserStore() *MockUserStore

NewMockUserStore creates a new MockUserStore.

func (*MockUserStore) Create

func (s *MockUserStore) Create(_ context.Context, u *User) error

func (*MockUserStore) Delete

func (s *MockUserStore) Delete(_ context.Context, id uuid.UUID) error

func (*MockUserStore) Get

func (s *MockUserStore) Get(_ context.Context, id uuid.UUID) (*User, error)

func (*MockUserStore) GetByEmail

func (s *MockUserStore) GetByEmail(_ context.Context, email string) (*User, error)

func (*MockUserStore) GetByOAuth

func (s *MockUserStore) GetByOAuth(_ context.Context, provider OAuthProvider, oauthID string) (*User, error)

func (*MockUserStore) List

func (s *MockUserStore) List(_ context.Context, f UserFilter) ([]*User, error)

func (*MockUserStore) Update

func (s *MockUserStore) Update(_ context.Context, u *User) error

type MockWorkflowStore

type MockWorkflowStore struct {
	// contains filtered or unexported fields
}

MockWorkflowStore is an in-memory implementation of WorkflowStore for testing.

func NewMockWorkflowStore

func NewMockWorkflowStore() *MockWorkflowStore

NewMockWorkflowStore creates a new MockWorkflowStore.

func (*MockWorkflowStore) Create

func (*MockWorkflowStore) Delete

func (s *MockWorkflowStore) Delete(_ context.Context, id uuid.UUID) error

func (*MockWorkflowStore) Get

func (*MockWorkflowStore) GetBySlug

func (s *MockWorkflowStore) GetBySlug(_ context.Context, projectID uuid.UUID, slug string) (*WorkflowRecord, error)

func (*MockWorkflowStore) GetVersion

func (s *MockWorkflowStore) GetVersion(_ context.Context, id uuid.UUID, version int) (*WorkflowRecord, error)

func (*MockWorkflowStore) List

func (*MockWorkflowStore) ListVersions

func (s *MockWorkflowStore) ListVersions(_ context.Context, id uuid.UUID) ([]*WorkflowRecord, error)

func (*MockWorkflowStore) Update

type OAuthProvider

type OAuthProvider string

OAuthProvider represents the provider for an OAuth connection.

const (
	OAuthProviderGitHub OAuthProvider = "github"
	OAuthProviderGoogle OAuthProvider = "google"
)

type Organization

type Organization = Company

Organization is an alias for Company, provided for code clarity where "organization" terminology is preferred.

type OrganizationStore

type OrganizationStore = CompanyStore

OrganizationStore is an alias for CompanyStore.

type PGAuditStore

type PGAuditStore struct {
	// contains filtered or unexported fields
}

PGAuditStore implements AuditStore backed by PostgreSQL.

func (*PGAuditStore) Query

func (s *PGAuditStore) Query(ctx context.Context, f AuditFilter) ([]*AuditEntry, error)

func (*PGAuditStore) Record

func (s *PGAuditStore) Record(ctx context.Context, e *AuditEntry) error

type PGCompanyStore

type PGCompanyStore struct {
	// contains filtered or unexported fields
}

PGCompanyStore implements CompanyStore backed by PostgreSQL.

func (*PGCompanyStore) Create

func (s *PGCompanyStore) Create(ctx context.Context, c *Company) error

func (*PGCompanyStore) Delete

func (s *PGCompanyStore) Delete(ctx context.Context, id uuid.UUID) error

func (*PGCompanyStore) Get

func (s *PGCompanyStore) Get(ctx context.Context, id uuid.UUID) (*Company, error)

func (*PGCompanyStore) GetBySlug

func (s *PGCompanyStore) GetBySlug(ctx context.Context, slug string) (*Company, error)

func (*PGCompanyStore) List

func (s *PGCompanyStore) List(ctx context.Context, f CompanyFilter) ([]*Company, error)

func (*PGCompanyStore) ListForUser

func (s *PGCompanyStore) ListForUser(ctx context.Context, userID uuid.UUID) ([]*Company, error)

func (*PGCompanyStore) Update

func (s *PGCompanyStore) Update(ctx context.Context, c *Company) error

type PGConfig

type PGConfig struct {
	URL             string `yaml:"url" json:"url"`
	MaxConns        int32  `yaml:"max_conns" json:"max_conns"`
	MinConns        int32  `yaml:"min_conns" json:"min_conns"`
	MaxConnIdleTime string `yaml:"max_conn_idle_time" json:"max_conn_idle_time"`
}

PGConfig holds PostgreSQL connection configuration.

type PGCrossWorkflowLinkStore

type PGCrossWorkflowLinkStore struct {
	// contains filtered or unexported fields
}

PGCrossWorkflowLinkStore implements CrossWorkflowLinkStore backed by PostgreSQL.

func (*PGCrossWorkflowLinkStore) Create

func (*PGCrossWorkflowLinkStore) Delete

func (*PGCrossWorkflowLinkStore) Get

func (*PGCrossWorkflowLinkStore) List

type PGExecutionStore

type PGExecutionStore struct {
	// contains filtered or unexported fields
}

PGExecutionStore implements ExecutionStore backed by PostgreSQL.

func (*PGExecutionStore) CountByStatus

func (s *PGExecutionStore) CountByStatus(ctx context.Context, workflowID uuid.UUID) (map[ExecutionStatus]int, error)

func (*PGExecutionStore) CreateExecution

func (s *PGExecutionStore) CreateExecution(ctx context.Context, e *WorkflowExecution) error

func (*PGExecutionStore) CreateStep

func (s *PGExecutionStore) CreateStep(ctx context.Context, step *ExecutionStep) error

func (*PGExecutionStore) GetExecution

func (s *PGExecutionStore) GetExecution(ctx context.Context, id uuid.UUID) (*WorkflowExecution, error)

func (*PGExecutionStore) ListExecutions

func (s *PGExecutionStore) ListExecutions(ctx context.Context, f ExecutionFilter) ([]*WorkflowExecution, error)

func (*PGExecutionStore) ListSteps

func (s *PGExecutionStore) ListSteps(ctx context.Context, executionID uuid.UUID) ([]*ExecutionStep, error)

func (*PGExecutionStore) UpdateExecution

func (s *PGExecutionStore) UpdateExecution(ctx context.Context, e *WorkflowExecution) error

func (*PGExecutionStore) UpdateStep

func (s *PGExecutionStore) UpdateStep(ctx context.Context, step *ExecutionStep) error

type PGIAMStore

type PGIAMStore struct {
	// contains filtered or unexported fields
}

PGIAMStore implements IAMStore backed by PostgreSQL.

func (*PGIAMStore) CreateMapping

func (s *PGIAMStore) CreateMapping(ctx context.Context, m *IAMRoleMapping) error

func (*PGIAMStore) CreateProvider

func (s *PGIAMStore) CreateProvider(ctx context.Context, p *IAMProviderConfig) error

func (*PGIAMStore) DeleteMapping

func (s *PGIAMStore) DeleteMapping(ctx context.Context, id uuid.UUID) error

func (*PGIAMStore) DeleteProvider

func (s *PGIAMStore) DeleteProvider(ctx context.Context, id uuid.UUID) error

func (*PGIAMStore) GetMapping

func (s *PGIAMStore) GetMapping(ctx context.Context, id uuid.UUID) (*IAMRoleMapping, error)

func (*PGIAMStore) GetProvider

func (s *PGIAMStore) GetProvider(ctx context.Context, id uuid.UUID) (*IAMProviderConfig, error)

func (*PGIAMStore) ListMappings

func (s *PGIAMStore) ListMappings(ctx context.Context, f IAMRoleMappingFilter) ([]*IAMRoleMapping, error)

func (*PGIAMStore) ListProviders

func (s *PGIAMStore) ListProviders(ctx context.Context, f IAMProviderFilter) ([]*IAMProviderConfig, error)

func (*PGIAMStore) ResolveRole

func (s *PGIAMStore) ResolveRole(ctx context.Context, providerID uuid.UUID, externalID string, resourceType string, resourceID uuid.UUID) (Role, error)

func (*PGIAMStore) UpdateProvider

func (s *PGIAMStore) UpdateProvider(ctx context.Context, p *IAMProviderConfig) error

type PGLogStore

type PGLogStore struct {
	// contains filtered or unexported fields
}

PGLogStore implements LogStore backed by PostgreSQL.

func (*PGLogStore) Append

func (s *PGLogStore) Append(ctx context.Context, l *ExecutionLog) error

func (*PGLogStore) CountByLevel

func (s *PGLogStore) CountByLevel(ctx context.Context, workflowID uuid.UUID) (map[LogLevel]int, error)

func (*PGLogStore) Query

func (s *PGLogStore) Query(ctx context.Context, f LogFilter) ([]*ExecutionLog, error)

type PGMembershipStore

type PGMembershipStore struct {
	// contains filtered or unexported fields
}

PGMembershipStore implements MembershipStore backed by PostgreSQL.

func (*PGMembershipStore) Create

func (s *PGMembershipStore) Create(ctx context.Context, m *Membership) error

func (*PGMembershipStore) Delete

func (s *PGMembershipStore) Delete(ctx context.Context, id uuid.UUID) error

func (*PGMembershipStore) Get

func (*PGMembershipStore) GetEffectiveRole

func (s *PGMembershipStore) GetEffectiveRole(ctx context.Context, userID, companyID uuid.UUID, projectID *uuid.UUID) (Role, error)

GetEffectiveRole resolves the effective role for a user. If projectID is non-nil, it first checks for a project-level membership; if none is found it falls back to the company-level membership.

func (*PGMembershipStore) List

func (*PGMembershipStore) Update

func (s *PGMembershipStore) Update(ctx context.Context, m *Membership) error

type PGProjectStore

type PGProjectStore struct {
	// contains filtered or unexported fields
}

PGProjectStore implements ProjectStore backed by PostgreSQL.

func (*PGProjectStore) Create

func (s *PGProjectStore) Create(ctx context.Context, p *Project) error

func (*PGProjectStore) Delete

func (s *PGProjectStore) Delete(ctx context.Context, id uuid.UUID) error

func (*PGProjectStore) Get

func (s *PGProjectStore) Get(ctx context.Context, id uuid.UUID) (*Project, error)

func (*PGProjectStore) GetBySlug

func (s *PGProjectStore) GetBySlug(ctx context.Context, companyID uuid.UUID, slug string) (*Project, error)

func (*PGProjectStore) List

func (s *PGProjectStore) List(ctx context.Context, f ProjectFilter) ([]*Project, error)

func (*PGProjectStore) ListForUser

func (s *PGProjectStore) ListForUser(ctx context.Context, userID uuid.UUID) ([]*Project, error)

func (*PGProjectStore) Update

func (s *PGProjectStore) Update(ctx context.Context, p *Project) error

type PGSessionStore

type PGSessionStore struct {
	// contains filtered or unexported fields
}

PGSessionStore implements SessionStore backed by PostgreSQL.

func (*PGSessionStore) Create

func (s *PGSessionStore) Create(ctx context.Context, sess *Session) error

func (*PGSessionStore) Delete

func (s *PGSessionStore) Delete(ctx context.Context, id uuid.UUID) error

func (*PGSessionStore) DeleteExpired

func (s *PGSessionStore) DeleteExpired(ctx context.Context) (int64, error)

func (*PGSessionStore) Get

func (s *PGSessionStore) Get(ctx context.Context, id uuid.UUID) (*Session, error)

func (*PGSessionStore) GetByToken

func (s *PGSessionStore) GetByToken(ctx context.Context, token string) (*Session, error)

func (*PGSessionStore) List

func (s *PGSessionStore) List(ctx context.Context, f SessionFilter) ([]*Session, error)

func (*PGSessionStore) Update

func (s *PGSessionStore) Update(ctx context.Context, sess *Session) error

type PGStore

type PGStore struct {
	// contains filtered or unexported fields
}

PGStore wraps a pgxpool.Pool and provides access to all domain stores.

func NewPGStore

func NewPGStore(ctx context.Context, cfg PGConfig) (*PGStore, error)

NewPGStore connects to PostgreSQL and returns a PGStore with all sub-stores.

func (*PGStore) Audit

func (s *PGStore) Audit() AuditStore

Audit returns the AuditStore.

func (*PGStore) Close

func (s *PGStore) Close()

Close closes the connection pool.

func (*PGStore) Companies

func (s *PGStore) Companies() CompanyStore

Companies returns the CompanyStore.

func (*PGStore) CrossWorkflowLinks

func (s *PGStore) CrossWorkflowLinks() CrossWorkflowLinkStore

CrossWorkflowLinks returns the CrossWorkflowLinkStore.

func (*PGStore) Executions

func (s *PGStore) Executions() ExecutionStore

Executions returns the ExecutionStore.

func (*PGStore) IAM

func (s *PGStore) IAM() IAMStore

IAM returns the IAMStore.

func (*PGStore) Logs

func (s *PGStore) Logs() LogStore

Logs returns the LogStore.

func (*PGStore) Memberships

func (s *PGStore) Memberships() MembershipStore

Memberships returns the MembershipStore.

func (*PGStore) Pool

func (s *PGStore) Pool() *pgxpool.Pool

Pool returns the underlying pgxpool.Pool.

func (*PGStore) Projects

func (s *PGStore) Projects() ProjectStore

Projects returns the ProjectStore.

func (*PGStore) Sessions

func (s *PGStore) Sessions() SessionStore

Sessions returns the SessionStore.

func (*PGStore) Users

func (s *PGStore) Users() UserStore

Users returns the UserStore.

func (*PGStore) Workflows

func (s *PGStore) Workflows() WorkflowStore

Workflows returns the WorkflowStore.

type PGUserStore

type PGUserStore struct {
	// contains filtered or unexported fields
}

PGUserStore implements UserStore backed by PostgreSQL.

func (*PGUserStore) Create

func (s *PGUserStore) Create(ctx context.Context, u *User) error

func (*PGUserStore) Delete

func (s *PGUserStore) Delete(ctx context.Context, id uuid.UUID) error

func (*PGUserStore) Get

func (s *PGUserStore) Get(ctx context.Context, id uuid.UUID) (*User, error)

func (*PGUserStore) GetByEmail

func (s *PGUserStore) GetByEmail(ctx context.Context, email string) (*User, error)

func (*PGUserStore) GetByOAuth

func (s *PGUserStore) GetByOAuth(ctx context.Context, provider OAuthProvider, oauthID string) (*User, error)

func (*PGUserStore) List

func (s *PGUserStore) List(ctx context.Context, f UserFilter) ([]*User, error)

func (*PGUserStore) Update

func (s *PGUserStore) Update(ctx context.Context, u *User) error

type PGWorkflowStore

type PGWorkflowStore struct {
	// contains filtered or unexported fields
}

PGWorkflowStore implements WorkflowStore backed by PostgreSQL. Versioning is handled via a workflow_versions table; the main workflows table always reflects the latest version.

func (*PGWorkflowStore) Create

func (s *PGWorkflowStore) Create(ctx context.Context, w *WorkflowRecord) error

func (*PGWorkflowStore) Delete

func (s *PGWorkflowStore) Delete(ctx context.Context, id uuid.UUID) error

func (*PGWorkflowStore) Get

func (*PGWorkflowStore) GetBySlug

func (s *PGWorkflowStore) GetBySlug(ctx context.Context, projectID uuid.UUID, slug string) (*WorkflowRecord, error)

func (*PGWorkflowStore) GetVersion

func (s *PGWorkflowStore) GetVersion(ctx context.Context, id uuid.UUID, version int) (*WorkflowRecord, error)

func (*PGWorkflowStore) List

func (*PGWorkflowStore) ListVersions

func (s *PGWorkflowStore) ListVersions(ctx context.Context, id uuid.UUID) ([]*WorkflowRecord, error)

func (*PGWorkflowStore) Update

func (s *PGWorkflowStore) Update(ctx context.Context, w *WorkflowRecord) error

type Pagination

type Pagination struct {
	Offset int
	Limit  int
}

Pagination holds common pagination parameters.

func DefaultPagination

func DefaultPagination() Pagination

DefaultPagination returns a Pagination with sensible defaults.

type Project

type Project struct {
	ID          uuid.UUID       `json:"id"`
	CompanyID   uuid.UUID       `json:"company_id"`
	Name        string          `json:"name"`
	Slug        string          `json:"slug"`
	Description string          `json:"description,omitempty"`
	Metadata    json.RawMessage `json:"metadata,omitempty"`
	CreatedAt   time.Time       `json:"created_at"`
	UpdatedAt   time.Time       `json:"updated_at"`
}

Project represents a project within a company.

type ProjectFilter

type ProjectFilter struct {
	CompanyID  *uuid.UUID
	Slug       string
	Pagination Pagination
}

ProjectFilter specifies criteria for listing projects.

type ProjectStore

type ProjectStore interface {
	Create(ctx context.Context, p *Project) error
	Get(ctx context.Context, id uuid.UUID) (*Project, error)
	GetBySlug(ctx context.Context, companyID uuid.UUID, slug string) (*Project, error)
	Update(ctx context.Context, p *Project) error
	Delete(ctx context.Context, id uuid.UUID) error
	List(ctx context.Context, f ProjectFilter) ([]*Project, error)
	ListForUser(ctx context.Context, userID uuid.UUID) ([]*Project, error)
}

ProjectStore defines persistence operations for projects.

type ReplayHandler

type ReplayHandler struct {

	// ReplayFunc is called to actually replay an execution. It receives the
	// original execution's timeline and returns a new execution ID.
	// If nil, replays are queued but not executed.
	ReplayFunc func(original *MaterializedExecution, mode string, modifications map[string]any) (uuid.UUID, error)
	// contains filtered or unexported fields
}

ReplayHandler provides HTTP endpoints for the Request Replay API.

func NewReplayHandler

func NewReplayHandler(store EventStore, logger *slog.Logger) *ReplayHandler

NewReplayHandler creates a new ReplayHandler.

func (*ReplayHandler) RegisterRoutes

func (h *ReplayHandler) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers replay API routes.

type ReplayRequest

type ReplayRequest struct {
	Mode          string         `json:"mode"`                    // "exact" or "modified"
	Modifications map[string]any `json:"modifications,omitempty"` // step overrides for "modified" mode
}

ReplayRequest defines a request to replay an execution.

type ReplayResult

type ReplayResult struct {
	OriginalExecutionID uuid.UUID `json:"original_execution_id"`
	NewExecutionID      uuid.UUID `json:"new_execution_id"`
	Type                string    `json:"type"` // "replay"
	Mode                string    `json:"mode"`
	Status              string    `json:"status"` // "queued", "started"
}

ReplayResult describes the outcome of a replay operation.

type Role

type Role string

Role represents a membership role within a company or project.

const (
	RoleOwner  Role = "owner"
	RoleAdmin  Role = "admin"
	RoleEditor Role = "editor"
	RoleViewer Role = "viewer"
)

type SQLiteAPIKeyStore

type SQLiteAPIKeyStore struct {
	// contains filtered or unexported fields
}

SQLiteAPIKeyStore is a SQLite-backed implementation of APIKeyStore.

func NewSQLiteAPIKeyStore

func NewSQLiteAPIKeyStore(dbPath string) (*SQLiteAPIKeyStore, error)

NewSQLiteAPIKeyStore creates a new SQLiteAPIKeyStore and initializes the schema.

func NewSQLiteAPIKeyStoreFromDB

func NewSQLiteAPIKeyStoreFromDB(db *sql.DB) (*SQLiteAPIKeyStore, error)

NewSQLiteAPIKeyStoreFromDB creates a SQLiteAPIKeyStore from an existing *sql.DB.

func (*SQLiteAPIKeyStore) Close

func (s *SQLiteAPIKeyStore) Close() error

Close closes the underlying database connection.

func (*SQLiteAPIKeyStore) Create

func (s *SQLiteAPIKeyStore) Create(ctx context.Context, key *APIKey) (string, error)

func (*SQLiteAPIKeyStore) Delete

func (s *SQLiteAPIKeyStore) Delete(ctx context.Context, id uuid.UUID) error

func (*SQLiteAPIKeyStore) Get

func (s *SQLiteAPIKeyStore) Get(ctx context.Context, id uuid.UUID) (*APIKey, error)

func (*SQLiteAPIKeyStore) GetByHash

func (s *SQLiteAPIKeyStore) GetByHash(ctx context.Context, keyHash string) (*APIKey, error)

func (*SQLiteAPIKeyStore) List

func (s *SQLiteAPIKeyStore) List(ctx context.Context, companyID uuid.UUID) ([]*APIKey, error)

func (*SQLiteAPIKeyStore) UpdateLastUsed

func (s *SQLiteAPIKeyStore) UpdateLastUsed(ctx context.Context, id uuid.UUID) error

func (*SQLiteAPIKeyStore) Validate

func (s *SQLiteAPIKeyStore) Validate(ctx context.Context, rawKey string) (*APIKey, error)

type SQLiteDLQStore

// SQLiteDLQStore implements DLQStore backed by SQLite.
// All of its fields are unexported; the package provides the constructors
// NewSQLiteDLQStore (from a database path) and NewSQLiteDLQStoreFromDB
// (wrapping an existing *sql.DB).
type SQLiteDLQStore struct {
	// contains filtered or unexported fields
}

SQLiteDLQStore implements DLQStore backed by SQLite.

func NewSQLiteDLQStore

func NewSQLiteDLQStore(dbPath string) (*SQLiteDLQStore, error)

NewSQLiteDLQStore creates a new SQLiteDLQStore using the given database path.

func NewSQLiteDLQStoreFromDB

func NewSQLiteDLQStoreFromDB(db *sql.DB) (*SQLiteDLQStore, error)

NewSQLiteDLQStoreFromDB wraps an existing *sql.DB connection.

func (*SQLiteDLQStore) Add

func (s *SQLiteDLQStore) Add(ctx context.Context, entry *DLQEntry) error

func (*SQLiteDLQStore) Close

func (s *SQLiteDLQStore) Close() error

Close closes the underlying database connection.

func (*SQLiteDLQStore) Count

func (s *SQLiteDLQStore) Count(ctx context.Context, filter DLQFilter) (int64, error)

func (*SQLiteDLQStore) Discard

func (s *SQLiteDLQStore) Discard(ctx context.Context, id uuid.UUID) error

func (*SQLiteDLQStore) Get

func (s *SQLiteDLQStore) Get(ctx context.Context, id uuid.UUID) (*DLQEntry, error)

func (*SQLiteDLQStore) List

func (s *SQLiteDLQStore) List(ctx context.Context, filter DLQFilter) ([]*DLQEntry, error)

func (*SQLiteDLQStore) Purge

func (s *SQLiteDLQStore) Purge(ctx context.Context, olderThan time.Duration) (int64, error)

func (*SQLiteDLQStore) Resolve

func (s *SQLiteDLQStore) Resolve(ctx context.Context, id uuid.UUID) error

func (*SQLiteDLQStore) Retry

func (s *SQLiteDLQStore) Retry(ctx context.Context, id uuid.UUID) error

func (*SQLiteDLQStore) UpdateStatus

func (s *SQLiteDLQStore) UpdateStatus(ctx context.Context, id uuid.UUID, status DLQStatus) error

type SQLiteEventStore

// SQLiteEventStore implements EventStore backed by SQLite using database/sql.
// Per the package documentation, writes are serialized with a mutex to avoid
// SQLITE_BUSY errors under concurrent load.
// All of its fields are unexported; the package provides the constructors
// NewSQLiteEventStore and NewSQLiteEventStoreFromDB.
type SQLiteEventStore struct {
	// contains filtered or unexported fields
}

SQLiteEventStore implements EventStore backed by SQLite using database/sql. Writes are serialized with a mutex to avoid SQLITE_BUSY errors under concurrent load, which is the standard approach for SQLite.

func NewSQLiteEventStore

func NewSQLiteEventStore(dbPath string) (*SQLiteEventStore, error)

NewSQLiteEventStore creates a new SQLiteEventStore using the given database path. It opens the database and creates the required table if it does not exist.

func NewSQLiteEventStoreFromDB

func NewSQLiteEventStoreFromDB(db *sql.DB) (*SQLiteEventStore, error)

NewSQLiteEventStoreFromDB wraps an existing *sql.DB connection. It creates the required table if it does not exist.

func (*SQLiteEventStore) Append

func (s *SQLiteEventStore) Append(ctx context.Context, executionID uuid.UUID, eventType string, data map[string]any) error

func (*SQLiteEventStore) Close

func (s *SQLiteEventStore) Close() error

Close closes the underlying database connection.

func (*SQLiteEventStore) GetEvents

func (s *SQLiteEventStore) GetEvents(ctx context.Context, executionID uuid.UUID) ([]ExecutionEvent, error)

func (*SQLiteEventStore) GetTimeline

func (s *SQLiteEventStore) GetTimeline(ctx context.Context, executionID uuid.UUID) (*MaterializedExecution, error)

func (*SQLiteEventStore) ListExecutions

type SQLiteIdempotencyStore

// SQLiteIdempotencyStore is a SQLite-backed implementation of IdempotencyStore.
// It is created with NewSQLiteIdempotencyStore from a caller-supplied *sql.DB;
// per that constructor's documentation the caller remains responsible for
// opening and closing the connection.
type SQLiteIdempotencyStore struct {
	// contains filtered or unexported fields
}

SQLiteIdempotencyStore is a SQLite-backed implementation of IdempotencyStore.

func NewSQLiteIdempotencyStore

func NewSQLiteIdempotencyStore(db *sql.DB) (*SQLiteIdempotencyStore, error)

NewSQLiteIdempotencyStore creates a new SQLiteIdempotencyStore and ensures the required table exists. The caller is responsible for opening and closing the *sql.DB connection.

func (*SQLiteIdempotencyStore) Check

func (*SQLiteIdempotencyStore) Cleanup

func (s *SQLiteIdempotencyStore) Cleanup(ctx context.Context) (int64, error)

func (*SQLiteIdempotencyStore) Store

type Session

// Session represents an active user session.
type Session struct {
	ID        uuid.UUID       `json:"id"`
	UserID    uuid.UUID       `json:"user_id"`
	// Token is excluded from JSON output by the "-" tag.
	Token     string          `json:"-"`
	IPAddress string          `json:"ip_address"`
	UserAgent string          `json:"user_agent"`
	// Metadata is raw JSON; it is omitted from output when empty.
	Metadata  json.RawMessage `json:"metadata,omitempty"`
	// NOTE(review): how Active relates to ExpiresAt (e.g. whether an expired
	// session is also marked inactive) is not visible here — confirm in the
	// SessionStore implementation.
	Active    bool            `json:"active"`
	CreatedAt time.Time       `json:"created_at"`
	ExpiresAt time.Time       `json:"expires_at"`
}

Session represents an active user session.

type SessionFilter

// SessionFilter specifies criteria for listing sessions.
type SessionFilter struct {
	// UserID and Active are pointers; presumably nil means "do not filter on
	// this field" — TODO confirm in the store implementation.
	UserID     *uuid.UUID
	Active     *bool
	Pagination Pagination
}

SessionFilter specifies criteria for listing sessions.

type SessionStore

// SessionStore defines persistence operations for sessions.
//
// NOTE(review): the errors returned for missing or duplicate sessions are not
// specified here; the package declares ErrNotFound and ErrDuplicate sentinels,
// which implementations presumably use — confirm.
type SessionStore interface {
	Create(ctx context.Context, s *Session) error
	// Get looks a session up by its ID.
	Get(ctx context.Context, id uuid.UUID) (*Session, error)
	// GetByToken looks a session up by its token string.
	GetByToken(ctx context.Context, token string) (*Session, error)
	Update(ctx context.Context, s *Session) error
	Delete(ctx context.Context, id uuid.UUID) error
	// List returns the sessions selected by the filter f.
	List(ctx context.Context, f SessionFilter) ([]*Session, error)
	// DeleteExpired returns an int64 alongside the error; presumably the
	// number of sessions removed — TODO confirm.
	DeleteExpired(ctx context.Context) (int64, error)
}

SessionStore defines persistence operations for sessions.

type StepDiff

// StepDiff represents the difference between a step across two executions.
// Fields suffixed A and B hold the values from the two executions being
// compared.
type StepDiff struct {
	StepName  string         `json:"step_name"`
	Status    string         `json:"status"` // "same", "different", "added", "removed"
	// OutputA and OutputB are omitted from JSON when nil or empty.
	OutputA   map[string]any `json:"output_a,omitempty"`
	OutputB   map[string]any `json:"output_b,omitempty"`
	// time.Duration has no custom JSON marshaling, so these encode as integer
	// nanoseconds.
	DurationA time.Duration  `json:"duration_a"`
	DurationB time.Duration  `json:"duration_b"`
	// Changes is omitted from JSON when empty.
	Changes   []FieldChange  `json:"changes,omitempty"`
}

StepDiff represents the difference between a step across two executions.

type StepMock

// StepMock defines a mock response for a specific pipeline step.
type StepMock struct {
	ID            uuid.UUID      `json:"id"`
	// PipelineName and StepName are the pair used by StepMockStore's Get,
	// Remove and IncrementHitCount to address a mock.
	PipelineName  string         `json:"pipeline_name"`
	StepName      string         `json:"step_name"`
	Response      map[string]any `json:"response"`
	// ErrorResponse is omitted from JSON when empty.
	ErrorResponse string         `json:"error_response,omitempty"`
	// Delay encodes as integer nanoseconds in JSON (time.Duration has no
	// custom marshaling) and is omitted when zero.
	Delay         time.Duration  `json:"delay,omitempty"`
	Enabled       bool           `json:"enabled"`
	// HitCount is the counter that StepMockStore.IncrementHitCount is
	// documented to increment.
	HitCount      int64          `json:"hit_count"`
	CreatedAt     time.Time      `json:"created_at"`
}

StepMock defines a mock response for a specific pipeline step.

type StepMockStore

// StepMockStore manages step mocks for pipeline testing.
// Individual mocks are addressed by a (pipeline, step) name pair.
type StepMockStore interface {
	// Set creates or updates a mock for a specific pipeline step.
	Set(ctx context.Context, mock *StepMock) error
	// Get retrieves a mock for a specific pipeline and step.
	// NOTE(review): the result when no mock exists (nil vs. an error such as
	// ErrNotFound) is not specified here — confirm with implementations.
	Get(ctx context.Context, pipeline, step string) (*StepMock, error)
	// List returns all mocks for a given pipeline.
	List(ctx context.Context, pipeline string) ([]*StepMock, error)
	// Remove deletes a mock for a specific pipeline and step.
	Remove(ctx context.Context, pipeline, step string) error
	// ClearAll removes all mocks.
	ClearAll(ctx context.Context) error
	// IncrementHitCount increments the hit count for a mock.
	IncrementHitCount(ctx context.Context, pipeline, step string) error
}

StepMockStore manages step mocks for pipeline testing.

type StepStatus

// StepStatus is a string-typed status of an execution step.
type StepStatus string

StepStatus represents the status of an execution step.

// StepStatus values declared by this package.
// NOTE(review): the allowed transitions between these states are not visible
// here — confirm against the execution engine.
const (
	StepStatusPending   StepStatus = "pending"
	StepStatusRunning   StepStatus = "running"
	StepStatusCompleted StepStatus = "completed"
	StepStatusFailed    StepStatus = "failed"
	StepStatusSkipped   StepStatus = "skipped"
)

type StorageProvider

// StorageProvider defines the interface for file storage backends.
// Every method takes a context and addresses files by a string path or prefix.
type StorageProvider interface {
	// List returns file entries under the given prefix.
	List(ctx context.Context, prefix string) ([]FileInfo, error)
	// Get retrieves a file by path. The result is an io.ReadCloser;
	// presumably the caller must Close it — confirm with implementations.
	Get(ctx context.Context, path string) (io.ReadCloser, error)
	// Put writes a file at the given path, reading its content from reader.
	Put(ctx context.Context, path string, reader io.Reader) error
	// Delete removes a file at the given path.
	Delete(ctx context.Context, path string) error
	// Stat returns metadata for a file.
	Stat(ctx context.Context, path string) (FileInfo, error)
	// MkdirAll creates a directory path and all parents that do not exist.
	MkdirAll(ctx context.Context, path string) error
}

StorageProvider defines the interface for file storage backends.

type TimelineHandler

// TimelineHandler provides HTTP endpoints for the Execution Timeline API.
// It is constructed by NewTimelineHandler from an EventStore and a
// *slog.Logger, and attaches its routes to an *http.ServeMux through
// RegisterRoutes.
type TimelineHandler struct {
	// contains filtered or unexported fields
}

TimelineHandler provides HTTP endpoints for the Execution Timeline API.

func NewTimelineHandler

func NewTimelineHandler(store EventStore, logger *slog.Logger) *TimelineHandler

NewTimelineHandler creates a new TimelineHandler.

func (*TimelineHandler) RegisterRoutes

func (h *TimelineHandler) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers the timeline API routes on the given mux.

type User

// User represents a platform user.
type User struct {
	ID            uuid.UUID       `json:"id"`
	Email         string          `json:"email"`
	// PasswordHash is excluded from JSON output by the "-" tag.
	PasswordHash  string          `json:"-"`
	DisplayName   string          `json:"display_name"`
	// AvatarURL is omitted from JSON when empty.
	AvatarURL     string          `json:"avatar_url,omitempty"`
	// OAuthProvider and OAuthID are the pair accepted by UserStore.GetByOAuth.
	OAuthProvider OAuthProvider   `json:"oauth_provider,omitempty"`
	OAuthID       string          `json:"oauth_id,omitempty"`
	Active        bool            `json:"active"`
	// Metadata is raw JSON; it is omitted from output when empty.
	Metadata      json.RawMessage `json:"metadata,omitempty"`
	CreatedAt     time.Time       `json:"created_at"`
	UpdatedAt     time.Time       `json:"updated_at"`
	// LastLoginAt is a pointer and is omitted from JSON when nil; presumably
	// nil means the user has never logged in — TODO confirm.
	LastLoginAt   *time.Time      `json:"last_login_at,omitempty"`
}

User represents a platform user.

type UserFilter

// UserFilter specifies criteria for listing users.
// NOTE(review): presumably zero values (empty Email/OAuthProvider, nil Active)
// mean "do not filter on this field" — confirm in the store implementation.
type UserFilter struct {
	Email         string
	Active        *bool
	OAuthProvider OAuthProvider
	Pagination    Pagination
}

UserFilter specifies criteria for listing users.

type UserStore

// UserStore defines persistence operations for users.
//
// NOTE(review): the errors returned for missing or duplicate users are not
// specified here; the package declares ErrNotFound and ErrDuplicate sentinels,
// which implementations presumably use — confirm.
type UserStore interface {
	Create(ctx context.Context, u *User) error
	// Get looks a user up by ID.
	Get(ctx context.Context, id uuid.UUID) (*User, error)
	// GetByEmail looks a user up by email address.
	GetByEmail(ctx context.Context, email string) (*User, error)
	// GetByOAuth looks a user up by OAuth provider and provider-side ID.
	GetByOAuth(ctx context.Context, provider OAuthProvider, oauthID string) (*User, error)
	Update(ctx context.Context, u *User) error
	Delete(ctx context.Context, id uuid.UUID) error
	// List returns the users selected by the filter f.
	List(ctx context.Context, f UserFilter) ([]*User, error)
}

UserStore defines persistence operations for users.

type WorkflowExecution

// WorkflowExecution represents a single execution of a workflow.
type WorkflowExecution struct {
	ID           uuid.UUID       `json:"id"`
	WorkflowID   uuid.UUID       `json:"workflow_id"`
	TriggerType  string          `json:"trigger_type"`
	// TriggerData, OutputData and Metadata are raw JSON and are omitted from
	// output when empty.
	TriggerData  json.RawMessage `json:"trigger_data,omitempty"`
	Status       ExecutionStatus `json:"status"`
	OutputData   json.RawMessage `json:"output_data,omitempty"`
	// ErrorMessage and ErrorStack are omitted from JSON when empty.
	ErrorMessage string          `json:"error_message,omitempty"`
	ErrorStack   string          `json:"error_stack,omitempty"`
	StartedAt    time.Time       `json:"started_at"`
	// CompletedAt and DurationMs are pointers and are omitted from JSON when
	// nil; presumably nil while the execution is still in progress — TODO
	// confirm.
	CompletedAt  *time.Time      `json:"completed_at,omitempty"`
	DurationMs   *int64          `json:"duration_ms,omitempty"`
	Metadata     json.RawMessage `json:"metadata,omitempty"`
}

WorkflowExecution represents a single execution of a workflow.

type WorkflowFilter

// WorkflowFilter specifies criteria for listing workflow records.
// NOTE(review): presumably zero values (nil ProjectID, empty Status/Slug) mean
// "do not filter on this field" — confirm in the store implementation.
type WorkflowFilter struct {
	ProjectID  *uuid.UUID
	Status     WorkflowStatus
	Slug       string
	Pagination Pagination
}

WorkflowFilter specifies criteria for listing workflow records.

type WorkflowRecord

// WorkflowRecord represents a stored workflow configuration with version
// tracking.
type WorkflowRecord struct {
	ID          uuid.UUID      `json:"id"`
	// ProjectID and Slug are the pair accepted by WorkflowStore.GetBySlug.
	ProjectID   uuid.UUID      `json:"project_id"`
	Name        string         `json:"name"`
	Slug        string         `json:"slug"`
	// Description is omitted from JSON when empty.
	Description string         `json:"description,omitempty"`
	ConfigYAML  string         `json:"config_yaml"`
	// Version is the value matched by WorkflowStore.GetVersion.
	// NOTE(review): when and by how much it is incremented is not visible
	// here — confirm in the store implementation.
	Version     int            `json:"version"`
	Status      WorkflowStatus `json:"status"`
	CreatedBy   uuid.UUID      `json:"created_by"`
	UpdatedBy   uuid.UUID      `json:"updated_by"`
	CreatedAt   time.Time      `json:"created_at"`
	UpdatedAt   time.Time      `json:"updated_at"`
}

WorkflowRecord represents a stored workflow configuration with version tracking.

type WorkflowStatus

// WorkflowStatus is a string-typed lifecycle status of a workflow record.
type WorkflowStatus string

WorkflowStatus represents the lifecycle status of a workflow record.

// WorkflowStatus values declared by this package. The package also declares
// ValidWorkflowStatuses, documented as the set of valid workflow status values.
// NOTE(review): the allowed transitions between these states are not visible
// here — confirm against the workflow lifecycle code.
const (
	WorkflowStatusDraft   WorkflowStatus = "draft"
	WorkflowStatusActive  WorkflowStatus = "active"
	WorkflowStatusStopped WorkflowStatus = "stopped"
	WorkflowStatusError   WorkflowStatus = "error"
)

type WorkflowStore

// WorkflowStore defines persistence operations for workflow records,
// including retrieval of individual versions.
//
// NOTE(review): the errors returned for missing or conflicting records are not
// specified here; the package declares ErrNotFound, ErrDuplicate and
// ErrConflict sentinels, which implementations presumably use — confirm.
type WorkflowStore interface {
	Create(ctx context.Context, w *WorkflowRecord) error
	// Get looks a workflow record up by ID.
	Get(ctx context.Context, id uuid.UUID) (*WorkflowRecord, error)
	// GetBySlug looks a workflow record up by project ID and slug.
	GetBySlug(ctx context.Context, projectID uuid.UUID, slug string) (*WorkflowRecord, error)
	Update(ctx context.Context, w *WorkflowRecord) error
	Delete(ctx context.Context, id uuid.UUID) error
	// List returns the workflow records selected by the filter f.
	List(ctx context.Context, f WorkflowFilter) ([]*WorkflowRecord, error)
	// GetVersion retrieves a specific version of a workflow.
	GetVersion(ctx context.Context, id uuid.UUID, version int) (*WorkflowRecord, error)
	// ListVersions returns all versions for a given workflow ID.
	ListVersions(ctx context.Context, id uuid.UUID) ([]*WorkflowRecord, error)
}

WorkflowStore defines persistence operations for workflow records.

type WorkspaceManager

// WorkspaceManager manages project workspace directories and their storage.
// It is constructed by NewWorkspaceManager, rooted at a data directory, and
// exposes per-project access through WorkspacePath and StorageForProject.
type WorkspaceManager struct {
	// contains filtered or unexported fields
}

WorkspaceManager manages project workspace directories and their storage.

func NewWorkspaceManager

func NewWorkspaceManager(dataDir string) *WorkspaceManager

NewWorkspaceManager creates a new WorkspaceManager rooted at the given data directory.

func (*WorkspaceManager) StorageForProject

func (wm *WorkspaceManager) StorageForProject(projectID string) (*LocalStorage, error)

StorageForProject returns a LocalStorage provider scoped to a project workspace.

func (*WorkspaceManager) WorkspacePath

func (wm *WorkspaceManager) WorkspacePath(projectID string) string

WorkspacePath returns the filesystem path for a project workspace.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL