Documentation
¶
Index ¶
- Constants
- type AlertRecord
- type DBSizeInfo
- type DurationStats
- type FileMessage
- type FileMessageWithTasks
- type Phase
- type PhaseDB
- type SQLite
- func (s *SQLite) Add(t task.Task)
- func (s *SQLite) AddAlert(t task.Task, message string) error
- func (s *SQLite) AddFileMessage(sts stat.Stats, taskIDs []string, taskNames []string) error
- func (s *SQLite) CheckIncompleteTasks() int
- func (s *SQLite) Children(t task.Task) []Phase
- func (o *SQLite) Close() error
- func (s *SQLite) DatesByType(dataType string) ([]string, error)
- func (s *SQLite) Get(t task.Task) PhaseDB
- func (s *SQLite) GetAlertsAfterTime(afterTime time.Time) ([]AlertRecord, error)
- func (s *SQLite) GetAlertsByDate(date time.Time) ([]AlertRecord, error)
- func (s *SQLite) GetAllPhasesGrouped() map[string][]PhaseDB
- func (s *SQLite) GetDBSize() (*DBSizeInfo, error)
- func (s *SQLite) GetDatesWithAlerts() ([]string, error)
- func (s *SQLite) GetDatesWithData() ([]string, error)
- func (s *SQLite) GetDatesWithFiles() ([]string, error)
- func (s *SQLite) GetDatesWithTasks() ([]string, error)
- func (s *SQLite) GetFileMessages(limit int, offset int) ([]FileMessage, error)
- func (s *SQLite) GetFileMessagesByDate(date time.Time) ([]FileMessage, error)
- func (s *SQLite) GetFileMessagesWithTasks(limit int, offset int) ([]FileMessageWithTasks, error)
- func (s *SQLite) GetPhasesForWorkflow(filePath string) ([]PhaseDB, error)
- func (o *SQLite) GetSchemaVersion() int
- func (s *SQLite) GetTableStats() ([]TableStat, error)
- func (s *SQLite) GetTask(id string) TaskJob
- func (s *SQLite) GetTaskRecapByDate(date time.Time) (TaskStats, error)
- func (s *SQLite) GetTasksByDate(date time.Time, filter *TaskFilter) ([]TaskView, int, error)
- func (s *SQLite) GetWorkflowFiles() []string
- func (s *SQLite) IsDir() bool
- func (o *SQLite) Open(workflowPath string, fOpts *file.Options) error
- func (s *SQLite) RebuildDateIndex() error
- func (s *SQLite) Recap(day time.Time) TaskStats
- func (s *SQLite) Recycle(t time.Time) (string, error)
- func (s *SQLite) Refresh() (changedFiles []string, err error)
- func (s *SQLite) Search(taskType, job string) PhaseDB
- func (s *SQLite) SendFunc(p bus.Producer) func(string, *task.Task) error
- func (o *SQLite) Sync() error
- type Stats
- type SummaryLine
- type TableStat
- type TaskCounts
- type TaskFilter
- type TaskJob
- type TaskStats
- func (ts TaskStats) GetCountsWithHourly() (TaskCounts, [24]TaskCounts)
- func (ts TaskStats) GetCountsWithHourlyFiltered(filter *TaskFilter) (TaskCounts, [24]TaskCounts)
- func (ts TaskStats) JobsByType() map[string][]string
- func (ts TaskStats) TotalCounts() TaskCounts
- func (ts TaskStats) UniqueTypes() []string
- type TaskView
- type Workflow
Constants ¶
const (
// DefaultPageSize is the default number of items per page for paginated queries
DefaultPageSize = 100
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AlertRecord ¶
type AlertRecord struct {
ID int64 `json:"id"`
TaskID string `json:"task_id"`
TaskTime time.Time `json:"task_time"`
Type string `json:"type"`
Job string `json:"job"`
Msg string `json:"msg"`
CreatedAt time.Time `json:"created_at"`
}
AlertRecord represents an alert stored in the database
type DBSizeInfo ¶
type DBSizeInfo struct {
TotalSize string `json:"total_size"`
PageCount int64 `json:"page_count"`
PageSize int64 `json:"page_size"`
DBPath string `json:"db_path"`
}
DBSizeInfo contains database size information
type DurationStats ¶
type DurationStats struct {
Min time.Duration
Max time.Duration
// contains filtered or unexported fields
}
func (*DurationStats) Add ¶
func (s *DurationStats) Add(d time.Duration)
func (*DurationStats) Average ¶
func (s *DurationStats) Average() time.Duration
func (*DurationStats) String ¶
func (s *DurationStats) String() string
type FileMessage ¶
type FileMessage struct {
ID int `json:"id"`
Path string `json:"path"`
Size int64 `json:"size"`
LastModified time.Time `json:"last_modified"`
ReceivedAt time.Time `json:"received_at"`
TaskTime time.Time `json:"task_time"`
TaskIDs []string `json:"task_ids"`
TaskNames []string `json:"task_names"`
}
FileMessage represents a file message record
type FileMessageWithTasks ¶
type FileMessageWithTasks struct {
FileID int `json:"file_id"`
Path string `json:"path"`
TaskTime time.Time `json:"task_time"`
ReceivedAt time.Time `json:"received_at"`
TaskID string `json:"task_id"`
TaskType string `json:"task_type"`
TaskJob string `json:"task_job"`
TaskResult string `json:"task_result"`
TaskCreated time.Time `json:"task_created"`
TaskStarted time.Time `json:"task_started"`
TaskEnded time.Time `json:"task_ended"`
}
FileMessageWithTasks represents a file message with associated task details
type Phase ¶
type Phase struct {
Task string // Should use Topic() and Job() for access
Rule string
DependsOn string // Task that the previous workflow depends on
Retry int
Template string // template used to create the task
}
Phase represents a workflow phase (same as workflow.Phase)
func (Phase) ToWorkflowPhase
deprecated
type PhaseDB ¶
type SQLite ¶
type SQLite struct {
LocalPath string
BackupPath string
TaskTTL time.Duration `` /* 200-byte string literal not displayed */
Retention time.Duration // 90 days
// contains filtered or unexported fields
}
func (*SQLite) AddFileMessage ¶
AddFileMessage stores a file message in the database
func (*SQLite) CheckIncompleteTasks ¶
CheckIncompleteTasks checks for tasks that have not completed within the TTL period and adds them to the alerts table with deduplication. Returns count of incomplete tasks found. Uses a JOIN query to efficiently find incomplete tasks without existing alerts.
func (*SQLite) Children ¶
Children of the given task t, a child phase is one that dependsOn another task Empty slice will be returned if no children are found. A task without a type or metadata containing the workflow info will result in an error
func (*SQLite) DatesByType ¶
DatesByType returns a list of dates (YYYY-MM-DD format) that have data for the specified type dataType can be "tasks", "alerts", or "files" This uses the date_index table for instant lookups
func (*SQLite) Get ¶
Get the Phase associated with the task looks for matching phases within a workflow defined in meta that matches the task Type and job.
func (*SQLite) GetAlertsAfterTime ¶
func (s *SQLite) GetAlertsAfterTime(afterTime time.Time) ([]AlertRecord, error)
GetAlertsAfterTime retrieves all alerts created after a specific time
func (*SQLite) GetAlertsByDate ¶
func (s *SQLite) GetAlertsByDate(date time.Time) ([]AlertRecord, error)
GetAlertsByDate retrieves all alerts for a specific date
func (*SQLite) GetAllPhasesGrouped ¶
GetAllPhasesGrouped returns all phases grouped by workflow file
func (*SQLite) GetDBSize ¶
func (s *SQLite) GetDBSize() (*DBSizeInfo, error)
GetDBSize returns database size information
func (*SQLite) GetDatesWithAlerts ¶
GetDatesWithAlerts returns a list of dates (YYYY-MM-DD format) that have alert records
func (*SQLite) GetDatesWithData ¶
GetDatesWithData returns a list of dates (YYYY-MM-DD format) that have any data for tasks, alerts, or files within the retention period
func (*SQLite) GetDatesWithFiles ¶
GetDatesWithFiles returns a list of dates (YYYY-MM-DD format) that have file message records
func (*SQLite) GetDatesWithTasks ¶
GetDatesWithTasks returns a list of dates (YYYY-MM-DD format) that have task records
func (*SQLite) GetFileMessages ¶
func (s *SQLite) GetFileMessages(limit int, offset int) ([]FileMessage, error)
GetFileMessages retrieves file messages with optional filtering
func (*SQLite) GetFileMessagesByDate ¶
func (s *SQLite) GetFileMessagesByDate(date time.Time) ([]FileMessage, error)
GetFileMessagesByDate retrieves file messages for a specific date
func (*SQLite) GetFileMessagesWithTasks ¶
func (s *SQLite) GetFileMessagesWithTasks(limit int, offset int) ([]FileMessageWithTasks, error)
GetFileMessagesWithTasks retrieves file messages with their associated task details
func (*SQLite) GetPhasesForWorkflow ¶
GetPhasesForWorkflow returns all phases for a specific workflow file
func (*SQLite) GetSchemaVersion ¶
GetSchemaVersion returns the current schema version from the database. Returns 0 if the schema_version table doesn't exist or is empty (new database).
func (*SQLite) GetTableStats ¶
GetTableStats returns statistics for all tables in the database
func (*SQLite) GetTaskRecapByDate ¶
GetTaskRecapByDate creates a recap of tasks for a specific date
func (*SQLite) GetTasksByDate ¶
GetTasksByDate retrieves tasks for a specific date with optional filtering and pagination
func (*SQLite) GetWorkflowFiles ¶
GetWorkflowFiles returns a map of all workflow files in the database
func (*SQLite) IsDir ¶
IsDir returns true if the original workflow path is a folder rather than a file
func (*SQLite) Open ¶
Open the sqlite DB. If localPath doesn't exist then check if BackupPath exists and copy it to localPath Also initializes workflow path and determines if it's a directory
func (*SQLite) RebuildDateIndex ¶
RebuildDateIndex scans all tables and rebuilds the date_index table This should be called once during migration or can be exposed as an admin endpoint
func (*SQLite) Recycle ¶
Recycle cleans up any records older than day in the DB tables: files, alerts and tasks.
func (*SQLite) Refresh ¶
Refresh checks the cache and reloads any files if the checksum has changed.
type Stats ¶
type Stats struct {
CompletedCount int
CompletedTimes []time.Time
ErrorCount int
ErrorTimes []time.Time
AlertCount int
AlertTimes []time.Time
WarnCount int
WarnTimes []time.Time
RunningCount int
RunningTimes []time.Time
ExecTimes *DurationStats
}
func (*Stats) MarshalJSON ¶
type SummaryLine ¶
type SummaryLine struct {
Key string `json:"key"` // "task.type:job"
Count int `json:"count"` // number of alerts
TimeRange string `json:"time_range"` // formatted time range
}
SummaryLine represents a grouped alert summary for dashboard display
func BuildCompactSummary ¶
func BuildCompactSummary(alerts []AlertRecord) []SummaryLine
BuildCompactSummary processes alerts in memory to create compact summary Groups by TaskType:Job and collects task times for proper date formatting
type TableStat ¶
type TableStat struct {
Name string `json:"name"`
RowCount int64 `json:"row_count"`
TableBytes int64 `json:"table_bytes"`
TableHuman string `json:"table_human"`
IndexBytes int64 `json:"index_bytes"`
IndexHuman string `json:"index_human"`
TotalBytes int64 `json:"total_bytes"`
TotalHuman string `json:"total_human"`
Percentage float64 `json:"percentage"`
}
TableStat contains information about a database table
type TaskCounts ¶
TaskCounts represents aggregate counts of tasks by result status
type TaskFilter ¶
type TaskFilter struct {
ID string // Filter by task ID (resets other filters)
Type string // Filter by task type
Job string // Filter by job name
Result string // Filter by result status (complete, error, alert, warn, or "running" for empty)
Page int // Page number (1-based, default: 1)
Limit int // Number of results per page (default: 100)
}
TaskFilter contains options for filtering and paginating task queries. Empty string fields are ignored in the query.
type TaskJob ¶
type TaskJob struct {
LastUpdate time.Time // time since the last event with id
Completed bool
Events []task.Task
// contains filtered or unexported fields
}
TaskJob describes info about completed tasks that are within the cache
type TaskStats ¶
TaskStats is a map of task keys (type:job) to their statistics
func (TaskStats) GetCountsWithHourly ¶
func (ts TaskStats) GetCountsWithHourly() (TaskCounts, [24]TaskCounts)
GetCountsWithHourly returns both total and hourly task counts in a single iteration The hourly array contains 24 TaskCounts where index represents the hour (0-23)
func (TaskStats) GetCountsWithHourlyFiltered ¶
func (ts TaskStats) GetCountsWithHourlyFiltered(filter *TaskFilter) (TaskCounts, [24]TaskCounts)
GetCountsWithHourlyFiltered returns total and hourly counts with optional filtering by type, job, and result The hourly array contains 24 TaskCounts where index represents the hour (0-23)
func (TaskStats) JobsByType ¶
JobsByType returns jobs organized by type
func (TaskStats) TotalCounts ¶
func (ts TaskStats) TotalCounts() TaskCounts
TotalCounts returns aggregate result counts across all tasks
func (TaskStats) UniqueTypes ¶
UniqueTypes returns a sorted list of unique task types
type TaskView ¶
type TaskView struct {
ID string `json:"id"`
Type string `json:"type"`
Job string `json:"job"`
Info string `json:"info"`
Result string `json:"result"`
Meta string `json:"meta"`
Msg string `json:"msg"`
TaskSeconds int `json:"task_seconds"`
TaskTime string `json:"task_time"`
QueueSeconds int `json:"queue_seconds"`
QueueTime string `json:"queue_time"`
Created string `json:"created"`
Started string `json:"started"`
Ended string `json:"ended"`
}
TaskView represents a task with calculated times from the tasks view