Documentation
¶
Overview ¶
Package gala provides durable, typed eventing primitives intended to replace ad-hoc in-memory dispatch patterns with a River-native foundation. It's a black-tie affair for your events, ensuring they arrive in style and on time.
Index ¶
- Constants
- Variables
- func HasFlag(ctx context.Context, flag ContextFlag) bool
- func RegisterTopic[T any](registry *Registry, registration Registration[T]) error
- func SanitizeTag(s string) string
- func WithFlag(ctx context.Context, flag ContextFlag) context.Context
- type Codec
- type Config
- type ContextCodec
- type ContextFlag
- type ContextKey
- type ContextManager
- type ContextSnapshot
- type Definition
- type DispatchFunc
- type DispatchMode
- type Dispatcher
- type EmitReceipt
- type Envelope
- type EventID
- type Gala
- func (g *Gala) Close() error
- func (g *Gala) ContextManager() *ContextManager
- func (g *Gala) DispatchEnvelope(ctx context.Context, envelope Envelope) error
- func (g *Gala) EmitEnvelope(ctx context.Context, envelope Envelope) error
- func (g *Gala) EmitWithHeaders(ctx context.Context, topic TopicName, payload any, headers Headers) EmitReceipt
- func (g *Gala) HasActiveJobForTopic(ctx context.Context, topic TopicName) (bool, error)
- func (g *Gala) HasActiveJobWithMetadata(ctx context.Context, metadataFragment string) (bool, error)
- func (g *Gala) Injector() do.Injector
- func (g *Gala) Registry() *Registry
- func (g *Gala) StartWorkers(ctx context.Context) error
- func (g *Gala) StopWorkers(ctx context.Context) error
- func (g *Gala) WaitIdle()
- type Handler
- type HandlerContext
- type Headers
- type JSONCodec
- type KeyCodec
- type ListenerError
- type ListenerID
- type Pool
- type PoolOption
- type Provider
- type Registration
- type Registry
- type RiverDispatchArgs
- type RiverDispatchWorker
- type RiverDispatcher
- type RiverInsertClient
- type Schedule
- type ScheduleOption
- type ScheduleState
- type Topic
- type TopicName
- type TypedContextCodec
- type WorkflowActionCompletedPayload
- type WorkflowActionStartedPayload
- type WorkflowAssignmentCompletedPayload
- type WorkflowAssignmentCreatedPayload
- type WorkflowInstanceCompletedPayload
- type WorkflowTimeoutExpiredPayload
- type WorkflowTriggeredPayload
Constants ¶
const ( // DefaultMinInterval is the shortest allowed scheduling interval DefaultMinInterval = 20 * time.Minute // DefaultMaxInterval is the longest allowed scheduling interval DefaultMaxInterval = 24 * time.Hour // DefaultBackoffFactor is the multiplier applied on idle or error ticks DefaultBackoffFactor = 2.0 // DefaultHighDriftThreshold is the delta above which the interval snaps to minimum DefaultHighDriftThreshold = 200 // FullFetchMinInterval is the minimum interval for operations that always fetch all records FullFetchMinInterval = time.Hour // FullHighDriftThreshold is the delta above which the interval snaps to minimum FullHighDriftThreshold = 1000 )
const DefaultQueueName = "events"
DefaultQueueName is the default queue used for gala durable dispatch jobs
const RiverDispatchJobKind = "gala_dispatch_v1"
RiverDispatchJobKind is the River job kind used for durable gala dispatch
Variables ¶
var ( // ErrGalaRequired is returned when a nil gala runtime is used ErrGalaRequired = errors.New("gala: gala is required") // ErrRegistryRequired is returned when a nil topic registry is used ErrRegistryRequired = errors.New("gala: registry is required") // ErrTopicNameRequired is returned when a topic name is empty ErrTopicNameRequired = errors.New("gala: topic name is required") // ErrTopicAlreadyRegistered is returned when a topic is registered more than once ErrTopicAlreadyRegistered = errors.New("gala: topic already registered") // ErrTopicNotRegistered is returned when topic metadata cannot be found ErrTopicNotRegistered = errors.New("gala: topic not registered") // ErrCodecRequired is returned when a registration is missing a codec ErrCodecRequired = errors.New("gala: codec is required") // ErrListenerNameRequired is returned when a listener name is empty ErrListenerNameRequired = errors.New("gala: listener name is required") // ErrListenerHandlerRequired is returned when a listener callback is missing ErrListenerHandlerRequired = errors.New("gala: listener handler is required") // ErrListenerTopicNotRegistered is returned when a listener is attached before topic registration ErrListenerTopicNotRegistered = errors.New("gala: listener topic not registered") // ErrPayloadTypeMismatch is returned when payload casting fails for a topic or listener ErrPayloadTypeMismatch = errors.New("gala: payload type mismatch") // ErrPayloadEncodeFailed is returned when payload serialization fails ErrPayloadEncodeFailed = errors.New("gala: payload encode failed") // ErrPayloadDecodeFailed is returned when payload deserialization fails ErrPayloadDecodeFailed = errors.New("gala: payload decode failed") // ErrEnvelopePayloadRequired is returned when an envelope has an empty payload ErrEnvelopePayloadRequired = errors.New("gala: envelope payload is required") // ErrDispatcherRequired is returned when emit is attempted without a dispatcher ErrDispatcherRequired = errors.New("gala: 
dispatcher is required") // ErrDispatchFailed is returned when dispatch fails ErrDispatchFailed = errors.New("gala: dispatch failed") // ErrContextCodecRequired is returned when context codec registration receives nil ErrContextCodecRequired = errors.New("gala: context codec is required") // ErrContextCodecKeyRequired is returned when a context codec key is empty ErrContextCodecKeyRequired = errors.New("gala: context codec key is required") // ErrContextCodecAlreadyRegistered is returned when a context codec key is duplicated ErrContextCodecAlreadyRegistered = errors.New("gala: context codec already registered") // ErrContextSnapshotCaptureFailed is returned when snapshot capture fails ErrContextSnapshotCaptureFailed = errors.New("gala: context snapshot capture failed") // ErrContextSnapshotRestoreFailed is returned when snapshot restore fails ErrContextSnapshotRestoreFailed = errors.New("gala: context snapshot restore failed") // ErrRiverJobClientRequired is returned when a river dispatcher is built without a job client ErrRiverJobClientRequired = errors.New("gala: river job client is required") // ErrRiverGalaProviderRequired is returned when a river worker is built without a gala provider ErrRiverGalaProviderRequired = errors.New("gala: river gala provider is required") // ErrRiverDispatchJobEnvelopeRequired is returned when a river dispatch job has no envelope payload ErrRiverDispatchJobEnvelopeRequired = errors.New("gala: river dispatch job envelope is required") // ErrRiverEnvelopeEncodeFailed is returned when encoding a river envelope payload fails ErrRiverEnvelopeEncodeFailed = errors.New("gala: river envelope encode failed") // ErrRiverEnvelopeDecodeFailed is returned when decoding a river envelope payload fails ErrRiverEnvelopeDecodeFailed = errors.New("gala: river envelope decode failed") // ErrRiverDispatchInsertFailed is returned when inserting a durable river dispatch job fails ErrRiverDispatchInsertFailed = errors.New("gala: river dispatch insert 
failed") // ErrRiverConnectionURIRequired is returned when river runtime setup is missing a connection URI ErrRiverConnectionURIRequired = errors.New("gala: river connection URI is required") // ErrRiverClientInitializationFailed is returned when building the river queue client fails ErrRiverClientInitializationFailed = errors.New("gala: river client initialization failed") // ErrRiverWorkerStartFailed is returned when starting gala river workers fails ErrRiverWorkerStartFailed = errors.New("gala: river worker start failed") // ErrRiverWorkerStopFailed is returned when stopping gala river workers fails ErrRiverWorkerStopFailed = errors.New("gala: river worker stop failed") // ErrRiverClientCloseFailed is returned when closing the gala river queue client fails ErrRiverClientCloseFailed = errors.New("gala: river client close failed") // ErrDispatchModeInvalid is returned when an unknown gala dispatch mode is configured. ErrDispatchModeInvalid = errors.New("gala: dispatch mode is invalid") // ErrListenerPanicked is returned when a listener panics during execution ErrListenerPanicked = errors.New("gala: listener panicked") )
var ( // WorkflowTriggeredEventTopic is the typed topic for workflow triggered events. WorkflowTriggeredEventTopic = Topic[WorkflowTriggeredPayload]{Name: TopicWorkflowTriggered} // WorkflowActionStartedEventTopic is the typed topic for action started events. WorkflowActionStartedEventTopic = Topic[WorkflowActionStartedPayload]{Name: TopicWorkflowActionStarted} // WorkflowActionCompletedEventTopic is the typed topic for action completed events. WorkflowActionCompletedEventTopic = Topic[WorkflowActionCompletedPayload]{Name: TopicWorkflowActionCompleted} // WorkflowAssignmentCreatedEventTopic is the typed topic for assignment created events. WorkflowAssignmentCreatedEventTopic = Topic[WorkflowAssignmentCreatedPayload]{Name: TopicWorkflowAssignmentCreated} // WorkflowAssignmentCompletedEventTopic is the typed topic for assignment completed events. WorkflowAssignmentCompletedEventTopic = Topic[WorkflowAssignmentCompletedPayload]{Name: TopicWorkflowAssignmentCompleted} // WorkflowInstanceCompletedEventTopic is the typed topic for instance completed events. WorkflowInstanceCompletedEventTopic = Topic[WorkflowInstanceCompletedPayload]{Name: TopicWorkflowInstanceCompleted} // WorkflowTimeoutExpiredEventTopic is the typed topic for timeout events. WorkflowTimeoutExpiredEventTopic = Topic[WorkflowTimeoutExpiredPayload]{Name: TopicWorkflowTimeoutExpired} )
Functions ¶
func HasFlag ¶
func HasFlag(ctx context.Context, flag ContextFlag) bool
HasFlag reports whether a typed context flag is set
func RegisterTopic ¶
func RegisterTopic[T any](registry *Registry, registration Registration[T]) error
RegisterTopic registers one typed topic in the registry
func SanitizeTag ¶ added in v1.18.0
SanitizeTag returns a River-compatible tag: characters other than word characters and hyphens are replaced with `_`, leading and trailing hyphens are replaced with `_`, and the result is truncated to 255 characters
Types ¶
type Codec ¶
type Codec[T any] interface { // Encode serializes the typed payload Encode(T) ([]byte, error) // Decode deserializes payload bytes into the typed payload Decode([]byte) (T, error) }
Codec encodes and decodes a topic payload type
type Config ¶
type Config struct {
// DispatchMode controls whether events are dispatched durably (River) or in-memory.
DispatchMode DispatchMode
// Enabled toggles Gala worker startup and dispatch support when true
Enabled bool
// ConnectionURI is the database connection URI used for the dedicated gala river client
ConnectionURI string
// QueueName is the gala queue used for durable dispatch jobs
QueueName string
// WorkerCount is the max worker concurrency for the gala queue
WorkerCount int
// QueueWorkers configures additional queue worker concurrency by queue name.
QueueWorkers map[string]int
// MaxRetries sets max attempts for gala dispatch jobs when greater than zero
MaxRetries int
// RunMigrations enables River schema migrations on startup (use for tests only)
RunMigrations bool
// FetchCooldown is the minimum time between job fetches per worker (default 100ms, min 1ms)
// Lower values increase throughput but also database load. River enforces 1ms minimum.
FetchCooldown time.Duration
// FetchPollInterval is the fallback polling interval when LISTEN/NOTIFY misses events (default 1s)
// This is only used when LISTEN/NOTIFY fails to deliver notifications.
FetchPollInterval time.Duration
}
Config configures cohesive Gala startup
type ContextCodec ¶
type ContextCodec interface {
// Key returns the stable snapshot key
Key() ContextKey
// Capture extracts and encodes context data
Capture(context.Context) (json.RawMessage, bool, error)
// Restore decodes and re-attaches context data
Restore(context.Context, json.RawMessage) (context.Context, error)
}
ContextCodec captures and restores one typed context value
type ContextFlag ¶
type ContextFlag string
ContextFlag identifies a boolean context flag
const ( // ContextFlagWorkflowBypass marks workflow bypass behavior ContextFlagWorkflowBypass ContextFlag = "workflow_bypass" // ContextFlagWorkflowAllowEventEmission allows workflow listener execution while bypass is set ContextFlagWorkflowAllowEventEmission ContextFlag = "workflow_allow_event_emission" )
type ContextKey ¶
type ContextKey = string
ContextKey identifies a restorable context value key. It is a string alias for better readability and to avoid collisions with other context keys. It has to be a string, rather than a strict type + contextx, so that it can be used as a JSON key for durability.
type ContextManager ¶
type ContextManager struct {
// contains filtered or unexported fields
}
ContextManager manages context codecs and snapshot round-trips
func NewContextManager ¶
func NewContextManager(codecs ...ContextCodec) (*ContextManager, error)
NewContextManager creates a context manager and registers any initial codecs
func (*ContextManager) Capture ¶
func (m *ContextManager) Capture(ctx context.Context) (ContextSnapshot, error)
Capture captures all registered context codec values and current context flags
func (*ContextManager) Register ¶
func (m *ContextManager) Register(codec ContextCodec) error
Register registers a context codec by key
func (*ContextManager) Restore ¶
func (m *ContextManager) Restore(ctx context.Context, snapshot ContextSnapshot) (context.Context, error)
Restore restores snapshot values into a new context
type ContextSnapshot ¶
type ContextSnapshot struct {
// Values contains codec-managed context values
Values map[ContextKey]json.RawMessage `json:"values,omitempty"`
// Flags contains boolean context flags
Flags map[ContextFlag]bool `json:"flags,omitempty"`
}
ContextSnapshot captures context data that can be restored after durable hops
type Definition ¶
type Definition[T any] struct { // Topic is the topic handled by this listener Topic Topic[T] // Name is the stable listener name Name string // Operations optionally scopes listener interest to specific mutation operations // Empty means the listener accepts all operations for the topic Operations []string // Handle is the callback invoked for this listener Handle Handler[T] }
Definition defines one listener binding
type DispatchFunc ¶ added in v1.11.0
DispatchFunc adapts a function to the Dispatcher interface.
type DispatchMode ¶ added in v1.11.0
type DispatchMode string
DispatchMode controls whether envelopes are dispatched durably or in-memory.
const ( // DispatchModeDurable persists envelopes in River before worker execution. DispatchModeDurable DispatchMode = "durable" // DispatchModeInMemory dispatches envelopes immediately in-process. DispatchModeInMemory DispatchMode = "in_memory" )
type Dispatcher ¶
type Dispatcher interface {
// Dispatch dispatches an envelope to the configured transport
Dispatch(context.Context, Envelope) error
}
Dispatcher dispatches envelopes to the configured transport
type EmitReceipt ¶
type EmitReceipt struct {
// EventID is the emitted event identifier
EventID EventID
// Accepted reports whether the event was accepted for processing
Accepted bool
// Err contains any terminal emit error
Err error
}
EmitReceipt captures synchronous dispatch results
type Envelope ¶
type Envelope struct {
// ID is the unique event identifier
ID EventID `json:"id"`
// Topic is the destination topic
Topic TopicName `json:"topic"`
// OccurredAt is the emit timestamp in UTC
OccurredAt time.Time `json:"occurred_at"`
// Headers holds operational metadata
Headers Headers `json:"headers"`
// Payload is encoded topic payload data
Payload json.RawMessage `json:"payload"`
// ContextSnapshot holds restorable context metadata
ContextSnapshot ContextSnapshot `json:"context_snapshot"`
}
Envelope is the durable event envelope
type EventID ¶
type EventID string
EventID is a stable identifier used for idempotency and traceability
type Gala ¶
type Gala struct {
// contains filtered or unexported fields
}
Gala provides cohesive event dispatch and worker lifecycle management. No black tie required, but a riverboat and some confetti wouldn't hurt.
func NewInMemory ¶ added in v1.11.0
NewInMemory creates a Gala runtime that dispatches envelopes asynchronously via an in-process pool. This is useful when listeners should not require durable River workers.
func (*Gala) ContextManager ¶
func (g *Gala) ContextManager() *ContextManager
ContextManager returns the Gala context manager
func (*Gala) DispatchEnvelope ¶
DispatchEnvelope dispatches one envelope to all listeners on the topic
func (*Gala) EmitEnvelope ¶
EmitEnvelope dispatches a pre-built envelope using its topic registration. When the envelope does not already carry a ContextSnapshot, one is captured from ctx so that durable dispatch can reconstruct auth and other values.
func (*Gala) EmitWithHeaders ¶
func (g *Gala) EmitWithHeaders(ctx context.Context, topic TopicName, payload any, headers Headers) EmitReceipt
EmitWithHeaders emits a payload with explicit headers
func (*Gala) HasActiveJobForTopic ¶ added in v1.21.2
HasActiveJobForTopic reports whether at least one River job for the given topic exists in an active state (available, scheduled, running, or retryable). Returns false without error when Gala is not in durable mode
func (*Gala) HasActiveJobWithMetadata ¶ added in v1.21.2
HasActiveJobWithMetadata reports whether at least one River job whose metadata JSONB contains the given fragment exists in an active state (available, scheduled, running, or retryable). Returns false without error when Gala is not in durable mode
func (*Gala) StartWorkers ¶
StartWorkers starts Gala workers
func (*Gala) StopWorkers ¶
StopWorkers stops Gala workers
type Handler ¶
type Handler[T any] func(HandlerContext, T) error
Handler processes a typed event payload
type HandlerContext ¶
type HandlerContext struct {
// Context is the restored event context used for listener execution
Context context.Context
// Envelope is the envelope being processed
Envelope Envelope
// Injector provides typed dependency lookup via samber/do
Injector do.Injector
}
HandlerContext provides event context and dependency resolution scope for listeners
type Headers ¶
type Headers struct {
// IdempotencyKey identifies duplicate-safe processing scope
IdempotencyKey string `json:"idempotency_key,omitempty"`
// Properties stores additional metadata for UI visibility
Properties map[string]string `json:"properties,omitempty"`
// Tags are low-cardinality labels forwarded to the transport layer (e.g. River job tags)
Tags []string `json:"tags,omitempty"`
// Listeners are the registered listener names for the topic, populated at dispatch time
Listeners []string `json:"listeners,omitempty"`
// Queue optionally overrides the River queue used for dispatch
Queue string `json:"queue,omitempty"`
// MaxAttempts optionally overrides River max attempts for this envelope
MaxAttempts int `json:"max_attempts,omitempty"`
// ScheduledAt defers execution until the specified time; nil means immediate
ScheduledAt *time.Time `json:"scheduled_at,omitempty"`
}
Headers defines operational metadata for an envelope
func NewHeaders ¶ added in v1.20.0
NewHeaders returns Headers with the given tags and the input marshaled as JSON in the "input" property
type JSONCodec ¶
type JSONCodec[T any] struct{}
JSONCodec is the default JSON implementation of Codec
type KeyCodec ¶ added in v1.11.13
type KeyCodec[T any] struct { // contains filtered or unexported fields }
KeyCodec connects a contextx.Key to a stable JSON snapshot identifier, making any JSON-serializable context value durable across gala event hops.
Declare keys at package scope and register a KeyCodec at application startup — gala holds no hardcoded knowledge of domain types.
Example:
// in iam/auth
var CallerKey = contextx.NewKey[*Caller]()
// at application startup
mgr.Register(gala.NewKeyCodec("caller", auth.CallerKey))
func NewKeyCodec ¶ added in v1.11.13
func NewKeyCodec[T any](id ContextKey, key contextx.Key[T]) KeyCodec[T]
NewKeyCodec creates a KeyCodec that captures and restores values from key using id as the stable JSON snapshot identifier.
id must be non-empty and unique across all registered codecs. T must be JSON-serializable; types containing channels, functions, or unsafe pointers will fail at capture time.
func (KeyCodec[T]) Capture ¶ added in v1.11.13
Capture extracts the key value from ctx and JSON-encodes it. Returns nil, false, nil when the key is not populated.
func (KeyCodec[T]) Key ¶ added in v1.11.13
func (c KeyCodec[T]) Key() ContextKey
Key returns the stable snapshot identifier used for this codec in JSON.
type ListenerError ¶
type ListenerError struct {
// ListenerName is the name of the listener that failed
ListenerName string
// Cause is the underlying error from the listener
Cause error
// Panicked indicates whether the listener panicked
Panicked bool
}
ListenerError captures a listener execution failure with context
func (ListenerError) Error ¶
func (e ListenerError) Error() string
Error returns an error message for listener execution failures
func (ListenerError) Unwrap ¶
func (e ListenerError) Unwrap() error
Unwrap returns the underlying cause for use with errors.Is and errors.As
type ListenerID ¶
type ListenerID string
ListenerID identifies a registered listener
func AttachListener ¶
func AttachListener[T any](registry *Registry, definition Definition[T]) (ListenerID, error)
AttachListener registers one typed listener in the registry
func RegisterListeners ¶
func RegisterListeners[T any](registry *Registry, definitions ...Definition[T]) ([]ListenerID, error)
RegisterListeners registers listeners and ensures their topic contracts are configured
type Pool ¶ added in v1.11.0
type Pool struct {
// contains filtered or unexported fields
}
Pool is a lightweight in-memory task pool exposed from gala
func NewPool ¶ added in v1.11.0
func NewPool(opts ...PoolOption) *Pool
NewPool creates an in-memory task pool
func (*Pool) Release ¶ added in v1.11.0
func (p *Pool) Release()
Release stops workers and waits for completion
func (*Pool) SubmitMultipleAndWait ¶ added in v1.11.0
SubmitMultipleAndWait schedules all tasks and waits for completion
type PoolOption ¶ added in v1.11.0
type PoolOption func(*Pool)
PoolOption configures a pool instance
func WithPoolMetricsRegisterer ¶ added in v1.11.0
func WithPoolMetricsRegisterer(reg prometheus.Registerer) PoolOption
WithPoolMetricsRegisterer stores the target registerer for future metrics wiring
func WithPoolName ¶ added in v1.11.0
func WithPoolName(name string) PoolOption
WithPoolName sets the pool name (reserved for metrics labeling)
func WithWorkers ¶ added in v1.11.0
func WithWorkers(n int) PoolOption
WithWorkers sets maximum worker concurrency
type Provider ¶
type Provider func() *Gala
Provider resolves the gala instance used by River workers
type Registration ¶
type Registration[T any] struct { // Topic defines the typed topic contract Topic Topic[T] // Codec serializes and deserializes payloads for the topic Codec Codec[T] }
Registration ties a typed topic to its codec
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry stores topic codecs, policies, and listeners
func NewRegistry ¶
func NewRegistry() *Registry
NewRegistry creates an empty topic/listener registry
func (*Registry) DecodePayload ¶
DecodePayload decodes payload bytes for a registered topic
func (*Registry) EncodePayload ¶
EncodePayload encodes a payload for a registered topic
type RiverDispatchArgs ¶
type RiverDispatchArgs struct {
// Envelope is the encoded gala envelope payload
Envelope []byte `json:"envelope"`
}
RiverDispatchArgs stores a JSON-encoded gala envelope for durable dispatch
func NewRiverDispatchArgs ¶
func NewRiverDispatchArgs(envelope Envelope) (RiverDispatchArgs, error)
NewRiverDispatchArgs builds River dispatch args from an envelope
func (RiverDispatchArgs) DecodeEnvelope ¶
func (a RiverDispatchArgs) DecodeEnvelope() (Envelope, error)
DecodeEnvelope decodes the gala envelope from dispatch args
type RiverDispatchWorker ¶
type RiverDispatchWorker struct {
river.WorkerDefaults[RiverDispatchArgs]
// contains filtered or unexported fields
}
RiverDispatchWorker processes durable gala dispatch jobs from River
func NewRiverDispatchWorker ¶
func NewRiverDispatchWorker(galaProvider Provider) *RiverDispatchWorker
NewRiverDispatchWorker creates a RiverDispatchWorker
func (*RiverDispatchWorker) Work ¶
func (w *RiverDispatchWorker) Work(ctx context.Context, job *river.Job[RiverDispatchArgs]) error
Work processes one River dispatch job and invokes Gala dispatch
type RiverDispatcher ¶
type RiverDispatcher struct {
// contains filtered or unexported fields
}
RiverDispatcher dispatches envelopes to River
func NewRiverDispatcher ¶
func NewRiverDispatcher(jobClient RiverInsertClient, defaultQueue string) (*RiverDispatcher, error)
NewRiverDispatcher creates a River-backed durable dispatcher
type RiverInsertClient ¶
type RiverInsertClient interface {
// Insert inserts a River job with optional insert options
Insert(context.Context, river.JobArgs, *river.InsertOpts) (*rivertype.JobInsertResult, error)
}
RiverInsertClient represents the minimal insert capability required for durable dispatch
type Schedule ¶ added in v1.15.0
type Schedule struct {
// MinInterval is the shortest allowed interval between runs
MinInterval time.Duration `json:"min_interval"`
// MaxInterval is the longest allowed interval between runs
MaxInterval time.Duration `json:"max_interval"`
// BackoffFactor is the multiplier applied when backing off (idle or error)
BackoffFactor float64 `json:"backoff_factor"`
// HighDriftThreshold is the delta count above which the interval resets to MinInterval
HighDriftThreshold int `json:"high_drift_threshold"`
}
Schedule defines the adaptive scheduling policy for recurring work
func NewFullFetchSchedule ¶ added in v1.19.1
func NewFullFetchSchedule(opts ...ScheduleOption) *Schedule
NewFullFetchSchedule creates a Schedule suited for operations that always fetch all records and cannot do incremental syncs, using FullFetchMinInterval as the minimum
func NewSchedule ¶ added in v1.15.0
func NewSchedule(opts ...ScheduleOption) Schedule
NewSchedule creates a Schedule with defaults and applies any provided options
func (Schedule) Next ¶ added in v1.15.0
func (s Schedule) Next(state ScheduleState, delta int, err error) ScheduleState
Next computes the next scheduling state from the current state and run outcome. A non-nil error signals a failed run; delta is the number of records that changed
type ScheduleOption ¶ added in v1.15.0
type ScheduleOption func(*Schedule)
ScheduleOption configures a Schedule
func WithBackoffFactor ¶ added in v1.15.0
func WithBackoffFactor(f float64) ScheduleOption
WithBackoffFactor sets the multiplier applied when backing off
func WithHighDriftThreshold ¶ added in v1.15.0
func WithHighDriftThreshold(n int) ScheduleOption
WithHighDriftThreshold sets the delta count above which the interval resets to minimum
func WithMaxInterval ¶ added in v1.15.0
func WithMaxInterval(d time.Duration) ScheduleOption
WithMaxInterval sets the longest allowed interval between runs
func WithMinInterval ¶ added in v1.15.0
func WithMinInterval(d time.Duration) ScheduleOption
WithMinInterval sets the shortest allowed interval between runs
type ScheduleState ¶ added in v1.15.0
type ScheduleState struct {
// Interval is the current scheduling interval
Interval time.Duration `json:"interval"`
// IdleStreak is the number of consecutive runs with zero delta
IdleStreak int `json:"idle_streak"`
// ErrorStreak is the number of consecutive runs that returned an error
ErrorStreak int `json:"error_streak"`
}
ScheduleState carries adaptive scheduling state across dispatch cycles
func (ScheduleState) NextScheduledAt ¶ added in v1.15.0
func (s ScheduleState) NextScheduledAt() time.Time
NextScheduledAt returns the wall-clock time for the next run based on the computed state
type TopicName ¶
type TopicName string
TopicName is the stable string identifier for a topic
const ( // TopicWorkflowTriggered is emitted when a workflow instance is created TopicWorkflowTriggered TopicName = "workflow.command.trigger" // TopicWorkflowActionStarted is emitted when a workflow action begins TopicWorkflowActionStarted TopicName = "workflow.command.advance" // TopicWorkflowActionCompleted is emitted when a workflow action finishes TopicWorkflowActionCompleted TopicName = "workflow.command.action_completed" // TopicWorkflowAssignmentCreated is emitted when an assignment is created TopicWorkflowAssignmentCreated TopicName = "workflow.command.assignment_created" // TopicWorkflowAssignmentCompleted is emitted when an assignment resolves TopicWorkflowAssignmentCompleted TopicName = "workflow.command.assignment_completed" // TopicWorkflowInstanceCompleted is emitted when an instance reaches a terminal state TopicWorkflowInstanceCompleted TopicName = "workflow.command.instance_completed" // TopicWorkflowTimeoutExpired is emitted when a workflow timeout expires TopicWorkflowTimeoutExpired TopicName = "workflow.command.timeout_expire" )
type TypedContextCodec ¶
type TypedContextCodec[T any] struct { // contains filtered or unexported fields }
TypedContextCodec captures/restores context values stored via contextx.With
func NewTypedContextCodec ¶
func NewTypedContextCodec[T any](key ContextKey) TypedContextCodec[T]
NewTypedContextCodec creates a typed context codec for a specific snapshot key
func (TypedContextCodec[T]) Capture ¶
func (c TypedContextCodec[T]) Capture(ctx context.Context) (json.RawMessage, bool, error)
Capture extracts a typed context value and JSON encodes it
func (TypedContextCodec[T]) Key ¶
func (c TypedContextCodec[T]) Key() ContextKey
Key returns the codec snapshot key
func (TypedContextCodec[T]) Restore ¶
func (c TypedContextCodec[T]) Restore(ctx context.Context, raw json.RawMessage) (context.Context, error)
Restore JSON decodes a typed context value and re-attaches it
type WorkflowActionCompletedPayload ¶ added in v1.11.0
type WorkflowActionCompletedPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string `json:"instance_id"`
// ActionIndex is the index of the action in the workflow
ActionIndex int `json:"action_index"`
// ActionType is the type of action being completed
ActionType enums.WorkflowActionType `json:"action_type"`
// ObjectID is the identifier for the object associated with the workflow
ObjectID string `json:"object_id"`
// ObjectType is the type of the object associated with the workflow
ObjectType enums.WorkflowObjectType `json:"object_type"`
// Success indicates if the action completed successfully
Success bool `json:"success"`
// Skipped indicates if the action was skipped
Skipped bool `json:"skipped,omitempty"`
// ErrorMessage contains the error message if the action failed
ErrorMessage string `json:"error_message,omitempty"`
}
WorkflowActionCompletedPayload contains data for when a workflow action finishes
type WorkflowActionStartedPayload ¶ added in v1.11.0
type WorkflowActionStartedPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string `json:"instance_id"`
// ActionIndex is the index of the action in the workflow
ActionIndex int `json:"action_index"`
// ActionType is the type of action being started
ActionType enums.WorkflowActionType `json:"action_type"`
// ObjectID is the identifier for the object associated with the workflow
ObjectID string `json:"object_id"`
// ObjectType is the type of the object associated with the workflow
ObjectType enums.WorkflowObjectType `json:"object_type"`
}
WorkflowActionStartedPayload contains data for when a workflow action begins
type WorkflowAssignmentCompletedPayload ¶ added in v1.11.0
type WorkflowAssignmentCompletedPayload struct {
// AssignmentID is the unique identifier for the assignment
AssignmentID string `json:"assignment_id"`
// InstanceID is the unique identifier for the workflow instance
InstanceID string `json:"instance_id"`
// Status is the status of the assignment
Status enums.WorkflowAssignmentStatus `json:"status"`
// CompletedBy is the identifier of the user who completed the assignment
CompletedBy string `json:"completed_by,omitempty"`
// ObjectID is the identifier for the object associated with the workflow
ObjectID string `json:"object_id"`
// ObjectType is the type of the object associated with the workflow
ObjectType enums.WorkflowObjectType `json:"object_type"`
}
WorkflowAssignmentCompletedPayload contains data for completed assignments
type WorkflowAssignmentCreatedPayload ¶ added in v1.11.0
type WorkflowAssignmentCreatedPayload struct {
// AssignmentID is the unique identifier for the assignment
AssignmentID string `json:"assignment_id"`
// InstanceID is the unique identifier for the workflow instance
InstanceID string `json:"instance_id"`
// TargetType is the type of the assignment target
TargetType enums.WorkflowTargetType `json:"target_type"`
// TargetIDs are the identifiers for the assignment targets
TargetIDs []string `json:"target_ids,omitempty"`
// ObjectID is the identifier for the object associated with the workflow
ObjectID string `json:"object_id"`
// ObjectType is the type of the object associated with the workflow
ObjectType enums.WorkflowObjectType `json:"object_type"`
}
WorkflowAssignmentCreatedPayload contains data for created assignments
type WorkflowInstanceCompletedPayload ¶ added in v1.11.0
type WorkflowInstanceCompletedPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string `json:"instance_id"`
// State is the terminal state of the workflow instance
State enums.WorkflowInstanceState `json:"state"`
// ObjectID is the identifier for the object associated with the workflow
ObjectID string `json:"object_id"`
// ObjectType is the type of the object associated with the workflow
ObjectType enums.WorkflowObjectType `json:"object_type"`
}
WorkflowInstanceCompletedPayload contains data for completed instances
type WorkflowTimeoutExpiredPayload ¶ added in v1.11.0
type WorkflowTimeoutExpiredPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string `json:"instance_id"`
// AssignmentID is the unique identifier for the assignment
AssignmentID string `json:"assignment_id"`
// ObjectID is the identifier for the object associated with the workflow
ObjectID string `json:"object_id"`
// ObjectType is the type of the object associated with the workflow
ObjectType enums.WorkflowObjectType `json:"object_type"`
}
WorkflowTimeoutExpiredPayload contains data for workflow timeout expiration
type WorkflowTriggeredPayload ¶ added in v1.11.0
type WorkflowTriggeredPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string `json:"instance_id"`
// DefinitionID is the identifier for the workflow definition
DefinitionID string `json:"definition_id"`
// ObjectID is the identifier for the object associated with the workflow
ObjectID string `json:"object_id"`
// ObjectType is the type of the object associated with the workflow
ObjectType enums.WorkflowObjectType `json:"object_type"`
// TriggerEventType is the event type that triggered the workflow
TriggerEventType string `json:"trigger_event_type"`
// TriggerChangedFields are the fields that changed and triggered the workflow
TriggerChangedFields []string `json:"trigger_changed_fields,omitempty"`
}
WorkflowTriggeredPayload contains data for a workflow instance creation event