soiree

package
v1.5.13 Latest
Published: Jan 30, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

README

Soiree

Soiree is an event coordination toolkit that wraps listener registration, execution order, retries, and persistence behind a small API. It is built around an EventBus that manages topics and listeners, uses a worker pool (default 10 workers), and surfaces rich context to handlers through EventContext.

The package underpins our ent mutation hooks, but it is intentionally framework-agnostic and can be embedded anywhere ordinary Go code needs structured asynchronous notifications.

Key Concepts

  • EventBus - runtime broker that owns topics, manages listener registration, and executes handlers on a worker pool.
  • Topics - logical channels identified by string names; wildcard patterns are supported, more specific patterns run first, and listeners run in registration order within each topic.
  • Listeners - functions that consume an *EventContext; they can inspect/mutate properties and access the originating client. Use typed topics when you need payload access.
  • Stores & Retries - optional pluggable persistence and retry policies (e.g., Redis queue with exponential backoff).
  • Typed Topics - helpers that wrap/unwrap strongly typed payloads so handlers work with domain objects instead of raw any.

Quick Start

package main

import (
	"fmt"

	"github.com/theopenlane/core/pkg/events/soiree"
)

func main() {
	bus := soiree.New()
	const topic = "user.created"

	_, err := bus.On(topic, func(ctx *soiree.EventContext) error {
		name, _ := ctx.PropertyString("name")
		fmt.Printf("welcome %s\n", name)
		return nil
	})
	if err != nil {
		panic(err)
	}

	event := soiree.NewBaseEvent(topic, map[string]string{"email": "user@example.com"})
	event.Properties().Set("name", "Ada Lovelace")

	for err := range bus.Emit(event.Topic(), event) {
		if err != nil {
			fmt.Printf("listener error: %v\n", err)
		}
	}
}

Need payload access? Bind a typed listener (see below) so your handler receives the payload directly.

Typed Topics

The topics_typed.go helpers let you bind strongly typed payloads while still benefiting from pooled execution:

// Simple: just specify the type and topic name - wrap/unwrap are provided automatically
userTopic := soiree.NewTypedTopic[UserEvent]("user.created")

binding := soiree.BindListener(userTopic, func(ctx *soiree.EventContext, payload UserEvent) error {
	ctx.Properties().Set("user_id", payload.ID)
	return nil
})

bus := soiree.New()
if _, err := bus.RegisterListeners(binding); err != nil {
	panic(err)
}

event, err := userTopic.Wrap(UserEvent{ID: "user-123"})
if err != nil {
	panic(err)
}
for err := range bus.Emit(userTopic.Name(), event) {
	if err != nil {
		fmt.Printf("listener error: %v\n", err)
	}
}

For custom wrap/unwrap logic, use functional options:

userTopic := soiree.NewTypedTopic[UserEvent](
	"user.created",
	soiree.WithUnwrap(func(evt soiree.Event) (UserEvent, error) {
		payload, ok := evt.Payload().(UserEvent)
		if !ok {
			return UserEvent{}, fmt.Errorf("unexpected payload %T", evt.Payload())
		}
		return payload, nil
	}),
)

EventContext Cheat Sheet

Method                                    Purpose
Context()                                 Returns the request context associated with the event.
Event() / Payload() / PayloadAs[T](ctx)   Access the underlying event or payload.
Properties() / Property / PropertyString  Inspect or mutate ad-hoc metadata shared across listeners.
ClientAs[T](ctx)                          Retrieve the client attached to the event (e.g., an ent client).

Use typed topics to access event payloads inside listeners.

Configuration Options

Use functional options with New to customise behavior:

Option                       Description
Workers                      Resize the worker pool used by emitted events (default is 10 workers).
ErrorHandler                 Override how handler errors are processed before they reach the caller.
Panics                       Centralize panic recovery logic.
IDGenerator                  Supply custom listener/event IDs (e.g., UUIDs).
EventStore / WithRedisStore  Persist events and handler outcomes; queue stores enable replay.
Retry                        Configure retry attempts and backoff policy for failing listeners.
ErrChanBufferSize            Resize the buffered error channel returned by Emit.
Client                       Attach an arbitrary client object that is then available to listeners.

Emit runs listeners on the worker pool; drain the returned channel to block inline.
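
A minimal sketch of what draining the channel can look like. Everything here is standard library: drain, fakeEmit, and errBoom are hypothetical names, and fakeEmit only stands in for bus.Emit so the example runs without the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical listener failure used by the demo.
var errBoom = errors.New("listener failed")

// drain blocks until errs is closed and joins every non-nil error it saw.
// It returns nil when no listener reported a failure.
func drain(errs <-chan error) error {
	var all []error
	for err := range errs {
		if err != nil {
			all = append(all, err)
		}
	}
	return errors.Join(all...)
}

// fakeEmit stands in for bus.Emit: it sends the given results from a
// goroutine and closes the channel when it is done.
func fakeEmit(results ...error) <-chan error {
	ch := make(chan error, len(results))
	go func() {
		defer close(ch)
		for _, r := range results {
			ch <- r
		}
	}()
	return ch
}

func main() {
	if err := drain(fakeEmit(nil, errBoom, nil)); err != nil {
		fmt.Println("emit failed:", err)
	}
}
```

With a real bus the call would be drain(bus.Emit(topic, event)). Joining the errors keeps every failure visible to the caller instead of only the first one.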

Need a standalone pool (for non-EventBus concurrency)? Use soiree.NewPool(soiree.WithWorkers(n)). Pool metrics are registered on the Prometheus default registerer with a pool label; override with soiree.WithPoolMetricsRegisterer(reg) and soiree.WithPoolName(name).

For pool tuning guidance, review the pond documentation: https://github.com/alitto/pond.

Lifecycle Flow

sequenceDiagram
    participant Producer
    participant EventBus
    participant Topic as Topic Registry
    participant ListenerA
    participant ListenerB

    Producer->>EventBus: Emit(topic, payload)
    EventBus->>Topic: EnsureTopic + enqueue
    Topic->>ListenerA: Execute (registration order)
    ListenerA-->>EventBus: Error / nil
    EventBus-->>Producer: Forward to error channel
    Topic->>ListenerB: Execute unless aborted
    ListenerB-->>EventBus: Ack
    EventBus-->>Producer: Close error channel
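
The flow in the diagram can be sketched with plain channels. This is an illustration under stated assumptions, not the bus implementation: event, listener, dispatch, and runDemo are hypothetical stand-ins, and a single goroutine replaces the worker pool.

```go
package main

import "fmt"

// event is a hypothetical stand-in for soiree.Event that keeps only the abort flag.
type event struct{ aborted bool }

type listener func(*event) error

// dispatch runs listeners in registration order, forwards each non-nil error,
// stops once a listener has aborted the event, and closes the channel at the end.
func dispatch(e *event, listeners []listener) <-chan error {
	errs := make(chan error, len(listeners))
	go func() {
		defer close(errs)
		for _, l := range listeners {
			if e.aborted {
				return
			}
			if err := l(e); err != nil {
				errs <- err
			}
		}
	}()
	return errs
}

// runDemo registers three listeners; the second aborts the event, so the
// third never runs. It returns the names of the listeners that ran.
func runDemo() []string {
	var ran []string
	ls := []listener{
		func(*event) error { ran = append(ran, "A"); return nil },
		func(ev *event) error { ran = append(ran, "B"); ev.aborted = true; return nil },
		func(*event) error { ran = append(ran, "C"); return nil },
	}
	for range dispatch(&event{}, ls) {
		// drain until the channel is closed
	}
	return ran
}

func main() {
	fmt.Println(runDemo())
}
```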

ent Integration

internal/ent/hooks wires Soiree into ent mutations via the Eventer. The hook:

  1. Determines whether any listeners are registered for the entity’s topic (including dynamic registrations on the pool).
  2. Marshals the mutation payload (operation, entity ID, and ent client) into a strongly typed event.
  3. Emits after commit (or immediately for non‑transactional operations).

If you add listeners directly to the bus (e.g., in tests or feature toggles), they are still honored because the hook checks the live bus state before deciding to emit.

Adding a new ent mutation listener

Define the handler

Implement a hooks.MutationHandler in internal/ent/hooks. Handlers receive a *soiree.EventContext and *hooks.MutationPayload; they can access the ent client via payload.Client or soiree.ClientAs.

Register the listener

Use eventer.AddMutationListener(<entity>, handler, opts...), typically in registerDefaultMutationListeners or wherever the feature is initialised. Supply priorities/pre-hooks/post-hooks as needed.

Ensure emission

The ent hook emits automatically once the listener is registered. If you need a new topic name, rely on the entity’s schema type (e.g., entgen.TypeOrganization) so mutationTopic produces a consistent topic.

Test the flow

  • Unit tests: construct an Eventer with NewEventer(WithEventerEmitter(soiree.New())), register your listener, and invoke the handler with a fake mutation payload.
  • Integration tests: exercise the ent mutation (GraphQL or REST) and assert on side effects, inspecting events via captured mocks or the listener itself.

Practical example

// internal/ent/hooks/listeners_billing.go
func handleOrganizationBillingUpdate(ctx *soiree.EventContext, payload *hooks.MutationPayload) error {
	if payload.Operation != ent.OpUpdateOne.String() {
		return nil
	}

	inv, ok := newEntitlementInvocation(ctx, payload, orgAllowContext)
	if !ok {
		return nil
	}

	return inv.reconcile()
}

// internal/ent/hooks/eventer.go
func registerDefaultMutationListeners(e *Eventer) {
	// existing listeners…
	e.AddMutationListener(entgen.TypeOrganizationSetting, handleOrganizationBillingUpdate)
}

Unit test pattern:

func TestHandleOrganizationBillingUpdate(t *testing.T) {
	bus := soiree.New()
	eventer := NewEventer(WithEventerEmitter(bus))
	eventer.AddMutationListener(entgen.TypeOrganizationSetting, handleOrganizationBillingUpdate)

	payload := &hooks.MutationPayload{
		Operation: ent.OpUpdateOne.String(),
		EntityID:  "org-setting-id",
		Client:    fakeEntClient(t), // supply test double
	}

	ctx := context.Background()
	event := soiree.NewBaseEvent(entgen.TypeOrganizationSetting, payload)
	event.SetContext(ctx)

	var errs []error
	for err := range bus.Emit(event.Topic(), event) {
		if err != nil {
			errs = append(errs, err)
		}
	}
	require.Empty(t, errs)
	// assert on fakeEntClient interactions…
}

End-to-end tests can exercise the actual ent mutation (GraphQL/REST) and assert on downstream effects (e.g., Stripe updates) once the listener runs.

Documentation

Overview

Package soiree provides a simple event emitter that allows you to emit events and listen for them

Constants

const (
	TopicWorkflowTriggered           = "workflow.triggered"
	TopicWorkflowActionStarted       = "workflow.action.started"
	TopicWorkflowActionCompleted     = "workflow.action.completed"
	TopicWorkflowAssignmentCreated   = "workflow.assignment.created"
	TopicWorkflowAssignmentCompleted = "workflow.assignment.completed"
	TopicWorkflowInstanceCompleted   = "workflow.instance.completed"
	TopicWorkflowTimeoutExpired      = "workflow.timeout.expired"
	TopicMutationDetected            = "mutation.detected"
)

Topic name constants

const PropertyEventID = "soiree.event_id"

PropertyEventID is the reserved properties key used to identify events across retries/replays

Variables

var (
	// ErrNilListener is returned when a listener is nil
	ErrNilListener = errors.New("listener cannot be nil")
	// ErrInvalidTopicName is returned when a topic name is invalid
	ErrInvalidTopicName = errors.New("invalid topic name")
	// ErrTopicNotFound is returned when a topic is not found
	ErrTopicNotFound = errors.New("topic not found")
	// ErrListenerNotFound is returned when a listener is not found
	ErrListenerNotFound = errors.New("listener not found")
	// ErrEmitterClosed is returned when the event bus is closed
	ErrEmitterClosed = errors.New("event bus is closed")
	// ErrEmitterAlreadyClosed is returned when the event bus is already closed
	ErrEmitterAlreadyClosed = errors.New("event bus is already closed")

	// ErrNilPayload is returned when an event payload is nil
	ErrNilPayload = errors.New("nil payload")
	// ErrPayloadTypeMismatch is returned when an event payload type does not match the expected type
	ErrPayloadTypeMismatch = errors.New("payload type mismatch")
	// ErrEventTopicMismatch is returned when the emitted topic name disagrees with the Event.Topic() value
	ErrEventTopicMismatch = errors.New("event topic mismatch")
)
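
Because these are sentinel values, callers can match them with errors.Is. The sketch below assumes nothing about whether the package wraps them, since errors.Is handles both cases. ErrTopicNotFound is redeclared locally so the example compiles on its own, and off and isMissingTopic are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrTopicNotFound is a local stand-in for soiree.ErrTopicNotFound.
var ErrTopicNotFound = errors.New("topic not found")

// off is a hypothetical unsubscribe helper that wraps the sentinel with context.
func off(topics map[string]bool, name string) error {
	if !topics[name] {
		return fmt.Errorf("off %q: %w", name, ErrTopicNotFound)
	}
	delete(topics, name)
	return nil
}

// isMissingTopic reports whether err is, or wraps, ErrTopicNotFound.
func isMissingTopic(err error) bool {
	return errors.Is(err, ErrTopicNotFound)
}

func main() {
	topics := map[string]bool{"user.created": true}
	fmt.Println(isMissingTopic(off(topics, "user.deleted")))
	fmt.Println(isMissingTopic(off(topics, "user.created")))
}
```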

MutationDetectedTopic is emitted when a mutation occurs that might trigger workflows

var WorkflowActionCompletedTopic = NewTypedTopic(TopicWorkflowActionCompleted,
	WithObservability(ObservabilitySpec[WorkflowActionCompletedPayload]{
		Operation: "handle_action_completed",
		Origin:    "listeners",
	}),
)

WorkflowActionCompletedTopic is emitted when a workflow action finishes

var WorkflowActionStartedTopic = NewTypedTopic(TopicWorkflowActionStarted,
	WithObservability(ObservabilitySpec[WorkflowActionStartedPayload]{
		Operation: "handle_action_started",
		Origin:    "listeners",
	}),
)

WorkflowActionStartedTopic is emitted when a workflow action begins execution

var WorkflowAssignmentCompletedTopic = NewTypedTopic(TopicWorkflowAssignmentCompleted,
	WithObservability(ObservabilitySpec[WorkflowAssignmentCompletedPayload]{
		Operation: "handle_assignment_completed",
		Origin:    "listeners",
	}),
)

WorkflowAssignmentCompletedTopic is emitted when an approval decision is made

var WorkflowAssignmentCreatedTopic = NewTypedTopic(TopicWorkflowAssignmentCreated,
	WithObservability(ObservabilitySpec[WorkflowAssignmentCreatedPayload]{
		Operation: "handle_assignment_created",
		Origin:    "listeners",
	}),
)

WorkflowAssignmentCreatedTopic is emitted when an approval is assigned

var WorkflowInstanceCompletedTopic = NewTypedTopic(TopicWorkflowInstanceCompleted,
	WithObservability(ObservabilitySpec[WorkflowInstanceCompletedPayload]{
		Operation: "handle_instance_completed",
		Origin:    "listeners",
	}),
)

WorkflowInstanceCompletedTopic is emitted when a workflow finishes

WorkflowTimeoutExpiredTopic is emitted when a timeout occurs

var WorkflowTriggeredTopic = NewTypedTopic(TopicWorkflowTriggered,
	WithObservability(ObservabilitySpec[WorkflowTriggeredPayload]{
		Operation: "handle_workflow_triggered",
		Origin:    "listeners",
	}),
)

WorkflowTriggeredTopic is emitted when a workflow instance is created

Functions

func ClientAs added in v0.45.0

func ClientAs[T any](ctx *EventContext) (T, bool)

ClientAs attempts to cast the client to the requested type

func EventID added in v1.2.3

func EventID(event Event) string

EventID returns the stable idempotency key for an event when present

func PayloadAs added in v0.45.0

func PayloadAs[T any](ctx *EventContext) (T, bool)

PayloadAs attempts to cast the payload to the requested type

func ShutdownAll added in v0.18.1

func ShutdownAll() error

ShutdownAll gracefully closes all registered event buses

func UnwrapPayload added in v1.2.3

func UnwrapPayload[T any](event Event) (T, error)

UnwrapPayload extracts a typed payload from an event, handling JSON deserialization if needed
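
One way such a helper could behave, sketched with the standard library only. This is an assumption about the general technique (direct type assertion first, JSON round trip as a fallback), not the package's actual code; unwrap and the UserEvent JSON tag are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// UserEvent is a hypothetical payload type.
type UserEvent struct {
	ID string `json:"id"`
}

// unwrap returns payload as T. It tries a direct type assertion first and
// falls back to a JSON round trip for payloads that lost their concrete type,
// for example after being stored and read back from a queue.
func unwrap[T any](payload any) (T, error) {
	var zero T
	if payload == nil {
		return zero, errors.New("nil payload")
	}
	if typed, ok := payload.(T); ok {
		return typed, nil
	}
	raw, ok := payload.([]byte)
	if !ok {
		var err error
		if raw, err = json.Marshal(payload); err != nil {
			return zero, fmt.Errorf("marshal %T: %w", payload, err)
		}
	}
	var out T
	if err := json.Unmarshal(raw, &out); err != nil {
		return zero, fmt.Errorf("decode into %T: %w", out, err)
	}
	return out, nil
}

func main() {
	direct, _ := unwrap[UserEvent](UserEvent{ID: "user-123"})
	decoded, _ := unwrap[UserEvent](map[string]any{"id": "user-456"})
	fmt.Println(direct.ID, decoded.ID)
}
```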

Types

type BaseEvent

type BaseEvent struct {
	// contains filtered or unexported fields
}

BaseEvent is a basic implementation of the `Event` interface. It stores the topic, payload, and aborted status of an event, and provides methods to get and set the payload, set the aborted status, and check whether the event has been aborted. A `sync.RWMutex` field `mu` guards the struct's fields so they can be accessed safely from multiple goroutines

func NewBaseEvent

func NewBaseEvent(topic string, payload any) *BaseEvent

NewBaseEvent creates a new instance of BaseEvent with a payload

func (*BaseEvent) Client added in v0.4.1

func (e *BaseEvent) Client() any

Client returns the event's client

func (*BaseEvent) Context added in v0.4.1

func (e *BaseEvent) Context() context.Context

Context returns the event's context

func (*BaseEvent) IsAborted

func (e *BaseEvent) IsAborted() bool

IsAborted checks the event's aborted status

func (*BaseEvent) Payload

func (e *BaseEvent) Payload() any

Payload returns the event's payload

func (*BaseEvent) Properties

func (e *BaseEvent) Properties() Properties

Properties returns the event's properties

func (*BaseEvent) SetAborted

func (e *BaseEvent) SetAborted(abort bool)

SetAborted sets the event's aborted status

func (*BaseEvent) SetClient added in v0.4.1

func (e *BaseEvent) SetClient(client any)

SetClient sets the event's client

func (*BaseEvent) SetContext added in v0.4.1

func (e *BaseEvent) SetContext(ctx context.Context)

SetContext sets the event's context

func (*BaseEvent) SetPayload

func (e *BaseEvent) SetPayload(payload any)

SetPayload sets the event's payload

func (*BaseEvent) SetProperties

func (e *BaseEvent) SetProperties(properties Properties)

SetProperties sets the event's properties

func (*BaseEvent) Topic

func (e *BaseEvent) Topic() string

Topic returns the event's topic

type Emitter added in v1.4.0

type Emitter interface {
	Emit(topic string, payload any) <-chan error
}

Emitter defines the minimal interface for emitting events

type Event

type Event interface {
	// Topic returns the event's topic
	Topic() string
	// Payload returns the event's payload
	Payload() any
	// Properties returns the event's properties
	Properties() Properties
	// SetPayload sets the event's payload
	SetPayload(any)
	// SetProperties sets the event's properties
	SetProperties(Properties)
	// SetAborted sets the event's aborted status
	SetAborted(bool)
	// IsAborted checks the event's aborted status
	IsAborted() bool
	// Context returns the event's context
	Context() context.Context
	// SetContext sets the event's context
	SetContext(context.Context)
	// Client returns the event's client
	Client() any
	// SetClient sets the event's client
	SetClient(any)
}

Event is an interface representing the structure of an instance of an event

type EventBus added in v1.2.3

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus manages subscribing and unsubscribing listeners to topics and emitting events to subscribers

func New added in v1.2.3

func New(opts ...Option) *EventBus

New initializes a new EventBus with optional configuration options

func (*EventBus) Client added in v1.2.3

func (m *EventBus) Client() any

Client returns the client set on the event bus

func (*EventBus) Close added in v1.2.3

func (m *EventBus) Close() error

Close terminates the event pool and releases resources

func (*EventBus) Emit added in v1.2.3

func (m *EventBus) Emit(eventName string, payload any) <-chan error

Emit asynchronously dispatches an event to all subscribers of the event's topic

func (*EventBus) EmitWithContext added in v1.2.3

func (m *EventBus) EmitWithContext(ctx context.Context, eventName string, payload any) <-chan error

EmitWithContext asynchronously dispatches an event with the given context for timeout/cancellation control
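
On the caller's side, a context lets you stop waiting on the returned channel. The sketch below shows only that waiting pattern with the standard library. How the bus itself reacts to cancellation is not documented here, and await, completed, and timedOut are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// await returns the first listener error, nil once errs is closed, or
// ctx.Err() if the context ends before the channel does.
func await(ctx context.Context, errs <-chan error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err, ok := <-errs:
			if !ok {
				return nil
			}
			if err != nil {
				return err
			}
		}
	}
}

// completed waits on an already closed channel, as if every listener had finished.
func completed() bool {
	done := make(chan error)
	close(done)
	return await(context.Background(), done) == nil
}

// timedOut waits on a channel that never closes, so the deadline wins.
func timedOut() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	stuck := make(chan error)
	return errors.Is(await(ctx, stuck), context.DeadlineExceeded)
}

func main() {
	fmt.Println(completed(), timedOut())
}
```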

func (*EventBus) InterestedIn added in v1.2.3

func (m *EventBus) InterestedIn(topicName string) bool

InterestedIn checks if the event bus has any listeners registered for the given topic

func (*EventBus) Off added in v1.2.3

func (m *EventBus) Off(topicName string, listenerID string) error

Off unsubscribes a listener from a topic using the listener's unique ID

func (*EventBus) On added in v1.2.3

func (m *EventBus) On(topicName string, listener Listener) (string, error)

On subscribes a listener to a topic with the given name and returns a unique listener ID

func (*EventBus) RegisterListeners added in v1.2.3

func (m *EventBus) RegisterListeners(bindings ...ListenerBinding) ([]string, error)

RegisterListeners registers multiple listener bindings and returns their IDs

func (*EventBus) WaitForIdle added in v1.5.10

func (m *EventBus) WaitForIdle()

WaitForIdle blocks until all submitted event handlers have completed

type EventContext added in v0.45.0

type EventContext struct {
	// contains filtered or unexported fields
}

EventContext bundles the event, payload, and client for a listener

func (*EventContext) Context added in v0.45.0

func (c *EventContext) Context() context.Context

Context returns the underlying request context

func (*EventContext) Event added in v0.45.0

func (c *EventContext) Event() Event

Event exposes the underlying event

func (*EventContext) Payload added in v0.45.0

func (c *EventContext) Payload() any

Payload returns the event payload

func (*EventContext) Properties added in v0.45.0

func (c *EventContext) Properties() Properties

Properties exposes the underlying property map

func (*EventContext) Property added in v0.45.0

func (c *EventContext) Property(key string) (any, bool)

Property fetches a property by key

func (*EventContext) PropertyString added in v0.45.0

func (c *EventContext) PropertyString(key string) (string, bool)

PropertyString fetches a string property by key

type Listener

type Listener func(*EventContext) error

Listener handles an event via the provided event context wrapper

type ListenerBinding added in v0.45.0

type ListenerBinding struct {
	// contains filtered or unexported fields
}

ListenerBinding encapsulates the registration of a listener against a topic

func BindListener added in v0.45.0

func BindListener[T any](topic TypedTopic[T], listener TypedListener[T]) ListenerBinding

BindListener produces a binding that can be registered on an EventBus
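
Conceptually, a binding adapts a typed listener to the untyped Listener signature. The sketch below illustrates that adapter idea with local stand-in types. It is not the package's implementation, and eventContext, listener, typedListener, bind, and errPayloadTypeMismatch are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins that mirror the documented Listener and TypedListener shapes.
type eventContext struct{ payload any }

type listener func(*eventContext) error

type typedListener[T any] func(*eventContext, T) error

var errPayloadTypeMismatch = errors.New("payload type mismatch")

// bind adapts a typed listener to the untyped signature by asserting the
// payload type before calling it.
func bind[T any](fn typedListener[T]) listener {
	return func(ctx *eventContext) error {
		payload, ok := ctx.payload.(T)
		if !ok {
			return fmt.Errorf("%w: got %T", errPayloadTypeMismatch, ctx.payload)
		}
		return fn(ctx, payload)
	}
}

func main() {
	l := bind[string](func(_ *eventContext, id string) error {
		fmt.Println("user", id)
		return nil
	})
	fmt.Println(l(&eventContext{payload: "user-123"}))
	fmt.Println(l(&eventContext{payload: 42}))
}
```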

func (ListenerBinding) Register added in v0.45.0

func (b ListenerBinding) Register(bus *EventBus) (string, error)

Register registers the listener binding on the provided bus

type MutationDetectedPayload added in v1.2.3

type MutationDetectedPayload struct {
	// SchemaType is the type of schema where the mutation occurred
	SchemaType string
	// ObjectID is the ID of the object that was mutated
	ObjectID string
	// Operation is the type of operation performed (e.g., update, delete)
	Operation string
	// ChangedFields are the fields that were changed in the mutation
	ChangedFields []string
	// UserID is the ID of the user who performed the mutation
	UserID string
}

MutationDetectedPayload contains data for mutations that might trigger workflows

type ObservabilitySpec added in v1.4.0

type ObservabilitySpec[T any] struct {
	// Operation is the operation name to record
	Operation string
	// Origin is the component emitting the observation
	Origin string
	// TriggerFunc overrides how trigger event values are derived
	TriggerFunc func(*EventContext, T) string
}

ObservabilitySpec describes logging/metrics metadata for a typed topic

type Option added in v1.2.3

type Option func(*EventBus)

Option defines a function type for EventBus configuration options
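
For readers new to the functional options pattern, here is a minimal sketch of how such options compose. The bus type and the lowercase option names are hypothetical stand-ins. The default of 10 workers comes from the README, and ignoring non-positive values is an assumption made for the example.

```go
package main

import "fmt"

// bus is a hypothetical stand-in for soiree.EventBus with two settings.
type bus struct {
	workers int
	client  any
}

// option mirrors the documented shape: a function that mutates the bus.
type option func(*bus)

// workers overrides the pool size; non-positive values are ignored (an assumption).
func workers(n int) option {
	return func(b *bus) {
		if n > 0 {
			b.workers = n
		}
	}
}

// client attaches an arbitrary client object.
func client(c any) option {
	return func(b *bus) { b.client = c }
}

// newBus applies defaults first, then each option in the order given.
func newBus(opts ...option) *bus {
	b := &bus{workers: 10}
	for _, opt := range opts {
		opt(b)
	}
	return b
}

func main() {
	b := newBus(workers(4), client("ent-client"))
	fmt.Println(b.workers, b.client)
}
```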

func Client added in v1.2.3

func Client(client any) Option

Client sets a custom client for the EventBus

func ErrChanBufferSize added in v1.2.3

func ErrChanBufferSize(size int) Option

ErrChanBufferSize sets the size of the buffered channel for errors returned by asynchronous emits

func ErrorHandler added in v1.2.3

func ErrorHandler(errHandler func(Event, error) error) Option

ErrorHandler sets a custom error handler for an EventBus

func EventStore added in v0.22.0

func EventStore(store eventStore) Option

EventStore configures a custom event store

func IDGenerator added in v1.2.3

func IDGenerator(idGen func() string) Option

IDGenerator sets a custom ID generator for an EventBus

func Panics added in v1.2.3

func Panics(panicHandler PanicHandler) Option

Panics sets a custom panic handler for an EventBus

func Retry added in v1.2.3

func Retry(retries int, factory func() backoff.BackOff) Option

Retry configures retry attempts and backoff behavior for listener failures
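
The idea behind retrying with backoff can be sketched without the backoff package. The hand-rolled doubling delay below replaces backoff.BackOff purely for illustration, and treating retries as the number of extra attempts after the first call is an assumption. retry and flaky are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry calls fn until it succeeds or the retries are used up, doubling the
// wait between attempts. It wraps the last error it saw.
func retry(retries int, base time.Duration, fn func() error) error {
	var err error
	delay := base
	for attempt := 0; attempt <= retries; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt < retries {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("after %d attempts: %w", retries+1, err)
}

// flaky returns a listener-like function that fails its first n calls,
// plus a pointer to its call counter.
func flaky(n int) (func() error, *int) {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("transient failure")
		}
		return nil
	}, &calls
}

func main() {
	fn, calls := flaky(2)
	err := retry(3, 10*time.Millisecond, fn)
	fmt.Println(err, *calls)
}
```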

func WithRedisStore added in v0.22.0

func WithRedisStore(client *redis.Client, opts ...RedisStoreOption) Option

WithRedisStore configures a Redis-backed event store

func Workers added in v1.2.3

func Workers(n int) Option

Workers sets the number of worker goroutines for concurrent event handling

type PanicHandler

type PanicHandler func(any)

PanicHandler is a function type that handles panics

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a worker pool implementation using the pond library

func NewPool added in v1.2.3

func NewPool(opts ...PoolOption) *Pool

NewPool creates a new worker pool with the given options

func (*Pool) Release

func (p *Pool) Release()

Release stops all workers in the pool and waits for them to finish

func (*Pool) Resize added in v1.2.3

func (p *Pool) Resize(maxWorkers int)

Resize adjusts the maximum number of workers in the pool

func (*Pool) Submit

func (p *Pool) Submit(task func())

Submit submits a task to the worker pool

func (*Pool) SubmitMultipleAndWait added in v1.2.3

func (p *Pool) SubmitMultipleAndWait(tasks []func()) error

SubmitMultipleAndWait submits multiple tasks and waits for all to complete
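
The submit-and-wait behavior can be approximated with a semaphore and a WaitGroup. This sketch does not use pond and returns no error, unlike the real method. It only illustrates bounded concurrency, and runAll and runCounted are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll executes tasks with at most maxWorkers running at once and
// returns after every task has finished.
func runAll(maxWorkers int, tasks []func()) {
	if maxWorkers < 1 {
		maxWorkers = 1 // a zero-capacity semaphore would block forever
	}
	sem := make(chan struct{}, maxWorkers)
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxWorkers tasks are in flight
		go func(run func()) {
			defer wg.Done()
			defer func() { <-sem }()
			run()
		}(task)
	}
	wg.Wait()
}

// runCounted runs n trivial tasks and reports how many completed.
func runCounted(workers, n int) int64 {
	var done atomic.Int64
	tasks := make([]func(), n)
	for i := range tasks {
		tasks[i] = func() { done.Add(1) }
	}
	runAll(workers, tasks)
	return done.Load()
}

func main() {
	fmt.Println(runCounted(3, 20))
}
```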

func (*Pool) WaitForIdle added in v1.5.10

func (p *Pool) WaitForIdle()

WaitForIdle blocks until all submitted tasks have completed

type PoolOption added in v1.2.3

type PoolOption func(*Pool)

PoolOption configures a Pool

func WithPoolMetricsRegisterer added in v1.2.3

func WithPoolMetricsRegisterer(reg prometheus.Registerer) PoolOption

WithPoolMetricsRegisterer configures pool metrics using the provided registerer (nil disables metrics)

func WithPoolName added in v1.2.3

func WithPoolName(name string) PoolOption

WithPoolName sets the pool name used for metrics labeling

func WithWorkers added in v1.2.3

func WithWorkers(n int) PoolOption

WithWorkers sets the maximum number of workers in the pool

type Properties

type Properties map[string]any

Properties is a map of properties to set on an event

func NewProperties

func NewProperties() Properties

NewProperties creates a new Properties map

func (Properties) GetKey added in v0.4.1

func (p Properties) GetKey(key string) any

GetKey returns a property from the Properties map

func (Properties) Set

func (p Properties) Set(name string, value any) Properties

Set sets a property on the Properties map and returns the map
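
Because Set returns the map, calls can be chained. The sketch below redeclares Properties locally with the documented signatures. The method bodies are assumptions, and propertyString is a hypothetical helper in the spirit of EventContext.PropertyString.

```go
package main

import "fmt"

// Properties mirrors the documented type: a plain map of ad-hoc metadata.
type Properties map[string]any

// Set stores value under name and returns the map so calls can be chained.
// Like any map write, it panics on a nil map.
func (p Properties) Set(name string, value any) Properties {
	p[name] = value
	return p
}

// GetKey returns the value stored under key, or nil when it is absent.
func (p Properties) GetKey(key string) any {
	return p[key]
}

// propertyString returns the value under key only if it is a string.
func propertyString(p Properties, key string) (string, bool) {
	s, ok := p[key].(string)
	return s, ok
}

func main() {
	p := Properties{}.Set("name", "Ada Lovelace").Set("attempt", 1)
	name, ok := propertyString(p, "name")
	fmt.Println(name, ok)
	fmt.Println(p.GetKey("missing") == nil)
}
```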

type RedisStore added in v0.22.0

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore persists events and results in redis and acts as an event queue

func NewRedisStore added in v0.22.0

func NewRedisStore(client *redis.Client, opts ...RedisStoreOption) *RedisStore

NewRedisStore creates a new RedisStore with default metrics

func (*RedisStore) DequeueEvent added in v0.22.0

func (s *RedisStore) DequeueEvent(ctx context.Context) (Event, error)

DequeueEvent pops a soiree event from the event queue - party line !

func (*RedisStore) Events added in v0.22.0

func (s *RedisStore) Events(ctx context.Context) ([]Event, error)

Events returns all persisted events

func (*RedisStore) HandlerSucceeded added in v1.2.3

func (s *RedisStore) HandlerSucceeded(ctx context.Context, eventID string, handlerID string) (bool, error)

HandlerSucceeded reports whether the handler has already succeeded for the given event ID.
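
A check like this is what makes replays idempotent: a handler that already succeeded for an event ID can be skipped. The sketch below shows the idea with an in-memory map in place of Redis. memoryStore, runOnce, and errHandler are hypothetical, and the key format is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errHandler is a hypothetical handler failure.
var errHandler = errors.New("handler failed")

// memoryStore is an in-memory stand-in for the Redis-backed store.
type memoryStore struct {
	mu        sync.Mutex
	succeeded map[string]bool
}

func newMemoryStore() *memoryStore {
	return &memoryStore{succeeded: map[string]bool{}}
}

// handlerSucceeded reports whether a success was recorded for the pair.
func (s *memoryStore) handlerSucceeded(eventID, handlerID string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.succeeded[eventID+"/"+handlerID]
}

// saveResult records a success; failures are left unmarked so they can be retried.
func (s *memoryStore) saveResult(eventID, handlerID string, err error) {
	if err != nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.succeeded[eventID+"/"+handlerID] = true
}

// runOnce invokes handler unless the store already holds a success for
// this event and handler pair.
func runOnce(s *memoryStore, eventID, handlerID string, handler func() error) error {
	if s.handlerSucceeded(eventID, handlerID) {
		return nil
	}
	err := handler()
	s.saveResult(eventID, handlerID, err)
	return err
}

func main() {
	s := newMemoryStore()
	calls := 0
	handler := func() error { calls++; return nil }
	for i := 0; i < 3; i++ { // simulate the same event being replayed
		_ = runOnce(s, "evt-1", "listener-1", handler)
	}
	fmt.Println("handler calls:", calls)
}
```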

func (*RedisStore) Results added in v0.22.0

func (s *RedisStore) Results(ctx context.Context) ([]StoredResult, error)

Results returns all persisted listener results

func (*RedisStore) SaveEvent added in v0.22.0

func (s *RedisStore) SaveEvent(e Event) error

SaveEvent enqueues and stores the event

func (*RedisStore) SaveHandlerResult added in v0.22.0

func (s *RedisStore) SaveHandlerResult(e Event, handlerID string, err error) error

SaveHandlerResult stores the result of a listener processing an event

type RedisStoreOption added in v1.2.3

type RedisStoreOption func(*RedisStore)

RedisStoreOption configures a RedisStore

func WithDedupTTL added in v1.2.3

func WithDedupTTL(ttl time.Duration) RedisStoreOption

WithDedupTTL sets the TTL for deduplication keys in Redis

func WithEventsTTL added in v1.2.3

func WithEventsTTL(ttl time.Duration) RedisStoreOption

WithEventsTTL sets the TTL for persisted events in Redis

func WithRedisMetrics added in v1.2.3

func WithRedisMetrics(metrics *redisMetrics) RedisStoreOption

WithRedisMetrics allows injecting custom metrics (for testing)

func WithResultsTTL added in v1.2.3

func WithResultsTTL(ttl time.Duration) RedisStoreOption

WithResultsTTL sets the TTL for handler results in Redis

type StoredResult added in v0.22.0

type StoredResult struct {
	// Topic is the topic of the event that was processed
	Topic string `json:"topic"`
	// EventID is the unique idempotency key for the event, when available
	EventID string `json:"event_id,omitempty"`
	// HandlerID is the unique identifier of the listener that processed the event
	HandlerID string `json:"handler_id"`
	// Error is the error encountered while processing the event
	Error string `json:"error,omitempty"`
}

StoredResult holds the outcome of a listener processing an event

type TypedListener added in v0.45.0

type TypedListener[T any] func(*EventContext, T) error

TypedListener represents a listener that expects a strongly typed payload

type TypedTopic added in v0.45.0

type TypedTopic[T any] struct {
	// contains filtered or unexported fields
}

TypedTopic represents a strongly typed event topic. It carries helpers that convert between the strongly typed payload and the internal soiree.Event representation

func NewTypedTopic added in v0.45.0

func NewTypedTopic[T any](name string, opts ...TypedTopicOption[T]) TypedTopic[T]

NewTypedTopic constructs a typed topic with default wrap and unwrap helpers

func (TypedTopic[T]) Name added in v0.45.0

func (t TypedTopic[T]) Name() string

Name exposes the string representation of the topic

func (TypedTopic[T]) Observability added in v1.4.0

func (t TypedTopic[T]) Observability() (ObservabilitySpec[T], bool)

Observability returns the topic observability spec if configured

func (TypedTopic[T]) Wrap added in v1.2.3

func (t TypedTopic[T]) Wrap(payload T) (Event, error)

Wrap converts a typed payload into an Event using the topic's wrap helper.

type TypedTopicOption added in v1.2.3

type TypedTopicOption[T any] func(*TypedTopic[T])

TypedTopicOption configures a TypedTopic

func WithObservability added in v1.4.0

func WithObservability[T any](spec ObservabilitySpec[T]) TypedTopicOption[T]

WithObservability sets an observability spec for the typed topic

func WithUnwrap added in v1.2.3

func WithUnwrap[T any](unwrap func(Event) (T, error)) TypedTopicOption[T]

WithUnwrap sets a custom unwrap function for the typed topic

func WithWrap added in v1.2.3

func WithWrap[T any](wrap func(T) Event) TypedTopicOption[T]

WithWrap sets a custom wrap function for the typed topic

type WorkflowActionCompletedPayload added in v1.2.3

type WorkflowActionCompletedPayload struct {
	// InstanceID is the unique identifier for the workflow instance
	InstanceID string
	// ActionIndex is the index of the action in the workflow
	ActionIndex int
	// ActionType is the type of action that was completed
	ActionType enums.WorkflowActionType
	// ObjectID is the ID of the object the action was acting on
	ObjectID string
	// ObjectType is the type of the object the action was acting on
	ObjectType enums.WorkflowObjectType
	// Success indicates if the action completed successfully
	Success bool
	// Skipped indicates if the action was skipped
	Skipped bool
	// ErrorMessage contains any error message if the action failed
	ErrorMessage string
}

WorkflowActionCompletedPayload contains data for when a workflow action finishes

type WorkflowActionStartedPayload added in v1.2.3

type WorkflowActionStartedPayload struct {
	// InstanceID is the unique identifier for the workflow instance
	InstanceID string
	// ActionIndex is the index of the action in the workflow
	ActionIndex int
	// ActionType is the type of action being started
	ActionType enums.WorkflowActionType
	// ObjectID is the ID of the object the action is acting on
	ObjectID string
	// ObjectType is the type of the object the action is acting on
	ObjectType enums.WorkflowObjectType
}

WorkflowActionStartedPayload contains data for when a workflow action begins

type WorkflowAssignmentCompletedPayload added in v1.2.3

type WorkflowAssignmentCompletedPayload struct {
	// AssignmentID is the unique identifier for the assignment
	AssignmentID string
	// InstanceID is the unique identifier for the workflow instance
	InstanceID string
	// Status is the status of the assignment after completion
	Status enums.WorkflowAssignmentStatus
	// CompletedBy is the ID of the user who completed the assignment
	CompletedBy string
	// ObjectID is the ID of the object the assignment is related to
	ObjectID string
	// ObjectType is the type of the object the assignment is related to
	ObjectType enums.WorkflowObjectType
}

WorkflowAssignmentCompletedPayload contains data for when an approval decision is made

type WorkflowAssignmentCreatedPayload added in v1.2.3

type WorkflowAssignmentCreatedPayload struct {
	// AssignmentID is the unique identifier for the assignment
	AssignmentID string
	// InstanceID is the unique identifier for the workflow instance
	InstanceID string
	// TargetType is the type of the target for the assignment
	TargetType enums.WorkflowTargetType
	// TargetIDs are the IDs of the targets for the assignment
	TargetIDs []string
	// ObjectID is the ID of the object the assignment is related to
	ObjectID string
	// ObjectType is the type of the object the assignment is related to
	ObjectType enums.WorkflowObjectType
}

WorkflowAssignmentCreatedPayload contains data for when an approval is assigned

type WorkflowInstanceCompletedPayload added in v1.2.3

type WorkflowInstanceCompletedPayload struct {
	// InstanceID is the unique identifier for the workflow instance
	InstanceID string
	// State is the final state of the workflow instance
	State enums.WorkflowInstanceState
	// ObjectID is the ID of the object the workflow was acting on
	ObjectID string
	// ObjectType is the type of the object the workflow was acting on
	ObjectType enums.WorkflowObjectType
}

WorkflowInstanceCompletedPayload contains data for when a workflow finishes

type WorkflowTimeoutExpiredPayload added in v1.2.3

type WorkflowTimeoutExpiredPayload struct {
	// InstanceID is the unique identifier for the workflow instance
	InstanceID string
	// AssignmentID is the unique identifier for the assignment that timed out
	AssignmentID string
	// ObjectID is the ID of the object related to the timeout
	ObjectID string
	// ObjectType is the type of the object related to the timeout
	ObjectType enums.WorkflowObjectType
}

WorkflowTimeoutExpiredPayload contains data for when a timeout occurs

type WorkflowTriggeredPayload added in v1.2.3

type WorkflowTriggeredPayload struct {
	// InstanceID is the unique identifier for the workflow instance
	InstanceID string
	// DefinitionID is the identifier for the workflow definition
	DefinitionID string
	// ObjectID is the ID of the object the workflow is acting on
	ObjectID string
	// ObjectType is the type of the object the workflow is acting on
	ObjectType enums.WorkflowObjectType
	// TriggerEventType is the event type that triggered the workflow
	TriggerEventType string
	// TriggerChangedFields are the fields that changed and triggered the workflow
	TriggerChangedFields []string
}

WorkflowTriggeredPayload contains data for a workflow instance creation event
