Documentation
Index ¶
- Constants
- type DataBusClient
- func NewDataBusClient(bus messagebus.Messagebus, clientName string) *DataBusClient
- func (d *DataBusClient) DeleteProducer(service auth.Service) error
- func (d *DataBusClient) Get(replyQueue string) error
- func (d *DataBusClient) GetGroup(groups chan<- *DataGroup, queue string)
- func (d *DataBusClient) GetProducers() ([]DataProducer, error)
- func (d *DataBusClient) Subscribe(replyQueue string) error
- type DataBusService
- func NewDataBusService(bus messagebus.Messagebus) *DataBusService
- func (d *DataBusService) HandleSubscribeEnvelope(env service.Envelope)
- func (d *DataBusService) ReceiveEnvelopes(envelopes chan<- service.Envelope) error
- func (d *DataBusService) SendGroup(group DataGroup)
- func (d *DataBusService) SendGroupToQueue(group DataGroup, queue string) error
- type DataGroup
- type DataProducer
- type DataValue
- type EventValue
Constants ¶
const (
	// commands
	GET            = "get"
	SUBSCRIBE      = "subscribe"
	GETPRODUCERS   = "getproducers"
	DELETEPRODUCER = "deleteproducers"
	TERMINATE      = "terminate"

	// message types
	DATAGROUP = "datagroup"
)
const (
	CommandQueue = "/queue/databus/command"
	ReplyPrefix  = "/queue/databus/reply."
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataBusClient ¶
type DataBusClient struct {
*service.BaseClient
}
func NewDataBusClient ¶
func NewDataBusClient(bus messagebus.Messagebus, clientName string) *DataBusClient
NewDataBusClient constructs a DataBusClient that issues databus commands via BaseClient helpers and uses ReplyPrefix to generate unique reply queues.
func (*DataBusClient) DeleteProducer ¶
func (d *DataBusClient) DeleteProducer(service auth.Service) error
DeleteProducer asks the databus service to stop forwarding updates for the specified service, typically after a producer disconnects.
func (*DataBusClient) Get ¶
func (d *DataBusClient) Get(replyQueue string) error
Get requests that producers send their current DataGroups, instructing the service to reply to the provided queue.
func (*DataBusClient) GetGroup ¶
func (d *DataBusClient) GetGroup(groups chan<- *DataGroup, queue string)
GetGroup consumes DATAGROUP envelopes from the specified queue, decodes them, and forwards the resulting DataGroups into the caller-provided channel.
func (*DataBusClient) GetProducers ¶
func (d *DataBusClient) GetProducers() ([]DataProducer, error)
GetProducers synchronously requests the list of active data producers.
func (*DataBusClient) Subscribe ¶
func (d *DataBusClient) Subscribe(replyQueue string) error
Subscribe registers the provided queue to receive future DataGroup broadcasts.
type DataBusService ¶
type DataBusService struct {
*service.BaseService
Recievers []string
}
func NewDataBusService ¶
func NewDataBusService(bus messagebus.Messagebus) *DataBusService
NewDataBusService constructs a DataBusService backed by the supplied message bus. The service listens on CommandQueue and keeps track of subscriber reply queues.
func (*DataBusService) HandleSubscribeEnvelope ¶
func (d *DataBusService) HandleSubscribeEnvelope(env service.Envelope)
HandleSubscribeEnvelope records the ReplyTo queue of a SUBSCRIBE command so subsequent broadcasts can reach that client. Empty or duplicate queues are ignored.
func (*DataBusService) ReceiveEnvelopes ¶
func (d *DataBusService) ReceiveEnvelopes(envelopes chan<- service.Envelope) error
ReceiveEnvelopes starts listening on the databus command queue and streams decoded envelopes into the provided channel. SUBSCRIBE envelopes trigger the internal hook before being forwarded downstream.
func (*DataBusService) SendGroup ¶
func (d *DataBusService) SendGroup(group DataGroup)
SendGroup broadcasts a DataGroup to every queue recorded in the Recievers list. A failed delivery to an individual queue is logged and does not stop the fan-out to the remaining queues.
func (*DataBusService) SendGroupToQueue ¶
func (d *DataBusService) SendGroupToQueue(group DataGroup, queue string) error
SendGroupToQueue sends a single DataGroup to the specified queue using the BaseService SendTo helper.