Documentation
¶
Index ¶
- type AICredentials
- type AIServiceFactory
- type AuthAdapter
- type Capabilities
- type CapabilityStore
- type ChunkOptions
- type ChunkResult
- type ChunkStore
- type Chunker
- type ConfigProvider
- type ConnectionStore
- type Connector
- type ConnectorBuilder
- type ConnectorFactory
- type Container
- type ContainerLister
- type ContainerListerFactory
- type ContentExtractor
- type CredentialsStore
- type DistributedLock
- type DocumentResult
- type DocumentStore
- type EmbeddingService
- type LLMService
- type Normaliser
- type NormaliserRegistry
- type OAuthConfig
- type OAuthCredentials
- type OAuthHandler
- type OAuthHandlerFactory
- type OAuthState
- type OAuthStateStore
- type OAuthToken
- type OAuthTokenProvider
- type OAuthUserInfo
- type OperationalLimits
- type QueueStats
- type SchedulerStore
- type SearchEngine
- type SearchQueryRepository
- type SessionStore
- type SettingsStore
- type SourceStore
- type StaticTokenProvider
- func (p *StaticTokenProvider) AuthMethod() domain.AuthMethod
- func (p *StaticTokenProvider) GetAccessToken(ctx context.Context) (string, error)
- func (p *StaticTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)
- func (p *StaticTokenProvider) IsValid(ctx context.Context) bool
- type SyncStateStore
- type TaskFilter
- type TaskQueue
- type TokenProvider
- type TokenProviderFactory
- type TokenRefresher
- type UserStore
- type VectorIndex
- type VectorSearchResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AICredentials ¶
AICredentials holds AI provider API keys and base URLs from environment variables
type AIServiceFactory ¶
type AIServiceFactory interface {
// CreateEmbeddingService creates an embedding service from settings and credentials.
// Credentials come from ConfigProvider (environment variables).
// Returns error if settings are invalid or service creation fails.
CreateEmbeddingService(settings *domain.EmbeddingSettings, credentials *AICredentials) (EmbeddingService, error)
// CreateLLMService creates an LLM service from settings and credentials.
// Credentials come from ConfigProvider (environment variables).
// Returns error if settings are invalid or service creation fails.
CreateLLMService(settings *domain.LLMSettings, credentials *AICredentials) (LLMService, error)
}
AIServiceFactory creates AI services based on configuration. Credentials are provided separately from domain settings.
type AuthAdapter ¶
type AuthAdapter interface {
// Password operations
HashPassword(password string) (string, error)
VerifyPassword(password, hash string) bool
// Token operations
GenerateToken(claims *domain.TokenClaims) (string, error)
ParseToken(token string) (*domain.TokenClaims, error)
}
AuthAdapter handles authentication cryptographic operations. This does NOT handle storage - use SessionStore for session persistence.
type Capabilities ¶
type Capabilities struct {
// OAuth providers that are configured via environment variables
OAuthProviders []domain.ProviderType
// AI providers available for embedding
EmbeddingProviders []domain.AIProvider
// AI providers available for LLM
LLMProviders []domain.AIProvider
// SearchEngineAvailable indicates if a search engine (e.g. OpenSearch) was initialized
SearchEngineAvailable bool
// VectorStoreAvailable indicates if a vector store (e.g. pgvector) was initialized
VectorStoreAvailable bool
// Operational boundaries from environment variables
Limits OperationalLimits
}
Capabilities represents what features are available based on environment configuration
type CapabilityStore ¶
type CapabilityStore interface {
// GetPreferences retrieves capability preferences for a team.
// Returns error if team not found or database error occurs.
GetPreferences(ctx context.Context, teamID string) (*domain.CapabilityPreferences, error)
// SavePreferences persists capability preferences for a team.
// Creates new preferences if they don't exist, updates if they do.
SavePreferences(ctx context.Context, prefs *domain.CapabilityPreferences) error
}
CapabilityStore manages capability preferences persistence
type ChunkOptions ¶
type ChunkOptions struct {
MaxChunkSize int // Maximum characters per chunk
Overlap int // Character overlap between chunks
}
ChunkOptions configures chunking behavior
func DefaultChunkOptions ¶
func DefaultChunkOptions() ChunkOptions
DefaultChunkOptions returns sensible defaults
type ChunkResult ¶
ChunkResult represents a chunk with position info
type ChunkStore ¶
type ChunkStore interface {
// Save creates or updates a chunk
Save(ctx context.Context, chunk *domain.Chunk) error
// SaveBatch saves multiple chunks in a transaction
SaveBatch(ctx context.Context, chunks []*domain.Chunk) error
// GetByDocument retrieves all chunks for a document
GetByDocument(ctx context.Context, documentID string) ([]*domain.Chunk, error)
// Delete deletes a chunk
Delete(ctx context.Context, id string) error
// DeleteByDocument deletes all chunks for a document
DeleteByDocument(ctx context.Context, documentID string) error
// DeleteBySource deletes all chunks for a source
DeleteBySource(ctx context.Context, sourceID string) error
}
ChunkStore handles chunk persistence (PostgreSQL)
type Chunker ¶
type Chunker interface {
// Chunk splits content into chunks
Chunk(content string, opts ChunkOptions) []ChunkResult
}
Chunker splits document content into searchable chunks
type ConfigProvider ¶
type ConfigProvider interface {
// GetOAuthCredentials returns OAuth client credentials for a provider.
// Returns nil if the provider is not configured in environment variables.
GetOAuthCredentials(provider domain.ProviderType) *OAuthCredentials
// GetAICredentials returns AI provider credentials (API key, base URL).
// Returns nil if the provider is not configured in environment variables.
GetAICredentials(provider domain.AIProvider) *AICredentials
// IsOAuthConfigured returns true if OAuth credentials are available for the provider.
IsOAuthConfigured(provider domain.ProviderType) bool
// IsAIConfigured returns true if AI credentials are available for the provider.
IsAIConfigured(provider domain.AIProvider) bool
// GetCapabilities returns information about what's available based on env configuration.
// This is used for the /api/v1/capabilities endpoint.
GetCapabilities() *Capabilities
// GetBaseURL returns the application base URL for OAuth callbacks.
GetBaseURL() string
}
ConfigProvider provides access to configuration from environment variables. This is a driven port that abstracts environment-based configuration. Implementation lives in internal/config/ (infrastructure layer).
type ConnectionStore ¶
type ConnectionStore interface {
// Save stores a new connection or updates an existing one.
// Secrets are encrypted before storage.
Save(ctx context.Context, conn *domain.Connection) error
// Get retrieves a connection by ID with decrypted secrets.
// Returns domain.ErrNotFound if the connection doesn't exist.
Get(ctx context.Context, id string) (*domain.Connection, error)
// List retrieves all connections as summaries (no secrets).
List(ctx context.Context) ([]*domain.ConnectionSummary, error)
// Delete removes a connection by ID.
// Returns domain.ErrNotFound if the connection doesn't exist.
Delete(ctx context.Context, id string) error
// GetByProvider retrieves connections for a provider type (no secrets).
GetByProvider(ctx context.Context, providerType domain.ProviderType) ([]*domain.ConnectionSummary, error)
// GetByAccountID retrieves a connection by provider type and account ID.
// Returns nil if not found.
GetByAccountID(ctx context.Context, providerType domain.ProviderType, accountID string) (*domain.Connection, error)
// UpdateSecrets updates the encrypted secrets and OAuth metadata.
// Used after token refresh.
UpdateSecrets(ctx context.Context, id string, secrets *domain.ConnectionSecrets, expiry *time.Time) error
// UpdateLastUsed updates the last_used_at timestamp.
UpdateLastUsed(ctx context.Context, id string) error
}
ConnectionStore persists connector connections with encrypted secrets.
type Connector ¶
type Connector interface {
// Type returns the provider type.
Type() domain.ProviderType
// ValidateConfig validates source configuration.
ValidateConfig(config domain.SourceConfig) error
// FetchChanges fetches document changes since last sync.
// Returns changes, next cursor, and error.
// The cursor enables incremental sync - pass empty string for full sync.
FetchChanges(ctx context.Context, source *domain.Source, cursor string) ([]*domain.Change, string, error)
// FetchDocument fetches a single document by external ID.
// Returns the document, content hash, and error.
FetchDocument(ctx context.Context, source *domain.Source, externalID string) (*domain.Document, string, error)
// TestConnection tests the connection to the source.
TestConnection(ctx context.Context, source *domain.Source) error
}
Connector fetches documents from a source provider. Connectors are created by ConnectorBuilder with resolved credentials.
type ConnectorBuilder ¶
type ConnectorBuilder interface {
// Type returns the provider type this builder creates.
Type() domain.ProviderType
// Build creates a connector scoped to a specific container.
// The containerID comes from source.SelectedContainers (one per sync job).
// For providers that don't support container selection, containerID may be empty.
// The TokenProvider handles credential retrieval and OAuth token refresh.
Build(ctx context.Context, tokenProvider TokenProvider, containerID string) (Connector, error)
// SupportsOAuth returns true if this connector uses OAuth authentication.
SupportsOAuth() bool
// OAuthConfig returns OAuth configuration for this provider.
// Returns nil if the provider doesn't support OAuth.
OAuthConfig() *OAuthConfig
// SupportsContainerSelection returns true if this connector supports container picking.
// If true, admins can select specific containers (repos, drives, spaces) to index.
// If false, the connector indexes all accessible content.
SupportsContainerSelection() bool
}
ConnectorBuilder creates connector instances for a specific provider type. Each provider has its own builder registered with the ConnectorFactory.
type ConnectorFactory ¶
type ConnectorFactory interface {
// Register registers a connector builder for a provider type.
Register(builder ConnectorBuilder)
// Create creates a connector for the given source, scoped to a container.
// Called by SyncOrchestrator once per container in source.SelectedContainers.
// For providers without container selection, containerID may be empty.
Create(ctx context.Context, source *domain.Source, containerID string) (Connector, error)
// SupportedTypes returns all registered provider types.
SupportedTypes() []domain.ProviderType
// GetBuilder returns the builder for a provider type.
GetBuilder(providerType domain.ProviderType) (ConnectorBuilder, error)
// SupportsOAuth returns true if the provider supports OAuth.
SupportsOAuth(providerType domain.ProviderType) bool
// GetOAuthConfig returns OAuth config for a provider.
GetOAuthConfig(providerType domain.ProviderType) *OAuthConfig
}
ConnectorFactory manages connector builders and creates connectors.
type Container ¶
type Container struct {
// ID is the provider-specific identifier.
// Format varies by provider:
// - GitHub: "owner/repo"
// - Google Drive: drive ID
// - Confluence: space key
ID string `json:"id"`
// Name is the human-readable display name.
Name string `json:"name"`
// Description is an optional description of the container.
Description string `json:"description,omitempty"`
// Type identifies the container type.
// Examples: "repository", "shared_drive", "space", "project", "channel"
Type string `json:"type"`
// Metadata contains provider-specific additional data.
// Examples:
// - GitHub: {"owner": "...", "private": "true", "archived": "false"}
// - Google Drive: {"shared": "true"}
Metadata map[string]string `json:"metadata,omitempty"`
}
Container represents a top-level container that can be selected for indexing. Examples:
- GitHub: repository (owner/repo)
- Google Drive: shared drive
- Confluence: space
- Jira: project
- Notion: workspace
- Slack: channel
type ContainerLister ¶
type ContainerLister interface {
// ListContainers lists containers accessible with the current credentials.
// Pagination is handled via cursor - pass empty string for the first page.
// Returns:
// - containers: list of available containers
// - nextCursor: cursor for the next page, empty if no more pages
// - err: any error that occurred
ListContainers(ctx context.Context, cursor string) ([]*Container, string, error)
}
ContainerLister lists available containers for a connector. Not all connectors support container selection - some index all accessible content.
The ContainerLister is typically obtained from a ContainerListerFactory for a specific provider type and installation.
type ContainerListerFactory ¶
type ContainerListerFactory interface {
// Create creates a ContainerLister for the given provider and installation.
// Returns nil if the provider doesn't support container selection.
Create(ctx context.Context, providerType domain.ProviderType, installationID string) (ContainerLister, error)
// SupportsContainerSelection returns true if the provider supports container selection.
// If false, the connector indexes all accessible content.
SupportsContainerSelection(providerType domain.ProviderType) bool
}
ContainerListerFactory creates ContainerListers for different providers.
type ContentExtractor ¶
type ContentExtractor interface {
// Extract extracts text content from raw data
Extract(ctx context.Context, data []byte, mimeType string) (string, error)
// SupportedTypes returns supported MIME types
SupportedTypes() []string
}
ContentExtractor extracts text content from various file formats
type CredentialsStore ¶
type CredentialsStore interface {
// Save stores credentials (encrypts sensitive fields)
Save(ctx context.Context, creds *domain.Credentials) error
// Get retrieves credentials by ID
Get(ctx context.Context, id string) (*domain.Credentials, error)
// List retrieves all credentials
List(ctx context.Context) ([]*domain.Credentials, error)
// Delete deletes credentials
Delete(ctx context.Context, id string) error
// GetByProvider retrieves credentials for a provider type
GetByProvider(ctx context.Context, providerType domain.ProviderType) ([]*domain.Credentials, error)
}
CredentialsStore handles credential persistence (PostgreSQL, encrypted)
type DistributedLock ¶
type DistributedLock interface {
// Acquire attempts to acquire a named lock with the given TTL.
// Returns true if the lock was successfully acquired, false if already held by another instance.
// The lock will automatically expire after TTL (implementation dependent).
Acquire(ctx context.Context, name string, ttl time.Duration) (acquired bool, err error)
// Release releases a named lock.
// This is best-effort; implementations with TTL will auto-expire anyway.
// Safe to call even if the lock is not held or has expired.
Release(ctx context.Context, name string) error
// Extend extends the TTL of a currently held lock.
// Returns error if the lock is not held by this instance.
// Note: Not all implementations support TTL extension (e.g., PostgreSQL advisory locks).
Extend(ctx context.Context, name string, ttl time.Duration) error
// Ping checks if the lock backend is healthy.
Ping(ctx context.Context) error
}
DistributedLock provides distributed locking for coordinating work across instances. This is used to prevent duplicate execution in multi-instance deployments.
type DocumentResult ¶
type DocumentResult struct {
DocumentID string
SourceID string
Title string
Content string // Full document content (for snippet extraction)
Score float64
}
DocumentResult represents a document-level search result from BM25.
type DocumentStore ¶
type DocumentStore interface {
// Save creates or updates a document
Save(ctx context.Context, doc *domain.Document) error
// SaveBatch saves multiple documents in a transaction
SaveBatch(ctx context.Context, docs []*domain.Document) error
// Get retrieves a document by ID
Get(ctx context.Context, id string) (*domain.Document, error)
// GetByExternalID retrieves a document by source and external ID
GetByExternalID(ctx context.Context, sourceID, externalID string) (*domain.Document, error)
// GetBySource retrieves all documents for a source with pagination
GetBySource(ctx context.Context, sourceID string, limit, offset int) ([]*domain.Document, error)
// Delete deletes a document
Delete(ctx context.Context, id string) error
// DeleteBySource deletes all documents for a source
DeleteBySource(ctx context.Context, sourceID string) error
// DeleteBatch deletes multiple documents by ID
DeleteBatch(ctx context.Context, ids []string) error
// Count returns total document count
Count(ctx context.Context) (int, error)
// CountBySource returns document count for a source
CountBySource(ctx context.Context, sourceID string) (int, error)
// ListExternalIDs returns all external IDs for a source (for diff sync)
ListExternalIDs(ctx context.Context, sourceID string) ([]string, error)
}
DocumentStore handles document persistence (PostgreSQL)
type EmbeddingService ¶
type EmbeddingService interface {
// Embed generates embeddings for multiple texts
Embed(ctx context.Context, texts []string) ([][]float32, error)
// EmbedQuery generates an embedding for a search query
// May use a different model or parameters optimized for queries
EmbedQuery(ctx context.Context, query string) ([]float32, error)
// Dimensions returns the embedding dimension size
Dimensions() int
// Model returns the model name being used
Model() string
// HealthCheck verifies the embedding service is available
HealthCheck(ctx context.Context) error
// Close releases resources held by the embedding service
Close() error
}
EmbeddingService generates text embeddings
type LLMService ¶
type LLMService interface {
// ExpandQuery takes a search query and returns expanded/related terms
// Useful for improving search recall
ExpandQuery(ctx context.Context, query string) ([]string, error)
// Summarise generates a summary of the given content
// maxLen is a hint for the maximum length (the model may not respect it exactly)
Summarise(ctx context.Context, content string, maxLen int) (string, error)
// RewriteQuery rewrites the query for better search results
// Returns the rewritten query
RewriteQuery(ctx context.Context, query string) (string, error)
// Model returns the model name being used
Model() string
// Ping verifies the LLM service is available
Ping(ctx context.Context) error
// Close releases resources held by the LLM service
Close() error
}
LLMService provides large language model capabilities for search enhancement
type Normaliser ¶
type Normaliser interface {
// Normalise transforms raw content into normalized text.
// The mimeType helps determine the appropriate processing.
Normalise(content string, mimeType string) string
// SupportedTypes returns MIME types this normaliser handles.
// Can include wildcards like "text/*" or specific types like "text/markdown".
SupportedTypes() []string
// Priority returns the normaliser priority (higher = more specific).
// Priority ranges:
// 90-100: Connector-specific (e.g., Gmail email normaliser)
// 50-89: Format-specific (PDF, Markdown, HTML)
// 10-49: Generic (basic text processing)
// 1-9: Fallback (raw text extraction)
Priority() int
}
Normaliser normalizes raw document content for indexing. It transforms provider-specific document formats into normalized text.
type NormaliserRegistry ¶
type NormaliserRegistry interface {
// Get retrieves the best-matching normaliser for a MIME type.
// Returns nil if no normaliser is registered for the type.
// When multiple match, the highest priority normaliser is returned.
Get(mimeType string) Normaliser
// GetAll retrieves all normalisers that match a MIME type, sorted by priority (highest first).
GetAll(mimeType string) []Normaliser
// Register registers a normaliser.
Register(normaliser Normaliser)
// List returns all registered MIME types.
List() []string
}
NormaliserRegistry manages content normalisers. When multiple normalisers match a MIME type, the highest priority one is used.
type OAuthConfig ¶
type OAuthConfig struct {
// AuthURL is the authorization endpoint
AuthURL string
// TokenURL is the token exchange endpoint
TokenURL string
// Scopes are the required OAuth scopes
Scopes []string
// UserInfoURL is the endpoint to fetch user info (optional)
UserInfoURL string
}
OAuthConfig contains OAuth settings for a provider.
type OAuthCredentials ¶
OAuthCredentials holds OAuth client credentials from environment variables
type OAuthHandler ¶
type OAuthHandler interface {
// DefaultConfig returns the default OAuth configuration (auth URL, token URL, scopes)
DefaultConfig() OAuthConfig
// BuildAuthURL constructs the OAuth authorization URL with PKCE
BuildAuthURL(clientID, redirectURI, state, codeChallenge string, scopes []string) string
// ExchangeCode exchanges an authorization code for access tokens
ExchangeCode(ctx context.Context, clientID, clientSecret, code, redirectURI, codeVerifier string) (*OAuthToken, error)
// GetUserInfo retrieves user information using an access token
GetUserInfo(ctx context.Context, accessToken string) (*OAuthUserInfo, error)
// RefreshToken refreshes an expired access token
RefreshToken(ctx context.Context, refreshToken string) (*OAuthToken, error)
}
OAuthHandler handles OAuth flow for a provider.
type OAuthHandlerFactory ¶
type OAuthHandlerFactory interface {
// GetOAuthHandler returns an OAuth handler for the given provider.
// Returns nil if the provider doesn't support OAuth or isn't implemented.
GetOAuthHandler(providerType domain.ProviderType) OAuthHandler
}
OAuthHandlerFactory creates OAuth handlers for specific providers. This abstracts the connectors.Factory dependency.
type OAuthState ¶
type OAuthState struct {
// State is a cryptographically random string used for CSRF protection.
State string
// ProviderType is the OAuth provider (github, google_drive, etc.)
ProviderType string
// CodeVerifier is the PKCE code verifier (plain text, not hashed).
// This is used to generate code_challenge for the auth request
// and sent as code_verifier during token exchange.
CodeVerifier string
// RedirectURI is the callback URL where the provider will redirect.
RedirectURI string
// ReturnContext indicates where to redirect after OAuth completes.
// Values: "setup", "admin-sources", or empty for default behavior.
ReturnContext string
// CreatedAt is when the state was created.
CreatedAt time.Time
// ExpiresAt is when the state expires (typically 10 minutes).
ExpiresAt time.Time
}
OAuthState represents a pending OAuth authorization flow state. Used for CSRF protection and PKCE code verifier storage.
type OAuthStateStore ¶
type OAuthStateStore interface {
// Save stores a new OAuth state.
// The state typically expires in 10 minutes.
Save(ctx context.Context, state *OAuthState) error
// GetAndDelete atomically retrieves and deletes the state.
// This ensures single-use semantics.
// Returns nil, nil if the state doesn't exist or has expired.
GetAndDelete(ctx context.Context, state string) (*OAuthState, error)
// Cleanup removes expired states.
// Should be called periodically (e.g., every hour) to clean up orphaned states.
Cleanup(ctx context.Context) error
}
OAuthStateStore manages OAuth flow state for CSRF protection. States are single-use and expire after a short period.
type OAuthToken ¶
type OAuthToken struct {
AccessToken string
RefreshToken string
ExpiresIn int // Seconds until expiry
TokenType string // Usually "Bearer"
Scope string // Space-separated scopes
}
OAuthToken represents OAuth tokens from a provider.
type OAuthTokenProvider ¶
type OAuthTokenProvider struct {
// contains filtered or unexported fields
}
OAuthTokenProvider implements TokenProvider for OAuth credentials. It automatically refreshes tokens when they expire.
func NewOAuthTokenProvider ¶
func NewOAuthTokenProvider( creds *domain.Credentials, refresher TokenRefresher, store CredentialsStore, ) *OAuthTokenProvider
NewOAuthTokenProvider creates a token provider for OAuth credentials.
func (*OAuthTokenProvider) AuthMethod ¶
func (p *OAuthTokenProvider) AuthMethod() domain.AuthMethod
AuthMethod returns OAuth2.
func (*OAuthTokenProvider) GetAccessToken ¶
func (p *OAuthTokenProvider) GetAccessToken(ctx context.Context) (string, error)
GetAccessToken returns a valid access token, refreshing if needed.
func (*OAuthTokenProvider) GetCredentials ¶
func (p *OAuthTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)
GetCredentials returns the full credentials.
type OAuthUserInfo ¶
type OAuthUserInfo struct {
ID string // Provider-specific user ID
Email string
Name string
ImageURL string
}
OAuthUserInfo represents user info from OAuth provider.
type OperationalLimits ¶
type OperationalLimits struct {
SyncMinInterval int // Minutes floor (default: 5)
SyncMaxInterval int // Minutes ceiling (default: 1440)
MaxWorkers int // Worker ceiling (default: 10)
MaxResultsPerPage int // Results ceiling (default: 100)
}
OperationalLimits defines guardrails from environment configuration
type QueueStats ¶
type QueueStats struct {
// PendingCount is the number of tasks waiting to be processed
PendingCount int64 `json:"pending_count"`
// ProcessingCount is the number of tasks currently being processed
ProcessingCount int64 `json:"processing_count"`
// CompletedCount is the number of successfully completed tasks
CompletedCount int64 `json:"completed_count"`
// FailedCount is the number of tasks that failed after all retries
FailedCount int64 `json:"failed_count"`
// OldestPendingAge is the age of the oldest pending task in seconds
OldestPendingAge int64 `json:"oldest_pending_age"`
}
QueueStats contains queue statistics
type SchedulerStore ¶
type SchedulerStore interface {
// GetScheduledTask retrieves a scheduled task by ID
GetScheduledTask(ctx context.Context, id string) (*domain.ScheduledTask, error)
// ListScheduledTasks retrieves all scheduled tasks for a team
ListScheduledTasks(ctx context.Context, teamID string) ([]*domain.ScheduledTask, error)
// SaveScheduledTask creates or updates a scheduled task
SaveScheduledTask(ctx context.Context, task *domain.ScheduledTask) error
// DeleteScheduledTask removes a scheduled task
DeleteScheduledTask(ctx context.Context, id string) error
// GetDueScheduledTasks retrieves scheduled tasks that are due to run
GetDueScheduledTasks(ctx context.Context) ([]*domain.ScheduledTask, error)
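// UpdateLastRun updates the last run time and next run time; lastError presumably carries the error message from that run (empty on success) — confirm against the implementation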
UpdateLastRun(ctx context.Context, id string, lastError string) error
}
SchedulerStore handles persistence for scheduled tasks. This is separate from TaskQueue because scheduled tasks are configuration, not transient queue items.
type SearchEngine ¶
type SearchEngine interface {
// Index indexes chunks for a document (legacy, kept for backward compatibility)
Index(ctx context.Context, chunks []*domain.Chunk) error
// IndexDocument indexes a full document for BM25 text search.
// Uses document_id as the OpenSearch document ID (upsert semantics).
IndexDocument(ctx context.Context, doc *domain.DocumentContent) error
// Search performs a search query (legacy chunk-level search)
Search(ctx context.Context, query string, queryEmbedding []float32, opts domain.SearchOptions) ([]*domain.RankedChunk, int, error)
// SearchDocuments performs a BM25 text search returning document-level results.
SearchDocuments(ctx context.Context, query string, opts domain.SearchOptions) ([]DocumentResult, int, error)
// Delete deletes chunks by IDs
Delete(ctx context.Context, chunkIDs []string) error
// DeleteByDocument deletes all indexed data for a document
DeleteByDocument(ctx context.Context, documentID string) error
// DeleteByDocuments deletes all indexed data for multiple documents in a single operation
DeleteByDocuments(ctx context.Context, documentIDs []string) error
// DeleteBySource deletes all indexed data for a source
DeleteBySource(ctx context.Context, sourceID string) error
// HealthCheck verifies the search engine is available
HealthCheck(ctx context.Context) error
// Count returns the total number of indexed documents
Count(ctx context.Context) (int64, error)
}
SearchEngine handles search indexing and querying
type SearchQueryRepository ¶
type SearchQueryRepository interface {
// Save logs a search query for analytics tracking
Save(ctx context.Context, query *domain.SearchQuery) error
// GetSearchHistory retrieves recent search queries
// Returns up to limit most recent searches, ordered by created_at desc
GetSearchHistory(ctx context.Context, teamID string, limit int) ([]*domain.SearchQuery, error)
// GetSearchAnalytics computes aggregated search analytics for a time period
GetSearchAnalytics(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.SearchAnalytics, error)
// GetSearchMetrics computes performance metrics for a time period
GetSearchMetrics(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.SearchMetrics, error)
}
SearchQueryRepository handles search query logging and analytics
type SessionStore ¶
type SessionStore interface {
// Save stores a session with TTL based on ExpiresAt
Save(ctx context.Context, session *domain.Session) error
// Get retrieves a session by ID
Get(ctx context.Context, id string) (*domain.Session, error)
// GetByToken retrieves a session by token value
GetByToken(ctx context.Context, token string) (*domain.Session, error)
// GetByRefreshToken retrieves a session by refresh token value
GetByRefreshToken(ctx context.Context, refreshToken string) (*domain.Session, error)
// Delete deletes a session
Delete(ctx context.Context, id string) error
// DeleteByToken deletes a session by token
DeleteByToken(ctx context.Context, token string) error
// DeleteByUser deletes all sessions for a user (logout everywhere)
DeleteByUser(ctx context.Context, userID string) error
// ListByUser lists all active sessions for a user
ListByUser(ctx context.Context, userID string) ([]*domain.Session, error)
}
SessionStore handles session persistence (Redis)
type SettingsStore ¶
type SettingsStore interface {
// GetSettings retrieves settings for a team
GetSettings(ctx context.Context, teamID string) (*domain.Settings, error)
// SaveSettings persists team settings
SaveSettings(ctx context.Context, settings *domain.Settings) error
// GetAISettings retrieves AI-specific settings for a team
GetAISettings(ctx context.Context, teamID string) (*domain.AISettings, error)
// SaveAISettings persists AI-specific settings
SaveAISettings(ctx context.Context, teamID string, settings *domain.AISettings) error
}
SettingsStore persists team and AI settings
type SourceStore ¶
// SourceStore handles persistence of sources.
type SourceStore interface {
// Save creates the source if it does not exist yet, or updates it otherwise.
Save(ctx context.Context, source *domain.Source) error
// Get retrieves a source by its ID.
// NOTE(review): the not-found behaviour (nil result vs. error) is not
// specified here — confirm against the implementation.
Get(ctx context.Context, id string) (*domain.Source, error)
// GetByName retrieves a source by its name.
GetByName(ctx context.Context, name string) (*domain.Source, error)
// List retrieves all sources, enabled or not.
List(ctx context.Context) ([]*domain.Source, error)
// ListEnabled retrieves only the sources that are enabled.
ListEnabled(ctx context.Context) ([]*domain.Source, error)
// Delete deletes the source with the given ID.
Delete(ctx context.Context, id string) error
// SetEnabled updates the enabled status of the source with the given ID.
SetEnabled(ctx context.Context, id string, enabled bool) error
// CountByConnection returns the number of sources that use the connection
// identified by connectionID.
CountByConnection(ctx context.Context, connectionID string) (int, error)
// ListByConnection returns the sources that use the connection identified
// by connectionID.
ListByConnection(ctx context.Context, connectionID string) ([]*domain.Source, error)
// UpdateContainers updates the containers of the source with the given ID.
// NOTE(review): presumably replaces the whole container list rather than
// merging — confirm.
UpdateContainers(ctx context.Context, id string, containers []domain.Container) error
}
SourceStore handles source persistence (PostgreSQL)
type StaticTokenProvider ¶
// StaticTokenProvider is a TokenProvider for non-OAuth (static) credentials
// such as API keys, PATs, and basic auth.
type StaticTokenProvider struct {
// contains filtered or unexported fields
}
StaticTokenProvider implements TokenProvider for non-OAuth credentials. Used for API keys, PATs, and basic auth.
func NewStaticTokenProvider ¶
func NewStaticTokenProvider(creds *domain.Credentials) *StaticTokenProvider
NewStaticTokenProvider creates a token provider for static credentials.
func (*StaticTokenProvider) AuthMethod ¶
func (p *StaticTokenProvider) AuthMethod() domain.AuthMethod
AuthMethod returns the authentication method.
func (*StaticTokenProvider) GetAccessToken ¶
func (p *StaticTokenProvider) GetAccessToken(ctx context.Context) (string, error)
GetAccessToken returns the API key or PAT.
func (*StaticTokenProvider) GetCredentials ¶
func (p *StaticTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)
GetCredentials returns the full credentials.
type SyncStateStore ¶
// SyncStateStore handles persistence of per-source sync state.
type SyncStateStore interface {
// Save creates the sync state if it does not exist yet, or updates it otherwise.
Save(ctx context.Context, state *domain.SyncState) error
// Get retrieves the sync state of the source identified by sourceID.
Get(ctx context.Context, sourceID string) (*domain.SyncState, error)
// List retrieves the sync states of all sources.
List(ctx context.Context) ([]*domain.SyncState, error)
// Delete deletes the sync state of the source identified by sourceID.
Delete(ctx context.Context, sourceID string) error
// UpdateStatus updates only the status field of the source's sync state,
// leaving the other fields untouched.
UpdateStatus(ctx context.Context, sourceID string, status domain.SyncStatus) error
// UpdateCursor updates the sync cursor of the source's sync state.
// NOTE(review): the cursor format is opaque at this level — presumably
// connector-specific; confirm.
UpdateCursor(ctx context.Context, sourceID string, cursor string) error
}
SyncStateStore handles sync state persistence (PostgreSQL)
type TaskFilter ¶
// TaskFilter specifies the criteria used when listing or counting tasks.
type TaskFilter struct {
// TeamID restricts results to a single team. Required.
TeamID string
// Status restricts results to one task status. Optional; the empty value
// matches every status.
Status domain.TaskStatus
// Type restricts results to one task type. Optional; the empty value
// matches every type.
Type domain.TaskType
// Limit is the maximum number of tasks to return.
// NOTE(review): the meaning of a zero Limit (no limit vs. a default) is
// not specified here — confirm against the implementation.
Limit int
// Offset is the number of matching tasks to skip, for pagination.
Offset int
}
TaskFilter specifies criteria for listing tasks
type TaskQueue ¶
// TaskQueue handles background task queuing and processing.
// Implementations can use Redis (preferred) or Postgres (fallback).
type TaskQueue interface {
// Enqueue adds a task to the queue for processing.
// The task will be picked up by a worker based on priority and scheduled time.
Enqueue(ctx context.Context, task *domain.Task) error
// EnqueueBatch adds multiple tasks to the queue atomically.
// If any task fails to enqueue, all tasks are rolled back.
EnqueueBatch(ctx context.Context, tasks []*domain.Task) error
// Dequeue retrieves the next available task for processing.
// Blocking implementations wait until a task is available or ctx is
// cancelled; non-blocking implementations return nil, nil when no task is
// available. Callers must therefore handle a nil task with a nil error.
// The returned task is marked as processing and will not be handed to
// other workers.
Dequeue(ctx context.Context) (*domain.Task, error)
// DequeueWithTimeout retrieves the next available task, waiting up to timeout.
// Returns nil, nil if the timeout is reached with no tasks available.
// NOTE(review): the unit of timeout is not specified (a bare int rather
// than time.Duration) — presumably seconds; confirm with implementations.
DequeueWithTimeout(ctx context.Context, timeout int) (*domain.Task, error)
// Ack acknowledges successful completion of a task.
// The task is removed from the queue.
Ack(ctx context.Context, taskID string) error
// Nack indicates that processing of a task failed and it should be retried.
// The task is returned to the queue with an updated retry count; once the
// maximum number of retries is exceeded it is moved to the failed state.
// reason describes why processing failed.
Nack(ctx context.Context, taskID string, reason string) error
// GetTask retrieves a task by ID (for status checking).
GetTask(ctx context.Context, taskID string) (*domain.Task, error)
// ListTasks retrieves the tasks matching the filter criteria.
ListTasks(ctx context.Context, filter TaskFilter) ([]*domain.Task, error)
// CancelTask marks a pending task as cancelled.
// Returns an error if the task is already processing or completed.
CancelTask(ctx context.Context, taskID string) error
// PurgeTasks removes completed/failed tasks older than the specified age.
// This is used for cleanup.
// NOTE(review): the unit of olderThan is not specified, and the int result
// is presumably the number of tasks removed — confirm both.
PurgeTasks(ctx context.Context, olderThan int) (int, error)
// Stats returns queue statistics.
Stats(ctx context.Context) (*QueueStats, error)
// Ping checks whether the queue backend is healthy.
Ping(ctx context.Context) error
// GetJobStats computes aggregated job statistics for a team over a time period.
// This is used by the admin dashboard to show job execution metrics.
GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)
// CountTasks returns the total number of tasks matching the filter.
// This is used for pagination to determine whether there are more results.
CountTasks(ctx context.Context, filter TaskFilter) (int64, error)
// Close cleans up resources held by the queue.
Close() error
}
TaskQueue handles background task queuing and processing. Implementations can use Redis (preferred) or Postgres (fallback).
type TokenProvider ¶
// TokenProvider provides access tokens for API authentication.
// It handles token storage, retrieval, and automatic refresh for OAuth.
type TokenProvider interface {
// GetAccessToken returns a valid access token.
// For OAuth, expired tokens are refreshed automatically before returning.
// For API keys, the stored key is returned as-is.
GetAccessToken(ctx context.Context) (string, error)
// GetCredentials returns the full credentials.
// Prefer GetAccessToken for most operations; this method is for special
// cases that need more than the token.
GetCredentials(ctx context.Context) (*domain.Credentials, error)
// AuthMethod returns the authentication method of the underlying credentials.
AuthMethod() domain.AuthMethod
// IsValid reports whether the credentials are still valid.
IsValid(ctx context.Context) bool
}
TokenProvider provides access tokens for API authentication. It handles token storage, retrieval, and automatic refresh for OAuth.
type TokenProviderFactory ¶
// TokenProviderFactory creates TokenProviders, either from a connection ID
// or from an already-loaded connection.
type TokenProviderFactory interface {
// Create creates a TokenProvider for the connection identified by connectionID.
// It looks up the connection by ID, decrypts its credentials, and creates
// the appropriate TokenProvider based on the auth method.
Create(ctx context.Context, connectionID string) (TokenProvider, error)
// CreateFromConnection creates a TokenProvider directly from conn.
// Use this when the connection is already loaded, to avoid a second lookup.
CreateFromConnection(ctx context.Context, conn *domain.Connection) (TokenProvider, error)
}
TokenProviderFactory creates TokenProviders from connection IDs or from already-loaded connections. It resolves connection credentials and wraps them in the appropriate TokenProvider.
type TokenRefresher ¶
// TokenRefresher handles OAuth token refresh operations.
// It is used internally by OAuth TokenProviders.
type TokenRefresher interface {
// Refresh refreshes the OAuth tokens held in creds.
// It returns the new credentials and also updates the credentials in the store.
Refresh(ctx context.Context, creds *domain.Credentials) (*domain.Credentials, error)
}
TokenRefresher handles OAuth token refresh operations. This is used internally by OAuth TokenProviders.
type UserStore ¶
// UserStore handles persistence of users.
type UserStore interface {
// Save creates the user if it does not exist yet, or updates it otherwise.
Save(ctx context.Context, user *domain.User) error
// Get retrieves a user by ID.
// NOTE(review): the not-found behaviour (nil result vs. error) is not
// specified here — confirm against the implementation.
Get(ctx context.Context, id string) (*domain.User, error)
// GetByEmail retrieves a user by email address.
GetByEmail(ctx context.Context, email string) (*domain.User, error)
// List retrieves all users belonging to the team identified by teamID.
List(ctx context.Context, teamID string) ([]*domain.User, error)
// Delete deletes the user with the given ID.
Delete(ctx context.Context, id string) error
// UpdateLastLogin updates the last-login timestamp of the user with the given ID.
// NOTE(review): no timestamp is passed in, so presumably the store records
// the current time — confirm.
UpdateLastLogin(ctx context.Context, id string) error
}
UserStore handles user persistence (PostgreSQL)
type VectorIndex ¶
// VectorIndex handles vector similarity search using a standalone embeddings
// table. The interface allows for dedicated vector store implementations
// (e.g., pgvector).
type VectorIndex interface {
// Index adds a single embedding to the index. id is the chunk ID and
// documentID is the document the chunk belongs to.
Index(ctx context.Context, id string, documentID string, embedding []float32) error
// IndexBatch adds multiple embeddings together with their chunk content.
// NOTE(review): ids, documentIDs, contents and embeddings are presumably
// parallel slices of equal length, with element i describing one chunk —
// confirm how implementations handle mismatched lengths.
IndexBatch(ctx context.Context, ids []string, documentIDs []string, contents []string, embeddings [][]float32) error
// Search finds up to k vectors similar to embedding and returns their
// chunk IDs and distances.
// NOTE(review): the two result slices are presumably parallel (distance i
// belongs to chunk ID i) — confirm, along with the result ordering.
Search(ctx context.Context, embedding []float32, k int) ([]string, []float64, error)
// SearchWithContent finds up to k vectors similar to embedding and returns
// the chunk content alongside the IDs and distances.
SearchWithContent(ctx context.Context, embedding []float32, k int) ([]VectorSearchResult, error)
// Delete removes a single embedding by chunk ID.
Delete(ctx context.Context, id string) error
// DeleteBatch removes multiple embeddings by chunk ID.
DeleteBatch(ctx context.Context, ids []string) error
// DeleteByDocument removes all embeddings belonging to a document.
DeleteByDocument(ctx context.Context, documentID string) error
// DeleteByDocuments removes all embeddings belonging to multiple documents
// in a single operation.
DeleteByDocuments(ctx context.Context, documentIDs []string) error
// HealthCheck verifies that the vector store is available.
HealthCheck(ctx context.Context) error
}
VectorIndex handles vector similarity search using a standalone embeddings table. This interface allows for dedicated vector store implementations (e.g., pgvector).
Source Files
¶
- ai_factory.go
- auth.go
- capability_store.go
- config_provider.go
- connection_store.go
- connector.go
- container_lister.go
- distributed_lock.go
- document_store.go
- embedding.go
- llm.go
- normaliser.go
- oauth_state.go
- search_engine.go
- search_query_repository.go
- session_store.go
- settings_store.go
- source_store.go
- sync_store.go
- task_queue.go
- token_provider.go
- user_store.go