observability

package
v0.0.0-...-9dee9fb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EventReport

type EventReport struct {
	ExecutionID string         `json:"execution_id"`
	EventType   string         `json:"event_type"`
	EventData   map[string]any `json:"event_data"`
	CreatedAt   string         `json:"created_at"`
}

EventReport represents an event to report to the admin.

type ExecutionReport

type ExecutionReport struct {
	ID           string    `json:"id"`
	WorkflowID   string    `json:"workflow_id"`
	TriggerType  string    `json:"trigger_type"`
	Status       string    `json:"status"`
	TriggeredBy  string    `json:"triggered_by,omitempty"`
	ErrorMessage string    `json:"error_message,omitempty"`
	StartedAt    time.Time `json:"started_at"`
	CompletedAt  time.Time `json:"completed_at,omitempty"`
	DurationMs   int64     `json:"duration_ms"`
}

ExecutionReport represents an execution record to report to the admin.

type ExecutionTracker

type ExecutionTracker struct {
	// contains filtered or unexported fields
}

ExecutionTracker wraps execution lifecycle to record executions, steps, and logs.

func NewExecutionTracker

func NewExecutionTracker(executions store.ExecutionStore, logs store.LogStore) *ExecutionTracker

NewExecutionTracker creates a new ExecutionTracker.

func (*ExecutionTracker) CancelExecution

func (t *ExecutionTracker) CancelExecution(ctx context.Context, executionID uuid.UUID) error

CancelExecution marks an execution as cancelled.

func (*ExecutionTracker) CompleteExecution

func (t *ExecutionTracker) CompleteExecution(ctx context.Context, executionID uuid.UUID, output json.RawMessage) error

CompleteExecution marks an execution as successfully completed.

func (*ExecutionTracker) FailExecution

func (t *ExecutionTracker) FailExecution(ctx context.Context, executionID uuid.UUID, execErr error) error

FailExecution marks an execution as failed.

func (*ExecutionTracker) LogWriter

func (t *ExecutionTracker) LogWriter(workflowID uuid.UUID, executionID uuid.UUID, level store.LogLevel) io.Writer

LogWriter returns an io.Writer that appends logs to the store at the given level.

func (*ExecutionTracker) RecordStep

func (t *ExecutionTracker) RecordStep(ctx context.Context, executionID uuid.UUID, step *store.ExecutionStep) error

RecordStep records a step within an execution.

func (*ExecutionTracker) StartExecution

func (t *ExecutionTracker) StartExecution(ctx context.Context, workflowID uuid.UUID, triggerType string, data json.RawMessage) (uuid.UUID, error)

StartExecution begins tracking a new workflow execution.

type IngestHandler

type IngestHandler struct {
	// contains filtered or unexported fields
}

IngestHandler handles incoming observability data from worker instances. It is registered on the admin server to receive batches from reporters.

func NewIngestHandler

func NewIngestHandler(store IngestStore, logger *slog.Logger) *IngestHandler

NewIngestHandler creates a new handler for admin ingest endpoints.

func (*IngestHandler) RegisterRoutes

func (h *IngestHandler) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers the ingest API routes on the given mux.

type IngestStore

type IngestStore interface {
	IngestExecutions(ctx context.Context, instance string, items []ExecutionReport) error
	IngestLogs(ctx context.Context, instance string, items []LogReport) error
	IngestEvents(ctx context.Context, instance string, items []EventReport) error
	RegisterInstance(ctx context.Context, name string, registeredAt time.Time) error
	Heartbeat(ctx context.Context, name string, timestamp time.Time) error
}

IngestStore defines the storage interface for ingested observability data.

type LogReport

type LogReport struct {
	WorkflowID  string `json:"workflow_id"`
	ExecutionID string `json:"execution_id,omitempty"`
	Level       string `json:"level"`
	Message     string `json:"message"`
	ModuleName  string `json:"module_name,omitempty"`
	Fields      string `json:"fields,omitempty"`
	CreatedAt   string `json:"created_at"`
}

LogReport represents a log entry to report to the admin.

type Reporter

type Reporter struct {
	// contains filtered or unexported fields
}

Reporter buffers observability data and periodically flushes it to the admin server.

func NewReporter

func NewReporter(config ReporterConfig, logger *slog.Logger) *Reporter

NewReporter creates a new observability reporter.

func ReporterFromEnv

func ReporterFromEnv(logger *slog.Logger) *Reporter

ReporterFromEnv creates a Reporter from environment variables. Returns nil if WORKFLOW_ADMIN_URL is not set.

func (*Reporter) ReportEvent

func (r *Reporter) ReportEvent(event EventReport)

ReportEvent buffers an event for the next flush.

func (*Reporter) ReportExecution

func (r *Reporter) ReportExecution(exec ExecutionReport)

ReportExecution buffers an execution record for the next flush.

func (*Reporter) ReportLog

func (r *Reporter) ReportLog(log LogReport)

ReportLog buffers a log entry for the next flush.

func (*Reporter) Start

func (r *Reporter) Start(ctx context.Context)

Start begins the background flush and heartbeat loops.

func (*Reporter) Stop

func (r *Reporter) Stop()

Stop shuts down the reporter, performing a final flush.

type ReporterConfig

type ReporterConfig struct {
	// AdminURL is the base URL of the admin server (e.g., "http://admin-server:8081").
	AdminURL string `yaml:"admin_url" json:"admin_url"`
	// FlushInterval is how often buffered data is sent to the admin server.
	FlushInterval time.Duration `yaml:"flush_interval" json:"flush_interval"`
	// BatchSize is the maximum number of items per flush batch.
	BatchSize int `yaml:"batch_size" json:"batch_size"`
	// InstanceName identifies this worker instance.
	InstanceName string `yaml:"instance_name" json:"instance_name"`
	// HeartbeatInterval is how often to send heartbeats to the admin server.
	HeartbeatInterval time.Duration `yaml:"heartbeat_interval" json:"heartbeat_interval"`
}

ReporterConfig configures the built-in observability reporter.

func DefaultReporterConfig

func DefaultReporterConfig() ReporterConfig

DefaultReporterConfig returns a config with sensible defaults.

type V1IngestStore

type V1IngestStore struct {
	// contains filtered or unexported fields
}

V1IngestStore implements IngestStore by writing to the V1Store's SQLite database. It inserts ingested data directly into the execution_logs, workflow_executions, and execution_events tables.

func NewV1IngestStore

func NewV1IngestStore(db *sql.DB) *V1IngestStore

NewV1IngestStore creates a new ingest store backed by the given database.

func (*V1IngestStore) Heartbeat

func (s *V1IngestStore) Heartbeat(_ context.Context, name string, timestamp time.Time) error

Heartbeat updates the last_seen timestamp for a worker instance.

func (*V1IngestStore) IngestEvents

func (s *V1IngestStore) IngestEvents(_ context.Context, _ string, items []EventReport) error

IngestEvents writes events from a remote worker to the execution_events table.

func (*V1IngestStore) IngestExecutions

func (s *V1IngestStore) IngestExecutions(_ context.Context, instance string, items []ExecutionReport) error

IngestExecutions writes execution records from a remote worker.

func (*V1IngestStore) IngestLogs

func (s *V1IngestStore) IngestLogs(_ context.Context, _ string, items []LogReport) error

IngestLogs writes log entries from a remote worker.

func (*V1IngestStore) ListInstances

func (s *V1IngestStore) ListInstances() ([]WorkerInstance, error)

ListInstances returns all registered worker instances.

func (*V1IngestStore) RegisterInstance

func (s *V1IngestStore) RegisterInstance(_ context.Context, name string, registeredAt time.Time) error

RegisterInstance records a new worker instance.

type WorkerInstance

type WorkerInstance struct {
	Name         string `json:"name"`
	Status       string `json:"status"`
	LastSeen     string `json:"last_seen"`
	RegisteredAt string `json:"registered_at"`
}

WorkerInstance represents a registered remote worker.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL