sqlite

package
v0.0.0-...-8845da6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultPageSize is the default number of items per page for paginated queries.
	// NOTE(review): TaskFilter.Limit documents the same default (100); presumably
	// this constant is the fallback applied when Limit is unset — confirm.
	DefaultPageSize = 100
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AlertRecord

// AlertRecord represents an alert stored in the database.
//
// Each field is serialized to JSON under the snake_case key given in its tag.
// TaskID, TaskTime, Type and Job presumably describe the task that raised the
// alert, Msg the alert text, and CreatedAt the time the record was written —
// TODO confirm against AddAlert.
type AlertRecord struct {
	ID        int64     `json:"id"`
	TaskID    string    `json:"task_id"`
	TaskTime  time.Time `json:"task_time"`
	Type      string    `json:"type"`
	Job       string    `json:"job"`
	Msg       string    `json:"msg"`
	CreatedAt time.Time `json:"created_at"`
}

AlertRecord represents an alert stored in the database

type DBSizeInfo

// DBSizeInfo contains database size information.
//
// TotalSize is a string while PageCount and PageSize are integers.
// NOTE(review): TotalSize is presumably a human-readable rendering of
// PageCount*PageSize, and DBPath the path of the database file — confirm.
type DBSizeInfo struct {
	TotalSize string `json:"total_size"`
	PageCount int64  `json:"page_count"`
	PageSize  int64  `json:"page_size"`
	DBPath    string `json:"db_path"`
}

DBSizeInfo contains database size information

type DurationStats

// DurationStats accumulates durations via Add and exposes the exported Min and
// Max values plus an Average method.
// NOTE(review): the unexported fields are not visible here; presumably they
// hold the running sum/count needed by Average — confirm.
type DurationStats struct {
	Min time.Duration
	Max time.Duration
	// contains filtered or unexported fields
}

func (*DurationStats) Add

func (s *DurationStats) Add(d time.Duration)

func (*DurationStats) Average

func (s *DurationStats) Average() time.Duration

func (*DurationStats) String

func (s *DurationStats) String() string

type FileMessage

// FileMessage represents a file message record.
//
// TaskIDs and TaskNames are both slices; AddFileMessage accepts them as two
// separate arguments. NOTE(review): presumably they are parallel (same length,
// same order) — confirm.
// NOTE(review): ID is an int here while AlertRecord.ID is an int64 — confirm
// the difference is intentional.
type FileMessage struct {
	ID           int       `json:"id"`
	Path         string    `json:"path"`
	Size         int64     `json:"size"`
	LastModified time.Time `json:"last_modified"`
	ReceivedAt   time.Time `json:"received_at"`
	TaskTime     time.Time `json:"task_time"`
	TaskIDs      []string  `json:"task_ids"`
	TaskNames    []string  `json:"task_names"`
}

FileMessage represents a file message record

type FileMessageWithTasks

// FileMessageWithTasks represents a file message with associated task details.
//
// FileID, Path, TaskTime and ReceivedAt describe the file message; the
// remaining Task-prefixed fields describe a single task.
// NOTE(review): because only one task is carried per value, a file that
// produced several tasks presumably yields several values — confirm against
// GetFileMessagesWithTasks.
type FileMessageWithTasks struct {
	FileID      int       `json:"file_id"`
	Path        string    `json:"path"`
	TaskTime    time.Time `json:"task_time"`
	ReceivedAt  time.Time `json:"received_at"`
	TaskID      string    `json:"task_id"`
	TaskType    string    `json:"task_type"`
	TaskJob     string    `json:"task_job"`
	TaskResult  string    `json:"task_result"`
	TaskCreated time.Time `json:"task_created"`
	TaskStarted time.Time `json:"task_started"`
	TaskEnded   time.Time `json:"task_ended"`
}

FileMessageWithTasks represents a file message with associated task details

type Phase

// Phase represents a workflow phase (same as workflow.Phase).
//
// Task combines a topic and a job; read them through the Topic() and Job()
// methods rather than parsing the field directly.
type Phase struct {
	Task      string // Should use Topic() and Job() for access
	Rule      string
	DependsOn string // Task that this phase depends on (a child phase depends on another task)
	Retry     int
	Template  string // template used to create the task
}

Phase represents a workflow phase (same as workflow.Phase)

func (Phase) IsEmpty

func (ph Phase) IsEmpty() bool

func (Phase) Job

func (ph Phase) Job() string

Job portion of the Task

func (Phase) ToWorkflowPhase deprecated

func (ph Phase) ToWorkflowPhase() workflow.Phase

Deprecated: ToWorkflowPhase converts cache.Phase to workflow.Phase

func (Phase) Topic

func (ph Phase) Topic() string

Topic portion of the Task

func (Phase) Validate

