bus

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2026 License: BSD-2-Clause Imports: 10 Imported by: 11

Documentation

Overview

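Package bus provides the event bus and its supporting APIs: channels and channel management, message handlers, publishing and subscription, broker connectivity, and monitor events.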

Example (CreateBus)

Basic bus use.

package main

import ()

// main is the entry point of the CreateBus example.
// NOTE(review): the published example body is empty; presumably it should at
// least create a bus (e.g. via bus.NewEventBus()) to illustrate "Basic bus
// use." — confirm and fill in.
func main() {

}
Example (UsingGalacticChannels)
package main

import (
	"encoding/json"
	"fmt"
	"github.com/pb33f/ranch/bridge"
	"github.com/pb33f/ranch/bus"
	"github.com/pb33f/ranch/model"
	"log"
)

// main demonstrates consuming a galactic (broker-backed) channel: it creates a
// local channel, registers a stream handler that waits for five messages,
// connects to a broker, maps the local channel to a broker destination, and
// finally unmaps the channel and disconnects.
func main() {
	// get a pointer to the bus.
	b := bus.NewEventBus()

	// get a pointer to the channel manager
	cm := b.GetChannelManager()

	channel := "my-stream"
	cm.CreateChannel(channel)

	// done is closed exactly once, after the fifth message has been handled.
	// Closing (rather than sending) means later messages can never block the
	// handler on a channel nobody is receiving from any more.
	done := make(chan struct{})

	// listen to stream of messages coming in on channel.
	h, err := b.ListenStream(channel)
	if err != nil {
		log.Panicf("unable to listen to channel stream, error: %v", err)
	}

	count := 0

	// listen for five messages and then signal completion.
	h.Handle(
		func(msg *model.Message) {
			// the payload is expected to be raw JSON bytes; skip anything else
			// instead of panicking on a failed type assertion.
			d, ok := msg.Payload.([]byte)
			if !ok {
				fmt.Printf("Unexpected message payload type: %T\n", msg.Payload)
				return
			}

			// unmarshal the payload into a Response object (used by fabric services)
			r := &model.Response{}
			if err := json.Unmarshal(d, r); err != nil {
				fmt.Printf("Unable to unmarshal response: %s\n", err)
				return
			}

			tick, ok := r.Payload.(string)
			if !ok {
				fmt.Printf("Unexpected response payload type: %T\n", r.Payload)
				return
			}
			fmt.Printf("Stream Ticked: %s\n", tick)

			count++
			// == (not >=) so that done is closed only once.
			if count == 5 {
				close(done)
			}
		},
		func(err error) {
			log.Panicf("error received on channel %v", err)
		})

	// create a broker connector config, in this case, we will connect to the application fabric demo endpoint.
	config := &bridge.BrokerConnectorConfig{
		Username:   "guest",
		Password:   "guest",
		ServerAddr: "appfabric.vmware.com",
		WebSocketConfig: &bridge.WebSocketConfig{
			WSPath: "/fabric",
		},
		UseWS: true,
	}

	// connect to broker.
	c, err := b.ConnectBroker(config)
	if err != nil {
		log.Panicf("unable to connect to fabric, error: %v", err)
	}

	// mark our local channel as galactic and map it to our connection and the /topic/simple-stream service
	// running on appfabric.vmware.com
	if err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c); err != nil {
		log.Panicf("unable to map local channel to broker destination: %v", err)
	}

	// wait for done signal
	<-done

	// mark channel as local (unsubscribe from all mappings)
	if err = cm.MarkChannelAsLocal(channel); err != nil {
		log.Panicf("unable to unsubscribe, error: %v", err)
	}
	if err = c.Disconnect(); err != nil {
		log.Panicf("unable to disconnect, error: %v", err)
	}
}

Index

Examples

Constants

View Source
// Monitor event types, re-exported from the monitor package under the same
// names so they can be referenced through package bus.
const (
	// ChannelCreatedEvt is emitted when a bus channel is created.
	ChannelCreatedEvt = monitor.ChannelCreatedEvt
	// ChannelDestroyedEvt is emitted when a bus channel is destroyed.
	ChannelDestroyedEvt = monitor.ChannelDestroyedEvt
	// ChannelSubscriberJoinedEvt is emitted when a handler subscribes to a channel.
	ChannelSubscriberJoinedEvt = monitor.ChannelSubscriberJoinedEvt
	// ChannelSubscriberLeftEvt is emitted when a handler unsubscribes from a channel.
	ChannelSubscriberLeftEvt = monitor.ChannelSubscriberLeftEvt
	// BrokerSubscribedEvt is emitted when a broker subscription is created.
	BrokerSubscribedEvt = monitor.BrokerSubscribedEvt
	// BrokerUnsubscribedEvt is emitted when a broker subscription is removed.
	BrokerUnsubscribedEvt = monitor.BrokerUnsubscribedEvt
)
View Source
// RANCH_INTERNAL_CHANNEL_PREFIX prefixes channels reserved for Ranch internals.
//
// NOTE(review): the identifier uses ALL_CAPS with underscores instead of Go's
// MixedCaps convention. It is exported, so a rename would need a deprecated
// alias to stay backward-compatible.
const RANCH_INTERNAL_CHANNEL_PREFIX = "_ranchInternal/"

RANCH_INTERNAL_CHANNEL_PREFIX prefixes channels reserved for Ranch internals.

Variables

This section is empty.

Functions

This section is empty.

Types

type BrokerControl added in v0.9.0

// BrokerControl exposes broker connection support.
type BrokerControl interface {
	// ConnectBroker takes a broker connector configuration and returns the
	// resulting bridge.Connection, or an error.
	ConnectBroker(config *bridge.BrokerConnectorConfig) (conn bridge.Connection, err error)
}

BrokerControl exposes broker connection support.

type Channel

// Channel represents a named bus stream and the handlers subscribed to it.
type Channel struct {
	// Name is the channel's name.
	// NOTE(review): the JSON key is "string", not "name" — this looks like a
	// typo in the struct tag. Confirm before changing: fixing it alters the
	// serialized form that existing consumers may depend on.
	Name string `json:"string"`
	// contains filtered or unexported fields
}

Channel represents a named bus stream and the handlers subscribed to it.

func NewChannel

func NewChannel(channelName string) *Channel

NewChannel creates an empty bus channel with the provided name.

func (*Channel) ContainsHandlers

func (channel *Channel) ContainsHandlers() bool

ContainsHandlers reports whether the channel currently has subscribed handlers.

func (*Channel) IsGalactic

func (channel *Channel) IsGalactic() bool

IsGalactic reports whether the channel forwards through a broker destination.

func (*Channel) IsPrivate

func (channel *Channel) IsPrivate() bool

IsPrivate reports whether the channel is marked private.

func (*Channel) Send

func (channel *Channel) Send(message *model.Message)

Send dispatches a message to the channel using a background context.

func (*Channel) SendContext added in v0.9.0

func (channel *Channel) SendContext(ctx context.Context, message *model.Message)

SendContext dispatches a message to subscribed handlers unless ctx is already canceled.

func (*Channel) SetGalactic

func (channel *Channel) SetGalactic(mappedDestination string)

SetGalactic marks the channel as backed by a broker destination.

func (*Channel) SetLocal

func (channel *Channel) SetLocal()

SetLocal marks the channel as local-only.

func (*Channel) SetPrivate

func (channel *Channel) SetPrivate(private bool)

SetPrivate marks the channel as private or public.

type ChannelControl added in v0.9.0

// ChannelControl exposes direct access to event bus channels.
type ChannelControl interface {
	// GetChannelManager returns the ChannelManager for the bus.
	GetChannelManager() ChannelManager
}

ChannelControl exposes direct access to event bus channels.

type ChannelManager

// ChannelManager owns channel lifecycle and subscription access for an EventBus.
type ChannelManager interface {
	CreateChannel(channelName string) *Channel
	DestroyChannel(channelName string)
	CheckChannelExists(channelName string) bool
	GetChannel(channelName string) (*Channel, error)
	GetAllChannels() map[string]*Channel
	// SubscribeChannelHandler registers fn on a channel. If runOnce is true,
	// the channel removes the handler after the next message sent to that channel.
	SubscribeChannelHandler(channelName string, fn MessageHandlerFunction, runOnce bool) (*uuid.UUID, error)
	// SubscribeChannelHandlerContext registers fn on a channel. If runOnce is true,
	// the channel removes the handler after the next message sent to that channel.
	SubscribeChannelHandlerContext(channelName string, fn MessageHandlerContextFunction, runOnce bool) (*uuid.UUID, error)
	// UnsubscribeChannelHandler takes the channel name and a handler UUID.
	// NOTE(review): presumably id is the UUID returned by one of the
	// Subscribe* methods — confirm.
	UnsubscribeChannelHandler(channelName string, id *uuid.UUID) error
	WaitForChannel(channelName string) error
	// MarkChannelAsGalactic marks the named local channel as galactic and maps
	// it to brokerDestination on the supplied broker connection.
	MarkChannelAsGalactic(channelName string, brokerDestination string, connection bridge.Connection) (err error)
	// MarkChannelAsLocal marks the named channel as local; the package example
	// describes this as unsubscribing from all mappings.
	MarkChannelAsLocal(channelName string) (err error)
}

ChannelManager owns channel lifecycle and subscription access for an EventBus.

func NewBusChannelManager

func NewBusChannelManager(bus EventBus) ChannelManager

NewBusChannelManager creates the channel manager used by an EventBus.

type EventBus

EventBus combines publishing, subscription, channel, broker, and monitor APIs.

func NewEventBus added in v0.9.0

func NewEventBus() EventBus

NewEventBus creates an EventBus using the default logger.

func NewEventBusWithLogger added in v0.9.0

func NewEventBusWithLogger(logger *slog.Logger) EventBus

NewEventBusWithLogger creates an EventBus using logger, or slog.Default when logger is nil.

type MessageErrorContextFunction added in v0.9.0

type MessageErrorContextFunction func(context.Context, error)

MessageErrorContextFunction handles asynchronous bus errors with the dispatch context.

type MessageErrorFunction

type MessageErrorFunction func(error)

MessageErrorFunction handles asynchronous bus errors.

type MessageHandler

// MessageHandler is returned by the bus's Listen* and Request* methods and is
// used to attach success and error callbacks to a subscription.
type MessageHandler interface {
	// GetId returns the ID the handler is listening for.
	GetId() *uuid.UUID
	GetDestinationId() *uuid.UUID
	// Handle accepts a success function and an error function as handlers.
	Handle(successHandler MessageHandlerFunction, errorHandler MessageErrorFunction)
	// HandleContext is the context-accepting counterpart of Handle; its
	// callbacks also receive a context.Context.
	HandleContext(
		ctx context.Context, successHandler MessageHandlerContextFunction, errorHandler MessageErrorContextFunction)
	// Fire fires the message queued when using RequestOnce or RequestStream.
	Fire() error
	// FireContext is the context-accepting counterpart of Fire.
	FireContext(ctx context.Context) error
	Close()
}

MessageHandler provides access to the ID the handler is listening for across all messages. It also provides a Handle method that accepts a success function and an error function as handlers. The Fire method fires the message queued when using RequestOnce or RequestStream.

type MessageHandlerContextFunction added in v0.9.0

type MessageHandlerContextFunction func(context.Context, *model.Message)

MessageHandlerContextFunction handles a delivered bus message with the dispatch context.

type MessageHandlerFunction

type MessageHandlerFunction func(*model.Message)

MessageHandlerFunction handles a delivered bus message.

type MonitorControl added in v0.9.0

// MonitorControl exposes monitor event subscription and publishing.
type MonitorControl interface {
	// AddMonitorEventListener registers listener for the given event types and
	// returns a MonitorEventListenerId for it.
	AddMonitorEventListener(listener MonitorEventHandler, eventTypes ...MonitorEventType) MonitorEventListenerId
	// RemoveMonitorEventListener takes the MonitorEventListenerId of the
	// listener to remove.
	RemoveMonitorEventListener(listenerId MonitorEventListenerId)
	// SendMonitorEvent takes an event type, an entity name and a data payload.
	SendMonitorEvent(evtType MonitorEventType, entityName string, data any)
}

MonitorControl exposes monitor event subscription and publishing.

type MonitorEvent

type MonitorEvent = monitor.Event

MonitorEvent describes a lifecycle event published by bus components.

func NewMonitorEvent

func NewMonitorEvent(evtType MonitorEventType, entityName string, data any) *MonitorEvent

NewMonitorEvent creates a monitor event for an entity and optional data payload.

type MonitorEventHandler

type MonitorEventHandler = monitor.Handler

MonitorEventHandler receives monitor events.

type MonitorEventListenerId

type MonitorEventListenerId int

MonitorEventListenerId identifies a registered monitor event listener.

type MonitorEventType

type MonitorEventType = monitor.EventType

MonitorEventType names a bus monitor event.

type Publisher added in v0.9.0

// Publisher contains the outbound message APIs exposed by an EventBus.
//
// Each Send* method has a *Context variant that takes a context.Context as
// its first parameter; the remaining parameters are the same.
type Publisher interface {
	SendRequestMessage(channelName string, payload any, destinationId *uuid.UUID) error
	SendRequestMessageContext(
		ctx context.Context, channelName string, payload any, destinationId *uuid.UUID) error
	SendResponseMessage(channelName string, payload any, destinationId *uuid.UUID) error
	SendResponseMessageContext(
		ctx context.Context, channelName string, payload any, destinationId *uuid.UUID) error
	SendBroadcastMessage(channelName string, payload any) error
	SendBroadcastMessageContext(ctx context.Context, channelName string, payload any) error
	SendErrorMessage(channelName string, err error, destinationId *uuid.UUID) error
	SendErrorMessageContext(ctx context.Context, channelName string, err error, destinationId *uuid.UUID) error
}

Publisher contains the outbound message APIs exposed by an EventBus.

type Subscriber added in v0.9.0

// Subscriber contains the inbound subscription APIs exposed by an EventBus.
//
// Every method returns a MessageHandler and an error. The *ForDestination
// variants additionally take a destination UUID, and the Request* methods
// take the payload to send.
type Subscriber interface {
	ListenStream(channelName string) (MessageHandler, error)
	ListenStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
	ListenFirehose(channelName string) (MessageHandler, error)
	ListenRequestStream(channelName string) (MessageHandler, error)
	ListenRequestStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
	ListenRequestOnce(channelName string) (MessageHandler, error)
	ListenRequestOnceForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
	ListenOnce(channelName string) (MessageHandler, error)
	ListenOnceForDestination(channelName string, destId *uuid.UUID) (MessageHandler, error)
	RequestOnce(channelName string, payload any) (MessageHandler, error)
	RequestOnceForDestination(channelName string, payload any, destId *uuid.UUID) (MessageHandler, error)
	RequestStream(channelName string, payload any) (MessageHandler, error)
	RequestStreamForDestination(channelName string, payload any, destId *uuid.UUID) (MessageHandler, error)
}

Subscriber contains the inbound subscription APIs exposed by an EventBus.

Directories

Path Synopsis
tx
Package tx provides transactional request orchestration on top of the event bus.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL