Documentation
¶
Index ¶
- Variables
- type BatchLog
- type DB
- func (l *DB) Close() error
- func (l *DB) Flush() error
- func (l *DB) GetBatchLogStats(since time.Time) (map[string]interface{}, error)
- func (l *DB) ListBatchLogs(limit int, since time.Time) ([]*BatchLog, error)
- func (l *DB) ListSinkLogs(sinkName string, database string, limit int) ([]*SinkLog, error)
- func (l *DB) ListSkillLogs(skillID string, pullID string, limit int) ([]*SkillLog, error)
- func (l *DB) WriteBatchLog(log *BatchLog) error
- func (l *DB) WriteSinkLog(log *SinkLog) error
- func (l *DB) WriteSkillLog(log *SkillLog) error
- type PullStatus
- type SinkLog
- type SinkStatus
- type SkillLog
- type SkillStatus
Constants ¶
This section is empty.
Variables ¶
var ( // ErrLogsClosed indicates the logs database is closed ErrLogsClosed = errors.New("logs database is closed") // ErrBatchIDRequired indicates batch_id is required for skill/sink logs ErrBatchIDRequired = errors.New("batch_id is required") // ErrInvalidPullStatus, ErrInvalidSkillStatus, and ErrInvalidSinkStatus indicate an invalid status value ErrInvalidPullStatus = errors.New("invalid pull status: must be SUCCESS, PARTIAL, or FAILED") ErrInvalidSkillStatus = errors.New("invalid skill status: must be SKIP, EXECUTED, or ERROR") ErrInvalidSinkStatus = errors.New("invalid sink status: must be SUCCESS or ERROR") )
Functions ¶
This section is empty.
Types ¶
type BatchLog ¶
type BatchLog struct {
BatchID string `json:"batch_id"` // UUID with short timestamp (primary key)
FetchedRows int `json:"fetched_rows"` // Total CDC rows fetched
TxCount int `json:"tx_count"` // Number of transactions
DLQCount int `json:"dlq_count"` // Number of DLQ entries
DurationMs int64 `json:"duration_ms"` // Total batch duration
Status PullStatus `json:"status"` // SUCCESS/PARTIAL/FAILED
CreatedAt time.Time `json:"created_at"`
}
BatchLog represents a batch log entry
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB manages the observability logs database
func (*DB) GetBatchLogStats ¶
GetBatchLogStats returns statistics for recent batch logs
func (*DB) ListBatchLogs ¶
ListBatchLogs retrieves batch logs with an optional limit and an optional time filter
func (*DB) ListSinkLogs ¶
ListSinkLogs retrieves sink logs with optional filters: sinkName filters by specific sink, database filters by output table
func (*DB) ListSkillLogs ¶
ListSkillLogs retrieves skill logs with optional filters: skillID filters by specific skill, pullID filters by specific batch
func (*DB) WriteBatchLog ¶
WriteBatchLog writes a batch (pull cycle) log entry
func (*DB) WriteSinkLog ¶
WriteSinkLog writes a sink write log entry
func (*DB) WriteSkillLog ¶
WriteSkillLog writes a skill execution log entry
type PullStatus ¶
type PullStatus string
PullStatus represents the status of a pull cycle
const ( PullStatusSuccess PullStatus = "SUCCESS" PullStatusPartial PullStatus = "PARTIAL" PullStatusFailed PullStatus = "FAILED" )
type SinkLog ¶
type SinkLog struct {
ID int64 `json:"id"`
BatchID string `json:"batch_id"` // Links to batch_logs
SkillName string `json:"skill_name"` // Skill that produced this sink
SinkName string `json:"sink_name"` // Sink config name
Database string `json:"database"` // Target database name
OutputTable string `json:"output_table"` // Target table name
Operation string `json:"operation"` // INSERT/UPDATE/DELETE
RowsWritten int `json:"rows_written"` // Rows written to sink
Status SinkStatus `json:"status"` // SUCCESS/ERROR
ErrorMessage string `json:"error_message,omitempty"`
DurationMs int64 `json:"duration_ms"`
CreatedAt time.Time `json:"created_at"`
}
SinkLog represents a sink write log entry
type SinkStatus ¶
type SinkStatus string
SinkStatus represents the status of a sink write
const ( SinkStatusSuccess SinkStatus = "SUCCESS" SinkStatusError SinkStatus = "ERROR" )
type SkillLog ¶
type SkillLog struct {
ID int64 `json:"id"`
BatchID string `json:"batch_id"` // Links to batch_logs
SkillID string `json:"skill_id"` // Skill hash ID
SkillName string `json:"skill_name"` // Skill name
Operation string `json:"operation"` // INSERT/UPDATE/DELETE
RowsProcessed int `json:"rows_processed"` // Rows processed by this skill
Status SkillStatus `json:"status"` // SKIP/EXECUTED/ERROR
ErrorMessage string `json:"error_message,omitempty"`
DurationMs int64 `json:"duration_ms"`
CreatedAt time.Time `json:"created_at"`
}
SkillLog represents a skill execution log entry
type SkillStatus ¶
type SkillStatus string
SkillStatus represents the status of a skill execution
const ( SkillStatusSkip SkillStatus = "SKIP" SkillStatusExecuted SkillStatus = "EXECUTED" SkillStatusError SkillStatus = "ERROR" )