bus

package
v0.0.35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2026 License: MIT Imports: 4 Imported by: 0

README

sup/bus

Go Reference Test License

bus provides reactive value propagation for Go programs. It is built on top of sup actors and designed for systems that poll hardware, sensors, or external services, and need to broadcast changes to multiple consumers.

Installation

go get github.com/webermarci/sup/bus

Concepts

Type Direction Use case
Signal Read → broadcast Poll a register, sensor, or API; notify subscribers on update
Computed Notify → Update Eagerly update value when dependencies change; broadcast updates
Debounce Notify → Wait → Broadcast Ignore rapid updates until the source is quiet; prevent noise
Throttle Notify → Limit → Broadcast Limit the maximum rate of updates to prevent overwhelming consumers
Link Subscribe → Write Connect a Provider to a Writer as a supervised actor
ViewFunc Read (Lazy) Transform or combine existing values without any goroutines
Trigger Write → update Accept writes from callers; forward to a handler on success

Active types (Signal, Computed, Debounce, Throttle, Link, Trigger) are actors and should be managed with a supervisor.

Signal

A Signal periodically calls an update function and broadcasts the result to all current subscribers.

signal := bus.NewSignal("signal", func(ctx context.Context) (uint16, error) {
	return modbusClient.ReadRegister(0x01)
}).
	WithInterval(100 * time.Millisecond).
	WithInitialValue(0).
	WithInitialNotify(true)

go signal.Run(ctx)

ch := signal.Subscribe(ctx)
for v := range ch {
	fmt.Printf("register 0x01 changed: %d\n", v)
}
Options
Option Default Description
WithInterval(d) 1s How often the update function is called
WithInitialValue(v) zero value Value before the first successful update
WithInitialNotify(true) false Send the current value immediately to each new subscriber
WithSubscriberBuffer(n) 16 The channel buffer size for subscribers
WithEqual(func) nil Custom equality function used to suppress identical updates
Behaviour
  • If the update function returns an error, the value is not updated and subscribers are not notified.
  • If WithEqual is configured, repeated equal values are dropped.
  • If WithEqual is not configured, every successful update is broadcast.
  • Subscribing with a canceled context is a no-op; the returned channel is closed immediately.
  • Canceling a subscriber's context closes its channel and removes it from the broadcast list.

ViewFunc

A ViewFunc provides a lazy, zero-overhead functional transformation of one or more Readers. It calculates its value on demand when Read() is called. It is an adapter type, not an actor, and requires no supervision.

tempC := bus.NewSignal(...)

// Simple transformation
tempF := bus.ViewFunc[float64](func() float64 {
	return tempC.Read()*9/5 + 32
})

tempF.Read()

// Complex aggregation
isSafe := bus.ViewFunc[bool](func() bool {
	return tempC.Read() < 100.0 && pressure.Read() < 10.5
})

isSafe.Read()

Computed

A Computed actor eagerly updates its value whenever its dependencies notify it of a change. Unlike a ViewFunc, which is lazy, a Computed actor maintains its own state and broadcasts changes to its own subscribers.

temp := bus.NewSignal(...)
humidity := bus.NewSignal(...)

heatIndex := bus.NewComputed("heatIndex", func() float64 {
	return calculateHeatIndex(temp.Read(), humidity.Read())
}, temp, humidity).
	WithCoalesceWindow(5 * time.Millisecond)

go heatIndex.Run(ctx)

ch := heatIndex.Subscribe(ctx)
for v := range ch {
	fmt.Printf("new heat index: %.2f\n", v)
}
Options
Option Default Description
WithCoalesceWindow(d) 5ms Delay used to batch concurrent dependency updates
WithInitialNotify(true) false Send the current value immediately to each new subscriber
WithSubscriberBuffer(n) 16 The channel buffer size for subscribers
WithEqual(func) nil Custom equality function used to suppress identical updates
Behaviour
  • It calls the update function once during creation to establish the initial value.
  • It subscribes to all provided dependencies and re-runs the update function whenever any dependency notifies it.
  • Glitch-free: it batches concurrent dependency notifications within a coalesceWindow so diamond graphs do not cause redundant recalculations or torn states.
  • If WithEqual is configured, equal results are not broadcast.
  • If WithEqual is not configured, every recomputation is broadcast.

Debounce

A Debounce actor delays broadcasting updates from its upstream source until the source has stopped changing for a specified wait duration. This is ideal for taming noisy sensors or rapid user inputs.

button := bus.NewTrigger(...)

cleanButton := bus.NewDebounce("clean-button", button, 300*time.Millisecond).
	WithMaxWait(1 * time.Second)

go cleanButton.Run(ctx)

ch := cleanButton.Subscribe(ctx)
for v := range ch {
	fmt.Println("debounced button state:", v)
}
Options
Option Default Description
WithMaxWait(d) 0 Force an update after this duration, preventing infinite starvation
WithInitialNotify(true) false Send the current value immediately to each new subscriber
WithSubscriberBuffer(n) 16 The channel buffer size for subscribers
Behaviour
  • Emits only after the source has been quiet for the wait duration.
  • Evaluates the source immediately upon creation so Read() yields a valid initial value.

Throttle

A Throttle actor limits the rate at which updates from its upstream source are broadcast. It ensures that updates are sent at most once per interval, which is useful for decoupling fast internal state changes from slower consumers such as WebSockets or UI rendering.

fastComputed := bus.NewComputed(...)

uiState := bus.NewThrottle("ui-throttle", fastComputed, 200*time.Millisecond)

go uiState.Run(ctx)

ch := uiState.Subscribe(ctx)
for state := range ch {
	websocket.Send(state)
}
Options
Option Default Description
WithInitialNotify(true) false Send the current value immediately to each new subscriber
WithSubscriberBuffer(n) 16 The channel buffer size for subscribers
Behaviour
  • If the throttle window is open, the first event is emitted immediately.
  • If events arrive while the window is closed, it keeps the latest one and emits it when the interval elapses.
  • Evaluates the source immediately upon creation so Read() yields a valid initial value.

Trigger

A Trigger accepts writes via Write, calls an update function with the new value, and, on success, updates the stored value and notifies subscribers.

trigger := bus.NewTrigger("trigger", func(ctx context.Context, v uint16) error {
	return modbusClient.WriteRegister(0x02, v)
}).WithInitialValue(0)

go trigger.Run(ctx)

if err := trigger.Write(42); err != nil {
	fmt.Printf("write rejected: %v\n", err)
}
Options
Option Default Description
WithInitialValue(v) zero value Value before the first successful write
WithInitialNotify(true) false Send the current value immediately to each new subscriber
WithSubscriberBuffer(n) 16 The channel buffer size for subscribers
Behaviour
  • Write is synchronous; it blocks until the update function returns.
  • If the update function returns an error, the stored value is not updated and subscribers are not notified.

A Link actor connects a readable Provider to a destination Writer such as a Trigger. This lets you route data through your reactive pipeline without writing unsupervised goroutines.

wiring := bus.NewLink("websocket-wiring", uiState, websocketTrigger)

go wiring.Run(ctx)
Behaviour
  • It subscribes to the source provider and forwards each received value to the destination writer.
  • It exits when the context is canceled or the source subscription closes.
  • Writes are synchronous, so backpressure naturally propagates through the link.

Full Example

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 1. Inputs
	temp := bus.NewSignal("temperature", func(ctx context.Context) (float64, error) {
		return readTemperatureSensor()
	}).WithInterval(10 * time.Millisecond)

	// Filter out sensor noise with Debounce
	cleanTemp := bus.NewDebounce("clean-temp", temp, 200*time.Millisecond)

	// 2. Logic
	needsHeat := bus.ViewFunc[bool](func() bool {
		return cleanTemp.Read() < 20.0
	})

	// 3. Output Control
	safeHeaterCmd := bus.NewThrottle("safe-heater", needsHeat, time.Second)

	// 4. Output Actuator
	heater := bus.NewTrigger("heater", func(ctx context.Context, on bool) error {
		return setHeaterRelay(on)
	})

	// 5. Wiring
	wiring := bus.NewLink("heater-wiring", safeHeaterCmd, heater)

	// 6. Supervision
	supervisor := sup.NewSupervisor("root",
		sup.WithActors(temp, cleanTemp, safeHeaterCmd, heater, wiring),
		sup.WithPolicy(sup.Permanent),
		sup.WithRestartDelay(time.Second),
	)

	supervisor.Run(ctx)
}

Using with a Supervisor

All active types (Signal, Computed, Debounce, Throttle, Link, and Trigger) implement the sup.Actor interface via their Run method, so they can be placed directly under a supervisor. ViewFunc is not supervised because it contains no running goroutines.

supervisor := sup.NewSupervisor("root",
	sup.WithActors(temp, heater, cleanTemp, safeHeaterCmd, wiring),
	sup.WithPolicy(sup.Permanent),
	sup.WithRestartDelay(time.Second),
)

supervisor.Run(ctx)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Computed added in v0.0.32

type Computed[V any] struct {
	*sup.BaseActor
	// contains filtered or unexported fields
}

Computed is a reactive value that updates itself based on its dependencies. It implements both Subscribable and Notifyable interfaces.

func NewComputed added in v0.0.32

func NewComputed[V any](name string, update func() V, deps ...Watcher) *Computed[V]

NewComputed creates a new Computed with the given name, update function, and dependencies. The update function is called whenever any of the dependencies notify a change, and the result is broadcast to subscribers.

func (*Computed[V]) Read added in v0.0.32

func (c *Computed[V]) Read() V

Read returns the current value of the Computed. It acquires a read lock to ensure thread-safe access to the value.

func (*Computed[V]) Run added in v0.0.32

func (c *Computed[V]) Run(ctx context.Context) error

Run is the main loop for the Computed actor. It subscribes to all dependencies and listens for notifications. Whenever any dependency notifies a change, it calls the update function to compute the new value, updates its internal state, and broadcasts the new value to subscribers. The loop continues until the context is canceled, at which point it cleans up and exits.

func (*Computed[V]) Subscribe added in v0.0.32

func (c *Computed[V]) Subscribe(ctx context.Context) <-chan V

Subscribe returns a channel that receives updates whenever the Computed's value changes. It subscribes to all dependencies and triggers an update whenever any of them notify a change. The current value is sent to the channel immediately upon subscription.

func (*Computed[V]) Watch added in v0.0.32

func (c *Computed[V]) Watch(ctx context.Context) <-chan struct{}

Watch returns a channel that receives notifications whenever any of the dependencies of the Computed change. It subscribes to all dependencies and triggers an update whenever any of them notify a change. The channel will receive a notification immediately upon subscription.

func (*Computed[V]) WithCoalesceWindow added in v0.0.32

func (c *Computed[V]) WithCoalesceWindow(window time.Duration) *Computed[V]

WithCoalesceWindow configures the delay used to batch concurrent dependency updates. Defaults to 5ms, which is typically enough to catch immediately adjacent graph updates.

func (*Computed[V]) WithEqual added in v0.0.32

func (c *Computed[V]) WithEqual(eq func(a, b V) bool) *Computed[V]

WithEqual configures a custom equality function to determine if the computed value has changed. If not set, the Computed will use the default equality check (==) to compare old and new values. This can be useful for complex types where a simple equality check may not be sufficient.

func (*Computed[V]) WithInitialNotify added in v0.0.32

func (c *Computed[V]) WithInitialNotify(enabled bool) *Computed[V]

WithInitialNotify configures whether new subscribers should receive the current value immediately upon subscribing.

func (*Computed[V]) WithSubscriberBuffer added in v0.0.32

func (c *Computed[V]) WithSubscriberBuffer(buffer int) *Computed[V]

WithSubscriberBuffer configures the buffer size for subscriber channels.

type Debounce added in v0.0.32

type Debounce[V any] struct {
	*sup.BaseActor
	// contains filtered or unexported fields
}

Debounce is a reactive value that delays broadcasting updates from its source until the source has stopped changing for a specified wait duration.

func NewDebounce added in v0.0.32

func NewDebounce[V any](name string, src Provider[V], wait time.Duration) *Debounce[V]

NewDebounce creates a new Debounce actor attached to a source provider.

func (*Debounce[V]) Read added in v0.0.32

func (d *Debounce[V]) Read() V

Read returns the currently settled value safely.

func (*Debounce[V]) Run added in v0.0.32

func (d *Debounce[V]) Run(ctx context.Context) error

Run is the main actor loop. It subscribes to the source and manages the sliding window.

func (*Debounce[V]) Subscribe added in v0.0.32

func (d *Debounce[V]) Subscribe(ctx context.Context) <-chan V

Subscribe returns a channel that receives the debounced updates.

func (*Debounce[V]) Watch added in v0.0.32

func (d *Debounce[V]) Watch(ctx context.Context) <-chan struct{}

Watch returns a channel that receives notifications when the value settles.

func (*Debounce[V]) WithInitialNotify added in v0.0.32

func (d *Debounce[V]) WithInitialNotify(enabled bool) *Debounce[V]

WithInitialNotify configures whether new subscribers receive the current value immediately.

func (*Debounce[V]) WithMaxWait added in v0.0.32

func (d *Debounce[V]) WithMaxWait(maxWait time.Duration) *Debounce[V]

WithMaxWait configures a maximum time to wait before forcing an update, preventing infinite starvation if the source is constantly changing.

func (*Debounce[V]) WithSubscriberBuffer added in v0.0.32

func (d *Debounce[V]) WithSubscriberBuffer(buffer int) *Debounce[V]

WithSubscriberBuffer configures the buffer size for subscriber channels.

type Link[V any] struct {
	*sup.BaseActor
	// contains filtered or unexported fields
}

Link is a supervised actor that subscribes to a Provider and writes every value it receives to a Writer. It eliminates the need for manual, unsupervised goroutines when connecting reactive pipelines.

func NewLink[V any](name string, src Provider[V], dest Writer[V]) *Link[V]

NewLink creates a new Link connecting a source to a destination.

func (*Link[V]) Run added in v0.0.32

func (l *Link[V]) Run(ctx context.Context) error

Run is the main actor loop. It forwards values from the source to the destination.

type Provider added in v0.0.32

type Provider[V any] interface {
	sup.Actor
	Reader[V]
	Subscriber[V]
	Watcher
}

Provider represents any reactive actor that emits values. Trigger, Signal, Derived, and Debounce all satisfy this interface.

type Reader added in v0.0.29

type Reader[V any] interface {
	Read() V
}

Reader represents a value that can be read. The Read method returns the current value.

type Signal

type Signal[V any] struct {
	*sup.BaseActor
	// contains filtered or unexported fields
}

Signal represents a value that is periodically updated by a function and can be subscribed to for updates.

func NewSignal

func NewSignal[V any](name string, update func(context.Context) (V, error)) *Signal[V]

NewSignal creates a new Signal with the given name and update function.

func (*Signal[V]) Read added in v0.0.18

func (s *Signal[V]) Read() V

Read returns the current value of the Signal. It acquires a read lock to ensure thread-safe access to the value.

func (*Signal[V]) Run

func (s *Signal[V]) Run(ctx context.Context) error

Run starts the Signal's update loop, which periodically calls the update function to refresh the Signal's value and notifies subscribers of any changes. The loop continues until the provided context is canceled, at which point it will clean up all subscriber channels.

func (*Signal[V]) Subscribe

func (s *Signal[V]) Subscribe(ctx context.Context) <-chan V

Subscribe allows clients to subscribe to updates of the Signal's value. It returns a channel that will receive new values whenever they are updated. The subscription will automatically clean up when the provided context is canceled.

func (*Signal[V]) Watch added in v0.0.29

func (s *Signal[V]) Watch(ctx context.Context) <-chan struct{}

Watch allows clients to subscribe to notifications whenever the Signal's value is updated, without receiving the actual value. It returns a channel that will receive a notification (empty struct) whenever the value is updated. The subscription will automatically clean up when the provided context is canceled.

func (*Signal[V]) WithEqual added in v0.0.32

func (s *Signal[V]) WithEqual(eq func(a, b V) bool) *Signal[V]

WithEqual configures a custom equality function to determine if the Signal's value has changed. If not set, the Signal will use the default equality check (==) to compare old and new values. This can be useful for complex types where a simple equality check may not be sufficient.

func (*Signal[V]) WithInitialNotify

func (s *Signal[V]) WithInitialNotify(enabled bool) *Signal[V]

WithInitialNotify configures whether new subscribers should receive the current value immediately upon subscribing.

func (*Signal[V]) WithInitialValue

func (s *Signal[V]) WithInitialValue(initial V) *Signal[V]

WithInitialValue sets the initial value of the Signal before any updates occur.

func (*Signal[V]) WithInterval

func (s *Signal[V]) WithInterval(interval time.Duration) *Signal[V]

WithInterval sets the interval at which the Signal's update function is called to refresh its value.

func (*Signal[V]) WithSubscriberBuffer

func (s *Signal[V]) WithSubscriberBuffer(buffer int) *Signal[V]

WithSubscriberBuffer configures the buffer size for subscriber channels to prevent blocking on updates.

type Subscriber added in v0.0.29

type Subscriber[V any] interface {
	Subscribe(context.Context) <-chan V
}

Subscriber represents a value that can be subscribed to for updates. The Subscribe method returns a channel that will receive the updated value whenever it changes. The channel will be closed when the context is canceled.

type Throttle added in v0.0.32

type Throttle[V any] struct {
	*sup.BaseActor
	// contains filtered or unexported fields
}

Throttle is a reactive value that limits the rate at which updates from its source are broadcast. It ensures that updates are sent at most once per interval, always emitting the most recent (trailing) value from that interval.

func NewThrottle added in v0.0.32

func NewThrottle[V any](name string, source Provider[V], interval time.Duration) *Throttle[V]

NewThrottle creates a new Throttle actor attached to a source provider.

func (*Throttle[V]) Read added in v0.0.32

func (t *Throttle[V]) Read() V

Read returns the currently settled value safely.

func (*Throttle[V]) Run added in v0.0.32

func (t *Throttle[V]) Run(ctx context.Context) error

Run is the main actor loop. It manages the trailing-edge rate limit.

func (*Throttle[V]) Subscribe added in v0.0.32

func (t *Throttle[V]) Subscribe(ctx context.Context) <-chan V

Subscribe returns a channel that receives the throttled updates.

func (*Throttle[V]) Watch added in v0.0.32

func (t *Throttle[V]) Watch(ctx context.Context) <-chan struct{}

Watch returns a channel that receives notifications when the value updates.

func (*Throttle[V]) WithInitialNotify added in v0.0.32

func (t *Throttle[V]) WithInitialNotify(enabled bool) *Throttle[V]

WithInitialNotify configures whether new subscribers receive the current value immediately.

func (*Throttle[V]) WithSubscriberBuffer added in v0.0.32

func (t *Throttle[V]) WithSubscriberBuffer(buffer int) *Throttle[V]

WithSubscriberBuffer configures the buffer size for subscriber channels.

type Trigger added in v0.0.17

type Trigger[V any] struct {
	*sup.BaseActor
	// contains filtered or unexported fields
}

Trigger represents a value that can be updated by a function and subscribed to for updates.

func NewTrigger added in v0.0.17

func NewTrigger[V any](name string, update func(context.Context, V) error) *Trigger[V]

NewTrigger creates a new Trigger with the given name and update function.

func (*Trigger[V]) Read added in v0.0.18

func (t *Trigger[V]) Read() V

Read returns the current value of the Trigger. It acquires a lock to ensure thread-safe access to the value.

func (*Trigger[V]) Run added in v0.0.17

func (t *Trigger[V]) Run(ctx context.Context) error

Run starts the Trigger's main loop, which waits for the context to be canceled. When the context is canceled, it cleans up all subscriber channels. This method should be run in a separate goroutine.

func (*Trigger[V]) Subscribe added in v0.0.17

func (t *Trigger[V]) Subscribe(ctx context.Context) <-chan V

Subscribe returns a channel that receives updates whenever the Trigger's value changes. If initialNotify is enabled, the current value is sent to the channel immediately upon subscription. It acquires a lock to ensure thread-safe access to the current value when subscribing.

func (*Trigger[V]) Watch added in v0.0.29

func (t *Trigger[V]) Watch(ctx context.Context) <-chan struct{}

Watch allows clients to subscribe to notifications whenever the Trigger's value is updated, without receiving the actual value. It returns a channel that will receive a notification (empty struct) whenever the value is updated. The subscription will automatically clean up when the provided context is canceled.

func (*Trigger[V]) WithInitialNotify added in v0.0.17

func (t *Trigger[V]) WithInitialNotify(enabled bool) *Trigger[V]

WithInitialNotify configures whether new subscribers should receive the current value immediately upon subscribing.

func (*Trigger[V]) WithInitialValue added in v0.0.17

func (t *Trigger[V]) WithInitialValue(initial V) *Trigger[V]

WithInitialValue sets the initial value of the Trigger before any updates occur.

func (*Trigger[V]) WithSubscriberBuffer added in v0.0.17

func (t *Trigger[V]) WithSubscriberBuffer(buffer int) *Trigger[V]

WithSubscriberBuffer configures the buffer size for subscriber channels to prevent blocking on updates.

func (*Trigger[V]) Write added in v0.0.18

func (t *Trigger[V]) Write(ctx context.Context, value V) error

Write updates the Trigger's value by calling the update function with the provided value. If the update is successful, it notifies all subscribers of the new value. It acquires a lock to ensure thread-safe updates to the value.

type ViewFunc added in v0.0.32

type ViewFunc[V any] func() V

ViewFunc is an adapter to allow the use of ordinary functions as Readers. If f is a function with the appropriate signature, ViewFunc(f) is a Reader that calls f.

func (ViewFunc[V]) Read added in v0.0.32

func (f ViewFunc[V]) Read() V

Read calls the underlying function to satisfy the Reader interface.

type Watcher added in v0.0.29

type Watcher interface {
	Watch(ctx context.Context) <-chan struct{}
}

Watcher represents a value that can be watched for changes. The Watch method returns a channel that will receive a notification whenever the value changes. The channel will be closed when the context is canceled.

type Writer added in v0.0.29

type Writer[V any] interface {
	Write(context.Context, V) error
}

Writer represents a value that can be updated by writing to it. The Write method may return an error if the update is rejected.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL