Documentation
¶
Overview ¶
Package bus contains all things bus.
Example (UsingGalacticChannels) ¶
package main
import (
"encoding/json"
"fmt"
"github.com/pb33f/ranch/bridge"
"github.com/pb33f/ranch/bus"
"github.com/pb33f/ranch/model"
"log"
)
// main demonstrates consuming a galactic (broker-mapped) channel: it creates a
// local channel, listens for five stream messages, maps the channel to the
// "/topic/simple-stream" broker destination, and finally unmaps the channel
// and disconnects from the broker.
func main() {
	// get a pointer to the bus.
	b := bus.GetBus()

	// get a pointer to the channel manager
	cm := b.GetChannelManager()
	channel := "my-stream"
	cm.CreateChannel(channel)

	// create done signal
	done := make(chan bool)

	// listen to stream of messages coming in on channel.
	h, err := b.ListenStream(channel)
	if err != nil {
		// %v (not %e, which is a floating-point verb) renders the error text.
		log.Panicf("unable to listen to channel stream, error: %v", err)
	}

	count := 0

	// listen for five messages and then exit, send a completed signal on channel.
	h.Handle(
		func(msg *model.Message) {
			// the payload is expected to be raw JSON bytes; skip anything else
			// instead of panicking on a failed type assertion.
			d, ok := msg.Payload.([]byte)
			if !ok {
				log.Printf("unexpected payload type %T, skipping message", msg.Payload)
				return
			}

			// unmarshal the payload into a Response object (used by fabric services)
			r := &model.Response{}
			if jsonErr := json.Unmarshal(d, r); jsonErr != nil {
				log.Printf("unable to unmarshal response, error: %v", jsonErr)
				return
			}

			// %v prints a string payload exactly as %s would, without requiring
			// a type assertion that panics for non-string payloads.
			fmt.Printf("Stream Ticked: %v\n", r.Payload)

			count++
			// signal exactly once: done is unbuffered and main receives from it
			// a single time, so a second send would block this handler forever.
			if count == 5 {
				done <- true
			}
		},
		func(err error) {
			log.Panicf("error received on channel %v", err)
		})

	// create a broker connector config, in this case, we will connect to the application fabric demo endpoint.
	config := &bridge.BrokerConnectorConfig{
		Username:   "guest",
		Password:   "guest",
		ServerAddr: "appfabric.vmware.com",
		WebSocketConfig: &bridge.WebSocketConfig{
			WSPath: "/fabric",
		},
		UseWS: true,
	}

	// connect to broker.
	c, err := b.ConnectBroker(config)
	if err != nil {
		log.Panicf("unable to connect to fabric, error: %v", err)
	}

	// mark our local channel as galactic and map it to our connection and the /topic/simple-stream service
	// running on appfabric.vmware.com
	err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c)
	if err != nil {
		log.Panicf("unable to map local channel to broker destination: %v", err)
	}

	// wait for done signal
	<-done

	// mark channel as local (unsubscribe from all mappings)
	err = cm.MarkChannelAsLocal(channel)
	if err != nil {
		log.Panicf("unable to unsubscribe, error: %v", err)
	}

	err = c.Disconnect()
	if err != nil {
		log.Panicf("unable to disconnect, error: %v", err)
	}
}
Index ¶
- Constants
- func EnableLogging(enable bool)
- type BusStore
- type BusTransaction
- type BusTransactionReadyFunction
- type Channel
- func (channel *Channel) ContainsHandlers() bool
- func (channel *Channel) IsGalactic() bool
- func (channel *Channel) IsPrivate() bool
- func (channel *Channel) Send(message *model.Message)
- func (channel *Channel) SetGalactic(mappedDestination string)
- func (channel *Channel) SetLocal()
- func (channel *Channel) SetPrivate(private bool)
- type ChannelManager
- type EndpointConfig
- type EventBus
- type FabricEndpoint
- type MessageErrorFunction
- type MessageHandler
- type MessageHandlerFunction
- type MonitorEvent
- type MonitorEventHandler
- type MonitorEventListenerId
- type MonitorEventType
- type MutationRequest
- type MutationRequestHandlerFunction
- type MutationStoreStream
- type StompSessionEvent
- type StoreChange
- type StoreChangeHandlerFunction
- type StoreManager
- type StoreStream
Examples ¶
Constants ¶
// RANCH_INTERNAL_CHANNEL_PREFIX is the name prefix from which the internal
// channel name below is built.
const RANCH_INTERNAL_CHANNEL_PREFIX = "_ranchInternal/"
const (
// STOMP_SESSION_NOTIFY_CHANNEL is the internal channel name
// "_ranchInternal/stomp-session-notify".
// NOTE(review): presumably the channel on which StompSessionEvent values are
// published — confirm against the implementation.
STOMP_SESSION_NOTIFY_CHANNEL = RANCH_INTERNAL_CHANNEL_PREFIX + "stomp-session-notify"
)
Variables ¶
This section is empty.
Functions ¶
func EnableLogging ¶
func EnableLogging(enable bool)
Types ¶
type BusStore ¶
type BusStore interface {
// GetName returns the name (the id) of the store.
GetName() string
// Put adds a new item to the store or updates an existing one.
Put(id string, value interface{}, state interface{})
// Get returns an item from the store and a boolean flag
// indicating whether the item exists.
Get(id string) (interface{}, bool)
// GetValue is a shortened version of Get() that returns only the item value.
GetValue(id string) interface{}
// Remove removes an item from the store. Returns true if the remove operation was successful.
Remove(id string, state interface{}) bool
// AllValues returns a slice containing all store items.
AllValues() []interface{}
// AllValuesAsMap returns a map with all items from the store.
AllValuesAsMap() map[string]interface{}
// AllValuesAndVersion returns a map with all items from the store together with
// the current store version.
AllValuesAndVersion() (map[string]interface{}, int64)
// OnChange subscribes to state changes for a specific object.
OnChange(id string, state ...interface{}) StoreStream
// OnAllChanges subscribes to state changes for all objects.
OnAllChanges(state ...interface{}) StoreStream
// WhenReady notifies readyFunction when the store has been initialized
// (via Populate() or Initialize()).
WhenReady(readyFunction func())
// Populate fills the store with a map of items keyed by their IDs.
Populate(items map[string]interface{}) error
// Initialize marks the store as initialized and notifies all watchers.
Initialize()
// OnMutationRequest subscribes to mutation requests made via the Mutate() method.
OnMutationRequest(mutationType ...interface{}) MutationStoreStream
// Mutate sends a mutation request to any subscribers handling mutations.
Mutate(request interface{}, requestType interface{},
successHandler func(interface{}), errorHandler func(interface{}))
// Reset removes all items from the store and changes its state to "uninitialized".
Reset()
// IsGalactic returns true if this is a galactic store.
IsGalactic() bool
// GetItemType returns the item type, if one was specified during the creation
// of the store.
GetItemType() reflect.Type
}
BusStore is a stateful in-memory cache for objects. All state changes (any time the cache is modified) will broadcast the updated object to any subscribers of the BusStore for those specific objects, or for all objects of a certain type and state changes.
type BusTransaction ¶
// BusTransaction groups channel requests and store-ready waits; once committed,
// the registered complete handlers are called with the responses after all of
// them have been received.
type BusTransaction interface {
// SendRequest sends a request to a channel as a part of this transaction.
SendRequest(channel string, payload interface{}) error
// WaitForStoreReady waits for a store to be initialized as a part of this transaction.
WaitForStoreReady(storeName string) error
// OnComplete registers a new complete handler. Once all responses to requests have been received,
// the transaction is complete.
OnComplete(completeHandler BusTransactionReadyFunction) error
// OnError registers a new error handler. If an error is thrown by any of the responders, the transaction
// is aborted and the error is sent to the registered error handlers.
OnError(errorHandler MessageErrorFunction) error
// Commit commits the transaction: all requests will be sent and will wait for responses.
// Once all the responses are in, the OnComplete handlers will be called with the responses.
Commit() error
}
type Channel ¶
// Channel represents the stream and the subscribed event handlers waiting for
// ticks on the stream.
type Channel struct {
// Name is serialized under the JSON key "string".
// NOTE(review): the key looks like it may have been intended to be "name" —
// confirm before changing, since it alters the serialized form.
Name string `json:"string"`
// contains filtered or unexported fields
}
Channel represents the stream and the subscribed event handlers waiting for ticks on the stream
func NewChannel ¶
Create a new Channel with the supplied Channel name. Returns a pointer to that Channel.
func (*Channel) ContainsHandlers ¶
Check if the Channel has any registered subscribers
func (*Channel) IsGalactic ¶
Returns true if the Channel is marked as galactic
func (*Channel) SetGalactic ¶
Mark the Channel as galactic
func (*Channel) SetPrivate ¶
Mark the Channel as private
type ChannelManager ¶
// ChannelManager controls all access to channels via the bus.
type ChannelManager interface {
CreateChannel(channelName string) *Channel
DestroyChannel(channelName string)
CheckChannelExists(channelName string) bool
GetChannel(channelName string) (*Channel, error)
GetAllChannels() map[string]*Channel
// SubscribeChannelHandler registers fn on the named channel and returns a *uuid.UUID.
// NOTE(review): presumably that UUID is the id UnsubscribeChannelHandler expects, and
// runOnce removes the handler after its first message — confirm against the implementation.
SubscribeChannelHandler(channelName string, fn MessageHandlerFunction, runOnce bool) (*uuid.UUID, error)
UnsubscribeChannelHandler(channelName string, id *uuid.UUID) error
WaitForChannel(channelName string) error
// MarkChannelAsGalactic maps the local channel to brokerDestination over the supplied
// broker connection.
MarkChannelAsGalactic(channelName string, brokerDestination string, connection bridge.Connection) (err error)
// MarkChannelAsLocal marks the channel as local (unsubscribes from all mappings).
MarkChannelAsLocal(channelName string) (err error)
}
The ChannelManager interface controls all access to channels via the bus.
func NewBusChannelManager ¶
func NewBusChannelManager(bus EventBus) ChannelManager
type EndpointConfig ¶
// EndpointConfig holds the destination prefixes, heartbeat setting and middleware
// registry passed to EventBus.StartFabricEndpoint.
type EndpointConfig struct {
// Prefix for public topics e.g. "/topic"
TopicPrefix string
// Prefix for user queues e.g. "/user/queue"
UserQueuePrefix string
// Prefix used for public application requests e.g. "/pub"
AppRequestPrefix string
// Prefix used for "private" application requests e.g. "/pub/queue".
// Requests sent to destinations prefixed with the AppRequestQueuePrefix
// should generate responses sent to a single client queue.
// E.g. if a client sends a request to the "/pub/queue/sample-channel" destination
// the application should send the response only to this client on the
// "/user/queue/sample-channel" destination.
// This behavior will mimic the Spring SimpleMessageBroker implementation.
AppRequestQueuePrefix string
// NOTE(review): the unit of Heartbeat is not shown here (presumably
// milliseconds) — confirm.
Heartbeat int64
// Custom middleware for broker commands and destinations.
MiddlewareRegistry stompserver.MiddlewareRegistry
}
type EventBus ¶
// EventBus provides access to ChannelManager, simple message sending and simple
// API calls for handling messaging and error handling over channels on the bus.
type EventBus interface {
GetId() *uuid.UUID
GetChannelManager() ChannelManager
// The Send* methods send a payload (or an error) on the named channel.
// NOTE(review): destinationId presumably restricts delivery to one
// destination — confirm.
SendRequestMessage(channelName string, payload interface{}, destinationId *uuid.UUID) error
SendResponseMessage(channelName string, payload interface{}, destinationId *uuid.UUID) error
SendBroadcastMessage(channelName string, payload interface{}) error
SendErrorMessage(channelName string, err error, destinationId *uuid.UUID) error
// The Listen* methods return a MessageHandler for the named channel; success and
// error callbacks are registered through its Handle method.
ListenStream(channelName string) (MessageHandler, error)
ListenStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
ListenFirehose(channelName string) (MessageHandler, error)
ListenRequestStream(channelName string) (MessageHandler, error)
ListenRequestStreamForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
ListenRequestOnce(channelName string) (MessageHandler, error)
ListenRequestOnceForDestination(channelName string, destinationId *uuid.UUID) (MessageHandler, error)
ListenOnce(channelName string) (MessageHandler, error)
ListenOnceForDestination(channelName string, destId *uuid.UUID) (MessageHandler, error)
// The Request* methods take a payload and return a MessageHandler; the queued
// message is fired by calling Fire on the returned handler.
RequestOnce(channelName string, payload interface{}) (MessageHandler, error)
RequestOnceForDestination(channelName string, payload interface{}, destId *uuid.UUID) (MessageHandler, error)
RequestStream(channelName string, payload interface{}) (MessageHandler, error)
RequestStreamForDestination(channelName string, payload interface{}, destId *uuid.UUID) (MessageHandler, error)
// ConnectBroker connects to the broker described by config and returns the connection.
ConnectBroker(config *bridge.BrokerConnectorConfig) (conn bridge.Connection, err error)
StartFabricEndpoint(connectionListener stompserver.RawConnectionListener, config EndpointConfig) error
StopFabricEndpoint() error
GetStompServer() stompserver.StompServer
GetStoreManager() StoreManager
CreateSyncTransaction() BusTransaction
CreateAsyncTransaction() BusTransaction
// AddMonitorEventListener registers listener for the given event types and returns
// an id that RemoveMonitorEventListener accepts.
AddMonitorEventListener(listener MonitorEventHandler, eventTypes ...MonitorEventType) MonitorEventListenerId
RemoveMonitorEventListener(listenerId MonitorEventListenerId)
SendMonitorEvent(evtType MonitorEventType, entityName string, data interface{})
}
EventBus provides access to ChannelManager, simple message sending and simple API calls for handling messaging and error handling over channels on the bus.
func NewEventBusInstance ¶
func NewEventBusInstance() EventBus
type FabricEndpoint ¶
// FabricEndpoint is an endpoint that can be started and stopped and that exposes
// its stompserver.StompServer.
type FabricEndpoint interface {
Start()
Stop()
GetStompServer() stompserver.StompServer
}
type MessageErrorFunction ¶
// MessageErrorFunction is the signature used for all functions used on bus stream
// APIs to handle errors.
type MessageErrorFunction func(error)
Signature used for all functions used on bus stream APIs to Handle errors.
type MessageHandler ¶
// MessageHandler provides access to the ID the handler is listening for, and a
// Handle method that accepts a success and an error function as handlers.
type MessageHandler interface {
GetId() *uuid.UUID
GetDestinationId() *uuid.UUID
// Handle registers the success and error callbacks for this handler.
Handle(successHandler MessageHandlerFunction, errorHandler MessageErrorFunction)
// Fire fires the message queued when using RequestOnce or RequestStream.
Fire() error
Close()
}
MessageHandler provides access to the ID the handler is listening for from all messages. It also provides a Handle method that accepts a success and error function as handlers. The Fire method will fire the message queued when using RequestOnce or RequestStream.
type MessageHandlerFunction ¶
Signature used for all functions used on bus stream APIs to Handle messages.
type MonitorEvent ¶
// MonitorEvent describes a single monitor event: its type, the name of the
// related channel or store, and optional event data.
type MonitorEvent struct {
// Type of the event
EventType MonitorEventType
// The name of the channel or the store related to this event
EntityName string
// Optional event data
Data interface{}
}
func NewMonitorEvent ¶
func NewMonitorEvent(evtType MonitorEventType, entityName string, data interface{}) *MonitorEvent
Create a new monitor event
type MonitorEventHandler ¶
// MonitorEventHandler is a callback that receives a *MonitorEvent; it is the
// listener type accepted by EventBus.AddMonitorEventListener.
type MonitorEventHandler func(event *MonitorEvent)
type MonitorEventListenerId ¶
// MonitorEventListenerId is the id returned by EventBus.AddMonitorEventListener
// and accepted by EventBus.RemoveMonitorEventListener.
type MonitorEventListenerId int
type MonitorEventType ¶
// MonitorEventType identifies the type of a MonitorEvent.
type MonitorEventType int32

// Monitor event types. The values are assigned sequentially from zero via iota,
// so the declaration order determines each constant's numeric value.
const (
	ChannelCreatedEvt MonitorEventType = iota
	ChannelDestroyedEvt
	ChannelSubscriberJoinedEvt
	ChannelSubscriberLeftEvt
	StoreCreatedEvt
	StoreDestroyedEvt
	StoreInitializedEvt
	BrokerSubscribedEvt
	BrokerUnsubscribedEvt
	FabricEndpointSubscribeEvt
	FabricEndpointUnsubscribeEvt
)
type MutationRequest ¶
// MutationRequest bundles a mutation request with its request type and the
// success and error callbacks; the fields mirror the parameters of BusStore.Mutate.
type MutationRequest struct {
Request interface{}
RequestType interface{}
SuccessHandler func(interface{})
ErrorHandler func(interface{})
}
type MutationRequestHandlerFunction ¶
// MutationRequestHandlerFunction is a callback that receives a *MutationRequest;
// it is the handler type accepted by MutationStoreStream.Subscribe.
type MutationRequestHandlerFunction func(mutationReq *MutationRequest)
type MutationStoreStream ¶
// MutationStoreStream is the interface for subscribing to mutation requests.
type MutationStoreStream interface {
// Subscribe to the mutation requests stream.
Subscribe(handler MutationRequestHandlerFunction) error
// Unsubscribe from the stream.
Unsubscribe() error
}
Interface for subscribing to mutation requests.
type StompSessionEvent ¶
// StompSessionEvent pairs an Id with a stompserver.StompSessionEventType.
// NOTE(review): Id is presumably the STOMP session id — confirm.
type StompSessionEvent struct {
Id string
EventType stompserver.StompSessionEventType
}
type StoreChange ¶
// StoreChange describes a single store item change.
type StoreChange struct {
Id string // the id of the updated item
Value interface{} // the updated value of the item
State interface{} // state associated with this change
IsDeleteChange bool // true if the item was removed from the store
StoreVersion int64 // the store's version when this change was made
}
Describes a single store item change
type StoreChangeHandlerFunction ¶
// StoreChangeHandlerFunction is a callback that receives a *StoreChange; it is
// the handler type accepted by StoreStream.Subscribe.
type StoreChangeHandlerFunction func(change *StoreChange)
type StoreManager ¶
// StoreManager controls all access to BusStores.
type StoreManager interface {
// CreateStore creates a new store; if the store already exists, the existing
// store is returned.
CreateStore(name string) BusStore
// CreateStoreWithType creates a new store and uses itemType to deserialize item values
// when handling an incoming UpdateStoreRequest. If the store already exists, the method
// returns the existing store instance.
CreateStoreWithType(name string, itemType reflect.Type) BusStore
// GetStore returns a reference to the existing store. Returns nil if the store doesn't exist.
GetStore(name string) BusStore
// DestroyStore deletes a store.
DestroyStore(name string) bool
// ConfigureStoreSyncChannel configures the galactic store sync channel for a given connection.
// Should be called before the OpenGalacticStore() and OpenGalacticStoreWithItemType() APIs.
ConfigureStoreSyncChannel(conn bridge.Connection, topicPrefix string, pubPrefix string) error
// OpenGalacticStore opens a new galactic store.
OpenGalacticStore(name string, conn bridge.Connection) (BusStore, error)
// OpenGalacticStoreWithItemType opens a new galactic store and deserializes items
// from the server to itemType.
OpenGalacticStoreWithItemType(name string, conn bridge.Connection, itemType reflect.Type) (BusStore, error)
}
StoreManager interface controls all access to BusStores
type StoreStream ¶
// StoreStream is the interface for subscribing to store changes.
type StoreStream interface {
// Subscribe to the store changes stream.
Subscribe(handler StoreChangeHandlerFunction) error
// Unsubscribe from the stream.
Unsubscribe() error
}
Interface for subscribing to store changes.