bus

package
v0.0.26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2026 License: MIT Imports: 3 Imported by: 0

README

sup/bus

Go Reference Test License

bus provides reactive value propagation for Go programs. It is built on top of sup actors and designed for systems that poll hardware, sensors, or external services, and need to broadcast changes to multiple consumers.

Installation

go get github.com/webermarci/sup/bus

Concepts

Type Direction Use case
Signal Read → broadcast Poll a register, sensor, or API; notify subscribers on change
Mirror Read (Lazy) Transform or combine existing values without extra goroutines
Trigger Write → hardware Accept writes from callers; forward to a handler on success

Both types are actors. They do nothing until Run(ctx) is called.

Signal

A Signal periodically calls a poll function and broadcasts the result to all current subscribers whenever the value changes.

signal := bus.NewSignal("signal", func(ctx context.Context) (uint16, error) {
    return modbusClient.ReadRegister(0x01)
}).
    WithInterval(100 * time.Millisecond).
    WithInitialValue(0).
    WithInitialNotify(true)

go signal.Run(ctx)

ch := signal.Subscribe(ctx)
for v := range ch {
    fmt.Printf("register 0x01 changed: %d\n", v)
}
Options
Option Default Description
WithInterval(d) 1s How often the poll function is called
WithInitialValue(v) zero value Value before the first successful poll
WithInitialNotify(true) false Send the current value immediately to each new subscriber
Behaviour
  • If the poll function returns an error, the value is not updated and subscribers are not notified.
  • Subscribers are notified only when the value changes — repeated identical results are silently dropped.
  • Subscribing with a canceled context is a no-op; the returned channel is closed immediately.
  • Canceling a subscriber's context closes its channel and removes it from the broadcast list.

Mirror

A Mirror provides a lazy, functional transformation of one or more Readables. It does not require a goroutine or a mailbox; it calculates its value on-demand when Read() is called.

tempC := bus.NewSignal(...)

// Simple transformation
tempF := bus.NewMirror(func() float64 {
    return tempC.Read()*9/5 + 32
})

// Complex aggregation
isSafe := bus.NewMirror(func() bool {
    // Capture multiple signals in a closure for type-safe logic
    return tempC.Read() < 100.0 && pressure.Read() < 10.5
})

Trigger

A Trigger accepts writes via Write, calls an update function with the new value, and — on success — updates the stored value and notifies subscribers.

trigger := bus.NewTrigger("trigger", func(ctx context.Context, v uint16) error {
    return modbusClient.WriteRegister(0x02, v)
}).WithInitialValue(0)

go trigger.Run(ctx)

if err := trigger.Write(42); err != nil {
    log.Printf("write rejected: %v", err)
}
Options
Option Default Description
WithInitialValue(v) zero value Value before the first successful write
WithInitialNotify(true) false Send the current value immediately to each new subscriber
Behaviour
  • SetValue is synchronous — it blocks until the update function has returned.
  • If the update function returns an error, the stored value is not updated and subscribers are not notified. The error is returned to the caller.
  • Sync re-runs the update function with the current stored value, useful for forcing hardware to match state after a reconnect.

Full Example

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 1. Inputs
	temp := bus.NewSignal("temperature", func(ctx context.Context) (float64, error) {
		return readTemperatureSensor()
	}).WithInterval(500 * time.Millisecond)

	// 2. Logic (Mirror)
	// Automatically determine if heating is needed
	needsHeat := bus.NewMirror(func() bool {
		return temp.Read() < 20.0
	})

	// 3. Output
	heater := bus.NewTrigger("heater", func(ctx context.Context, on bool) error {
		return setHeaterRelay(on)
	})

	go temp.Run(ctx)
	go heater.Run(ctx)

	// Using the Mirror in a control loop
	tempCh := temp.Subscribe(ctx)
	go func() {
		for range tempCh {
			// Read the logic from the mirror and write to the trigger
			if err := heater.Write(needsHeat.Read()); err != nil {
				fmt.Printf("heater control failed: %v\n", err)
			}
		}
	}()

	time.Sleep(10 * time.Second)
}

Using with a Supervisor

Both Signal and Trigger implement the sup.Actor interface via their Run method, so they can be placed directly under a supervisor. Mirror is passive and does not need to be supervised.

supervisor := sup.NewSupervisor("root",
	sup.WithActors(temp, heater),
	sup.WithPolicy(sup.Permanent),
	sup.WithRestartDelay(time.Second),
)

supervisor.Run(ctx)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Mirror added in v0.0.19

type Mirror[V any] struct {
	// contains filtered or unexported fields
}

Mirror is a simple implementation of Readable that returns the value from a provided function. It does not have any internal state and always reflects the current value from the function.

func NewMirror added in v0.0.19

func NewMirror[V any](fn func() V) *Mirror[V]

NewMirror creates a new Mirror with the given function that provides the value to be read.

func (*Mirror[V]) Read added in v0.0.19

func (m *Mirror[V]) Read() V

Read returns the current value from the Mirror by calling the provided function.

type Readable added in v0.0.19

type Readable[V any] interface {
	Read() V
}

Readable represents a value that can be read. The Read method returns the current value.

type Signal

type Signal[V any] struct {
	*sup.BaseActor
	// contains filtered or unexported fields
}

Signal represents a value that is periodically updated by a function and can be subscribed to for updates.

func NewSignal

func NewSignal[V any](name string, update func(context.Context) (V, error)) *Signal[V]

NewSignal creates a new Signal with the given name and update function.

func (*Signal[V]) Read added in v0.0.18

func (s *Signal[V]) Read() V

Read returns the current value of the Signal by sending a call message to its mailbox.

func (*Signal[V]) Run

func (s *Signal[V]) Run(ctx context.Context) error

Run starts the main loop of the Signal, which periodically updates its value by calling the provided function and notifies subscribers of changes. It also handles incoming messages for getting the current value and managing subscriptions.

func (*Signal[V]) Subscribe

func (s *Signal[V]) Subscribe(ctx context.Context) <-chan V

Subscribe allows clients to receive updates whenever the Signal's value changes. It returns a channel that will receive new values.

func (*Signal[V]) WithInitialNotify

func (s *Signal[V]) WithInitialNotify(enabled bool) *Signal[V]

WithInitialNotify configures whether new subscribers should receive the current value immediately upon subscribing.

func (*Signal[V]) WithInitialValue

func (s *Signal[V]) WithInitialValue(initial V) *Signal[V]

WithInitialValue sets the initial value of the Signal before any updates occur.

func (*Signal[V]) WithInterval

func (s *Signal[V]) WithInterval(interval time.Duration) *Signal[V]

WithInterval sets the interval at which the Signal's update function is called to refresh its value.

func (*Signal[V]) WithMailboxSize

func (s *Signal[V]) WithMailboxSize(size int) *Signal[V]

WithMailboxSize allows configuring the mailbox buffer size for the Signal.

func (*Signal[V]) WithSubscriberBuffer

func (s *Signal[V]) WithSubscriberBuffer(buffer int) *Signal[V]

WithSubscriberBuffer configures the buffer size for subscriber channels to prevent blocking on updates.

type Trigger added in v0.0.17

type Trigger[V any] struct {
	*sup.BaseActor
	// contains filtered or unexported fields
}

Trigger represents a value that can be updated by a function and subscribed to for updates.

func NewTrigger added in v0.0.17

func NewTrigger[V any](name string, update func(context.Context, V) error) *Trigger[V]

NewTrigger creates a new Trigger with the given name and update function.

func (*Trigger[V]) Read added in v0.0.18

func (t *Trigger[V]) Read() V

Read retrieves the current value of the Trigger.

func (*Trigger[V]) Run added in v0.0.17

func (t *Trigger[V]) Run(ctx context.Context) error

Run starts the Trigger's main loop, processing incoming messages. It should be run in a separate goroutine and will continue until the context is canceled or the mailbox is closed.

func (*Trigger[V]) Subscribe added in v0.0.17

func (t *Trigger[V]) Subscribe(ctx context.Context) <-chan V

Subscribe returns a channel that receives updates whenever the Trigger's value changes. The subscription is automatically cleaned up when the context is done.

func (*Trigger[V]) Sync added in v0.0.17

func (t *Trigger[V]) Sync() error

Sync forces the Trigger to re-evaluate its current value by calling the update function with the current value. This can be used to trigger updates to subscribers even if the value hasn't changed.

func (*Trigger[V]) WithInitialNotify added in v0.0.17

func (t *Trigger[V]) WithInitialNotify(enabled bool) *Trigger[V]

WithInitialNotify configures whether new subscribers should receive the current value immediately upon subscribing.

func (*Trigger[V]) WithInitialValue added in v0.0.17

func (t *Trigger[V]) WithInitialValue(initial V) *Trigger[V]

WithInitialValue sets the initial value of the Trigger before any updates occur.

func (*Trigger[V]) WithMailboxSize added in v0.0.17

func (t *Trigger[V]) WithMailboxSize(size int) *Trigger[V]

WithMailboxSize allows configuring the mailbox buffer size for the Trigger.

func (*Trigger[V]) WithSubscriberBuffer added in v0.0.17

func (t *Trigger[V]) WithSubscriberBuffer(buffer int) *Trigger[V]

WithSubscriberBuffer configures the buffer size for subscriber channels to prevent blocking on updates.

func (*Trigger[V]) Write added in v0.0.18

func (t *Trigger[V]) Write(value V) error

Write attempts to update the Trigger's value using the provided update function.

type Writable added in v0.0.19

type Writable[V any] interface {
	Write(V) error
}

Writable represents a value that can be updated by writing to it. The Write method may return an error if the update is rejected.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL