Documentation
¶
Index ¶
- Variables
- func GetFqn(obj interface{}) string
- func GetTypeFQN(t reflect.Type) string
- type AMQPOutbox
- type Builder
- type Bus
- type BusConfiguration
- type BusMessage
- type BusSwitch
- type DefaultBus
- func (b *DefaultBus) GetHealth() HealthCard
- func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
- func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
- func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error
- func (b *DefaultBus) NotifyHealth(health chan error)
- func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, ...) error
- func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, ...) (*BusMessage, error)
- func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error
- func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, ...) error
- func (b *DefaultBus) Shutdown() (shutdwonErr error)
- func (b *DefaultBus) Start() error
- type HandlerRegister
- type Health
- type HealthCard
- type Invocation
- type Message
- type MessageFilter
- type MessageHandler
- type MessagePolicy
- type Messaging
- type RegisterDeadletterHandler
- type Registration
- type RequestSagaTimeout
- type Safety
- type Saga
- type SagaConfFn
- type SagaRegister
- type SagaTimeoutMessage
- type Semantics
- type Serializer
- type TxOutbox
- type TxProvider
Constants ¶
This section is empty.
Variables ¶
var ( //MaxRetryCount defines the max times a retry can run. //Default is 3 but it is configurable MaxRetryCount uint = 3 //BaseRetryDuration defines the basic milliseconds that the retry algorithm uses //for a random retry time. Default is 10 but it is configurable. BaseRetryDuration = 10 * time.Millisecond //RpcHeaderName used to define the header in grabbit for RPC RpcHeaderName = "x-grabbit-msg-rpc-id" )
Functions ¶
func GetFqn ¶
func GetFqn(obj interface{}) string
GetFqn gets the "fully qualified name" of an interface. meaning the package path + typename
func GetTypeFQN ¶
GetTypeFQN gets the "fully qualified name" of a type. meaning the package path + typename
Types ¶
type AMQPOutbox ¶
type AMQPOutbox struct {
SvcName string
// contains filtered or unexported fields
}
AMQPOutbox sends messages to the amqp transport
func (*AMQPOutbox) NotifyConfirm ¶
func (out *AMQPOutbox) NotifyConfirm(ack, nack chan uint64)
NotifyConfirm send an amqp notification
func (*AMQPOutbox) Post ¶
func (out *AMQPOutbox) Post(exchange, routingKey string, amqpMessage amqp.Publishing) (uint64, error)
Post implements Outbox.Send
type Builder ¶
type Builder interface {
PurgeOnStartUp() Builder
WithDeadlettering(deadletterExchange string) Builder
/*
Txnl sets the bus to be transactional using a persisted saga store
provider: mysql for mysql database
connStr: connection string in the format of the passed in provider
*/
Txnl(provider, connStr string) Builder
//WithSerializer provides the ability to plugin custom serializers
WithSerializer(serializer Serializer) Builder
/*
WorkerNum sets the number of worker go routines consuming messages from the queue
The default value if this option is not set is 1
*/
WorkerNum(workers uint, prefetchCount uint) Builder
/*
WithConfirms enables publisher confirms
*/
WithConfirms() Builder
//WithPolicies defines the default policies that are applied for evey outgoing amqp messge
WithPolicies(policies ...MessagePolicy) Builder
//ConfigureHealthCheck defines the default timeout in seconds for the db ping check
ConfigureHealthCheck(timeoutInSeconds time.Duration) Builder
//RetriesNum defines the number of retries upon error
WithConfiguration(config BusConfiguration) Builder
//Build the bus
Build(svcName string) Bus
}
Builder is the main interface that should be used to create an instance of a Bus
type Bus ¶
type Bus interface {
HandlerRegister
RegisterDeadletterHandler
BusSwitch
Messaging
SagaRegister
Health
}
Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
type BusConfiguration ¶ added in v1.0.2
BusConfiguration provides configuration passed to the bus builder
type BusMessage ¶
type BusMessage struct {
ID string
CorrelationID string
SagaID string
SagaCorrelationID string
Semantics Semantics /*cmd or evt*/
Payload Message
PayloadFQN string
RPCID string
}
BusMessage the structure that gets sent to the underlying transport
func NewBusMessage ¶
func NewBusMessage(payload Message) *BusMessage
NewBusMessage factory method for creating a BusMessage that wraps the given payload
func NewFromAMQPHeaders ¶
func NewFromAMQPHeaders(headers amqp.Table) *BusMessage
NewFromAMQPHeaders creates a BusMessage from headers of an amqp message
func (*BusMessage) GetAMQPHeaders ¶
func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table)
GetAMQPHeaders convert to AMQP headers Table everything but a payload
func (*BusMessage) GetTraceLog ¶
func (bm *BusMessage) GetTraceLog() (fields []log.Field)
func (*BusMessage) SetFromAMQPHeaders ¶
func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table)
SetFromAMQPHeaders convert from AMQP headers Table everything but a payload
func (*BusMessage) SetPayload ¶
func (bm *BusMessage) SetPayload(payload Message)
SetPayload sets the payload and makes sure that Name is saved
type BusSwitch ¶
type BusSwitch interface {
/*
Start starts the bus, once the bus is started messages get consiumed from the queue
and handlers get invoced.
Register all handlers prior to calling GBus.Start()
*/
Start() error
/*
Shutdown the bus and close connection to the underlying broker
*/
Shutdown() error
}
BusSwitch starts and shutdowns the bus
type DefaultBus ¶
type DefaultBus struct {
*Safety
Outgoing *AMQPOutbox
Outbox TxOutbox
PrefetchCount uint
AmqpConnStr string
AMQPChannel *amqp.Channel
SvcName string
Registrations []*Registration
RPCHandlers map[string]MessageHandler
HandlersLock *sync.Mutex
RPCLock *sync.Mutex
SenderLock *sync.Mutex
ConsumerLock *sync.Mutex
RegisteredSchemas map[string]bool
DelayedSubscriptions [][]string
PurgeOnStartup bool
Glue SagaRegister
TxProvider TxProvider
IsTxnl bool
WorkerNum uint
Serializer Serializer
DLX string
DefaultPolicies []MessagePolicy
Confirm bool
DbPingTimeout time.Duration
// contains filtered or unexported fields
}
DefaultBus implements the Bus interface
func (*DefaultBus) GetHealth ¶
func (b *DefaultBus) GetHealth() HealthCard
GetHealth implements Health.GetHealth
func (*DefaultBus) HandleDeadletter ¶
HandleDeadletter implements GBus.HandleDeadletter
func (*DefaultBus) HandleEvent ¶
func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
HandleEvent implements GBus.HandleEvent
func (*DefaultBus) HandleMessage ¶
func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error
HandleMessage implements GBus.HandleMessage
func (*DefaultBus) NotifyHealth ¶
func (b *DefaultBus) NotifyHealth(health chan error)
NotifyHealth implements Health.NotifyHealth
func (*DefaultBus) Publish ¶
func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error
Publish implements GBus.Publish(topic, message)
func (*DefaultBus) RPC ¶
func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)
RPC implements GBus.RPC
func (*DefaultBus) RegisterSaga ¶
func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error
RegisterSaga impements GBus.RegisterSaga
func (*DefaultBus) Send ¶
func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, policies ...MessagePolicy) error
Send implements GBus.Send(destination string, message interface{})
func (*DefaultBus) Shutdown ¶
func (b *DefaultBus) Shutdown() (shutdwonErr error)
Shutdown implements GBus.Start()
type HandlerRegister ¶
type HandlerRegister interface {
/*
HandleMessage registers a handler to a specific message type
Use this method to register handlers for commands and reply messages
Use the HandleEvent method to subscribe on events and register a handler
*/
HandleMessage(message Message, handler MessageHandler) error
/*
HandleEvent registers a handler for a specific message type published
to an exchange with a specific topic
*/
HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
}
HandlerRegister registers message handlers to specific messages and events
type Health ¶
type Health interface {
NotifyHealth(health chan error)
GetHealth() HealthCard
}
Health reports om health issues in which the bus needs to be restarted
type HealthCard ¶
HealthCard that holds the health values of the bus
type Invocation ¶
type Invocation interface {
Reply(ctx context.Context, message *BusMessage) error
Bus() Messaging
Tx() *sql.Tx
Ctx() context.Context
Routing() (exchange, routingKey string)
}
Invocation context for a specific processed message
type Message ¶
type Message interface {
SchemaName() string
}
Message a common interface that passes to the serializers to allow decoding and encoding of content
type MessageFilter ¶
func NewMessageFilter ¶
func NewMessageFilter(exchange, routingKey string, message Message) *MessageFilter
func (*MessageFilter) Matches ¶
func (filter *MessageFilter) Matches(exchange, routingKey, msgName string) bool
type MessageHandler ¶
type MessageHandler func(invocation Invocation, message *BusMessage) error
MessageHandler signature for all command handlers
type MessagePolicy ¶
type MessagePolicy interface {
Apply(publishing *amqp.Publishing)
}
MessagePolicy defines a user policy for out going amqp messages User policies can control message ttl, durability etc..
type Messaging ¶
type Messaging interface {
/*
Send a command or a command response to a specific service
one-to-one semantics
*/
Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error
/*
Publish and event, one-to-many semantics
*/
Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error
/*
RPC calls the service passing him the request BusMessage and blocks until a reply is
received or timeout experied.
*/
RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)
}
Messaging interface to send and publish messages to the bus
type RegisterDeadletterHandler ¶
type RegisterDeadletterHandler interface {
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
}
RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type Registration ¶
type Registration struct {
Handler MessageHandler
// contains filtered or unexported fields
}
func NewRegistration ¶
func NewRegistration(exchange, routingKey string, message Message, handler MessageHandler) *Registration
func (*Registration) Matches ¶
func (sub *Registration) Matches(exchange, routingKey, msgName string) bool
type RequestSagaTimeout ¶
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Timeout(tx *sql.Tx, bus Messaging) error
}
RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
type Saga ¶
type Saga interface {
//StartedBy returns the messages that when received should create a new saga instance
StartedBy() []Message
/*
RegisterAllHandlers passes in the HandlerRegister so that the saga can register
the messages that it handles
*/
RegisterAllHandlers(register HandlerRegister)
//IsComplete retruns if the saga is complete and can be discarded
IsComplete() bool
//New is a factory method used by the bus to crerate new instances of a saga
New() Saga
}
Saga is the base interface for all Sagas.
type SagaConfFn ¶
SagaConfFn is a function to allow configuration of a saga in the context of the gbus
type SagaRegister ¶
type SagaRegister interface {
RegisterSaga(saga Saga, conf ...SagaConfFn) error
}
SagaRegister registers sagas to the bus
type SagaTimeoutMessage ¶
type SagaTimeoutMessage struct {
SagaID string
}
SagaTimeoutMessage is the timeout message for Saga's
func (SagaTimeoutMessage) SchemaName ¶
func (SagaTimeoutMessage) SchemaName() string
SchemaName implements gbus.Message
type Serializer ¶
type Serializer interface {
Name() string
Encode(message Message) ([]byte, error)
Decode(buffer []byte, schemaName string) (Message, error)
Register(obj Message)
}
Serializer is the base interface for all message serializers