eventbus

package
v0.3.0

This package is not in the latest version of its module.

Published: Apr 27, 2020 License: MIT Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume

func Consume(items [][]byte, w io.WriteCloser) bool

Consume writes the given items to the specified WriteCloser. It is used when creating a StreamListener.

func CreateGossipStreamer

func CreateGossipStreamer() (*EventBus, *GossipStreamer)

CreateGossipStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.

Types

type Broker

type Broker interface {
	Subscriber
	Publisher
}

Broker is both a Publisher and a Subscriber.

type CallbackListener

type CallbackListener struct {
	// contains filtered or unexported fields
}

CallbackListener subscribes using callbacks

func (*CallbackListener) Close

func (c *CallbackListener) Close()

Close is part of the Listener interface.

func (*CallbackListener) Notify

func (c *CallbackListener) Notify(m message.Message) error

Notify passes a copy of the message as a parameter to the callback.

type ChanListener

type ChanListener struct {
	// contains filtered or unexported fields
}

ChanListener dispatches a message using a channel

func (*ChanListener) Close

func (c *ChanListener) Close()

Close has no effect

func (*ChanListener) Notify

func (c *ChanListener) Notify(m message.Message) error

Notify sends a message to the internal dispatcher channel
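
The callback-based and channel-based listeners described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `Message` is a hypothetical stand-in for `message.Message`, the lowercase types are invented for the example, and the non-blocking send in `chanListener.Notify` is an assumed policy.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for message.Message.
type Message string

// Listener mirrors the shape of the Listener interface documented on this page.
type Listener interface {
	Notify(Message) error
	Close()
}

// callbackListener hands every message to a function.
type callbackListener struct {
	callback func(Message) error
}

func (c *callbackListener) Notify(m Message) error { return c.callback(m) }
func (c *callbackListener) Close()                 {}

// chanListener hands every message to a channel.
type chanListener struct {
	msgChan chan<- Message
}

// Notify performs a non-blocking send, so a full channel yields an error
// instead of stalling the publisher. This policy is an assumption.
func (c *chanListener) Notify(m Message) error {
	select {
	case c.msgChan <- m:
		return nil
	default:
		return fmt.Errorf("message channel is full")
	}
}

func (c *chanListener) Close() {}

func main() {
	var got []Message
	cb := &callbackListener{callback: func(m Message) error {
		got = append(got, m)
		return nil
	}}
	ch := make(chan Message, 1)
	cl := &chanListener{msgChan: ch}

	// Both listeners satisfy the same interface, so a bus can treat them alike.
	for _, l := range []Listener{cb, cl} {
		if err := l.Notify("hello"); err != nil {
			fmt.Println("notify failed:", err)
		}
	}
	fmt.Println(got[0], <-ch)
}
```

Because both types satisfy one interface, the bus does not need to know how a subscriber consumes its messages.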

type Collector

type Collector struct {
	// contains filtered or unexported fields
}

Collector is a very naive implementation of the wire.EventCollector interface. If no function is supplied, it uses a channel to publish the collected packets.

func NewSimpleCollector

func NewSimpleCollector(rChan chan message.Message, f func(message.Message) error) *Collector

NewSimpleCollector is a simple wrapper around a callback that redirects collected buffers into a channel

func (*Collector) Collect

func (m *Collector) Collect(b message.Message) error

Collect redirects a buffer copy to a channel
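
The fallback described for Collector (use the supplied function, otherwise publish to the channel) might look like the sketch below. `Message`, `collector` and `newCollector` are hypothetical names for illustration; how the real Collector handles a nil callback or a blocked channel is not stated on this page.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for message.Message.
type Message string

// collector wraps a single function that receives every collected message.
type collector struct {
	f func(Message) error
}

// newCollector uses f when it is supplied. When f is nil it falls back to
// sending each message on rChan (assumed behaviour).
func newCollector(rChan chan Message, f func(Message) error) *collector {
	if f == nil {
		f = func(m Message) error {
			rChan <- m
			return nil
		}
	}
	return &collector{f: f}
}

// Collect forwards the message to whichever sink was chosen at construction.
func (c *collector) Collect(m Message) error { return c.f(m) }

func main() {
	rChan := make(chan Message, 1)
	c := newCollector(rChan, nil)
	if err := c.Collect("block"); err != nil {
		fmt.Println("collect failed:", err)
		return
	}
	fmt.Println(<-rChan)
}
```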

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is a container for listeners and callbacks.

func CreateFrameStreamer

func CreateFrameStreamer(topic topics.Topic) (*EventBus, io.WriteCloser)

CreateFrameStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.

func New

func New() *EventBus

New returns a new EventBus with no listeners.

func (*EventBus) AddDefaultTopic

func (bus *EventBus) AddDefaultTopic(topic topics.Topic)

AddDefaultTopic adds a topic to the default multiListener

func (*EventBus) Publish

func (bus *EventBus) Publish(topic topics.Topic, m message.Message)

Publish executes the callbacks defined for a topic. The topic is set explicitly because it might differ from the message Category (as in the Gossip case). Publishing is fire-and-forget: if there is no listener for a topic, the message is lost.
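
To make the fire-and-forget semantics concrete, here is a minimal single-goroutine sketch. `Topic`, `Message`, `bus` and the topic constants are hypothetical stand-ins; the real EventBus takes Listener values and is presumably safe for concurrent use, which this sketch is not.

```go
package main

import "fmt"

// Topic and Message are hypothetical stand-ins for topics.Topic and message.Message.
type Topic uint8
type Message string

const (
	Gossip Topic = iota
	Block
)

// bus maps each topic to the callbacks subscribed to it.
type bus struct {
	listeners map[Topic][]func(Message)
}

func newBus() *bus { return &bus{listeners: make(map[Topic][]func(Message))} }

func (b *bus) subscribe(t Topic, f func(Message)) {
	b.listeners[t] = append(b.listeners[t], f)
}

// publish returns nothing: the caller cannot tell whether anyone received
// the message. With no listener on the topic the loop body never runs and
// the message is simply dropped.
func (b *bus) publish(t Topic, m Message) {
	for _, f := range b.listeners[t] {
		f(m)
	}
}

func main() {
	b := newBus()
	var seen []Message
	b.subscribe(Gossip, func(m Message) { seen = append(seen, m) })

	b.publish(Gossip, "payload") // delivered: the caller picks the topic explicitly
	b.publish(Block, "payload")  // dropped: nobody listens on Block
	fmt.Println(len(seen))
}
```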

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(topic topics.Topic, listener Listener) uint32

Subscribe subscribes a Listener to a topic and returns a uint32 identifier.

func (*EventBus) SubscribeDefault

func (bus *EventBus) SubscribeDefault(listener Listener) uint32

SubscribeDefault subscribes a Listener to the default multiListener. This is normally useful for implementing a sub-dispatching mechanism (e.g. a bus-of-buses architecture).
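
One way to picture the default multiListener is a set of registered topics whose messages all fan out to the same listeners. The sketch below assumes exactly that; `multiBus` and its methods are hypothetical, and the real dispatch rules may differ.

```go
package main

import "fmt"

// Topic and Message are hypothetical stand-ins for topics.Topic and message.Message.
type Topic uint8
type Message string

const (
	Gossip Topic = iota
	Block
	Tx
)

// multiBus forwards messages published on any registered default topic
// to every default listener.
type multiBus struct {
	defaultTopics map[Topic]bool
	defaults      []func(Message)
}

func newMultiBus() *multiBus { return &multiBus{defaultTopics: make(map[Topic]bool)} }

// addDefaultTopic marks a topic as one the default listeners care about.
func (b *multiBus) addDefaultTopic(t Topic) { b.defaultTopics[t] = true }

// subscribeDefault registers a listener for all default topics at once and
// returns its position as an identifier (assumed ID scheme).
func (b *multiBus) subscribeDefault(f func(Message)) uint32 {
	b.defaults = append(b.defaults, f)
	return uint32(len(b.defaults) - 1)
}

func (b *multiBus) publish(t Topic, m Message) {
	if !b.defaultTopics[t] {
		return // not a default topic: default listeners are not notified
	}
	for _, f := range b.defaults {
		f(m)
	}
}

func main() {
	b := newMultiBus()
	b.addDefaultTopic(Gossip)
	b.addDefaultTopic(Block)

	var got []Message
	b.subscribeDefault(func(m Message) { got = append(got, m) })

	b.publish(Gossip, "g")
	b.publish(Block, "b")
	b.publish(Tx, "t") // Tx was never added as a default topic
	fmt.Println(got)
}
```

In a bus-of-buses setup, the default listener's callback would typically republish the message on a child bus.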

func (*EventBus) Unsubscribe

func (bus *EventBus) Unsubscribe(topic topics.Topic, id uint32)

Unsubscribe removes the listener identified by id from a topic.
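
A minimal sketch of ID-based unsubscription follows. It assumes the uint32 returned by Subscribe is the value Unsubscribe expects and that it selects a single subscription; the signatures suggest this, but the page does not state it. `registry` and the sequential ID scheme are hypothetical.

```go
package main

import "fmt"

// Topic and Message are hypothetical stand-ins for topics.Topic and message.Message.
type Topic uint8
type Message string

// registry stores callbacks per topic, keyed by subscription ID.
type registry struct {
	nextID    uint32
	listeners map[Topic]map[uint32]func(Message)
}

func newRegistry() *registry {
	return &registry{listeners: make(map[Topic]map[uint32]func(Message))}
}

// subscribe stores f under a fresh ID and returns that ID to the caller.
func (r *registry) subscribe(t Topic, f func(Message)) uint32 {
	if r.listeners[t] == nil {
		r.listeners[t] = make(map[uint32]func(Message))
	}
	r.nextID++
	r.listeners[t][r.nextID] = f
	return r.nextID
}

// unsubscribe removes one subscription. Unknown topics or IDs are ignored,
// because delete on a nil or non-matching map is a no-op.
func (r *registry) unsubscribe(t Topic, id uint32) {
	delete(r.listeners[t], id)
}

func (r *registry) publish(t Topic, m Message) {
	for _, f := range r.listeners[t] {
		f(m)
	}
}

func main() {
	r := newRegistry()
	first, second := 0, 0
	id := r.subscribe(1, func(Message) { first++ })
	r.subscribe(1, func(Message) { second++ })

	r.unsubscribe(1, id) // only the first subscription is removed
	r.publish(1, "m")
	fmt.Println(first, second)
}
```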

type GossipStreamer

type GossipStreamer struct {
	*SimpleStreamer
}

GossipStreamer is a SimpleStreamer which removes the checksum and the topic when reading. It is supposed to be used when testing data that needs to be streamed over the network

func NewGossipStreamer

func NewGossipStreamer(magic protocol.Magic) *GossipStreamer

NewGossipStreamer creates a new GossipStreamer instance

func (*GossipStreamer) Read

func (ms *GossipStreamer) Read() ([]byte, error)

Read reads from the stream.

func (*GossipStreamer) SeenTopics

func (ms *GossipStreamer) SeenTopics() []topics.Topic

SeenTopics returns a slice of all the topics the SimpleStreamer has found in its stream so far.

type Listener

type Listener interface {
	// Notify a listener of a new message
	Notify(message.Message) error
	// Close the listener
	Close()
}

Listener publishes a byte array that subscribers of the EventBus can use

func NewCallbackListener

func NewCallbackListener(callback func(message.Message) error) Listener

NewCallbackListener creates a callback based dispatcher

func NewChanListener

func NewChanListener(msgChan chan<- message.Message) Listener

NewChanListener creates a channel based dispatcher

func NewStreamListener

func NewStreamListener(w io.WriteCloser) Listener

NewStreamListener creates a new StreamListener

type Multicaster

type Multicaster interface {
	AddDefaultTopic(topics.Topic)
	SubscribeDefault(Listener) uint32
}

Multicaster allows for a single Listener to listen to multiple topics

type Publisher

type Publisher interface {
	Publish(topics.Topic, message.Message)
}

Publisher publishes serialized messages on a specific topic

type SimpleStreamer

type SimpleStreamer struct {
	*bufio.Reader
	*bufio.Writer
	// contains filtered or unexported fields
}

SimpleStreamer is a test helper which can capture information that gets gossiped by the node. It can read from the gossip stream, and stores the topics that it has seen.

func NewSimpleStreamer

func NewSimpleStreamer(magic protocol.Magic) *SimpleStreamer

NewSimpleStreamer returns an initialized SimpleStreamer.

func (*SimpleStreamer) Close

func (ms *SimpleStreamer) Close() error

Close implements io.WriteCloser.

func (*SimpleStreamer) Read

func (ms *SimpleStreamer) Read() ([]byte, error)

func (*SimpleStreamer) Write

func (ms *SimpleStreamer) Write(p []byte) (n int, err error)

Write receives packets from the ring buffer and writes them to the internal pipe immediately.

type StreamListener

type StreamListener struct {
	// contains filtered or unexported fields
}

StreamListener uses a ring buffer to dispatch messages

func (*StreamListener) Close

func (s *StreamListener) Close()

Close closes the internal ring buffer.

func (*StreamListener) Notify

func (s *StreamListener) Notify(m message.Message) error

Notify puts a message into the Listener's ring buffer.
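
The ring-buffer dispatch behind StreamListener can be illustrated with a small fixed-capacity buffer whose contents are drained into a writer. This is a sketch under assumptions: `ring`, `drain` and `consume` are hypothetical, the overwrite-oldest policy is a guess, and the real buffer is presumably concurrent, which this one is not.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// ring is a fixed-capacity buffer that overwrites its oldest entry when full.
type ring struct {
	items [][]byte
	head  int // index of the oldest entry
	size  int // number of stored entries
}

func newRing(capacity int) *ring { return &ring{items: make([][]byte, capacity)} }

// put stores b at the tail; on a full buffer the tail coincides with the
// head, so the oldest entry is replaced and the head moves forward.
func (r *ring) put(b []byte) {
	tail := (r.head + r.size) % len(r.items)
	r.items[tail] = b
	if r.size < len(r.items) {
		r.size++
		return
	}
	r.head = (r.head + 1) % len(r.items)
}

// drain removes and returns all stored entries, oldest first.
func (r *ring) drain() [][]byte {
	out := make([][]byte, 0, r.size)
	for i := 0; i < r.size; i++ {
		out = append(out, r.items[(r.head+i)%len(r.items)])
	}
	r.head, r.size = 0, 0
	return out
}

// consume writes each item to w and reports whether every write succeeded.
func consume(items [][]byte, w io.Writer) bool {
	for _, it := range items {
		if _, err := w.Write(it); err != nil {
			return false
		}
	}
	return true
}

func main() {
	r := newRing(2)
	r.put([]byte("a"))
	r.put([]byte("b"))
	r.put([]byte("c")) // buffer is full, so "a" is overwritten

	var sink bytes.Buffer
	ok := consume(r.drain(), &sink)
	fmt.Println(ok, sink.String())
}
```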

type StupidStreamer added in v0.3.0

type StupidStreamer struct {
	*bufio.Reader
	*bufio.Writer
}

StupidStreamer is a streamer meant for using when testing internal forwarding of binary packets through the ring buffer. It does *not* add magic, frames or other wraps on the forwarded packet

func NewStupidStreamer added in v0.3.0

func NewStupidStreamer() *StupidStreamer

NewStupidStreamer returns an initialized StupidStreamer.

func (*StupidStreamer) Close added in v0.3.0

func (sss *StupidStreamer) Close() error

Close implements io.WriteCloser.

func (*StupidStreamer) Read added in v0.3.0

func (sss *StupidStreamer) Read() ([]byte, error)

func (*StupidStreamer) Write added in v0.3.0

func (sss *StupidStreamer) Write(p []byte) (n int, err error)

Write receives packets from the ring buffer and writes them to the internal pipe immediately.
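
A streamer that pairs a bufio.Reader and a bufio.Writer over an in-memory pipe, writing through immediately and adding no framing, could be sketched as below. `pipeStreamer` is a hypothetical type; the use of `io.Pipe` and the 1024-byte read size are assumptions, since the page does not show the unexported internals.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
)

// pipeStreamer connects a buffered writer to a buffered reader through io.Pipe.
type pipeStreamer struct {
	r  *bufio.Reader
	w  *bufio.Writer
	pw *io.PipeWriter
}

func newPipeStreamer() *pipeStreamer {
	pr, pw := io.Pipe()
	return &pipeStreamer{r: bufio.NewReader(pr), w: bufio.NewWriter(pw), pw: pw}
}

// Write forwards p unchanged and flushes at once, so nothing lingers in
// the write buffer.
func (s *pipeStreamer) Write(p []byte) (int, error) {
	n, err := s.w.Write(p)
	if err != nil {
		return n, err
	}
	return n, s.w.Flush()
}

// Read returns whatever the next read from the pipe yields, up to 1024 bytes.
func (s *pipeStreamer) Read() ([]byte, error) {
	buf := make([]byte, 1024)
	n, err := s.r.Read(buf)
	return buf[:n], err
}

// Close closes the write side; subsequent reads report io.EOF.
func (s *pipeStreamer) Close() error { return s.pw.Close() }

func main() {
	s := newPipeStreamer()
	go func() {
		// io.Pipe is synchronous, so the write must run concurrently with the read.
		if _, err := s.Write([]byte("raw packet")); err != nil {
			fmt.Println("write failed:", err)
		}
		s.Close()
	}()
	b, err := s.Read()
	fmt.Println(string(b), err)
}
```

Since no length prefix or delimiter is added, a reader of such a stream has to know packet boundaries by other means.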

type Subscriber

type Subscriber interface {
	Subscribe(topics.Topic, Listener) uint32
	Unsubscribe(topics.Topic, uint32)
}

Subscriber subscribes a Listener to event notifications on a specific topic.
