emitter

package module
v0.3.1 Latest
Published: Dec 14, 2025 License: MIT Imports: 9 Imported by: 4

README

emitter

A lightweight, thread-safe event emission library for Go that implements a publish-subscribe pattern.

Features

  • Event System: Send rich events with data and context to named topics
  • Trigger System: Fast, lightweight one-shot notifications using atomic operations
  • Thread-Safe: All operations are safe for concurrent use
  • Context Support: Events respect context deadlines and cancellation
  • Type-Safe Arguments: Generic Arg[T]() function for type-safe argument access
  • Flexible Buffering: Configurable channel capacity for both events and triggers

Installation

go get github.com/KarpelesLab/emitter

Event System

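Basic Example

h := emitter.New()
ch := h.On("event")

go func() {
    defer h.Off("event", ch)

    for ev := range ch {
        // Read the first argument as an int.
        intVal, err := emitter.Arg[int](ev, 0)
        if err != nil {
            log.Printf("unexpected argument: %v", err)
            continue
        }
        log.Printf("got %d", intVal) // 42 for the Emit call below
    }
}()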

h.Emit(context.Background(), "event", 42)

Global Hub

For cases where events need to be shared across multiple packages, use the global hub:

ch := emitter.Global.On("event")

go func() {
    for ev := range ch {
        // ...
    }
}()

emitter.Global.Emit(context.Background(), "event", 42)

Emitting with Timeout

To emit from a goroutine with a timeout instead of a context:

go h.EmitTimeout(30*time.Second, "topic", args...)

Trigger System

The trigger object wakes multiple goroutines at the same time using channels rather than sync.Cond. This is useful for waking many goroutines on specific events while they still select on other event sources such as timers.

Triggers by default have a queue size of 1, meaning that a call to Push() can be queued and delivered later if the receiving goroutine is busy. Capacity can be set to other values including zero (do not queue) or larger values (queue multiple calls).

Unlike Emit() for events, a trigger's Push() method returns immediately and never blocks, with minimal resource usage (a single atomic operation).

Example

trig := emitter.Global.Trigger("test")

go func() {
    t := time.NewTicker(30*time.Second)
    defer t.Stop()
    l := trig.Listen()
    defer l.Release()

    for {
        select {
        case <-t.C:
            // do something every 30 secs
        case <-l.C:
            // do something on trigger called
        }
    }
}()

trig.Push() // push signal to all listeners

Standalone Triggers

Triggers can also be used independently of a Hub:

trig := emitter.NewTrigger()

l := trig.Listen()
defer l.Release()

// In another goroutine
trig.Push()

API Overview

Hub Methods

Method                                Description
New()                                 Create a new Hub instance
On(topic)                             Subscribe to a topic, returns a channel
OnWithCap(topic, cap)                 Subscribe with custom channel capacity
Off(topic, ch)                        Unsubscribe from a topic
Emit(ctx, topic, args...)             Emit an event (blocks until delivered or context expires)
EmitTimeout(timeout, topic, args...)  Emit with timeout
Trigger(name)                         Get or create a named trigger
Push(name)                            Push signal to a named trigger
Close()                               Close all topics and triggers

Event Methods

Method                                Description
Arg(n)                                Get nth argument as any
Arg[T](ev, n)                         Get nth argument with type conversion
EncodedArg(n, key, encoder)           Get cached encoded representation

Trigger Interface

Method                                Description
Listen()                              Create a listener with default capacity
ListenCap(cap)                        Create a listener with custom capacity
Push()                                Wake all listeners (non-blocking)
Close()                               Close trigger and all listeners

License

MIT License - see LICENSE file.

Documentation

Overview

Package emitter provides a lightweight, thread-safe event emission library for Go that implements a publish-subscribe pattern. It offers two main mechanisms for inter-goroutine communication:

  • Event System: For sending rich events with data and context
  • Trigger System: For simple, fast one-shot notifications

Event System

The event system allows publishing events to named topics. Subscribers receive events through channels that they can select on alongside other channel operations.

Basic usage:

h := emitter.New()
ch := h.On("event")

go func() {
    defer h.Off("event", ch)
    for ev := range ch {
        intVal, err := emitter.Arg[int](ev, 0)
        if err != nil {
            continue // argument missing or not convertible to int
        }
        _ = intVal // process event...
    }
}()

h.Emit(context.Background(), "event", 42)

A global Hub is available via Global for cases where events need to be shared across multiple packages.

Trigger System

Triggers provide a lightweight mechanism to wake multiple goroutines without carrying event data. Unlike events, triggers use atomic operations for minimal overhead and return immediately without blocking.

Trigger usage:

trig := emitter.NewTrigger()

go func() {
    l := trig.Listen()
    defer l.Release()
    for {
        select {
        case <-l.C:
            // handle trigger
        }
    }
}()

trig.Push() // wake all listeners

Triggers support configurable channel capacity to allow queuing of notifications. The default capacity is 1, allowing one pending notification to be queued.

Thread Safety

All types in this package are safe for concurrent use by multiple goroutines.

Variables

var ErrNoSuchTopic = errors.New("no such topic")

ErrNoSuchTopic is returned by Hub.Emit and Hub.EmitEvent when attempting to emit an event to a topic that has no subscribers.

var Global = &Hub{}

Global is the default Hub instance that can be used for application-wide event handling. It is useful when events need to be shared across multiple packages without explicitly passing a Hub reference.

Functions

func Arg

func Arg[T any](ev *Event, arg uint) (T, error)

Arg is a generic function that returns the nth argument from an event, converted to the specified type T. It uses type conversion and will return an error if the argument cannot be converted to type T.

Example:

intVal, err := emitter.Arg[int](ev, 0)
strVal, err := emitter.Arg[string](ev, 1)
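
To make the idea of typed argument access concrete, here is a minimal standard-library sketch. It is not the package's implementation: argAs is a hypothetical helper that works on a plain []any and uses only a type assertion, whereas the package's Arg is documented as performing type conversion and may therefore accept more inputs than an exact type match.

```go
package main

import "fmt"

// argAs returns args[n] as type T using a plain type assertion.
// Hypothetical helper for illustration; emitter.Arg may convert between
// types where this sketch would return an error.
func argAs[T any](args []any, n uint) (T, error) {
	var zero T
	if n >= uint(len(args)) {
		return zero, fmt.Errorf("argument %d out of range (have %d)", n, len(args))
	}
	v, ok := args[n].(T)
	if !ok {
		return zero, fmt.Errorf("argument %d has type %T, want %T", n, args[n], zero)
	}
	return v, nil
}

func main() {
	args := []any{42, "hello"}

	n, err := argAs[int](args, 0)
	fmt.Println(n, err) // prints: 42 <nil>

	_, err = argAs[string](args, 0)
	fmt.Println(err != nil) // prints: true (42 is not a string)

	_, err = argAs[int](args, 5)
	fmt.Println(err != nil) // prints: true (index out of range)
}
```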

Types

type ArgEncoder added in v0.2.2

type ArgEncoder func(any) ([]byte, error)

ArgEncoder is a function type for encoding event arguments into bytes. It is used with Event.EncodedArg to cache encoded representations of arguments, such as JSON encoding.

type Event

type Event struct {
	// Context is the context passed to [Hub.Emit] or [Hub.EmitEvent].
	// It can be used for cancellation or deadline propagation.
	Context context.Context

	// Topic is the name of the topic this event was emitted on.
	Topic string

	// Args contains the arguments passed to [Hub.Emit].
	Args []any
	// contains filtered or unexported fields
}

Event represents an emitted event that is delivered to subscribers. It contains the event context, topic name, and any arguments passed during emission.

func (*Event) Arg

func (ev *Event) Arg(n uint) any

Arg returns the nth argument from the event, or nil if n is out of bounds. For type-safe argument access with conversion, use the generic Arg function.

func (*Event) EncodedArg added in v0.2.2

func (ev *Event) EncodedArg(n uint, key string, encoder ArgEncoder) ([]byte, error)

EncodedArg returns an encoded representation of the nth argument, using the provided encoder function. The result is cached by key, so subsequent calls with the same argument index and key will return the cached result without re-encoding. This is useful for efficiently encoding arguments to formats like JSON when multiple listeners need the same encoded representation.

Example:

jsonBytes, err := ev.EncodedArg(0, "json", json.Marshal)

type Hub

type Hub struct {
	// Cap is the default channel capacity for new listeners created with [Hub.On].
	// Set this before creating listeners to change the default buffer size.
	// The default value of 0 means unbuffered channels.
	Cap uint
	// contains filtered or unexported fields
}

Hub is the central event dispatcher that manages topics and triggers. It provides methods for subscribing to topics, emitting events, and managing triggers for lightweight notifications.

A Hub is safe for concurrent use by multiple goroutines.

func New

func New() *Hub

New creates and returns a new Hub instance with default settings.

func (*Hub) Close added in v0.1.2

func (h *Hub) Close() error

Close will turn off all of the hub's topics, ending all listeners.

func (*Hub) Emit

func (h *Hub) Emit(ctx context.Context, topic string, args ...any) error

Emit emits an event on the given topic, and will not return until the event has been added to all the queues, or the context expires.

func (*Hub) EmitEvent added in v0.1.2

func (h *Hub) EmitEvent(ctx context.Context, topic string, ev *Event) error

EmitEvent emits an existing Event object without copying it.

func (*Hub) EmitEventTimeout added in v0.1.2

func (h *Hub) EmitEventTimeout(timeout time.Duration, topic string, ev *Event) error

EmitEventTimeout is similar to EmitEvent but takes a timeout instead of a context.

func (*Hub) EmitTimeout

func (h *Hub) EmitTimeout(timeout time.Duration, topic string, args ...any) error

EmitTimeout emits an event with a given timeout instead of using a context. This is useful when emitting events in goroutines, for example:

go h.EmitTimeout(30*time.Second, "topic", args...)

func (*Hub) Off

func (h *Hub) Off(topic string, ch <-chan *Event)

Off unsubscribes from a given topic. If ch is nil, the whole topic is closed, otherwise only the given channel is removed from the topic. Note that the channel will be closed in the process.

func (*Hub) On

func (h *Hub) On(topic string) <-chan *Event

On returns a channel that will receive events emitted on the given topic. The channel uses the Hub's default capacity (Cap).

func (*Hub) OnWithCap

func (h *Hub) OnWithCap(topic string, c uint) <-chan *Event

OnWithCap returns a channel that will receive events emitted on the given topic, with the given capacity instead of the default one.

func (*Hub) Push added in v0.2.1

func (h *Hub) Push(trigger string)

Push sends a signal to the named trigger, waking all its listeners. If the trigger does not exist, this method does nothing. Unlike Hub.Emit, Push returns immediately and is non-blocking.

func (*Hub) Trigger added in v0.1.6

func (h *Hub) Trigger(trigName string) Trigger

Trigger returns the given trigger, creating it if needed. This can make it easy to call methods like Listen() or Push() in one go.

type Trigger added in v0.1.3

type Trigger interface {
	Listen() *TriggerListener
	ListenCap(c uint) *TriggerListener
	Push()
	Close() error
}

Trigger is a lightweight mechanism for sending one-shot notifications to multiple channels. There is no event or notification content, just a "wake up" signal sent to many channels at the same time. Calling the trigger costs virtually nothing (just one atomic integer operation), while listeners can have more complex logic.

Even if there are a lot of listeners and it takes time to deliver notifications, each call to Push will translate to one attempt to push something on the listening channels. The only case where a channel drops notifications is if it's not ready to listen, but this can be solved by adding some capacity to the channels by setting Cap to something larger than zero.

func NewTrigger added in v0.1.3

func NewTrigger() Trigger

NewTrigger returns a new trigger object ready for use. This also starts a goroutine.

type TriggerListener added in v0.1.3

type TriggerListener struct {
	C <-chan struct{}
	// contains filtered or unexported fields
}

TriggerListener represents a listener that will receive notifications when the trigger is pushed. Call Release() when you are done with it, typically with defer l.Release(), so that the trigger stops delivering to its channel.

func (*TriggerListener) Release added in v0.1.3

func (tl *TriggerListener) Release()

Release will stop sending data to the channel for this trigger. The channel will not be closed, however, as Release() is assumed to be called when exiting the listening loop.
