repository

package
v0.1.148 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DecryptCredentials

func DecryptCredentials(conn *types.IntegrationConnection) (*types.IntegrationCredentials, error)

DecryptCredentials parses credentials from a connection (TODO: implement encryption).

func NewRedisClientForTest

func NewRedisClientForTest() (*common.RedisClient, error)

NewRedisClientForTest creates a Redis client backed by miniredis for testing.

Types

type BackendRepository

// BackendRepository is the main Postgres repository for persistent data.
// For filesystem queries and metadata, use FilesystemStore instead.
//
// It embeds MemberRepository, TokenRepository, IntegrationRepository and
// WorkspaceToolRepository, so an implementation must satisfy all of them in
// addition to the methods declared directly below.
type BackendRepository interface {
	// Workspaces
	CreateWorkspace(ctx context.Context, name string, tenantId *string) (*types.Workspace, error)
	GetWorkspace(ctx context.Context, id uint) (*types.Workspace, error)
	GetWorkspaceByExternalId(ctx context.Context, externalId string) (*types.Workspace, error)
	GetWorkspaceByName(ctx context.Context, name string) (*types.Workspace, error)
	ListWorkspaces(ctx context.Context) ([]*types.Workspace, error)
	ListWorkspacesByTenantId(ctx context.Context, tenantId string) ([]*types.Workspace, error)
	DeleteWorkspace(ctx context.Context, id uint) error

	// Workspace Tool Settings
	GetWorkspaceToolSettings(ctx context.Context, workspaceId uint) (*types.WorkspaceToolSettings, error)
	GetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string) (*types.WorkspaceToolSetting, error)
	SetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string, enabled bool) error
	ListWorkspaceToolSettings(ctx context.Context, workspaceId uint) ([]types.WorkspaceToolSetting, error)

	// Members
	MemberRepository

	// Tokens
	TokenRepository

	// Integrations
	IntegrationRepository

	// Workspace Tools
	WorkspaceToolRepository

	// Run execution payloads
	// NOTE(review): the *ForAttempt variants additionally take an attemptID and
	// return a bool; presumably false means the write was skipped because the
	// attempt did not match — confirm against the implementation.
	CreateRunExecution(ctx context.Context, task *types.RunExecution) error
	GetRunExecution(ctx context.Context, externalId string) (*types.RunExecution, error)
	GetRunExecutionByID(ctx context.Context, id uint) (*types.RunExecution, error)
	ListRunExecutions(ctx context.Context, workspaceId uint) ([]*types.RunExecution, error)
	UpdateRunExecutionStatus(ctx context.Context, externalId string, status types.RunExecutionStatus) error
	SetRunExecutionStarted(ctx context.Context, externalId string) error
	SetRunExecutionResult(ctx context.Context, externalId string, exitCode int, errorMsg string) error
	SetRunExecutionStartedForAttempt(ctx context.Context, externalId string, attemptID string) (bool, error)
	SetRunExecutionResultForAttempt(ctx context.Context, externalId string, attemptID string, exitCode int, errorMsg string) (bool, error)
	CancelRunExecution(ctx context.Context, externalId string) error
	DeleteRunExecution(ctx context.Context, externalId string) error
	MarkRunExecutionRetried(ctx context.Context, externalId string) error
	GetRetryableRunExecutions(ctx context.Context) ([]*types.RunExecution, error)
	GetStuckHookRunExecutions(ctx context.Context, timeout time.Duration) ([]*types.RunExecution, error)
	ListRunExecutionsByHook(ctx context.Context, hookId uint) ([]*types.RunExecution, error)

	// Agents
	CreateAgentProfile(ctx context.Context, profile *types.AgentProfile) error
	GetAgentProfile(ctx context.Context, workspaceId uint, agentId string) (*types.AgentProfile, error)
	GetAgentProfileByKey(ctx context.Context, workspaceId uint, agentKey string) (*types.AgentProfile, error)
	ListAgentProfiles(ctx context.Context, workspaceId uint) ([]*types.AgentProfile, error)
	UpdateAgentProfile(ctx context.Context, profile *types.AgentProfile) error
	DeleteAgentProfile(ctx context.Context, workspaceId uint, agentId string) error

	// Channel Bindings
	// When agentID is nil, operates on workspace-level bindings (agent_id IS NULL).
	// When agentID is non-nil, operates on that specific agent's bindings.
	ListChannelBindings(ctx context.Context, workspaceId uint, agentID *string) ([]*types.ChannelBinding, error)
	UpsertChannelBinding(ctx context.Context, binding *types.ChannelBinding) error
	DeleteChannelBinding(ctx context.Context, workspaceId uint, agentID *string, channelType string) error
	GetChannelBindingByAddress(ctx context.Context, channelType string, address string) (*types.ChannelBinding, error)

	// Tasks
	CreateTask(ctx context.Context, task *types.AgentTask) error
	CreateTaskWithOutbox(ctx context.Context, task *types.AgentTask, event *types.OrchestrationOutboxEvent) error
	ListTasks(ctx context.Context, workspaceId uint, limit int) ([]*types.AgentTask, error)
	ListTasksFiltered(ctx context.Context, workspaceId uint, filter types.AgentTaskListFilter) ([]*types.AgentTask, error)
	GetTaskByID(ctx context.Context, taskId string) (*types.AgentTask, error)
	GetTask(ctx context.Context, workspaceId uint, taskId string) (*types.AgentTask, error)
	GetTaskByIdempotency(ctx context.Context, workspaceId uint, agentId *string, idempotencyKey string) (*types.AgentTask, error)
	GetTaskBlocker(ctx context.Context, workspaceID uint, blockerID string) (*types.TaskBlocker, error)
	GetCurrentTaskBlocker(ctx context.Context, workspaceID uint, taskID string) (*types.TaskBlocker, error)
	ClaimQueuedTaskForDispatch(ctx context.Context, taskID string, staleAfter time.Duration) (*types.AgentTask, bool, error)
	OpenTaskBlockerIfCurrentRun(ctx context.Context, request types.TaskBlockerOpenRequest) (bool, *types.TaskBlocker, error)
	ResolveCurrentTaskBlocker(ctx context.Context, workspaceID uint, taskID string, resolution *types.TaskBlockerResolution) (*types.TaskBlocker, error)
	UpdateTaskState(ctx context.Context, update types.TaskStateUpdate) error
	UpdateTaskStateIfCurrentRun(ctx context.Context, update types.CurrentRunTaskStateUpdate) (bool, error)
	SleepTaskWithOutbox(ctx context.Context, taskID string, expectedRunID string, wakeAt time.Time, wakeReason string, wakeAgenda []*types.TaskWakeAgendaItem, outboxEvent *types.OrchestrationOutboxEvent) (bool, error)
	RequeueTaskWithOutboxIfCurrentRun(ctx context.Context, task *types.AgentTask, expectedRunID string, outboxEvent *types.OrchestrationOutboxEvent) (bool, error)
	CancelPendingOutboxEventsForTask(ctx context.Context, taskID string) error
	UpdateTask(ctx context.Context, task *types.AgentTask) error
	UpdateTaskCost(ctx context.Context, taskID string, costUSD float64) error
	ArchiveTask(ctx context.Context, taskId string) error
	ListActiveChildTaskIDs(ctx context.Context, parentTaskID string) ([]string, error)
	ListChildTaskIDsByParents(ctx context.Context, parentTaskIDs []string) (map[string]string, error)
	ListSubtasks(ctx context.Context, parentTaskID string) ([]*types.AgentTask, error)
	ListSubtasksByOutputIDs(ctx context.Context, outputIDs []string) ([]*types.AgentTask, error)
	CreateSpawnBinding(ctx context.Context, taskID, sourceOutputID, entityLabel string) error
	ListSpawnBindingsForOutputs(ctx context.Context, outputIDs []string) ([]SpawnBinding, error)

	// Task input inbox
	AppendTaskInput(ctx context.Context, input *types.TaskInput) error
	ListPendingTaskInputs(ctx context.Context, taskID string, limit int) ([]*types.TaskInput, error)
	ListOrphanedPendingInputs(ctx context.Context, maxAge time.Duration, limit int) ([]*types.TaskInput, error)
	ClaimNextTaskInput(ctx context.Context, taskID string, runID string, executionID string) (*types.TaskInput, error)
	ConsumeOldestPendingInput(ctx context.Context, taskID string) (string, error)
	AckTaskInputConsumed(ctx context.Context, inputID string) error
	ReleaseStaleTaskInputClaims(ctx context.Context, runID string) error
	CountPendingTaskInputs(ctx context.Context, taskID string) (int, error)
	// Scheduled tasks
	CreateScheduledTask(ctx context.Context, st *types.ScheduledTask) error
	GetScheduledTask(ctx context.Context, workspaceID uint, externalID string) (*types.ScheduledTask, error)
	ListScheduledTasks(ctx context.Context, workspaceID uint) ([]*types.ScheduledTask, error)
	ListScheduledTasksByView(ctx context.Context, workspaceID uint, sourceViewID string) ([]*types.ScheduledTask, error)
	UpdateScheduledTask(ctx context.Context, st *types.ScheduledTask) error
	DeleteScheduledTask(ctx context.Context, workspaceID uint, externalID string) error
	DeleteScheduledTasksByAgent(ctx context.Context, workspaceID uint, agentID string) error
	ListDueScheduledTasks(ctx context.Context, now time.Time, limit int) ([]*types.ScheduledTask, error)
	AdvanceScheduledTask(ctx context.Context, id string, oldNextRunAt, newNextRunAt time.Time) (bool, error)
	RevertScheduledTaskAdvance(ctx context.Context, id string, currentNextRunAt, revertTo time.Time) (bool, error)

	// Orchestration outbox/inbox/retry guard
	EnqueueOrchestrationOutboxEvent(ctx context.Context, event *types.OrchestrationOutboxEvent) error
	ClaimPendingOrchestrationOutboxEvents(ctx context.Context, limit int) ([]*types.OrchestrationOutboxEvent, error)
	MarkOrchestrationOutboxEventPublished(ctx context.Context, eventID int64) error
	MarkOrchestrationOutboxEventError(ctx context.Context, eventID int64, lastError string) error
	AcquireOrchestrationResultInbox(ctx context.Context, resultKey string, streamID string) (bool, error)
	AcquireOrchestrationRetryGuard(ctx context.Context, guardKey string) (bool, error)

	// Runs
	CreateAgentRun(ctx context.Context, run *types.AgentRun) error
	GetAgentRunByID(ctx context.Context, runId string) (*types.AgentRun, error)
	GetAgentRun(ctx context.Context, workspaceId uint, runId string) (*types.AgentRun, error)
	ListAgentRuns(ctx context.Context, workspaceId uint, limit int) ([]*types.AgentRun, error)
	ListAgentRunsFiltered(ctx context.Context, workspaceId uint, filter types.AgentRunListFilter) ([]*types.AgentRun, error)
	ListActiveRunsBySession(ctx context.Context, workspaceId uint, sessionID string, excludeRunIDs []string, limit int) ([]*types.AgentRun, error)
	UpdateAgentRunLifecycle(ctx context.Context, runId string, status types.AgentRunStatus, startedAt, endedAt *time.Time, errorMsg *string) error
	SetAgentRunUsageJSON(ctx context.Context, runId string, usageJSON map[string]any) error
	SetAgentRunClaim(ctx context.Context, runId string, workerId string, heartbeatAt time.Time, expiresAt time.Time) error
	ClearAgentRunClaim(ctx context.Context, runId string) error
	ClearExpiredAgentRunClaim(ctx context.Context, runId string, workerId string, expiresAt time.Time) (bool, error)
	RefreshAgentRunClaims(ctx context.Context, workerId string, heartbeatAt time.Time, expiresAt time.Time) (int64, error)
	ListClaimedAgentRuns(ctx context.Context, limit int) ([]*types.AgentRun, error)
	ListExpiredClaimedAgentRuns(ctx context.Context, now time.Time, limit int) ([]*types.AgentRun, error)
	ListStaleUnclaimedAgentRuns(ctx context.Context, cutoff time.Time, limit int) ([]*types.AgentRun, error)
	ListOrphanedRunningTaskRunIDs(ctx context.Context, staleCutoff time.Time, limit int) ([]string, error)
	ListRunningTasksWithNoRun(ctx context.Context, staleCutoff time.Time, limit int) ([]string, error)
	IncrementAgentRunSnapshotSeq(ctx context.Context, runId string) (int64, error)

	// Run attempts
	CreateAgentRunAttempt(ctx context.Context, attempt *types.AgentRunAttempt) error
	GetAgentRunAttempt(ctx context.Context, attemptId string) (*types.AgentRunAttempt, error)
	ListAgentRunAttempts(ctx context.Context, runId string) ([]*types.AgentRunAttempt, error)
	GetRunAttemptByExecutionID(ctx context.Context, executionID string) (*types.AgentRunAttempt, error)
	UpdateAgentRunAttemptStart(ctx context.Context, attemptId string, startedAt time.Time) error
	UpdateAgentRunAttemptResult(ctx context.Context, attemptId string, status types.AgentAttemptStatus, exitCode *int, endedAt time.Time, errorMsg *string) error
	BindAttemptExecutionTask(ctx context.Context, attemptId, taskExternalID string) error

	// Run snapshots
	AppendAgentRunSnapshot(ctx context.Context, snap *types.AgentRunSnapshot) error
	ListAgentRunSnapshots(ctx context.Context, runId string, limit int) ([]*types.AgentRunSnapshot, error)

	// Execution instances
	GetOrCreateExecutionInstance(ctx context.Context, inst *types.AgentExecutionInstance) (*types.AgentExecutionInstance, error)
	GetExecutionInstanceByKey(ctx context.Context, instanceKey string) (*types.AgentExecutionInstance, error)
	UpdateExecutionInstanceState(ctx context.Context, instanceKey string, running, pending, stopping, desired int, status types.AgentExecutionInstanceStatus, lastEventAt *time.Time) error
	AdjustExecutionInstanceRunningAttempts(ctx context.Context, instanceKey string, runningDelta int, lastEventAt *time.Time) error

	// Agent stats
	GetAgentStats(ctx context.Context, workspaceId uint, agentID string) (*types.AgentStats, error)

	// Task outputs
	ListTaskOutputs(ctx context.Context, workspaceId uint, taskID string) ([]*types.TaskOutput, error)
	ListWorkspaceTaskOutputs(ctx context.Context, workspaceId uint, filter types.TaskOutputListFilter) ([]*types.TaskOutput, error)
	CreateTaskOutput(ctx context.Context, output *types.TaskOutput) error
	GetTaskOutput(ctx context.Context, workspaceId uint, outputID string) (*types.TaskOutput, error)
	AppendTaskOutputRows(ctx context.Context, workspaceId uint, outputID string, rows []byte) error
	UpdateTaskOutputSummary(ctx context.Context, workspaceId uint, outputID string, summary string) error
	UpdateTaskOutputStatus(ctx context.Context, workspaceId uint, outputID string, status string) error
	ArchiveTaskOutput(ctx context.Context, workspaceId uint, outputID string) error
	ArchiveAllTaskOutputs(ctx context.Context, workspaceId uint) (int64, error)
	DeleteTaskOutput(ctx context.Context, workspaceId uint, outputID string) error

	// Views
	CreateView(ctx context.Context, v *types.View) error
	GetView(ctx context.Context, workspaceID uint, viewID string) (*types.View, error)
	ListViews(ctx context.Context, workspaceID uint) ([]*types.View, error)
	UpdateView(ctx context.Context, v *types.View) error
	DeleteView(ctx context.Context, workspaceID uint, viewID string) error

	// Source watches (correlation-key based routing for sleeping tasks)
	UpsertTaskSourceWatches(ctx context.Context, workspaceID uint, taskID string, watches []TaskSourceWatch) error
	FindTasksByCorrelationKeys(ctx context.Context, integration string, keys []string) ([]TaskSourceWatchMatch, error)
	DeleteTaskSourceWatches(ctx context.Context, taskID string) error
	// NOTE(review): HasTaskSourceWatches returns only a bool, so a lookup
	// failure cannot be reported to the caller — confirm how errors are handled.
	HasTaskSourceWatches(ctx context.Context, taskID string) bool
	GetTaskSourceWatches(ctx context.Context, taskID string) ([]TaskSourceWatch, error)

	// Database access
	DB() *sql.DB

	// Utilities
	Ping(ctx context.Context) error
	Close() error
	RunMigrations() error
}

BackendRepository is the main Postgres repository for persistent data. For filesystem queries and metadata, use FilesystemStore instead.

type ElasticsearchClient

// ElasticsearchClient is an optional interface for Elasticsearch operations.
//
// Every method takes a context and the name of the index to operate on.
// Query maps are declared as map[string]any, which is the same type as
// map[string]interface{}, so existing implementations and callers keep
// compiling unchanged.
type ElasticsearchClient interface {
	// Index takes a document ID and a raw document body for the given index.
	Index(ctx context.Context, index, docID string, body []byte) error
	// Search takes a query map and a size and returns raw JSON messages.
	// NOTE(review): size is presumably the maximum number of hits — confirm.
	Search(ctx context.Context, index string, query map[string]any, size int) ([]json.RawMessage, error)
	// Get returns raw bytes for the given document ID.
	Get(ctx context.Context, index, docID string) ([]byte, error)
	// Delete takes the ID of a single document in the given index.
	Delete(ctx context.Context, index, docID string) error
	// DeleteByQuery takes a query map of the same type that Search accepts.
	DeleteByQuery(ctx context.Context, index string, query map[string]any) error
}

ElasticsearchClient is an optional interface for Elasticsearch operations.

func NewElasticsearchClient

func NewElasticsearchClient(url string) ElasticsearchClient

NewElasticsearchClient creates a new Elasticsearch client.

type FilesystemStore

// FilesystemStore is the unified interface for all filesystem operations.
// It consolidates query definitions (Postgres/memory), query results
// (Elasticsearch/memory), and filesystem metadata (Redis/memory) into a
// single interface.
type FilesystemStore interface {

	// CreateQuery stores a new filesystem query definition.
	CreateQuery(ctx context.Context, query *types.FilesystemQuery) (*types.FilesystemQuery, error)

	// GetQuery retrieves a query by workspace and path.
	GetQuery(ctx context.Context, workspaceId uint, path string) (*types.FilesystemQuery, error)

	// GetQueryByExternalId retrieves a query by its external ID.
	GetQueryByExternalId(ctx context.Context, externalId string) (*types.FilesystemQuery, error)

	// ListQueries returns all queries under a parent path.
	ListQueries(ctx context.Context, workspaceId uint, parentPath string) ([]*types.FilesystemQuery, error)

	// ListTaskOwnedQueries returns system-managed follow-up queries owned by a task.
	ListTaskOwnedQueries(ctx context.Context, workspaceId uint, taskID string) ([]*types.FilesystemQuery, error)

	// CountQueries returns the count of queries in a workspace.
	CountQueries(ctx context.Context, workspaceId uint) (int, error)

	// UpdateQuery updates an existing query definition.
	UpdateQuery(ctx context.Context, query *types.FilesystemQuery) error

	// DeleteQuery removes a query by external ID.
	DeleteQuery(ctx context.Context, externalId string) error

	// GetQueryResults retrieves cached results for a query path.
	GetQueryResults(ctx context.Context, workspaceId uint, queryPath string) ([]QueryResult, error)

	// StoreQueryResults caches query results with the specified TTL.
	StoreQueryResults(ctx context.Context, workspaceId uint, queryPath string, results []QueryResult, ttl time.Duration) error

	// GetResultContent retrieves cached content for a specific result.
	GetResultContent(ctx context.Context, workspaceId uint, queryPath, resultID string) ([]byte, error)

	// StoreResultContent caches content for a specific result.
	StoreResultContent(ctx context.Context, workspaceId uint, queryPath, resultID string, content []byte) error

	// SearchContent performs full-text search across all materialized content.
	SearchContent(ctx context.Context, workspaceId uint, query string, limit int) ([]SearchHit, error)

	// IndexContent indexes content for full-text search.
	IndexContent(ctx context.Context, workspaceId uint, queryPath, resultID, filename string, content []byte) error

	// StatPath retrieves directory, file, and symlink metadata in a single round-trip.
	// Returns (dirMeta, fileMeta, symlinkTarget, error). At most one of the three will be non-zero.
	StatPath(ctx context.Context, path string) (*types.DirMeta, *types.FileMeta, string, error)

	// GetFileMeta retrieves file metadata.
	GetFileMeta(ctx context.Context, path string) (*types.FileMeta, error)

	// GetDirMeta retrieves directory metadata.
	GetDirMeta(ctx context.Context, path string) (*types.DirMeta, error)

	// SaveFileMeta stores file metadata.
	SaveFileMeta(ctx context.Context, meta *types.FileMeta) error

	// SaveDirMeta stores directory metadata.
	SaveDirMeta(ctx context.Context, meta *types.DirMeta) error

	// DeleteFileMeta removes file metadata.
	DeleteFileMeta(ctx context.Context, path string) error

	// DeleteDirMeta removes directory metadata.
	DeleteDirMeta(ctx context.Context, path string) error

	// ListDir returns directory entries.
	ListDir(ctx context.Context, path string) ([]types.DirEntry, error)

	// SaveDirListing stores a directory listing.
	SaveDirListing(ctx context.Context, path string, entries []types.DirEntry) error

	// GetSymlink retrieves a symlink target.
	GetSymlink(ctx context.Context, path string) (string, error)

	// SaveSymlink stores a symlink.
	SaveSymlink(ctx context.Context, path, target string) error

	// DeleteSymlink removes a symlink.
	DeleteSymlink(ctx context.Context, path string) error

	// GetWatchedSourceQueries returns stale source queries that have active hooks watching their path.
	GetWatchedSourceQueries(ctx context.Context, staleAfter time.Duration, limit int) ([]*types.FilesystemQuery, error)

	// UpdateQueryBaseline sets the baseline_item_ids for a query.
	UpdateQueryBaseline(ctx context.Context, queryID uint, ids []string) error

	// DeactivateHookAndUpdateBaseline atomically deactivates a hook and updates the query baseline.
	DeactivateHookAndUpdateBaseline(ctx context.Context, hookID uint, queryID uint, ids []string) error

	// GetActiveFollowupHook finds the active task_input hook for a followup query path.
	GetActiveFollowupHook(ctx context.Context, workspaceID uint, queryPath string) (*types.Hook, error)

	// CreateHook stores a new hook definition.
	CreateHook(ctx context.Context, hook *types.Hook) (*types.Hook, error)

	// GetHook retrieves a hook by external ID.
	GetHook(ctx context.Context, externalId string) (*types.Hook, error)

	// GetHookById retrieves a hook by internal ID.
	GetHookById(ctx context.Context, id uint) (*types.Hook, error)

	// ListHooks returns all hooks for a workspace.
	ListHooks(ctx context.Context, workspaceId uint) ([]*types.Hook, error)

	// UpdateHook updates an existing hook.
	UpdateHook(ctx context.Context, hook *types.Hook) error

	// DeleteHook removes a hook by external ID.
	DeleteHook(ctx context.Context, externalId string) error

	// InvalidatePath removes a specific path from all caches.
	InvalidatePath(ctx context.Context, path string) error

	// InvalidatePrefix removes all paths with a given prefix.
	InvalidatePrefix(ctx context.Context, prefix string) error

	// InvalidateQuery removes all cached data for a query (results + content).
	InvalidateQuery(ctx context.Context, workspaceId uint, queryPath string) error
}

FilesystemStore is the unified interface for all filesystem operations. It consolidates query definitions (Postgres/memory), query results (Elasticsearch/memory), and filesystem metadata (Redis/memory) into a single interface.

func NewFilesystemStore

func NewFilesystemStore(db *sql.DB, redis *common.RedisClient, elastic ElasticsearchClient) FilesystemStore

NewFilesystemStore creates a unified filesystem store. Pass nil for db/redis to use memory-only mode.

func NewFilesystemStoreWithTTL

func NewFilesystemStoreWithTTL(db *sql.DB, redis *common.RedisClient, elastic ElasticsearchClient, ttl time.Duration) FilesystemStore

NewFilesystemStoreWithTTL creates a store with custom cache TTL.

func NewMemoryFilesystemStore

func NewMemoryFilesystemStore() FilesystemStore

NewMemoryFilesystemStore creates a memory-only filesystem store (for local mode).

type IntegrationRepository

// IntegrationRepository manages integration connections.
type IntegrationRepository interface {
	// NOTE(review): memberId is a *uint here but a plain uint in GetConnection;
	// presumably nil denotes a connection not tied to a member — confirm.
	SaveConnection(ctx context.Context, workspaceId uint, memberId *uint, integrationType string, creds *types.IntegrationCredentials, scope string) (*types.IntegrationConnection, error)
	GetConnection(ctx context.Context, workspaceId uint, memberId uint, integrationType string) (*types.IntegrationConnection, error)
	GetConnectionByExternalId(ctx context.Context, externalId string) (*types.IntegrationConnection, error)
	ListConnections(ctx context.Context, workspaceId uint) ([]types.IntegrationConnection, error)
	DeleteConnection(ctx context.Context, externalId string) error
}

IntegrationRepository manages integration connections.

type MemberRepository

// MemberRepository manages workspace members.
//
// CreateMember, GetMemberByEmail and ListMembers take a workspace ID;
// GetMember, UpdateMember and DeleteMember take only the member's external ID.
type MemberRepository interface {
	CreateMember(ctx context.Context, workspaceId uint, email, name string, role types.MemberRole) (*types.WorkspaceMember, error)
	GetMember(ctx context.Context, externalId string) (*types.WorkspaceMember, error)
	GetMemberByEmail(ctx context.Context, workspaceId uint, email string) (*types.WorkspaceMember, error)
	ListMembers(ctx context.Context, workspaceId uint) ([]types.WorkspaceMember, error)
	UpdateMember(ctx context.Context, externalId string, name string, role types.MemberRole) (*types.WorkspaceMember, error)
	DeleteMember(ctx context.Context, externalId string) error
}

MemberRepository manages workspace members.

type OrchestrationStore added in v0.1.60

// OrchestrationStore centralizes Redis-backed orchestration primitives:
// stream dispatch/result channels, instance locks, and run events.
type OrchestrationStore struct {
	// contains filtered or unexported fields
}

OrchestrationStore centralizes Redis-backed orchestration primitives: stream dispatch/result channels, instance locks, and run events.

func NewOrchestrationStore added in v0.1.60

func NewOrchestrationStore(backend BackendRepository, redis *common.RedisClient) *OrchestrationStore

func (*OrchestrationStore) AckRunResults added in v0.1.75

func (s *OrchestrationStore) AckRunResults(ctx context.Context, messageIDs ...string) error

func (*OrchestrationStore) AckTaskDispatch added in v0.1.75

func (s *OrchestrationStore) AckTaskDispatch(ctx context.Context, messageIDs ...string) error

func (*OrchestrationStore) ClaimPendingRunResults added in v0.1.75

func (s *OrchestrationStore) ClaimPendingRunResults(
	ctx context.Context,
	consumer string,
	minIdle time.Duration,
	count int64,
) ([]redislib.XMessage, error)

func (*OrchestrationStore) ClaimPendingTaskDispatch added in v0.1.75

func (s *OrchestrationStore) ClaimPendingTaskDispatch(
	ctx context.Context,
	consumer string,
	minIdle time.Duration,
	count int64,
) ([]redislib.XMessage, error)

func (*OrchestrationStore) EnsureRunResultGroup added in v0.1.75

func (s *OrchestrationStore) EnsureRunResultGroup(ctx context.Context) error

func (*OrchestrationStore) EnsureTaskDispatchGroup added in v0.1.75

func (s *OrchestrationStore) EnsureTaskDispatchGroup(ctx context.Context) error

func (*OrchestrationStore) ListRunEvents added in v0.1.60

func (s *OrchestrationStore) ListRunEvents(ctx context.Context, runID string) ([]string, error)

func (*OrchestrationStore) PublishRunEvent added in v0.1.60

func (s *OrchestrationStore) PublishRunEvent(ctx context.Context, runID string, body []byte) error

func (*OrchestrationStore) PublishRunResult added in v0.1.75

func (s *OrchestrationStore) PublishRunResult(ctx context.Context, values map[string]any) (string, error)

func (*OrchestrationStore) PublishRunResultDLQ added in v0.1.75

func (s *OrchestrationStore) PublishRunResultDLQ(ctx context.Context, values map[string]any) (string, error)

func (*OrchestrationStore) PublishTaskDispatch added in v0.1.75

func (s *OrchestrationStore) PublishTaskDispatch(ctx context.Context, values map[string]any) (string, error)

func (*OrchestrationStore) PublishTaskDispatchDLQ added in v0.1.75

func (s *OrchestrationStore) PublishTaskDispatchDLQ(ctx context.Context, values map[string]any) (string, error)

func (*OrchestrationStore) PublishTaskLive added in v0.1.98

func (s *OrchestrationStore) PublishTaskLive(ctx context.Context, taskID string) error

func (*OrchestrationStore) PublishWorkspaceLive added in v0.1.98

func (s *OrchestrationStore) PublishWorkspaceLive(ctx context.Context, workspaceID uint) error

func (*OrchestrationStore) ReadRunResults added in v0.1.75

func (s *OrchestrationStore) ReadRunResults(
	ctx context.Context,
	consumer string,
	block time.Duration,
	count int64,
) ([]redislib.XMessage, error)

func (*OrchestrationStore) ReadTaskDispatch added in v0.1.75

func (s *OrchestrationStore) ReadTaskDispatch(
	ctx context.Context,
	consumer string,
	block time.Duration,
	count int64,
) ([]redislib.XMessage, error)

func (*OrchestrationStore) SubscribeRunEvents added in v0.1.98

func (s *OrchestrationStore) SubscribeRunEvents(ctx context.Context, runID string) (<-chan struct{}, func(), error)

func (*OrchestrationStore) SubscribeTaskLive added in v0.1.98

func (s *OrchestrationStore) SubscribeTaskLive(ctx context.Context, taskID string) (<-chan struct{}, func(), error)

func (*OrchestrationStore) SubscribeWorkspaceLive added in v0.1.98

func (s *OrchestrationStore) SubscribeWorkspaceLive(ctx context.Context, workspaceID uint) (<-chan struct{}, func(), error)

func (*OrchestrationStore) UpdateTaskState added in v0.1.60

func (s *OrchestrationStore) UpdateTaskState(ctx context.Context, update types.TaskStateUpdate) error

func (*OrchestrationStore) WithInstanceLock added in v0.1.60

func (s *OrchestrationStore) WithInstanceLock(ctx context.Context, lockKey string, fn func() error) error

type PostgresBackend

// PostgresBackend implements BackendRepository using Postgres.
type PostgresBackend struct {
	// contains filtered or unexported fields
}

PostgresBackend implements BackendRepository using Postgres.

func NewPostgresBackend

func NewPostgresBackend(cfg types.PostgresConfig) (*PostgresBackend, error)

NewPostgresBackend creates a new Postgres backend.

func (*PostgresBackend) AckTaskInputConsumed added in v0.1.98

func (b *PostgresBackend) AckTaskInputConsumed(ctx context.Context, inputID string) error

func (*PostgresBackend) AcquireOrchestrationResultInbox added in v0.1.75

func (b *PostgresBackend) AcquireOrchestrationResultInbox(
	ctx context.Context,
	resultKey string,
	streamID string,
) (bool, error)

func (*PostgresBackend) AcquireOrchestrationRetryGuard added in v0.1.75

func (b *PostgresBackend) AcquireOrchestrationRetryGuard(
	ctx context.Context,
	guardKey string,
) (bool, error)

func (*PostgresBackend) AdjustExecutionInstanceRunningAttempts added in v0.1.60

func (b *PostgresBackend) AdjustExecutionInstanceRunningAttempts(
	ctx context.Context,
	instanceKey string,
	runningDelta int,
	lastEventAt *time.Time,
) error

func (*PostgresBackend) AdvanceScheduledTask added in v0.1.89

func (b *PostgresBackend) AdvanceScheduledTask(ctx context.Context, id string, oldNextRunAt, newNextRunAt time.Time) (bool, error)

func (*PostgresBackend) AppendAgentRunSnapshot added in v0.1.60

func (b *PostgresBackend) AppendAgentRunSnapshot(ctx context.Context, snap *types.AgentRunSnapshot) error

func (*PostgresBackend) AppendTaskInput added in v0.1.98

func (b *PostgresBackend) AppendTaskInput(ctx context.Context, input *types.TaskInput) error

func (*PostgresBackend) AppendTaskOutputRows added in v0.1.96

func (b *PostgresBackend) AppendTaskOutputRows(ctx context.Context, workspaceId uint, outputID string, rowsJSON []byte) error

func (*PostgresBackend) ArchiveAllTaskOutputs added in v0.1.96

func (b *PostgresBackend) ArchiveAllTaskOutputs(ctx context.Context, workspaceId uint) (int64, error)

func (*PostgresBackend) ArchiveTask added in v0.1.71

func (b *PostgresBackend) ArchiveTask(ctx context.Context, taskID string) error

func (*PostgresBackend) ArchiveTaskOutput added in v0.1.96

func (b *PostgresBackend) ArchiveTaskOutput(ctx context.Context, workspaceId uint, outputID string) error

func (*PostgresBackend) AuthorizeToken

func (r *PostgresBackend) AuthorizeToken(ctx context.Context, rawToken string) (*types.AuthInfo, error)

AuthorizeToken validates a token and returns AuthInfo for the auth system.

func (*PostgresBackend) BindAttemptExecutionTask added in v0.1.60

func (b *PostgresBackend) BindAttemptExecutionTask(ctx context.Context, attemptId, taskExternalID string) error

func (*PostgresBackend) CancelPendingOutboxEventsForTask added in v0.1.96

func (b *PostgresBackend) CancelPendingOutboxEventsForTask(ctx context.Context, taskID string) error

func (*PostgresBackend) CancelRunExecution added in v0.1.60

func (b *PostgresBackend) CancelRunExecution(ctx context.Context, externalId string) error

func (*PostgresBackend) ClaimNextTaskInput added in v0.1.98

func (b *PostgresBackend) ClaimNextTaskInput(ctx context.Context, taskID string, runID string, executionID string) (*types.TaskInput, error)

func (*PostgresBackend) ClaimPendingOrchestrationOutboxEvents added in v0.1.75

func (b *PostgresBackend) ClaimPendingOrchestrationOutboxEvents(
	ctx context.Context,
	limit int,
) ([]*types.OrchestrationOutboxEvent, error)

func (*PostgresBackend) ClaimQueuedTaskForDispatch added in v0.1.75

func (b *PostgresBackend) ClaimQueuedTaskForDispatch(
	ctx context.Context,
	taskID string,
	staleAfter time.Duration,
) (*types.AgentTask, bool, error)

func (*PostgresBackend) ClearAgentRunClaim added in v0.1.66

