driven

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AICredentials

type AICredentials struct {
	APIKey  string
	BaseURL string // Optional custom base URL
}

AICredentials holds AI provider API keys and base URLs from environment variables

type AIServiceFactory

type AIServiceFactory interface {
	// CreateEmbeddingService creates an embedding service from settings and credentials.
	// Credentials come from ConfigProvider (environment variables).
	// Returns error if settings are invalid or service creation fails.
	CreateEmbeddingService(settings *domain.EmbeddingSettings, credentials *AICredentials) (EmbeddingService, error)

	// CreateLLMService creates an LLM service from settings and credentials.
	// Credentials come from ConfigProvider (environment variables).
	// Returns error if settings are invalid or service creation fails.
	CreateLLMService(settings *domain.LLMSettings, credentials *AICredentials) (LLMService, error)
}

AIServiceFactory creates AI services based on configuration. Credentials are provided separately from domain settings.

type AuthAdapter

type AuthAdapter interface {
	// Password operations
	HashPassword(password string) (string, error)
	VerifyPassword(password, hash string) bool

	// Token operations
	GenerateToken(claims *domain.TokenClaims) (string, error)
	ParseToken(token string) (*domain.TokenClaims, error)
}

AuthAdapter handles authentication cryptographic operations. This does NOT handle storage - use SessionStore for session persistence.

type AuthorizationCodeStore added in v0.2.1

type AuthorizationCodeStore interface {
	// Save stores a new authorization code
	Save(ctx context.Context, code *domain.AuthorizationCode) error

	// GetAndMarkUsed atomically retrieves the code and marks it as used.
	// This ensures single-use semantics for authorization codes.
	// Returns domain.ErrNotFound if code doesn't exist.
	GetAndMarkUsed(ctx context.Context, code string) (*domain.AuthorizationCode, error)

	// Cleanup removes expired authorization codes.
	// Should be called periodically (e.g., via cron job).
	Cleanup(ctx context.Context) error
}

AuthorizationCodeStore manages short-lived authorization codes for the auth code flow

type Capabilities

type Capabilities struct {
	// OAuth providers that are configured via environment variables
	OAuthProviders []domain.PlatformType

	// AI providers available for embedding
	EmbeddingProviders []domain.AIProvider

	// AI providers available for LLM
	LLMProviders []domain.AIProvider

	// SearchEngineAvailable indicates if a search engine (e.g. OpenSearch) was initialized
	SearchEngineAvailable bool

	// VectorStoreAvailable indicates if a vector store (e.g. pgvector) was initialized
	VectorStoreAvailable bool

	// Operational boundaries from environment variables
	Limits OperationalLimits
}

Capabilities represents what features are available based on environment configuration

type CapabilityStore

type CapabilityStore interface {
	// GetPreferences retrieves capability preferences for a team.
	// Returns error if team not found or database error occurs.
	GetPreferences(ctx context.Context, teamID string) (*domain.CapabilityPreferences, error)

	// SavePreferences persists capability preferences for a team.
	// Creates new preferences if they don't exist, updates if they do.
	SavePreferences(ctx context.Context, prefs *domain.CapabilityPreferences) error
}

CapabilityStore manages capability preferences persistence

type ChunkOptions

type ChunkOptions struct {
	MaxChunkSize int // Maximum characters per chunk
	Overlap      int // Character overlap between chunks
}

ChunkOptions configures chunking behavior

func DefaultChunkOptions

func DefaultChunkOptions() ChunkOptions

DefaultChunkOptions returns sensible defaults

type ChunkResult

type ChunkResult struct {
	Content   string
	StartChar int
	EndChar   int
	Position  int
}

ChunkResult represents a chunk with position info

type Chunker

type Chunker interface {
	// Chunk splits content into chunks
	Chunk(content string, opts ChunkOptions) []ChunkResult
}

Chunker splits document content into searchable chunks

type ConfigProvider

type ConfigProvider interface {
	// GetOAuthCredentials returns OAuth client credentials for a platform.
	// Returns nil if the platform is not configured in environment variables.
	GetOAuthCredentials(platform domain.PlatformType) *OAuthCredentials

	// GetAICredentials returns AI provider credentials (API key, base URL).
	// Returns nil if the provider is not configured in environment variables.
	GetAICredentials(provider domain.AIProvider) *AICredentials

	// IsOAuthConfigured returns true if OAuth credentials are available for the platform.
	IsOAuthConfigured(platform domain.PlatformType) bool

	// IsAIConfigured returns true if AI credentials are available for the provider.
	IsAIConfigured(provider domain.AIProvider) bool

	// GetCapabilities returns information about what's available based on env configuration.
	// This is used for the /api/v1/capabilities endpoint.
	GetCapabilities() *Capabilities

	// GetBaseURL returns the application base URL for OAuth callbacks.
	GetBaseURL() string
}

ConfigProvider provides access to configuration from environment variables. This is a driven port that abstracts environment-based configuration. Implementation lives in internal/config/ (infrastructure layer).

type ConnectionStore

type ConnectionStore interface {
	// Save stores a new connection or updates an existing one.
	// Secrets are encrypted before storage.
	Save(ctx context.Context, conn *domain.Connection) error

	// Get retrieves a connection by ID with decrypted secrets.
	// Returns domain.ErrNotFound if the connection doesn't exist.
	Get(ctx context.Context, id string) (*domain.Connection, error)

	// List retrieves all connections as summaries (no secrets).
	List(ctx context.Context) ([]*domain.ConnectionSummary, error)

	// Delete removes a connection by ID.
	// Returns domain.ErrNotFound if the connection doesn't exist.
	Delete(ctx context.Context, id string) error

	// GetByPlatform retrieves connections for a platform type (no secrets).
	GetByPlatform(ctx context.Context, platform domain.PlatformType) ([]*domain.ConnectionSummary, error)

	// GetByAccountID retrieves a connection by platform type and account ID.
	// Returns nil if not found.
	GetByAccountID(ctx context.Context, platform domain.PlatformType, accountID string) (*domain.Connection, error)

	// UpdateSecrets updates the encrypted secrets and OAuth metadata.
	// Used after token refresh.
	UpdateSecrets(ctx context.Context, id string, secrets *domain.ConnectionSecrets, expiry *time.Time) error

	// UpdateLastUsed updates the last_used_at timestamp.
	UpdateLastUsed(ctx context.Context, id string) error
}

ConnectionStore persists connector connections with encrypted secrets.

type Connector

type Connector interface {
	// Type returns the provider type.
	Type() domain.ProviderType

	// ValidateConfig validates source configuration.
	ValidateConfig(config domain.SourceConfig) error

	// FetchChanges fetches document changes since last sync.
	// Returns changes, next cursor, and error.
	// The cursor enables incremental sync - pass empty string for full sync.
	FetchChanges(ctx context.Context, source *domain.Source, cursor string) ([]*domain.Change, string, error)

	// FetchDocument fetches a single document by external ID.
	// Returns the document, content hash, and error.
	FetchDocument(ctx context.Context, source *domain.Source, externalID string) (*domain.Document, string, error)

	// TestConnection tests the connection to the source.
	TestConnection(ctx context.Context, source *domain.Source) error
}

Connector fetches documents from a source provider. Connectors are created by ConnectorBuilder with resolved credentials.

type ConnectorBuilder

type ConnectorBuilder interface {
	// Type returns the provider type this builder creates.
	Type() domain.ProviderType

	// Build creates a connector scoped to a specific container.
	// The containerID comes from source.SelectedContainers (one per sync job).
	// For providers that don't support container selection, containerID may be empty.
	// The TokenProvider handles credential retrieval and OAuth token refresh.
	Build(ctx context.Context, tokenProvider TokenProvider, containerID string) (Connector, error)

	// SupportsOAuth returns true if this connector uses OAuth authentication.
	SupportsOAuth() bool

	// OAuthConfig returns OAuth configuration for this provider.
	// Returns nil if the provider doesn't support OAuth.
	OAuthConfig() *OAuthConfig

	// SupportsContainerSelection returns true if this connector supports container picking.
	// If true, admins can select specific containers (repos, drives, spaces) to index.
	// If false, the connector indexes all accessible content.
	SupportsContainerSelection() bool
}

ConnectorBuilder creates connector instances for a specific provider type. Each provider has its own builder registered with the ConnectorFactory.

type ConnectorFactory

type ConnectorFactory interface {
	// Register registers a connector builder for a provider type.
	Register(builder ConnectorBuilder)

	// Create creates a connector for the given source, scoped to a container.
	// Called by SyncOrchestrator once per container in source.SelectedContainers.
	// For providers without container selection, containerID may be empty.
	Create(ctx context.Context, source *domain.Source, containerID string) (Connector, error)

	// SupportedTypes returns all registered provider types.
	SupportedTypes() []domain.ProviderType

	// GetBuilder returns the builder for a provider type.
	GetBuilder(providerType domain.ProviderType) (ConnectorBuilder, error)

	// SupportsOAuth returns true if the provider supports OAuth.
	SupportsOAuth(providerType domain.ProviderType) bool

	// GetOAuthConfig returns OAuth config for a provider.
	GetOAuthConfig(providerType domain.ProviderType) *OAuthConfig
}

ConnectorFactory manages connector builders and creates connectors.

type Container

type Container struct {
	// ID is the provider-specific identifier.
	// Format varies by provider:
	//   - GitHub: "owner/repo"
	//   - Google Drive: drive ID
	//   - Confluence: space key
	ID string `json:"id"`

	// Name is the human-readable display name.
	Name string `json:"name"`

	// Description is an optional description of the container.
	Description string `json:"description,omitempty"`

	// Type identifies the container type.
	// Examples: "repository", "shared_drive", "space", "project", "channel"
	Type string `json:"type"`

	// HasChildren indicates if this container has child containers (for folder navigation).
	HasChildren bool `json:"has_children,omitempty"`

	// Metadata contains provider-specific additional data.
	// Examples:
	//   - GitHub: {"owner": "...", "private": "true", "archived": "false"}
	//   - Google Drive: {"shared": "true"}
	Metadata map[string]string `json:"metadata,omitempty"`
}

Container represents a top-level container that can be selected for indexing. Examples:

  • GitHub: repository (owner/repo)
  • Google Drive: shared drive
  • Confluence: space
  • Jira: project
  • Notion: workspace
  • Slack: channel

type ContainerLister

type ContainerLister interface {
	// ListContainers lists containers accessible with the current credentials.
	// Pagination is handled via cursor - pass empty string for the first page.
	// parentID can be used to list children of a specific container (for folder navigation).
	// Pass empty string for root level containers.
	// Returns:
	//   - containers: list of available containers
	//   - nextCursor: cursor for the next page, empty if no more pages
	//   - err: any error that occurred
	ListContainers(ctx context.Context, cursor string, parentID string) ([]*Container, string, error)
}

ContainerLister lists available containers for a connector. Not all connectors support container selection - some index all accessible content.

The ContainerLister is typically retrieved from the ConnectorFactory for a specific provider type and installation.

type ContainerListerFactory

type ContainerListerFactory interface {
	// Create creates a ContainerLister for the given provider and installation.
	// Returns nil if the provider doesn't support container selection.
	Create(ctx context.Context, providerType domain.ProviderType, installationID string) (ContainerLister, error)

	// SupportsContainerSelection returns true if the provider supports container selection.
	// If false, the connector indexes all accessible content.
	SupportsContainerSelection(providerType domain.ProviderType) bool
}

ContainerListerFactory creates ContainerListers for different providers.

type ContentExtractor

type ContentExtractor interface {
	// Extract extracts text content from raw data
	Extract(ctx context.Context, data []byte, mimeType string) (string, error)

	// SupportedTypes returns supported MIME types
	SupportedTypes() []string
}

ContentExtractor extracts text content from various file formats

type ContentFilter added in v0.2.1

type ContentFilter interface {
	// ShouldFetchContent determines if content should be fetched for a file.
	// It combines path pattern matching and MIME type detection to make the decision.
	// Returns:
	//   - shouldFetch: true if content should be fetched and processed
	//   - mimeType: detected MIME type from the file path
	ShouldFetchContent(ctx context.Context, path string, settings *domain.SyncExclusionSettings) (shouldFetch bool, mimeType string)

	// GetMimeType returns the MIME type for a file path based on its extension or filename.
	// This provides a single source of truth for MIME type detection across the application.
	GetMimeType(path string) string
}

ContentFilter determines whether content should be fetched and processed based on file path patterns and MIME type exclusions. This port enables early filtering before content is fetched, reducing wasted resources.

type CredentialsStore

type CredentialsStore interface {
	// Save stores credentials (encrypts sensitive fields)
	Save(ctx context.Context, creds *domain.Credentials) error

	// Get retrieves credentials by ID
	Get(ctx context.Context, id string) (*domain.Credentials, error)

	// List retrieves all credentials
	List(ctx context.Context) ([]*domain.Credentials, error)

	// Delete deletes credentials
	Delete(ctx context.Context, id string) error

	// GetByProvider retrieves credentials for a provider type
	GetByProvider(ctx context.Context, providerType domain.ProviderType) ([]*domain.Credentials, error)
}

CredentialsStore handles credential persistence (PostgreSQL, encrypted)

type DistributedLock

type DistributedLock interface {
	// Acquire attempts to acquire a named lock with the given TTL.
	// Returns true if the lock was successfully acquired, false if already held by another instance.
	// The lock will automatically expire after TTL (implementation dependent).
	Acquire(ctx context.Context, name string, ttl time.Duration) (acquired bool, err error)

	// Release releases a named lock.
	// This is best-effort; implementations with TTL will auto-expire anyway.
	// Safe to call even if the lock is not held or has expired.
	Release(ctx context.Context, name string) error

	// Extend extends the TTL of a currently held lock.
	// Returns error if the lock is not held by this instance.
	// Note: Not all implementations support TTL extension (e.g., PostgreSQL advisory locks).
	Extend(ctx context.Context, name string, ttl time.Duration) error

	// Ping checks if the lock backend is healthy.
	Ping(ctx context.Context) error
}

DistributedLock provides distributed locking for coordinating work across instances. This is used to prevent duplicate execution in multi-instance deployments.

type DocumentIDProvider added in v0.2.2

type DocumentIDProvider interface {
	// GetAllowedDocumentIDs returns the document IDs that should be included in search.
	// Returns nil/empty slice if no filtering should be applied.
	GetAllowedDocumentIDs(ctx context.Context, query string, filters pipeline.SearchFilters) ([]string, error)
}

DocumentIDProvider provides document IDs for pre-filtering search results. Implementations can apply ACL, tenant isolation, or other filtering logic.

type DocumentResult

type DocumentResult struct {
	DocumentID string
	SourceID   string
	Title      string
	Content    string // Full document content (for snippet extraction)
	Score      float64
}

DocumentResult represents a document-level search result from BM25.

type DocumentStore

type DocumentStore interface {
	// Save creates or updates a document
	Save(ctx context.Context, doc *domain.Document) error

	// SaveBatch saves multiple documents in a transaction
	SaveBatch(ctx context.Context, docs []*domain.Document) error

	// Get retrieves a document by ID
	Get(ctx context.Context, id string) (*domain.Document, error)

	// GetByExternalID retrieves a document by source and external ID
	GetByExternalID(ctx context.Context, sourceID, externalID string) (*domain.Document, error)

	// GetBySource retrieves all documents for a source with pagination
	GetBySource(ctx context.Context, sourceID string, limit, offset int) ([]*domain.Document, error)

	// Delete deletes a document
	Delete(ctx context.Context, id string) error

	// DeleteBySource deletes all documents for a source
	DeleteBySource(ctx context.Context, sourceID string) error

	// DeleteBySourceAndContainer deletes all documents for a specific container within a source
	DeleteBySourceAndContainer(ctx context.Context, sourceID, containerID string) error

	// DeleteBatch deletes multiple documents by ID
	DeleteBatch(ctx context.Context, ids []string) error

	// Count returns total document count
	Count(ctx context.Context) (int, error)

	// CountBySource returns document count for a source
	CountBySource(ctx context.Context, sourceID string) (int, error)

	// ListExternalIDs returns all external IDs for a source (for diff sync)
	ListExternalIDs(ctx context.Context, sourceID string) ([]string, error)
}

DocumentStore handles document persistence (PostgreSQL)

type EmbeddingService

type EmbeddingService interface {
	// Embed generates embeddings for multiple texts
	Embed(ctx context.Context, texts []string) ([][]float32, error)

	// EmbedQuery generates an embedding for a search query
	// May use different model/parameters optimized for queries
	EmbedQuery(ctx context.Context, query string) ([]float32, error)

	// Dimensions returns the embedding dimension size
	Dimensions() int

	// Model returns the model name being used
	Model() string

	// HealthCheck verifies the embedding service is available
	HealthCheck(ctx context.Context) error

	// Close releases resources held by the embedding service
	Close() error
}

EmbeddingService generates text embeddings

type LLMService

type LLMService interface {
	// Complete sends a completion request to the LLM and returns the response.
	// This is the primary method for all LLM interactions.
	Complete(ctx context.Context, req domain.CompletionRequest) (domain.CompletionResponse, error)

	// Model returns the model name being used (e.g., "gpt-4o", "claude-3-5-sonnet")
	Model() string

	// Ping verifies the LLM service is reachable and properly configured
	Ping(ctx context.Context) error

	// Close releases resources held by the LLM service
	Close() error
}

LLMService provides a generic completion interface for Large Language Models. This is a pure provider interface - task-specific logic (query expansion, summarization, etc.) is implemented in pipeline stages.

type Normaliser

type Normaliser interface {
	// Normalise transforms raw content into normalized text.
	// The mimeType helps determine the appropriate processing.
	Normalise(content string, mimeType string) string

	// SupportedTypes returns MIME types this normaliser handles.
	// Can include wildcards like "text/*" or specific types like "text/markdown".
	SupportedTypes() []string

	// Priority returns the normaliser priority (higher = more specific).
	// Priority ranges:
	//   90-100: Connector-specific (e.g., Gmail email normaliser)
	//   50-89:  Format-specific (PDF, Markdown, HTML)
	//   10-49:  Generic (basic text processing)
	//   1-9:    Fallback (raw text extraction)
	Priority() int
}

Normaliser normalizes raw document content for indexing. It transforms provider-specific document formats into normalized text.

type NormaliserRegistry

type NormaliserRegistry interface {
	// Get retrieves the best-matching normaliser for a MIME type.
	// Returns nil if no normaliser is registered for the type.
	// When multiple match, the highest priority normaliser is returned.
	Get(mimeType string) Normaliser

	// GetAll retrieves all normalisers that match a MIME type, sorted by priority (highest first).
	GetAll(mimeType string) []Normaliser

	// Register registers a normaliser.
	Register(normaliser Normaliser)

	// List returns all registered MIME types.
	List() []string
}

NormaliserRegistry manages content normalisers. When multiple normalisers match a MIME type, the highest priority one is used.

type OAuthClientStore added in v0.2.1

type OAuthClientStore interface {
	// Save stores a new client or updates an existing one
	Save(ctx context.Context, client *domain.OAuthClient) error

	// Get retrieves a client by client_id
	// Returns domain.ErrNotFound if the client doesn't exist
	Get(ctx context.Context, clientID string) (*domain.OAuthClient, error)

	// Delete removes a client by client_id
	// Returns domain.ErrNotFound if the client doesn't exist
	Delete(ctx context.Context, clientID string) error

	// List retrieves all registered clients
	List(ctx context.Context) ([]*domain.OAuthClient, error)
}

OAuthClientStore manages OAuth 2.0 client registrations

type OAuthConfig

type OAuthConfig struct {
	// AuthURL is the authorization endpoint
	AuthURL string

	// TokenURL is the token exchange endpoint
	TokenURL string

	// Scopes are the required OAuth scopes
	Scopes []string

	// UserInfoURL is the endpoint to fetch user info (optional)
	UserInfoURL string
}

OAuthConfig contains OAuth settings for a provider.

type OAuthCredentials

type OAuthCredentials struct {
	ClientID     string
	ClientSecret string
}

OAuthCredentials holds OAuth client credentials from environment variables

type OAuthHandler

type OAuthHandler interface {
	// DefaultConfig returns the default OAuth configuration (auth URL, token URL, scopes)
	DefaultConfig() OAuthConfig

	// BuildAuthURL constructs the OAuth authorization URL with PKCE
	BuildAuthURL(clientID, redirectURI, state, codeChallenge string, scopes []string) string

	// ExchangeCode exchanges an authorization code for access tokens
	ExchangeCode(ctx context.Context, clientID, clientSecret, code, redirectURI, codeVerifier string) (*OAuthToken, error)

	// GetUserInfo retrieves user information using an access token
	GetUserInfo(ctx context.Context, accessToken string) (*OAuthUserInfo, error)

	// RefreshToken refreshes an expired access token
	RefreshToken(ctx context.Context, refreshToken string) (*OAuthToken, error)
}

OAuthHandler handles OAuth flow for a provider.

type OAuthHandlerFactory

type OAuthHandlerFactory interface {
	// GetOAuthHandler returns an OAuth handler for the given platform.
	// Returns nil if the platform doesn't support OAuth or isn't implemented.
	GetOAuthHandler(platform domain.PlatformType) OAuthHandler
}

OAuthHandlerFactory creates OAuth handlers for specific platforms. This abstracts the connectors.Factory dependency.

type OAuthState

type OAuthState struct {
	// State is a cryptographically random string used for CSRF protection.
	State string

	// ProviderType is the OAuth provider (github, google_drive, etc.)
	ProviderType string

	// CodeVerifier is the PKCE code verifier (plain text, not hashed).
	// This is used to generate code_challenge for the auth request
	// and sent as code_verifier during token exchange.
	CodeVerifier string

	// RedirectURI is the callback URL where the provider will redirect.
	RedirectURI string

	// ReturnContext indicates where to redirect after OAuth completes.
	// Values: "setup", "admin-sources", or empty for default behavior.
	ReturnContext string

	// CreatedAt is when the state was created.
	CreatedAt time.Time

	// ExpiresAt is when the state expires (typically 10 minutes).
	ExpiresAt time.Time
}

OAuthState represents a pending OAuth authorization flow state. Used for CSRF protection and PKCE code verifier storage.

type OAuthStateStore

type OAuthStateStore interface {
	// Save stores a new OAuth state.
	// The state typically expires in 10 minutes.
	Save(ctx context.Context, state *OAuthState) error

	// GetAndDelete atomically retrieves and deletes the state.
	// This ensures single-use semantics.
	// Returns nil, nil if the state doesn't exist or has expired.
	GetAndDelete(ctx context.Context, state string) (*OAuthState, error)

	// Cleanup removes expired states.
	// Should be called periodically (e.g., every hour) to clean up orphaned states.
	Cleanup(ctx context.Context) error
}

OAuthStateStore manages OAuth flow state for CSRF protection. States are single-use and expire after a short period.

type OAuthToken

type OAuthToken struct {
	AccessToken  string
	RefreshToken string
	ExpiresIn    int    // Seconds until expiry
	TokenType    string // Usually "Bearer"
	Scope        string // Space-separated scopes
}

OAuthToken represents OAuth tokens from a provider.

type OAuthTokenProvider

type OAuthTokenProvider struct {
	// contains filtered or unexported fields
}

OAuthTokenProvider implements TokenProvider for OAuth credentials. It automatically refreshes tokens when they expire.

func NewOAuthTokenProvider

func NewOAuthTokenProvider(
	creds *domain.Credentials,
	refresher TokenRefresher,
	store CredentialsStore,
) *OAuthTokenProvider

NewOAuthTokenProvider creates a token provider for OAuth credentials.

func (*OAuthTokenProvider) AuthMethod

func (p *OAuthTokenProvider) AuthMethod() domain.AuthMethod

AuthMethod returns OAuth2.

func (*OAuthTokenProvider) GetAccessToken

func (p *OAuthTokenProvider) GetAccessToken(ctx context.Context) (string, error)

GetAccessToken returns a valid access token, refreshing if needed.

func (*OAuthTokenProvider) GetCredentials

func (p *OAuthTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)

GetCredentials returns the full credentials.

func (*OAuthTokenProvider) IsValid

func (p *OAuthTokenProvider) IsValid(ctx context.Context) bool

IsValid checks if credentials are valid (not expired or can be refreshed).

type OAuthTokenStore added in v0.2.1

type OAuthTokenStore interface {
	// SaveAccessToken stores a new access token
	SaveAccessToken(ctx context.Context, token *domain.OAuthAccessToken) error

	// GetAccessToken retrieves an access token by its ID (jti claim)
	// Returns domain.ErrNotFound if the token doesn't exist
	GetAccessToken(ctx context.Context, tokenID string) (*domain.OAuthAccessToken, error)

	// RevokeAccessToken marks an access token as revoked
	// Returns domain.ErrNotFound if the token doesn't exist
	RevokeAccessToken(ctx context.Context, tokenID string) error

	// SaveRefreshToken stores a new refresh token
	SaveRefreshToken(ctx context.Context, token *domain.OAuthRefreshToken) error

	// GetRefreshToken retrieves a refresh token by its ID
	// Returns domain.ErrNotFound if the token doesn't exist
	GetRefreshToken(ctx context.Context, tokenID string) (*domain.OAuthRefreshToken, error)

	// RevokeRefreshToken marks a refresh token as revoked
	// Returns domain.ErrNotFound if the token doesn't exist
	RevokeRefreshToken(ctx context.Context, tokenID string) error

	// RotateRefreshToken marks the old token as rotated and returns the new token ID.
	// This implements refresh token rotation for enhanced security.
	// Returns domain.ErrNotFound if the old token doesn't exist.
	RotateRefreshToken(ctx context.Context, oldTokenID string, newTokenID string) error

	// RevokeAllForClient revokes all tokens (access and refresh) for a given client.
	// Used when a client is deleted or compromised.
	RevokeAllForClient(ctx context.Context, clientID string) error

	// Cleanup removes expired tokens.
	// Should be called periodically (e.g., via cron job).
	Cleanup(ctx context.Context) error
}

OAuthTokenStore manages access tokens and refresh tokens with revocation support

type OAuthUserInfo

type OAuthUserInfo struct {
	ID       string // Provider-specific user ID
	Email    string
	Name     string
	ImageURL string
}

OAuthUserInfo represents user info from OAuth provider.

type OperationalLimits

type OperationalLimits struct {
	SyncMinInterval   int // Minutes floor (default: 5)
	SyncMaxInterval   int // Minutes ceiling (default: 1440)
	MaxWorkers        int // Worker ceiling (default: 10)
	MaxResultsPerPage int // Results ceiling (default: 100)
}

OperationalLimits defines guardrails from environment configuration

type QueueStats

type QueueStats struct {
	// PendingCount is the number of tasks waiting to be processed
	PendingCount int64 `json:"pending_count"`

	// ProcessingCount is the number of tasks currently being processed
	ProcessingCount int64 `json:"processing_count"`

	// CompletedCount is the number of successfully completed tasks
	CompletedCount int64 `json:"completed_count"`

	// FailedCount is the number of tasks that failed after all retries
	FailedCount int64 `json:"failed_count"`

	// OldestPendingAge is the age of the oldest pending task in seconds
	OldestPendingAge int64 `json:"oldest_pending_age"`
}

QueueStats contains queue statistics

type SchedulerStore

type SchedulerStore interface {
	// GetScheduledTask retrieves a scheduled task by ID
	GetScheduledTask(ctx context.Context, id string) (*domain.ScheduledTask, error)

	// ListScheduledTasks retrieves all scheduled tasks for a team
	ListScheduledTasks(ctx context.Context, teamID string) ([]*domain.ScheduledTask, error)

	// SaveScheduledTask creates or updates a scheduled task
	SaveScheduledTask(ctx context.Context, task *domain.ScheduledTask) error

	// DeleteScheduledTask removes a scheduled task
	DeleteScheduledTask(ctx context.Context, id string) error

	// GetDueScheduledTasks retrieves scheduled tasks that are due to run
	GetDueScheduledTasks(ctx context.Context) ([]*domain.ScheduledTask, error)

	// UpdateLastRun updates the last run time and next run time
	UpdateLastRun(ctx context.Context, id string, lastError string) error
}

SchedulerStore handles persistence for scheduled tasks. This is separate from TaskQueue because scheduled tasks are configuration, not transient queue items.

type SearchEngine

type SearchEngine interface {
	// Index indexes chunks for a document (legacy, kept for backward compat)
	Index(ctx context.Context, chunks []*domain.Chunk) error

	// IndexDocument indexes a full document for BM25 text search.
	// Uses document_id as the OpenSearch document ID (upsert semantics).
	IndexDocument(ctx context.Context, doc *domain.DocumentContent) error

	// Search performs a search query (legacy chunk-level search)
	Search(ctx context.Context, query string, queryEmbedding []float32, opts domain.SearchOptions) ([]*domain.RankedChunk, int, error)

	// SearchDocuments performs a BM25 text search returning document-level results.
	SearchDocuments(ctx context.Context, query string, opts domain.SearchOptions) ([]DocumentResult, int, error)

	// Delete deletes chunks by IDs
	Delete(ctx context.Context, chunkIDs []string) error

	// DeleteByDocument deletes all indexed data for a document
	DeleteByDocument(ctx context.Context, documentID string) error

	// DeleteByDocuments deletes all indexed data for multiple documents in a single operation
	DeleteByDocuments(ctx context.Context, documentIDs []string) error

	// DeleteBySource deletes all indexed data for a source
	DeleteBySource(ctx context.Context, sourceID string) error

	// DeleteBySourceAndContainer deletes all indexed data for a specific container within a source
	DeleteBySourceAndContainer(ctx context.Context, sourceID, containerID string) error

	// HealthCheck verifies the search engine is available
	HealthCheck(ctx context.Context) error

	// Count returns the total number of indexed documents
	Count(ctx context.Context) (int64, error)

	// GetDocument retrieves a document's full indexed content by document ID.
	// Returns domain.ErrNotFound if the document is not in the search index.
	GetDocument(ctx context.Context, documentID string) (*domain.DocumentContent, error)
}

SearchEngine handles search indexing and querying

type SearchQueryRepository

type SearchQueryRepository interface {
	// Save logs a search query for analytics tracking
	Save(ctx context.Context, query *domain.SearchQuery) error

	// GetSearchHistory retrieves recent search queries
	// Returns up to limit most recent searches, ordered by created_at desc
	GetSearchHistory(ctx context.Context, teamID string, limit int) ([]*domain.SearchQuery, error)

	// GetSearchAnalytics computes aggregated search analytics for a time period
	GetSearchAnalytics(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.SearchAnalytics, error)

	// GetSearchMetrics computes performance metrics for a time period
	GetSearchMetrics(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.SearchMetrics, error)
}

SearchQueryRepository handles search query logging and analytics

type SessionStore

type SessionStore interface {
	// Save stores a session with TTL based on ExpiresAt
	Save(ctx context.Context, session *domain.Session) error

	// Get retrieves a session by ID
	Get(ctx context.Context, id string) (*domain.Session, error)

	// GetByToken retrieves a session by token value
	GetByToken(ctx context.Context, token string) (*domain.Session, error)

	// GetByRefreshToken retrieves a session by refresh token value
	GetByRefreshToken(ctx context.Context, refreshToken string) (*domain.Session, error)

	// Delete deletes a session
	Delete(ctx context.Context, id string) error

	// DeleteByToken deletes a session by token
	DeleteByToken(ctx context.Context, token string) error

	// DeleteByUser deletes all sessions for a user (logout everywhere)
	DeleteByUser(ctx context.Context, userID string) error

	// ListByUser lists all active sessions for a user
	ListByUser(ctx context.Context, userID string) ([]*domain.Session, error)
}

SessionStore handles session persistence (Redis)

type SettingsStore

type SettingsStore interface {
	// GetSettings retrieves settings for a team
	GetSettings(ctx context.Context, teamID string) (*domain.Settings, error)

	// SaveSettings persists team settings
	SaveSettings(ctx context.Context, settings *domain.Settings) error

	// GetAISettings retrieves AI-specific settings for a team
	GetAISettings(ctx context.Context, teamID string) (*domain.AISettings, error)

	// SaveAISettings persists AI-specific settings
	SaveAISettings(ctx context.Context, teamID string, settings *domain.AISettings) error
}

SettingsStore persists team and AI settings

type SourceStore

type SourceStore interface {
	// Save creates or updates a source
	Save(ctx context.Context, source *domain.Source) error

	// Get retrieves a source by ID
	Get(ctx context.Context, id string) (*domain.Source, error)

	// GetByName retrieves a source by name
	GetByName(ctx context.Context, name string) (*domain.Source, error)

	// List retrieves all sources
	List(ctx context.Context) ([]*domain.Source, error)

	// ListEnabled retrieves all enabled sources
	ListEnabled(ctx context.Context) ([]*domain.Source, error)

	// Delete deletes a source
	Delete(ctx context.Context, id string) error

	// SetEnabled updates the enabled status
	SetEnabled(ctx context.Context, id string, enabled bool) error

	// CountByConnection returns the number of sources using a connection
	CountByConnection(ctx context.Context, connectionID string) (int, error)

	// ListByConnection returns sources using a connection
	ListByConnection(ctx context.Context, connectionID string) ([]*domain.Source, error)

	// UpdateContainers updates the containers for a source
	UpdateContainers(ctx context.Context, id string, containers []domain.Container) error
}

SourceStore handles source persistence (PostgreSQL)

type StaticTokenProvider

type StaticTokenProvider struct {
	// contains filtered or unexported fields
}

StaticTokenProvider implements TokenProvider for non-OAuth credentials. Used for API keys, PATs, and basic auth.

func NewStaticTokenProvider

func NewStaticTokenProvider(creds *domain.Credentials) *StaticTokenProvider

NewStaticTokenProvider creates a token provider for static credentials.

func (*StaticTokenProvider) AuthMethod

func (p *StaticTokenProvider) AuthMethod() domain.AuthMethod

AuthMethod returns the authentication method.

func (*StaticTokenProvider) GetAccessToken

func (p *StaticTokenProvider) GetAccessToken(ctx context.Context) (string, error)

GetAccessToken returns the API key or PAT.

func (*StaticTokenProvider) GetCredentials

func (p *StaticTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)

GetCredentials returns the full credentials.

func (*StaticTokenProvider) IsValid

func (p *StaticTokenProvider) IsValid(ctx context.Context) bool

IsValid returns true - static credentials don't expire.

type SyncStateStore

type SyncStateStore interface {
	// Save creates or updates sync state
	Save(ctx context.Context, state *domain.SyncState) error

	// Get retrieves sync state for a source
	Get(ctx context.Context, sourceID string) (*domain.SyncState, error)

	// List retrieves sync states for all sources
	List(ctx context.Context) ([]*domain.SyncState, error)

	// Delete deletes sync state for a source
	Delete(ctx context.Context, sourceID string) error

	// UpdateStatus updates only the status field
	UpdateStatus(ctx context.Context, sourceID string, status domain.SyncStatus) error

	// UpdateCursor updates the sync cursor
	UpdateCursor(ctx context.Context, sourceID string, cursor string) error
}

SyncStateStore handles sync state persistence (PostgreSQL)

type TaskFilter

type TaskFilter struct {
	// TeamID filters by team (required)
	TeamID string

	// Status filters by task status (optional, empty means all)
	Status domain.TaskStatus

	// Type filters by task type (optional, empty means all)
	Type domain.TaskType

	// Limit is the maximum number of tasks to return
	Limit int

	// Offset is the number of tasks to skip (for pagination)
	Offset int
}

TaskFilter specifies criteria for listing tasks

type TaskQueue

type TaskQueue interface {
	// Enqueue adds a task to the queue for processing.
	// The task will be picked up by a worker based on priority and scheduled time.
	Enqueue(ctx context.Context, task *domain.Task) error

	// EnqueueBatch adds multiple tasks to the queue atomically.
	// If any task fails to enqueue, all tasks are rolled back.
	EnqueueBatch(ctx context.Context, tasks []*domain.Task) error

	// Dequeue retrieves the next available task for processing.
	// This should block until a task is available or context is cancelled.
	// The task is marked as processing and will not be returned to other workers.
	// Returns nil, nil if no tasks are available (for non-blocking implementations).
	Dequeue(ctx context.Context) (*domain.Task, error)

	// DequeueWithTimeout retrieves the next available task, waiting up to timeout.
	// Returns nil, nil if timeout is reached with no tasks available.
	DequeueWithTimeout(ctx context.Context, timeout int) (*domain.Task, error)

	// Ack acknowledges successful completion of a task.
	// The task is removed from the queue.
	Ack(ctx context.Context, taskID string) error

	// Nack indicates task processing failed and should be retried.
	// The task is returned to the queue with updated retry count.
	// If max retries exceeded, task is moved to failed state.
	Nack(ctx context.Context, taskID string, reason string) error

	// GetTask retrieves a task by ID (for status checking).
	GetTask(ctx context.Context, taskID string) (*domain.Task, error)

	// ListTasks retrieves tasks matching the filter criteria.
	ListTasks(ctx context.Context, filter TaskFilter) ([]*domain.Task, error)

	// CancelTask marks a pending task as cancelled.
	// Returns error if task is already processing or completed.
	CancelTask(ctx context.Context, taskID string) error

	// PurgeTasks removes completed/failed tasks older than the specified age.
	// This is used for cleanup.
	PurgeTasks(ctx context.Context, olderThan int) (int, error)

	// Stats returns queue statistics.
	Stats(ctx context.Context) (*QueueStats, error)

	// Ping checks if the queue backend is healthy.
	Ping(ctx context.Context) error

	// GetJobStats computes aggregated job statistics for a time period
	// This is used by the admin dashboard to show job execution metrics
	GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)

	// CountTasks returns the total number of tasks matching the filter
	// This is used for pagination to determine if there are more results
	CountTasks(ctx context.Context, filter TaskFilter) (int64, error)

	// Close cleans up resources.
	Close() error
}

TaskQueue handles background task queuing and processing. Implementations can use Redis (preferred) or Postgres (fallback).

type TokenProvider

type TokenProvider interface {
	// GetAccessToken returns a valid access token.
	// For OAuth, this automatically refreshes expired tokens.
	// For API keys, this returns the stored key.
	GetAccessToken(ctx context.Context) (string, error)

	// GetCredentials returns the full credentials.
	// Use GetAccessToken for most operations - this is for special cases.
	GetCredentials(ctx context.Context) (*domain.Credentials, error)

	// AuthMethod returns the authentication method.
	AuthMethod() domain.AuthMethod

	// IsValid checks if the credentials are still valid.
	IsValid(ctx context.Context) bool
}

TokenProvider provides access tokens for API authentication. It handles token storage, retrieval, and automatic refresh for OAuth.

type TokenProviderFactory

type TokenProviderFactory interface {
	// Create creates a TokenProvider for a connection.
	// It looks up the connection by ID, decrypts credentials, and creates
	// an appropriate TokenProvider based on the auth method.
	Create(ctx context.Context, connectionID string) (TokenProvider, error)

	// CreateFromConnection creates a TokenProvider from a connection directly.
	// Use this when you already have the connection loaded.
	CreateFromConnection(ctx context.Context, conn *domain.Connection) (TokenProvider, error)
}

TokenProviderFactory creates TokenProviders from connection IDs. It resolves connection credentials and wraps them in appropriate TokenProviders.

type TokenRefresher

type TokenRefresher interface {
	// Refresh refreshes the OAuth tokens.
	// Returns the new tokens and updates the credentials in the store.
	Refresh(ctx context.Context, creds *domain.Credentials) (*domain.Credentials, error)
}

TokenRefresher handles OAuth token refresh operations. This is used internally by OAuth TokenProviders.

type UserStore

type UserStore interface {
	// Save creates or updates a user
	Save(ctx context.Context, user *domain.User) error

	// Get retrieves a user by ID
	Get(ctx context.Context, id string) (*domain.User, error)

	// GetByEmail retrieves a user by email
	GetByEmail(ctx context.Context, email string) (*domain.User, error)

	// List retrieves all users for a team
	List(ctx context.Context, teamID string) ([]*domain.User, error)

	// Delete deletes a user
	Delete(ctx context.Context, id string) error

	// UpdateLastLogin updates the last login timestamp
	UpdateLastLogin(ctx context.Context, id string) error
}

UserStore handles user persistence (PostgreSQL)

type VectorIndex

type VectorIndex interface {
	// Index adds a single embedding to the index
	Index(ctx context.Context, id string, documentID string, embedding []float32) error

	// IndexBatch adds multiple embeddings with their chunk content
	IndexBatch(ctx context.Context, ids []string, documentIDs []string, sourceIDs []string, contents []string, embeddings [][]float32) error

	// Search finds similar vectors, returns chunk IDs and distances
	Search(ctx context.Context, embedding []float32, k int) ([]string, []float64, error)

	// SearchWithContent finds similar vectors and returns chunk content alongside IDs/distances.
	// sourceIDs optionally filters results to specific sources (nil or empty = no filter).
	// documentIDs optionally filters results to specific documents (nil or empty = no filter).
	SearchWithContent(ctx context.Context, embedding []float32, k int, sourceIDs []string, documentIDs []string) ([]VectorSearchResult, error)

	// Delete removes a single embedding by chunk ID
	Delete(ctx context.Context, id string) error

	// DeleteBatch removes multiple embeddings by chunk IDs
	DeleteBatch(ctx context.Context, ids []string) error

	// DeleteByDocument removes all embeddings for a document
	DeleteByDocument(ctx context.Context, documentID string) error

	// DeleteByDocuments removes all embeddings for multiple documents in a single operation
	DeleteByDocuments(ctx context.Context, documentIDs []string) error

	// DeleteBySourceAndContainer removes all embeddings for a specific container within a source
	DeleteBySourceAndContainer(ctx context.Context, sourceID, containerID string) error

	// HealthCheck verifies the vector store is available
	HealthCheck(ctx context.Context) error
}

VectorIndex handles vector similarity search using a standalone embeddings table. This interface allows for dedicated vector store implementations (e.g., pgvector).

type VectorSearchResult

type VectorSearchResult struct {
	ChunkID    string
	DocumentID string
	Content    string
	Distance   float64
}

VectorSearchResult represents a chunk-level vector search result with content.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL