Documentation
¶
Overview ¶
Package state persists per-run state.json + events.jsonl under `<BaseDir>/workflows/<id>/runs/<run-id>/`. Atomic writes via the shared internal/agents/storage helpers. In-memory variant available for tests.
Index ¶
- type FileStore
- func (s *FileStore) AppendEvent(id, runID string, ev workflow.RunEvent) error
- func (s *FileStore) IndexAppend(id string, entry IndexEntry) error
- func (s *FileStore) IndexList(id string, page, pageSize int) ([]IndexEntry, bool, error)
- func (s *FileStore) ListEvents(id, runID string) ([]workflow.RunEvent, error)
- func (s *FileStore) ListRuns(id string) ([]string, error)
- func (s *FileStore) Load(id, runID string) (workflow.RunState, error)
- func (s *FileStore) Save(id, runID string, st workflow.RunState) error
- type IndexEntry
- type LokiPusher
- type Store
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FileStore ¶
FileStore writes state.json + events.jsonl per run.
func (*FileStore) AppendEvent ¶
AppendEvent appends one line to events.jsonl atomically.
func (*FileStore) IndexAppend ¶
func (s *FileStore) IndexAppend(id string, entry IndexEntry) error
IndexAppend persists one summary row to the id's sharded index. Constant-time regardless of total run history (touches only the current shard).
func (*FileStore) IndexList ¶
IndexList returns one page of summaries, newest first. pageSize defaults to shardedlog.DefaultShardMax. hasMore=true when older pages exist.
func (*FileStore) ListEvents ¶
ListEvents reads + decodes the full events.jsonl. Empty file → nil.
type IndexEntry ¶
type IndexEntry struct {
ID string `json:"id"`
Status string `json:"status,omitempty"`
StartedAt time.Time `json:"at"`
EndedAt *time.Time `json:"end,omitempty"`
DurationMs int64 `json:"ms,omitempty"`
}
IndexEntry is the row shape stored in the per-id index. Kept lean so a 100-row shard stays well under 10KB.
type LokiPusher ¶
type LokiPusher struct {
// contains filtered or unexported fields
}
LokiPusher batches workflow RunEvents and pushes them to a Loki HTTP endpoint asynchronously. Events are queued into a channel; a background goroutine flushes every 5 seconds or whenever 100 entries accumulate.
Payload is Loki-compatible push body per https://grafana.com/docs/loki/latest/reference/loki-http-api/#push-log-entries-to-loki
Label scheme:
- wick_workflow = workflow id
- wick_run = run ID
- wick_event = event type (node_started, node_completed, …)
- any extra labels from ExtraLabels
Input/output JSON body is intentionally excluded from the pushed payload to avoid Loki size limits — only event metadata is sent. The full payload remains on disk in events.jsonl.
func NewLokiPusher ¶
func NewLokiPusher(lokiURL, extraLabelStr string) *LokiPusher
NewLokiPusher creates a pusher targeting lokiURL. extraLabelStr is an optional "key=value,key2=value2" string. Returns nil if lokiURL is empty.
func (*LokiPusher) Push ¶
func (p *LokiPusher) Push(id, runID string, ev workflow.RunEvent)
Push enqueues one event. Non-blocking — drops silently if queue full (Loki is a best-effort sink; disk events.jsonl is the source of truth).
func (*LokiPusher) Stop ¶
func (p *LokiPusher) Stop()
Stop drains pending entries and shuts down the background goroutine.
type Store ¶
type Store interface {
Save(id, runID string, st workflow.RunState) error
Load(id, runID string) (workflow.RunState, error)
AppendEvent(id, runID string, ev workflow.RunEvent) error
ListEvents(id, runID string) ([]workflow.RunEvent, error)
ListRuns(id string) ([]string, error)
IndexAppend(id string, entry IndexEntry) error
IndexList(id string, page, pageSize int) ([]IndexEntry, bool, error)
}
Store persists RunState + appends RunEvent for one workflow's runs/ folder.