message

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2018 License: MIT Imports: 8 Imported by: 1,061

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAlreadyAcked  = errors.New("message already acked")
	ErrAlreadyNacked = errors.New("message already nacked")
)

Functions

This section is empty.

Types

type HandlerFunc

type HandlerFunc func(msg *Message) ([]*Message, error)

HandlerFunc is function called when message is received.

msg.Ack() is called automatically when HandlerFunc doesn't return error. When HandlerFunc returns error, msg.Nack() is called. When msg.Ack() was called in handler and HandlerFunc returns error, msg.Nack() will be not sent because Ack was already sent.

HandlerFunc's are executed parallel when multiple messages was received (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).

type HandlerMiddleware

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

HandlerMiddleware allows us to write something like decorators to HandlerFunc. It can execute something before handler (for example: modify consumed message) or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).

It can be attached to the router by using `AddMiddleware` method.

Example:

func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		fmt.Println("executed before handler")
		producedMessages, err := h(message)
		fmt.Println("executed after handler")

		return producedMessages, err
	}
}

type Message

type Message struct {
	// UUID is an unique identifier of message.
	//
	// It is only used by Watermill for debugging.
	// UUID can be empty.
	UUID string

	// Metadata contains the message metadata.
	//
	// Can be used to store data which doesn't require unmarshaling entire payload.
	// It is something similar to HTTP request's headers.
	Metadata Metadata

	// Payload is message's payload.
	Payload Payload
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(uuid string, payload Payload) *Message

func (*Message) Ack

func (m *Message) Ack() error

Ack sends message's acknowledgement.

Ack is not blocking. Ack is idempotent. Error is returned, if Nack is already sent.

func (*Message) Acked

func (m *Message) Acked() <-chan struct{}

Acked returns channel which is closed when acknowledgement is sent.

Usage:

select {
case <-message.Acked():
	// ack received
case <-message.Nacked():
	// nack received
}

func (Message) Context added in v0.2.0

func (m Message) Context() context.Context

Context returns the message's context. To change the context, use SetContext.

The returned context is always non-nil; it defaults to the background context.

func (Message) Copy added in v0.2.0

func (m Message) Copy() *Message

Copy copies all message without Acks/Nacks.

func (Message) Equals added in v0.2.0

func (m Message) Equals(toCompare *Message) bool

Equals compare, that two messages are equal. Acks/Nacks are not compared.

func (*Message) Nack

func (m *Message) Nack() error

Nack sends message's negative acknowledgement.

Nack is not blocking. Nack is idempotent. Error is returned, if Ack is already sent.

func (*Message) Nacked

func (m *Message) Nacked() <-chan struct{}

Nacked returns channel which is closed when negative acknowledgement is sent.

Usage:

select {
case <-message.Acked():
	// ack received
case <-message.Nacked():
	// nack received
}

func (*Message) SetContext added in v0.2.0

func (m *Message) SetContext(ctx context.Context)

SetContext sets provided context to the message.

type Messages

type Messages []*Message

func (Messages) IDs

func (m Messages) IDs() []string

type Metadata

type Metadata map[string]string

func (Metadata) Get

func (m Metadata) Get(key string) string

func (Metadata) Set

func (m Metadata) Set(key, value string)

type Payload

type Payload []byte

type PubSub

type PubSub interface {
	Close() error
	// contains filtered or unexported methods
}

func NewPubSub

func NewPubSub(publisher Publisher, subscriber Subscriber) PubSub

type Publisher

type Publisher interface {

	// Close should flush unsent messages, if publisher is async.
	Close() error
	// contains filtered or unexported methods
}

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(config RouterConfig, logger watermill.LoggerAdapter) (*Router, error)

func (*Router) AddHandler

func (r *Router) AddHandler(
	handlerName string,
	subscribeTopic string,
	publishTopic string,
	pubSub PubSub,
	handlerFunc HandlerFunc,
) error

AddHandler adds a new handler.

handlerName must be unique. For now, it is used only for debugging.

subscribeTopic is a topic from which handler will receive messages.

publishTopic is a topic to which router will produce messages retuened by handlerFunc. When handler needs to publish to multiple topics, it is recommended to just inject Publisher to Handler or implement middleware which will catch messages and publish to topic based on metadata for example.

pubSub is PubSub from which messages will be consumed and to which created messages will be published. If you have separated Publisher and Subscriber object, you can create PubSub object by calling message.NewPubSub(publisher, subscriber).

func (*Router) AddMiddleware

func (r *Router) AddMiddleware(m ...HandlerMiddleware)

AddMiddleware adds a new middleware to the router.

The order of middlewares matters. Middleware added at the beginning is executed first.

func (*Router) AddNoPublisherHandler

func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc HandlerFunc,
) error

AddNoPublisherHandler adds a new handler. This handler cannot return messages. When message is returned it will occur an error and Nack will be sent.

handlerName must be unique. For now, it is used only for debugging.

subscribeTopic is a topic from which handler will receive messages.

subscriber is Subscriber from which messages will be consumed.

func (*Router) AddPlugin

func (r *Router) AddPlugin(p ...RouterPlugin)

func (*Router) Close

func (r *Router) Close() error

func (*Router) Logger

func (r *Router) Logger() watermill.LoggerAdapter

func (*Router) Run

func (r *Router) Run() (err error)

Run runs all plugins and handlers and starts subscribing to provided topics. This call is blocking until router is running.

To stop Run() you should call Close() on the router.

func (*Router) Running

func (r *Router) Running() chan struct{}

Running is closed when router is running. In other words: you can wait till router is running using

fmt.Println("Starting router")
go r.Run()
<- r.Running()
fmt.Println("Router is running")

type RouterConfig

type RouterConfig struct {
	// CloseTimeout determines how long router should work for handlers when closing.
	CloseTimeout time.Duration
}

func (RouterConfig) Validate

func (c RouterConfig) Validate() error

type RouterPlugin

type RouterPlugin func(*Router) error

RouterPlugin is function which is executed on Router start.

type Subscriber

type Subscriber interface {

	// Close closes all subscriptions with their output channels and flush offsets etc. when needed.
	Close() error
	// contains filtered or unexported methods
}

Directories

Path Synopsis
router

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL