Documentation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FlowControl ¶
type FlowControl struct {
// contains filtered or unexported fields
}
FlowControl helps control the flow of events. We provide a channel-based API for consumers, but events are pulled from NATS in batches. To maintain a steady flow of events, we need to control both the batch size and how many events are sent to the consumer.
There are two goals in this:
1. Make sure that the consumer does not have to wait too long for events.
2. Make sure that the consumer is not overwhelmed with events.
Because events are served over gRPC streams, we do not have full control over whether the consumer buffers events. Instead, we control how many pending events a consumer can process at a time.
We use a MIMD-style (multiplicative increase, multiplicative decrease) algorithm to control the maximum number of events being processed. The limit increases while the consumer processes events normally, and decreases when the consumer is slow or rejects events.
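The MIMD idea described above can be illustrated with a minimal sketch. The type `mimdLimit`, its fields, and the factors 2 and 0.5 are hypothetical choices made for this example; the package's actual factors, bounds, and triggers are not documented here and may differ.

```go
package main

import (
	"fmt"
	"math"
)

// mimdLimit is a hypothetical illustration of a multiplicative-increase,
// multiplicative-decrease limit. It is not the package's implementation.
type mimdLimit struct {
	limit float64 // current maximum number of events in flight
	min   float64 // lower bound for the limit
	max   float64 // upper bound for the limit
}

// onHealthy doubles the limit (assumed factor), capped at max.
// Call it when the consumer is processing events normally.
func (m *mimdLimit) onHealthy() {
	m.limit = math.Min(m.limit*2, m.max)
}

// onSlow halves the limit (assumed factor), floored at min.
// Call it when the consumer is slow or rejects events.
func (m *mimdLimit) onSlow() {
	m.limit = math.Max(m.limit*0.5, m.min)
}

// current returns the limit as a whole number of events.
func (m *mimdLimit) current() int {
	return int(m.limit)
}

func main() {
	m := &mimdLimit{limit: 4, min: 1, max: 64}
	m.onHealthy()
	m.onHealthy()
	fmt.Println("after two healthy rounds:", m.current())
	m.onSlow()
	fmt.Println("after one slow round:", m.current())
}
```

Starting from a limit of 4, two healthy rounds raise the limit to 16 and one slow round brings it back to 8. Scaling by a factor in both directions lets the limit recover quickly after a slowdown, while the bounds keep it from collapsing to zero or growing without limit.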
func NewFlowControl ¶
func NewFlowControl(
	ctx context.Context,
	logger *zap.Logger,
	eventTimeout time.Duration,
	maxPendingEvents int,
) *FlowControl
NewFlowControl creates a new flow control.
func (*FlowControl) GetBatchSize ¶
func (fc *FlowControl) GetBatchSize() int
GetBatchSize returns the number of events to load from NATS. This will adjust the processing limit based on the flow of events.
This operation blocks until fewer events are being processed than the limit allows.
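The blocking behaviour described above can be sketched with a condition variable. The `gate` type and its methods are hypothetical, and returning the remaining capacity as the batch size is an assumption made for this example, not a statement about how GetBatchSize computes its result.

```go
package main

import (
	"fmt"
	"sync"
)

// gate is a hypothetical stand-in for the blocking part of a flow
// controller: it tracks events in flight against a fixed limit.
type gate struct {
	mu      sync.Mutex
	cond    *sync.Cond
	pending int // events currently being processed
	limit   int // maximum events allowed in flight
}

func newGate(limit int) *gate {
	g := &gate{limit: limit}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// batchSize blocks while pending >= limit, then returns the remaining
// capacity (assumption: the batch size equals the free capacity).
func (g *gate) batchSize() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	for g.pending >= g.limit {
		g.cond.Wait()
	}
	return g.limit - g.pending
}

// add records n newly received events.
func (g *gate) add(n int) {
	g.mu.Lock()
	g.pending += n
	g.mu.Unlock()
}

// done marks one event as processed and wakes any blocked callers.
func (g *gate) done() {
	g.mu.Lock()
	g.pending--
	g.mu.Unlock()
	g.cond.Broadcast()
}

func main() {
	g := newGate(2)
	g.add(2)    // the gate is now full
	go g.done() // one event finishes on another goroutine
	fmt.Println("batch size:", g.batchSize())
}
```

In this sketch the call to `batchSize` cannot return until `done` has run, because the gate starts full; it then reports a free capacity of 1.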
func (*FlowControl) Received ¶
func (fc *FlowControl) Received(id uint64) func(processType ProcessType)
Received is called when an event is received from NATS. It returns a function that should be called when the event is processed, such as acknowledging, rejecting or pinging it.
func (*FlowControl) WaitUntilAvailable ¶
func (fc *FlowControl) WaitUntilAvailable()
type ProcessType ¶
type ProcessType int
const (
	ProcessTypeAck ProcessType = iota
	ProcessTypeReject
	ProcessTypePermanentReject
	ProcessTypePing
)