func (b *PostgresBackend) ClearAgentRunClaim(ctx context.Context, runId string) error

func (*PostgresBackend) ClearExpiredAgentRunClaim added in v0.1.66

func (b *PostgresBackend) ClearExpiredAgentRunClaim(ctx context.Context, runId string, workerId string, expiresAt time.Time) (bool, error)

func (*PostgresBackend) Close

func (b *PostgresBackend) Close() error

Close closes the database connection.

func (*PostgresBackend) ConsumeOldestPendingInput added in v0.1.98

func (b *PostgresBackend) ConsumeOldestPendingInput(ctx context.Context, taskID string) (string, error)

func (*PostgresBackend) CountPendingTaskInputs added in v0.1.98

func (b *PostgresBackend) CountPendingTaskInputs(ctx context.Context, taskID string) (int, error)

func (*PostgresBackend) CreateAgentProfile added in v0.1.60

func (b *PostgresBackend) CreateAgentProfile(ctx context.Context, profile *types.AgentProfile) error

func (*PostgresBackend) CreateAgentRun added in v0.1.60

func (b *PostgresBackend) CreateAgentRun(ctx context.Context, run *types.AgentRun) error

func (*PostgresBackend) CreateAgentRunAttempt added in v0.1.60

func (b *PostgresBackend) CreateAgentRunAttempt(ctx context.Context, attempt *types.AgentRunAttempt) error

func (*PostgresBackend) CreateMember

func (r *PostgresBackend) CreateMember(ctx context.Context, workspaceId uint, email, name string, role types.MemberRole) (*types.WorkspaceMember, error)

func (*PostgresBackend) CreateOrgToken added in v0.1.39

func (r *PostgresBackend) CreateOrgToken(ctx context.Context, name string, tenantId string, expiresAt *time.Time) (*types.Token, string, error)

CreateOrgToken creates a tenant-scoped organization token (not tied to a workspace).

func (*PostgresBackend) CreateRunExecution added in v0.1.60

func (b *PostgresBackend) CreateRunExecution(ctx context.Context, task *types.RunExecution) error

func (*PostgresBackend) CreateScheduledTask added in v0.1.89

func (b *PostgresBackend) CreateScheduledTask(ctx context.Context, st *types.ScheduledTask) error

func (*PostgresBackend) CreateSpawnBinding added in v0.1.110

func (b *PostgresBackend) CreateSpawnBinding(ctx context.Context, taskID, sourceOutputID, entityLabel string) error

func (*PostgresBackend) CreateTask

func (b *PostgresBackend) CreateTask(ctx context.Context, task *types.AgentTask) error

func (*PostgresBackend) CreateTaskOutput added in v0.1.96

func (b *PostgresBackend) CreateTaskOutput(ctx context.Context, output *types.TaskOutput) error

func (*PostgresBackend) CreateTaskWithOutbox added in v0.1.75

func (b *PostgresBackend) CreateTaskWithOutbox(
	ctx context.Context,
	task *types.AgentTask,
	event *types.OrchestrationOutboxEvent,
) error

func (*PostgresBackend) CreateToken

func (r *PostgresBackend) CreateToken(ctx context.Context, workspaceId, memberId uint, name string, expiresAt *time.Time, tokenType types.TokenType) (*types.Token, string, error)

func (*PostgresBackend) CreateView added in v0.1.103

func (b *PostgresBackend) CreateView(ctx context.Context, v *types.View) error

func (*PostgresBackend) CreateWorkerToken

func (r *PostgresBackend) CreateWorkerToken(ctx context.Context, name string, poolName *string, expiresAt *time.Time) (*types.Token, string, error)

CreateWorkerToken creates a cluster-level worker token (not tied to a workspace)

func (*PostgresBackend) CreateWorkspace

func (b *PostgresBackend) CreateWorkspace(ctx context.Context, name string, tenantId *string) (*types.Workspace, error)

CreateWorkspace creates a new workspace with an optional tenant_id for org scoping.

func (*PostgresBackend) CreateWorkspaceServiceToken added in v0.1.32

func (r *PostgresBackend) CreateWorkspaceServiceToken(ctx context.Context, workspaceId uint, name string) (*types.Token, string, error)

CreateWorkspaceServiceToken creates a workspace-scoped service token (no member). Used for hooks and other automated workspace operations.

func (*PostgresBackend) CreateWorkspaceTool

func (b *PostgresBackend) CreateWorkspaceTool(ctx context.Context, workspaceId uint, createdByMemberId *uint, name string, providerType types.WorkspaceToolProviderType, config json.RawMessage, manifest json.RawMessage) (*types.WorkspaceTool, error)

CreateWorkspaceTool creates a new workspace tool provider

func (*PostgresBackend) DB

func (b *PostgresBackend) DB() *sql.DB

DB returns the underlying database connection

func (*PostgresBackend) DeleteAgentProfile added in v0.1.73

func (b *PostgresBackend) DeleteAgentProfile(ctx context.Context, workspaceId uint, agentId string) error

func (*PostgresBackend) DeleteChannelBinding added in v0.1.96

func (b *PostgresBackend) DeleteChannelBinding(ctx context.Context, workspaceId uint, agentID *string, channelType string) error

func (*PostgresBackend) DeleteConnection

func (r *PostgresBackend) DeleteConnection(ctx context.Context, externalId string) error

func (*PostgresBackend) DeleteMember

func (r *PostgresBackend) DeleteMember(ctx context.Context, externalId string) error

func (*PostgresBackend) DeleteRunExecution added in v0.1.60

func (b *PostgresBackend) DeleteRunExecution(ctx context.Context, externalId string) error

func (*PostgresBackend) DeleteScheduledTask added in v0.1.89

func (b *PostgresBackend) DeleteScheduledTask(ctx context.Context, workspaceID uint, externalID string) error

func (*PostgresBackend) DeleteScheduledTasksByAgent added in v0.1.98

func (b *PostgresBackend) DeleteScheduledTasksByAgent(ctx context.Context, workspaceID uint, agentID string) error

func (*PostgresBackend) DeleteTaskOutput added in v0.1.96

func (b *PostgresBackend) DeleteTaskOutput(ctx context.Context, workspaceId uint, outputID string) error

func (*PostgresBackend) DeleteTaskSourceWatches added in v0.1.115

func (b *PostgresBackend) DeleteTaskSourceWatches(ctx context.Context, taskID string) error

func (*PostgresBackend) DeleteView added in v0.1.103

func (b *PostgresBackend) DeleteView(ctx context.Context, workspaceID uint, viewID string) error

func (*PostgresBackend) DeleteWorkspace

func (b *PostgresBackend) DeleteWorkspace(ctx context.Context, id uint) error

DeleteWorkspace removes a workspace by internal ID

func (*PostgresBackend) DeleteWorkspaceTool

func (b *PostgresBackend) DeleteWorkspaceTool(ctx context.Context, id uint) error

DeleteWorkspaceTool removes a workspace tool by internal ID

func (*PostgresBackend) DeleteWorkspaceToolByName

func (b *PostgresBackend) DeleteWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) error

DeleteWorkspaceToolByName removes a workspace tool by name within a workspace

func (*PostgresBackend) EnqueueOrchestrationOutboxEvent added in v0.1.75

func (b *PostgresBackend) EnqueueOrchestrationOutboxEvent(
	ctx context.Context,
	event *types.OrchestrationOutboxEvent,
) error

func (*PostgresBackend) EnsureWorkspaceServiceToken added in v0.1.32

func (r *PostgresBackend) EnsureWorkspaceServiceToken(ctx context.Context, workspaceId uint) (*types.Token, string, error)

EnsureWorkspaceServiceToken ensures the workspace has a usable service token and returns the token record together with the raw token string. Note: a new token is created even if a service token already exists, because bcrypt hashes are one-way and the raw token cannot be recovered from an existing record.

func (*PostgresBackend) FindTasksByCorrelationKeys added in v0.1.115

func (b *PostgresBackend) FindTasksByCorrelationKeys(ctx context.Context, integration string, keys []string) ([]TaskSourceWatchMatch, error)

func (*PostgresBackend) GetAgentProfile added in v0.1.60

func (b *PostgresBackend) GetAgentProfile(ctx context.Context, workspaceId uint, agentId string) (*types.AgentProfile, error)

func (*PostgresBackend) GetAgentProfileByKey added in v0.1.60

func (b *PostgresBackend) GetAgentProfileByKey(ctx context.Context, workspaceId uint, agentKey string) (*types.AgentProfile, error)

func (*PostgresBackend) GetAgentRun added in v0.1.60

func (b *PostgresBackend) GetAgentRun(ctx context.Context, workspaceId uint, runId string) (*types.AgentRun, error)

func (*PostgresBackend) GetAgentRunAttempt added in v0.1.60

func (b *PostgresBackend) GetAgentRunAttempt(ctx context.Context, attemptId string) (*types.AgentRunAttempt, error)

func (*PostgresBackend) GetAgentRunByID added in v0.1.60

func (b *PostgresBackend) GetAgentRunByID(ctx context.Context, runId string) (*types.AgentRun, error)

func (*PostgresBackend) GetAgentStats added in v0.1.96

func (b *PostgresBackend) GetAgentStats(ctx context.Context, workspaceId uint, agentID string) (*types.AgentStats, error)

func (*PostgresBackend) GetChannelBindingByAddress added in v0.1.96

func (b *PostgresBackend) GetChannelBindingByAddress(ctx context.Context, channelType string, address string) (*types.ChannelBinding, error)

func (*PostgresBackend) GetConnection

func (r *PostgresBackend) GetConnection(ctx context.Context, workspaceId uint, memberId uint, integrationType string) (*types.IntegrationConnection, error)

func (*PostgresBackend) GetConnectionByExternalId

func (r *PostgresBackend) GetConnectionByExternalId(ctx context.Context, externalId string) (*types.IntegrationConnection, error)

func (*PostgresBackend) GetCurrentTaskBlocker added in v0.1.110

func (b *PostgresBackend) GetCurrentTaskBlocker(ctx context.Context, workspaceID uint, taskID string) (*types.TaskBlocker, error)

func (*PostgresBackend) GetExecutionInstanceByKey added in v0.1.60

func (b *PostgresBackend) GetExecutionInstanceByKey(ctx context.Context, instanceKey string) (*types.AgentExecutionInstance, error)

func (*PostgresBackend) GetMember

func (r *PostgresBackend) GetMember(ctx context.Context, externalId string) (*types.WorkspaceMember, error)

func (*PostgresBackend) GetMemberByEmail

func (r *PostgresBackend) GetMemberByEmail(ctx context.Context, workspaceId uint, email string) (*types.WorkspaceMember, error)

func (*PostgresBackend) GetOrCreateExecutionInstance added in v0.1.60

func (b *PostgresBackend) GetOrCreateExecutionInstance(ctx context.Context, inst *types.AgentExecutionInstance) (*types.AgentExecutionInstance, error)

func (*PostgresBackend) GetRetryableRunExecutions added in v0.1.60

func (b *PostgresBackend) GetRetryableRunExecutions(ctx context.Context) ([]*types.RunExecution, error)

func (*PostgresBackend) GetRunAttemptByExecutionID added in v0.1.60

func (b *PostgresBackend) GetRunAttemptByExecutionID(ctx context.Context, executionID string) (*types.AgentRunAttempt, error)

func (*PostgresBackend) GetRunExecution added in v0.1.60

func (b *PostgresBackend) GetRunExecution(ctx context.Context, externalId string) (*types.RunExecution, error)

func (*PostgresBackend) GetRunExecutionByID added in v0.1.60

func (b *PostgresBackend) GetRunExecutionByID(_ context.Context, id uint) (*types.RunExecution, error)

func (*PostgresBackend) GetScheduledTask added in v0.1.89

func (b *PostgresBackend) GetScheduledTask(ctx context.Context, workspaceID uint, externalID string) (*types.ScheduledTask, error)

func (*PostgresBackend) GetStuckHookRunExecutions added in v0.1.60

func (b *PostgresBackend) GetStuckHookRunExecutions(ctx context.Context, timeout time.Duration) ([]*types.RunExecution, error)

func (*PostgresBackend) GetTask

func (b *PostgresBackend) GetTask(ctx context.Context, workspaceId uint, taskID string) (*types.AgentTask, error)

func (*PostgresBackend) GetTaskBlocker added in v0.1.110

func (b *PostgresBackend) GetTaskBlocker(ctx context.Context, workspaceID uint, blockerID string) (*types.TaskBlocker, error)

func (*PostgresBackend) GetTaskByID added in v0.1.60

func (b *PostgresBackend) GetTaskByID(ctx context.Context, taskID string) (*types.AgentTask, error)

func (*PostgresBackend) GetTaskByIdempotency added in v0.1.60

func (b *PostgresBackend) GetTaskByIdempotency(ctx context.Context, workspaceId uint, agentId *string, idempotencyKey string) (*types.AgentTask, error)

func (*PostgresBackend) GetTaskOutput added in v0.1.96

func (b *PostgresBackend) GetTaskOutput(ctx context.Context, workspaceId uint, outputID string) (*types.TaskOutput, error)

func (*PostgresBackend) GetTaskSourceWatches added in v0.1.142

func (b *PostgresBackend) GetTaskSourceWatches(ctx context.Context, taskID string) ([]TaskSourceWatch, error)

func (*PostgresBackend) GetToken

func (r *PostgresBackend) GetToken(ctx context.Context, externalId string) (*types.Token, error)

func (*PostgresBackend) GetView added in v0.1.103

func (b *PostgresBackend) GetView(ctx context.Context, workspaceID uint, viewID string) (*types.View, error)

func (*PostgresBackend) GetWorkspace

func (b *PostgresBackend) GetWorkspace(ctx context.Context, id uint) (*types.Workspace, error)

GetWorkspace retrieves a workspace by internal ID

func (*PostgresBackend) GetWorkspaceByExternalId

func (b *PostgresBackend) GetWorkspaceByExternalId(ctx context.Context, externalId string) (*types.Workspace, error)

GetWorkspaceByExternalId retrieves a workspace by external UUID

func (*PostgresBackend) GetWorkspaceByName

func (b *PostgresBackend) GetWorkspaceByName(ctx context.Context, name string) (*types.Workspace, error)

GetWorkspaceByName retrieves a workspace by name

func (*PostgresBackend) GetWorkspaceTool

func (b *PostgresBackend) GetWorkspaceTool(ctx context.Context, id uint) (*types.WorkspaceTool, error)

GetWorkspaceTool retrieves a workspace tool by internal ID

func (*PostgresBackend) GetWorkspaceToolByExternalId

func (b *PostgresBackend) GetWorkspaceToolByExternalId(ctx context.Context, externalId string) (*types.WorkspaceTool, error)

GetWorkspaceToolByExternalId retrieves a workspace tool by external UUID

func (*PostgresBackend) GetWorkspaceToolByName

func (b *PostgresBackend) GetWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) (*types.WorkspaceTool, error)

GetWorkspaceToolByName retrieves a workspace tool by name within a workspace

func (*PostgresBackend) GetWorkspaceToolSetting

func (b *PostgresBackend) GetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string) (*types.WorkspaceToolSetting, error)

GetWorkspaceToolSetting returns the setting for a specific tool in a workspace. Returns nil if no setting exists (tool is enabled by default).

func (*PostgresBackend) GetWorkspaceToolSettings

func (b *PostgresBackend) GetWorkspaceToolSettings(ctx context.Context, workspaceId uint) (*types.WorkspaceToolSettings, error)

GetWorkspaceToolSettings returns all tool settings for a workspace as a lookup structure. Tools not in the database are considered enabled by default.

func (*PostgresBackend) HasTaskSourceWatches added in v0.1.142

func (b *PostgresBackend) HasTaskSourceWatches(ctx context.Context, taskID string) bool

func (*PostgresBackend) IncrementAgentRunSnapshotSeq added in v0.1.60

func (b *PostgresBackend) IncrementAgentRunSnapshotSeq(ctx context.Context, runId string) (int64, error)

func (*PostgresBackend) ListActiveChildTaskIDs added in v0.1.110

func (b *PostgresBackend) ListActiveChildTaskIDs(ctx context.Context, parentTaskID string) ([]string, error)

func (*PostgresBackend) ListActiveRunsBySession added in v0.1.73

func (b *PostgresBackend) ListActiveRunsBySession(
	ctx context.Context,
	workspaceId uint,
	sessionID string,
	excludeRunIDs []string,
	limit int,
) ([]*types.AgentRun, error)

func (*PostgresBackend) ListAgentProfiles added in v0.1.60

func (b *PostgresBackend) ListAgentProfiles(ctx context.Context, workspaceId uint) ([]*types.AgentProfile, error)

func (*PostgresBackend) ListAgentRunAttempts added in v0.1.60

func (b *PostgresBackend) ListAgentRunAttempts(ctx context.Context, runId string) ([]*types.AgentRunAttempt, error)

func (*PostgresBackend) ListAgentRunSnapshots added in v0.1.60

func (b *PostgresBackend) ListAgentRunSnapshots(ctx context.Context, runId string, limit int) ([]*types.AgentRunSnapshot, error)

func (*PostgresBackend) ListAgentRuns added in v0.1.60

func (b *PostgresBackend) ListAgentRuns(ctx context.Context, workspaceId uint, limit int) ([]*types.AgentRun, error)

func (*PostgresBackend) ListAgentRunsFiltered added in v0.1.66

func (b *PostgresBackend) ListAgentRunsFiltered(ctx context.Context, workspaceId uint, filter types.AgentRunListFilter) ([]*types.AgentRun, error)

func (*PostgresBackend) ListChannelBindings added in v0.1.96

func (b *PostgresBackend) ListChannelBindings(ctx context.Context, workspaceId uint, agentID *string) ([]*types.ChannelBinding, error)

func (*PostgresBackend) ListChildTaskIDsByParents added in v0.1.125

func (b *PostgresBackend) ListChildTaskIDsByParents(ctx context.Context, parentTaskIDs []string) (map[string]string, error)

func (*PostgresBackend) ListClaimedAgentRuns added in v0.1.71

func (b *PostgresBackend) ListClaimedAgentRuns(ctx context.Context, limit int) ([]*types.AgentRun, error)

func (*PostgresBackend) ListConnections

func (r *PostgresBackend) ListConnections(ctx context.Context, workspaceId uint) ([]types.IntegrationConnection, error)

func (*PostgresBackend) ListDueScheduledTasks added in v0.1.92

func (b *PostgresBackend) ListDueScheduledTasks(ctx context.Context, now time.Time, limit int) ([]*types.ScheduledTask, error)

func (*PostgresBackend) ListExpiredClaimedAgentRuns added in v0.1.66

func (b *PostgresBackend) ListExpiredClaimedAgentRuns(ctx context.Context, now time.Time, limit int) ([]*types.AgentRun, error)

func (*PostgresBackend) ListMembers

func (r *PostgresBackend) ListMembers(ctx context.Context, workspaceId uint) ([]types.WorkspaceMember, error)

func (*PostgresBackend) ListOrgTokens added in v0.1.39

func (r *PostgresBackend) ListOrgTokens(ctx context.Context, tenantId string) ([]types.Token, error)

ListOrgTokens returns all organization tokens, optionally filtered by tenant_id.

func (*PostgresBackend) ListOrphanedPendingInputs added in v0.1.98

func (b *PostgresBackend) ListOrphanedPendingInputs(ctx context.Context, maxAge time.Duration, limit int) ([]*types.TaskInput, error)

func (*PostgresBackend) ListOrphanedRunningTaskRunIDs added in v0.1.131

func (b *PostgresBackend) ListOrphanedRunningTaskRunIDs(ctx context.Context, staleCutoff time.Time, limit int) ([]string, error)

ListOrphanedRunningTaskRunIDs finds tasks stuck in "running" whose target run is no longer active (terminal or unclaimed). Returns the target_run_id values so the recovery loop can settle them.

func (*PostgresBackend) ListPendingTaskInputs added in v0.1.98

func (b *PostgresBackend) ListPendingTaskInputs(ctx context.Context, taskID string, limit int) ([]*types.TaskInput, error)

func (*PostgresBackend) ListRunExecutions added in v0.1.60

func (b *PostgresBackend) ListRunExecutions(ctx context.Context, workspaceId uint) ([]*types.RunExecution, error)

func (*PostgresBackend) ListRunExecutionsByHook added in v0.1.60

func (b *PostgresBackend) ListRunExecutionsByHook(ctx context.Context, hookId uint) ([]*types.RunExecution, error)

func (*PostgresBackend) ListRunningTasksWithNoRun added in v0.1.131

func (b *PostgresBackend) ListRunningTasksWithNoRun(ctx context.Context, staleCutoff time.Time, limit int) ([]string, error)

func (*PostgresBackend) ListScheduledTasks added in v0.1.89

func (b *PostgresBackend) ListScheduledTasks(ctx context.Context, workspaceID uint) ([]*types.ScheduledTask, error)

func (*PostgresBackend) ListScheduledTasksByView added in v0.1.112

func (b *PostgresBackend) ListScheduledTasksByView(ctx context.Context, workspaceID uint, sourceViewID string) ([]*types.ScheduledTask, error)

func (*PostgresBackend) ListSpawnBindingsForOutputs added in v0.1.110

func (b *PostgresBackend) ListSpawnBindingsForOutputs(ctx context.Context, outputIDs []string) ([]SpawnBinding, error)

func (*PostgresBackend) ListStaleUnclaimedAgentRuns added in v0.1.66

func (b *PostgresBackend) ListStaleUnclaimedAgentRuns(ctx context.Context, cutoff time.Time, limit int) ([]*types.AgentRun, error)

func (*PostgresBackend) ListSubtasks added in v0.1.110

func (b *PostgresBackend) ListSubtasks(ctx context.Context, parentTaskID string) ([]*types.AgentTask, error)

func (*PostgresBackend) ListSubtasksByOutputIDs added in v0.1.110

func (b *PostgresBackend) ListSubtasksByOutputIDs(ctx context.Context, outputIDs []string) ([]*types.AgentTask, error)

func (*PostgresBackend) ListTaskOutputs added in v0.1.96

func (b *PostgresBackend) ListTaskOutputs(ctx context.Context, workspaceId uint, taskID string) ([]*types.TaskOutput, error)

func (*PostgresBackend) ListTasks

func (b *PostgresBackend) ListTasks(ctx context.Context, workspaceId uint, limit int) ([]*types.AgentTask, error)

func (*PostgresBackend) ListTasksFiltered added in v0.1.66

func (b *PostgresBackend) ListTasksFiltered(ctx context.Context, workspaceId uint, filter types.AgentTaskListFilter) ([]*types.AgentTask, error)

func (*PostgresBackend) ListTokens

func (r *PostgresBackend) ListTokens(ctx context.Context, workspaceId uint) ([]types.Token, error)

func (*PostgresBackend) ListViews added in v0.1.103

func (b *PostgresBackend) ListViews(ctx context.Context, workspaceID uint) ([]*types.View, error)

func (*PostgresBackend) ListWorkerTokens

func (r *PostgresBackend) ListWorkerTokens(ctx context.Context) ([]types.Token, error)

ListWorkerTokens returns all worker tokens (cluster-level, not workspace-scoped)

func (*PostgresBackend) ListWorkspaceTaskOutputs added in v0.1.96

func (b *PostgresBackend) ListWorkspaceTaskOutputs(
	ctx context.Context,
	workspaceId uint,
	filter types.TaskOutputListFilter,
) ([]*types.TaskOutput, error)

func (*PostgresBackend) ListWorkspaceToolSettings

func (b *PostgresBackend) ListWorkspaceToolSettings(ctx context.Context, workspaceId uint) ([]types.WorkspaceToolSetting, error)

ListWorkspaceToolSettings returns all tool settings for a workspace

func (*PostgresBackend) ListWorkspaceTools

func (b *PostgresBackend) ListWorkspaceTools(ctx context.Context, workspaceId uint) ([]*types.WorkspaceTool, error)

ListWorkspaceTools returns all tools for a workspace

func (*PostgresBackend) ListWorkspaces

func (b *PostgresBackend) ListWorkspaces(ctx context.Context) ([]*types.Workspace, error)

ListWorkspaces returns all workspaces

func (*PostgresBackend) ListWorkspacesByTenantId added in v0.1.39

func (b *PostgresBackend) ListWorkspacesByTenantId(ctx context.Context, tenantId string) ([]*types.Workspace, error)

ListWorkspacesByTenantId returns all workspaces belonging to a tenant

func (*PostgresBackend) MarkOrchestrationOutboxEventError added in v0.1.75

func (b *PostgresBackend) MarkOrchestrationOutboxEventError(
	ctx context.Context,
	eventID int64,
	lastError string,
) error

func (*PostgresBackend) MarkOrchestrationOutboxEventPublished added in v0.1.75

func (b *PostgresBackend) MarkOrchestrationOutboxEventPublished(ctx context.Context, eventID int64) error

func (*PostgresBackend) MarkRunExecutionRetried added in v0.1.60

func (b *PostgresBackend) MarkRunExecutionRetried(ctx context.Context, externalId string) error

func (*PostgresBackend) OpenTaskBlockerIfCurrentRun added in v0.1.110

func (b *PostgresBackend) OpenTaskBlockerIfCurrentRun(
	ctx context.Context,
	request types.TaskBlockerOpenRequest,
) (bool, *types.TaskBlocker, error)

func (*PostgresBackend) Ping

func (b *PostgresBackend) Ping(ctx context.Context) error

Ping checks the database connection

func (*PostgresBackend) RefreshAgentRunClaims added in v0.1.66

func (b *PostgresBackend) RefreshAgentRunClaims(ctx context.Context, workerId string, heartbeatAt time.Time, expiresAt time.Time) (int64, error)

func (*PostgresBackend) ReleaseStaleTaskInputClaims added in v0.1.98

func (b *PostgresBackend) ReleaseStaleTaskInputClaims(ctx context.Context, runID string) error

func (*PostgresBackend) RequeueTaskWithOutboxIfCurrentRun added in v0.1.98

func (b *PostgresBackend) RequeueTaskWithOutboxIfCurrentRun(
	ctx context.Context,
	task *types.AgentTask,
	expectedRunID string,
	outboxEvent *types.OrchestrationOutboxEvent,
) (bool, error)

func (*PostgresBackend) ResolveCurrentTaskBlocker added in v0.1.110

func (b *PostgresBackend) ResolveCurrentTaskBlocker(
	ctx context.Context,
	workspaceID uint,
	taskID string,
	resolution *types.TaskBlockerResolution,
) (*types.TaskBlocker, error)

func (*PostgresBackend) RevertScheduledTaskAdvance added in v0.1.92

func (b *PostgresBackend) RevertScheduledTaskAdvance(ctx context.Context, id string, currentNextRunAt, revertTo time.Time) (bool, error)

func (*PostgresBackend) RevokeOrgToken added in v0.1.40

func (r *PostgresBackend) RevokeOrgToken(ctx context.Context, externalId string) error

RevokeOrgToken deletes a token only if it is of type 'organization'. Returns sql.ErrNoRows if the token doesn't exist or isn't an org token.

func (*PostgresBackend) RevokeToken

func (r *PostgresBackend) RevokeToken(ctx context.Context, externalId string) error

func (*PostgresBackend) RunMigrations

func (b *PostgresBackend) RunMigrations() error

RunMigrations runs database migrations using goose

func (*PostgresBackend) SaveConnection

func (r *PostgresBackend) SaveConnection(ctx context.Context, workspaceId uint, memberId *uint, integrationType string, creds *types.IntegrationCredentials, scope string) (*types.IntegrationConnection, error)

func (*PostgresBackend) SetAgentRunClaim added in v0.1.66

func (b *PostgresBackend) SetAgentRunClaim(ctx context.Context, runId string, workerId string, heartbeatAt time.Time, expiresAt time.Time) error

func (*PostgresBackend) SetAgentRunUsageJSON added in v0.1.110

func (b *PostgresBackend) SetAgentRunUsageJSON(ctx context.Context, runId string, usageJSON map[string]any) error

func (*PostgresBackend) SetEventRecorder added in v0.1.95

func (b *PostgresBackend) SetEventRecorder(r instrumentation.EventRecorder)

SetEventRecorder sets the product analytics event recorder.

func (*PostgresBackend) SetRunExecutionResult added in v0.1.60

func (b *PostgresBackend) SetRunExecutionResult(ctx context.Context, externalId string, exitCode int, errorMsg string) error

func (*PostgresBackend) SetRunExecutionResultForAttempt added in v0.1.75

func (b *PostgresBackend) SetRunExecutionResultForAttempt(
	ctx context.Context,
	externalId string,
	attemptID string,
	exitCode int,
	errorMsg string,
) (bool, error)

func (*PostgresBackend) SetRunExecutionStarted added in v0.1.60

func (b *PostgresBackend) SetRunExecutionStarted(ctx context.Context, externalId string) error

func (*PostgresBackend) SetRunExecutionStartedForAttempt added in v0.1.75

func (b *PostgresBackend) SetRunExecutionStartedForAttempt(
	ctx context.Context,
	externalId string,
	attemptID string,
) (bool, error)

func (*PostgresBackend) SetWorkspaceToolSetting

func (b *PostgresBackend) SetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string, enabled bool) error

SetWorkspaceToolSetting creates or updates a tool setting for a workspace

func (*PostgresBackend) SleepTaskWithOutbox added in v0.1.96

func (b *PostgresBackend) SleepTaskWithOutbox(
	ctx context.Context,
	taskID string,
	expectedRunID string,
	wakeAt time.Time,
	wakeReason string,
	wakeAgenda []*types.TaskWakeAgendaItem,
	outboxEvent *types.OrchestrationOutboxEvent,
) (bool, error)

func (*PostgresBackend) UpdateAgentProfile added in v0.1.60

func (b *PostgresBackend) UpdateAgentProfile(ctx context.Context, profile *types.AgentProfile) error

func (*PostgresBackend) UpdateAgentRunAttemptResult added in v0.1.60

func (b *PostgresBackend) UpdateAgentRunAttemptResult(ctx context.Context, attemptId string, status types.AgentAttemptStatus, exitCode *int, endedAt time.Time, errorMsg *string) error

func (*PostgresBackend) UpdateAgentRunAttemptStart added in v0.1.60

func (b *PostgresBackend) UpdateAgentRunAttemptStart(ctx context.Context, attemptId string, startedAt time.Time) error

func (*PostgresBackend) UpdateAgentRunLifecycle added in v0.1.60

func (b *PostgresBackend) UpdateAgentRunLifecycle(ctx context.Context, runId string, status types.AgentRunStatus, startedAt, endedAt *time.Time, errorMsg *string) error

func (*PostgresBackend) UpdateExecutionInstanceState added in v0.1.60

func (b *PostgresBackend) UpdateExecutionInstanceState(
	ctx context.Context,
	instanceKey string,
	running, pending, stopping, desired int,
	status types.AgentExecutionInstanceStatus,
	lastEventAt *time.Time,
) error

func (*PostgresBackend) UpdateMember

func (r *PostgresBackend) UpdateMember(ctx context.Context, externalId string, name string, role types.MemberRole) (*types.WorkspaceMember, error)

func (*PostgresBackend) UpdateRunExecutionStatus added in v0.1.60

func (b *PostgresBackend) UpdateRunExecutionStatus(ctx context.Context, externalId string, status types.RunExecutionStatus) error

func (*PostgresBackend) UpdateScheduledTask added in v0.1.89

func (b *PostgresBackend) UpdateScheduledTask(ctx context.Context, st *types.ScheduledTask) error

func (*PostgresBackend) UpdateTask added in v0.1.96

func (b *PostgresBackend) UpdateTask(ctx context.Context, task *types.AgentTask) error

func (*PostgresBackend) UpdateTaskCost added in v0.1.96

func (b *PostgresBackend) UpdateTaskCost(ctx context.Context, taskID string, costUSD float64) error

func (*PostgresBackend) UpdateTaskOutputStatus added in v0.1.106

func (b *PostgresBackend) UpdateTaskOutputStatus(ctx context.Context, workspaceId uint, outputID string, status string) error

func (*PostgresBackend) UpdateTaskOutputSummary added in v0.1.96

func (b *PostgresBackend) UpdateTaskOutputSummary(ctx context.Context, workspaceId uint, outputID string, summary string) error

func (*PostgresBackend) UpdateTaskState added in v0.1.60

func (b *PostgresBackend) UpdateTaskState(ctx context.Context, update types.TaskStateUpdate) error

func (*PostgresBackend) UpdateTaskStateIfCurrentRun added in v0.1.75

func (b *PostgresBackend) UpdateTaskStateIfCurrentRun(ctx context.Context, update types.CurrentRunTaskStateUpdate) (bool, error)

func (*PostgresBackend) UpdateView added in v0.1.103

func (b *PostgresBackend) UpdateView(ctx context.Context, v *types.View) error

func (*PostgresBackend) UpdateWorkspaceToolConfig

func (b *PostgresBackend) UpdateWorkspaceToolConfig(ctx context.Context, id uint, config json.RawMessage) error

UpdateWorkspaceToolConfig updates the config for a workspace tool

func (*PostgresBackend) UpdateWorkspaceToolManifest

func (b *PostgresBackend) UpdateWorkspaceToolManifest(ctx context.Context, id uint, manifest json.RawMessage) error

UpdateWorkspaceToolManifest updates the manifest for a workspace tool

func (*PostgresBackend) UpsertChannelBinding added in v0.1.96

func (b *PostgresBackend) UpsertChannelBinding(ctx context.Context, binding *types.ChannelBinding) error

func (*PostgresBackend) UpsertTaskSourceWatches added in v0.1.115

func (b *PostgresBackend) UpsertTaskSourceWatches(ctx context.Context, workspaceID uint, taskID string, watches []TaskSourceWatch) error

func (*PostgresBackend) ValidateToken

func (r *PostgresBackend) ValidateToken(ctx context.Context, rawToken string) (*types.TokenValidationResult, error)

type QueryResult

type QueryResult struct {
	ID       string            `json:"id"`       // Provider-specific ID (e.g., Gmail message ID)
	Filename string            `json:"filename"` // Generated filename for the result
	Metadata map[string]string `json:"metadata"` // Key-value metadata (date, subject, from, etc.)
	Size     int64             `json:"size"`     // Content size in bytes
	Mtime    int64             `json:"mtime"`    // Last modified timestamp (Unix)
}

QueryResult represents a materialized search result from a filesystem query.

type RedisTaskQueue

type RedisTaskQueue struct {
	// contains filtered or unexported fields
}

RedisTaskQueue implements TaskQueue using Redis

func NewRedisTaskQueue

func NewRedisTaskQueue(rdb *common.RedisClient, queueName string) *RedisTaskQueue

NewRedisTaskQueue creates a new Redis-based task queue

func (*RedisTaskQueue) Complete

func (q *RedisTaskQueue) Complete(ctx context.Context, taskID string, result *types.RunExecutionResult) error

Complete marks a task as complete and stores the result

func (*RedisTaskQueue) Fail

func (q *RedisTaskQueue) Fail(ctx context.Context, taskID string, taskErr error) error

Fail marks a task as failed

func (*RedisTaskQueue) GetLogBuffer

func (q *RedisTaskQueue) GetLogBuffer(ctx context.Context, taskID string) ([][]byte, error)

GetLogBuffer returns buffered logs for a task (for late joiners)

func (*RedisTaskQueue) GetResult

func (q *RedisTaskQueue) GetResult(ctx context.Context, taskID string) (*types.RunExecutionResult, error)

GetResult returns the result of a completed task

func (*RedisTaskQueue) GetState

func (q *RedisTaskQueue) GetState(ctx context.Context, taskID string) (*types.RunExecutionState, error)

GetState returns the current state of a task

func (*RedisTaskQueue) InFlightCount

func (q *RedisTaskQueue) InFlightCount(ctx context.Context) (int64, error)

InFlightCount returns the number of tasks currently being processed

func (*RedisTaskQueue) Len

func (q *RedisTaskQueue) Len(ctx context.Context) (int64, error)

Len returns the number of pending tasks in the queue

func (*RedisTaskQueue) Pop

func (q *RedisTaskQueue) Pop(ctx context.Context, workerID string) (*types.RunExecution, error)

Pop blocks until a task is available and returns it

func (*RedisTaskQueue) PublishLog

func (q *RedisTaskQueue) PublishLog(ctx context.Context, taskID string, stream string, data string) error

PublishLog publishes a log entry to the task's log channel

func (*RedisTaskQueue) PublishStatus

func (q *RedisTaskQueue) PublishStatus(ctx context.Context, taskID string, status types.RunExecutionStatus, exitCode *int, errorMsg string) error

PublishStatus publishes a task status change event

func (*RedisTaskQueue) Push

func (q *RedisTaskQueue) Push(ctx context.Context, task *types.RunExecution) error

Push adds a task to the queue

func (*RedisTaskQueue) PushDelayed added in v0.1.60

func (q *RedisTaskQueue) PushDelayed(ctx context.Context, task *types.RunExecution, delay time.Duration) error

PushDelayed stores a task for delayed enqueue using a Redis sorted set. Delayed tasks survive process restarts and are moved to the main queue by Pop.

func (*RedisTaskQueue) Requeue added in v0.1.97

func (q *RedisTaskQueue) Requeue(ctx context.Context, task *types.RunExecution) error

Requeue atomically returns an execution to the pending queue when the worker that popped it crashed, or otherwise lost the execution, before the run claim was established.

func (*RedisTaskQueue) SubscribeLogs

func (q *RedisTaskQueue) SubscribeLogs(ctx context.Context, taskID string) (<-chan []byte, func(), error)

SubscribeLogs subscribes to a task's log channel and returns a channel of log entries

type RedisTerminalIORepository added in v0.1.53

// RedisTerminalIORepository exposes the same exported method set as the
// TerminalIORepository interface (input wake, output, cancel, session lease,
// session checkpoint, and run interaction methods).
//
// NOTE(review): NewRedisTerminalIORepository takes a *common.RedisClient and
// returns a TerminalIORepository; presumably the returned value is a
// *RedisTerminalIORepository — confirm against the implementation.
type RedisTerminalIORepository struct {
	// contains filtered or unexported fields
}

func (*RedisTerminalIORepository) AcquireSessionLease added in v0.1.73

func (r *RedisTerminalIORepository) AcquireSessionLease(ctx context.Context, workspaceID uint, sessionID, ownerID string, ttl time.Duration) (bool, error)

func (*RedisTerminalIORepository) GetRunInteraction added in v0.1.73

func (r *RedisTerminalIORepository) GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)

func (*RedisTerminalIORepository) GetSessionCheckpoint added in v0.1.97

func (r *RedisTerminalIORepository) GetSessionCheckpoint(ctx context.Context, workspaceID uint, sessionID string) (*types.SessionCheckpoint, error)

func (*RedisTerminalIORepository) GetSessionLeaseOwner added in v0.1.73

func (r *RedisTerminalIORepository) GetSessionLeaseOwner(ctx context.Context, workspaceID uint, sessionID string) (string, error)

func (*RedisTerminalIORepository) PublishCancel added in v0.1.53

func (r *RedisTerminalIORepository) PublishCancel(ctx context.Context, taskID string) error

func (*RedisTerminalIORepository) PublishInputWake added in v0.1.98

func (r *RedisTerminalIORepository) PublishInputWake(ctx context.Context, taskID string) error

func (*RedisTerminalIORepository) PublishOutput added in v0.1.53

func (r *RedisTerminalIORepository) PublishOutput(ctx context.Context, taskID string, data []byte) error

func (*RedisTerminalIORepository) ReleaseSessionLease added in v0.1.73

func (r *RedisTerminalIORepository) ReleaseSessionLease(ctx context.Context, workspaceID uint, sessionID, ownerID string) error

func (*RedisTerminalIORepository) RenewSessionLease added in v0.1.73

func (r *RedisTerminalIORepository) RenewSessionLease(ctx context.Context, workspaceID uint, sessionID, ownerID string, ttl time.Duration) (bool, error)

func (*RedisTerminalIORepository) SetRunInteraction added in v0.1.73

func (r *RedisTerminalIORepository) SetRunInteraction(
	ctx context.Context,
	workspaceID uint,
	runID string,
	interaction types.RunInteraction,
	ttl time.Duration,
) error

func (*RedisTerminalIORepository) SetSessionCheckpoint added in v0.1.97

func (r *RedisTerminalIORepository) SetSessionCheckpoint(
	ctx context.Context,
	workspaceID uint,
	sessionID string,
	checkpoint *types.SessionCheckpoint,
	ttl time.Duration,
) error

func (*RedisTerminalIORepository) SubscribeCancel added in v0.1.53

func (r *RedisTerminalIORepository) SubscribeCancel(ctx context.Context, taskID string) (<-chan struct{}, func(), error)

func (*RedisTerminalIORepository) SubscribeInputWake added in v0.1.98

func (r *RedisTerminalIORepository) SubscribeInputWake(ctx context.Context, taskID string) (<-chan struct{}, func(), error)

func (*RedisTerminalIORepository) SubscribeOutput added in v0.1.53

func (r *RedisTerminalIORepository) SubscribeOutput(ctx context.Context, taskID string) (<-chan []byte, func(), error)

type SearchHit

// SearchHit represents a full-text search match across materialized content.
//
// NOTE(review): ResultID and Filename presumably correspond to QueryResult.ID
// and QueryResult.Filename of the matched result — confirm. The scale and
// ordering of Score (higher-is-better or not) are not shown here.
type SearchHit struct {
	WorkspaceID uint    `json:"workspace_id"`
	QueryPath   string  `json:"query_path"` // Path of the query that produced this result
	ResultID    string  `json:"result_id"`  // Provider-specific ID
	Filename    string  `json:"filename"`
	Snippet     string  `json:"snippet"` // Text snippet with match highlighted
	Score       float64 `json:"score"`   // Relevance score
}

SearchHit represents a full-text search match across materialized content.

type SpawnBinding added in v0.1.110

// SpawnBinding is a record with `db`-tagged fields (task_id, source_output_id,
// entity_label, created_at) that associates a task ID with a source output ID
// and an entity label, together with a creation time.
//
// NOTE(review): the `db` tags suggest this maps to a database row, and the
// name suggests it records which source output spawned a task — neither is
// documented here; confirm against the code that reads and writes it.
type SpawnBinding struct {
	TaskID         string    `db:"task_id"`
	SourceOutputID string    `db:"source_output_id"`
	EntityLabel    string    `db:"entity_label"`
	CreatedAt      time.Time `db:"created_at"`
}

type TaskLogEntry

// TaskLogEntry represents a log entry for a task.
//
// NOTE(review): Stream and Data mirror the stream/data parameters of
// TaskQueue.PublishLog, so this is presumably the payload that method
// publishes — confirm. The unit of Timestamp (seconds vs. milliseconds) is
// not shown here.
type TaskLogEntry struct {
	TaskID    string `json:"task_id"`
	Timestamp int64  `json:"timestamp"`
	Stream    string `json:"stream"` // "stdout" or "stderr"
	Data      string `json:"data"`
}

TaskLogEntry represents a log entry for a task

type TaskQueue

// TaskQueue manages task queuing and distribution via Redis.
//
// The per-method descriptions below are the ones given for the RedisTaskQueue
// implementation. RedisTaskQueue additionally provides PushDelayed, which is
// not part of this interface.
type TaskQueue interface {
	// Push adds a task to the queue.
	Push(ctx context.Context, task *types.RunExecution) error
	// Requeue atomically returns an execution to the pending queue after a
	// worker crashed or lost it before the run claim was established.
	Requeue(ctx context.Context, task *types.RunExecution) error
	// Pop blocks until a task is available and returns it.
	Pop(ctx context.Context, workerID string) (*types.RunExecution, error)
	// Complete marks a task as complete and stores the result.
	Complete(ctx context.Context, taskID string, result *types.RunExecutionResult) error
	// Fail marks a task as failed.
	Fail(ctx context.Context, taskID string, err error) error
	// GetState returns the current state of a task.
	GetState(ctx context.Context, taskID string) (*types.RunExecutionState, error)
	// GetResult returns the result of a completed task.
	GetResult(ctx context.Context, taskID string) (*types.RunExecutionResult, error)
	// Len returns the number of pending tasks in the queue.
	Len(ctx context.Context) (int64, error)
	// InFlightCount returns the number of tasks currently being processed.
	InFlightCount(ctx context.Context) (int64, error)

	// Log streaming

	// PublishLog publishes a log entry to the task's log channel.
	PublishLog(ctx context.Context, taskID string, stream string, data string) error
	// PublishStatus publishes a task status change event.
	PublishStatus(ctx context.Context, taskID string, status types.RunExecutionStatus, exitCode *int, errorMsg string) error
	// SubscribeLogs subscribes to a task's log channel and returns a channel
	// of log entries. NOTE(review): the returned func() is presumably an
	// unsubscribe/cleanup callback — confirm.
	SubscribeLogs(ctx context.Context, taskID string) (<-chan []byte, func(), error)
	// GetLogBuffer returns buffered logs for a task (for late joiners).
	GetLogBuffer(ctx context.Context, taskID string) ([][]byte, error)
}

TaskQueue manages task queuing and distribution via Redis

type TaskSourceWatch added in v0.1.115

// TaskSourceWatch is the element type of the watches slice accepted by
// PostgresBackend.UpsertTaskSourceWatches, which takes a workspace ID, a task
// ID and a []TaskSourceWatch.
//
// NOTE(review): field semantics are not documented here. Presumably
// Integration names the source integration, CorrelationKey identifies the
// upstream item being watched, and Reason records why the watch exists —
// confirm against the callers.
type TaskSourceWatch struct {
	Integration    string
	CorrelationKey string
	Reason         string
}

type TaskSourceWatchMatch added in v0.1.115

// TaskSourceWatchMatch carries a workspace ID, task ID, correlation key,
// reason, and parent task ID.
//
// NOTE(review): presumably this is the result of looking up stored
// TaskSourceWatch entries (it repeats CorrelationKey and Reason and adds the
// owning workspace/task) — confirm against the function that returns it.
// Whether ParentTaskID may be empty is not shown here.
type TaskSourceWatchMatch struct {
	WorkspaceID    uint
	TaskID         string
	CorrelationKey string
	Reason         string
	ParentTaskID   string
}

type TaskStatusEvent

