Documentation
¶
Index ¶
- func DecryptCredentials(conn *types.IntegrationConnection) (*types.IntegrationCredentials, error)
- func NewRedisClientForTest() (*common.RedisClient, error)
- type BackendRepository
- type ElasticsearchClient
- type FilesystemStore
- type IntegrationRepository
- type MemberRepository
- type PostgresBackend
- func (r *PostgresBackend) AuthorizeToken(ctx context.Context, rawToken string) (*types.AuthInfo, error)
- func (b *PostgresBackend) CancelTask(ctx context.Context, externalId string) error
- func (b *PostgresBackend) Close() error
- func (r *PostgresBackend) CreateMember(ctx context.Context, workspaceId uint, email, name string, ...) (*types.WorkspaceMember, error)
- func (b *PostgresBackend) CreateTask(ctx context.Context, task *types.Task) error
- func (r *PostgresBackend) CreateToken(ctx context.Context, workspaceId, memberId uint, name string, ...) (*types.Token, string, error)
- func (r *PostgresBackend) CreateWorkerToken(ctx context.Context, name string, poolName *string, expiresAt *time.Time) (*types.Token, string, error)
- func (b *PostgresBackend) CreateWorkspace(ctx context.Context, name string) (*types.Workspace, error)
- func (b *PostgresBackend) CreateWorkspaceTool(ctx context.Context, workspaceId uint, createdByMemberId *uint, name string, ...) (*types.WorkspaceTool, error)
- func (b *PostgresBackend) DB() *sql.DB
- func (r *PostgresBackend) DeleteConnection(ctx context.Context, externalId string) error
- func (r *PostgresBackend) DeleteMember(ctx context.Context, externalId string) error
- func (b *PostgresBackend) DeleteTask(ctx context.Context, externalId string) error
- func (b *PostgresBackend) DeleteWorkspace(ctx context.Context, id uint) error
- func (b *PostgresBackend) DeleteWorkspaceTool(ctx context.Context, id uint) error
- func (b *PostgresBackend) DeleteWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) error
- func (r *PostgresBackend) GetConnection(ctx context.Context, workspaceId uint, memberId uint, integrationType string) (*types.IntegrationConnection, error)
- func (r *PostgresBackend) GetConnectionByExternalId(ctx context.Context, externalId string) (*types.IntegrationConnection, error)
- func (r *PostgresBackend) GetMember(ctx context.Context, externalId string) (*types.WorkspaceMember, error)
- func (r *PostgresBackend) GetMemberByEmail(ctx context.Context, workspaceId uint, email string) (*types.WorkspaceMember, error)
- func (b *PostgresBackend) GetRetryableTasks(ctx context.Context) ([]*types.Task, error)
- func (b *PostgresBackend) GetStuckHookTasks(ctx context.Context, timeout time.Duration) ([]*types.Task, error)
- func (b *PostgresBackend) GetTask(ctx context.Context, externalId string) (*types.Task, error)
- func (b *PostgresBackend) GetTaskById(ctx context.Context, id uint) (*types.Task, error)
- func (r *PostgresBackend) GetToken(ctx context.Context, externalId string) (*types.Token, error)
- func (b *PostgresBackend) GetWorkspace(ctx context.Context, id uint) (*types.Workspace, error)
- func (b *PostgresBackend) GetWorkspaceByExternalId(ctx context.Context, externalId string) (*types.Workspace, error)
- func (b *PostgresBackend) GetWorkspaceByName(ctx context.Context, name string) (*types.Workspace, error)
- func (b *PostgresBackend) GetWorkspaceTool(ctx context.Context, id uint) (*types.WorkspaceTool, error)
- func (b *PostgresBackend) GetWorkspaceToolByExternalId(ctx context.Context, externalId string) (*types.WorkspaceTool, error)
- func (b *PostgresBackend) GetWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) (*types.WorkspaceTool, error)
- func (b *PostgresBackend) GetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string) (*types.WorkspaceToolSetting, error)
- func (b *PostgresBackend) GetWorkspaceToolSettings(ctx context.Context, workspaceId uint) (*types.WorkspaceToolSettings, error)
- func (r *PostgresBackend) ListConnections(ctx context.Context, workspaceId uint) ([]types.IntegrationConnection, error)
- func (r *PostgresBackend) ListMembers(ctx context.Context, workspaceId uint) ([]types.WorkspaceMember, error)
- func (b *PostgresBackend) ListTasks(ctx context.Context, workspaceId uint) ([]*types.Task, error)
- func (b *PostgresBackend) ListTasksByHook(ctx context.Context, hookId uint) ([]*types.Task, error)
- func (r *PostgresBackend) ListTokens(ctx context.Context, workspaceId uint) ([]types.Token, error)
- func (r *PostgresBackend) ListWorkerTokens(ctx context.Context) ([]types.Token, error)
- func (b *PostgresBackend) ListWorkspaceToolSettings(ctx context.Context, workspaceId uint) ([]types.WorkspaceToolSetting, error)
- func (b *PostgresBackend) ListWorkspaceTools(ctx context.Context, workspaceId uint) ([]*types.WorkspaceTool, error)
- func (b *PostgresBackend) ListWorkspaces(ctx context.Context) ([]*types.Workspace, error)
- func (b *PostgresBackend) Ping(ctx context.Context) error
- func (r *PostgresBackend) RevokeToken(ctx context.Context, externalId string) error
- func (b *PostgresBackend) RunMigrations() error
- func (r *PostgresBackend) SaveConnection(ctx context.Context, workspaceId uint, memberId *uint, integrationType string, ...) (*types.IntegrationConnection, error)
- func (b *PostgresBackend) SetTaskResult(ctx context.Context, externalId string, exitCode int, errorMsg string) error
- func (b *PostgresBackend) SetTaskStarted(ctx context.Context, externalId string) error
- func (b *PostgresBackend) SetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string, enabled bool) error
- func (r *PostgresBackend) UpdateMember(ctx context.Context, externalId string, name string, role types.MemberRole) (*types.WorkspaceMember, error)
- func (b *PostgresBackend) UpdateTaskStatus(ctx context.Context, externalId string, status types.TaskStatus) error
- func (b *PostgresBackend) UpdateWorkspaceToolConfig(ctx context.Context, id uint, config json.RawMessage) error
- func (b *PostgresBackend) UpdateWorkspaceToolManifest(ctx context.Context, id uint, manifest json.RawMessage) error
- func (r *PostgresBackend) ValidateToken(ctx context.Context, rawToken string) (*types.TokenValidationResult, error)
- type QueryResult
- type RedisTaskQueue
- func (q *RedisTaskQueue) Complete(ctx context.Context, taskID string, result *types.TaskResult) error
- func (q *RedisTaskQueue) Fail(ctx context.Context, taskID string, taskErr error) error
- func (q *RedisTaskQueue) GetLogBuffer(ctx context.Context, taskID string) ([][]byte, error)
- func (q *RedisTaskQueue) GetResult(ctx context.Context, taskID string) (*types.TaskResult, error)
- func (q *RedisTaskQueue) GetState(ctx context.Context, taskID string) (*types.TaskState, error)
- func (q *RedisTaskQueue) InFlightCount(ctx context.Context) (int64, error)
- func (q *RedisTaskQueue) Len(ctx context.Context) (int64, error)
- func (q *RedisTaskQueue) Pop(ctx context.Context, workerID string) (*types.Task, error)
- func (q *RedisTaskQueue) PublishLog(ctx context.Context, taskID string, stream string, data string) error
- func (q *RedisTaskQueue) PublishStatus(ctx context.Context, taskID string, status types.TaskStatus, exitCode *int, ...) error
- func (q *RedisTaskQueue) Push(ctx context.Context, task *types.Task) error
- func (q *RedisTaskQueue) SubscribeLogs(ctx context.Context, taskID string) (<-chan []byte, func(), error)
- type SearchHit
- type TaskLogEntry
- type TaskQueue
- type TaskStatusEvent
- type TokenRepository
- type WorkerPoolRepository
- type WorkerRedisRepository
- func (r *WorkerRedisRepository) AddWorker(ctx context.Context, w *types.Worker) error
- func (r *WorkerRedisRepository) AllocateIP(ctx context.Context, sandboxID, workerID string) (*types.IPAllocation, error)
- func (r *WorkerRedisRepository) GetAllWorkers(ctx context.Context) ([]*types.Worker, error)
- func (r *WorkerRedisRepository) GetAvailableWorkers(ctx context.Context) ([]*types.Worker, error)
- func (r *WorkerRedisRepository) GetSandboxIP(ctx context.Context, sandboxID string) (string, bool)
- func (r *WorkerRedisRepository) GetWorker(ctx context.Context, id string) (*types.Worker, error)
- func (r *WorkerRedisRepository) ReleaseIP(ctx context.Context, sandboxID string) error
- func (r *WorkerRedisRepository) RemoveWorker(ctx context.Context, id string) error
- func (r *WorkerRedisRepository) SetWorkerKeepAlive(ctx context.Context, id string) error
- func (r *WorkerRedisRepository) UpdateWorkerStatus(ctx context.Context, id string, status types.WorkerStatus) error
- type WorkerRepository
- type WorkspaceToolRepository
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DecryptCredentials ¶
func DecryptCredentials(conn *types.IntegrationConnection) (*types.IntegrationCredentials, error)
DecryptCredentials parses credentials from a connection (TODO: implement encryption)
func NewRedisClientForTest ¶
func NewRedisClientForTest() (*common.RedisClient, error)
NewRedisClientForTest creates a Redis client backed by miniredis for testing
Types ¶
type BackendRepository ¶
type BackendRepository interface {
// Workspaces
CreateWorkspace(ctx context.Context, name string) (*types.Workspace, error)
GetWorkspace(ctx context.Context, id uint) (*types.Workspace, error)
GetWorkspaceByExternalId(ctx context.Context, externalId string) (*types.Workspace, error)
GetWorkspaceByName(ctx context.Context, name string) (*types.Workspace, error)
ListWorkspaces(ctx context.Context) ([]*types.Workspace, error)
DeleteWorkspace(ctx context.Context, id uint) error
// Workspace Tool Settings
GetWorkspaceToolSettings(ctx context.Context, workspaceId uint) (*types.WorkspaceToolSettings, error)
GetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string) (*types.WorkspaceToolSetting, error)
SetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string, enabled bool) error
ListWorkspaceToolSettings(ctx context.Context, workspaceId uint) ([]types.WorkspaceToolSetting, error)
// Members
MemberRepository
// Tokens
TokenRepository
// Integrations
IntegrationRepository
// Workspace Tools
WorkspaceToolRepository
// Tasks
CreateTask(ctx context.Context, task *types.Task) error
GetTask(ctx context.Context, externalId string) (*types.Task, error)
GetTaskById(ctx context.Context, id uint) (*types.Task, error)
ListTasks(ctx context.Context, workspaceId uint) ([]*types.Task, error)
UpdateTaskStatus(ctx context.Context, externalId string, status types.TaskStatus) error
SetTaskStarted(ctx context.Context, externalId string) error
SetTaskResult(ctx context.Context, externalId string, exitCode int, errorMsg string) error
CancelTask(ctx context.Context, externalId string) error
DeleteTask(ctx context.Context, externalId string) error
GetRetryableTasks(ctx context.Context) ([]*types.Task, error)
GetStuckHookTasks(ctx context.Context, timeout time.Duration) ([]*types.Task, error)
ListTasksByHook(ctx context.Context, hookId uint) ([]*types.Task, error)
// Database access
DB() *sql.DB
// Utilities
Ping(ctx context.Context) error
Close() error
RunMigrations() error
}
BackendRepository is the main Postgres repository for persistent data. For filesystem queries and metadata, use FilesystemStore instead.
type ElasticsearchClient ¶
type ElasticsearchClient interface {
Index(ctx context.Context, index, docID string, body []byte) error
Search(ctx context.Context, index string, query map[string]interface{}, size int) ([]json.RawMessage, error)
Get(ctx context.Context, index, docID string) ([]byte, error)
Delete(ctx context.Context, index, docID string) error
DeleteByQuery(ctx context.Context, index string, query map[string]interface{}) error
}
ElasticsearchClient is an optional interface for Elasticsearch operations.
func NewElasticsearchClient ¶
func NewElasticsearchClient(url string) ElasticsearchClient
NewElasticsearchClient creates a new Elasticsearch client.
type FilesystemStore ¶
type FilesystemStore interface {
// CreateQuery stores a new filesystem query definition.
CreateQuery(ctx context.Context, query *types.FilesystemQuery) (*types.FilesystemQuery, error)
// GetQuery retrieves a query by workspace and path.
GetQuery(ctx context.Context, workspaceId uint, path string) (*types.FilesystemQuery, error)
// GetQueryByExternalId retrieves a query by its external ID.
GetQueryByExternalId(ctx context.Context, externalId string) (*types.FilesystemQuery, error)
// ListQueries returns all queries under a parent path.
ListQueries(ctx context.Context, workspaceId uint, parentPath string) ([]*types.FilesystemQuery, error)
// UpdateQuery updates an existing query definition.
UpdateQuery(ctx context.Context, query *types.FilesystemQuery) error
// DeleteQuery removes a query by external ID.
DeleteQuery(ctx context.Context, externalId string) error
// GetQueryResults retrieves cached results for a query path.
GetQueryResults(ctx context.Context, workspaceId uint, queryPath string) ([]QueryResult, error)
// StoreQueryResults caches query results with the specified TTL.
StoreQueryResults(ctx context.Context, workspaceId uint, queryPath string, results []QueryResult, ttl time.Duration) error
// GetResultContent retrieves cached content for a specific result.
GetResultContent(ctx context.Context, workspaceId uint, queryPath, resultID string) ([]byte, error)
// StoreResultContent caches content for a specific result.
StoreResultContent(ctx context.Context, workspaceId uint, queryPath, resultID string, content []byte) error
// SearchContent performs full-text search across all materialized content.
SearchContent(ctx context.Context, workspaceId uint, query string, limit int) ([]SearchHit, error)
// IndexContent indexes content for full-text search.
IndexContent(ctx context.Context, workspaceId uint, queryPath, resultID, filename string, content []byte) error
// GetFileMeta retrieves file metadata.
GetFileMeta(ctx context.Context, path string) (*types.FileMeta, error)
// GetDirMeta retrieves directory metadata.
GetDirMeta(ctx context.Context, path string) (*types.DirMeta, error)
// SaveFileMeta stores file metadata.
SaveFileMeta(ctx context.Context, meta *types.FileMeta) error
// SaveDirMeta stores directory metadata.
SaveDirMeta(ctx context.Context, meta *types.DirMeta) error
// DeleteFileMeta removes file metadata.
DeleteFileMeta(ctx context.Context, path string) error
// DeleteDirMeta removes directory metadata.
DeleteDirMeta(ctx context.Context, path string) error
// ListDir returns directory entries.
ListDir(ctx context.Context, path string) ([]types.DirEntry, error)
// SaveDirListing stores a directory listing.
SaveDirListing(ctx context.Context, path string, entries []types.DirEntry) error
// GetSymlink retrieves a symlink target.
GetSymlink(ctx context.Context, path string) (string, error)
// SaveSymlink stores a symlink.
SaveSymlink(ctx context.Context, path, target string) error
// DeleteSymlink removes a symlink.
DeleteSymlink(ctx context.Context, path string) error
// GetWatchedSourceQueries returns stale source queries that have active hooks watching their path.
GetWatchedSourceQueries(ctx context.Context, staleAfter time.Duration, limit int) ([]*types.FilesystemQuery, error)
// CreateHook stores a new hook definition.
CreateHook(ctx context.Context, hook *types.Hook) (*types.Hook, error)
// GetHook retrieves a hook by external ID.
GetHook(ctx context.Context, externalId string) (*types.Hook, error)
// GetHookById retrieves a hook by internal ID.
GetHookById(ctx context.Context, id uint) (*types.Hook, error)
// ListHooks returns all hooks for a workspace.
ListHooks(ctx context.Context, workspaceId uint) ([]*types.Hook, error)
// UpdateHook updates an existing hook.
UpdateHook(ctx context.Context, hook *types.Hook) error
// DeleteHook removes a hook by external ID.
DeleteHook(ctx context.Context, externalId string) error
// InvalidatePath removes a specific path from all caches.
InvalidatePath(ctx context.Context, path string) error
// InvalidatePrefix removes all paths with a given prefix.
InvalidatePrefix(ctx context.Context, prefix string) error
// InvalidateQuery removes all cached data for a query (results + content).
InvalidateQuery(ctx context.Context, workspaceId uint, queryPath string) error
}
FilesystemStore is the unified interface for all filesystem operations. It consolidates query definitions (Postgres/memory), query results (Elasticsearch/memory), and filesystem metadata (Redis/memory) into a single interface.
func NewFilesystemStore ¶
func NewFilesystemStore(db *sql.DB, redis *common.RedisClient, elastic ElasticsearchClient) FilesystemStore
NewFilesystemStore creates a unified filesystem store. Pass nil for db/redis to use memory-only mode.
func NewFilesystemStoreWithTTL ¶
func NewFilesystemStoreWithTTL(db *sql.DB, redis *common.RedisClient, elastic ElasticsearchClient, ttl time.Duration) FilesystemStore
NewFilesystemStoreWithTTL creates a store with custom cache TTL.
func NewMemoryFilesystemStore ¶
func NewMemoryFilesystemStore() FilesystemStore
NewMemoryFilesystemStore creates a memory-only filesystem store (for local mode).
type IntegrationRepository ¶
type IntegrationRepository interface {
SaveConnection(ctx context.Context, workspaceId uint, memberId *uint, integrationType string, creds *types.IntegrationCredentials, scope string) (*types.IntegrationConnection, error)
GetConnection(ctx context.Context, workspaceId uint, memberId uint, integrationType string) (*types.IntegrationConnection, error)
GetConnectionByExternalId(ctx context.Context, externalId string) (*types.IntegrationConnection, error)
ListConnections(ctx context.Context, workspaceId uint) ([]types.IntegrationConnection, error)
DeleteConnection(ctx context.Context, externalId string) error
}
IntegrationRepository manages integration connections
type MemberRepository ¶
type MemberRepository interface {
CreateMember(ctx context.Context, workspaceId uint, email, name string, role types.MemberRole) (*types.WorkspaceMember, error)
GetMember(ctx context.Context, externalId string) (*types.WorkspaceMember, error)
GetMemberByEmail(ctx context.Context, workspaceId uint, email string) (*types.WorkspaceMember, error)
ListMembers(ctx context.Context, workspaceId uint) ([]types.WorkspaceMember, error)
UpdateMember(ctx context.Context, externalId string, name string, role types.MemberRole) (*types.WorkspaceMember, error)
DeleteMember(ctx context.Context, externalId string) error
}
MemberRepository manages workspace members
type PostgresBackend ¶
type PostgresBackend struct {
// contains filtered or unexported fields
}
PostgresBackend implements BackendRepository using Postgres
func NewPostgresBackend ¶
func NewPostgresBackend(cfg types.PostgresConfig) (*PostgresBackend, error)
NewPostgresBackend creates a new Postgres backend
func (*PostgresBackend) AuthorizeToken ¶
func (r *PostgresBackend) AuthorizeToken(ctx context.Context, rawToken string) (*types.AuthInfo, error)
AuthorizeToken validates a token and returns AuthInfo for the auth system
func (*PostgresBackend) CancelTask ¶
func (b *PostgresBackend) CancelTask(ctx context.Context, externalId string) error
CancelTask cancels a running or pending task
func (*PostgresBackend) Close ¶
func (b *PostgresBackend) Close() error
Close closes the database connection
func (*PostgresBackend) CreateMember ¶
func (r *PostgresBackend) CreateMember(ctx context.Context, workspaceId uint, email, name string, role types.MemberRole) (*types.WorkspaceMember, error)
func (*PostgresBackend) CreateTask ¶
CreateTask creates a new task
func (*PostgresBackend) CreateToken ¶
func (*PostgresBackend) CreateWorkerToken ¶
func (r *PostgresBackend) CreateWorkerToken(ctx context.Context, name string, poolName *string, expiresAt *time.Time) (*types.Token, string, error)
CreateWorkerToken creates a cluster-level worker token (not tied to a workspace)
func (*PostgresBackend) CreateWorkspace ¶
func (b *PostgresBackend) CreateWorkspace(ctx context.Context, name string) (*types.Workspace, error)
CreateWorkspace creates a new workspace
func (*PostgresBackend) CreateWorkspaceTool ¶
func (b *PostgresBackend) CreateWorkspaceTool(ctx context.Context, workspaceId uint, createdByMemberId *uint, name string, providerType types.WorkspaceToolProviderType, config json.RawMessage, manifest json.RawMessage) (*types.WorkspaceTool, error)
CreateWorkspaceTool creates a new workspace tool provider
func (*PostgresBackend) DB ¶
func (b *PostgresBackend) DB() *sql.DB
DB returns the underlying database connection
func (*PostgresBackend) DeleteConnection ¶
func (r *PostgresBackend) DeleteConnection(ctx context.Context, externalId string) error
func (*PostgresBackend) DeleteMember ¶
func (r *PostgresBackend) DeleteMember(ctx context.Context, externalId string) error
func (*PostgresBackend) DeleteTask ¶
func (b *PostgresBackend) DeleteTask(ctx context.Context, externalId string) error
DeleteTask removes a task by external ID
func (*PostgresBackend) DeleteWorkspace ¶
func (b *PostgresBackend) DeleteWorkspace(ctx context.Context, id uint) error
DeleteWorkspace removes a workspace by internal ID
func (*PostgresBackend) DeleteWorkspaceTool ¶
func (b *PostgresBackend) DeleteWorkspaceTool(ctx context.Context, id uint) error
DeleteWorkspaceTool removes a workspace tool by internal ID
func (*PostgresBackend) DeleteWorkspaceToolByName ¶
func (b *PostgresBackend) DeleteWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) error
DeleteWorkspaceToolByName removes a workspace tool by name within a workspace
func (*PostgresBackend) GetConnection ¶
func (r *PostgresBackend) GetConnection(ctx context.Context, workspaceId uint, memberId uint, integrationType string) (*types.IntegrationConnection, error)
func (*PostgresBackend) GetConnectionByExternalId ¶
func (r *PostgresBackend) GetConnectionByExternalId(ctx context.Context, externalId string) (*types.IntegrationConnection, error)
func (*PostgresBackend) GetMember ¶
func (r *PostgresBackend) GetMember(ctx context.Context, externalId string) (*types.WorkspaceMember, error)
func (*PostgresBackend) GetMemberByEmail ¶
func (r *PostgresBackend) GetMemberByEmail(ctx context.Context, workspaceId uint, email string) (*types.WorkspaceMember, error)
func (*PostgresBackend) GetRetryableTasks ¶ added in v0.1.28
GetRetryableTasks returns failed hook-triggered tasks eligible for retry. A task is retryable if: hook_id set, status=failed, attempt < max_attempts, and enough time has passed since finished_at (exponential backoff).
func (*PostgresBackend) GetStuckHookTasks ¶ added in v0.1.28
func (b *PostgresBackend) GetStuckHookTasks(ctx context.Context, timeout time.Duration) ([]*types.Task, error)
GetStuckHookTasks returns hook-triggered tasks stuck in pending/running longer than timeout.
func (*PostgresBackend) GetTaskById ¶
GetTaskById retrieves a task by internal ID
func (*PostgresBackend) GetWorkspace ¶
GetWorkspace retrieves a workspace by internal ID
func (*PostgresBackend) GetWorkspaceByExternalId ¶
func (b *PostgresBackend) GetWorkspaceByExternalId(ctx context.Context, externalId string) (*types.Workspace, error)
GetWorkspaceByExternalId retrieves a workspace by external UUID
func (*PostgresBackend) GetWorkspaceByName ¶
func (b *PostgresBackend) GetWorkspaceByName(ctx context.Context, name string) (*types.Workspace, error)
GetWorkspaceByName retrieves a workspace by name
func (*PostgresBackend) GetWorkspaceTool ¶
func (b *PostgresBackend) GetWorkspaceTool(ctx context.Context, id uint) (*types.WorkspaceTool, error)
GetWorkspaceTool retrieves a workspace tool by internal ID
func (*PostgresBackend) GetWorkspaceToolByExternalId ¶
func (b *PostgresBackend) GetWorkspaceToolByExternalId(ctx context.Context, externalId string) (*types.WorkspaceTool, error)
GetWorkspaceToolByExternalId retrieves a workspace tool by external UUID
func (*PostgresBackend) GetWorkspaceToolByName ¶
func (b *PostgresBackend) GetWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) (*types.WorkspaceTool, error)
GetWorkspaceToolByName retrieves a workspace tool by name within a workspace
func (*PostgresBackend) GetWorkspaceToolSetting ¶
func (b *PostgresBackend) GetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string) (*types.WorkspaceToolSetting, error)
GetWorkspaceToolSetting returns the setting for a specific tool in a workspace Returns nil if no setting exists (tool is enabled by default)
func (*PostgresBackend) GetWorkspaceToolSettings ¶
func (b *PostgresBackend) GetWorkspaceToolSettings(ctx context.Context, workspaceId uint) (*types.WorkspaceToolSettings, error)
GetWorkspaceToolSettings returns all tool settings for a workspace as a lookup structure Tools not in the database are considered enabled by default
func (*PostgresBackend) ListConnections ¶
func (r *PostgresBackend) ListConnections(ctx context.Context, workspaceId uint) ([]types.IntegrationConnection, error)
func (*PostgresBackend) ListMembers ¶
func (r *PostgresBackend) ListMembers(ctx context.Context, workspaceId uint) ([]types.WorkspaceMember, error)
func (*PostgresBackend) ListTasks ¶
ListTasks returns all tasks for a workspace (0 = all workspaces) Limited to 100 most recent tasks
func (*PostgresBackend) ListTasksByHook ¶ added in v0.1.28
ListTasksByHook returns all tasks triggered by a specific hook, most recent first.
func (*PostgresBackend) ListTokens ¶
func (*PostgresBackend) ListWorkerTokens ¶
ListWorkerTokens returns all worker tokens (cluster-level, not workspace-scoped)
func (*PostgresBackend) ListWorkspaceToolSettings ¶
func (b *PostgresBackend) ListWorkspaceToolSettings(ctx context.Context, workspaceId uint) ([]types.WorkspaceToolSetting, error)
ListWorkspaceToolSettings returns all tool settings for a workspace
func (*PostgresBackend) ListWorkspaceTools ¶
func (b *PostgresBackend) ListWorkspaceTools(ctx context.Context, workspaceId uint) ([]*types.WorkspaceTool, error)
ListWorkspaceTools returns all tools for a workspace
func (*PostgresBackend) ListWorkspaces ¶
ListWorkspaces returns all workspaces
func (*PostgresBackend) Ping ¶
func (b *PostgresBackend) Ping(ctx context.Context) error
Ping checks the database connection
func (*PostgresBackend) RevokeToken ¶
func (r *PostgresBackend) RevokeToken(ctx context.Context, externalId string) error
func (*PostgresBackend) RunMigrations ¶
func (b *PostgresBackend) RunMigrations() error
RunMigrations runs database migrations using goose
func (*PostgresBackend) SaveConnection ¶
func (r *PostgresBackend) SaveConnection(ctx context.Context, workspaceId uint, memberId *uint, integrationType string, creds *types.IntegrationCredentials, scope string) (*types.IntegrationConnection, error)
func (*PostgresBackend) SetTaskResult ¶
func (b *PostgresBackend) SetTaskResult(ctx context.Context, externalId string, exitCode int, errorMsg string) error
SetTaskResult sets the final result of a task
func (*PostgresBackend) SetTaskStarted ¶
func (b *PostgresBackend) SetTaskStarted(ctx context.Context, externalId string) error
SetTaskStarted marks a task as started
func (*PostgresBackend) SetWorkspaceToolSetting ¶
func (b *PostgresBackend) SetWorkspaceToolSetting(ctx context.Context, workspaceId uint, toolName string, enabled bool) error
SetWorkspaceToolSetting creates or updates a tool setting for a workspace
func (*PostgresBackend) UpdateMember ¶
func (r *PostgresBackend) UpdateMember(ctx context.Context, externalId string, name string, role types.MemberRole) (*types.WorkspaceMember, error)
func (*PostgresBackend) UpdateTaskStatus ¶
func (b *PostgresBackend) UpdateTaskStatus(ctx context.Context, externalId string, status types.TaskStatus) error
UpdateTaskStatus updates a task's status
func (*PostgresBackend) UpdateWorkspaceToolConfig ¶
func (b *PostgresBackend) UpdateWorkspaceToolConfig(ctx context.Context, id uint, config json.RawMessage) error
UpdateWorkspaceToolConfig updates the config for a workspace tool
func (*PostgresBackend) UpdateWorkspaceToolManifest ¶
func (b *PostgresBackend) UpdateWorkspaceToolManifest(ctx context.Context, id uint, manifest json.RawMessage) error
UpdateWorkspaceToolManifest updates the manifest for a workspace tool
func (*PostgresBackend) ValidateToken ¶
func (r *PostgresBackend) ValidateToken(ctx context.Context, rawToken string) (*types.TokenValidationResult, error)
type QueryResult ¶
type QueryResult struct {
ID string `json:"id"` // Provider-specific ID (e.g., Gmail message ID)
Filename string `json:"filename"` // Generated filename for the result
Metadata map[string]string `json:"metadata"` // Key-value metadata (date, subject, from, etc.)
Size int64 `json:"size"` // Content size in bytes
Mtime int64 `json:"mtime"` // Last modified timestamp (Unix)
}
QueryResult represents a materialized search result from a filesystem query.
type RedisTaskQueue ¶
type RedisTaskQueue struct {
// contains filtered or unexported fields
}
RedisTaskQueue implements TaskQueue using Redis
func NewRedisTaskQueue ¶
func NewRedisTaskQueue(rdb *common.RedisClient, queueName string) *RedisTaskQueue
NewRedisTaskQueue creates a new Redis-based task queue
func (*RedisTaskQueue) Complete ¶
func (q *RedisTaskQueue) Complete(ctx context.Context, taskID string, result *types.TaskResult) error
Complete marks a task as complete and stores the result
func (*RedisTaskQueue) GetLogBuffer ¶
GetLogBuffer returns buffered logs for a task (for late joiners)
func (*RedisTaskQueue) GetResult ¶
func (q *RedisTaskQueue) GetResult(ctx context.Context, taskID string) (*types.TaskResult, error)
GetResult returns the result of a completed task
func (*RedisTaskQueue) InFlightCount ¶
func (q *RedisTaskQueue) InFlightCount(ctx context.Context) (int64, error)
InFlightCount returns the number of tasks currently being processed
func (*RedisTaskQueue) Len ¶
func (q *RedisTaskQueue) Len(ctx context.Context) (int64, error)
Len returns the number of pending tasks in the queue
func (*RedisTaskQueue) PublishLog ¶
func (q *RedisTaskQueue) PublishLog(ctx context.Context, taskID string, stream string, data string) error
PublishLog publishes a log entry to the task's log channel
func (*RedisTaskQueue) PublishStatus ¶
func (q *RedisTaskQueue) PublishStatus(ctx context.Context, taskID string, status types.TaskStatus, exitCode *int, errorMsg string) error
PublishStatus publishes a task status change event
func (*RedisTaskQueue) SubscribeLogs ¶
func (q *RedisTaskQueue) SubscribeLogs(ctx context.Context, taskID string) (<-chan []byte, func(), error)
SubscribeLogs subscribes to a task's log channel and returns a channel of log entries
type SearchHit ¶
type SearchHit struct {
WorkspaceID uint `json:"workspace_id"`
QueryPath string `json:"query_path"` // Path of the query that produced this result
ResultID string `json:"result_id"` // Provider-specific ID
Filename string `json:"filename"`
Snippet string `json:"snippet"` // Text snippet with match highlighted
Score float64 `json:"score"` // Relevance score
}
SearchHit represents a full-text search match across materialized content.
type TaskLogEntry ¶
type TaskLogEntry struct {
TaskID string `json:"task_id"`
Timestamp int64 `json:"timestamp"`
Stream string `json:"stream"` // "stdout" or "stderr"
Data string `json:"data"`
}
TaskLogEntry represents a log entry for a task
type TaskQueue ¶
type TaskQueue interface {
Push(ctx context.Context, task *types.Task) error
Pop(ctx context.Context, workerID string) (*types.Task, error)
Complete(ctx context.Context, taskID string, result *types.TaskResult) error
Fail(ctx context.Context, taskID string, err error) error
GetState(ctx context.Context, taskID string) (*types.TaskState, error)
GetResult(ctx context.Context, taskID string) (*types.TaskResult, error)
Len(ctx context.Context) (int64, error)
InFlightCount(ctx context.Context) (int64, error)
// Log streaming
PublishLog(ctx context.Context, taskID string, stream string, data string) error
PublishStatus(ctx context.Context, taskID string, status types.TaskStatus, exitCode *int, errorMsg string) error
SubscribeLogs(ctx context.Context, taskID string) (<-chan []byte, func(), error)
GetLogBuffer(ctx context.Context, taskID string) ([][]byte, error)
}
TaskQueue manages task queuing and distribution via Redis
type TaskStatusEvent ¶
type TaskStatusEvent struct {
TaskID string `json:"task_id"`
Timestamp int64 `json:"timestamp"`
Status types.TaskStatus `json:"status"`
ExitCode *int `json:"exit_code,omitempty"`
Error string `json:"error,omitempty"`
}
TaskStatusEvent represents a task status change event
type TokenRepository ¶
type TokenRepository interface {
// Workspace member tokens
CreateToken(ctx context.Context, workspaceId, memberId uint, name string, expiresAt *time.Time, tokenType types.TokenType) (*types.Token, string, error)
GetToken(ctx context.Context, externalId string) (*types.Token, error)
ListTokens(ctx context.Context, workspaceId uint) ([]types.Token, error)
RevokeToken(ctx context.Context, externalId string) error
// Worker tokens (cluster-level)
CreateWorkerToken(ctx context.Context, name string, poolName *string, expiresAt *time.Time) (*types.Token, string, error)
ListWorkerTokens(ctx context.Context) ([]types.Token, error)
// Validation
ValidateToken(ctx context.Context, rawToken string) (*types.TokenValidationResult, error)
AuthorizeToken(ctx context.Context, rawToken string) (*types.AuthInfo, error)
}
TokenRepository manages authentication tokens
type WorkerPoolRepository ¶
type WorkerPoolRepository interface {
SetPoolState(ctx context.Context, poolName string, state *types.WorkerPoolState) error
GetPoolState(ctx context.Context, poolName string) (*types.WorkerPoolState, error)
}
WorkerPoolRepository manages worker pool state in Redis
type WorkerRedisRepository ¶
type WorkerRedisRepository struct {
// contains filtered or unexported fields
}
WorkerRedisRepository implements WorkerRepository using Redis.
func (*WorkerRedisRepository) AllocateIP ¶ added in v0.1.23
func (r *WorkerRedisRepository) AllocateIP(ctx context.Context, sandboxID, workerID string) (*types.IPAllocation, error)
func (*WorkerRedisRepository) GetAllWorkers ¶
func (*WorkerRedisRepository) GetAvailableWorkers ¶
func (*WorkerRedisRepository) GetSandboxIP ¶ added in v0.1.23
func (*WorkerRedisRepository) ReleaseIP ¶ added in v0.1.23
func (r *WorkerRedisRepository) ReleaseIP(ctx context.Context, sandboxID string) error
func (*WorkerRedisRepository) RemoveWorker ¶
func (r *WorkerRedisRepository) RemoveWorker(ctx context.Context, id string) error
func (*WorkerRedisRepository) SetWorkerKeepAlive ¶
func (r *WorkerRedisRepository) SetWorkerKeepAlive(ctx context.Context, id string) error
func (*WorkerRedisRepository) UpdateWorkerStatus ¶
func (r *WorkerRedisRepository) UpdateWorkerStatus(ctx context.Context, id string, status types.WorkerStatus) error
type WorkerRepository ¶
type WorkerRepository interface {
AddWorker(ctx context.Context, worker *types.Worker) error
GetWorker(ctx context.Context, workerId string) (*types.Worker, error)
GetAllWorkers(ctx context.Context) ([]*types.Worker, error)
GetAvailableWorkers(ctx context.Context) ([]*types.Worker, error)
RemoveWorker(ctx context.Context, workerId string) error
SetWorkerKeepAlive(ctx context.Context, workerId string) error
UpdateWorkerStatus(ctx context.Context, workerId string, status types.WorkerStatus) error
AllocateIP(ctx context.Context, sandboxID, workerID string) (*types.IPAllocation, error)
ReleaseIP(ctx context.Context, sandboxID string) error
GetSandboxIP(ctx context.Context, sandboxID string) (string, bool)
}
WorkerRepository manages worker state in Redis
func NewWorkerRedisRepository ¶
func NewWorkerRedisRepository(rdb *common.RedisClient) WorkerRepository
func NewWorkerRedisRepositoryForTest ¶
func NewWorkerRedisRepositoryForTest(rdb *common.RedisClient) WorkerRepository
NewWorkerRedisRepositoryForTest creates a WorkerRepository backed by miniredis
type WorkspaceToolRepository ¶
type WorkspaceToolRepository interface {
CreateWorkspaceTool(ctx context.Context, workspaceId uint, createdByMemberId *uint, name string, providerType types.WorkspaceToolProviderType, config json.RawMessage, manifest json.RawMessage) (*types.WorkspaceTool, error)
GetWorkspaceTool(ctx context.Context, id uint) (*types.WorkspaceTool, error)
GetWorkspaceToolByExternalId(ctx context.Context, externalId string) (*types.WorkspaceTool, error)
GetWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) (*types.WorkspaceTool, error)
ListWorkspaceTools(ctx context.Context, workspaceId uint) ([]*types.WorkspaceTool, error)
UpdateWorkspaceToolManifest(ctx context.Context, id uint, manifest json.RawMessage) error
UpdateWorkspaceToolConfig(ctx context.Context, id uint, config json.RawMessage) error
DeleteWorkspaceTool(ctx context.Context, id uint) error
DeleteWorkspaceToolByName(ctx context.Context, workspaceId uint, name string) error
}
WorkspaceToolRepository manages workspace-scoped tool providers