driven

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2026 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AICredentials

type AICredentials struct {
	APIKey  string
	BaseURL string // Optional custom base URL
}

AICredentials holds AI provider API keys and base URLs from environment variables

type AIServiceFactory

type AIServiceFactory interface {
	// CreateEmbeddingService creates an embedding service from settings and credentials.
	// Credentials come from ConfigProvider (environment variables).
	// Returns error if settings are invalid or service creation fails.
	CreateEmbeddingService(settings *domain.EmbeddingSettings, credentials *AICredentials) (EmbeddingService, error)

	// CreateLLMService creates an LLM service from settings and credentials.
	// Credentials come from ConfigProvider (environment variables).
	// Returns error if settings are invalid or service creation fails.
	CreateLLMService(settings *domain.LLMSettings, credentials *AICredentials) (LLMService, error)
}

AIServiceFactory creates AI services based on configuration. Credentials are provided separately from domain settings.

type AuthAdapter

type AuthAdapter interface {
	// Password operations
	HashPassword(password string) (string, error)
	VerifyPassword(password, hash string) bool

	// Token operations
	GenerateToken(claims *domain.TokenClaims) (string, error)
	ParseToken(token string) (*domain.TokenClaims, error)
}

AuthAdapter handles authentication cryptographic operations. This does NOT handle storage - use SessionStore for session persistence.

type Capabilities

type Capabilities struct {
	// OAuth providers that are configured via environment variables
	OAuthProviders []domain.ProviderType

	// AI providers available for embedding
	EmbeddingProviders []domain.AIProvider

	// AI providers available for LLM
	LLMProviders []domain.AIProvider

	// SearchEngineAvailable indicates if a search engine (e.g. OpenSearch) was initialized
	SearchEngineAvailable bool

	// VectorStoreAvailable indicates if a vector store (e.g. pgvector) was initialized
	VectorStoreAvailable bool

	// Operational boundaries from environment variables
	Limits OperationalLimits
}

Capabilities represents what features are available based on environment configuration

type CapabilityStore

type CapabilityStore interface {
	// GetPreferences retrieves capability preferences for a team.
	// Returns error if team not found or database error occurs.
	GetPreferences(ctx context.Context, teamID string) (*domain.CapabilityPreferences, error)

	// SavePreferences persists capability preferences for a team.
	// Creates new preferences if they don't exist, updates if they do.
	SavePreferences(ctx context.Context, prefs *domain.CapabilityPreferences) error
}

CapabilityStore manages capability preferences persistence

type ChunkOptions

type ChunkOptions struct {
	MaxChunkSize int // Maximum characters per chunk
	Overlap      int // Character overlap between chunks
}

ChunkOptions configures chunking behavior

func DefaultChunkOptions

func DefaultChunkOptions() ChunkOptions

DefaultChunkOptions returns sensible defaults

type ChunkResult

type ChunkResult struct {
	Content   string
	StartChar int
	EndChar   int
	Position  int
}

ChunkResult represents a chunk with position info

type ChunkStore

type ChunkStore interface {
	// Save creates or updates a chunk
	Save(ctx context.Context, chunk *domain.Chunk) error

	// SaveBatch saves multiple chunks in a transaction
	SaveBatch(ctx context.Context, chunks []*domain.Chunk) error

	// GetByDocument retrieves all chunks for a document
	GetByDocument(ctx context.Context, documentID string) ([]*domain.Chunk, error)

	// Delete deletes a chunk
	Delete(ctx context.Context, id string) error

	// DeleteByDocument deletes all chunks for a document
	DeleteByDocument(ctx context.Context, documentID string) error

	// DeleteBySource deletes all chunks for a source
	DeleteBySource(ctx context.Context, sourceID string) error
}

ChunkStore handles chunk persistence (PostgreSQL)

type Chunker

type Chunker interface {
	// Chunk splits content into chunks
	Chunk(content string, opts ChunkOptions) []ChunkResult
}

Chunker splits document content into searchable chunks

type ConfigProvider

type ConfigProvider interface {
	// GetOAuthCredentials returns OAuth client credentials for a provider.
	// Returns nil if the provider is not configured in environment variables.
	GetOAuthCredentials(provider domain.ProviderType) *OAuthCredentials

	// GetAICredentials returns AI provider credentials (API key, base URL).
	// Returns nil if the provider is not configured in environment variables.
	GetAICredentials(provider domain.AIProvider) *AICredentials

	// IsOAuthConfigured returns true if OAuth credentials are available for the provider.
	IsOAuthConfigured(provider domain.ProviderType) bool

	// IsAIConfigured returns true if AI credentials are available for the provider.
	IsAIConfigured(provider domain.AIProvider) bool

	// GetCapabilities returns information about what's available based on env configuration.
	// This is used for the /api/v1/capabilities endpoint.
	GetCapabilities() *Capabilities

	// GetBaseURL returns the application base URL for OAuth callbacks.
	GetBaseURL() string
}

