monitor

package
v0.0.0-...-77b25ca Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrLogsClosed indicates the logs database is closed
	ErrLogsClosed = errors.New("logs database is closed")
	// ErrBatchIDRequired indicates batch_id is required for skill/sink logs
	ErrBatchIDRequired = errors.New("batch_id is required")
	// ErrInvalidPullStatus, ErrInvalidSkillStatus, and ErrInvalidSinkStatus
	// each indicate an invalid status value of the corresponding kind
	// (pull, skill, sink); each message lists the accepted values.
	ErrInvalidPullStatus  = errors.New("invalid pull status: must be SUCCESS, PARTIAL, or FAILED")
	ErrInvalidSkillStatus = errors.New("invalid skill status: must be SKIP, EXECUTED, or ERROR")
	ErrInvalidSinkStatus  = errors.New("invalid sink status: must be SUCCESS or ERROR")
)

Functions

This section is empty.

Types

type BatchLog

// BatchLog represents a batch log entry.
//
// Status holds a PullStatus (SUCCESS/PARTIAL/FAILED).
// NOTE(review): DurationMs is presumably in milliseconds, going by the field
// name — confirm. How CreatedAt is populated is not visible here.
type BatchLog struct {
	BatchID     string     `json:"batch_id"`     // UUID with short timestamp (primary key)
	FetchedRows int        `json:"fetched_rows"` // Total CDC rows fetched
	TxCount     int        `json:"tx_count"`     // Number of transactions
	DLQCount    int        `json:"dlq_count"`    // Number of DLQ entries
	DurationMs  int64      `json:"duration_ms"`  // Total batch duration
	Status      PullStatus `json:"status"`       // SUCCESS/PARTIAL/FAILED
	CreatedAt   time.Time  `json:"created_at"`
}

BatchLog represents a batch log entry

type DB

// DB manages the observability logs database.
// Its fields are unexported; values are obtained through New.
type DB struct {
	// contains filtered or unexported fields
}

DB manages the observability logs database

func New

func New(ctx context.Context, dbPath string) (*DB, error)

New creates a new DB and runs migrations

func (*DB) Close

func (l *DB) Close() error

Close closes the database connection

func (*DB) Flush

func (l *DB) Flush() error

Flush ensures all buffered writes are committed

func (*DB) GetBatchLogStats

func (l *DB) GetBatchLogStats(since time.Time) (map[string]interface{}, error)

GetBatchLogStats returns statistics for recent batch logs (pull cycle logs)

func (*DB) ListBatchLogs

func (l *DB) ListBatchLogs(limit int, since time.Time) ([]*BatchLog, error)

ListBatchLogs retrieves batch logs (pull cycle logs) with an optional limit and an optional time filter

func (*DB) ListSinkLogs

func (l *DB) ListSinkLogs(sinkName string, database string, limit int) ([]*SinkLog, error)

ListSinkLogs retrieves sink logs with optional filters: sinkName filters by a specific sink, and database filters by output table.

func (*DB) ListSkillLogs

func (l *DB) ListSkillLogs(skillID string, pullID string, limit int) ([]*SkillLog, error)

ListSkillLogs retrieves skill logs with optional filters: skillID filters by a specific skill, and pullID filters by a specific batch.

func (*DB) WriteBatchLog

func (l *DB) WriteBatchLog(log *BatchLog) error

WriteBatchLog writes a pull cycle log entry

func (*DB) WriteSinkLog

func (l *DB) WriteSinkLog(log *SinkLog) error

WriteSinkLog writes a sink write log entry

func (*DB) WriteSkillLog

func (l *DB) WriteSkillLog(log *SkillLog) error

WriteSkillLog writes a skill execution log entry

type PullStatus

// PullStatus represents the status of a pull cycle.
// It is a string type; the exported PullStatus* constants carry the values
// "SUCCESS", "PARTIAL", and "FAILED".
type PullStatus string

PullStatus represents the status of a pull cycle

// PullStatus values. These are the same three values that the
// ErrInvalidPullStatus message lists as accepted.
const (
	PullStatusSuccess PullStatus = "SUCCESS"
	PullStatusPartial PullStatus = "PARTIAL"
	PullStatusFailed  PullStatus = "FAILED"
)

type SinkLog

// SinkLog represents a sink write log entry.
//
// ErrorMessage is dropped from the JSON encoding when empty (omitempty tag).
// NOTE(review): DurationMs is presumably in milliseconds, going by the field
// name — confirm. How ID and CreatedAt are populated is not visible here.
type SinkLog struct {
	ID           int64      `json:"id"`
	BatchID      string     `json:"batch_id"`     // Links to batch_logs
	SkillName    string     `json:"skill_name"`   // Skill that produced this sink
	SinkName     string     `json:"sink_name"`    // Sink config name
	Database     string     `json:"database"`     // Target database name
	OutputTable  string     `json:"output_table"` // Target table name
	Operation    string     `json:"operation"`    // INSERT/UPDATE/DELETE
	RowsWritten  int        `json:"rows_written"` // Rows written to sink
	Status       SinkStatus `json:"status"`       // SUCCESS/ERROR
	ErrorMessage string     `json:"error_message,omitempty"`
	DurationMs   int64      `json:"duration_ms"`
	CreatedAt    time.Time  `json:"created_at"`
}

SinkLog represents a sink write log entry

type SinkStatus

// SinkStatus represents the status of a sink write.
// It is a string type; the exported SinkStatus* constants carry the values
// "SUCCESS" and "ERROR".
type SinkStatus string

SinkStatus represents the status of a sink write

// SinkStatus values. These are the same two values that the
// ErrInvalidSinkStatus message lists as accepted.
const (
	SinkStatusSuccess SinkStatus = "SUCCESS"
	SinkStatusError   SinkStatus = "ERROR"
)

type SkillLog

// SkillLog represents a skill execution log entry.
//
// ErrorMessage is dropped from the JSON encoding when empty (omitempty tag).
// NOTE(review): DurationMs is presumably in milliseconds, going by the field
// name — confirm. How ID and CreatedAt are populated is not visible here.
type SkillLog struct {
	ID            int64       `json:"id"`
	BatchID       string      `json:"batch_id"`       // Links to batch_logs
	SkillID       string      `json:"skill_id"`       // Skill hash ID
	SkillName     string      `json:"skill_name"`     // Skill name
	Operation     string      `json:"operation"`      // INSERT/UPDATE/DELETE
	RowsProcessed int         `json:"rows_processed"` // Rows processed by this skill
	Status        SkillStatus `json:"status"`         // SKIP/EXECUTED/ERROR
	ErrorMessage  string      `json:"error_message,omitempty"`
	DurationMs    int64       `json:"duration_ms"`
	CreatedAt     time.Time   `json:"created_at"`
}

SkillLog represents a skill execution log entry

type SkillStatus

// SkillStatus represents the status of a skill execution.
// It is a string type; the exported SkillStatus* constants carry the values
// "SKIP", "EXECUTED", and "ERROR".
type SkillStatus string

SkillStatus represents the status of a skill execution

// SkillStatus values. These are the same three values that the
// ErrInvalidSkillStatus message lists as accepted.
const (
	SkillStatusSkip     SkillStatus = "SKIP"
	SkillStatusExecuted SkillStatus = "EXECUTED"
	SkillStatusError    SkillStatus = "ERROR"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL