state

package
v0.13.2 Latest
Warning

This package is not in the latest version of its module.

Published: May 20, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package state persists per-run state (state.json) and events (events.jsonl) under `<BaseDir>/workflows/<id>/runs/<run-id>/`. Writes are atomic and go through the shared internal/agents/storage helpers. An in-memory variant is available for tests.


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FileStore

type FileStore struct {
	Layout config.Layout
}

FileStore writes state.json + events.jsonl per run.

func New

func New(layout config.Layout) *FileStore

New returns the on-disk implementation.

func (*FileStore) AppendEvent

func (s *FileStore) AppendEvent(id, runID string, ev workflow.RunEvent) error

AppendEvent appends one line to events.jsonl atomically.

func (*FileStore) IndexAppend

func (s *FileStore) IndexAppend(id string, entry IndexEntry) error

IndexAppend persists one summary row to the id's sharded index. Constant-time regardless of total run history (touches only the current shard).

func (*FileStore) IndexList

func (s *FileStore) IndexList(id string, page, pageSize int) ([]IndexEntry, bool, error)

IndexList returns one page of summaries, newest first. pageSize defaults to shardedlog.DefaultShardMax. The boolean result (hasMore) is true when older pages exist.
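The append and paging behavior described for IndexAppend and IndexList can be illustrated with a small in-memory model. Everything here is an assumption made for the sketch: the `shardedLog` type, the tiny `shardMax`, zero-based page numbers, and the rule that a non-positive page size falls back to the shard size. The real index is on disk and its internals are not shown on this page.

```go
package main

import "fmt"

// shardMax is deliberately tiny for the demo; it is a hypothetical
// stand-in for shardedlog.DefaultShardMax.
const shardMax = 3

// shardedLog keeps rows in fixed-size shards, oldest shard first.
type shardedLog struct {
	shards [][]string
}

// append touches only the last shard, so its cost does not grow with
// the total number of rows.
func (l *shardedLog) append(row string) {
	n := len(l.shards)
	if n == 0 || len(l.shards[n-1]) >= shardMax {
		l.shards = append(l.shards, make([]string, 0, shardMax))
		n++
	}
	l.shards[n-1] = append(l.shards[n-1], row)
}

// list returns the zero-based page of rows, newest first, and reports
// whether older rows remain.
func (l *shardedLog) list(page, pageSize int) ([]string, bool) {
	if pageSize <= 0 {
		pageSize = shardMax
	}
	n := len(l.shards)
	total := 0
	if n > 0 {
		// All shards except the last are full.
		total = (n-1)*shardMax + len(l.shards[n-1])
	}
	start := page * pageSize
	if page < 0 || start >= total {
		return nil, false
	}
	end := start + pageSize
	if end > total {
		end = total
	}
	out := make([]string, 0, end-start)
	for p := start; p < end; p++ {
		i := total - 1 - p // position p counted from the newest row
		out = append(out, l.shards[i/shardMax][i%shardMax])
	}
	return out, end < total
}

func main() {
	l := &shardedLog{}
	for i := 1; i <= 7; i++ {
		l.append(fmt.Sprintf("run-%d", i))
	}
	for page := 0; ; page++ {
		rows, hasMore := l.list(page, 3)
		fmt.Println(page, rows, hasMore)
		if !hasMore {
			break
		}
	}
}
```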

func (*FileStore) ListEvents

func (s *FileStore) ListEvents(id, runID string) ([]workflow.RunEvent, error)

ListEvents reads and decodes the full events.jsonl. An empty file yields nil.

func (*FileStore) ListRuns

func (s *FileStore) ListRuns(id string) ([]string, error)

ListRuns returns the run directory names under runs/ (the <run-id> values), sorted newest first.

func (*FileStore) Load

func (s *FileStore) Load(id, runID string) (workflow.RunState, error)

Load reads state.json.

func (*FileStore) Save

func (s *FileStore) Save(id, runID string, st workflow.RunState) error

Save writes state.json atomically.

type IndexEntry

type IndexEntry struct {
	ID         string     `json:"id"`
	Status     string     `json:"status,omitempty"`
	StartedAt  time.Time  `json:"at"`
	EndedAt    *time.Time `json:"end,omitempty"`
	DurationMs int64      `json:"ms,omitempty"`
}

IndexEntry is the row shape stored in the per-id index. Kept lean so a 100-row shard stays well under 10KB.

type LokiPusher

type LokiPusher struct {
	// contains filtered or unexported fields
}

LokiPusher batches workflow RunEvents and pushes them to a Loki HTTP endpoint asynchronously. Events are queued into a channel; a background goroutine flushes every 5 seconds or whenever 100 entries accumulate.

The payload is a Loki-compatible push body, as described at https://grafana.com/docs/loki/latest/reference/loki-http-api/#push-log-entries-to-loki

Label scheme:

  • wick_workflow = workflow id
  • wick_run = run ID
  • wick_event = event type (node_started, node_completed, …)
  • any extra labels from ExtraLabels

The input/output JSON body is intentionally excluded from the pushed payload to avoid Loki size limits; only event metadata is sent. The full payload remains on disk in events.jsonl.

func NewLokiPusher

func NewLokiPusher(lokiURL, extraLabelStr string) *LokiPusher

NewLokiPusher creates a pusher targeting lokiURL. extraLabelStr is an optional "key=value,key2=value2" string. Returns nil if lokiURL is empty.
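Parsing a `key=value,key2=value2` string can be sketched as follows. `parseLabels` is a hypothetical helper; trimming whitespace and skipping malformed pairs are assumptions, since the page does not say how the package treats bad input.

```go
package main

import (
	"fmt"
	"strings"
)

// parseLabels splits "k=v,k2=v2" into a map. Pairs without "=" or with
// an empty key are skipped (assumed behavior for this sketch).
func parseLabels(s string) map[string]string {
	out := map[string]string{}
	for _, pair := range strings.Split(s, ",") {
		k, v, ok := strings.Cut(strings.TrimSpace(pair), "=")
		k = strings.TrimSpace(k)
		if !ok || k == "" {
			continue
		}
		out[k] = strings.TrimSpace(v)
	}
	return out
}

func main() {
	labels := parseLabels("env=dev, team=core")
	fmt.Println(labels["env"], labels["team"])
}
```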

func (*LokiPusher) Push

func (p *LokiPusher) Push(id, runID string, ev workflow.RunEvent)

Push enqueues one event. It is non-blocking and drops the event silently if the queue is full (Loki is a best-effort sink; the on-disk events.jsonl is the source of truth).
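A non-blocking enqueue that drops on overflow is typically written in Go as a `select` with a `default` case on a buffered channel. The sketch below shows that idiom; the `queue` type and its capacity are illustrative, not the package's internals.

```go
package main

import "fmt"

// queue wraps a buffered channel; it is a hypothetical stand-in for the
// pusher's internal queue.
type queue struct {
	ch chan string
}

// push enqueues s without blocking and reports whether it was accepted.
// When the buffer is full the default case runs and the item is dropped.
func (q *queue) push(s string) bool {
	select {
	case q.ch <- s:
		return true
	default:
		return false
	}
}

func main() {
	q := &queue{ch: make(chan string, 2)}
	for _, ev := range []string{"a", "b", "c"} {
		fmt.Println(ev, q.push(ev))
	}
}
```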

func (*LokiPusher) Stop

func (p *LokiPusher) Stop()

Stop drains pending entries and shuts down the background goroutine.

type Store

type Store interface {
	Save(id, runID string, st workflow.RunState) error
	Load(id, runID string) (workflow.RunState, error)
	AppendEvent(id, runID string, ev workflow.RunEvent) error
	ListEvents(id, runID string) ([]workflow.RunEvent, error)
	ListRuns(id string) ([]string, error)
	IndexAppend(id string, entry IndexEntry) error
	IndexList(id string, page, pageSize int) ([]IndexEntry, bool, error)
}

Store persists RunState and appends RunEvents for one workflow's runs/ folder.
