query

package
v0.1.5 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 2, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package query provides a reusable query layer for msgvault. It supports aggregation queries for TUI views and message retrieval for detail views. The package is designed with a backend-agnostic interface to support both SQLite (for flexibility) and Parquet (for performance) data sources.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HasParquetData

func HasParquetData(analyticsDir string) bool

HasParquetData checks if Parquet files exist and are usable.

func MergeFilterIntoQuery

func MergeFilterIntoQuery(q *search.Query, filter MessageFilter) *search.Query

MergeFilterIntoQuery combines a MessageFilter context with a search.Query. Context filters are appended to existing query filters.

Note on semantics: appending to FromAddrs/ToAddrs/Labels produces IN clauses, which have OR semantics within each dimension. If the user searches "from:alice" and the context has Sender=bob, the result matches alice OR bob. A strict AND intersection would need a separate WHERE condition per context filter. Current behavior: context widens the search within a dimension, while the other constraints still apply.
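The OR behavior follows directly from how a single IN clause is built from an appended slice. A minimal sketch, assuming a hypothetical helper inClause and a hypothetical column name from_email (neither is part of this package):

```go
package main

import (
	"fmt"
	"strings"
)

// inClause builds "col IN (?, ?, ...)" for the given values.
// Hypothetical helper for illustration; it assumes at least one value.
func inClause(col string, vals []string) (string, []any) {
	placeholders := make([]string, len(vals))
	args := make([]any, len(vals))
	for i, v := range vals {
		placeholders[i] = "?"
		args[i] = v
	}
	return fmt.Sprintf("%s IN (%s)", col, strings.Join(placeholders, ", ")), args
}

func main() {
	// The user typed "from:alice"; the drill-down context has Sender=bob.
	fromAddrs := []string{"alice@example.com"}
	fromAddrs = append(fromAddrs, "bob@example.com") // context appended

	clause, args := inClause("from_email", fromAddrs)
	fmt.Println(clause, args)
	// from_email IN (?, ?) [alice@example.com bob@example.com]
}
```

Because both addresses end up in the same IN list, a row from either sender satisfies the predicate. An AND intersection would instead emit two separate equality conditions, which no single sender could satisfy.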

Types

type AccountInfo

type AccountInfo struct {
	ID          int64
	SourceType  string
	Identifier  string // email address
	DisplayName string
}

AccountInfo represents a source account.

type Address

type Address struct {
	Email string
	Name  string
}

Address represents an email address with optional display name.

type AggregateFunc

type AggregateFunc func(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateFunc is a helper type for selecting aggregate methods.

type AggregateOptions

type AggregateOptions struct {
	// Account filter
	SourceID *int64 // nil means all accounts

	// Date range
	After  *time.Time
	Before *time.Time

	// Sorting
	SortField     SortField
	SortDirection SortDirection

	// Limit results (0 means default, typically 100)
	Limit int

	// Time-specific options
	TimeGranularity TimeGranularity

	// Filter options
	WithAttachmentsOnly bool

	// Text search filter (filters aggregates to only include messages matching search)
	SearchQuery string
}

AggregateOptions configures an aggregate query.
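The pointer fields distinguish "not set" from a concrete value, and a zero Limit means "use the default". The following sketch shows one way a backend might translate those fields into predicates. The options type is a trimmed local copy, and the column names, the half-open date range, and the literal 100 are assumptions of this example rather than documented behavior:

```go
package main

import (
	"fmt"
	"time"
)

// options mirrors a subset of AggregateOptions for illustration only.
type options struct {
	SourceID *int64
	After    *time.Time
	Before   *time.Time
	Limit    int
}

// conditions turns the optional fields into SQL predicates.
// Column names and range boundaries are assumptions of this sketch.
func conditions(o options) (where []string, args []any, limit int) {
	if o.SourceID != nil { // nil means all accounts
		where = append(where, "source_id = ?")
		args = append(args, *o.SourceID)
	}
	if o.After != nil {
		where = append(where, "sent_at >= ?")
		args = append(args, *o.After)
	}
	if o.Before != nil {
		where = append(where, "sent_at < ?")
		args = append(args, *o.Before)
	}
	limit = o.Limit
	if limit == 0 {
		limit = 100 // "typically 100" per the field comment
	}
	return where, args, limit
}

func main() {
	id := int64(1)
	after := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	where, args, limit := conditions(options{SourceID: &id, After: &after})
	fmt.Println(where, len(args), limit)
	// [source_id = ? sent_at >= ?] 2 100
}
```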

func DefaultAggregateOptions

func DefaultAggregateOptions() AggregateOptions

DefaultAggregateOptions returns sensible defaults.

type AggregateRow

type AggregateRow struct {
	Key             string // email, domain, label name, or time period
	Count           int64  // number of messages
	TotalSize       int64  // sum of size_estimate in bytes
	AttachmentSize  int64  // sum of attachment sizes in bytes
	AttachmentCount int64  // number of attachments
	TotalUnique     int64  // total unique keys (same for all rows, computed via COUNT(*) OVER())
}

AggregateRow represents a single row in an aggregate view. Used for Senders, Recipients, Domains, Labels, and Time views.

type AttachmentInfo

type AttachmentInfo struct {
	ID          int64
	Filename    string
	MimeType    string
	Size        int64
	ContentHash string
}

AttachmentInfo represents attachment metadata.

type DuckDBEngine

type DuckDBEngine struct {
	// contains filtered or unexported fields
}

DuckDBEngine implements Engine using DuckDB for fast Parquet queries. It uses a hybrid approach:

  • DuckDB with Parquet for fast aggregate queries
  • DuckDB's sqlite_scan for list queries (ListMessages, ListAccounts)
  • Direct SQLite for FTS search and message body retrieval (sqlite_scan can't use FTS5)

Deletion handling: The Python ETL excludes deleted messages (deleted_from_source_at IS NOT NULL) when building Parquet files. However, messages deleted AFTER the Parquet build will still appear in aggregates until the next `build-parquet --full-rebuild`. For the full deletion index solution, see beads issue msgvault-ozj.

func NewDuckDBEngine

func NewDuckDBEngine(analyticsDir string, sqlitePath string, sqliteDB *sql.DB) (*DuckDBEngine, error)

NewDuckDBEngine creates a new DuckDB-backed query engine.

  • analyticsDir should point to ~/.msgvault/analytics/
  • sqlitePath should point to ~/.msgvault/msgvault.db
  • sqliteDB is a direct SQLite connection for FTS search and body retrieval

The engine uses a hybrid approach:

  • DuckDB's sqlite_scan for list queries (ListMessages, ListAccounts, etc.)
  • Direct SQLite (sqliteDB) for FTS search and message body retrieval

If sqlitePath is empty, only aggregate queries and GetTotalStats will work. If sqliteDB is nil, Search will fall back to LIKE queries and body extraction from raw MIME may be slower.

func (*DuckDBEngine) AggregateByDomain

func (e *DuckDBEngine) AggregateByDomain(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByDomain groups messages by sender domain.

func (*DuckDBEngine) AggregateByLabel

func (e *DuckDBEngine) AggregateByLabel(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByLabel groups messages by label.

func (*DuckDBEngine) AggregateByRecipient

func (e *DuckDBEngine) AggregateByRecipient(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByRecipient groups messages by recipient email. Includes to, cc, and bcc recipients.

func (*DuckDBEngine) AggregateByRecipientName

func (e *DuckDBEngine) AggregateByRecipientName(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByRecipientName groups messages by recipient display name. Uses COALESCE(display_name, email_address) so recipients without a display name fall back to their email address.

func (*DuckDBEngine) AggregateBySender

func (e *DuckDBEngine) AggregateBySender(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateBySender groups messages by sender email.

func (*DuckDBEngine) AggregateBySenderName

func (e *DuckDBEngine) AggregateBySenderName(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateBySenderName groups messages by sender display name. Uses COALESCE(display_name, email_address) so senders without a display name fall back to their email address.

func (*DuckDBEngine) AggregateByTime

func (e *DuckDBEngine) AggregateByTime(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByTime groups messages by time period.

func (*DuckDBEngine) Close

func (e *DuckDBEngine) Close() error

Close releases DuckDB resources.

func (*DuckDBEngine) GetAttachment

func (e *DuckDBEngine) GetAttachment(ctx context.Context, id int64) (*AttachmentInfo, error)

GetAttachment retrieves attachment metadata by ID. Attachments live in SQLite, so delegate to the SQLite engine.

func (*DuckDBEngine) GetGmailIDsByFilter

func (e *DuckDBEngine) GetGmailIDsByFilter(ctx context.Context, filter MessageFilter) ([]string, error)

GetGmailIDsByFilter returns Gmail IDs matching a filter from Parquet files. Uses EXISTS subqueries for efficient semi-join filtering.

func (*DuckDBEngine) GetMessage

func (e *DuckDBEngine) GetMessage(ctx context.Context, id int64) (*MessageDetail, error)

GetMessage retrieves a full message from SQLite. Uses direct SQLite connection when available for better BLOB handling.

func (*DuckDBEngine) GetMessageBySourceID

func (e *DuckDBEngine) GetMessageBySourceID(ctx context.Context, sourceMessageID string) (*MessageDetail, error)

GetMessageBySourceID retrieves a message by source ID from SQLite. Uses direct SQLite connection when available for better BLOB handling.

func (*DuckDBEngine) GetTotalStats

func (e *DuckDBEngine) GetTotalStats(ctx context.Context, opts StatsOptions) (*TotalStats, error)

GetTotalStats returns overall statistics from Parquet.

func (*DuckDBEngine) ListAccounts

func (e *DuckDBEngine) ListAccounts(ctx context.Context) ([]AccountInfo, error)

ListAccounts returns accounts from SQLite via DuckDB's sqlite_scan.

func (*DuckDBEngine) ListMessages

func (e *DuckDBEngine) ListMessages(ctx context.Context, filter MessageFilter) ([]MessageSummary, error)

ListMessages retrieves messages from Parquet files for fast filtered queries. Joins normalized Parquet tables to reconstruct denormalized view.

func (*DuckDBEngine) Search

func (e *DuckDBEngine) Search(ctx context.Context, q *search.Query, limit, offset int) ([]MessageSummary, error)

Search performs a Gmail-style search query. Uses direct SQLite connection for FTS5 support when available, falls back to LIKE queries via sqlite_scan otherwise.

func (*DuckDBEngine) SearchFast

func (e *DuckDBEngine) SearchFast(ctx context.Context, q *search.Query, filter MessageFilter, limit, offset int) ([]MessageSummary, error)

SearchFast searches message metadata in Parquet files (no body text). This is much faster than FTS search for large archives. Searches: subject, sender email/name (case-insensitive).

func (*DuckDBEngine) SearchFastCount

func (e *DuckDBEngine) SearchFastCount(ctx context.Context, q *search.Query, filter MessageFilter) (int64, error)

SearchFastCount returns the total count of messages matching a search query. This is used for pagination UI to show "N of M results".
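A pagination label could combine the count with the current offset roughly as follows. The helper resultsLabel is hypothetical, and reading N as "rows loaded so far" is an assumption of this sketch:

```go
package main

import "fmt"

// resultsLabel renders an "N of M results" string, where N is how many
// rows have been loaded so far, capped at the total M.
func resultsLabel(offset, pageLen int, total int64) string {
	shown := int64(offset + pageLen)
	if shown > total {
		shown = total
	}
	return fmt.Sprintf("%d of %d results", shown, total)
}

func main() {
	// Second page of 50 rows out of 1234 matches.
	fmt.Println(resultsLabel(50, 50, 1234)) // 100 of 1234 results
}
```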

func (*DuckDBEngine) SubAggregate

func (e *DuckDBEngine) SubAggregate(ctx context.Context, filter MessageFilter, groupBy ViewType, opts AggregateOptions) ([]AggregateRow, error)

SubAggregate performs aggregation on a filtered subset of messages. This is used for sub-grouping after drill-down.

type Engine

type Engine interface {
	// Aggregate queries - return rows grouped by key
	AggregateBySender(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)
	AggregateBySenderName(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)
	AggregateByRecipient(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)
	AggregateByRecipientName(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)
	AggregateByDomain(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)
	AggregateByLabel(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)
	AggregateByTime(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

	// SubAggregate performs aggregation on a filtered subset of messages.
	// This is used for sub-grouping after drill-down, e.g., drilling into
	// "Sender: foo@example.com" and then sub-grouping by Recipients or Labels.
	// The filter specifies the parent context (sender, domain, etc.) and
	// groupBy specifies what dimension to aggregate by.
	SubAggregate(ctx context.Context, filter MessageFilter, groupBy ViewType, opts AggregateOptions) ([]AggregateRow, error)

	// Message queries
	ListMessages(ctx context.Context, filter MessageFilter) ([]MessageSummary, error)
	GetMessage(ctx context.Context, id int64) (*MessageDetail, error)
	GetMessageBySourceID(ctx context.Context, sourceMessageID string) (*MessageDetail, error)
	GetAttachment(ctx context.Context, id int64) (*AttachmentInfo, error)

	// Search - full-text search using FTS5 (includes message body)
	Search(ctx context.Context, query *search.Query, limit, offset int) ([]MessageSummary, error)

	// SearchFast searches message metadata only (no body text).
	// This is much faster for large archives as it queries Parquet files directly.
	// Searches: subject, sender email/name (case-insensitive).
	// The filter parameter allows contextual search within a drill-down.
	SearchFast(ctx context.Context, query *search.Query, filter MessageFilter, limit, offset int) ([]MessageSummary, error)

	// SearchFastCount returns the total count of messages matching a search query.
	// This is used for pagination UI to show "N of M results".
	SearchFastCount(ctx context.Context, query *search.Query, filter MessageFilter) (int64, error)

	// GetGmailIDsByFilter returns Gmail message IDs (source_message_id) matching a filter.
	// This is useful for batch operations like staging messages for deletion.
	GetGmailIDsByFilter(ctx context.Context, filter MessageFilter) ([]string, error)

	// Account queries
	ListAccounts(ctx context.Context) ([]AccountInfo, error)

	// Stats
	GetTotalStats(ctx context.Context, opts StatsOptions) (*TotalStats, error)

	// Close releases any resources held by the engine.
	Close() error
}

Engine provides query operations for msgvault data. This interface can be implemented by different backends:

  • SQLiteEngine: Direct SQLite queries (flexible, moderate performance)
  • ParquetEngine: Arrow/Parquet queries (fast aggregates, read-only)

type MessageDetail

type MessageDetail struct {
	ID              int64
	SourceMessageID string
	ConversationID  int64
	Subject         string
	Snippet         string
	SentAt          time.Time
	ReceivedAt      *time.Time
	SizeEstimate    int64
	HasAttachments  bool

	// Participants
	From []Address
	To   []Address
	Cc   []Address
	Bcc  []Address

	// Content
	BodyText string
	BodyHTML string

	// Metadata
	Labels      []string
	Attachments []AttachmentInfo
}

MessageDetail represents a full message with body and attachments.

type MessageFilter

type MessageFilter struct {
	// Filter by aggregate key
	Sender        string // filter by sender email
	SenderName    string // filter by sender display name (COALESCE(display_name, email))
	Recipient     string // filter by recipient email
	RecipientName string // filter by recipient display name (COALESCE(display_name, email))
	Domain        string // filter by sender domain
	Label         string // filter by label name

	// Filter by conversation (thread)
	ConversationID *int64 // filter by conversation/thread ID

	// MatchEmpty* flags change how empty filter values are interpreted for each field.
	// When false (default): empty string means "no filter" (return all)
	// When true: empty string means "filter for NULL/empty values"
	// This enables drilldown into empty-bucket aggregates (e.g., messages with no sender).
	//
	// IMPORTANT: Only set ONE MatchEmpty* flag at a time. Setting multiple flags
	// creates an AND condition that may return no results (e.g., messages with
	// no sender AND no recipient AND no domain). The TUI sets exactly one flag
	// based on the current view type when drilling into an empty aggregate bucket.
	MatchEmptySender        bool
	MatchEmptySenderName    bool
	MatchEmptyRecipient     bool
	MatchEmptyRecipientName bool
	MatchEmptyDomain        bool
	MatchEmptyLabel         bool

	// Time range
	TimePeriod      string // e.g., "2024", "2024-01", "2024-01-15"
	TimeGranularity TimeGranularity

	// Account filter
	SourceID *int64 // nil means all accounts

	// Date range
	After  *time.Time
	Before *time.Time

	// Content filter
	WithAttachmentsOnly bool // only return messages with attachments

	// Pagination
	Limit  int
	Offset int

	// Sorting
	SortField     MessageSortField
	SortDirection SortDirection
}

MessageFilter specifies which messages to retrieve.
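The three states of a filter field and its MatchEmpty* flag can be summarized with a small sketch. The helper senderPredicate and the column name from_email are assumptions for illustration; the SQL the engines actually generate may differ:

```go
package main

import "fmt"

// senderPredicate shows one way the Sender / MatchEmptySender pair could
// map to SQL. An empty clause means no predicate is added.
func senderPredicate(sender string, matchEmpty bool) (clause string, args []any) {
	switch {
	case sender != "":
		return "from_email = ?", []any{sender}
	case matchEmpty:
		// Empty value with the flag set: select the empty bucket.
		return "(from_email IS NULL OR from_email = '')", nil
	default:
		// Empty value, flag unset: no filter.
		return "", nil
	}
}

func main() {
	cases := []struct {
		sender     string
		matchEmpty bool
	}{
		{"alice@example.com", false},
		{"", false},
		{"", true},
	}
	for _, c := range cases {
		clause, args := senderPredicate(c.sender, c.matchEmpty)
		fmt.Printf("%q %v\n", clause, args)
	}
}
```

If several MatchEmpty* flags were set at once, each would contribute its own predicate and they would be joined with AND, which is why the field comment recommends setting only one.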

type MessageSortField

type MessageSortField int

MessageSortField represents the field to sort messages by.

const (
	MessageSortByDate MessageSortField = iota
	MessageSortBySize
	MessageSortBySubject
)

type MessageSummary

type MessageSummary struct {
	ID              int64
	SourceMessageID string
	ConversationID  int64
	Subject         string
	Snippet         string
	FromEmail       string
	FromName        string
	SentAt          time.Time
	SizeEstimate    int64
	HasAttachments  bool
	AttachmentCount int
	Labels          []string
	DeletedAt       *time.Time // When message was deleted from server (nil if not deleted)
}

MessageSummary represents a message in list views. Contains enough information for display without fetching the full body.

type ParquetSyncState

type ParquetSyncState struct {
	LastMessageID int64     `json:"last_message_id"`
	LastSyncAt    time.Time `json:"last_sync_at,omitempty"`
}

ParquetSyncState represents the sync state from _last_sync.json.
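Reading the state file is a plain JSON decode. In this sketch the struct is copied from the declaration above; the sample contents are made up, and it assumes the timestamp is written in RFC 3339 form, which is what time.Time expects when unmarshaling:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ParquetSyncState is copied from the declaration above.
type ParquetSyncState struct {
	LastMessageID int64     `json:"last_message_id"`
	LastSyncAt    time.Time `json:"last_sync_at,omitempty"`
}

// parseSyncState decodes the contents of a _last_sync.json file.
func parseSyncState(data []byte) (ParquetSyncState, error) {
	var s ParquetSyncState
	err := json.Unmarshal(data, &s)
	return s, err
}

func main() {
	// Sample contents; the values are invented for illustration.
	data := []byte(`{"last_message_id": 12345, "last_sync_at": "2026-02-01T10:30:00Z"}`)
	s, err := parseSyncState(data)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(s.LastMessageID, s.LastSyncAt.Year())
}
```

One detail worth knowing when writing this struct back out: encoding/json never treats a struct value such as time.Time as empty, so omitempty does not suppress a zero LastSyncAt.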

type SQLiteEngine

type SQLiteEngine struct {
	// contains filtered or unexported fields
}

SQLiteEngine implements Engine using direct SQLite queries.

func NewSQLiteEngine

func NewSQLiteEngine(db *sql.DB) *SQLiteEngine

NewSQLiteEngine creates a new SQLite-backed query engine.

func (*SQLiteEngine) AggregateByDomain

func (e *SQLiteEngine) AggregateByDomain(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByDomain groups messages by sender domain.

func (*SQLiteEngine) AggregateByLabel

func (e *SQLiteEngine) AggregateByLabel(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByLabel groups messages by label.

func (*SQLiteEngine) AggregateByRecipient

func (e *SQLiteEngine) AggregateByRecipient(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByRecipient groups messages by recipient email (to/cc/bcc).

func (*SQLiteEngine) AggregateByRecipientName

func (e *SQLiteEngine) AggregateByRecipientName(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByRecipientName groups messages by recipient display name. Uses COALESCE(display_name, email_address) so recipients without a display name fall back to their email address.

func (*SQLiteEngine) AggregateBySender

func (e *SQLiteEngine) AggregateBySender(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateBySender groups messages by sender email.

func (*SQLiteEngine) AggregateBySenderName

func (e *SQLiteEngine) AggregateBySenderName(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateBySenderName groups messages by sender display name. Uses COALESCE(display_name, email_address) so senders without a display name fall back to their email address.

func (*SQLiteEngine) AggregateByTime

func (e *SQLiteEngine) AggregateByTime(ctx context.Context, opts AggregateOptions) ([]AggregateRow, error)

AggregateByTime groups messages by time period.

func (*SQLiteEngine) Close

func (e *SQLiteEngine) Close() error

Close is a no-op for SQLiteEngine since it doesn't own the connection.

func (*SQLiteEngine) GetAggregateFunc

func (e *SQLiteEngine) GetAggregateFunc(viewType ViewType) AggregateFunc

GetAggregateFunc returns the appropriate aggregate function for a view type.

func (*SQLiteEngine) GetAttachment

func (e *SQLiteEngine) GetAttachment(ctx context.Context, id int64) (*AttachmentInfo, error)

GetAttachment retrieves attachment metadata by ID.

func (*SQLiteEngine) GetGmailIDsByFilter

func (e *SQLiteEngine) GetGmailIDsByFilter(ctx context.Context, filter MessageFilter) ([]string, error)

GetGmailIDsByFilter returns Gmail message IDs (source_message_id) matching a filter. This is more efficient than ListMessages when you only need the IDs.

func (*SQLiteEngine) GetMessage

func (e *SQLiteEngine) GetMessage(ctx context.Context, id int64) (*MessageDetail, error)

GetMessage retrieves a full message by internal ID.

func (*SQLiteEngine) GetMessageBySourceID

func (e *SQLiteEngine) GetMessageBySourceID(ctx context.Context, sourceMessageID string) (*MessageDetail, error)

GetMessageBySourceID retrieves a full message by source message ID (e.g., Gmail ID). Note: This searches across all accounts and returns the first match. For Gmail, message IDs are unique per account but theoretically could collide across accounts. In practice, Gmail IDs are random enough that collisions are astronomically unlikely. If you need to guarantee uniqueness, use the internal ID from GetMessage instead.

func (*SQLiteEngine) GetTotalStats

func (e *SQLiteEngine) GetTotalStats(ctx context.Context, opts StatsOptions) (*TotalStats, error)

GetTotalStats returns overall statistics.

func (*SQLiteEngine) ListAccounts

func (e *SQLiteEngine) ListAccounts(ctx context.Context) ([]AccountInfo, error)

ListAccounts returns all source accounts.

func (*SQLiteEngine) ListMessages

func (e *SQLiteEngine) ListMessages(ctx context.Context, filter MessageFilter) ([]MessageSummary, error)

ListMessages retrieves messages matching the filter.

func (*SQLiteEngine) Search

func (e *SQLiteEngine) Search(ctx context.Context, q *search.Query, limit, offset int) ([]MessageSummary, error)

func (*SQLiteEngine) SearchFast

func (e *SQLiteEngine) SearchFast(ctx context.Context, q *search.Query, filter MessageFilter, limit, offset int) ([]MessageSummary, error)

SearchFast searches message metadata only (no body text). For SQLite, this falls back to regular Search since FTS5 is fast enough and provides better results than metadata-only search. The filter parameter is applied to narrow the search scope.

func (*SQLiteEngine) SearchFastCount

func (e *SQLiteEngine) SearchFastCount(ctx context.Context, q *search.Query, filter MessageFilter) (int64, error)

SearchFastCount returns the total count of messages matching a search query. Uses the same query logic as Search to ensure consistent counts.

func (*SQLiteEngine) SubAggregate

func (e *SQLiteEngine) SubAggregate(ctx context.Context, filter MessageFilter, groupBy ViewType, opts AggregateOptions) ([]AggregateRow, error)

SubAggregate performs aggregation on a filtered subset of messages. This is used for sub-grouping after drill-down.

type SortDirection

type SortDirection int

SortDirection represents ascending or descending sort order.

const (
	SortDesc SortDirection = iota
	SortAsc
)

type SortField

type SortField int

SortField represents the field to sort aggregates by.

const (
	SortByCount SortField = iota
	SortBySize
	SortByAttachmentSize
	SortByName
)
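Because both constant blocks start at iota, the zero values are SortByCount and SortDesc, so an options struct that leaves the sort fields unset asks for "most messages first". The sketch below re-declares the constants locally and applies them with a hypothetical in-memory comparator sortRows; the engines presumably sort in SQL instead:

```go
package main

import (
	"fmt"
	"sort"
)

type SortDirection int

const (
	SortDesc SortDirection = iota
	SortAsc
)

type SortField int

const (
	SortByCount SortField = iota
	SortBySize
	SortByAttachmentSize
	SortByName
)

// AggregateRow is trimmed to the fields the comparator needs.
type AggregateRow struct {
	Key            string
	Count          int64
	TotalSize      int64
	AttachmentSize int64
}

// sortRows orders rows in place by the chosen field and direction.
func sortRows(rows []AggregateRow, f SortField, d SortDirection) {
	sort.SliceStable(rows, func(i, j int) bool {
		a, b := rows[i], rows[j]
		if d == SortDesc {
			a, b = b, a // swap operands to reverse the order
		}
		switch f {
		case SortBySize:
			return a.TotalSize < b.TotalSize
		case SortByAttachmentSize:
			return a.AttachmentSize < b.AttachmentSize
		case SortByName:
			return a.Key < b.Key
		default: // SortByCount
			return a.Count < b.Count
		}
	})
}

func main() {
	rows := []AggregateRow{{Key: "a", Count: 1}, {Key: "b", Count: 3}, {Key: "c", Count: 2}}
	var f SortField     // zero value: SortByCount
	var d SortDirection // zero value: SortDesc
	sortRows(rows, f, d)
	for _, r := range rows {
		fmt.Println(r.Key, r.Count)
	}
}
```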

func (SortField) String

func (f SortField) String() string

type StatsOptions

type StatsOptions struct {
	SourceID            *int64   // nil means all accounts
	WithAttachmentsOnly bool     // only count messages with attachments
	SearchQuery         string   // when set, stats reflect only messages matching this search
	GroupBy             ViewType // when set, search filters on this view's key columns instead of subject+sender
}

StatsOptions configures a stats query.

type TimeGranularity

type TimeGranularity int

TimeGranularity represents the time grouping level.

const (
	TimeYear TimeGranularity = iota
	TimeMonth
	TimeDay
)

func (TimeGranularity) String

func (g TimeGranularity) String() string

type TotalStats

type TotalStats struct {
	MessageCount    int64
	TotalSize       int64
	AttachmentCount int64
	AttachmentSize  int64
	LabelCount      int64
	AccountCount    int64
}

TotalStats provides overall database statistics.

type ViewType

type ViewType int

ViewType represents the type of aggregate view.

const (
	ViewSenders ViewType = iota
	ViewSenderNames
	ViewRecipients
	ViewRecipientNames
	ViewDomains
	ViewLabels
	ViewTime
)

func (ViewType) String

func (v ViewType) String() string
