sources

package
v0.1.147 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultCacheTTL is the default time-to-live for cached source data.
	DefaultCacheTTL = 30 * time.Second

	// DefaultCacheSize is the maximum number of cache entries.
	DefaultCacheSize = 10000
)
View Source
// Common file modes: each combines a syscall file-type bit with
// conventional Unix permission bits.
const (
	ModeDir  = syscall.S_IFDIR | 0o755 // directory, rwxr-xr-x
	ModeFile = syscall.S_IFREG | 0o644 // regular file, rw-r--r--
	ModeLink = syscall.S_IFLNK | 0o777 // symbolic link, rwxrwxrwx
)

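ModeDir, ModeFile, and ModeLink are common file modes, each combining a file-type bit (syscall.S_IFDIR, S_IFREG, or S_IFLNK) with permission bits.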

Variables

View Source
// ErrIsDir is returned when a path is a directory but a file was expected.
// It is a static sentinel, so errors.New is used instead of fmt.Errorf;
// callers should match it with errors.Is.
var ErrIsDir = errors.New("is a directory")

ErrIsDir is returned when a path is a directory but a file was expected.

View Source
// ErrNotConnected is returned when the integration is not connected.
// It is a static sentinel, so errors.New is used instead of fmt.Errorf;
// callers should match it with errors.Is.
var ErrNotConnected = errors.New("integration not connected")

ErrNotConnected is returned when the integration is not connected

View Source
// ErrNotDir is returned when a path is not a directory.
// It is a static sentinel, so errors.New is used instead of fmt.Errorf;
// callers should match it with errors.Is.
var ErrNotDir = errors.New("not a directory")

ErrNotDir is returned when a path is not a directory.

View Source
// ErrNotFound is returned when a path does not exist.
// It is a static sentinel, so errors.New is used instead of fmt.Errorf;
// callers should match it with errors.Is.
var ErrNotFound = errors.New("not found")

ErrNotFound is returned when a path does not exist.

View Source
// ErrRateLimited is returned when a request is rate limited.
// It is a static sentinel, so errors.New is used instead of fmt.Errorf;
// callers should match it with errors.Is.
var ErrRateLimited = errors.New("rate limited")

ErrRateLimited is returned when a request is rate limited

View Source
var (
	// ErrSearchNotSupported is the sentinel a Provider's Search method
	// returns when the provider has no search capability; match it with
	// errors.Is.
	ErrSearchNotSupported = errors.New("search not supported")
)

ErrSearchNotSupported is returned when a provider doesn't support search

Functions

func CacheKey

// CacheKey generates a cache key for a source request from the workspace,
// integration, path, and operation.
// NOTE(review): presumably this is the key passed to SourceCache methods;
// Go initialism style would spell the parameter workspaceID.
func CacheKey(workspaceId uint, integration, path, operation string) string

CacheKey generates a cache key for a source request

func DefaultFilenameFormat

// DefaultFilenameFormat returns a sensible default filename format for the
// given integration. It is presumably a template of the kind stored in
// QuerySpec.FilenameFormat (e.g. "{date}_{subject}_{id}.txt"); confirm.
func DefaultFilenameFormat(integration string) string

DefaultFilenameFormat returns a sensible default filename format for an integration.

func GenerateErrorJSON

// GenerateErrorJSON creates a JSON error response for err.
// NOTE(review): the JSON shape is not visible here; confirm it against the
// implementation before relying on specific keys.
func GenerateErrorJSON(err error) []byte

GenerateErrorJSON creates a JSON error response

func GenerateSourceReadme

// GenerateSourceReadme creates the README.md content for an integration,
// given whether it is connected, its scope (presumably "shared" or
// "personal", as in SourceStatus.Scope), and the workspace ID.
// NOTE(review): workspaceId is a string here but a uint in CacheKey and
// ProviderContext; confirm the intended representation.
func GenerateSourceReadme(integration string, connected bool, scope string, workspaceId string) []byte

GenerateSourceReadme creates the README.md content for an integration

func NowUnix

// NowUnix returns the current Unix timestamp. It is presumably in seconds,
// the unit implied for the Mtime fields; confirm in the implementation.
func NowUnix() int64

NowUnix returns the current Unix timestamp

func SanitizeFilename

// SanitizeFilename makes s safe for use as a filename. It removes emojis,
// non-ASCII characters, and filesystem-unsafe characters, keeping only ASCII
// letters and digits, underscores, hyphens, and dots. This is the canonical
// sanitization function for all providers.
func SanitizeFilename(s string) string

SanitizeFilename makes a string safe for use as a filename. It removes emojis, non-ASCII characters, and filesystem-unsafe characters, keeping only ASCII letters and digits, underscores, hyphens, and dots. This is the canonical sanitization function for all providers.

Types

type CacheEntry

// CacheEntry holds a cached response together with its expiration time.
// NOTE(review): presumably only the field matching the Set* call that
// created the entry (Data, Info, or Entries) is populated; confirm.
type CacheEntry struct {
	Data      []byte     // cached file content
	Info      *FileInfo  // cached file/directory metadata
	Entries   []DirEntry // cached directory listing
	ExpiresAt time.Time  // expiration time; see IsExpired
}

CacheEntry holds a cached response together with its expiration time.

func (*CacheEntry) IsExpired

// IsExpired reports whether the entry has expired. It presumably compares
// ExpiresAt with the current time.
func (e *CacheEntry) IsExpired() bool

IsExpired returns true if the entry has expired

type CacheStats

// CacheStats contains cache statistics, as returned by SourceCache.Stats.
type CacheStats struct {
	Size    int           // number of entries currently held
	MaxSize int           // maximum number of entries
	TTL     time.Duration // TTL configured for the cache (presumably the default)
	Expired int           // number of held entries that have expired
}

CacheStats contains cache statistics

type CredentialValidator added in v0.1.32

// CredentialValidator is optionally implemented by providers that can
// validate credentials at connection creation time. A non-nil error from
// ValidateCredentials presumably means the credentials are rejected.
type CredentialValidator interface {
	ValidateCredentials(ctx context.Context, creds *types.IntegrationCredentials) error
}

CredentialValidator is optionally implemented by providers that can validate credentials at connection creation time.

type DirEntry

// DirEntry represents a directory entry, as returned by Provider.ReadDir.
type DirEntry struct {
	Name  string // entry name within the directory
	Mode  uint32 // mode bits (presumably ModeDir, ModeFile, or ModeLink)
	IsDir bool   // true if the entry is a directory
	Size  int64  // size (presumably bytes, as in QueryResult.Size)
	Mtime int64  // modification time (presumably a Unix timestamp, as in FileInfo)
}

DirEntry represents a directory entry

type EventInjector added in v0.1.142

// EventInjector allows injecting synthetic query results for a task,
// bypassing the real provider. It is used in test mode to simulate source
// events through the real pipeline (poller → refresh → baseline diff → wake).
type EventInjector interface {
	// InjectItems records items as synthetic query results for taskID.
	InjectItems(taskID string, items []QueryResult)
	// DrainItems returns the results injected for taskID.
	// NOTE(review): "drain" suggests the items are removed as they are
	// returned; confirm in the implementations.
	DrainItems(taskID string) []QueryResult
}

EventInjector allows injecting synthetic query results for a task, bypassing the real provider. It is used in test mode to simulate source events through the real pipeline (poller → refresh → baseline diff → wake).

type FileInfo

// FileInfo contains file/directory metadata, as returned by Provider.Stat.
type FileInfo struct {
	Size   int64  // size (presumably bytes)
	Mode   uint32 // mode bits (presumably ModeDir, ModeFile, or ModeLink)
	Mtime  int64  // modification time as a Unix timestamp
	IsDir  bool   // true for a directory
	IsLink bool   // true for a symbolic link (see Provider.Readlink)
}

FileInfo contains file/directory metadata

func DirInfo

// DirInfo creates a FileInfo describing a directory.
// NOTE(review): presumably IsDir is true and Mode is ModeDir; confirm.
func DirInfo() *FileInfo

DirInfo creates FileInfo for a directory

func FileInfoFromBytes

// FileInfoFromBytes creates a FileInfo describing a file whose content is
// data. Presumably Size is len(data) and Mode is ModeFile; confirm.
func FileInfoFromBytes(data []byte) *FileInfo

FileInfoFromBytes creates FileInfo for file content

type HookEnricher added in v0.1.125

// HookEnricher is optionally implemented by providers that can produce rich
// context when a hook fires for their integration. The returned string is
// embedded directly in the agent's prompt, so the agent doesn't have to read
// the filesystem for context. Implementations return "" when there is
// nothing to enrich. data is presumably the hook's event payload; confirm.
type HookEnricher interface {
	EnrichHookContent(ctx context.Context, pctx *ProviderContext, data map[string]any) string
}

HookEnricher is optionally implemented by providers that can produce rich context when a hook fires for their integration. The returned string is embedded directly in the agent's prompt so it doesn't have to read the filesystem for context. Return "" if there's nothing to enrich.

type MemoryEventInjector added in v0.1.142

// MemoryEventInjector is an in-memory EventInjector. Construct it with
// NewMemoryEventInjector.
// NOTE(review): confirm whether the zero value is usable.
type MemoryEventInjector struct {
	// contains filtered or unexported fields
}

MemoryEventInjector is an in-memory EventInjector.

func NewMemoryEventInjector added in v0.1.142

// NewMemoryEventInjector returns a new MemoryEventInjector.
func NewMemoryEventInjector() *MemoryEventInjector

func (*MemoryEventInjector) DrainItems added in v0.1.142

// DrainItems returns the items injected for taskID, implementing
// EventInjector.DrainItems.
// NOTE(review): "drain" suggests the items are removed as they are
// returned; confirm.
func (m *MemoryEventInjector) DrainItems(taskID string) []QueryResult

func (*MemoryEventInjector) InjectItems added in v0.1.142

// InjectItems records items as synthetic query results for taskID,
// implementing EventInjector.InjectItems.
func (m *MemoryEventInjector) InjectItems(taskID string, items []QueryResult)

type NativeBrowsable added in v0.1.32

// NativeBrowsable is optionally implemented by providers that expose a
// native file tree alongside smart queries.
type NativeBrowsable interface {
	// IsNativeBrowsable reports whether the provider exposes a native file tree.
	IsNativeBrowsable() bool
}

NativeBrowsable is optionally implemented by providers that expose a native file tree alongside smart queries.

type Provider

// Provider defines the interface for source integrations. Each integration
// (github, gmail, notion, etc.) implements this interface; providers are
// presumably registered in a Registry under their Name.
type Provider interface {
	// Name returns the integration name (e.g., "github", "gmail").
	Name() string

	// Stat returns file/directory attributes for a path within the integration.
	// Path is relative to the integration root (e.g., "views/repos.json", not
	// "github/views/repos.json").
	Stat(ctx context.Context, pctx *ProviderContext, path string) (*FileInfo, error)

	// ReadDir lists the contents of the directory at path.
	ReadDir(ctx context.Context, pctx *ProviderContext, path string) ([]DirEntry, error)

	// Read reads file content at path and returns the data and any error.
	// NOTE(review): offset and length presumably select a byte range; confirm
	// how a non-positive length or a read past the end is handled.
	Read(ctx context.Context, pctx *ProviderContext, path string, offset, length int64) ([]byte, error)

	// Readlink reads a symbolic link target. It is optional: providers that
	// do not support links return an empty string.
	Readlink(ctx context.Context, pctx *ProviderContext, path string) (string, error)

	// Search executes a provider-specific query and returns results.
	// The query format is provider-specific (e.g., Gmail search syntax, Drive
	// query syntax); limit presumably caps the number of results.
	// It returns ErrSearchNotSupported if the provider doesn't support search.
	Search(ctx context.Context, pctx *ProviderContext, query string, limit int) ([]SearchResult, error)
}

Provider defines the interface for source integrations. Each integration (github, gmail, notion, etc.) implements this interface.

type ProviderContext

// ProviderContext contains the workspace and credential information passed
// to provider operations (Provider, QueryExecutor, ResourceLister,
// HookEnricher).
// NOTE(review): Go initialism style would name the ID fields WorkspaceID and
// MemberID; renaming them now would break callers.
type ProviderContext struct {
	WorkspaceId uint
	MemberId    uint
	Credentials *types.IntegrationCredentials
}

ProviderContext contains the workspace and credential information for provider operations.

type QueryExecutor

// QueryExecutor is an optional interface implemented by providers that
// support filesystem query operations. It enables the source view filesystem
// feature, where users create virtual folders/files that execute queries on
// access.
type QueryExecutor interface {
	// ExecuteQuery runs a query and returns results with pagination metadata.
	// The spec contains the provider-specific query string, filename format,
	// and optional PageToken for fetching subsequent pages.
	// Returns a QueryResponse containing results and pagination info.
	ExecuteQuery(ctx context.Context, pctx *ProviderContext, spec QuerySpec) (*QueryResponse, error)

	// ReadResult fetches content for a specific result by its provider ID
	// (QueryResult.ID).
	// This is called when a user reads a file from a source view folder.
	ReadResult(ctx context.Context, pctx *ProviderContext, resultID string) ([]byte, error)

	// FormatFilename generates a filename from metadata using the format template.
	// Supported placeholders vary by provider but typically include:
	// - {id}: Unique identifier
	// - {date}: Date in YYYY-MM-DD format
	// - {subject}, {from}, {to}: Email-specific
	// - {title}, {name}: Document-specific
	// The result should be sanitized for filesystem use (see SanitizeFilename).
	FormatFilename(format string, metadata map[string]string) string
}

QueryExecutor is an optional interface implemented by providers that support filesystem query operations. This enables the source view filesystem feature where users create virtual folders/files that execute queries on access.

type QueryResponse

// QueryResponse wraps one page of query results with pagination metadata,
// as returned by QueryExecutor.ExecuteQuery. To fetch the next page, pass
// NextPageToken back as QuerySpec.PageToken.
type QueryResponse struct {
	Results       []QueryResult // Results for this page
	NextPageToken string        // Token for fetching the next page (empty if no more pages)
	HasMore       bool          // True if there are more results beyond this page
}

QueryResponse wraps query results with pagination metadata. Providers return this to indicate whether more results are available.

type QueryResult

// QueryResult represents a single result from executing a filesystem query.
// QueryExecutor returns these in QueryResponse.Results; ID can be passed to
// QueryExecutor.ReadResult to fetch the content.
type QueryResult struct {
	ID       string            // Provider-specific ID (e.g., Gmail message ID, GDrive file ID)
	Filename string            // Generated filename using FilenameFormat
	Metadata map[string]string // Key-value metadata (date, subject, from, etc.)
	Size     int64             // Content size in bytes (0 if unknown)
	Mtime    int64             // Last modified time (Unix timestamp)
}

QueryResult represents a single result from executing a filesystem query. This is used by QueryExecutor to return search results with metadata.

type QuerySpec

// QuerySpec contains the parsed query specification from a FilesystemQuery.
// It is passed to QueryExecutor.ExecuteQuery.
type QuerySpec struct {
	Query          string            // Provider-specific query string (e.g., "is:unread", "mimeType='application/pdf'")
	Limit          int               // Page size - number of results per page (default 50)
	MaxResults     int               // Total cap on results across all pages (default 500)
	FilenameFormat string            // Format template for generating filenames (e.g., "{date}_{subject}_{id}.txt")
	PageToken      string            // Pagination token for fetching subsequent pages
	Metadata       map[string]string // Additional provider-specific metadata (e.g., content_type for GitHub)
}

QuerySpec contains the parsed query specification from a FilesystemQuery.

type RateLimitConfig

// RateLimitConfig configures a RateLimiter (see NewRateLimiter and
// DefaultRateLimitConfig).
type RateLimitConfig struct {
	// RequestsPerSecond is the rate limit applied per integration per workspace.
	RequestsPerSecond float64

	// BurstSize is the maximum burst size (presumably a number of requests).
	BurstSize int

	// CleanupInterval is how often stale limiters are cleaned up.
	CleanupInterval time.Duration
}

RateLimitConfig configures the rate limiter

func DefaultRateLimitConfig

// DefaultRateLimitConfig returns sensible default rate-limit settings.
// NOTE(review): the concrete values are not visible here; see the
// implementation.
func DefaultRateLimitConfig() RateLimitConfig

DefaultRateLimitConfig returns sensible defaults

type RateLimiter

// RateLimiter provides per-integration, per-workspace rate limiting to
// prevent hammering upstream APIs when multiple agents are active.
// Construct it with NewRateLimiter.
type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter provides per-integration, per-workspace rate limiting to prevent hammering upstream APIs when multiple agents are active.

func NewRateLimiter

// NewRateLimiter creates a new rate limiter configured by config.
func NewRateLimiter(config RateLimitConfig) *RateLimiter

NewRateLimiter creates a new rate limiter

func (*RateLimiter) Allow

// Allow reports whether a request for the given workspace and integration
// is allowed under the rate limit: true if allowed, false if rate limited.
// Unlike Wait, it presumably does not block.
func (r *RateLimiter) Allow(workspaceId uint, integration string) bool

Allow checks if a request is allowed under the rate limit. Returns true if allowed, false if rate limited.

func (*RateLimiter) Wait

// Wait blocks until a request for the given workspace and integration is
// allowed or ctx is cancelled. It returns nil if the request is allowed, or
// the context's error if ctx is cancelled or times out.
func (r *RateLimiter) Wait(ctx context.Context, workspaceId uint, integration string) error

Wait blocks until a request is allowed or the context is cancelled. It returns nil if the request is allowed, or the context's error if the context is cancelled or times out.

type Registry

// Registry manages registered source providers. Construct it with
// NewRegistry.
type Registry struct {
	// contains filtered or unexported fields
}

Registry manages registered source providers

func NewRegistry

// NewRegistry creates a new provider registry.
func NewRegistry() *Registry

NewRegistry creates a new provider registry

func (*Registry) Get

// Get returns the provider registered under name.
// NOTE(review): the result for an unregistered name is not shown here
// (presumably nil); callers can check Has first.
func (r *Registry) Get(name string) Provider

Get returns a provider by name

func (*Registry) Has

// Has reports whether a provider is registered under name.
func (r *Registry) Has(name string) bool

Has returns true if a provider is registered

func (*Registry) List

// List returns the names of all registered providers.
// NOTE(review): ordering is not documented; sort the result if
// deterministic output is needed.
func (r *Registry) List() []string

List returns all registered provider names

func (*Registry) Register

// Register adds p to the registry, presumably keyed by p.Name().
// NOTE(review): behavior on a duplicate name is not shown here; confirm.
func (r *Registry) Register(p Provider)

Register adds a provider to the registry

type Resource added in v0.1.50

// Resource represents a selectable item from an integration (repo, channel,
// etc.), as returned by ResourceLister.ListResources.
type Resource struct {
	ID   string `json:"id"`   // Unique identifier (e.g., "owner/repo", "C01234")
	Name string `json:"name"` // Display name (e.g., "owner/repo", "#general")
}

Resource represents a selectable item from an integration (repo, channel, etc.).

type ResourceLister added in v0.1.50

// ResourceLister is optionally implemented by providers that can enumerate
// available resources for use in filter dropdowns (repos, channels, etc.).
type ResourceLister interface {
	// DefaultResourceType returns the primary resource type name (e.g., "repos", "channels").
	DefaultResourceType() string

	// ListResources returns available resources of the given type.
	// Supported types are provider-specific (e.g., "repos" for GitHub, "channels" for Slack).
	ListResources(ctx context.Context, pctx *ProviderContext, resourceType string) ([]Resource, error)
}

ResourceLister is optionally implemented by providers that can enumerate available resources for use in filter dropdowns (repos, channels, etc.).

type SearchResult

// SearchResult represents a single search result from a provider, as
// returned by Provider.Search.
// NOTE(review): Go initialism style would name the Id field ID (compare
// QueryResult.ID and Resource.ID); renaming it now would break callers.
type SearchResult struct {
	Name    string // filename for the result (e.g., "2026-01-28_invoice_abc123.txt")
	Id      string // internal ID for fetching content (e.g., message ID, file ID)
	Mode    uint32 // file mode
	Size    int64  // file size if known, 0 otherwise
	Mtime   int64  // modification time (Unix timestamp)
	Preview string // optional preview/snippet of content
}

SearchResult represents a single search result from a provider

type SourceCache

// SourceCache provides caching and request coalescing for source operations.
// It helps prevent upstream API hammering when multiple agents read the same
// data. Construct it with NewSourceCache.
type SourceCache struct {
	// contains filtered or unexported fields
}

SourceCache provides caching and request coalescing for source operations. It helps prevent upstream API hammering when multiple agents read the same data.

func NewSourceCache

// NewSourceCache creates a new source cache. ttl is presumably the default
// TTL used by SetData, SetInfo, and SetEntries, and maxSize the maximum
// number of entries (compare DefaultCacheTTL and DefaultCacheSize).
func NewSourceCache(ttl time.Duration, maxSize int) *SourceCache

NewSourceCache creates a new source cache

func (*SourceCache) DoOnce

// DoOnce executes fn only once for concurrent requests with the same key.
// Other callers with the same key wait and receive the same result.
// NOTE(review): confirm whether the result is also cached beyond the
// in-flight call, or whether callers must use the Set* methods for that.
func (c *SourceCache) DoOnce(key string, fn func() (any, error)) (any, error)

DoOnce executes a function only once for concurrent requests with the same key. Other callers with the same key will wait and receive the same result.

func (*SourceCache) GetData

// GetData returns the data cached under key if it is available and not
// expired; the boolean reports whether such data was found.
func (c *SourceCache) GetData(key string) ([]byte, bool)

GetData returns cached data if available and not expired

func (*SourceCache) GetEntries

// GetEntries returns the DirEntry list cached under key if it is available
// and not expired; the boolean reports whether such a list was found.
func (c *SourceCache) GetEntries(key string) ([]DirEntry, bool)

GetEntries returns cached DirEntry list if available and not expired

func (*SourceCache) GetInfo

// GetInfo returns the FileInfo cached under key if it is available and not
// expired; the boolean reports whether such info was found.
func (c *SourceCache) GetInfo(key string) (*FileInfo, bool)

GetInfo returns cached FileInfo if available and not expired

func (*SourceCache) Invalidate

// Invalidate removes the cache entry for key.
func (c *SourceCache) Invalidate(key string)

Invalidate removes a cache entry

func (*SourceCache) InvalidatePrefix

// InvalidatePrefix removes all cache entries whose keys match prefix.
func (c *SourceCache) InvalidatePrefix(prefix string)

InvalidatePrefix removes all cache entries matching a prefix

func (*SourceCache) SetData

// SetData caches data under key with the cache's default TTL.
func (c *SourceCache) SetData(key string, data []byte)

SetData caches data with the default TTL

func (*SourceCache) SetEntries

// SetEntries caches directory entries under key with the cache's default
// TTL. Use SetEntriesWithTTL to override the TTL.
func (c *SourceCache) SetEntries(key string, entries []DirEntry)

SetEntries caches directory entries with the default TTL

func (*SourceCache) SetEntriesWithTTL

// SetEntriesWithTTL caches directory entries under key with a custom ttl
// instead of the cache's default TTL.
func (c *SourceCache) SetEntriesWithTTL(key string, entries []DirEntry, ttl time.Duration)

SetEntriesWithTTL caches directory entries with a custom TTL

func (*SourceCache) SetInfo

// SetInfo caches info under key with the cache's default TTL.
func (c *SourceCache) SetInfo(key string, info *FileInfo)

SetInfo caches FileInfo with the default TTL

func (*SourceCache) Stats

// Stats returns a snapshot of the cache statistics.
func (c *SourceCache) Stats() CacheStats

Stats returns cache statistics

type SourceStatus

// SourceStatus represents the status information for an integration.
type SourceStatus struct {
	Integration string // integration name (e.g., "github")
	DisplayName string // display name of the integration
	Description string // description of the integration
	Connected   bool   // whether the integration is connected
	Scope       string // "shared" or "personal"
	Hint        string // CLI hint when disconnected
}

SourceStatus represents the status information for an integration

Directories

Path Synopsis
Package queries provides query inference and execution for integration sources.
Package queries provides query inference and execution for integration sources.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL