Documentation
¶
Index ¶
- Constants
- Variables
- type Client
- type DeadCallback
- type ErrorCallback
- type Handler
- type Inbox
- type Option
- func OnDeadCallback(callback DeadCallback) Option
- func OnErrorCallback(callback ErrorCallback) Option
- func WithHandlerTimeout(dur time.Duration) Option
- func WithIterationRate(dur time.Duration) Option
- func WithIterationSeed(seed int) Option
- func WithMaxRetryAttempt(maxAttempt int) Option
- func WithRetention(eraseInterval time.Duration, windowDays int) Option
- type Record
- type Registry
- type Status
Constants ¶
const ( // DefaultIterationRate is the timeout after which all events // in the inbox table will be processed. DefaultIterationRate = 5 * time.Second // DefaultIterationSeed is a number that is used to generate a random // duration for the next worker iteration. DefaultIterationSeed = 2 // DefaultHandlerTimeout is the timeout after which the handler // will be stopped and the status will be set as Fail. DefaultHandlerTimeout = 10 * time.Second // DefaultRetryAttempts is the max attempts before event marks // as 'dead'. 'Dead' means that the event will no longer be // processed. DefaultRetryAttempts = 5 )
Variables ¶
var ErrNoRecords = errors.New("no records in inbox table")
Functions ¶
This section is empty.
Types ¶
type Client ¶
Client provides possibility to set records to the inbox table. All records will be processed in the future.
type DeadCallback ¶ added in v0.7.0
DeadCallback prototype of function that is called if message is 'dead'
type ErrorCallback ¶ added in v0.3.0
type ErrorCallback func(err error)
ErrorCallback prototype of function that is called if errors occurs during inbox process.
type Handler ¶
type Handler interface { // Key is a unique identifier of current handler. // This string must be not empty and must be unique for each // handler that passed to the Registry. Only the first handler with a key // will be stored in the Registry, all other handlers with the same key // will be ignored. Key() string // Process is a function that will be executed for each handler associated // with specific event_type and key provided by the Handler implementation. Process(context.Context, []byte) error }
type Inbox ¶
type Inbox struct {
// contains filtered or unexported fields
}
Inbox is struct that implement inbox pattern.
Writing all incoming events in a temporary table to future processing. Then we try to process each event with the provided handlers. In addition, Inbox filters new events. All events with the same event_id will be ignored.
More about inbox pattern you can read at https://softwaremill.com/microservices-101.
type Option ¶
type Option func(config) config
Option sets specific configuration to the Inbox.
func OnDeadCallback ¶ added in v0.3.0
func OnDeadCallback(callback DeadCallback) Option
OnDeadCallback sets custom callback for each message that can not be processed and marks as 'dead'. Function fires if 'dead' message detected.
func OnErrorCallback ¶ added in v0.7.0
func OnErrorCallback(callback ErrorCallback) Option
ErrorCallback sets custom callback that is called if errors occurs during inbox process.
func WithHandlerTimeout ¶
WithHandlerTimeout sets new interval after which handler will be stopped.
func WithIterationRate ¶
WithIterationRate sets new interval for process all inbox events.
func WithIterationSeed ¶ added in v0.6.0
WithIterationSeed sets the seed value for generating a random duration to add to DefaultIterationRate.
func WithMaxRetryAttempt ¶ added in v0.3.0
WithMaxRetryAttempt sets custom max attempts for processing event.
func WithRetention ¶ added in v0.7.0
WithRetention sets the retention configuration for inbox table.
Arguments:
eraseInterval - interval for the next erase execution. windowDays - the data older than the specified number of days will be deleted.
type Record ¶
type Record struct {
// contains filtered or unexported fields
}
Record is event that should be processed by inbox worker.
func NewRecord ¶
func NewRecord(id uuid.UUID, eventType string, payload []byte, eventDate ...time.Time) (*Record, error)
NewRecord creates new record that can be processed by inbox worker.
Parameters:
id - is a unique id for inbox table. ID should be unique or storage will ignore all duplicate ids. eventType - is a topic with which event was published. payload - the received body. eventDate (optional) - when event was occurred.
func (*Record) CalcNewDeadline ¶
func (*Record) Done ¶
func (r *Record) Done()
Done sets Done status to current Record. Status will be ignored on first save to the outbox table.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry contains all handler that will be processed by Inbox.
func NewRegistry ¶
func NewRegistry() *Registry
func (*Registry) Handlers ¶
Handlers returns map where key is event type and values are handlers associated to this event type.
func (*Registry) On ¶
On register new handlers to specific event key. All handlers will be executed on received event with provided key.
Example:
We have - event type = "order_events" - handler key = "process_order" - handler key = "update_order" The registry will bind the keys "process_order", "update_order" with event "order_events" and execute both registered handlers for each received event with event type "order_events". We have - event type = "order_events" - handler key = "process_order" - handler key = "process_order" If we are trying to provide several handlers with the same key, then only the first handler will be associated with the event type. The second handler will be ignored. We have - event type = "order_events" - registered handler with key = "process_order" - new handler key = "process_order" If you are trying to provide a handler to an already existing event type, for example, "order_events", and the handler has the same key as already provided, then this handler will be ignored.
type Status ¶
type Status string
Status defines current status of Record.
const ( // Progress means the current Record is processed by worker. Progress Status = "progress" // Failed means the current Record not processed by worker by specific // reason. Failed Status = "failed" // Done means the current Record is successfully processed. Done Status = "done" // Null means the current Record is not processed yet. Null Status = "" // Dead means the current Record is not processable. Dead Status = "dead" )