ConfigProvider provides access to configuration from environment variables. This is a driven port that abstracts environment-based configuration. Implementation lives in internal/config/ (infrastructure layer).

type ConnectionStore

type ConnectionStore interface {
	// Save stores a new connection or updates an existing one.
	// Secrets are encrypted before storage.
	Save(ctx context.Context, conn *domain.Connection) error

	// Get retrieves a connection by ID with decrypted secrets.
	// Returns domain.ErrNotFound if the connection doesn't exist.
	Get(ctx context.Context, id string) (*domain.Connection, error)

	// List retrieves all connections as summaries (no secrets).
	List(ctx context.Context) ([]*domain.ConnectionSummary, error)

	// Delete removes a connection by ID.
	// Returns domain.ErrNotFound if the connection doesn't exist.
	Delete(ctx context.Context, id string) error

	// GetByProvider retrieves connections for a provider type (no secrets).
	GetByProvider(ctx context.Context, providerType domain.ProviderType) ([]*domain.ConnectionSummary, error)

	// GetByAccountID retrieves a connection by provider type and account ID.
	// Returns nil if not found.
	GetByAccountID(ctx context.Context, providerType domain.ProviderType, accountID string) (*domain.Connection, error)

	// UpdateSecrets updates the encrypted secrets and OAuth metadata.
	// Used after token refresh.
	UpdateSecrets(ctx context.Context, id string, secrets *domain.ConnectionSecrets, expiry *time.Time) error

	// UpdateLastUsed updates the last_used_at timestamp.
	UpdateLastUsed(ctx context.Context, id string) error
}

ConnectionStore persists connector connections with encrypted secrets.

type Connector

type Connector interface {
	// Type returns the provider type.
	Type() domain.ProviderType

	// ValidateConfig validates source configuration.
	ValidateConfig(config domain.SourceConfig) error

	// FetchChanges fetches document changes since last sync.
	// Returns changes, next cursor, and error.
	// The cursor enables incremental sync - pass empty string for full sync.
	FetchChanges(ctx context.Context, source *domain.Source, cursor string) ([]*domain.Change, string, error)

	// FetchDocument fetches a single document by external ID.
	// Returns the document, content hash, and error.
	FetchDocument(ctx context.Context, source *domain.Source, externalID string) (*domain.Document, string, error)

	// TestConnection tests the connection to the source.
	TestConnection(ctx context.Context, source *domain.Source) error
}

Connector fetches documents from a source provider. Connectors are created by ConnectorBuilder with resolved credentials.

type ConnectorBuilder

type ConnectorBuilder interface {
	// Type returns the provider type this builder creates.
	Type() domain.ProviderType

	// Build creates a connector scoped to a specific container.
	// The containerID comes from source.SelectedContainers (one per sync job).
	// For providers that don't support container selection, containerID may be empty.
	// The TokenProvider handles credential retrieval and OAuth token refresh.
	Build(ctx context.Context, tokenProvider TokenProvider, containerID string) (Connector, error)

	// SupportsOAuth returns true if this connector uses OAuth authentication.
	SupportsOAuth() bool

	// OAuthConfig returns OAuth configuration for this provider.
	// Returns nil if the provider doesn't support OAuth.
	OAuthConfig() *OAuthConfig

	// SupportsContainerSelection returns true if this connector supports container picking.
	// If true, admins can select specific containers (repos, drives, spaces) to index.
	// If false, the connector indexes all accessible content.
	SupportsContainerSelection() bool
}

ConnectorBuilder creates connector instances for a specific provider type. Each provider has its own builder registered with the ConnectorFactory.

type ConnectorFactory

type ConnectorFactory interface {
	// Register registers a connector builder for a provider type.
	Register(builder ConnectorBuilder)

	// Create creates a connector for the given source, scoped to a container.
	// Called by SyncOrchestrator once per container in source.SelectedContainers.
	// For providers without container selection, containerID may be empty.
	Create(ctx context.Context, source *domain.Source, containerID string) (Connector, error)

	// SupportedTypes returns all registered provider types.
	SupportedTypes() []domain.ProviderType

	// GetBuilder returns the builder for a provider type.
	GetBuilder(providerType domain.ProviderType) (ConnectorBuilder, error)

	// SupportsOAuth returns true if the provider supports OAuth.
	SupportsOAuth(providerType domain.ProviderType) bool

	// GetOAuthConfig returns OAuth config for a provider.
	GetOAuthConfig(providerType domain.ProviderType) *OAuthConfig
}

ConnectorFactory manages connector builders and creates connectors.

type Container

type Container struct {
	// ID is the provider-specific identifier.
	// Format varies by provider:
	//   - GitHub: "owner/repo"
	//   - Google Drive: drive ID
	//   - Confluence: space key
	ID string `json:"id"`

	// Name is the human-readable display name.
	Name string `json:"name"`

	// Description is an optional description of the container.
	Description string `json:"description,omitempty"`

	// Type identifies the container type.
	// Examples: "repository", "shared_drive", "space", "project", "channel"
	Type string `json:"type"`

	// Metadata contains provider-specific additional data.
	// Examples:
	//   - GitHub: {"owner": "...", "private": "true", "archived": "false"}
	//   - Google Drive: {"shared": "true"}
	Metadata map[string]string `json:"metadata,omitempty"`
}

Container represents a top-level container that can be selected for indexing. Examples:

  • GitHub: repository (owner/repo)
  • Google Drive: shared drive
  • Confluence: space
  • Jira: project
  • Notion: workspace
  • Slack: channel

type ContainerLister

type ContainerLister interface {
	// ListContainers lists containers accessible with the current credentials.
	// Pagination is handled via cursor - pass empty string for the first page.
	// Returns:
	//   - containers: list of available containers
	//   - nextCursor: cursor for the next page, empty if no more pages
	//   - err: any error that occurred
	ListContainers(ctx context.Context, cursor string) ([]*Container, string, error)
}

ContainerLister lists available containers for a connector. Not all connectors support container selection - some index all accessible content.

The ContainerLister is typically retrieved from the ConnectorFactory for a specific provider type and installation.

type ContainerListerFactory

type ContainerListerFactory interface {
	// Create creates a ContainerLister for the given provider and installation.
	// Returns nil if the provider doesn't support container selection.
	Create(ctx context.Context, providerType domain.ProviderType, installationID string) (ContainerLister, error)

	// SupportsContainerSelection returns true if the provider supports container selection.
	// If false, the connector indexes all accessible content.
	SupportsContainerSelection(providerType domain.ProviderType) bool
}

ContainerListerFactory creates ContainerListers for different providers.

type ContentExtractor

type ContentExtractor interface {
	// Extract extracts text content from raw data
	Extract(ctx context.Context, data []byte, mimeType string) (string, error)

	// SupportedTypes returns supported MIME types
	SupportedTypes() []string
}

ContentExtractor extracts text content from various file formats

type CredentialsStore

type CredentialsStore interface {
	// Save stores credentials (encrypts sensitive fields)
	Save(ctx context.Context, creds *domain.Credentials) error

	// Get retrieves credentials by ID
	Get(ctx context.Context, id string) (*domain.Credentials, error)

	// List retrieves all credentials
	List(ctx context.Context) ([]*domain.Credentials, error)

	// Delete deletes credentials
	Delete(ctx context.Context, id string) error

	// GetByProvider retrieves credentials for a provider type
	GetByProvider(ctx context.Context, providerType domain.ProviderType) ([]*domain.Credentials, error)
}

CredentialsStore handles credential persistence (PostgreSQL, encrypted)

type DistributedLock

type DistributedLock interface {
	// Acquire attempts to acquire a named lock with the given TTL.
	// Returns true if the lock was successfully acquired, false if already held by another instance.
	// The lock will automatically expire after TTL (implementation dependent).
	Acquire(ctx context.Context, name string, ttl time.Duration) (acquired bool, err error)

	// Release releases a named lock.
	// This is best-effort; implementations with TTL will auto-expire anyway.
	// Safe to call even if the lock is not held or has expired.
	Release(ctx context.Context, name string) error

	// Extend extends the TTL of a currently held lock.
	// Returns error if the lock is not held by this instance.
	// Note: Not all implementations support TTL extension (e.g., PostgreSQL advisory locks).
	Extend(ctx context.Context, name string, ttl time.Duration) error

	// Ping checks if the lock backend is healthy.
	Ping(ctx context.Context) error
}

DistributedLock provides distributed locking for coordinating work across instances. This is used to prevent duplicate execution in multi-instance deployments.

type DocumentResult

type DocumentResult struct {
	DocumentID string
	SourceID   string
	Title      string
	Content    string // Full document content (for snippet extraction)
	Score      float64
}

DocumentResult represents a document-level search result from BM25.

type DocumentStore

type DocumentStore interface {
	// Save creates or updates a document
	Save(ctx context.Context, doc *domain.Document) error

	// SaveBatch saves multiple documents in a transaction
	SaveBatch(ctx context.Context, docs []*domain.Document) error

	// Get retrieves a document by ID
	Get(ctx context.Context, id string) (*domain.Document, error)

	// GetByExternalID retrieves a document by source and external ID
	GetByExternalID(ctx context.Context, sourceID, externalID string) (*domain.Document, error)

	// GetBySource retrieves all documents for a source with pagination
	GetBySource(ctx context.Context, sourceID string, limit, offset int) ([]*domain.Document, error)

	// Delete deletes a document
	Delete(ctx context.Context, id string) error

	// DeleteBySource deletes all documents for a source
	DeleteBySource(ctx context.Context, sourceID string) error

	// DeleteBatch deletes multiple documents by ID
	DeleteBatch(ctx context.Context, ids []string) error

	// Count returns total document count
	Count(ctx context.Context) (int, error)

	// CountBySource returns document count for a source
	CountBySource(ctx context.Context, sourceID string) (int, error)

	// ListExternalIDs returns all external IDs for a source (for diff sync)
	ListExternalIDs(ctx context.Context, sourceID string) ([]string, error)
}

DocumentStore handles document persistence (PostgreSQL)

type EmbeddingService

type EmbeddingService interface {
	// Embed generates embeddings for multiple texts
	Embed(ctx context.Context, texts []string) ([][]float32, error)

	// EmbedQuery generates an embedding for a search query
	// May use different model/parameters optimized for queries
	EmbedQuery(ctx context.Context, query string) ([]float32, error)

	// Dimensions returns the embedding dimension size
	Dimensions() int

	// Model returns the model name being used
	Model() string

	// HealthCheck verifies the embedding service is available
	HealthCheck(ctx context.Context) error

	// Close releases resources held by the embedding service
	Close() error
}

EmbeddingService generates text embeddings

type LLMService

type LLMService interface {
	// ExpandQuery takes a search query and returns expanded/related terms
	// Useful for improving search recall
	ExpandQuery(ctx context.Context, query string) ([]string, error)

	// Summarise generates a summary of the given content
	// maxLen is a hint for maximum length (model may not respect exactly)
	Summarise(ctx context.Context, content string, maxLen int) (string, error)

	// RewriteQuery rewrites the query for better search results
	// Returns the rewritten query
	RewriteQuery(ctx context.Context, query string) (string, error)

	// Model returns the model name being used
	Model() string

	// Ping verifies the LLM service is available
	Ping(ctx context.Context) error

	// Close releases resources held by the LLM service
	Close() error
}

LLMService provides large language model capabilities for search enhancement

type Normaliser

type Normaliser interface {
	// Normalise transforms raw content into normalized text.
	// The mimeType helps determine the appropriate processing.
	Normalise(content string, mimeType string) string

	// SupportedTypes returns MIME types this normaliser handles.
	// Can include wildcards like "text/*" or specific types like "text/markdown".
	SupportedTypes() []string

	// Priority returns the normaliser priority (higher = more specific).
	// Priority ranges:
	//   90-100: Connector-specific (e.g., Gmail email normaliser)
	//   50-89:  Format-specific (PDF, Markdown, HTML)
	//   10-49:  Generic (basic text processing)
	//   1-9:    Fallback (raw text extraction)
	Priority() int
}

Normaliser normalizes raw document content for indexing. It transforms provider-specific document formats into normalized text.

type NormaliserRegistry

type NormaliserRegistry interface {
	// Get retrieves the best-matching normaliser for a MIME type.
	// Returns nil if no normaliser is registered for the type.
	// When multiple match, the highest priority normaliser is returned.
	Get(mimeType string) Normaliser

	// GetAll retrieves all normalisers that match a MIME type, sorted by priority (highest first).
	GetAll(mimeType string) []Normaliser

	// Register registers a normaliser.
	Register(normaliser Normaliser)

	// List returns all registered MIME types.
	List() []string
}

NormaliserRegistry manages content normalisers. When multiple normalisers match a MIME type, the highest priority one is used.

type OAuthConfig

type OAuthConfig struct {
	// AuthURL is the authorization endpoint
	AuthURL string

	// TokenURL is the token exchange endpoint
	TokenURL string

	// Scopes are the required OAuth scopes
	Scopes []string

	// UserInfoURL is the endpoint to fetch user info (optional)
	UserInfoURL string
}

OAuthConfig contains OAuth settings for a provider.

type OAuthCredentials

type OAuthCredentials struct {
	ClientID     string
	ClientSecret string
}

OAuthCredentials holds OAuth client credentials from environment variables

type OAuthHandler

type OAuthHandler interface {
	// DefaultConfig returns the default OAuth configuration (auth URL, token URL, scopes)
	DefaultConfig() OAuthConfig

	// BuildAuthURL constructs the OAuth authorization URL with PKCE
	BuildAuthURL(clientID, redirectURI, state, codeChallenge string, scopes []string) string

	// ExchangeCode exchanges an authorization code for access tokens
	ExchangeCode(ctx context.Context, clientID, clientSecret, code, redirectURI, codeVerifier string) (*OAuthToken, error)

	// GetUserInfo retrieves user information using an access token
	GetUserInfo(ctx context.Context, accessToken string) (*OAuthUserInfo, error)

	// RefreshToken refreshes an expired access token
	RefreshToken(ctx context.Context, refreshToken string) (*OAuthToken, error)
}

OAuthHandler handles OAuth flow for a provider.

type OAuthHandlerFactory

type OAuthHandlerFactory interface {
	// GetOAuthHandler returns an OAuth handler for the given provider.
	// Returns nil if the provider doesn't support OAuth or isn't implemented.
	GetOAuthHandler(providerType domain.ProviderType) OAuthHandler
}

OAuthHandlerFactory creates OAuth handlers for specific providers. This abstracts the connectors.Factory dependency.

type OAuthState

type OAuthState struct {
	// State is a cryptographically random string used for CSRF protection.
	State string

	// ProviderType is the OAuth provider (github, google_drive, etc.)
	ProviderType string

	// CodeVerifier is the PKCE code verifier (plain text, not hashed).
	// This is used to generate code_challenge for the auth request
	// and sent as code_verifier during token exchange.
	CodeVerifier string

	// RedirectURI is the callback URL where the provider will redirect.
	RedirectURI string

	// ReturnContext indicates where to redirect after OAuth completes.
	// Values: "setup", "admin-sources", or empty for default behavior.
	ReturnContext string

	// CreatedAt is when the state was created.
	CreatedAt time.Time

	// ExpiresAt is when the state expires (typically 10 minutes).
	ExpiresAt time.Time
}

OAuthState represents a pending OAuth authorization flow state. Used for CSRF protection and PKCE code verifier storage.

type OAuthStateStore

type OAuthStateStore interface {
	// Save stores a new OAuth state.
	// The state typically expires in 10 minutes.
	Save(ctx context.Context, state *OAuthState) error

	// GetAndDelete atomically retrieves and deletes the state.
	// This ensures single-use semantics.
	// Returns nil, nil if the state doesn't exist or has expired.
	GetAndDelete(ctx context.Context, state string) (*OAuthState, error)

	// Cleanup removes expired states.
	// Should be called periodically (e.g., every hour) to clean up orphaned states.
	Cleanup(ctx context.Context) error
}

OAuthStateStore manages OAuth flow state for CSRF protection. States are single-use and expire after a short period.

type OAuthToken

type OAuthToken struct {
	AccessToken  string
	RefreshToken string
	ExpiresIn    int    // Seconds until expiry
	TokenType    string // Usually "Bearer"
	Scope        string // Space-separated scopes
}

OAuthToken represents OAuth tokens from a provider.

type OAuthTokenProvider

type OAuthTokenProvider struct {
	// contains filtered or unexported fields
}

OAuthTokenProvider implements TokenProvider for OAuth credentials. It automatically refreshes tokens when they expire.

func NewOAuthTokenProvider

func NewOAuthTokenProvider(
	creds *domain.Credentials,
	refresher TokenRefresher,
	store CredentialsStore,
) *OAuthTokenProvider

NewOAuthTokenProvider creates a token provider for OAuth credentials.

func (*OAuthTokenProvider) AuthMethod

func (p *OAuthTokenProvider) AuthMethod() domain.AuthMethod

AuthMethod returns OAuth2.

func (*OAuthTokenProvider) GetAccessToken

func (p *OAuthTokenProvider) GetAccessToken(ctx context.Context) (string, error)

GetAccessToken returns a valid access token, refreshing if needed.

func (*OAuthTokenProvider) GetCredentials

func (p *OAuthTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)

GetCredentials returns the full credentials.

func (*OAuthTokenProvider) IsValid

func (p *OAuthTokenProvider) IsValid(ctx context.Context) bool

IsValid checks if credentials are valid (not expired or can be refreshed).

type OAuthUserInfo

type OAuthUserInfo struct {
	ID       string // Provider-specific user ID
	Email    string
	Name     string
	ImageURL string
}

OAuthUserInfo represents user info from OAuth provider.

type OperationalLimits

type OperationalLimits struct {
	SyncMinInterval   int // Minutes floor (default: 5)
	SyncMaxInterval   int // Minutes ceiling (default: 1440)
	MaxWorkers        int // Worker ceiling (default: 10)
	MaxResultsPerPage int // Results ceiling (default: 100)
}

OperationalLimits defines guardrails from environment configuration

type QueueStats

type QueueStats struct {
	// PendingCount is the number of tasks waiting to be processed
	PendingCount int64 `json:"pending_count"`

	// ProcessingCount is the number of tasks currently being processed
	ProcessingCount int64 `json:"processing_count"`

	// CompletedCount is the number of successfully completed tasks
	CompletedCount int64 `json:"completed_count"`

	// FailedCount is the number of tasks that failed after all retries
	FailedCount int64 `json:"failed_count"`

	// OldestPendingAge is the age of the oldest pending task in seconds
	OldestPendingAge int64 `json:"oldest_pending_age"`
}

QueueStats contains queue statistics

type SchedulerStore

type SchedulerStore interface {
	// GetScheduledTask retrieves a scheduled task by ID
	GetScheduledTask(ctx context.Context, id string) (*domain.ScheduledTask, error)

	// ListScheduledTasks retrieves all scheduled tasks for a team
	ListScheduledTasks(ctx context.Context, teamID string) ([]*domain.ScheduledTask, error)

	// SaveScheduledTask creates or updates a scheduled task
	SaveScheduledTask(ctx context.Context, task *domain.ScheduledTask) error

	// DeleteScheduledTask removes a scheduled task
	DeleteScheduledTask(ctx context.Context, id string) error

	// GetDueScheduledTasks retrieves scheduled tasks that are due to run
	GetDueScheduledTasks(ctx context.Context) ([]*domain.ScheduledTask, error)

	// UpdateLastRun updates the last run time and next run time
	UpdateLastRun(ctx context.Context, id string, lastError string) error
}

SchedulerStore handles persistence for scheduled tasks. This is separate from TaskQueue because scheduled tasks are configuration, not transient queue items.

type SearchEngine

type SearchEngine interface {
	// Index indexes chunks for a document (legacy, kept for backward compat)
	Index(ctx context.Context, chunks []*domain.Chunk) error

	// IndexDocument indexes a full document for BM25 text search.
	// Uses document_id as the OpenSearch document ID (upsert semantics).
	IndexDocument(ctx context.Context, doc *domain.DocumentContent) error

	// Search performs a search query (legacy chunk-level search)
	Search(ctx context.Context, query string, queryEmbedding []float32, opts domain.SearchOptions) ([]*domain.RankedChunk, int, error)

	// SearchDocuments performs a BM25 text search returning document-level results.
	SearchDocuments(ctx context.Context, query string, opts domain.SearchOptions) ([]DocumentResult, int, error)

	// Delete deletes chunks by IDs
	Delete(ctx context.Context, chunkIDs []string) error

	// DeleteByDocument deletes all indexed data for a document
	DeleteByDocument(ctx context.Context, documentID string) error

	// DeleteByDocuments deletes all indexed data for multiple documents in a single operation
	DeleteByDocuments(ctx context.Context, documentIDs []string) error

	// DeleteBySource deletes all indexed data for a source
	DeleteBySource(ctx context.Context, sourceID string) error

	// HealthCheck verifies the search engine is available
	HealthCheck(ctx context.Context) error

	// Count returns the total number of indexed documents
	Count(ctx context.Context) (int64, error)
}

SearchEngine handles search indexing and querying

type SearchQueryRepository

type SearchQueryRepository interface {
	// Save logs a search query for analytics tracking
	Save(ctx context.Context, query *domain.SearchQuery) error

	// GetSearchHistory retrieves recent search queries
	// Returns up to limit most recent searches, ordered by created_at desc
	GetSearchHistory(ctx context.Context, teamID string, limit int) ([]*domain.SearchQuery, error)

	// GetSearchAnalytics computes aggregated search analytics for a time period
	GetSearchAnalytics(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.SearchAnalytics, error)

	// GetSearchMetrics computes performance metrics for a time period
	GetSearchMetrics(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.SearchMetrics, error)
}

SearchQueryRepository handles search query logging and analytics

type SessionStore

type SessionStore interface {
	// Save stores a session with TTL based on ExpiresAt
	Save(ctx context.Context, session *domain.Session) error

	// Get retrieves a session by ID
	Get(ctx context.Context, id string) (*domain.Session, error)

	// GetByToken retrieves a session by token value
	GetByToken(ctx context.Context, token string) (*domain.Session, error)

	// GetByRefreshToken retrieves a session by refresh token value
	GetByRefreshToken(ctx context.Context, refreshToken string) (*domain.Session, error)

	// Delete deletes a session
	Delete(ctx context.Context, id string) error

	// DeleteByToken deletes a session by token
	DeleteByToken(ctx context.Context, token string) error

	// DeleteByUser deletes all sessions for a user (logout everywhere)
	DeleteByUser(ctx context.Context, userID string) error

	// ListByUser lists all active sessions for a user
	ListByUser(ctx context.Context, userID string) ([]*domain.Session, error)
}

SessionStore handles session persistence (Redis)

type SettingsStore

type SettingsStore interface {
	// GetSettings retrieves settings for a team
	GetSettings(ctx context.Context, teamID string) (*domain.Settings, error)

	// SaveSettings persists team settings
	SaveSettings(ctx context.Context, settings *domain.Settings) error

	// GetAISettings retrieves AI-specific settings for a team
	GetAISettings(ctx context.Context, teamID string) (*domain.AISettings, error)

	// SaveAISettings persists AI-specific settings
	SaveAISettings(ctx context.Context, teamID string, settings *domain.AISettings) error
}

SettingsStore persists team and AI settings

type SourceStore

type SourceStore interface {
	// Save creates or updates a source
	Save(ctx context.Context, source *domain.Source) error

	// Get retrieves a source by ID
	Get(ctx context.Context, id string) (*domain.Source, error)

	// GetByName retrieves a source by name
	GetByName(ctx context.Context, name string) (*domain.Source, error)

	// List retrieves all sources
	List(ctx context.Context) ([]*domain.Source, error)

	// ListEnabled retrieves all enabled sources
	ListEnabled(ctx context.Context) ([]*domain.Source, error)

	// Delete deletes a source
	Delete(ctx context.Context, id string) error

	// SetEnabled updates the enabled status
	SetEnabled(ctx context.Context, id string, enabled bool) error

	// CountByConnection returns the number of sources using a connection
	CountByConnection(ctx context.Context, connectionID string) (int, error)

	// ListByConnection returns sources using a connection
	ListByConnection(ctx context.Context, connectionID string) ([]*domain.Source, error)

	// UpdateContainers updates the containers for a source
	UpdateContainers(ctx context.Context, id string, containers []domain.Container) error
}

SourceStore handles source persistence (PostgreSQL)

type StaticTokenProvider

type StaticTokenProvider struct {
	// contains filtered or unexported fields
}

StaticTokenProvider implements TokenProvider for non-OAuth credentials. Used for API keys, PATs, and basic auth.

func NewStaticTokenProvider

func NewStaticTokenProvider(creds *domain.Credentials) *StaticTokenProvider

NewStaticTokenProvider creates a token provider for static credentials.

func (*StaticTokenProvider) AuthMethod

func (p *StaticTokenProvider) AuthMethod() domain.AuthMethod

AuthMethod returns the authentication method.

func (*StaticTokenProvider) GetAccessToken

func (p *StaticTokenProvider) GetAccessToken(ctx context.Context) (string, error)

GetAccessToken returns the API key or PAT.

func (*StaticTokenProvider) GetCredentials

func (p *StaticTokenProvider) GetCredentials(ctx context.Context) (*domain.Credentials, error)

GetCredentials returns the full credentials.

func (*StaticTokenProvider) IsValid

func (p *StaticTokenProvider) IsValid(ctx context.Context) bool

IsValid returns true - static credentials don't expire.

type SyncStateStore

type SyncStateStore interface {
	// Save creates or updates sync state
	Save(ctx context.Context, state *domain.SyncState) error

	// Get retrieves sync state for a source
	Get(ctx context.Context, sourceID string) (*domain.SyncState, error)

	// List retrieves sync states for all sources
	List(ctx context.Context) ([]*domain.SyncState, error)

	// Delete deletes sync state for a source
	Delete(ctx context.Context, sourceID string) error

	// UpdateStatus updates only the status field
	UpdateStatus(ctx context.Context, sourceID string, status domain.SyncStatus) error

	// UpdateCursor updates the sync cursor
	UpdateCursor(ctx context.Context, sourceID string, cursor string) error
}

SyncStateStore handles sync state persistence (PostgreSQL)

type TaskFilter

type TaskFilter struct {
	// TeamID filters by team (required)
	TeamID string

	// Status filters by task status (optional, empty means all)
	Status domain.TaskStatus

	// Type filters by task type (optional, empty means all)
	Type domain.TaskType

	// Limit is the maximum number of tasks to return
	Limit int

	// Offset is the number of tasks to skip (for pagination)
	Offset int
}

TaskFilter specifies criteria for listing tasks

type TaskQueue

type TaskQueue interface {
	// Enqueue adds a task to the queue for processing.
	// The task will be picked up by a worker based on priority and scheduled time.
	Enqueue(ctx context.Context, task *domain.Task) error

	// EnqueueBatch adds multiple tasks to the queue atomically.
	// If any task fails to enqueue, all tasks are rolled back.
	EnqueueBatch(ctx context.Context, tasks []*domain.Task) error

	// Dequeue retrieves the next available task for processing.
	// This should block until a task is available or context is cancelled.
	// The task is marked as processing and will not be returned to other workers.
	// Returns nil, nil if no tasks are available (for non-blocking implementations).
	Dequeue(ctx context.Context) (*domain.Task, error)

	// DequeueWithTimeout retrieves the next available task, waiting up to timeout.
	// Returns nil, nil if timeout is reached with no tasks available.
	DequeueWithTimeout(ctx context.Context, timeout int) (*domain.Task, error)

	// Ack acknowledges successful completion of a task.
	// The task is removed from the queue.
	Ack(ctx context.Context, taskID string) error

	// Nack indicates task processing failed and should be retried.
	// The task is returned to the queue with updated retry count.
	// If max retries exceeded, task is moved to failed state.
	Nack(ctx context.Context, taskID string, reason string) error

	// GetTask retrieves a task by ID (for status checking).
	GetTask(ctx context.Context, taskID string) (*domain.Task, error)

	// ListTasks retrieves tasks matching the filter criteria.
	ListTasks(ctx context.Context, filter TaskFilter) ([]*domain.Task, error)

	// CancelTask marks a pending task as cancelled.
	// Returns error if task is already processing or completed.
	CancelTask(ctx context.Context, taskID string) error

	// PurgeTasks removes completed/failed tasks older than the specified age.
	// This is used for cleanup.
	PurgeTasks(ctx context.Context, olderThan int) (int, error)

	// Stats returns queue statistics.
	Stats(ctx context.Context) (*QueueStats, error)

	// Ping checks if the queue backend is healthy.
	Ping(ctx context.Context) error

	// GetJobStats computes aggregated job statistics for a time period
	// This is used by the admin dashboard to show job execution metrics
	GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)

	// CountTasks returns the total number of tasks matching the filter
	// This is used for pagination to determine if there are more results
	CountTasks(ctx context.Context, filter TaskFilter) (int64, error)

	// Close cleans up resources.
	Close() error
}

TaskQueue handles background task queuing and processing. Implementations can use Redis (preferred) or Postgres (fallback).

type TokenProvider

type TokenProvider interface {
	// GetAccessToken returns a valid access token.
	// For OAuth, this automatically refreshes expired tokens.
	// For API keys, this returns the stored key.
	GetAccessToken(ctx context.Context) (string, error)

	// GetCredentials returns the full credentials.
	// Use GetAccessToken for most operations - this is for special cases.
	GetCredentials(ctx context.Context) (*domain.Credentials, error)

	// AuthMethod returns the authentication method.
	AuthMethod() domain.AuthMethod

	// IsValid checks if the credentials are still valid.
	IsValid(ctx context.Context) bool
}

TokenProvider provides access tokens for API authentication. It handles token storage, retrieval, and automatic refresh for OAuth.

type TokenProviderFactory

type TokenProviderFactory interface {
	// Create creates a TokenProvider for a connection.
	// It looks up the connection by ID, decrypts credentials, and creates
	// an appropriate TokenProvider based on the auth method.
	Create(ctx context.Context, connectionID string) (TokenProvider, error)

	// CreateFromConnection creates a TokenProvider from a connection directly.
	// Use this when you already have the connection loaded.
	CreateFromConnection(ctx context.Context, conn *domain.Connection) (TokenProvider, error)
}

TokenProviderFactory creates TokenProviders from connection IDs. It resolves connection credentials and wraps them in appropriate TokenProviders.

type TokenRefresher

type TokenRefresher interface {
	// Refresh refreshes the OAuth tokens.
	// Returns the new tokens and updates the credentials in the store.
	Refresh(ctx context.Context, creds *domain.Credentials) (*domain.Credentials, error)
}

TokenRefresher handles OAuth token refresh operations. This is used internally by OAuth TokenProviders.

type UserStore

type UserStore interface {
	// Save creates or updates a user
	Save(ctx context.Context, user *domain.User) error

	// Get retrieves a user by ID
	Get(ctx context.Context, id string) (*domain.User, error)

	// GetByEmail retrieves a user by email
	GetByEmail(ctx context.Context, email string) (*domain.User, error)

	// List retrieves all users for a team
	List(ctx context.Context, teamID string) ([]*domain.User, error)

	// Delete deletes a user
	Delete(ctx context.Context, id string) error

	// UpdateLastLogin updates the last login timestamp
	UpdateLastLogin(ctx context.Context, id string) error
}

UserStore handles user persistence (PostgreSQL)

type VectorIndex

type VectorIndex interface {
	// Index adds a single embedding to the index
	Index(ctx context.Context, id string, documentID string, embedding []float32) error

	// IndexBatch adds multiple embeddings with their chunk content
	IndexBatch(ctx context.Context, ids []string, documentIDs []string, contents []string, embeddings [][]float32) error

	// Search finds similar vectors, returns chunk IDs and distances
	Search(ctx context.Context, embedding []float32, k int) ([]string, []float64, error)

	// SearchWithContent finds similar vectors and returns chunk content alongside IDs/distances.
	SearchWithContent(ctx context.Context, embedding []float32, k int) ([]VectorSearchResult, error)

	// Delete removes a single embedding by chunk ID
	Delete(ctx context.Context, id string) error

	// DeleteBatch removes multiple embeddings by chunk IDs
	DeleteBatch(ctx context.Context, ids []string) error

	// DeleteByDocument removes all embeddings for a document
	DeleteByDocument(ctx context.Context, documentID string) error

	// DeleteByDocuments removes all embeddings for multiple documents in a single operation
	DeleteByDocuments(ctx context.Context, documentIDs []string) error

	// HealthCheck verifies the vector store is available
	HealthCheck(ctx context.Context) error
}

VectorIndex handles vector similarity search using a standalone embeddings table. This interface allows for dedicated vector store implementations (e.g., pgvector).

type VectorSearchResult

type VectorSearchResult struct {
	ChunkID    string
	DocumentID string
	Content    string
	Distance   float64
}

VectorSearchResult represents a chunk-level vector search result with content.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL