memory

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2025 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package memory provides in-memory implementations of the bus interface.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultBus

type DefaultBus[T any] struct {
	// contains filtered or unexported fields
}

DefaultBus is the default, thread-safe implementation of the Bus interface. It uses channels to deliver messages to subscribers, with each subscriber having its own dedicated goroutine for message processing.

func New

func New[T any]() *DefaultBus[T]

New creates and returns a new instance of DefaultBus, which is the default, thread-safe implementation of the Bus interface. It is initialized with the default publish timeout.

The type parameter T specifies the type of message that the bus will handle.

func (*DefaultBus[T]) Publish

func (b *DefaultBus[T]) Publish(_ context.Context, topic string, msg T) error

Publish sends a message to all handlers subscribed to the specified topic. It sends the message to a channel for each subscriber, where it will be processed by the subscriber's dedicated goroutine.

To prevent a slow subscriber from blocking the publisher indefinitely, this call will time out after a configurable duration if a subscriber's channel is full. If a timeout occurs, the message is dropped for that subscriber, and a warning is logged.

Parameters:

  • topic: The topic to publish the message to.
  • msg: The message to be sent.

func (*DefaultBus[T]) Subscribe

func (b *DefaultBus[T]) Subscribe(_ context.Context, topic string, handler func(T)) (unsubscribe func())

Subscribe registers a handler function for a given topic. It starts a new goroutine for each subscription to process messages from a buffered channel, ensuring that subscribers handle messages independently and do not block each other.

Each subscriber is assigned a unique ID, and its channel is added to the list of subscribers for the given topic.

Parameters:

  • topic: The topic to subscribe to.
  • handler: The function to execute when a message is received.

Returns an `unsubscribe` function that can be called to remove the subscription. When called, it removes the subscriber from the bus and closes its channel, terminating the associated goroutine.

func (*DefaultBus[T]) SubscribeOnce

func (b *DefaultBus[T]) SubscribeOnce(ctx context.Context, topic string, handler func(T)) (unsubscribe func())

SubscribeOnce registers a handler for a topic that will be executed only once. After the handler is invoked for the first time, the subscription is automatically removed.

This is useful for scenarios where a component needs to wait for a specific event to occur once and then stop listening.

Parameters:

  • topic: The topic to subscribe to.
  • handler: The function to execute.

Returns a function that can be used to unsubscribe before the handler is invoked.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL