Documentation
¶
Index ¶
- Variables
- func GetFqn(obj interface{}) string
- func GetTypeFQN(t reflect.Type) string
- type AMQPOutbox
- type Builder
- type Bus
- type BusConfiguration
- type BusMessage
- type BusSwitch
- type DeadLetterMessageHandler
- type Deadlettering
- type DefaultBus
- func (b *DefaultBus) GetHealth() HealthCard
- func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler)
- func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
- func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error
- func (b *DefaultBus) Log() logrus.FieldLogger
- func (b *DefaultBus) NotifyHealth(health chan error)
- func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, ...) error
- func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, ...) (*BusMessage, error)
- func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error
- func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
- func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, ...) error
- func (b *DefaultBus) Shutdown() (shutdwonErr error)
- func (b *DefaultBus) Start() error
- type DeliveryInfo
- type Glogged
- type HandlerRegister
- type Health
- type HealthCard
- type Invocation
- type Logged
- type Message
- type MessageFilter
- type MessageHandler
- type MessagePolicy
- type Messaging
- type Registration
- type RequestSagaTimeout
- type Safety
- type Saga
- type SagaConfFn
- type SagaGlue
- type SagaRegister
- type SagaTimeoutMessage
- type Semantics
- type Serializer
- type TimeoutManager
- type TxOutbox
- type TxProvider
Constants ¶
This section is empty.
Variables ¶
var (
//MaxRetryCount defines the max times a retry can run.
//Default is 3 but it is configurable
MaxRetryCount uint = 3
//BaseRetryDuration defines the basic milliseconds that the retry algorithm uses
//for a random retry time. Default is 10 but it is configurable.
BaseRetryDuration = 10 * time.Millisecond
//RpcHeaderName used to define the header in grabbit for RPC
RpcHeaderName = "x-grabbit-msg-rpc-id"
)
Functions ¶
func GetFqn ¶
func GetFqn(obj interface{}) string
GetFqn gets the "fully qualified name" of an interface, meaning the package path + typename
func GetTypeFQN ¶
GetTypeFQN gets the "fully qualified name" of a type, meaning the package path + typename
Types ¶
type AMQPOutbox ¶
type AMQPOutbox struct {
SvcName string
// contains filtered or unexported fields
}
AMQPOutbox sends messages to the amqp transport
func (*AMQPOutbox) NotifyConfirm ¶
func (out *AMQPOutbox) NotifyConfirm(ack, nack chan uint64)
NotifyConfirm sends an amqp notification
func (*AMQPOutbox) Post ¶
func (out *AMQPOutbox) Post(exchange, routingKey string, amqpMessage amqp.Publishing) (uint64, error)
Post implements Outbox.Send
func (*AMQPOutbox) Shutdown ¶ added in v1.1.0
func (out *AMQPOutbox) Shutdown()
Shutdown stops the outbox
type Builder ¶
type Builder interface {
PurgeOnStartUp() Builder
WithDeadlettering(deadletterExchange string) Builder
/*
Txnl sets the bus to be transactional using a persisted saga store
provider: mysql for mysql database
connStr: connection string in the format of the passed in provider
*/
Txnl(provider, connStr string) Builder
//WithSerializer provides the ability to plugin custom serializers
WithSerializer(serializer Serializer) Builder
/*
WorkerNum sets the number of worker go routines consuming messages from the queue
The default value if this option is not set is 1
*/
WorkerNum(workers uint, prefetchCount uint) Builder
/*
WithConfirms enables publisher confirms
*/
WithConfirms() Builder
//WithPolicies defines the default policies that are applied for every outgoing amqp message
WithPolicies(policies ...MessagePolicy) Builder
//ConfigureHealthCheck defines the default timeout in seconds for the db ping check
ConfigureHealthCheck(timeoutInSeconds time.Duration) Builder
//WithConfiguration sets the BusConfiguration used by the bus
WithConfiguration(config BusConfiguration) Builder
//Build the bus
Build(svcName string) Bus
//WithLogger set custom logger instance
WithLogger(logger logrus.FieldLogger) Builder
}
Builder is the main interface that should be used to create an instance of a Bus
type Bus ¶
type Bus interface {
HandlerRegister
Deadlettering
BusSwitch
Messaging
SagaRegister
Health
Logged
}
Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
type BusConfiguration ¶ added in v1.0.2
BusConfiguration provides configuration passed to the bus builder
type BusMessage ¶
type BusMessage struct {
ID string
CorrelationID string
SagaID string
SagaCorrelationID string
Semantics Semantics /*cmd or evt*/
Payload Message
PayloadFQN string
RPCID string
}
BusMessage the structure that gets sent to the underlying transport
func NewBusMessage ¶
func NewBusMessage(payload Message) *BusMessage
NewBusMessage factory method for creating a BusMessage that wraps the given payload
func NewFromAMQPHeaders ¶
func NewFromAMQPHeaders(headers amqp.Table) *BusMessage
NewFromAMQPHeaders creates a BusMessage from headers of an amqp message
func (*BusMessage) GetAMQPHeaders ¶
func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table)
GetAMQPHeaders converts everything but the payload to an AMQP headers Table
func (*BusMessage) GetTraceLog ¶
func (bm *BusMessage) GetTraceLog() (fields []log.Field)
GetTraceLog returns an array of log entries containing all of the message properties
func (*BusMessage) SetFromAMQPHeaders ¶
func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table)
SetFromAMQPHeaders sets everything but the payload from an AMQP headers Table
func (*BusMessage) SetPayload ¶
func (bm *BusMessage) SetPayload(payload Message)
SetPayload sets the payload and makes sure that Name is saved
type BusSwitch ¶
type BusSwitch interface {
/*
Start starts the bus, once the bus is started messages get consumed from the queue
and handlers get invoked.
Register all handlers prior to calling GBus.Start()
*/
Start() error
/*
Shutdown the bus and close connection to the underlying broker
*/
Shutdown() error
}
BusSwitch starts and shuts down the bus
type DeadLetterMessageHandler ¶ added in v1.1.0
DeadLetterMessageHandler signature for dead letter handler
func (DeadLetterMessageHandler) Name ¶ added in v1.1.0
func (dlmg DeadLetterMessageHandler) Name() string
Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
type Deadlettering ¶ added in v1.1.0
type Deadlettering interface {
HandleDeadletter(handler DeadLetterMessageHandler)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}
Deadlettering provides the ability to handle messages that were rejected as poison and arrive at the deadletter queue
type DefaultBus ¶
type DefaultBus struct {
*Safety
*Glogged
Outbox TxOutbox
PrefetchCount uint
AmqpConnStr string
SvcName string
Registrations []*Registration
RPCHandlers map[string]MessageHandler
HandlersLock *sync.Mutex
RPCLock *sync.Mutex
SenderLock *sync.Mutex
ConsumerLock *sync.Mutex
RegisteredSchemas map[string]bool
DelayedSubscriptions [][]string
PurgeOnStartup bool
Glue SagaGlue
TxProvider TxProvider
WorkerNum uint
Serializer Serializer
DLX string
DefaultPolicies []MessagePolicy
Confirm bool
DbPingTimeout time.Duration
// contains filtered or unexported fields
}
DefaultBus implements the Bus interface
func (*DefaultBus) GetHealth ¶
func (b *DefaultBus) GetHealth() HealthCard
GetHealth implements Health.GetHealth
func (*DefaultBus) HandleDeadletter ¶
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler)
HandleDeadletter implements GBus.HandleDeadletter
func (*DefaultBus) HandleEvent ¶
func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
HandleEvent implements GBus.HandleEvent
func (*DefaultBus) HandleMessage ¶
func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error
HandleMessage implements GBus.HandleMessage
func (*DefaultBus) Log ¶ added in v1.0.3
func (b *DefaultBus) Log() logrus.FieldLogger
Log returns the default logrus.FieldLogger for the bus via the Glogged helper
func (*DefaultBus) NotifyHealth ¶
func (b *DefaultBus) NotifyHealth(health chan error)
NotifyHealth implements Health.NotifyHealth
func (*DefaultBus) Publish ¶
func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error
Publish implements GBus.Publish(topic, message)
func (*DefaultBus) RPC ¶
func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)
RPC implements GBus.RPC
func (*DefaultBus) RegisterSaga ¶
func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error
RegisterSaga implements GBus.RegisterSaga
func (*DefaultBus) ReturnDeadToQueue ¶ added in v1.1.0
func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
ReturnDeadToQueue returns a message to its original destination
func (*DefaultBus) Send ¶
func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, policies ...MessagePolicy) error
Send implements GBus.Send(destination string, message interface{})
func (*DefaultBus) Shutdown ¶
func (b *DefaultBus) Shutdown() (shutdwonErr error)
Shutdown implements GBus.Shutdown()
type DeliveryInfo ¶ added in v1.1.0
DeliveryInfo provides information as to the attempted delivery of the invocation
type Glogged ¶ added in v1.0.3
type Glogged struct {
// contains filtered or unexported fields
}
Glogged provides an easy way for structs within the grabbit package to participate in the general logging schema of the bus
func (*Glogged) Log ¶ added in v1.0.3
func (gl *Glogged) Log() logrus.FieldLogger
Log returns the set default log or a new instance of a logrus.FieldLogger
func (*Glogged) SetLogger ¶ added in v1.0.3
func (gl *Glogged) SetLogger(entry logrus.FieldLogger)
SetLogger sets the default logrus.FieldLogger that should be used when logging a new message
type HandlerRegister ¶
type HandlerRegister interface {
/*
HandleMessage registers a handler to a specific message type
Use this method to register handlers for commands and reply messages
Use the HandleEvent method to subscribe on events and register a handler
*/
HandleMessage(message Message, handler MessageHandler) error
/*
HandleEvent registers a handler for a specific message type published
to an exchange with a specific topic
*/
HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
}
HandlerRegister registers message handlers to specific messages and events
type Health ¶
type Health interface {
NotifyHealth(health chan error)
GetHealth() HealthCard
}
Health reports on health issues in which the bus needs to be restarted
type HealthCard ¶
HealthCard that holds the health values of the bus
type Invocation ¶
type Invocation interface {
Logged
Reply(ctx context.Context, message *BusMessage) error
Bus() Messaging
Tx() *sql.Tx
Ctx() context.Context
Routing() (exchange, routingKey string)
DeliveryInfo() DeliveryInfo
}
Invocation context for a specific processed message
type Logged ¶ added in v1.0.3
type Logged interface {
SetLogger(entry logrus.FieldLogger)
Log() logrus.FieldLogger
}
Logged represents a grabbit component that can be logged
type Message ¶
type Message interface {
SchemaName() string
}
Message a common interface that passes to the serializers to allow decoding and encoding of content
type MessageFilter ¶
MessageFilter matches rabbitmq topic patterns
func NewMessageFilter ¶
func NewMessageFilter(exchange, routingKey string, message Message) *MessageFilter
NewMessageFilter creates a new MessageFilter
func (*MessageFilter) Matches ¶
func (filter *MessageFilter) Matches(exchange, routingKey, msgName string) bool
Matches the passed in exchange, routingKey, msgName with the defined filter
type MessageHandler ¶
type MessageHandler func(invocation Invocation, message *BusMessage) error
MessageHandler signature for all command handlers
func (MessageHandler) Name ¶ added in v1.1.0
func (mg MessageHandler) Name() string
Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
type MessagePolicy ¶
type MessagePolicy interface {
Apply(publishing *amqp.Publishing)
}
MessagePolicy defines a user policy for outgoing amqp messages. User policies can control message ttl, durability, etc.
type Messaging ¶
type Messaging interface {
/*
Send a command or a command response to a specific service
one-to-one semantics
*/
Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error
/*
Publish an event, one-to-many semantics
*/
Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error
/*
RPC calls the service passing it the request BusMessage and blocks until a reply is
received or the timeout expires.
*/
RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)
}
Messaging interface to send and publish messages to the bus
type Registration ¶
type Registration struct {
Handler MessageHandler
// contains filtered or unexported fields
}
Registration represents a message handler's registration for a given exchange, topic and msg combination
func NewRegistration ¶
func NewRegistration(exchange, routingKey string, message Message, handler MessageHandler) *Registration
NewRegistration creates a new registration
func (*Registration) Matches ¶
func (sub *Registration) Matches(exchange, routingKey, msgName string) bool
Matches the registration with the given exchange, routingKey, msgName
type RequestSagaTimeout ¶
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Timeout(tx *sql.Tx, bus Messaging) error
}
RequestSagaTimeout is the interface a saga needs to implement to get timeout services
type Saga ¶
type Saga interface {
//StartedBy returns the messages that when received should create a new saga instance
StartedBy() []Message
/*
RegisterAllHandlers passes in the HandlerRegister so that the saga can register
the messages that it handles
*/
RegisterAllHandlers(register HandlerRegister)
//IsComplete returns if the saga is complete and can be discarded
IsComplete() bool
//New is a factory method used by the bus to create new instances of a saga
New() Saga
}
Saga is the base interface for all Sagas.
type SagaConfFn ¶
SagaConfFn is a function to allow configuration of a saga in the context of the gbus
type SagaGlue ¶ added in v1.1.0
type SagaGlue interface {
SagaRegister
Logged
Start() error
Stop() error
}
SagaGlue glues together all the parts needed in order to orchestrate saga instances
type SagaRegister ¶
type SagaRegister interface {
RegisterSaga(saga Saga, conf ...SagaConfFn) error
}
SagaRegister registers sagas to the bus
type SagaTimeoutMessage ¶
type SagaTimeoutMessage struct {
SagaID string
}
SagaTimeoutMessage is the timeout message for sagas
func (SagaTimeoutMessage) SchemaName ¶
func (SagaTimeoutMessage) SchemaName() string
SchemaName implements gbus.Message
type Serializer ¶
type Serializer interface {
Name() string
Encode(message Message) ([]byte, error)
Decode(buffer []byte, schemaName string) (Message, error)
Register(obj Message)
}
Serializer is the base interface for all message serializers
type TimeoutManager ¶ added in v1.1.0
type TimeoutManager interface {
//RegisterTimeout requests the TimeoutManager to register a timeout for a specific saga instance
RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error
//ClearTimeout clears a timeout for a specific saga
ClearTimeout(tx *sql.Tx, sagaID string) error
//SetTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires
SetTimeoutFunction(func(tx *sql.Tx, sagaID string) error)
//Start starts the timeout manager
Start() error
//Stop shuts the timeout manager down
Stop() error
}
TimeoutManager abstracts the implementation of determining when a saga should be timed out