Documentation
¶
Index ¶
- type EventReport
- type ExecutionReport
- type ExecutionTracker
- func (t *ExecutionTracker) CancelExecution(ctx context.Context, executionID uuid.UUID) error
- func (t *ExecutionTracker) CompleteExecution(ctx context.Context, executionID uuid.UUID, output json.RawMessage) error
- func (t *ExecutionTracker) FailExecution(ctx context.Context, executionID uuid.UUID, execErr error) error
- func (t *ExecutionTracker) LogWriter(workflowID uuid.UUID, executionID uuid.UUID, level store.LogLevel) io.Writer
- func (t *ExecutionTracker) RecordStep(ctx context.Context, executionID uuid.UUID, step *store.ExecutionStep) error
- func (t *ExecutionTracker) StartExecution(ctx context.Context, workflowID uuid.UUID, triggerType string, ...) (uuid.UUID, error)
- type IngestHandler
- type IngestStore
- type LogReport
- type Reporter
- type ReporterConfig
- type V1IngestStore
- func (s *V1IngestStore) Heartbeat(_ context.Context, name string, timestamp time.Time) error
- func (s *V1IngestStore) IngestEvents(_ context.Context, _ string, items []EventReport) error
- func (s *V1IngestStore) IngestExecutions(_ context.Context, instance string, items []ExecutionReport) error
- func (s *V1IngestStore) IngestLogs(_ context.Context, _ string, items []LogReport) error
- func (s *V1IngestStore) ListInstances() ([]WorkerInstance, error)
- func (s *V1IngestStore) RegisterInstance(_ context.Context, name string, registeredAt time.Time) error
- type WorkerInstance
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EventReport ¶
type EventReport struct {
ExecutionID string `json:"execution_id"`
EventType string `json:"event_type"`
EventData map[string]any `json:"event_data"`
CreatedAt string `json:"created_at"`
}
EventReport represents an event to report to the admin.
type ExecutionReport ¶
type ExecutionReport struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
TriggerType string `json:"trigger_type"`
Status string `json:"status"`
TriggeredBy string `json:"triggered_by,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at,omitempty"`
DurationMs int64 `json:"duration_ms"`
}
ExecutionReport represents an execution record to report to the admin.
type ExecutionTracker ¶
type ExecutionTracker struct {
// contains filtered or unexported fields
}
ExecutionTracker wraps execution lifecycle to record executions, steps, and logs.
func NewExecutionTracker ¶
func NewExecutionTracker(executions store.ExecutionStore, logs store.LogStore) *ExecutionTracker
NewExecutionTracker creates a new ExecutionTracker.
func (*ExecutionTracker) CancelExecution ¶
CancelExecution marks an execution as cancelled.
func (*ExecutionTracker) CompleteExecution ¶
func (t *ExecutionTracker) CompleteExecution(ctx context.Context, executionID uuid.UUID, output json.RawMessage) error
CompleteExecution marks an execution as successfully completed.
func (*ExecutionTracker) FailExecution ¶
func (t *ExecutionTracker) FailExecution(ctx context.Context, executionID uuid.UUID, execErr error) error
FailExecution marks an execution as failed.
func (*ExecutionTracker) LogWriter ¶
func (t *ExecutionTracker) LogWriter(workflowID uuid.UUID, executionID uuid.UUID, level store.LogLevel) io.Writer
LogWriter returns an io.Writer that appends logs to the store at the given level.
func (*ExecutionTracker) RecordStep ¶
func (t *ExecutionTracker) RecordStep(ctx context.Context, executionID uuid.UUID, step *store.ExecutionStep) error
RecordStep records a step within an execution.
func (*ExecutionTracker) StartExecution ¶
func (t *ExecutionTracker) StartExecution(ctx context.Context, workflowID uuid.UUID, triggerType string, data json.RawMessage) (uuid.UUID, error)
StartExecution begins tracking a new workflow execution.
type IngestHandler ¶
type IngestHandler struct {
// contains filtered or unexported fields
}
IngestHandler handles incoming observability data from worker instances. It is registered on the admin server to receive batches from reporters.
func NewIngestHandler ¶
func NewIngestHandler(store IngestStore, logger *slog.Logger) *IngestHandler
NewIngestHandler creates a new handler for admin ingest endpoints.
func (*IngestHandler) RegisterRoutes ¶
func (h *IngestHandler) RegisterRoutes(mux *http.ServeMux)
RegisterRoutes registers the ingest API routes on the given mux.
type IngestStore ¶
type IngestStore interface {
IngestExecutions(ctx context.Context, instance string, items []ExecutionReport) error
IngestLogs(ctx context.Context, instance string, items []LogReport) error
IngestEvents(ctx context.Context, instance string, items []EventReport) error
RegisterInstance(ctx context.Context, name string, registeredAt time.Time) error
Heartbeat(ctx context.Context, name string, timestamp time.Time) error
}
IngestStore defines the storage interface for ingested observability data.
type LogReport ¶
type LogReport struct {
WorkflowID string `json:"workflow_id"`
ExecutionID string `json:"execution_id,omitempty"`
Level string `json:"level"`
Message string `json:"message"`
ModuleName string `json:"module_name,omitempty"`
Fields string `json:"fields,omitempty"`
CreatedAt string `json:"created_at"`
}
LogReport represents a log entry to report to the admin.
type Reporter ¶
type Reporter struct {
// contains filtered or unexported fields
}
Reporter buffers observability data and periodically flushes it to the admin server.
func NewReporter ¶
func NewReporter(config ReporterConfig, logger *slog.Logger) *Reporter
NewReporter creates a new observability reporter.
func ReporterFromEnv ¶
ReporterFromEnv creates a Reporter from environment variables. Returns nil if WORKFLOW_ADMIN_URL is not set.
func (*Reporter) ReportEvent ¶
func (r *Reporter) ReportEvent(event EventReport)
ReportEvent buffers an event for the next flush.
func (*Reporter) ReportExecution ¶
func (r *Reporter) ReportExecution(exec ExecutionReport)
ReportExecution buffers an execution record for the next flush.
type ReporterConfig ¶
type ReporterConfig struct {
// AdminURL is the base URL of the admin server (e.g., "http://admin-server:8081").
AdminURL string `yaml:"admin_url" json:"admin_url"`
// FlushInterval is how often buffered data is sent to the admin server.
FlushInterval time.Duration `yaml:"flush_interval" json:"flush_interval"`
// BatchSize is the maximum number of items per flush batch.
BatchSize int `yaml:"batch_size" json:"batch_size"`
// InstanceName identifies this worker instance.
InstanceName string `yaml:"instance_name" json:"instance_name"`
// HeartbeatInterval is how often to send heartbeats to the admin server.
HeartbeatInterval time.Duration `yaml:"heartbeat_interval" json:"heartbeat_interval"`
}
ReporterConfig configures the built-in observability reporter.
func DefaultReporterConfig ¶
func DefaultReporterConfig() ReporterConfig
DefaultReporterConfig returns a config with sensible defaults.
type V1IngestStore ¶
type V1IngestStore struct {
// contains filtered or unexported fields
}
V1IngestStore implements IngestStore by writing to the V1Store's SQLite database. It inserts ingested data directly into the execution_logs, workflow_executions, and execution_events tables.
func NewV1IngestStore ¶
func NewV1IngestStore(db *sql.DB) *V1IngestStore
NewV1IngestStore creates a new ingest store backed by the given database.
func (*V1IngestStore) IngestEvents ¶
func (s *V1IngestStore) IngestEvents(_ context.Context, _ string, items []EventReport) error
IngestEvents writes events from a remote worker to the execution_events table.
func (*V1IngestStore) IngestExecutions ¶
func (s *V1IngestStore) IngestExecutions(_ context.Context, instance string, items []ExecutionReport) error
IngestExecutions writes execution records from a remote worker.
func (*V1IngestStore) IngestLogs ¶
IngestLogs writes log entries from a remote worker.
func (*V1IngestStore) ListInstances ¶
func (s *V1IngestStore) ListInstances() ([]WorkerInstance, error)
ListInstances returns all registered worker instances.
func (*V1IngestStore) RegisterInstance ¶
func (s *V1IngestStore) RegisterInstance(_ context.Context, name string, registeredAt time.Time) error
RegisterInstance records a new worker instance.