Documentation
¶
Index ¶
- type AICredentials
- type AIServiceFactory
- type AuthAdapter
- type AuthorizationCodeStore
- type Capabilities
- type CapabilityStore
- type ChunkOptions
- type ChunkResult
- type Chunker
- type ConfigProvider
- type ConnectionStore
- type Connector
- type ConnectorBuilder
- type ConnectorFactory
- type Container
- type ContainerLister
- type ContainerListerFactory
- type ContentExtractor
- type ContentFilter
- type CredentialsStore
- type DistributedLock
- type DocumentIDProvider
- type DocumentResult
- type DocumentStore
- type EmbeddingService
- type LLMService
- type Normaliser
- type NormaliserRegistry
- type OAuthClientStore
- type OAuthConfig
- type OAuthCredentials
- type OAuthHandler
- type OAuthHandlerFactory
- type OAuthState
- type OAuthStateStore
- type OAuthToken
- type OAuthTokenProvider
- type OAuthTokenStore
- type OAuthUserInfo
- type OperationalLimits
- type QueueStats
- type SchedulerStore
- type SearchEngine
- type SearchQueryRepository
- type SessionStore
- type SettingsStore
- type SourceStore
- type StaticTokenProvider
- func (p *StaticTokenProvider) AuthMethod() domain.AuthMethod
- func (p *StaticTokenProvider) GetAccessToken(ctx context.Context) (string, error)
- func (p *StaticTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)
- func (p *StaticTokenProvider) IsValid(ctx context.Context) bool
- type SyncStateStore
- type TaskFilter
- type TaskQueue
- type TokenProvider
- type TokenProviderFactory
- type TokenRefresher
- type UserStore
- type VectorIndex
- type VectorSearchResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AICredentials ¶
AICredentials holds AI provider API keys and base URLs from environment variables
type AIServiceFactory ¶
type AIServiceFactory interface {
// CreateEmbeddingService creates an embedding service from settings and credentials.
// Credentials come from ConfigProvider (environment variables).
// Returns an error if the settings are invalid or service creation fails.
CreateEmbeddingService(settings *domain.EmbeddingSettings, credentials *AICredentials) (EmbeddingService, error)
// CreateLLMService creates an LLM service from settings and credentials.
// Credentials come from ConfigProvider (environment variables).
// Returns an error if the settings are invalid or service creation fails.
CreateLLMService(settings *domain.LLMSettings, credentials *AICredentials) (LLMService, error)
}
AIServiceFactory creates AI services based on configuration. Credentials are provided separately from domain settings.
type AuthAdapter ¶
type AuthAdapter interface {
// Password operations
HashPassword(password string) (string, error)
VerifyPassword(password, hash string) bool
// Token operations
GenerateToken(claims *domain.TokenClaims) (string, error)
ParseToken(token string) (*domain.TokenClaims, error)
}
AuthAdapter handles authentication cryptographic operations. This does NOT handle storage - use SessionStore for session persistence.
type AuthorizationCodeStore ¶ added in v0.2.1
type AuthorizationCodeStore interface {
// Save stores a new authorization code
Save(ctx context.Context, code *domain.AuthorizationCode) error
// GetAndMarkUsed atomically retrieves the code and marks it as used.
// This ensures single-use semantics for authorization codes.
// Returns domain.ErrNotFound if code doesn't exist.
GetAndMarkUsed(ctx context.Context, code string) (*domain.AuthorizationCode, error)
// Cleanup removes expired authorization codes.
// Should be called periodically (e.g., via cron job).
Cleanup(ctx context.Context) error
}
AuthorizationCodeStore manages short-lived authorization codes for the auth code flow
type Capabilities ¶
type Capabilities struct {
// OAuth providers that are configured via environment variables
OAuthProviders []domain.PlatformType
// AI providers available for embedding
EmbeddingProviders []domain.AIProvider
// AI providers available for LLM
LLMProviders []domain.AIProvider
// SearchEngineAvailable indicates if a search engine (e.g. OpenSearch) was initialized
SearchEngineAvailable bool
// VectorStoreAvailable indicates if a vector store (e.g. pgvector) was initialized
VectorStoreAvailable bool
// Operational boundaries from environment variables
Limits OperationalLimits
}
Capabilities represents what features are available based on environment configuration
type CapabilityStore ¶
type CapabilityStore interface {
// GetPreferences retrieves capability preferences for a team.
// Returns an error if the team is not found or a database error occurs.
GetPreferences(ctx context.Context, teamID string) (*domain.CapabilityPreferences, error)
// SavePreferences persists capability preferences for a team.
// Creates new preferences if they don't exist, updates if they do.
SavePreferences(ctx context.Context, prefs *domain.CapabilityPreferences) error
}
CapabilityStore manages capability preferences persistence
type ChunkOptions ¶
type ChunkOptions struct {
MaxChunkSize int // Maximum characters per chunk
Overlap int // Character overlap between chunks
}
ChunkOptions configures chunking behavior
func DefaultChunkOptions ¶
func DefaultChunkOptions() ChunkOptions
DefaultChunkOptions returns sensible defaults
type ChunkResult ¶
ChunkResult represents a chunk with position info
type Chunker ¶
type Chunker interface {
// Chunk splits content into chunks
Chunk(content string, opts ChunkOptions) []ChunkResult
}
Chunker splits document content into searchable chunks
type ConfigProvider ¶
type ConfigProvider interface {
// GetOAuthCredentials returns OAuth client credentials for a platform.
// Returns nil if the platform is not configured in environment variables.
GetOAuthCredentials(platform domain.PlatformType) *OAuthCredentials
// GetAICredentials returns AI provider credentials (API key, base URL).
// Returns nil if the provider is not configured in environment variables.
GetAICredentials(provider domain.AIProvider) *AICredentials
// IsOAuthConfigured returns true if OAuth credentials are available for the platform.
IsOAuthConfigured(platform domain.PlatformType) bool
// IsAIConfigured returns true if AI credentials are available for the provider.
IsAIConfigured(provider domain.AIProvider) bool
// GetCapabilities returns information about what's available based on env configuration.
// This is used for the /api/v1/capabilities endpoint.
GetCapabilities() *Capabilities
// GetBaseURL returns the application base URL for OAuth callbacks.
GetBaseURL() string
}
ConfigProvider provides access to configuration from environment variables. This is a driven port that abstracts environment-based configuration. Implementation lives in internal/config/ (infrastructure layer).
type ConnectionStore ¶
type ConnectionStore interface {
// Save stores a new connection or updates an existing one.
// Secrets are encrypted before storage.
Save(ctx context.Context, conn *domain.Connection) error
// Get retrieves a connection by ID with decrypted secrets.
// Returns domain.ErrNotFound if the connection doesn't exist.
Get(ctx context.Context, id string) (*domain.Connection, error)
// List retrieves all connections as summaries (no secrets).
List(ctx context.Context) ([]*domain.ConnectionSummary, error)
// Delete removes a connection by ID.
// Returns domain.ErrNotFound if the connection doesn't exist.
Delete(ctx context.Context, id string) error
// GetByPlatform retrieves connections for a platform type (no secrets).
GetByPlatform(ctx context.Context, platform domain.PlatformType) ([]*domain.ConnectionSummary, error)
// GetByAccountID retrieves a connection by platform type and account ID.
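// Returns a nil connection if not found (unlike Get and Delete, which report domain.ErrNotFound).
// NOTE(review): presumably the returned error is also nil in that case — confirm against the implementation.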
GetByAccountID(ctx context.Context, platform domain.PlatformType, accountID string) (*domain.Connection, error)
// UpdateSecrets updates the encrypted secrets and OAuth metadata.
// Used after token refresh.
UpdateSecrets(ctx context.Context, id string, secrets *domain.ConnectionSecrets, expiry *time.Time) error
// UpdateLastUsed updates the last_used_at timestamp.
UpdateLastUsed(ctx context.Context, id string) error
}
ConnectionStore persists connector connections with encrypted secrets.
type Connector ¶
type Connector interface {
// Type returns the provider type.
Type() domain.ProviderType
// ValidateConfig validates source configuration.
ValidateConfig(config domain.SourceConfig) error
// FetchChanges fetches document changes since last sync.
// Returns changes, next cursor, and error.
// The cursor enables incremental sync - pass empty string for full sync.
FetchChanges(ctx context.Context, source *domain.Source, cursor string) ([]*domain.Change, string, error)
// FetchDocument fetches a single document by external ID.
// Returns the document, content hash, and error.
FetchDocument(ctx context.Context, source *domain.Source, externalID string) (*domain.Document, string, error)
// TestConnection tests the connection to the source.
TestConnection(ctx context.Context, source *domain.Source) error
}
Connector fetches documents from a source provider. Connectors are created by ConnectorBuilder with resolved credentials.
type ConnectorBuilder ¶
type ConnectorBuilder interface {
// Type returns the provider type this builder creates.
Type() domain.ProviderType
// Build creates a connector scoped to a specific container.
// The containerID comes from source.SelectedContainers (one per sync job).
// For providers that don't support container selection, containerID may be empty.
// The TokenProvider handles credential retrieval and OAuth token refresh.
Build(ctx context.Context, tokenProvider TokenProvider, containerID string) (Connector, error)
// SupportsOAuth returns true if this connector uses OAuth authentication.
SupportsOAuth() bool
// OAuthConfig returns OAuth configuration for this provider.
// Returns nil if the provider doesn't support OAuth.
OAuthConfig() *OAuthConfig
// SupportsContainerSelection returns true if this connector supports container picking.
// If true, admins can select specific containers (repos, drives, spaces) to index.
// If false, the connector indexes all accessible content.
SupportsContainerSelection() bool
}
ConnectorBuilder creates connector instances for a specific provider type. Each provider has its own builder registered with the ConnectorFactory.
type ConnectorFactory ¶
type ConnectorFactory interface {
// Register registers a connector builder for a provider type.
Register(builder ConnectorBuilder)
// Create creates a connector for the given source, scoped to a container.
// Called by SyncOrchestrator once per container in source.SelectedContainers.
// For providers without container selection, containerID may be empty.
Create(ctx context.Context, source *domain.Source, containerID string) (Connector, error)
// SupportedTypes returns all registered provider types.
SupportedTypes() []domain.ProviderType
// GetBuilder returns the builder for a provider type.
GetBuilder(providerType domain.ProviderType) (ConnectorBuilder, error)
// SupportsOAuth returns true if the provider supports OAuth.
SupportsOAuth(providerType domain.ProviderType) bool
// GetOAuthConfig returns OAuth config for a provider.
GetOAuthConfig(providerType domain.ProviderType) *OAuthConfig
}
ConnectorFactory manages connector builders and creates connectors.
type Container ¶
type Container struct {
// ID is the provider-specific identifier.
// Format varies by provider:
// - GitHub: "owner/repo"
// - Google Drive: drive ID
// - Confluence: space key
ID string `json:"id"`
// Name is the human-readable display name.
Name string `json:"name"`
// Description is an optional description of the container.
Description string `json:"description,omitempty"`
// Type identifies the container type.
// Examples: "repository", "shared_drive", "space", "project", "channel"
Type string `json:"type"`
// HasChildren indicates if this container has child containers (for folder navigation).
HasChildren bool `json:"has_children,omitempty"`
// Metadata contains provider-specific additional data.
// Examples:
// - GitHub: {"owner": "...", "private": "true", "archived": "false"}
// - Google Drive: {"shared": "true"}
Metadata map[string]string `json:"metadata,omitempty"`
}
Container represents a top-level container that can be selected for indexing. Examples:
- GitHub: repository (owner/repo)
- Google Drive: shared drive
- Confluence: space
- Jira: project
- Notion: workspace
- Slack: channel
type ContainerLister ¶
type ContainerLister interface {
// ListContainers lists containers accessible with the current credentials.
// Pagination is handled via cursor - pass empty string for the first page.
// parentID can be used to list children of a specific container (for folder navigation).
// Pass empty string for root level containers.
// Returns:
// - containers: list of available containers
// - nextCursor: cursor for the next page, empty if no more pages
// - err: any error that occurred
ListContainers(ctx context.Context, cursor string, parentID string) ([]*Container, string, error)
}
ContainerLister lists available containers for a connector. Not all connectors support container selection - some index all accessible content.
The ContainerLister is typically obtained from the ContainerListerFactory for a specific provider type and installation.
type ContainerListerFactory ¶
type ContainerListerFactory interface {
// Create creates a ContainerLister for the given provider and installation.
// Returns nil if the provider doesn't support container selection.
Create(ctx context.Context, providerType domain.ProviderType, installationID string) (ContainerLister, error)
// SupportsContainerSelection returns true if the provider supports container selection.
// If false, the connector indexes all accessible content.
SupportsContainerSelection(providerType domain.ProviderType) bool
}
ContainerListerFactory creates ContainerListers for different providers.
type ContentExtractor ¶
type ContentExtractor interface {
// Extract extracts text content from raw data
Extract(ctx context.Context, data []byte, mimeType string) (string, error)
// SupportedTypes returns supported MIME types
SupportedTypes() []string
}
ContentExtractor extracts text content from various file formats
type ContentFilter ¶ added in v0.2.1
type ContentFilter interface {
// ShouldFetchContent determines if content should be fetched for a file.
// It combines path pattern matching and MIME type detection to make the decision.
// Returns:
// - shouldFetch: true if content should be fetched and processed
// - mimeType: detected MIME type from the file path
ShouldFetchContent(ctx context.Context, path string, settings *domain.SyncExclusionSettings) (shouldFetch bool, mimeType string)
// GetMimeType returns the MIME type for a file path based on its extension or filename.
// This provides a single source of truth for MIME type detection across the application.
GetMimeType(path string) string
}
ContentFilter determines whether content should be fetched and processed based on file path patterns and MIME type exclusions. This port enables early filtering before content is fetched, reducing wasted resources.
type CredentialsStore ¶
type CredentialsStore interface {
// Save stores credentials (encrypts sensitive fields)
Save(ctx context.Context, creds *domain.Credentials) error
// Get retrieves credentials by ID
Get(ctx context.Context, id string) (*domain.Credentials, error)
// List retrieves all credentials
List(ctx context.Context) ([]*domain.Credentials, error)
// Delete deletes credentials
Delete(ctx context.Context, id string) error
// GetByProvider retrieves credentials for a provider type
GetByProvider(ctx context.Context, providerType domain.ProviderType) ([]*domain.Credentials, error)
}
CredentialsStore handles credential persistence (PostgreSQL, encrypted)
type DistributedLock ¶
type DistributedLock interface {
// Acquire attempts to acquire a named lock with the given TTL.
// Returns true if the lock was successfully acquired, false if already held by another instance.
// The lock will automatically expire after TTL (implementation dependent).
Acquire(ctx context.Context, name string, ttl time.Duration) (acquired bool, err error)
// Release releases a named lock.
// This is best-effort; implementations with TTL will auto-expire anyway.
// Safe to call even if the lock is not held or has expired.
Release(ctx context.Context, name string) error
// Extend extends the TTL of a currently held lock.
// Returns an error if the lock is not held by this instance.
// Note: Not all implementations support TTL extension (e.g., PostgreSQL advisory locks).
Extend(ctx context.Context, name string, ttl time.Duration) error
// Ping checks if the lock backend is healthy.
Ping(ctx context.Context) error
}
DistributedLock provides distributed locking for coordinating work across instances. This is used to prevent duplicate execution in multi-instance deployments.
type DocumentIDProvider ¶ added in v0.2.2
type DocumentIDProvider interface {
// GetAllowedDocumentIDs returns the document IDs that should be included in search.
// Returns nil/empty slice if no filtering should be applied.
GetAllowedDocumentIDs(ctx context.Context, query string, filters pipeline.SearchFilters) ([]string, error)
}
DocumentIDProvider provides document IDs for pre-filtering search results. Implementations can apply ACL, tenant isolation, or other filtering logic.
type DocumentResult ¶
type DocumentResult struct {
DocumentID string
SourceID string
Title string
Content string // Full document content (for snippet extraction)
Score float64
}
DocumentResult represents a document-level search result from BM25.
type DocumentStore ¶
type DocumentStore interface {
// Save creates or updates a document
Save(ctx context.Context, doc *domain.Document) error
// SaveBatch saves multiple documents in a transaction
SaveBatch(ctx context.Context, docs []*domain.Document) error
// Get retrieves a document by ID
Get(ctx context.Context, id string) (*domain.Document, error)
// GetByExternalID retrieves a document by source and external ID
GetByExternalID(ctx context.Context, sourceID, externalID string) (*domain.Document, error)
// GetBySource retrieves one page of documents for a source (limit/offset pagination)
GetBySource(ctx context.Context, sourceID string, limit, offset int) ([]*domain.Document, error)
// Delete deletes a document
Delete(ctx context.Context, id string) error
// DeleteBySource deletes all documents for a source
DeleteBySource(ctx context.Context, sourceID string) error
// DeleteBySourceAndContainer deletes all documents for a specific container within a source
DeleteBySourceAndContainer(ctx context.Context, sourceID, containerID string) error
// DeleteBatch deletes multiple documents by ID
DeleteBatch(ctx context.Context, ids []string) error
// Count returns total document count
Count(ctx context.Context) (int, error)
// CountBySource returns document count for a source
CountBySource(ctx context.Context, sourceID string) (int, error)
// ListExternalIDs returns all external IDs for a source (for diff sync)
ListExternalIDs(ctx context.Context, sourceID string) ([]string, error)
}
DocumentStore handles document persistence (PostgreSQL)
type EmbeddingService ¶
type EmbeddingService interface {
// Embed generates embeddings for multiple texts
Embed(ctx context.Context, texts []string) ([][]float32, error)
// EmbedQuery generates an embedding for a search query.
// It may use a different model or parameters optimized for queries.
EmbedQuery(ctx context.Context, query string) ([]float32, error)
// Dimensions returns the embedding dimension size
Dimensions() int
// Model returns the model name being used
Model() string
// HealthCheck verifies the embedding service is available
HealthCheck(ctx context.Context) error
// Close releases resources held by the embedding service
Close() error
}
EmbeddingService generates text embeddings
type LLMService ¶
type LLMService interface {
// Complete sends a completion request to the LLM and returns the response.
// This is the primary method for all LLM interactions.
Complete(ctx context.Context, req domain.CompletionRequest) (domain.CompletionResponse, error)
// Model returns the model name being used (e.g., "gpt-4o", "claude-3-5-sonnet")
Model() string
// Ping verifies the LLM service is reachable and properly configured
Ping(ctx context.Context) error
// Close releases resources held by the LLM service
Close() error
}
LLMService provides a generic completion interface for Large Language Models. This is a pure provider interface - task-specific logic (query expansion, summarization, etc.) is implemented in pipeline stages.
type Normaliser ¶
type Normaliser interface {
// Normalise transforms raw content into normalized text.
// The mimeType helps determine the appropriate processing.
Normalise(content string, mimeType string) string
// SupportedTypes returns MIME types this normaliser handles.
// Can include wildcards like "text/*" or specific types like "text/markdown".
SupportedTypes() []string
// Priority returns the normaliser priority (higher = more specific).
// Priority ranges:
// 90-100: Connector-specific (e.g., Gmail email normaliser)
// 50-89: Format-specific (PDF, Markdown, HTML)
// 10-49: Generic (basic text processing)
// 1-9: Fallback (raw text extraction)
Priority() int
}
Normaliser normalizes raw document content for indexing. It transforms provider-specific document formats into normalized text.
type NormaliserRegistry ¶
type NormaliserRegistry interface {
// Get retrieves the best-matching normaliser for a MIME type.
// Returns nil if no normaliser is registered for the type.
// When multiple match, the highest priority normaliser is returned.
Get(mimeType string) Normaliser
// GetAll retrieves all normalisers that match a MIME type, sorted by priority (highest first).
GetAll(mimeType string) []Normaliser
// Register registers a normaliser.
Register(normaliser Normaliser)
// List returns all registered MIME types.
List() []string
}
NormaliserRegistry manages content normalisers. When multiple normalisers match a MIME type, the highest priority one is used.
type OAuthClientStore ¶ added in v0.2.1
type OAuthClientStore interface {
// Save stores a new client or updates an existing one
Save(ctx context.Context, client *domain.OAuthClient) error
// Get retrieves a client by client_id
// Returns domain.ErrNotFound if the client doesn't exist
Get(ctx context.Context, clientID string) (*domain.OAuthClient, error)
// Delete removes a client by client_id
// Returns domain.ErrNotFound if the client doesn't exist
Delete(ctx context.Context, clientID string) error
// List retrieves all registered clients
List(ctx context.Context) ([]*domain.OAuthClient, error)
}
OAuthClientStore manages OAuth 2.0 client registrations
type OAuthConfig ¶
type OAuthConfig struct {
// AuthURL is the authorization endpoint
AuthURL string
// TokenURL is the token exchange endpoint
TokenURL string
// Scopes are the required OAuth scopes
Scopes []string
// UserInfoURL is the endpoint to fetch user info (optional)
UserInfoURL string
}
OAuthConfig contains OAuth settings for a provider.
type OAuthCredentials ¶
OAuthCredentials holds OAuth client credentials from environment variables
type OAuthHandler ¶
type OAuthHandler interface {
// DefaultConfig returns the default OAuth configuration (auth URL, token URL, scopes)
DefaultConfig() OAuthConfig
// BuildAuthURL constructs the OAuth authorization URL with PKCE
BuildAuthURL(clientID, redirectURI, state, codeChallenge string, scopes []string) string
// ExchangeCode exchanges an authorization code for access tokens
ExchangeCode(ctx context.Context, clientID, clientSecret, code, redirectURI, codeVerifier string) (*OAuthToken, error)
// GetUserInfo retrieves user information using an access token
GetUserInfo(ctx context.Context, accessToken string) (*OAuthUserInfo, error)
// RefreshToken refreshes an expired access token
RefreshToken(ctx context.Context, refreshToken string) (*OAuthToken, error)
}
OAuthHandler handles OAuth flow for a provider.
type OAuthHandlerFactory ¶
type OAuthHandlerFactory interface {
// GetOAuthHandler returns an OAuth handler for the given platform.
// Returns nil if the platform doesn't support OAuth or isn't implemented.
GetOAuthHandler(platform domain.PlatformType) OAuthHandler
}
OAuthHandlerFactory creates OAuth handlers for specific platforms. This abstracts the connectors.Factory dependency.
type OAuthState ¶
type OAuthState struct {
// State is a cryptographically random string used for CSRF protection.
State string
// ProviderType is the OAuth provider (github, google_drive, etc.)
ProviderType string
// CodeVerifier is the PKCE code verifier (plain text, not hashed).
// It is used to generate the code_challenge for the auth request
// and is sent as code_verifier during token exchange.
CodeVerifier string
// RedirectURI is the callback URL where the provider will redirect.
RedirectURI string
// ReturnContext indicates where to redirect after OAuth completes.
// Values: "setup", "admin-sources", or empty for default behavior.
ReturnContext string
// CreatedAt is when the state was created.
CreatedAt time.Time
// ExpiresAt is when the state expires (typically 10 minutes).
ExpiresAt time.Time
}
OAuthState represents a pending OAuth authorization flow state. Used for CSRF protection and PKCE code verifier storage.
type OAuthStateStore ¶
type OAuthStateStore interface {
// Save stores a new OAuth state.
// The state typically expires in 10 minutes.
Save(ctx context.Context, state *OAuthState) error
// GetAndDelete atomically retrieves and deletes the state.
// This ensures single-use semantics.
// Returns nil, nil if the state doesn't exist or has expired.
GetAndDelete(ctx context.Context, state string) (*OAuthState, error)
// Cleanup removes expired states.
// Should be called periodically (e.g., every hour) to clean up orphaned states.
Cleanup(ctx context.Context) error
}
OAuthStateStore manages OAuth flow state for CSRF protection. States are single-use and expire after a short period.
type OAuthToken ¶
type OAuthToken struct {
AccessToken string
RefreshToken string
ExpiresIn int // Seconds until expiry
TokenType string // Usually "Bearer"
Scope string // Space-separated scopes
}
OAuthToken represents OAuth tokens from a provider.
type OAuthTokenProvider ¶
type OAuthTokenProvider struct {
// contains filtered or unexported fields
}
OAuthTokenProvider implements TokenProvider for OAuth credentials. It automatically refreshes tokens when they expire.
func NewOAuthTokenProvider ¶
func NewOAuthTokenProvider( creds *domain.Credentials, refresher TokenRefresher, store CredentialsStore, ) *OAuthTokenProvider
NewOAuthTokenProvider creates a token provider for OAuth credentials.
func (*OAuthTokenProvider) AuthMethod ¶
func (p *OAuthTokenProvider) AuthMethod() domain.AuthMethod
AuthMethod returns OAuth2.
func (*OAuthTokenProvider) GetAccessToken ¶
func (p *OAuthTokenProvider) GetAccessToken(ctx context.Context) (string, error)
GetAccessToken returns a valid access token, refreshing if needed.
func (*OAuthTokenProvider) GetCredentials ¶
func (p *OAuthTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)
GetCredentials returns the full credentials.
type OAuthTokenStore ¶ added in v0.2.1
type OAuthTokenStore interface {
// SaveAccessToken stores a new access token
SaveAccessToken(ctx context.Context, token *domain.OAuthAccessToken) error
// GetAccessToken retrieves an access token by its ID (jti claim)
// Returns domain.ErrNotFound if the token doesn't exist
GetAccessToken(ctx context.Context, tokenID string) (*domain.OAuthAccessToken, error)
// RevokeAccessToken marks an access token as revoked
// Returns domain.ErrNotFound if the token doesn't exist
RevokeAccessToken(ctx context.Context, tokenID string) error
// SaveRefreshToken stores a new refresh token
SaveRefreshToken(ctx context.Context, token *domain.OAuthRefreshToken) error
// GetRefreshToken retrieves a refresh token by its ID
// Returns domain.ErrNotFound if the token doesn't exist
GetRefreshToken(ctx context.Context, tokenID string) (*domain.OAuthRefreshToken, error)
// RevokeRefreshToken marks a refresh token as revoked
// Returns domain.ErrNotFound if the token doesn't exist
RevokeRefreshToken(ctx context.Context, tokenID string) error
// RotateRefreshToken marks the old token as rotated; the replacement token's ID is supplied by the caller as newTokenID (only an error is returned).
// This implements refresh token rotation for enhanced security.
// Returns domain.ErrNotFound if the old token doesn't exist.
RotateRefreshToken(ctx context.Context, oldTokenID string, newTokenID string) error
// RevokeAllForClient revokes all tokens (access and refresh) for a given client.
// Used when a client is deleted or compromised.
RevokeAllForClient(ctx context.Context, clientID string) error
// Cleanup removes expired tokens.
// Should be called periodically (e.g., via cron job).
Cleanup(ctx context.Context) error
}
OAuthTokenStore manages access tokens and refresh tokens with revocation support
type OAuthUserInfo ¶
type OAuthUserInfo struct {
ID string // Provider-specific user ID
Email string
Name string
ImageURL string
}
OAuthUserInfo represents user info from OAuth provider.
type OperationalLimits ¶
type OperationalLimits struct {
SyncMinInterval int // Minutes floor (default: 5)
SyncMaxInterval int // Minutes ceiling (default: 1440)
MaxWorkers int // Worker ceiling (default: 10)
MaxResultsPerPage int // Results ceiling (default: 100)
}
OperationalLimits defines guardrails from environment configuration
type QueueStats ¶
type QueueStats struct {
// PendingCount is the number of tasks waiting to be processed
PendingCount int64 `json:"pending_count"`
// ProcessingCount is the number of tasks currently being processed
ProcessingCount int64 `json:"processing_count"`
// CompletedCount is the number of successfully completed tasks
CompletedCount int64 `json:"completed_count"`
// FailedCount is the number of tasks that failed after all retries
FailedCount int64 `json:"failed_count"`
// OldestPendingAge is the age of the oldest pending task in seconds
OldestPendingAge int64 `json:"oldest_pending_age"`
}
QueueStats contains queue statistics
type SchedulerStore ¶
type SchedulerStore interface {
// GetScheduledTask retrieves a scheduled task by ID
GetScheduledTask(ctx context.Context, id string) (*domain.ScheduledTask, error)
// ListScheduledTasks retrieves all scheduled tasks for a team
ListScheduledTasks(ctx context.Context, teamID string) ([]*domain.ScheduledTask, error)
// SaveScheduledTask creates or updates a scheduled task
SaveScheduledTask(ctx context.Context, task *domain.ScheduledTask) error
// DeleteScheduledTask removes a scheduled task
DeleteScheduledTask(ctx context.Context, id string) error
// GetDueScheduledTasks retrieves scheduled tasks that are due to run
GetDueScheduledTasks(ctx context.Context) ([]*domain.ScheduledTask, error)
// UpdateLastRun updates the last run time and next run time.
// lastError carries the error text from the run — presumably empty on success (TODO confirm).
UpdateLastRun(ctx context.Context, id string, lastError string) error
}
SchedulerStore handles persistence for scheduled tasks. This is separate from TaskQueue because scheduled tasks are configuration, not transient queue items.
type SearchEngine ¶
type SearchEngine interface {
// Index indexes chunks for a document (legacy; kept for backward compatibility)
Index(ctx context.Context, chunks []*domain.Chunk) error
// IndexDocument indexes a full document for BM25 text search.
// Uses document_id as the OpenSearch document ID (upsert semantics).
IndexDocument(ctx context.Context, doc *domain.DocumentContent) error
// Search performs a search query (legacy chunk-level search)
Search(ctx context.Context, query string, queryEmbedding []float32, opts domain.SearchOptions) ([]*domain.RankedChunk, int, error)
// SearchDocuments performs a BM25 text search returning document-level results.
SearchDocuments(ctx context.Context, query string, opts domain.SearchOptions) ([]DocumentResult, int, error)
// Delete deletes chunks by IDs
Delete(ctx context.Context, chunkIDs []string) error
// DeleteByDocument deletes all indexed data for a document
DeleteByDocument(ctx context.Context, documentID string) error
// DeleteByDocuments deletes all indexed data for multiple documents in a single operation
DeleteByDocuments(ctx context.Context, documentIDs []string) error
// DeleteBySource deletes all indexed data for a source
DeleteBySource(ctx context.Context, sourceID string) error
// DeleteBySourceAndContainer deletes all indexed data for a specific container within a source
DeleteBySourceAndContainer(ctx context.Context, sourceID, containerID string) error
// HealthCheck verifies the search engine is available
HealthCheck(ctx context.Context) error
// Count returns the total number of indexed documents
Count(ctx context.Context) (int64, error)
// GetDocument retrieves a document's full indexed content by document ID.
// Returns domain.ErrNotFound if the document is not in the search index.
GetDocument(ctx context.Context, documentID string) (*domain.DocumentContent, error)
}
SearchEngine handles search indexing and querying
type SearchQueryRepository ¶
type SearchQueryRepository interface {
// Save logs a search query for analytics tracking.
Save(ctx context.Context, query *domain.SearchQuery) error
// GetSearchHistory retrieves recent search queries for a team.
// Returns up to limit most recent searches, ordered by created_at desc.
GetSearchHistory(ctx context.Context, teamID string, limit int) ([]*domain.SearchQuery, error)
// GetSearchAnalytics computes aggregated search analytics for a team over a time period.
GetSearchAnalytics(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.SearchAnalytics, error)
// GetSearchMetrics computes performance metrics for a team over a time period.
GetSearchMetrics(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.SearchMetrics, error)
}
SearchQueryRepository handles search query logging and analytics
type SessionStore ¶
type SessionStore interface {
// Save stores a session with TTL based on ExpiresAt.
Save(ctx context.Context, session *domain.Session) error
// Get retrieves a session by ID.
// NOTE(review): the result for a missing or expired session is not specified
// here - confirm whether implementations return domain.ErrNotFound.
Get(ctx context.Context, id string) (*domain.Session, error)
// GetByToken retrieves a session by token value.
GetByToken(ctx context.Context, token string) (*domain.Session, error)
// GetByRefreshToken retrieves a session by refresh token value.
GetByRefreshToken(ctx context.Context, refreshToken string) (*domain.Session, error)
// Delete deletes a session by ID.
Delete(ctx context.Context, id string) error
// DeleteByToken deletes a session by token.
DeleteByToken(ctx context.Context, token string) error
// DeleteByUser deletes all sessions for a user (logout everywhere).
DeleteByUser(ctx context.Context, userID string) error
// ListByUser lists all active sessions for a user.
ListByUser(ctx context.Context, userID string) ([]*domain.Session, error)
}
SessionStore handles session persistence (Redis)
type SettingsStore ¶
type SettingsStore interface {
// GetSettings retrieves settings for a team.
GetSettings(ctx context.Context, teamID string) (*domain.Settings, error)
// SaveSettings persists team settings.
// Unlike SaveAISettings, it takes no teamID.
// NOTE(review): presumably the team is identified by the settings value itself - confirm.
SaveSettings(ctx context.Context, settings *domain.Settings) error
// GetAISettings retrieves AI-specific settings for a team.
GetAISettings(ctx context.Context, teamID string) (*domain.AISettings, error)
// SaveAISettings persists AI-specific settings for the team identified by teamID.
SaveAISettings(ctx context.Context, teamID string, settings *domain.AISettings) error
}
SettingsStore persists team and AI settings
type SourceStore ¶
type SourceStore interface {
// Save creates or updates a source.
Save(ctx context.Context, source *domain.Source) error
// Get retrieves a source by ID.
Get(ctx context.Context, id string) (*domain.Source, error)
// GetByName retrieves a source by name.
// NOTE(review): no team or connection scope is passed; presumably source
// names are unique across the whole store - confirm.
GetByName(ctx context.Context, name string) (*domain.Source, error)
// List retrieves all sources.
List(ctx context.Context) ([]*domain.Source, error)
// ListEnabled retrieves all enabled sources.
ListEnabled(ctx context.Context) ([]*domain.Source, error)
// Delete deletes a source by ID.
Delete(ctx context.Context, id string) error
// SetEnabled updates the enabled status of the source identified by id.
SetEnabled(ctx context.Context, id string, enabled bool) error
// CountByConnection returns the number of sources using a connection.
CountByConnection(ctx context.Context, connectionID string) (int, error)
// ListByConnection returns the sources using a connection.
ListByConnection(ctx context.Context, connectionID string) ([]*domain.Source, error)
// UpdateContainers updates the containers for the source identified by id.
UpdateContainers(ctx context.Context, id string, containers []domain.Container) error
}
SourceStore handles source persistence (PostgreSQL)
type StaticTokenProvider ¶
type StaticTokenProvider struct {
// contains filtered or unexported fields
}
StaticTokenProvider implements TokenProvider for non-OAuth credentials. Used for API keys, PATs, and basic auth.
func NewStaticTokenProvider ¶
func NewStaticTokenProvider(creds *domain.Credentials) *StaticTokenProvider
NewStaticTokenProvider creates a token provider for static credentials.
func (*StaticTokenProvider) AuthMethod ¶
func (p *StaticTokenProvider) AuthMethod() domain.AuthMethod
AuthMethod returns the authentication method.
func (*StaticTokenProvider) GetAccessToken ¶
func (p *StaticTokenProvider) GetAccessToken(ctx context.Context) (string, error)
GetAccessToken returns the API key or PAT.
func (*StaticTokenProvider) GetCredentials ¶
func (p *StaticTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)
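GetCredentials returns the full credentials.
func (*StaticTokenProvider) IsValid ¶
func (p *StaticTokenProvider) IsValid(ctx context.Context) bool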
type SyncStateStore ¶
type SyncStateStore interface {
// Save creates or updates sync state.
Save(ctx context.Context, state *domain.SyncState) error
// Get retrieves sync state for a source.
Get(ctx context.Context, sourceID string) (*domain.SyncState, error)
// List retrieves sync states for all sources.
List(ctx context.Context) ([]*domain.SyncState, error)
// Delete deletes sync state for a source.
Delete(ctx context.Context, sourceID string) error
// UpdateStatus updates only the status field of a source's sync state.
UpdateStatus(ctx context.Context, sourceID string, status domain.SyncStatus) error
// UpdateCursor updates the sync cursor of a source's sync state.
UpdateCursor(ctx context.Context, sourceID string, cursor string) error
}
SyncStateStore handles sync state persistence (PostgreSQL)
type TaskFilter ¶
type TaskFilter struct {
// TeamID filters by team (required).
TeamID string
// Status filters by task status (optional, empty means all).
Status domain.TaskStatus
// Type filters by task type (optional, empty means all).
Type domain.TaskType
// Limit is the maximum number of tasks to return.
// NOTE(review): the meaning of a zero Limit (no limit vs. a default page
// size) is not specified here - confirm against implementations.
Limit int
// Offset is the number of tasks to skip (for pagination).
Offset int
}
TaskFilter specifies criteria for listing tasks
type TaskQueue ¶
type TaskQueue interface {
// Enqueue adds a task to the queue for processing.
// The task will be picked up by a worker based on priority and scheduled time.
Enqueue(ctx context.Context, task *domain.Task) error
// EnqueueBatch adds multiple tasks to the queue atomically.
// If any task fails to enqueue, all tasks are rolled back.
EnqueueBatch(ctx context.Context, tasks []*domain.Task) error
// Dequeue retrieves the next available task for processing.
// Blocking implementations should wait until a task is available or the
// context is cancelled. Non-blocking implementations return nil, nil when
// no tasks are available, so callers must handle a nil task with a nil error.
// The returned task is marked as processing and will not be returned to other workers.
Dequeue(ctx context.Context) (*domain.Task, error)
// DequeueWithTimeout retrieves the next available task, waiting up to timeout.
// Returns nil, nil if timeout is reached with no tasks available.
// NOTE(review): timeout is a plain int with no stated unit; presumably
// seconds - confirm against implementations.
DequeueWithTimeout(ctx context.Context, timeout int) (*domain.Task, error)
// Ack acknowledges successful completion of a task.
// The task is removed from the queue.
Ack(ctx context.Context, taskID string) error
// Nack indicates task processing failed and should be retried.
// The task is returned to the queue with updated retry count.
// If max retries exceeded, task is moved to failed state.
// reason is supplied by the caller to describe the failure.
Nack(ctx context.Context, taskID string, reason string) error
// GetTask retrieves a task by ID (for status checking).
GetTask(ctx context.Context, taskID string) (*domain.Task, error)
// ListTasks retrieves tasks matching the filter criteria.
ListTasks(ctx context.Context, filter TaskFilter) ([]*domain.Task, error)
// CancelTask marks a pending task as cancelled.
// Returns error if task is already processing or completed.
CancelTask(ctx context.Context, taskID string) error
// PurgeTasks removes completed/failed tasks older than the specified age.
// This is used for cleanup.
// NOTE(review): olderThan is a plain int with no stated unit (presumably
// seconds), and the int result is presumably the number of tasks removed -
// confirm both against implementations.
PurgeTasks(ctx context.Context, olderThan int) (int, error)
// Stats returns queue statistics.
Stats(ctx context.Context) (*QueueStats, error)
// Ping checks if the queue backend is healthy.
Ping(ctx context.Context) error
// GetJobStats computes aggregated job statistics for a team over a time period.
// This is used by the admin dashboard to show job execution metrics.
GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)
// CountTasks returns the total number of tasks matching the filter.
// This is used for pagination to determine if there are more results.
CountTasks(ctx context.Context, filter TaskFilter) (int64, error)
// Close cleans up resources.
Close() error
}
TaskQueue handles background task queuing and processing. Implementations can use Redis (preferred) or Postgres (fallback).
type TokenProvider ¶
type TokenProvider interface {
// GetAccessToken returns a valid access token.
// For OAuth, this automatically refreshes expired tokens.
// For API keys, this returns the stored key.
GetAccessToken(ctx context.Context) (string, error)
// GetCredentials returns the full credentials.
// Use GetAccessToken for most operations - this is for special cases.
GetCredentials(ctx context.Context) (*domain.Credentials, error)
// AuthMethod returns the authentication method.
AuthMethod() domain.AuthMethod
// IsValid checks if the credentials are still valid.
// The result is a bool only; no error is returned, so callers cannot tell
// from the result why credentials are considered invalid.
IsValid(ctx context.Context) bool
}
TokenProvider provides access tokens for API authentication. It handles token storage, retrieval, and automatic refresh for OAuth.
type TokenProviderFactory ¶
type TokenProviderFactory interface {
// Create creates a TokenProvider for a connection.
// It looks up the connection by ID, decrypts credentials, and creates
// an appropriate TokenProvider based on the auth method.
Create(ctx context.Context, connectionID string) (TokenProvider, error)
// CreateFromConnection creates a TokenProvider from a connection directly.
// Use this when you already have the connection loaded, instead of having
// Create look it up by ID.
CreateFromConnection(ctx context.Context, conn *domain.Connection) (TokenProvider, error)
}
TokenProviderFactory creates TokenProviders from connection IDs. It resolves connection credentials and wraps them in appropriate TokenProviders.
type TokenRefresher ¶
type TokenRefresher interface {
// Refresh refreshes the OAuth tokens.
// Returns the new tokens and updates the credentials in the store.
// NOTE(review): presumably creds carries the current (possibly expired)
// credentials and the result is a new value - confirm whether creds itself
// is mutated.
Refresh(ctx context.Context, creds *domain.Credentials) (*domain.Credentials, error)
}
TokenRefresher handles OAuth token refresh operations. This is used internally by OAuth TokenProviders.
type UserStore ¶
type UserStore interface {
// Save creates or updates a user.
Save(ctx context.Context, user *domain.User) error
// Get retrieves a user by ID.
Get(ctx context.Context, id string) (*domain.User, error)
// GetByEmail retrieves a user by email.
// NOTE(review): no teamID is passed; presumably email addresses are unique
// across all teams - confirm.
GetByEmail(ctx context.Context, email string) (*domain.User, error)
// List retrieves all users for a team.
List(ctx context.Context, teamID string) ([]*domain.User, error)
// Delete deletes a user by ID.
Delete(ctx context.Context, id string) error
// UpdateLastLogin updates the last login timestamp of the user identified by id.
UpdateLastLogin(ctx context.Context, id string) error
}
UserStore handles user persistence (PostgreSQL)
type VectorIndex ¶
type VectorIndex interface {
// Index adds a single embedding to the index.
// Unlike IndexBatch, it takes no source ID or chunk content.
Index(ctx context.Context, id string, documentID string, embedding []float32) error
// IndexBatch adds multiple embeddings with their chunk content.
// NOTE(review): ids, documentIDs, sourceIDs, contents and embeddings are
// presumably parallel slices of equal length, with index i of each
// describing the same chunk - confirm, including how a length mismatch is handled.
IndexBatch(ctx context.Context, ids []string, documentIDs []string, sourceIDs []string, contents []string, embeddings [][]float32) error
// Search finds similar vectors, returns chunk IDs and distances.
// NOTE(review): the two result slices are presumably parallel (distance i
// belongs to chunk ID i) and k is presumably the maximum number of results - confirm.
Search(ctx context.Context, embedding []float32, k int) ([]string, []float64, error)
// SearchWithContent finds similar vectors and returns chunk content alongside IDs/distances.
// sourceIDs optionally filters results to specific sources (nil or empty = no filter).
// documentIDs optionally filters results to specific documents (nil or empty = no filter).
SearchWithContent(ctx context.Context, embedding []float32, k int, sourceIDs []string, documentIDs []string) ([]VectorSearchResult, error)
// Delete removes a single embedding by chunk ID.
Delete(ctx context.Context, id string) error
// DeleteBatch removes multiple embeddings by chunk IDs.
DeleteBatch(ctx context.Context, ids []string) error
// DeleteByDocument removes all embeddings for a document.
DeleteByDocument(ctx context.Context, documentID string) error
// DeleteByDocuments removes all embeddings for multiple documents in a single operation.
DeleteByDocuments(ctx context.Context, documentIDs []string) error
// DeleteBySourceAndContainer removes all embeddings for a specific container within a source.
DeleteBySourceAndContainer(ctx context.Context, sourceID, containerID string) error
// HealthCheck verifies the vector store is available.
HealthCheck(ctx context.Context) error
}
VectorIndex handles vector similarity search using a standalone embeddings table. This interface allows for dedicated vector store implementations (e.g., pgvector).
Source Files ¶
- ai_factory.go
- auth.go
- capability_store.go
- config_provider.go
- connection_store.go
- connector.go
- container_lister.go
- content_filter.go
- distributed_lock.go
- document_filter.go
- document_store.go
- embedding.go
- llm.go
- normaliser.go
- oauth_server_store.go
- oauth_state.go
- search_engine.go
- search_query_repository.go
- session_store.go
- settings_store.go
- source_store.go
- sync_store.go
- task_queue.go
- token_provider.go
- user_store.go