func (ph Phase) Validate() string

Validate validates a phase and returns a status message.

type PhaseDB

// PhaseDB is a Phase together with the workflow file it belongs to and its
// validation status. Phase is embedded, so its fields are accessible directly
// on a PhaseDB value.
type PhaseDB struct {
	Phase
	FilePath string // workflow file path
	Status   string // status of the phase (e.g. valid, invalid, warning)
}

func (PhaseDB) Job

func (p PhaseDB) Job() string

func (PhaseDB) Topic

func (p PhaseDB) Topic() string

type SQLite

// SQLite is the handle for the sqlite database used by this package; its
// methods open, query, sync and close the database.
//
// LocalPath is the local database file and BackupPath the backup location:
// Open copies BackupPath to LocalPath when the local file does not exist, and
// Sync/Close copy the local file back to the backup location.
// NOTE(review): TaskTTL is presumably the TTL used by CheckIncompleteTasks and
// Retention the window used for cleanup; its struct tag is not shown in this
// view — confirm.
type SQLite struct {
	LocalPath  string
	BackupPath string

	TaskTTL   time.Duration `` /* 200-byte string literal not displayed */
	Retention time.Duration // 90 days
	// contains filtered or unexported fields
}

func (*SQLite) Add

func (s *SQLite) Add(t task.Task)

func (*SQLite) AddAlert

func (s *SQLite) AddAlert(t task.Task, message string) error

AddAlert stores an alert record in the database

func (*SQLite) AddFileMessage

func (s *SQLite) AddFileMessage(sts stat.Stats, taskIDs []string, taskNames []string) error

AddFileMessage stores a file message in the database

func (*SQLite) CheckIncompleteTasks

func (s *SQLite) CheckIncompleteTasks() int

CheckIncompleteTasks checks for tasks that have not completed within the TTL period and adds them to the alerts table with deduplication. Returns count of incomplete tasks found. Uses a JOIN query to efficiently find incomplete tasks without existing alerts.

func (*SQLite) Children

func (s *SQLite) Children(t task.Task) []Phase

Children returns the child phases of the given task t. A child phase is one that depends on (dependsOn) another task. An empty slice will be returned if no children are found. A task without a type or metadata containing the workflow info will result in an error.

func (*SQLite) Close

func (o *SQLite) Close() error

Close the DB connection and copy the current file to the backup location

func (*SQLite) DatesByType

func (s *SQLite) DatesByType(dataType string) ([]string, error)

DatesByType returns a list of dates (YYYY-MM-DD format) that have data for the specified type. dataType can be "tasks", "alerts", or "files". This uses the date_index table for instant lookups.

func (*SQLite) Get

func (s *SQLite) Get(t task.Task) PhaseDB

Get returns the Phase associated with the task. It looks for matching phases within a workflow defined in meta that matches the task Type and job.

func (*SQLite) GetAlertsAfterTime

func (s *SQLite) GetAlertsAfterTime(afterTime time.Time) ([]AlertRecord, error)

GetAlertsAfterTime retrieves all alerts created after a specific time

func (*SQLite) GetAlertsByDate

func (s *SQLite) GetAlertsByDate(date time.Time) ([]AlertRecord, error)

GetAlertsByDate retrieves all alerts for a specific date

func (*SQLite) GetAllPhasesGrouped

func (s *SQLite) GetAllPhasesGrouped() map[string][]PhaseDB

GetAllPhasesGrouped returns all phases grouped by workflow file

func (*SQLite) GetDBSize

func (s *SQLite) GetDBSize() (*DBSizeInfo, error)

GetDBSize returns database size information

func (*SQLite) GetDatesWithAlerts

func (s *SQLite) GetDatesWithAlerts() ([]string, error)

GetDatesWithAlerts returns a list of dates (YYYY-MM-DD format) that have alert records

func (*SQLite) GetDatesWithData

func (s *SQLite) GetDatesWithData() ([]string, error)

GetDatesWithData returns a list of dates (YYYY-MM-DD format) that have any data for tasks, alerts, or files within the retention period

func (*SQLite) GetDatesWithFiles

func (s *SQLite) GetDatesWithFiles() ([]string, error)

GetDatesWithFiles returns a list of dates (YYYY-MM-DD format) that have file message records

func (*SQLite) GetDatesWithTasks

func (s *SQLite) GetDatesWithTasks() ([]string, error)

GetDatesWithTasks returns a list of dates (YYYY-MM-DD format) that have task records

func (*SQLite) GetFileMessages

func (s *SQLite) GetFileMessages(limit int, offset int) ([]FileMessage, error)

GetFileMessages retrieves file messages, paginated by limit and offset.

func (*SQLite) GetFileMessagesByDate

func (s *SQLite) GetFileMessagesByDate(date time.Time) ([]FileMessage, error)

GetFileMessagesByDate retrieves file messages for a specific date

func (*SQLite) GetFileMessagesWithTasks

func (s *SQLite) GetFileMessagesWithTasks(limit int, offset int) ([]FileMessageWithTasks, error)

GetFileMessagesWithTasks retrieves file messages with their associated task details

func (*SQLite) GetPhasesForWorkflow

func (s *SQLite) GetPhasesForWorkflow(filePath string) ([]PhaseDB, error)

GetPhasesForWorkflow returns all phases for a specific workflow file

func (*SQLite) GetSchemaVersion

func (o *SQLite) GetSchemaVersion() int

GetSchemaVersion returns the current schema version from the database. Returns 0 if the schema_version table doesn't exist or is empty (new database).

func (*SQLite) GetTableStats

func (s *SQLite) GetTableStats() ([]TableStat, error)

GetTableStats returns statistics for all tables in the database

func (*SQLite) GetTask

func (s *SQLite) GetTask(id string) TaskJob

func (*SQLite) GetTaskRecapByDate

func (s *SQLite) GetTaskRecapByDate(date time.Time) (TaskStats, error)

GetTaskRecapByDate creates a recap of tasks for a specific date

func (*SQLite) GetTasksByDate

func (s *SQLite) GetTasksByDate(date time.Time, filter *TaskFilter) ([]TaskView, int, error)

GetTasksByDate retrieves tasks for a specific date with optional filtering and pagination

func (*SQLite) GetWorkflowFiles

func (s *SQLite) GetWorkflowFiles() []string

GetWorkflowFiles returns a list of all workflow files in the database.

func (*SQLite) IsDir

func (s *SQLite) IsDir() bool

IsDir returns true if the original workflow path is a folder rather than a file

func (*SQLite) Open

func (o *SQLite) Open(workflowPath string, fOpts *file.Options) error

Open opens the sqlite DB. If LocalPath doesn't exist, it checks whether BackupPath exists and copies it to LocalPath. It also initializes the workflow path and determines if it's a directory.

func (*SQLite) RebuildDateIndex

func (s *SQLite) RebuildDateIndex() error

RebuildDateIndex scans all tables and rebuilds the date_index table. This should be called once during migration or can be exposed as an admin endpoint.

func (*SQLite) Recap

func (s *SQLite) Recap(day time.Time) TaskStats

Recap returns a summary of task statistics for a given day

func (*SQLite) Recycle

func (s *SQLite) Recycle(t time.Time) (string, error)

Recycle cleans up any records older than the given time t in the DB tables: files, alerts and tasks.

func (*SQLite) Refresh

func (s *SQLite) Refresh() (changedFiles []string, err error)

Refresh checks the cache and reloads any files if the checksum has changed.

func (*SQLite) Search

func (s *SQLite) Search(taskType, job string) PhaseDB

Search searches all workflows within the cache and returns the first matching phase with the specific task and job (optional).

func (*SQLite) SendFunc

func (s *SQLite) SendFunc(p bus.Producer) func(string, *task.Task) error

func (*SQLite) Sync

func (o *SQLite) Sync() error

Sync the local DB to the backup location

type Stats

// Stats holds, for each result category (completed, error, alert, warn,
// running), a counter and a list of timestamps, plus the execution-time
// statistics in ExecTimes. TaskStats maps a task key (type:job) to a *Stats.
//
// NOTE(review): each XxxCount presumably equals len(XxxTimes) — confirm in Add.
// NOTE(review): ExecTimes is a pointer; confirm it is initialized before use.
type Stats struct {
	CompletedCount int
	CompletedTimes []time.Time

	ErrorCount int
	ErrorTimes []time.Time

	AlertCount int
	AlertTimes []time.Time

	WarnCount int
	WarnTimes []time.Time

	RunningCount int
	RunningTimes []time.Time

	ExecTimes *DurationStats
}

func (*Stats) Add

func (stats *Stats) Add(tsk task.Task)

func (*Stats) MarshalJSON

func (s *Stats) MarshalJSON() ([]byte, error)

func (Stats) String

func (s Stats) String() string

type SummaryLine

// SummaryLine represents a grouped alert summary for dashboard display.
// BuildCompactSummary produces a slice of these from a list of AlertRecord.
type SummaryLine struct {
	Key       string `json:"key"`        // "task.type:job"
	Count     int    `json:"count"`      // number of alerts
	TimeRange string `json:"time_range"` // formatted time range
}

SummaryLine represents a grouped alert summary for dashboard display

func BuildCompactSummary

func BuildCompactSummary(alerts []AlertRecord) []SummaryLine

BuildCompactSummary processes alerts in memory to create a compact summary. It groups by TaskType:Job and collects task times for proper date formatting.

type TableStat

// TableStat contains information about a database table.
//
// Sizes are reported in three groups (table, index, total), each as an int64
// byte count and a string.
// NOTE(review): the *Human strings are presumably human-readable renderings of
// the matching *Bytes values, and Percentage the table's share of the whole
// database — confirm against GetTableStats.
type TableStat struct {
	Name       string  `json:"name"`
	RowCount   int64   `json:"row_count"`
	TableBytes int64   `json:"table_bytes"`
	TableHuman string  `json:"table_human"`
	IndexBytes int64   `json:"index_bytes"`
	IndexHuman string  `json:"index_human"`
	TotalBytes int64   `json:"total_bytes"`
	TotalHuman string  `json:"total_human"`
	Percentage float64 `json:"percentage"`
}

TableStat contains information about a database table

type TaskCounts

// TaskCounts represents aggregate counts of tasks by result status.
// It is returned by TaskStats.TotalCounts and, per hour, by the
// GetCountsWithHourly methods.
// NOTE(review): Total is presumably the sum of the other five counters — confirm.
type TaskCounts struct {
	Total     int
	Completed int
	Error     int
	Alert     int
	Warn      int
	Running   int
}

TaskCounts represents aggregate counts of tasks by result status

type TaskFilter

// TaskFilter contains options for filtering and paginating task queries.
// Empty string fields are ignored in the query.
type TaskFilter struct {
	ID     string // Filter by task ID (resets other filters)
	Type   string // Filter by task type
	Job    string // Filter by job name
	Result string // Filter by result status (complete, error, alert, warn, or "running" for empty)
	Page   int    // Page number (1-based, default: 1)
	Limit  int    // Number of results per page (default: 100)
}

TaskFilter contains options for filtering and paginating task queries. Empty string fields are ignored in the query.

type TaskJob

// TaskJob describes info about completed tasks that are within the cache.
// Events holds the task.Task events recorded for the task.
// NOTE(review): LastUpdate is a time.Time, yet its comment says "time since";
// presumably it is the timestamp of the last event — confirm.
type TaskJob struct {
	LastUpdate time.Time // time since the last event with id
	Completed  bool

	Events []task.Task
	// contains filtered or unexported fields
}

TaskJob describes info about completed tasks that are within the cache

type TaskStats

// TaskStats is a map of task keys (type:job) to their statistics.
type TaskStats map[string]*Stats

TaskStats is a map of task keys (type:job) to their statistics

func (TaskStats) GetCountsWithHourly

func (ts TaskStats) GetCountsWithHourly() (TaskCounts, [24]TaskCounts)

GetCountsWithHourly returns both total and hourly task counts in a single iteration. The hourly array contains 24 TaskCounts where the index represents the hour (0-23).

func (TaskStats) GetCountsWithHourlyFiltered

func (ts TaskStats) GetCountsWithHourlyFiltered(filter *TaskFilter) (TaskCounts, [24]TaskCounts)

GetCountsWithHourlyFiltered returns total and hourly counts with optional filtering by type, job, and result. The hourly array contains 24 TaskCounts where the index represents the hour (0-23).

func (TaskStats) JobsByType

func (ts TaskStats) JobsByType() map[string][]string

JobsByType returns jobs organized by type

func (TaskStats) TotalCounts

func (ts TaskStats) TotalCounts() TaskCounts

TotalCounts returns aggregate result counts across all tasks

func (TaskStats) UniqueTypes

func (ts TaskStats) UniqueTypes() []string

UniqueTypes returns a sorted list of unique task types

type TaskView

// TaskView represents a task with calculated times from the tasks view.
//
// All timestamps (Created, Started, Ended) are carried as strings rather than
// time.Time; their format is not shown here.
// NOTE(review): TaskTime and QueueTime are presumably formatted renderings of
// TaskSeconds and QueueSeconds — confirm against the tasks view definition.
type TaskView struct {
	ID           string `json:"id"`
	Type         string `json:"type"`
	Job          string `json:"job"`
	Info         string `json:"info"`
	Result       string `json:"result"`
	Meta         string `json:"meta"`
	Msg          string `json:"msg"`
	TaskSeconds  int    `json:"task_seconds"`
	TaskTime     string `json:"task_time"`
	QueueSeconds int    `json:"queue_seconds"`
	QueueTime    string `json:"queue_time"`
	Created      string `json:"created"`
	Started      string `json:"started"`
	Ended        string `json:"ended"`
}

TaskView represents a task with calculated times from the tasks view

type Workflow

// Workflow represents a workflow file with phases.
// Phases is populated from the "phase" key when the file is decoded as TOML.
type Workflow struct {
	Checksum string  // md5 hash for the file to check for changes
	Phases   []Phase `toml:"phase"`
}

Workflow represents a workflow file with phases

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL