package datasource

Version: v0.1.8-rc.24
Warning

This package is not in the latest version of its module.

Published: Apr 1, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package datasource defines the unified data source abstraction for Genie: connectors that enumerate items from external systems (Drive, Gmail, Slack, Linear, GitHub, Calendar) and produce normalized items for vectorization. The sync pipeline consumes these items and upserts them into the vector store so memory_search can query across all sources.

Constants

const MaxSearchKeywords = 10

MaxSearchKeywords is the maximum number of search keywords allowed, both in setup and in config.

Variables

This section is empty.

Functions

func BuildConnectors

func BuildConnectors(ctx context.Context, cfg *Config, opts ConnectorOptions, extra ...DataSource) map[string]DataSource

BuildConnectors returns the DataSource connectors for all enabled sources in cfg as a map keyed by source name. For each enabled source name:

  • If a ConnectorFactory is registered (via RegisterConnectorFactory), it is called with ctx and opts to build the connector.
  • If no factory is registered, the source name is skipped. MCP-backed sources such as jira, confluence, and servicenow are expected to be built by the caller, supplied via the extra parameter, and passed to a separate sync pipeline.

extra is an optional list of additional DataSource connectors to include (e.g. MCP-backed sources built by the caller from mcpresource.NewFromServerName). Connectors in extra whose Name() is not in cfg.EnabledSourceNames() are filtered out.

func ItemMatchesKeywords

func ItemMatchesKeywords(item *NormalizedItem, keywords []string) bool

ItemMatchesKeywords reports whether the item's Content or any of its Metadata values contains at least one of the keywords as a case-insensitive substring. When keywords is nil or empty, it returns true (no filtering).
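
The matching rule can be sketched as below. The lowercase itemMatchesKeywords and the reduced NormalizedItem are stand-ins, not the package's code; ignoring empty keywords and treating a nil item as a non-match are assumptions the documentation does not state.

```go
package main

import (
	"fmt"
	"strings"
)

// NormalizedItem is reduced to the fields the filter inspects.
type NormalizedItem struct {
	Content  string
	Metadata map[string]string
}

// itemMatchesKeywords: case-insensitive substring match against Content and
// every Metadata value; an empty keyword list disables filtering.
func itemMatchesKeywords(item *NormalizedItem, keywords []string) bool {
	if len(keywords) == 0 {
		return true
	}
	if item == nil {
		return false // assumption: a nil item never matches a non-empty filter
	}
	haystacks := []string{strings.ToLower(item.Content)}
	for _, v := range item.Metadata {
		haystacks = append(haystacks, strings.ToLower(v))
	}
	for _, kw := range keywords {
		kw = strings.ToLower(kw)
		if kw == "" {
			continue // assumption: "" would match everything, so skip it
		}
		for _, h := range haystacks {
			if strings.Contains(h, kw) {
				return true
			}
		}
	}
	return false
}

func main() {
	item := &NormalizedItem{Content: "Q3 Roadmap review", Metadata: map[string]string{"author": "Dana"}}
	fmt.Println(itemMatchesKeywords(item, []string{"roadmap"})) // true
	fmt.Println(itemMatchesKeywords(item, []string{"DANA"}))    // true
	fmt.Println(itemMatchesKeywords(item, []string{"budget"}))  // false
	fmt.Println(itemMatchesKeywords(item, nil))                 // true
}
```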

func RegisterConnectorFactory

func RegisterConnectorFactory(sourceName string, factory ConnectorFactory)

RegisterConnectorFactory registers a ConnectorFactory for the given source name. Call from an init() function in datasource-specific packages (e.g. gmail, gdrive) so the factory is picked up automatically when the package is imported. Subsequent calls override the previous factory for that source name.

func SaveSyncState

func SaveSyncState(dir string, state SyncState) error

SaveSyncState writes state as JSON to the file syncStateFilename inside dir, creating dir if it does not exist.

Types

type CalendarSourceConfig

type CalendarSourceConfig struct {
	Enabled     bool     `yaml:"enabled,omitempty" toml:"enabled,omitempty"`
	CalendarIDs []string `yaml:"calendar_ids,omitempty" toml:"calendar_ids,omitempty"`
}

CalendarSourceConfig enables and scopes the Calendar data source.

func (*CalendarSourceConfig) IsEnabled

func (c *CalendarSourceConfig) IsEnabled() bool

func (*CalendarSourceConfig) ScopeValues

func (c *CalendarSourceConfig) ScopeValues() []string

type Config

type Config struct {
	// Enabled turns the data sources layer on or off. When false, no sync
	// runs and no connector is instantiated for vectorization.
	Enabled bool `yaml:"enabled,omitempty" toml:"enabled,omitempty"`

	// SyncInterval is how often the sync job runs (e.g. "15m", "1h"). When
	// empty or zero, sync is on-demand only (e.g. via a tool or admin API).
	SyncInterval time.Duration `yaml:"sync_interval,omitempty" toml:"sync_interval,omitempty"`

	// SearchKeywords limits which items are indexed: only items whose content or
	// metadata contains at least one of these keywords (case-insensitive) are
	// embedded. Up to MaxSearchKeywords (10). When empty, all items are indexed.
	SearchKeywords []string `yaml:"search_keywords,omitempty" toml:"search_keywords,omitempty"`

	// GDrive scopes vectorization to the given folder IDs. Credentials come
	// from the main [google_drive] section (or env). Omit or set Enabled false to skip.
	GDrive *GDriveSourceConfig `yaml:"gdrive,omitempty" toml:"gdrive,omitempty"`

	// Gmail scopes vectorization to the given labels. Credentials come from
	// the email/Gmail config. Omit or set Enabled false to skip.
	Gmail *GmailSourceConfig `yaml:"gmail,omitempty" toml:"gmail,omitempty"`

	// Slack scopes vectorization to the given channel IDs. Omit or set
	// Enabled false to skip.
	Slack *SlackSourceConfig `yaml:"slack,omitempty" toml:"slack,omitempty"`

	// Linear scopes vectorization to the given team IDs. Omit or set
	// Enabled false to skip.
	Linear *LinearSourceConfig `yaml:"linear,omitempty" toml:"linear,omitempty"`

	// Calendar scopes vectorization to the given calendar IDs. Omit or set
	// Enabled false to skip.
	Calendar *CalendarSourceConfig `yaml:"calendar,omitempty" toml:"calendar,omitempty"`

	// Jira scopes vectorization to the given project keys. Omit or set
	// Enabled false to skip. Requires a Jira MCP server in [mcp] config.
	Jira *JiraSourceConfig `yaml:"jira,omitempty" toml:"jira,omitempty"`

	// Confluence scopes vectorization to the given space keys. Omit or set
	// Enabled false to skip. Requires a Confluence MCP server in [mcp] config.
	Confluence *ConfluenceSourceConfig `yaml:"confluence,omitempty" toml:"confluence,omitempty"`

	// ServiceNow scopes vectorization to the given table names. Omit or set
	// Enabled false to skip. Requires a ServiceNow MCP server in [mcp] config.
	ServiceNow *ServiceNowSourceConfig `yaml:"servicenow,omitempty" toml:"servicenow,omitempty"`

	// ExternalSources holds additional SourceConfig entries registered at
	// runtime (e.g. by SCM config). These are keyed by source name and
	// participate in ScopeFromConfig/EnabledSourceNames automatically.
	ExternalSources map[string]SourceConfig `yaml:"-" toml:"-"`
}

Config holds the unified data sources configuration: a master switch, optional sync schedule, and per-source enable/scope. Credentials for each system (e.g. [google_drive], email, Slack) remain in their existing config sections; this block only controls which sources are vectorized and their scope (folders, channels, labels, etc.). When a source is disabled or its scope is empty, the sync pipeline skips it.

func (*Config) EnabledSourceNames

func (c *Config) EnabledSourceNames() []string

EnabledSourceNames returns the list of source names that are enabled and have non-empty scope in this config. The sync pipeline can iterate this to decide which connectors to run.

func (*Config) ScopeFromConfig

func (c *Config) ScopeFromConfig(sourceName string) Scope

ScopeFromConfig builds a Scope from the current Config for the given source name. It is used by the sync pipeline to pass the right scope to each connector's ListItems. Returns a zero Scope when the source is disabled or has no scope configured.
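
A sketch of how EnabledSourceNames and ScopeFromConfig can be written generically over SourceConfig, without a per-source switch. The config type, its sources map (standing in for the typed fields plus ExternalSources), and listConfig are hypothetical, and sorting the names is an assumption made so the output is deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

type SourceConfig interface {
	IsEnabled() bool
	ScopeValues() []string
}

type Scope struct{ Items map[string][]string }

func NewScope(source string, values []string) Scope {
	return Scope{Items: map[string][]string{source: values}}
}

// listConfig is a hypothetical stand-in for GDriveSourceConfig and friends.
type listConfig struct {
	Enabled bool
	IDs     []string
}

// Nil-safe methods: an omitted section (nil pointer) reads as disabled.
func (c *listConfig) IsEnabled() bool { return c != nil && c.Enabled && len(c.IDs) > 0 }

func (c *listConfig) ScopeValues() []string {
	if c == nil {
		return nil
	}
	return c.IDs
}

// config holds every source under its name (hypothetical layout).
type config struct {
	sources map[string]SourceConfig
}

func (c *config) EnabledSourceNames() []string {
	var names []string
	for name, sc := range c.sources {
		if sc != nil && sc.IsEnabled() && len(sc.ScopeValues()) > 0 {
			names = append(names, name)
		}
	}
	sort.Strings(names) // assumption: deterministic ordering
	return names
}

func (c *config) ScopeFromConfig(name string) Scope {
	sc, ok := c.sources[name]
	if !ok || sc == nil || !sc.IsEnabled() || len(sc.ScopeValues()) == 0 {
		return Scope{} // zero Scope for disabled or unscoped sources
	}
	return NewScope(name, sc.ScopeValues())
}

func main() {
	cfg := &config{sources: map[string]SourceConfig{
		"gdrive": &listConfig{Enabled: true, IDs: []string{"folderA"}},
		"slack":  &listConfig{Enabled: true}, // enabled but empty scope
		"linear": &listConfig{Enabled: false, IDs: []string{"TEAM"}},
		"gmail":  (*listConfig)(nil), // section omitted from config
	}}
	fmt.Println(cfg.EnabledSourceNames())                  // [gdrive]
	fmt.Println(cfg.ScopeFromConfig("gdrive").Items)       // map[gdrive:[folderA]]
	fmt.Println(cfg.ScopeFromConfig("slack").Items == nil) // true
}
```

The nil-safe methods matter because a nil pointer stored in an interface value is not itself a nil interface, so the sc != nil check alone would not protect against it.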

func (*Config) SearchKeywordsTrimmed

func (c *Config) SearchKeywordsTrimmed() []string

SearchKeywordsTrimmed returns SearchKeywords limited to MaxSearchKeywords, with empty strings and duplicates (case-insensitive) removed.

type ConfluenceSourceConfig

type ConfluenceSourceConfig struct {
	Enabled   bool     `yaml:"enabled,omitempty" toml:"enabled,omitempty"`
	SpaceKeys []string `yaml:"space_keys,omitempty" toml:"space_keys,omitempty"`
	MCPServer string   `yaml:"mcp_server,omitempty" toml:"mcp_server,omitempty"`
}

ConfluenceSourceConfig enables and scopes the Confluence data source. Requires a corresponding MCP server named "confluence" (or as specified by MCPServer).

func (*ConfluenceSourceConfig) IsEnabled

func (c *ConfluenceSourceConfig) IsEnabled() bool

func (*ConfluenceSourceConfig) ScopeValues

func (c *ConfluenceSourceConfig) ScopeValues() []string

type ConnectorFactory

type ConnectorFactory func(ctx context.Context, opts ConnectorOptions) DataSource

ConnectorFactory is a function that builds the DataSource for the source name it is registered under, using the provided context and ConnectorOptions. It returns nil when it cannot build the connector (e.g. missing credentials); the registry skips nil connectors silently.

Packages register factories via RegisterConnectorFactory in their init() functions, so app.go only needs to blank-import the package to activate it.

type ConnectorOptions

type ConnectorOptions struct {
	// SecretProvider is the application's secret provider, passed as any to
	// avoid importing pkg/security from the datasource package. Each factory
	// type-asserts to security.SecretProvider before use.
	SecretProvider any
	// DocParser is the optional document parsing provider (e.g. Docling Serve)
	// for extracting text from binary files (PDF, DOCX, images). When nil,
	// connectors skip document parsing and produce header-only items for binary files.
	DocParser docparser.Provider
	// ExtraInt holds integer extras keyed by name (e.g. "max_depth").
	ExtraInt map[string]int
	// ExtraString holds string extras keyed by name (e.g. "credentials_file", "scm_provider").
	ExtraString map[string]string
}

ConnectorOptions holds the dependencies a factory may need to build its connector. All fields are optional; factories should check for nil / empty before use and return nil when required dependencies are missing.

type DataSource

type DataSource interface {
	// Name returns the source identifier (e.g. "gdrive", "gmail", "slack", "linear", "github", "calendar").
	// It is used as the "source" field in NormalizedItem and in metadata for filtering.
	Name() string

	// ListItems returns all items in scope for this source. Scope is
	// source-specific (e.g. folder IDs, channel IDs, label IDs) and is
	// passed in by the caller on each call, typically built from config via
	// Config.ScopeFromConfig. The caller (sync pipeline) then maps each
	// NormalizedItem to a vector.BatchItem and upserts it into the vector store.
	ListItems(ctx context.Context, scope Scope) ([]NormalizedItem, error)
}

DataSource is the contract for a connector that can list items from an external system in a normalized shape. Each connector (Slack, Drive, Gmail, etc.) implements this interface; the sync job calls ListItems (or ListItemsSince when supported) and turns the results into vector.BatchItem values for Upsert. Auth and credentials are per-source and handled by the connector; this interface covers only scope-based item enumeration.

type GDriveSourceConfig

type GDriveSourceConfig struct {
	Enabled   bool     `yaml:"enabled,omitempty" toml:"enabled,omitempty"`
	FolderIDs []string `yaml:"folder_ids,omitempty" toml:"folder_ids,omitempty"`
}

GDriveSourceConfig enables and scopes the Google Drive data source.

func (*GDriveSourceConfig) IsEnabled

func (c *GDriveSourceConfig) IsEnabled() bool

func (*GDriveSourceConfig) ScopeValues

func (c *GDriveSourceConfig) ScopeValues() []string

type GmailSourceConfig

type GmailSourceConfig struct {
	Enabled  bool     `yaml:"enabled,omitempty" toml:"enabled,omitempty"`
	LabelIDs []string `yaml:"label_ids,omitempty" toml:"label_ids,omitempty"`
}

GmailSourceConfig enables and scopes the Gmail data source.

func (*GmailSourceConfig) IsEnabled

func (c *GmailSourceConfig) IsEnabled() bool

func (*GmailSourceConfig) ScopeValues

func (c *GmailSourceConfig) ScopeValues() []string

type JiraSourceConfig

type JiraSourceConfig struct {
	Enabled     bool     `yaml:"enabled,omitempty" toml:"enabled,omitempty"`
	ProjectKeys []string `yaml:"project_keys,omitempty" toml:"project_keys,omitempty"`
	MCPServer   string   `yaml:"mcp_server,omitempty" toml:"mcp_server,omitempty"`
}

JiraSourceConfig enables and scopes the Jira data source. Requires a corresponding MCP server named "jira" (or as specified by MCPServer).

func (*JiraSourceConfig) IsEnabled

func (c *JiraSourceConfig) IsEnabled() bool

func (*JiraSourceConfig) ScopeValues

func (c *JiraSourceConfig) ScopeValues() []string

type LinearSourceConfig

type LinearSourceConfig struct {
	Enabled bool     `yaml:"enabled,omitempty" toml:"enabled,omitempty"`
	TeamIDs []string `yaml:"team_ids,omitempty" toml:"team_ids,omitempty"`
}

LinearSourceConfig enables and scopes the Linear data source.

func (*LinearSourceConfig) IsEnabled

func (c *LinearSourceConfig) IsEnabled() bool

func (*LinearSourceConfig) ScopeValues

func (c *LinearSourceConfig) ScopeValues() []string

type ListItemsSince

type ListItemsSince interface {
	DataSource
	// ListItemsSince returns items in scope that were updated after the given time.
	ListItemsSince(ctx context.Context, scope Scope, since time.Time) ([]NormalizedItem, error)
}

ListItemsSince optionally supports incremental sync. Connectors that implement it can return only items updated after the given time. The sync pipeline can use this to avoid re-processing unchanged items.

type NormalizedItem

type NormalizedItem struct {
	// ID is stable and unique across all sources (e.g. "gdrive:fileId", "slack:channelId:ts").
	ID string
	// Source is the same as DataSource.Name() (e.g. "gdrive", "slack").
	Source string
	// SourceRef identifies the origin for source material lookup (Type + RefID).
	// When set, sync stores source_type and source_ref_id in vector metadata so
	// search results can cite and verify the original (e.g. open Gmail message, Drive file).
	SourceRef *SourceRef
	// UpdatedAt is used for incremental sync and ordering.
	UpdatedAt time.Time
	// Content is the text to embed (title + body, snippet, or full text).
	Content string
	// Metadata holds optional keys (title, author, type, product, category, etc.)
	// that can be used for SearchWithFilter when AllowedMetadataKeys permits.
	Metadata map[string]string
}

NormalizedItem is the common shape produced by every data source connector. It has a stable ID (used for Upsert), source name, content to embed, and metadata for filtering. The sync pipeline maps this to vector.BatchItem and stores source_type + source_ref_id in metadata so retrieval can show origin.

func (*NormalizedItem) SourceRefID

func (item *NormalizedItem) SourceRefID() string

SourceRefID returns the ref ID for source material lookup. Uses item.SourceRef.RefID when set; otherwise derives from item.ID (e.g. "gmail:msg123" -> "msg123").
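
A sketch of the SourceRefID fallback and of the source_type / source_ref_id metadata described above. Treating everything after the first colon as the ref ID (so a Slack ID keeps its channel:ts suffix) and using item.Source when SourceRef is nil are assumptions; refMetadata is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

type SourceRef struct{ Type, RefID string }

type NormalizedItem struct {
	ID        string
	Source    string
	SourceRef *SourceRef
}

// sourceRefID prefers SourceRef.RefID, else strips the "source:" prefix.
func (item *NormalizedItem) sourceRefID() string {
	if item.SourceRef != nil && item.SourceRef.RefID != "" {
		return item.SourceRef.RefID
	}
	if _, rest, ok := strings.Cut(item.ID, ":"); ok { // split at first ':'
		return rest
	}
	return item.ID // assumption: an ID without a prefix is used as-is
}

// refMetadata builds the two keys the sync pipeline is documented to store.
func refMetadata(item *NormalizedItem) map[string]string {
	typ := item.Source // assumption: fall back to Source when SourceRef is unset
	if item.SourceRef != nil && item.SourceRef.Type != "" {
		typ = item.SourceRef.Type
	}
	return map[string]string{"source_type": typ, "source_ref_id": item.sourceRefID()}
}

func main() {
	a := &NormalizedItem{ID: "gmail:msg123", Source: "gmail"}
	b := &NormalizedItem{ID: "gdrive:f1", Source: "gdrive",
		SourceRef: &SourceRef{Type: "gdrive", RefID: "file-XYZ"}}
	fmt.Println(a.sourceRefID()) // msg123
	fmt.Println(refMetadata(b))  // map[source_ref_id:file-XYZ source_type:gdrive]
}
```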

type Scope

type Scope struct {
	// Items maps source name → scope values (e.g. "gdrive" → folder IDs,
	// "slack" → channel IDs, "jira" → project keys).
	Items map[string][]string
}

Scope is the source-specific scope passed to ListItems. It uses a generic map keyed by source name so adding new sources requires no struct changes. Use NewScope to create and Get to read.

func NewScope

func NewScope(source string, values []string) Scope

NewScope creates a Scope for a single source.

func (Scope) Get

func (s Scope) Get(source string) []string

Get returns the scope values for the given source name. Returns nil when the source has no scope configured.
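
NewScope and Get behave like thin wrappers over the map; the re-declared versions below are a sketch, not the package source. Get is safe on a zero Scope because indexing a nil map in Go returns the zero value (here a nil slice).

```go
package main

import "fmt"

type Scope struct {
	Items map[string][]string
}

// NewScope builds a Scope holding values for a single source.
func NewScope(source string, values []string) Scope {
	return Scope{Items: map[string][]string{source: values}}
}

// Get returns the values for source, or nil when none are configured.
func (s Scope) Get(source string) []string {
	return s.Items[source] // indexing a nil map is safe and yields nil
}

func main() {
	s := NewScope("gdrive", []string{"folderA", "folderB"})
	fmt.Println(s.Get("gdrive"))       // [folderA folderB]
	fmt.Println(s.Get("slack") == nil) // true
	var zero Scope
	fmt.Println(zero.Get("gdrive") == nil) // true
}
```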

type ServiceNowSourceConfig

type ServiceNowSourceConfig struct {
	Enabled    bool     `yaml:"enabled,omitempty" toml:"enabled,omitempty"`
	TableNames []string `yaml:"table_names,omitempty" toml:"table_names,omitempty"`
	MCPServer  string   `yaml:"mcp_server,omitempty" toml:"mcp_server,omitempty"`
}

ServiceNowSourceConfig enables and scopes the ServiceNow data source. Requires a corresponding MCP server named "servicenow" (or as specified by MCPServer).

func (*ServiceNowSourceConfig) IsEnabled

func (c *ServiceNowSourceConfig) IsEnabled() bool

func (*ServiceNowSourceConfig) ScopeValues

func (c *ServiceNowSourceConfig) ScopeValues() []string

type SlackSourceConfig

type SlackSourceConfig struct {
	Enabled    bool     `yaml:"enabled,omitempty" toml:"enabled,omitempty"`
	ChannelIDs []string `yaml:"channel_ids,omitempty" toml:"channel_ids,omitempty"`
}

SlackSourceConfig enables and scopes the Slack data source.

func (*SlackSourceConfig) IsEnabled

func (c *SlackSourceConfig) IsEnabled() bool

func (*SlackSourceConfig) ScopeValues

func (c *SlackSourceConfig) ScopeValues() []string

type SourceConfig

type SourceConfig interface {
	// IsEnabled returns true when this source is turned on and has meaningful scope.
	IsEnabled() bool
	// ScopeValues returns the scope items for this source (e.g. folder IDs,
	// channel IDs, project keys). An empty slice means no scope is configured.
	ScopeValues() []string
}

SourceConfig is the interface that all data source configuration types must implement. It enables generic scope building and source enumeration without per-source switch statements. Adding a new data source only requires a new config type that implements this interface.
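
To illustrate the extension point, here is a hypothetical NotionSourceConfig (Notion is not one of the package's sources) implementing the interface. Making both methods nil-safe lets an omitted section read as disabled; whether the built-in config types do the same is an assumption.

```go
package main

import "fmt"

type SourceConfig interface {
	IsEnabled() bool
	ScopeValues() []string
}

// NotionSourceConfig is a hypothetical new source config.
type NotionSourceConfig struct {
	Enabled     bool     `yaml:"enabled,omitempty" toml:"enabled,omitempty"`
	DatabaseIDs []string `yaml:"database_ids,omitempty" toml:"database_ids,omitempty"`
}

// IsEnabled requires the switch to be on and at least one scope value.
func (c *NotionSourceConfig) IsEnabled() bool {
	return c != nil && c.Enabled && len(c.DatabaseIDs) > 0
}

func (c *NotionSourceConfig) ScopeValues() []string {
	if c == nil {
		return nil
	}
	return c.DatabaseIDs
}

var _ SourceConfig = (*NotionSourceConfig)(nil) // compile-time interface check

func main() {
	// ExternalSources-style map keyed by source name.
	external := map[string]SourceConfig{
		"notion": &NotionSourceConfig{Enabled: true, DatabaseIDs: []string{"db1"}},
	}
	for name, sc := range external {
		fmt.Println(name, sc.IsEnabled(), sc.ScopeValues()) // notion true [db1]
	}
	var omitted *NotionSourceConfig
	fmt.Println(omitted.IsEnabled()) // false
}
```

Per the ExternalSources field comment, a value like this stored in Config.ExternalSources under its source name participates in ScopeFromConfig and EnabledSourceNames automatically.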

type SourceRef

type SourceRef struct {
	// Type is the source type (e.g. "gmail", "gdrive", "slack", "linear", "github", "calendar").
	Type string
	// RefID is the ID in that system to look up the original (e.g. message ID, file ID, channel:ts).
	RefID string
}

SourceRef identifies the origin of an item for lookup and verification. When returning search results, consumers can use Type + RefID to open or fetch the original (e.g. Gmail message, Drive file) and avoid hallucination.

type SyncState

type SyncState map[string]string

SyncState holds the last successful sync time per source name. It is persisted as JSON so incremental sync can resume after restarts. Only one Genie instance should use a given working directory; no file locking is performed, so multiple instances sharing the same dir can overwrite or corrupt the state file.

func LoadSyncState

func LoadSyncState(dir string) (SyncState, error)

LoadSyncState reads the sync state from the file syncStateFilename inside dir. If the file does not exist (first run), it returns a nil map and a nil error; callers can treat that as "no previous sync" and perform a full ListItems.

func (SyncState) LastSync

func (s SyncState) LastSync(source string) time.Time

LastSync returns the last sync time for the given source, or zero time if not present or unparseable. State values are RFC3339 strings.

func (SyncState) SetLastSync

func (s SyncState) SetLastSync(source string, t time.Time)

SetLastSync records the last sync time for the given source (RFC3339). Modifies the map in place; call SaveSyncState to persist.

Directories

  • Code generated by counterfeiter.
  • docparser: Package docparser provides a multi-backend document parser that converts files (PDF, DOCX, images, etc.) into []datasource.NormalizedItem for vectorization.
  • docparserfakes: Code generated by counterfeiter.
