Documentation
¶
Overview ¶
Package soiree provides a simple event emitter that allows you to emit events and listen for them
Index ¶
- Constants
- Variables
- func ClientAs[T any](ctx *EventContext) (T, bool)
- func EventID(event Event) string
- func PayloadAs[T any](ctx *EventContext) (T, bool)
- func ShutdownAll() error
- func UnwrapPayload[T any](event Event) (T, error)
- type BaseEvent
- func (e *BaseEvent) Client() any
- func (e *BaseEvent) Context() context.Context
- func (e *BaseEvent) IsAborted() bool
- func (e *BaseEvent) Payload() any
- func (e *BaseEvent) Properties() Properties
- func (e *BaseEvent) SetAborted(abort bool)
- func (e *BaseEvent) SetClient(client any)
- func (e *BaseEvent) SetContext(ctx context.Context)
- func (e *BaseEvent) SetPayload(payload any)
- func (e *BaseEvent) SetProperties(properties Properties)
- func (e *BaseEvent) Topic() string
- type Emitter
- type Event
- type EventBus
- func (m *EventBus) Client() any
- func (m *EventBus) Close() error
- func (m *EventBus) Emit(eventName string, payload any) <-chan error
- func (m *EventBus) EmitWithContext(ctx context.Context, eventName string, payload any) <-chan error
- func (m *EventBus) InterestedIn(topicName string) bool
- func (m *EventBus) Off(topicName string, listenerID string) error
- func (m *EventBus) On(topicName string, listener Listener) (string, error)
- func (m *EventBus) RegisterListeners(bindings ...ListenerBinding) ([]string, error)
- func (m *EventBus) WaitForIdle()
- type EventContext
- type Listener
- type ListenerBinding
- type MutationDetectedPayload
- type ObservabilitySpec
- type Option
- func Client(client any) Option
- func ErrChanBufferSize(size int) Option
- func ErrorHandler(errHandler func(Event, error) error) Option
- func EventStore(store eventStore) Option
- func IDGenerator(idGen func() string) Option
- func Panics(panicHandler PanicHandler) Option
- func Retry(retries int, factory func() backoff.BackOff) Option
- func WithRedisStore(client *redis.Client, opts ...RedisStoreOption) Option
- func Workers(n int) Option
- type PanicHandler
- type Pool
- type PoolOption
- type Properties
- type RedisStore
- func (s *RedisStore) DequeueEvent(ctx context.Context) (Event, error)
- func (s *RedisStore) Events(ctx context.Context) ([]Event, error)
- func (s *RedisStore) HandlerSucceeded(ctx context.Context, eventID string, handlerID string) (bool, error)
- func (s *RedisStore) Results(ctx context.Context) ([]StoredResult, error)
- func (s *RedisStore) SaveEvent(e Event) error
- func (s *RedisStore) SaveHandlerResult(e Event, handlerID string, err error) error
- type RedisStoreOption
- type StoredResult
- type TypedListener
- type TypedTopic
- type TypedTopicOption
- type WorkflowActionCompletedPayload
- type WorkflowActionStartedPayload
- type WorkflowAssignmentCompletedPayload
- type WorkflowAssignmentCreatedPayload
- type WorkflowInstanceCompletedPayload
- type WorkflowTimeoutExpiredPayload
- type WorkflowTriggeredPayload
Constants ¶
const (
	TopicWorkflowTriggered           = "workflow.triggered"
	TopicWorkflowActionStarted       = "workflow.action.started"
	TopicWorkflowActionCompleted     = "workflow.action.completed"
	TopicWorkflowAssignmentCreated   = "workflow.assignment.created"
	TopicWorkflowAssignmentCompleted = "workflow.assignment.completed"
	TopicWorkflowInstanceCompleted   = "workflow.instance.completed"
	TopicWorkflowTimeoutExpired      = "workflow.timeout.expired"
	TopicMutationDetected            = "mutation.detected"
)
Topic name constants
const PropertyEventID = "soiree.event_id"
PropertyEventID is the reserved properties key used to identify events across retries/replays
Variables ¶
var (
	// ErrNilListener is returned when a listener is nil
	ErrNilListener = errors.New("listener cannot be nil")
	// ErrInvalidTopicName is returned when a topic name is invalid
	ErrInvalidTopicName = errors.New("invalid topic name")
	// ErrTopicNotFound is returned when a topic is not found
	ErrTopicNotFound = errors.New("topic not found")
	// ErrListenerNotFound is returned when a listener is not found
	ErrListenerNotFound = errors.New("listener not found")
	// ErrEmitterClosed is returned when the event bus is closed
	ErrEmitterClosed = errors.New("event bus is closed")
	// ErrEmitterAlreadyClosed is returned when the event bus is already closed
	ErrEmitterAlreadyClosed = errors.New("event bus is already closed")
	// ErrNilPayload is returned when an event payload is nil
	ErrNilPayload = errors.New("nil payload")
	// ErrPayloadTypeMismatch is returned when an event payload type does not match the expected type
	ErrPayloadTypeMismatch = errors.New("payload type mismatch")
	// ErrEventTopicMismatch is returned when the emitted topic name disagrees with the Event.Topic() value
	ErrEventTopicMismatch = errors.New("event topic mismatch")
)
var MutationDetectedTopic = NewTypedTopic[MutationDetectedPayload](TopicMutationDetected)
MutationDetectedTopic is emitted when a mutation occurs that might trigger workflows
var WorkflowActionCompletedTopic = NewTypedTopic(TopicWorkflowActionCompleted,
	WithObservability(ObservabilitySpec[WorkflowActionCompletedPayload]{
		Operation: "handle_action_completed",
		Origin:    "listeners",
	}),
)
WorkflowActionCompletedTopic is emitted when a workflow action finishes
var WorkflowActionStartedTopic = NewTypedTopic(TopicWorkflowActionStarted,
	WithObservability(ObservabilitySpec[WorkflowActionStartedPayload]{
		Operation: "handle_action_started",
		Origin:    "listeners",
	}),
)
WorkflowActionStartedTopic is emitted when a workflow action begins execution
var WorkflowAssignmentCompletedTopic = NewTypedTopic(TopicWorkflowAssignmentCompleted,
	WithObservability(ObservabilitySpec[WorkflowAssignmentCompletedPayload]{
		Operation: "handle_assignment_completed",
		Origin:    "listeners",
	}),
)
WorkflowAssignmentCompletedTopic is emitted when an approval decision is made
var WorkflowAssignmentCreatedTopic = NewTypedTopic(TopicWorkflowAssignmentCreated,
	WithObservability(ObservabilitySpec[WorkflowAssignmentCreatedPayload]{
		Operation: "handle_assignment_created",
		Origin:    "listeners",
	}),
)
WorkflowAssignmentCreatedTopic is emitted when an approval is assigned
var WorkflowInstanceCompletedTopic = NewTypedTopic(TopicWorkflowInstanceCompleted,
	WithObservability(ObservabilitySpec[WorkflowInstanceCompletedPayload]{
		Operation: "handle_instance_completed",
		Origin:    "listeners",
	}),
)
WorkflowInstanceCompletedTopic is emitted when a workflow finishes
var WorkflowTimeoutExpiredTopic = NewTypedTopic[WorkflowTimeoutExpiredPayload](TopicWorkflowTimeoutExpired)
WorkflowTimeoutExpiredTopic is emitted when a timeout occurs
var WorkflowTriggeredTopic = NewTypedTopic(TopicWorkflowTriggered,
	WithObservability(ObservabilitySpec[WorkflowTriggeredPayload]{
		Operation: "handle_workflow_triggered",
		Origin:    "listeners",
	}),
)
WorkflowTriggeredTopic is emitted when a workflow instance is created
Functions ¶
func ClientAs ¶ added in v0.45.0
func ClientAs[T any](ctx *EventContext) (T, bool)
ClientAs attempts to cast the client to the requested type
func PayloadAs ¶ added in v0.45.0
func PayloadAs[T any](ctx *EventContext) (T, bool)
PayloadAs attempts to cast the payload to the requested type
func ShutdownAll ¶ added in v0.18.1
func ShutdownAll() error
ShutdownAll gracefully closes all registered event buses
func UnwrapPayload ¶ added in v1.2.3
func UnwrapPayload[T any](event Event) (T, error)
UnwrapPayload extracts a typed payload from an event, handling JSON deserialization if needed
Types ¶
type BaseEvent ¶
type BaseEvent struct {
// contains filtered or unexported fields
}
BaseEvent serves as a basic implementation of the `Event` interface and contains fields for storing the topic, payload, and aborted status of an event. The struct includes methods to interact with these fields such as getting and setting the payload, setting the aborted status, and checking if the event has been aborted. The struct also includes a `sync.RWMutex` field `mu` to handle concurrent access to the struct's fields in a thread-safe manner
func NewBaseEvent ¶
NewBaseEvent creates a new instance of BaseEvent with a payload
func (*BaseEvent) Properties ¶
func (e *BaseEvent) Properties() Properties
Properties returns the event's properties
func (*BaseEvent) SetAborted ¶
func (e *BaseEvent) SetAborted(abort bool)
SetAborted sets the event's aborted status
func (*BaseEvent) SetContext ¶ added in v0.4.1
func (e *BaseEvent) SetContext(ctx context.Context)
SetContext sets the event's context
func (*BaseEvent) SetPayload ¶
func (e *BaseEvent) SetPayload(payload any)
SetPayload sets the event's payload
func (*BaseEvent) SetProperties ¶
func (e *BaseEvent) SetProperties(properties Properties)
SetProperties sets the event's properties
type Event ¶
type Event interface {
// Topic returns the event's topic
Topic() string
// Payload returns the event's payload
Payload() any
// Properties returns the event's properties
Properties() Properties
// SetPayload sets the event's payload
SetPayload(any)
// SetProperties sets the event's properties
SetProperties(Properties)
// SetAborted sets the event's aborted status
SetAborted(bool)
// IsAborted checks the event's aborted status
IsAborted() bool
// Context returns the event's context
Context() context.Context
// SetContext sets the event's context
SetContext(context.Context)
// Client returns the event's client
Client() any
// SetClient sets the event's client
SetClient(any)
}
Event is an interface representing the structure of an instance of an event
type EventBus ¶ added in v1.2.3
type EventBus struct {
// contains filtered or unexported fields
}
EventBus manages subscribing and unsubscribing listeners to topics and emitting events to subscribers
func (*EventBus) Emit ¶ added in v1.2.3
func (m *EventBus) Emit(eventName string, payload any) <-chan error
Emit asynchronously dispatches an event to all subscribers of the event's topic
func (*EventBus) EmitWithContext ¶ added in v1.2.3
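func (m *EventBus) EmitWithContext(ctx context.Context, eventName string, payload any) <-chan error
EmitWithContext asynchronously dispatches an event with the given context for timeout/cancellation control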
func (*EventBus) InterestedIn ¶ added in v1.2.3
func (m *EventBus) InterestedIn(topicName string) bool
InterestedIn checks if the event bus has any listeners registered for the given topic
func (*EventBus) Off ¶ added in v1.2.3
func (m *EventBus) Off(topicName string, listenerID string) error
Off unsubscribes a listener from a topic using the listener's unique ID
func (*EventBus) On ¶ added in v1.2.3
func (m *EventBus) On(topicName string, listener Listener) (string, error)
On subscribes a listener to a topic with the given name and returns a unique listener ID
func (*EventBus) RegisterListeners ¶ added in v1.2.3
func (m *EventBus) RegisterListeners(bindings ...ListenerBinding) ([]string, error)
RegisterListeners registers multiple listener bindings and returns their IDs
func (*EventBus) WaitForIdle ¶ added in v1.5.10
func (m *EventBus) WaitForIdle()
WaitForIdle blocks until all submitted event handlers have completed
type EventContext ¶ added in v0.45.0
type EventContext struct {
// contains filtered or unexported fields
}
EventContext bundles the event, payload, and client for a listener
func (*EventContext) Context ¶ added in v0.45.0
func (c *EventContext) Context() context.Context
Context returns the underlying request context
func (*EventContext) Event ¶ added in v0.45.0
func (c *EventContext) Event() Event
Event exposes the underlying event
func (*EventContext) Payload ¶ added in v0.45.0
func (c *EventContext) Payload() any
Payload returns the event payload
func (*EventContext) Properties ¶ added in v0.45.0
func (c *EventContext) Properties() Properties
Properties exposes the underlying property map
func (*EventContext) Property ¶ added in v0.45.0
func (c *EventContext) Property(key string) (any, bool)
Property fetches a property by key
func (*EventContext) PropertyString ¶ added in v0.45.0
func (c *EventContext) PropertyString(key string) (string, bool)
PropertyString fetches a string property by key
type Listener ¶
type Listener func(*EventContext) error
Listener handles an event via the provided event context wrapper
type ListenerBinding ¶ added in v0.45.0
type ListenerBinding struct {
// contains filtered or unexported fields
}
ListenerBinding encapsulates the registration of a listener against a topic
func BindListener ¶ added in v0.45.0
func BindListener[T any](topic TypedTopic[T], listener TypedListener[T]) ListenerBinding
BindListener produces a binding that can be registered on an EventBus
type MutationDetectedPayload ¶ added in v1.2.3
type MutationDetectedPayload struct {
// SchemaType is the type of schema where the mutation occurred
SchemaType string
// ObjectID is the ID of the object that was mutated
ObjectID string
// Operation is the type of operation performed (e.g., update, delete)
Operation string
// ChangedFields are the fields that were changed in the mutation
ChangedFields []string
// UserID is the ID of the user who performed the mutation
UserID string
}
MutationDetectedPayload contains data for mutations that might trigger workflows
type ObservabilitySpec ¶ added in v1.4.0
type ObservabilitySpec[T any] struct {
	// Operation is the operation name to record
	Operation string
	// Origin is the component emitting the observation
	Origin string
	// TriggerFunc overrides how trigger event values are derived
	TriggerFunc func(*EventContext, T) string
}
ObservabilitySpec describes logging/metrics metadata for a typed topic
type Option ¶ added in v1.2.3
type Option func(*EventBus)
Option defines a function type for EventBus configuration options
func ErrChanBufferSize ¶ added in v1.2.3
func ErrChanBufferSize(size int) Option
ErrChanBufferSize sets the size of the buffered channel for errors returned by asynchronous emits
func ErrorHandler ¶ added in v1.2.3
func ErrorHandler(errHandler func(Event, error) error) Option
ErrorHandler sets a custom error handler for an EventBus
func EventStore ¶ added in v0.22.0
func EventStore(store eventStore) Option
EventStore configures a custom event store
func IDGenerator ¶ added in v1.2.3
func IDGenerator(idGen func() string) Option
IDGenerator sets a custom ID generator for an EventBus
func Panics ¶ added in v1.2.3
func Panics(panicHandler PanicHandler) Option
Panics sets a custom panic handler for an EventBus
func Retry ¶ added in v1.2.3
func Retry(retries int, factory func() backoff.BackOff) Option
Retry configures retry attempts and backoff behavior for listener failures
func WithRedisStore ¶ added in v0.22.0
func WithRedisStore(client *redis.Client, opts ...RedisStoreOption) Option
WithRedisStore configures a Redis-backed event store
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a worker pool implementation using the pond library
func NewPool ¶ added in v1.2.3
func NewPool(opts ...PoolOption) *Pool
NewPool creates a new worker pool with the given options
func (*Pool) Release ¶
func (p *Pool) Release()
Release stops all workers in the pool and waits for them to finish
func (*Pool) SubmitMultipleAndWait ¶ added in v1.2.3
SubmitMultipleAndWait submits multiple tasks and waits for all to complete
func (*Pool) WaitForIdle ¶ added in v1.5.10
func (p *Pool) WaitForIdle()
WaitForIdle blocks until all submitted tasks have completed
type PoolOption ¶ added in v1.2.3
type PoolOption func(*Pool)
PoolOption configures a Pool
func WithPoolMetricsRegisterer ¶ added in v1.2.3
func WithPoolMetricsRegisterer(reg prometheus.Registerer) PoolOption
WithPoolMetricsRegisterer configures pool metrics using the provided registerer (nil disables metrics)
func WithPoolName ¶ added in v1.2.3
func WithPoolName(name string) PoolOption
WithPoolName sets the pool name used for metrics labeling
func WithWorkers ¶ added in v1.2.3
func WithWorkers(n int) PoolOption
WithWorkers sets the maximum number of workers in the pool
type Properties ¶
Properties is a map of properties to set on an event
func (Properties) GetKey ¶ added in v0.4.1
func (p Properties) GetKey(key string) any
GetKey gets a property from the Properties map
func (Properties) Set ¶
func (p Properties) Set(name string, value any) Properties
Set sets a property on the Properties map
type RedisStore ¶ added in v0.22.0
type RedisStore struct {
// contains filtered or unexported fields
}
RedisStore persists events and results in redis and acts as an event queue
func NewRedisStore ¶ added in v0.22.0
func NewRedisStore(client *redis.Client, opts ...RedisStoreOption) *RedisStore
NewRedisStore creates a new RedisStore with default metrics
func (*RedisStore) DequeueEvent ¶ added in v0.22.0
func (s *RedisStore) DequeueEvent(ctx context.Context) (Event, error)
DequeueEvent pops a soiree event from the event queue - party line!
func (*RedisStore) Events ¶ added in v0.22.0
func (s *RedisStore) Events(ctx context.Context) ([]Event, error)
Events returns all persisted events
func (*RedisStore) HandlerSucceeded ¶ added in v1.2.3
func (s *RedisStore) HandlerSucceeded(ctx context.Context, eventID string, handlerID string) (bool, error)
HandlerSucceeded reports whether the handler has already succeeded for the given event ID.
func (*RedisStore) Results ¶ added in v0.22.0
func (s *RedisStore) Results(ctx context.Context) ([]StoredResult, error)
Results returns all persisted listener results
func (*RedisStore) SaveEvent ¶ added in v0.22.0
func (s *RedisStore) SaveEvent(e Event) error
SaveEvent enqueues and stores the event
func (*RedisStore) SaveHandlerResult ¶ added in v0.22.0
func (s *RedisStore) SaveHandlerResult(e Event, handlerID string, err error) error
SaveHandlerResult stores the result of a listener processing an event
type RedisStoreOption ¶ added in v1.2.3
type RedisStoreOption func(*RedisStore)
RedisStoreOption configures a RedisStore
func WithDedupTTL ¶ added in v1.2.3
func WithDedupTTL(ttl time.Duration) RedisStoreOption
WithDedupTTL sets the TTL for deduplication keys in Redis
func WithEventsTTL ¶ added in v1.2.3
func WithEventsTTL(ttl time.Duration) RedisStoreOption
WithEventsTTL sets the TTL for persisted events in Redis
func WithRedisMetrics ¶ added in v1.2.3
func WithRedisMetrics(metrics *redisMetrics) RedisStoreOption
WithRedisMetrics allows injecting custom metrics (for testing)
func WithResultsTTL ¶ added in v1.2.3
func WithResultsTTL(ttl time.Duration) RedisStoreOption
WithResultsTTL sets the TTL for handler results in Redis
type StoredResult ¶ added in v0.22.0
type StoredResult struct {
// Topic is the topic of the event that was processed
Topic string `json:"topic"`
// EventID is the unique idempotency key for the event, when available
EventID string `json:"event_id,omitempty"`
// HandlerID is the unique identifier of the listener that processed the event
HandlerID string `json:"handler_id"`
// Error is the error encountered while processing the event
Error string `json:"error,omitempty"`
}
StoredResult holds the outcome of a listener processing an event
type TypedListener ¶ added in v0.45.0
type TypedListener[T any] func(*EventContext, T) error
TypedListener represents a listener that expects a strongly typed payload
type TypedTopic ¶ added in v0.45.0
type TypedTopic[T any] struct {
	// contains filtered or unexported fields
}
TypedTopic represents a strongly typed event topic. It carries helpers that convert between the strongly typed payload and the internal soiree.Event representation
func NewTypedTopic ¶ added in v0.45.0
func NewTypedTopic[T any](name string, opts ...TypedTopicOption[T]) TypedTopic[T]
NewTypedTopic constructs a typed topic with default wrap and unwrap helpers
func (TypedTopic[T]) Name ¶ added in v0.45.0
func (t TypedTopic[T]) Name() string
Name exposes the string representation of the topic
func (TypedTopic[T]) Observability ¶ added in v1.4.0
func (t TypedTopic[T]) Observability() (ObservabilitySpec[T], bool)
Observability returns the topic observability spec if configured
func (TypedTopic[T]) Wrap ¶ added in v1.2.3
func (t TypedTopic[T]) Wrap(payload T) (Event, error)
Wrap converts a typed payload into an Event using the topic's wrap helper.
type TypedTopicOption ¶ added in v1.2.3
type TypedTopicOption[T any] func(*TypedTopic[T])
TypedTopicOption configures a TypedTopic
func WithObservability ¶ added in v1.4.0
func WithObservability[T any](spec ObservabilitySpec[T]) TypedTopicOption[T]
WithObservability sets an observability spec for the typed topic
func WithUnwrap ¶ added in v1.2.3
func WithUnwrap[T any](unwrap func(Event) (T, error)) TypedTopicOption[T]
WithUnwrap sets a custom unwrap function for the typed topic
func WithWrap ¶ added in v1.2.3
func WithWrap[T any](wrap func(T) Event) TypedTopicOption[T]
WithWrap sets a custom wrap function for the typed topic
type WorkflowActionCompletedPayload ¶ added in v1.2.3
type WorkflowActionCompletedPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string
// ActionIndex is the index of the action in the workflow
ActionIndex int
// ActionType is the type of action that was completed
ActionType enums.WorkflowActionType
// ObjectID is the ID of the object the action was acting on
ObjectID string
// ObjectType is the type of the object the action was acting on
ObjectType enums.WorkflowObjectType
// Success indicates if the action completed successfully
Success bool
// Skipped indicates if the action was skipped
Skipped bool
// ErrorMessage contains any error message if the action failed
ErrorMessage string
}
WorkflowActionCompletedPayload contains data for when a workflow action finishes
type WorkflowActionStartedPayload ¶ added in v1.2.3
type WorkflowActionStartedPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string
// ActionIndex is the index of the action in the workflow
ActionIndex int
// ActionType is the type of action being started
ActionType enums.WorkflowActionType
// ObjectID is the ID of the object the action is acting on
ObjectID string
// ObjectType is the type of the object the action is acting on
ObjectType enums.WorkflowObjectType
}
WorkflowActionStartedPayload contains data for when a workflow action begins
type WorkflowAssignmentCompletedPayload ¶ added in v1.2.3
type WorkflowAssignmentCompletedPayload struct {
// AssignmentID is the unique identifier for the assignment
AssignmentID string
// InstanceID is the unique identifier for the workflow instance
InstanceID string
// Status is the status of the assignment after completion
Status enums.WorkflowAssignmentStatus
// CompletedBy is the ID of the user who completed the assignment
CompletedBy string
// ObjectID is the ID of the object the assignment is related to
ObjectID string
// ObjectType is the type of the object the assignment is related to
ObjectType enums.WorkflowObjectType
}
WorkflowAssignmentCompletedPayload contains data for when an approval decision is made
type WorkflowAssignmentCreatedPayload ¶ added in v1.2.3
type WorkflowAssignmentCreatedPayload struct {
// AssignmentID is the unique identifier for the assignment
AssignmentID string
// InstanceID is the unique identifier for the workflow instance
InstanceID string
// TargetType is the type of the target for the assignment
TargetType enums.WorkflowTargetType
// TargetIDs are the IDs of the targets for the assignment
TargetIDs []string
// ObjectID is the ID of the object the assignment is related to
ObjectID string
// ObjectType is the type of the object the assignment is related to
ObjectType enums.WorkflowObjectType
}
WorkflowAssignmentCreatedPayload contains data for when an approval is assigned
type WorkflowInstanceCompletedPayload ¶ added in v1.2.3
type WorkflowInstanceCompletedPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string
// State is the final state of the workflow instance
State enums.WorkflowInstanceState
// ObjectID is the ID of the object the workflow was acting on
ObjectID string
// ObjectType is the type of the object the workflow was acting on
ObjectType enums.WorkflowObjectType
}
WorkflowInstanceCompletedPayload contains data for when a workflow finishes
type WorkflowTimeoutExpiredPayload ¶ added in v1.2.3
type WorkflowTimeoutExpiredPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string
// AssignmentID is the unique identifier for the assignment that timed out
AssignmentID string
// ObjectID is the ID of the object related to the timeout
ObjectID string
// ObjectType is the type of the object related to the timeout
ObjectType enums.WorkflowObjectType
}
WorkflowTimeoutExpiredPayload contains data for when a timeout occurs
type WorkflowTriggeredPayload ¶ added in v1.2.3
type WorkflowTriggeredPayload struct {
// InstanceID is the unique identifier for the workflow instance
InstanceID string
// DefinitionID is the identifier for the workflow definition
DefinitionID string
// ObjectID is the ID of the object the workflow is acting on
ObjectID string
// ObjectType is the type of the object the workflow is acting on
ObjectType enums.WorkflowObjectType
// TriggerEventType is the event type that triggered the workflow
TriggerEventType string
// TriggerChangedFields are the fields that changed and triggered the workflow
TriggerChangedFields []string
}
WorkflowTriggeredPayload contains data for a workflow instance creation event