Documentation
¶
Overview ¶
Package bus contains all things bus.
Example (UsingGalacticChannels) ¶
package main
import (
"encoding/json"
"fmt"
"github.com/pb33f/ranch/bridge"
"github.com/pb33f/ranch/bus"
"github.com/pb33f/ranch/model"
"log"
)
func main() {
// get a pointer to the bus.
b := bus.NewEventBus()
// get a pointer to the channel manager
cm := b.GetChannelManager()
channel := "my-stream"
cm.CreateChannel(channel)
// create done signal
var done = make(chan bool)
// listen to stream of messages coming in on channel.
h, err := b.ListenStream(channel)
if err != nil {
log.Panicf("unable to listen to channel stream, error: %e", err)
}
count := 0
// listen for five messages and then exit, send a completed signal on channel.
h.Handle(
func(msg *model.Message) {
// unmarshal the payload into a Response object (used by fabric services)
r := &model.Response{}
d := msg.Payload.([]byte)
if err := json.Unmarshal(d, &r); err != nil {
fmt.Printf("Unable to unmarshal response: %s\n", err)
return
}
fmt.Printf("Stream Ticked: %s\n", r.Payload.(string))
count++
if count >= 5 {
done <- true
}
},
func(err error) {
log.Panicf("error received on channel %e", err)
})
// create a broker connector config, in this case, we will connect to the application fabric demo endpoint.
config := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: "appfabric.vmware.com",
WebSocketConfig: &bridge.WebSocketConfig{
WSPath: "/fabric",
},
UseWS: true}
// connect to broker.
c, err := b.ConnectBroker(config)
if err != nil {
log.Panicf("unable to connect to fabric, error: %e", err)
}
// mark our local channel as galactic and map it to our connection and the /topic/simple-stream service
// running on appfabric.vmware.com
err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c)
if err != nil {
log.Panicf("unable to map local channel to broker destination: %e", err)
}
// wait for done signal
<-done
// mark channel as local (unsubscribe from all mappings)
err = cm.MarkChannelAsLocal(channel)
if err != nil {
log.Panicf("unable to unsubscribe, error: %e", err)
}
err = c.Disconnect()
if err != nil {
log.Panicf("unable to disconnect, error: %e", err)
}
}
Output:
Index ¶
- Constants
- type BrokerControl
- type Channel
- func (channel *Channel) ContainsHandlers() bool
- func (channel *Channel) IsGalactic() bool
- func (channel *Channel) IsPrivate() bool
- func (channel *Channel) Send(message *model.Message)
- func (channel *Channel) SendContext(ctx context.Context, message *model.Message)
- func (channel *Channel) SetGalactic(mappedDestination string)
- func (channel *Channel) SetLocal()
- func (channel *Channel) SetPrivate(private bool)
- type ChannelControl
- type ChannelManager
- type EventBus
- type MessageErrorContextFunction
- type MessageErrorFunction
- type MessageHandler
- type MessageHandlerContextFunction
- type MessageHandlerFunction
- type MonitorControl
- type MonitorEvent
- type MonitorEventHandler
- type MonitorEventListenerId
- type MonitorEventType
- type Publisher
- type Subscriber
Examples ¶
Constants ¶
const ( // ChannelCreatedEvt is emitted when a bus channel is created. ChannelCreatedEvt = monitor.ChannelCreatedEvt // ChannelDestroyedEvt is emitted when a bus channel is destroyed. ChannelDestroyedEvt = monitor.ChannelDestroyedEvt // ChannelSubscriberJoinedEvt is emitted when a handler subscribes to a channel. ChannelSubscriberJoinedEvt = monitor.ChannelSubscriberJoinedEvt // ChannelSubscriberLeftEvt is emitted when a handler unsubscribes from a channel. ChannelSubscriberLeftEvt = monitor.ChannelSubscriberLeftEvt // BrokerSubscribedEvt is emitted when a broker subscription is created. BrokerSubscribedEvt = monitor.BrokerSubscribedEvt // BrokerUnsubscribedEvt is emitted when a broker subscription is removed. BrokerUnsubscribedEvt = monitor.BrokerUnsubscribedEvt )
const RANCH_INTERNAL_CHANNEL_PREFIX = "_ranchInternal/"
RANCH_INTERNAL_CHANNEL_PREFIX prefixes channels reserved for Ranch internals.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BrokerControl ¶ added in v0.9.0
type BrokerControl interface {
ConnectBroker(config *bridge.BrokerConnectorConfig) (conn bridge.Connection, err error)
}
BrokerControl exposes broker connection support.
type Channel ¶
type Channel struct {
Name string `json:"string"`
// contains filtered or unexported fields
}
Channel represents a named bus stream and the handlers subscribed to it.
func NewChannel ¶
NewChannel creates an empty bus channel with the provided name.
func (*Channel) ContainsHandlers ¶
ContainsHandlers reports whether the channel currently has subscribed handlers.
func (*Channel) IsGalactic ¶
IsGalactic reports whether the channel forwards through a broker destination.
func (*Channel) SendContext ¶ added in v0.9.0
SendContext dispatches a message to subscribed handlers unless ctx is already canceled.
func (*Channel) SetGalactic ¶
SetGalactic marks the channel as backed by a broker destination.
func (*Channel) SetLocal ¶
func (channel *Channel) SetLocal()
SetLocal marks the channel as local-only.
func (*Channel) SetPrivate ¶
SetPrivate marks the channel as private or public.
type ChannelControl ¶ added in v0.9.0
type ChannelControl interface {
GetChannelManager() ChannelManager
}
ChannelControl exposes direct access to event bus channels.
type ChannelManager ¶
type ChannelManager interface {
CreateChannel(channelName string) *Channel
DestroyChannel(channelName string)
CheckChannelExists(channelName string) bool
GetChannel(channelName string) (*Channel, error)
GetAllChannels() map[string]*Channel
// SubscribeChannelHandler registers fn on a channel. If runOnce is true,
// the channel removes the handler after the next message sent to that channel.
SubscribeChannelHandler(channelName string, fn MessageHandlerFunction, runOnce bool) (*uuid.UUID, error)
// SubscribeChannelHandlerContext registers fn on a channel. If runOnce is true,
// the channel removes the handler after the next message sent to that channel.
SubscribeChannelHandlerContext(channelName string, fn MessageHandlerContextFunction, runOnce bool) (*uuid.UUID, error)
UnsubscribeChannelHandler(channelName string, id *uuid.UUID) error
WaitForChannel(channelName string) error
MarkChannelAsGalactic(channelName string, brokerDestination string, connection bridge.Connection) (err error)
MarkChannelAsLocal(channelName string) (err error)
}
ChannelManager owns channel lifecycle and subscription access for an EventBus.
func NewBusChannelManager ¶
func NewBusChannelManager(bus EventBus) ChannelManager
NewBusChannelManager creates the channel manager used by an EventBus.
type EventBus ¶
type EventBus interface {
Publisher
Subscriber
ChannelControl
BrokerControl
MonitorControl
GetId() *uuid.UUID
}
EventBus combines publishing, subscription, channel, broker, and monitor APIs.
func NewEventBus ¶ added in v0.9.0
func NewEventBus() EventBus
NewEventBus creates an EventBus using the default logger.
func NewEventBusWithLogger ¶ added in v0.9.0
NewEventBusWithLogger creates an EventBus using logger, or slog.Default when logger is nil.
type MessageErrorContextFunction ¶ added in v0.9.0
MessageErrorContextFunction handles asynchronous bus errors with the dispatch context.
type MessageErrorFunction ¶
type MessageErrorFunction func(error)
MessageErrorFunction handles asynchronous bus errors.
type MessageHandler ¶
type MessageHandler interface {
GetId() *uuid.UUID
GetDestinationId() *uuid.UUID
Handle(successHandler MessageHandlerFunction, errorHandler MessageErrorFunction)
HandleContext(
ctx context.Context, successHandler MessageHandlerContextFunction, errorHandler MessageErrorContextFunction)
Fire() error
FireContext(ctx context.Context) error
Close()
}
MessageHandler provides access to the ID the handler is listening for from all messages It also provides a Handle method that accepts a success and error function as handlers. The Fire method will fire the message queued when using RequestOnce or RequestStream
type MessageHandlerContextFunction ¶ added in v0.9.0
MessageHandlerContextFunction handles a delivered bus message with the dispatch context.
type MessageHandlerFunction ¶
MessageHandlerFunction handles a delivered bus message.
type MonitorControl ¶ added in v0.9.0
type MonitorControl interface {
AddMonitorEventListener(listener MonitorEventHandler, eventTypes ...MonitorEventType) MonitorEventListenerId
RemoveMonitorEventListener(listenerId MonitorEventListenerId)
SendMonitorEvent(evtType MonitorEventType, entityName string, data any)
}
MonitorControl exposes monitor event subscription and publishing.
type MonitorEvent ¶
MonitorEvent describes a lifecycle event published by bus components.
func NewMonitorEvent ¶
func NewMonitorEvent(evtType MonitorEventType, entityName string, data any) *MonitorEvent
NewMonitorEvent creates a monitor event for an entity and optional data payload.
type MonitorEventHandler ¶
MonitorEventHandler receives monitor events.
type MonitorEventListenerId ¶
type MonitorEventListenerId int
MonitorEventListenerId identifies a registered monitor event listener.
type MonitorEventType ¶
MonitorEventType names a bus monitor event.
type Publisher ¶ added in v0.9.0
type Publisher interface {
SendRequestMessage(channelName string, payload any, destinationId *uuid.UUID) error
SendRequestMessageContext(
ctx context.Context, channelName string, payload any, destinationId *uuid.UUID) error
SendResponseMessage(channelName string, payload any, destinationId *uuid.UUID) error
SendResponseMessageContext(
ctx context.Context, channelName string, payload any, destinationId *uuid.UUID) error
SendBroadcastMessage(channelName string, payload any) error
SendBroadcastMessageContext(ctx context.Context, channelName string, payload any) error
SendErrorMessage(channelName string, err error, destinationId *uuid.UUID) error
SendErrorMessageContext(ctx context.Context, channelName string, err error, destinationId *uuid.UUID) error
}
Publisher contains the outbound message APIs exposed by an EventBus.
type Subscriber ¶ added in v0.9.0
type Subscriber interface {
ListenStream(channelName string) (MessageHandler, error)
ListenStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
ListenFirehose(channelName string) (MessageHandler, error)
ListenRequestStream(channelName string) (MessageHandler, error)
ListenRequestStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
ListenRequestOnce(channelName string) (MessageHandler, error)
ListenRequestOnceForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
ListenOnce(channelName string) (MessageHandler, error)
ListenOnceForDestination(channelName string, destId *uuid.UUID) (MessageHandler, error)
RequestOnce(channelName string, payload any) (MessageHandler, error)
RequestOnceForDestination(channelName string, payload any, destId *uuid.UUID) (MessageHandler, error)
RequestStream(channelName string, payload any) (MessageHandler, error)
RequestStreamForDestination(channelName string, payload any, destId *uuid.UUID) (MessageHandler, error)
}
Subscriber contains the inbound subscription APIs exposed by an EventBus.