Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncEventConsumer ¶
// AsyncEventConsumer configures a consumer for the listed push-queue events
// whose handler, Consumer, consumes the events fetched BatchSize at a time.
type AsyncEventConsumer struct {
// WatchEvents lists the names of the events in the push queue to watch for.
WatchEvents []string
// BatchSize is the number of events to be fetched and processed at a time.
BatchSize int
// Consumer is the async event handler that consumes the events.
Consumer AsyncEventHandlerFunc
// ConsumerOption is the configuration for the PGConsumer.
ConsumerOption *ConsumerOption
// EventFetcherOption contains configuration on how the events should be fetched.
EventFetcherOption *EventFetcherOption
// contains filtered or unexported fields
}
func (AsyncEventConsumer) EventConsumer ¶
func (t AsyncEventConsumer) EventConsumer() (*PGConsumer, error)
func (AsyncEventConsumer) GetRecords ¶
func (t AsyncEventConsumer) GetRecords() ([]models.Event, error)
func (*AsyncEventConsumer) Handle ¶
func (t *AsyncEventConsumer) Handle(ctx context.Context) (int, error)
func (*AsyncEventConsumer) RecordEvents ¶
func (t *AsyncEventConsumer) RecordEvents(size int)
RecordEvents will record all the events fetched by the consumer in a ring buffer.
type AsyncEventHandlerFunc ¶
AsyncEventHandlerFunc processes multiple events and returns the failed ones
func AsyncHandler ¶
func AsyncHandler(fn func(ctx context.Context, e models.Events) models.Events) AsyncEventHandlerFunc
AsyncHandler converts the given user-defined handler into an async event handler.
type ConsumerOption ¶
type ConsumerOption struct {
// NumConsumers is the number of concurrent consumers.
// default: 1
NumConsumers int
// Timeout is how long to wait before calling the consumer func when no
// pg notification is received.
// default: 1 minute
Timeout time.Duration
// ErrorHandler handles errors encountered when consuming.
// It returns whether to retry or not.
// default: sleep for 1s and retry.
ErrorHandler func(ctx context.Context, e error) bool
}
type EventFetcherOption ¶
type EventFetcherOption struct {
// MaxAttempts is the maximum number of times processing of an event is attempted.
// default: 3
MaxAttempts int
// BaseDelay is the base delay between retries.
// It is a plain int; the default below is stated in seconds.
// default: 60 seconds
BaseDelay int
// Exponent is the exponent applied to the base delay.
// default: 5 (along with BaseDelay = 60, the retries are 1, 6, 31, 156 (in minutes))
Exponent int
}
type PGConsumer ¶
// PGConsumer exposes no exported fields in this listing; obtain a value from
// NewPGConsumer or from the EventConsumer method of AsyncEventConsumer or
// SyncEventConsumer.
type PGConsumer struct {
// contains filtered or unexported fields
}
PGConsumer manages concurrent consumers to handle PostgreSQL NOTIFY events from a specific channel.
func NewPGConsumer ¶
func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer, error)
NewPGConsumer returns a new PGConsumer that invokes consumerFunc, configured by opt.
func (*PGConsumer) ConsumeUntilEmpty ¶
func (t *PGConsumer) ConsumeUntilEmpty(ctx context.Context)
ConsumeUntilEmpty consumes events in a loop until the event queue is empty.
type SyncEventConsumer ¶
// SyncEventConsumer configures a consumer for the listed push-queue events
// whose handlers, Consumers, each process a single event one after another.
type SyncEventConsumer struct {
// WatchEvents lists the names of the events in the push queue to watch for.
WatchEvents []string
// Consumers is the list of sync event handlers that process a single event
// one after another in order.
// All the handlers must succeed or else the event will be marked as failed.
Consumers []SyncEventHandlerFunc
// ConsumerOption is the configuration for the PGConsumer.
ConsumerOption *ConsumerOption
// EventFetchOption contains configuration on how the events should be fetched.
// NOTE(review): the matching field on AsyncEventConsumer is named
// EventFetcherOption; the two names differ.
EventFetchOption *EventFetcherOption
// contains filtered or unexported fields
}
func (SyncEventConsumer) EventConsumer ¶
func (t SyncEventConsumer) EventConsumer() (*PGConsumer, error)
func (SyncEventConsumer) GetRecords ¶
func (t SyncEventConsumer) GetRecords() ([]models.Event, error)
func (*SyncEventConsumer) Handle ¶
func (t *SyncEventConsumer) Handle(ctx context.Context) (int, error)
func (*SyncEventConsumer) RecordEvents ¶
func (t *SyncEventConsumer) RecordEvents(size int)
RecordEvents will record all the events fetched by the consumer in a ring buffer.
type SyncEventHandlerFunc ¶
SyncEventHandlerFunc processes a single event and ONLY makes db changes.
func SyncHandlers ¶
SyncHandlers converts the given user defined handlers into sync event handlers.