flowcontrol

package
v0.18.1
Warning

This package is not in the latest version of its module.

Published: Jul 2, 2024 License: MIT Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FlowControl

type FlowControl struct {
	// contains filtered or unexported fields
}

FlowControl helps control the flow of events. Consumers are given a channel-based API, but events are pulled from NATS in batches. To maintain a steady flow of events, we need to control both the batch size and how many events are sent to the consumer.

There are two goals in this:

1. Make sure that the consumer does not have to wait too long for events.
2. Make sure that the consumer is not overwhelmed with events.

Because events are served over gRPC streams, we don't have full control over whether the consumer buffers events. Instead, we control how many pending events a consumer can be processing at a time.

We use a MIMD-style (multiplicative increase, multiplicative decrease) algorithm to control the maximum number of events being processed. The idea is to increase the limit while the consumer is processing events normally, and to decrease it when the consumer is slow or rejecting events.
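The MIMD idea can be illustrated with a minimal sketch. The type mimdLimit, its fields, and the factors 2 and 0.5 are assumptions made for this example only; the actual fields of FlowControl are unexported and its tuning is not documented here.

```go
package main

import "fmt"

// mimdLimit is a hypothetical, simplified stand-in for the limit that a
// flow controller might maintain. It is not the package's implementation.
type mimdLimit struct {
	limit    float64 // current maximum number of in-flight events
	floor    float64 // lower bound so the limit never collapses to zero
	ceiling  float64 // upper bound (compare maxPendingEvents)
	increase float64 // multiplicative increase factor, assumed > 1
	decrease float64 // multiplicative decrease factor, assumed < 1
}

// onHealthy multiplies the limit by the increase factor, capped at ceiling.
func (m *mimdLimit) onHealthy() {
	m.limit *= m.increase
	if m.limit > m.ceiling {
		m.limit = m.ceiling
	}
}

// onSlow multiplies the limit by the decrease factor, bounded by floor.
func (m *mimdLimit) onSlow() {
	m.limit *= m.decrease
	if m.limit < m.floor {
		m.limit = m.floor
	}
}

// current returns the limit as a whole number of events.
func (m *mimdLimit) current() int {
	return int(m.limit)
}

func main() {
	l := &mimdLimit{limit: 10, floor: 1, ceiling: 100, increase: 2, decrease: 0.5}

	l.onHealthy()
	fmt.Println(l.current()) // 20

	l.onSlow()
	l.onSlow()
	fmt.Println(l.current()) // 5
}
```

Because both adjustments are multiplicative, the limit reacts quickly in either direction, which is why the floor and ceiling bounds matter in a sketch like this.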

func NewFlowControl

func NewFlowControl(
	ctx context.Context,
	logger *zap.Logger,
	eventTimeout time.Duration,
	maxPendingEvents int,
) *FlowControl

NewFlowControl creates a new flow control.

func (*FlowControl) GetBatchSize

func (fc *FlowControl) GetBatchSize() int

GetBatchSize returns the number of events to load from NATS. This will adjust the processing limit based on the flow of events.

This operation will block until fewer events are being processed than the limit allows.
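One common way to get this kind of blocking behaviour is a condition variable guarding an in-flight counter. The sketch below is an illustration under assumptions: the gate type and its methods are hypothetical, and returning the remaining capacity as the batch size is a guess at a plausible policy, not the documented behaviour of GetBatchSize.

```go
package main

import (
	"fmt"
	"sync"
)

// gate is a hypothetical stand-in that blocks callers while the number of
// in-flight events is at or above a fixed limit.
type gate struct {
	mu       sync.Mutex
	cond     *sync.Cond
	inFlight int
	limit    int
}

func newGate(limit int) *gate {
	g := &gate{limit: limit}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// batchSize blocks until inFlight < limit, then returns the remaining
// capacity (an assumed policy for this sketch).
func (g *gate) batchSize() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	for g.inFlight >= g.limit {
		g.cond.Wait()
	}
	return g.limit - g.inFlight
}

// received records that one more event is in flight.
func (g *gate) received() {
	g.mu.Lock()
	g.inFlight++
	g.mu.Unlock()
}

// processed records that one event finished and wakes any blocked callers.
func (g *gate) processed() {
	g.mu.Lock()
	g.inFlight--
	g.mu.Unlock()
	g.cond.Broadcast()
}

func main() {
	g := newGate(2)
	g.received()
	g.received() // now at the limit, so batchSize would block

	result := make(chan int)
	go func() { result <- g.batchSize() }()

	g.processed()         // frees one slot
	fmt.Println(<-result) // 1
}
```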

func (*FlowControl) Received

func (fc *FlowControl) Received(id uint64) func(processType ProcessType)

Received is called when an event is received from NATS. It returns a function that should be called when the event is processed, such as acknowledging, rejecting or pinging it.

func (*FlowControl) WaitUntilAvailable

func (fc *FlowControl) WaitUntilAvailable()

type ProcessType

type ProcessType int
const (
	ProcessTypeAck ProcessType = iota
	ProcessTypeReject
	ProcessTypePermanentReject
	ProcessTypePing
)