// TaskStatusEvent represents a task status change event.
//
// ExitCode and Error carry `omitempty`, so a nil exit code and an empty error
// string are left out of the JSON encoding.
//
// NOTE(review): Status, ExitCode and Error mirror the status/exitCode/errorMsg
// parameters of TaskQueue.PublishStatus, so this is presumably the payload
// that method publishes — confirm. The unit of Timestamp is not shown here.
type TaskStatusEvent struct {
	TaskID    string                   `json:"task_id"`
	Timestamp int64                    `json:"timestamp"`
	Status    types.RunExecutionStatus `json:"status"`
	ExitCode  *int                     `json:"exit_code,omitempty"`
	Error     string                   `json:"error,omitempty"`
}

TaskStatusEvent represents a task status change event

type TerminalIORepository added in v0.1.53

// TerminalIORepository manages interactive terminal I/O transport.
// Implementations encapsulate broker/channel details (e.g., Redis pub/sub).
//
// The Publish*/Subscribe* methods come in pairs keyed by task ID. Each
// Subscribe* returns a receive-only channel, a func(), and an error.
// NOTE(review): the func() is presumably an unsubscribe/cleanup callback that
// the caller must invoke — confirm.
type TerminalIORepository interface {
	// Input wake: a payload-free signal (the subscription channel carries struct{}).
	PublishInputWake(ctx context.Context, taskID string) error
	SubscribeInputWake(ctx context.Context, taskID string) (<-chan struct{}, func(), error)

	// Output: raw byte chunks published for a task.
	PublishOutput(ctx context.Context, taskID string, data []byte) error
	SubscribeOutput(ctx context.Context, taskID string) (<-chan []byte, func(), error)

	// Cancel: a payload-free signal (the subscription channel carries struct{}).
	PublishCancel(ctx context.Context, taskID string) error
	SubscribeCancel(ctx context.Context, taskID string) (<-chan struct{}, func(), error)

	// Session lease: exclusive ownership of an interactive session.
	// Leases are keyed by (workspaceID, sessionID) and tagged with an ownerID;
	// Acquire and Renew take a ttl and report a bool alongside the error.
	// NOTE(review): the bool presumably means "lease is now held by ownerID" — confirm.
	AcquireSessionLease(ctx context.Context, workspaceID uint, sessionID, ownerID string, ttl time.Duration) (bool, error)
	RenewSessionLease(ctx context.Context, workspaceID uint, sessionID, ownerID string, ttl time.Duration) (bool, error)
	ReleaseSessionLease(ctx context.Context, workspaceID uint, sessionID, ownerID string) error
	GetSessionLeaseOwner(ctx context.Context, workspaceID uint, sessionID string) (string, error)
	// Session checkpoint: stored per (workspaceID, sessionID) with a ttl.
	SetSessionCheckpoint(ctx context.Context, workspaceID uint, sessionID string, checkpoint *types.SessionCheckpoint, ttl time.Duration) error
	GetSessionCheckpoint(ctx context.Context, workspaceID uint, sessionID string) (*types.SessionCheckpoint, error)

	// Run interaction state: backend-owned state for working/waiting/closed.
	// Stored per (workspaceID, runID) with a ttl; note that Set takes the
	// interaction by value while Get returns a pointer.
	SetRunInteraction(ctx context.Context, workspaceID uint, runID string, interaction types.RunInteraction, ttl time.Duration) error
	GetRunInteraction(ctx context.Context, workspaceID uint, runID string) (*types.RunInteraction, error)
}

TerminalIORepository manages interactive terminal I/O transport. Implementations encapsulate broker/channel details (e.g., Redis pub/sub).

func NewRedisTerminalIORepository added in v0.1.53

func NewRedisTerminalIORepository(rdb *common.RedisClient) TerminalIORepository

type TokenRepository

// TokenRepository manages authentication tokens.
//
// It covers four token scopes (workspace member, worker, organization, and
// workspace service) plus validation. Every Create*/Ensure* method returns a
// (*types.Token, string, error) triple.
// NOTE(review): the string is presumably the raw token value, as accepted by
// ValidateToken/AuthorizeToken via their rawToken parameter — confirm.
type TokenRepository interface {
	// Workspace member tokens
	// expiresAt is a pointer. NOTE(review): nil presumably means the token does not expire — confirm.
	CreateToken(ctx context.Context, workspaceId, memberId uint, name string, expiresAt *time.Time, tokenType types.TokenType) (*types.Token, string, error)
	GetToken(ctx context.Context, externalId string) (*types.Token, error)
	ListTokens(ctx context.Context, workspaceId uint) ([]types.Token, error)
	RevokeToken(ctx context.Context, externalId string) error

	// Worker tokens (cluster-level)
	// poolName is a pointer. NOTE(review): nil presumably means the token is not bound to a pool — confirm.
	CreateWorkerToken(ctx context.Context, name string, poolName *string, expiresAt *time.Time) (*types.Token, string, error)
	ListWorkerTokens(ctx context.Context) ([]types.Token, error)

	// Organization tokens (tenant-scoped)
	CreateOrgToken(ctx context.Context, name string, tenantId string, expiresAt *time.Time) (*types.Token, string, error)
	ListOrgTokens(ctx context.Context, tenantId string) ([]types.Token, error)
	RevokeOrgToken(ctx context.Context, externalId string) error

	// Workspace service tokens (workspace-scoped, no member)
	// Neither method takes an expiry; EnsureWorkspaceServiceToken also takes no name.
	// NOTE(review): Ensure presumably returns an existing token or creates one — confirm.
	CreateWorkspaceServiceToken(ctx context.Context, workspaceId uint, name string) (*types.Token, string, error)
	EnsureWorkspaceServiceToken(ctx context.Context, workspaceId uint) (*types.Token, string, error)

	// Validation
	// Both take the raw token string; they differ in result type
	// (*types.TokenValidationResult vs. *types.AuthInfo).
	ValidateToken(ctx context.Context, rawToken string) (*types.TokenValidationResult, error)
	AuthorizeToken(ctx context.Context, rawToken string) (*types.AuthInfo, error)
}

TokenRepository manages authentication tokens

type WorkerPoolRepository

// WorkerPoolRepository manages worker pool state in Redis.
// State is stored and fetched per pool name.
type WorkerPoolRepository interface {
	// SetPoolState stores state under poolName.
	SetPoolState(ctx context.Context, poolName string, state *types.WorkerPoolState) error
	// GetPoolState fetches the state stored under poolName.
	// NOTE(review): behavior for an unknown pool (nil result vs. error) is not shown here.
	GetPoolState(ctx context.Context, poolName string) (*types.WorkerPoolState, error)
}

WorkerPoolRepository manages worker pool state in Redis

type WorkerRedisRepository

// WorkerRedisRepository implements WorkerRepository using Redis.
// NewWorkerRedisRepository and NewWorkerRedisRepositoryForTest both take a
// *common.RedisClient and return the WorkerRepository interface.
type WorkerRedisRepository struct {
	// contains filtered or unexported fields
}

WorkerRedisRepository implements WorkerRepository using Redis.

func (*WorkerRedisRepository) AddWorker

func (r *WorkerRedisRepository) AddWorker(ctx context.Context, w *types.Worker) error

func (*WorkerRedisRepository) AllocateIP added in v0.1.23

func (r *WorkerRedisRepository) AllocateIP(ctx context.Context, sandboxID, workerID string) (*types.IPAllocation, error)

func (*WorkerRedisRepository) GetAllWorkers

func (r *WorkerRedisRepository) GetAllWorkers(ctx context.Context) ([]*types.Worker, error)

func (*WorkerRedisRepository) GetAvailableWorkers

func (r *WorkerRedisRepository) GetAvailableWorkers(ctx context.Context) ([]*types.Worker, error)

func (*WorkerRedisRepository) GetSandboxIP added in v0.1.23

func (r *WorkerRedisRepository) GetSandboxIP(ctx context.Context, sandboxID string) (string, bool)

func (*WorkerRedisRepository) GetWorker

func (r *WorkerRedisRepository) GetWorker(ctx context.Context, id string) (*types.Worker, error)

func (*WorkerRedisRepository) ReleaseIP added in v0.1.23

func (r *WorkerRedisRepository) ReleaseIP(ctx context.Context, sandboxID string) error

func (*WorkerRedisRepository) RemoveWorker

func (r *WorkerRedisRepository) RemoveWorker(ctx context.Context, id string) error

func (*WorkerRedisRepository) SetWorkerKeepAlive

func (r *WorkerRedisRepository) SetWorkerKeepAlive(ctx context.Context, id string) error

func (*WorkerRedisRepository) UpdateWorkerStatus

func (r *WorkerRedisRepository) UpdateWorkerStatus(ctx context.Context, id string, status types.WorkerStatus) error

type WorkerRepository

// WorkerRepository manages worker state in Redis.
//
// Besides worker registration and status, it also handles per-sandbox IP
// allocation (AllocateIP, ReleaseIP, GetSandboxIP).
type WorkerRepository interface {
	AddWorker(ctx context.Context, worker *types.Worker) error
	GetWorker(ctx context.Context, workerId string) (*types.Worker, error)
	GetAllWorkers(ctx context.Context) ([]*types.Worker, error)
	// NOTE(review): the criteria for "available" are not shown here — confirm against the implementation.
	GetAvailableWorkers(ctx context.Context) ([]*types.Worker, error)
	RemoveWorker(ctx context.Context, workerId string) error
	SetWorkerKeepAlive(ctx context.Context, workerId string) error
	UpdateWorkerStatus(ctx context.Context, workerId string, status types.WorkerStatus) error
	// AllocateIP takes both the sandbox ID and the worker ID; ReleaseIP and
	// GetSandboxIP are keyed by sandbox ID alone.
	AllocateIP(ctx context.Context, sandboxID, workerID string) (*types.IPAllocation, error)
	ReleaseIP(ctx context.Context, sandboxID string) error
	// GetSandboxIP reports (ip, ok) rather than an error.
	// NOTE(review): presumably ok is false both when no IP is allocated and
	// when the lookup itself fails — confirm.
	GetSandboxIP(ctx context.Context, sandboxID string) (string, bool)
}

WorkerRepository manages worker state in Redis

func NewWorkerRedisRepository

func NewWorkerRedisRepository(rdb *common.RedisClient) WorkerRepository

func NewWorkerRedisRepositoryForTest

func NewWorkerRedisRepositoryForTest(rdb *common.RedisClient) WorkerRepository

NewWorkerRedisRepositoryForTest creates a WorkerRepository backed by miniredis

type WorkspaceToolRepository

// WorkspaceToolRepository manages workspace-scoped tool providers.
//
// Tools can be addressed three ways: by numeric id, by external ID, or by
// (workspaceId, name). Config and manifest are passed as raw JSON and can be
// updated independently of each other.
type WorkspaceToolRepository interface {
	// createdByMemberId is a pointer. NOTE(review): nil presumably means the
	// tool was not created by a workspace member — confirm.
	CreateWorkspaceTool(ctx context.Context, workspaceId uint, createdByMemberId *uint, name string, providerType types.WorkspaceToolProviderType, config json.RawMessage, manifest json.RawMessage) (*types.WorkspaceTool, error)
	GetWorkspaceTool(ctx context.Context, id uint) (*types.WorkspaceTool, error)
	GetWorkspaceToolByExternalId(ctx context.Context, externalId string) (*types.WorkspaceTool, error)
	GetWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) (*types.WorkspaceTool, error)
	ListWorkspaceTools(ctx context.Context, workspaceId uint) ([]*types.WorkspaceTool, error)
	UpdateWorkspaceToolManifest(ctx context.Context, id uint, manifest json.RawMessage) error
	UpdateWorkspaceToolConfig(ctx context.Context, id uint, config json.RawMessage) error
	DeleteWorkspaceTool(ctx context.Context, id uint) error
	DeleteWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) error
}

WorkspaceToolRepository manages workspace-scoped tool providers

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL