sender

package module
v0.66.0-rc.3
Published: Apr 23, 2025 License: Apache-2.0 Imports: 15 Imported by: 8

Documentation

Index

Constants

const (
	// DefaultWorkersPerQueue - By default, most pipelines require only a single sender worker, because a single worker can
	// transmit multiple HTTP requests concurrently. This value is not intended to be configurable, but legacy
	// usages of the sender override it where necessary. To change the concurrency of the senders
	// via config, see the BatchMaxConcurrentSend endpoint setting.
	DefaultWorkersPerQueue = 1

	// DefaultQueuesCount - By default, most pipelines require only a single queue, because a single queue can
	// transmit multiple HTTP requests concurrently. Systems forced into a legacy mode override this value.
	DefaultQueuesCount = 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DestinationFactory added in v0.66.0

type DestinationFactory func() *client.Destinations

DestinationFactory is used to generate client destinations on each call.

type DestinationSender

type DestinationSender struct {
	// contains filtered or unexported fields
}

DestinationSender wraps a destination to send messages. Sending blocks when the buffer is full, but does not block while the destination is retrying.

func NewDestinationSender

func NewDestinationSender(config pkgconfigmodel.Reader, destination client.Destination, output chan *message.Payload, bufferSize int) *DestinationSender

NewDestinationSender creates a new DestinationSender

func (*DestinationSender) NonBlockingSend

func (d *DestinationSender) NonBlockingSend(payload *message.Payload) bool

NonBlockingSend tries to send the payload and fails silently if the input is full. It returns false if the buffer is full and true if the payload was sent.

func (*DestinationSender) Send

func (d *DestinationSender) Send(payload *message.Payload) bool

Send sends a payload and blocks if the input is full. It does not block if the destination is retrying payloads, and it cancels the blocking attempt if the retry state changes.
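The difference between the two send methods can be illustrated with a plain buffered channel. This is a minimal sketch, not the package's implementation: `payload`, `trySend`, and `sendOrCancel` are hypothetical names, the `cancel` channel is an assumed stand-in for a change in retry state, and the meaning of the boolean returned by `sendOrCancel` is an assumption of this sketch.

```go
package main

// payload is a hypothetical stand-in for message.Payload.
type payload struct {
	encoded []byte
}

// trySend follows the contract described for NonBlockingSend: it enqueues
// the payload if the channel has room and returns false, without blocking,
// when the buffer is full.
func trySend(input chan *payload, p *payload) bool {
	select {
	case input <- p:
		return true
	default:
		return false
	}
}

// sendOrCancel blocks until the payload is enqueued or until cancel fires.
// The cancel channel is an assumption standing in for a retry-state change.
// It returns true only when the payload was enqueued.
func sendOrCancel(input chan *payload, cancel <-chan struct{}, p *payload) bool {
	select {
	case input <- p:
		return true
	case <-cancel:
		return false
	}
}
```

With a channel of capacity 1, the first `trySend` succeeds and the second returns false immediately, while `sendOrCancel` on the same full channel would wait until either a reader frees a slot or `cancel` is closed.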

func (*DestinationSender) Stop

func (d *DestinationSender) Stop()

Stop stops the DestinationSender

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer accumulates messages to a buffer until the max capacity is reached.

func NewMessageBuffer

func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer

NewMessageBuffer returns a new MessageBuffer.

func (*MessageBuffer) AddMessage

func (p *MessageBuffer) AddMessage(message *message.Message) bool

AddMessage adds a message to the buffer if there is still some free space, returns true if the message was added.

func (*MessageBuffer) Clear

func (p *MessageBuffer) Clear()

Clear reinitializes the buffer.

func (*MessageBuffer) ContentSizeLimit

func (p *MessageBuffer) ContentSizeLimit() int

ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.

func (*MessageBuffer) GetMessages

func (p *MessageBuffer) GetMessages() []*message.Message

GetMessages returns the messages stored in the buffer.

func (*MessageBuffer) IsEmpty

func (p *MessageBuffer) IsEmpty() bool

IsEmpty returns true if the buffer is empty.

func (*MessageBuffer) IsFull

func (p *MessageBuffer) IsFull() bool

IsFull returns true if the buffer is full.
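The interplay of the two limits passed to NewMessageBuffer can be sketched with a small stand-in type. This is an illustration under assumptions, not the package's code: `msg` and `boundedBuffer` are hypothetical, and the exact rule used to decide fullness (count limit reached or content size limit reached) is assumed.

```go
package main

// msg is a hypothetical stand-in for message.Message.
type msg struct {
	content []byte
}

// boundedBuffer is an illustrative buffer limited by both the number of
// messages and their total content size. It is not the package's MessageBuffer.
type boundedBuffer struct {
	messages         []*msg
	contentSize      int
	batchSizeLimit   int
	contentSizeLimit int
}

func newBoundedBuffer(batchSizeLimit, contentSizeLimit int) *boundedBuffer {
	return &boundedBuffer{
		batchSizeLimit:   batchSizeLimit,
		contentSizeLimit: contentSizeLimit,
	}
}

// add appends m only if neither limit would be exceeded, and reports
// whether the message was accepted.
func (b *boundedBuffer) add(m *msg) bool {
	size := len(m.content)
	if len(b.messages) >= b.batchSizeLimit || b.contentSize+size > b.contentSizeLimit {
		return false
	}
	b.messages = append(b.messages, m)
	b.contentSize += size
	return true
}

// isFull reports whether either limit has been reached (assumed rule).
func (b *boundedBuffer) isFull() bool {
	return len(b.messages) >= b.batchSizeLimit || b.contentSize >= b.contentSizeLimit
}

// isEmpty reports whether the buffer holds no messages.
func (b *boundedBuffer) isEmpty() bool {
	return len(b.messages) == 0
}

// clear resets the buffer so it can be reused for the next batch.
func (b *boundedBuffer) clear() {
	b.messages = b.messages[:0]
	b.contentSize = 0
}
```

A caller would typically try `add`, and on a false return flush the accumulated messages, call `clear`, and retry the rejected message.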

type Mock added in v0.66.0

type Mock struct {
	// contains filtered or unexported fields
}

Mock represents a mocked sender that fulfills the pipeline component interface

func NewMockSender added in v0.66.0

func NewMockSender() *Mock

NewMockSender generates a mock sender

func (*Mock) In added in v0.66.0

func (s *Mock) In() chan *message.Payload

In returns a self-emptying channel.

func (*Mock) PipelineMonitor added in v0.66.0

func (s *Mock) PipelineMonitor() metrics.PipelineMonitor

PipelineMonitor returns an instance of NoopPipelineMonitor

func (*Mock) Start added in v0.66.0

func (s *Mock) Start()

Start begins the routine that empties the In channel

func (*Mock) Stop added in v0.66.0

func (s *Mock) Stop()

Stop closes the In channel.
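A "self-emptying" input channel, as described for Mock, can be sketched as a goroutine that discards everything it receives. The names `mockPayload` and `drainingMock` are hypothetical, and waiting for the drain goroutine in `Stop` is a choice of this sketch rather than documented behavior.

```go
package main

// mockPayload is a hypothetical stand-in for message.Payload.
type mockPayload struct {
	encoded []byte
}

// drainingMock is an illustrative component whose input channel is
// continuously emptied once Start has been called.
type drainingMock struct {
	in   chan *mockPayload
	done chan struct{}
}

func newDrainingMock() *drainingMock {
	return &drainingMock{
		in:   make(chan *mockPayload, 1),
		done: make(chan struct{}),
	}
}

// In returns the input channel that producers write to.
func (m *drainingMock) In() chan *mockPayload {
	return m.in
}

// Start launches a goroutine that discards every payload until the
// input channel is closed.
func (m *drainingMock) Start() {
	go func() {
		defer close(m.done)
		for range m.in {
		}
	}()
}

// Stop closes the input channel and waits for the drain goroutine to exit.
func (m *drainingMock) Stop() {
	close(m.in)
	<-m.done
}
```

Because the channel is drained in the background, a test can push any number of payloads into `In()` without deadlocking on a full buffer.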

type PipelineComponent added in v0.66.0

type PipelineComponent interface {
	In() chan *message.Payload
	PipelineMonitor() metrics.PipelineMonitor
	Start()
	Stop()
}

PipelineComponent abstracts a pipeline component

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender can distribute payloads on multiple underlying workers

func NewSender

func NewSender(
	config pkgconfigmodel.Reader,
	inputChan chan *message.Payload,
	outputChan chan *message.Payload,
	destinations *client.Destinations,
	bufferSize int,
	senderDoneChan chan *sync.WaitGroup,
	flushWg *sync.WaitGroup,
	pipelineMonitor metrics.PipelineMonitor,
) *Sender

NewSender is the legacy sender.

func NewSenderV2 added in v0.66.0

func NewSenderV2(
	config pkgconfigmodel.Reader,
	auditor auditor.Auditor,
	destinationFactory DestinationFactory,
	bufferSize int,
	senderDoneChan chan *sync.WaitGroup,
	flushWg *sync.WaitGroup,
	queueCount int,
	workersPerQueue int,
	pipelineMonitor metrics.PipelineMonitor,
) *Sender

NewSenderV2 is the default implementation of the sender factory.

func (*Sender) In added in v0.66.0

func (s *Sender) In() chan *message.Payload

In is the input channel of a worker set.

func (*Sender) PipelineMonitor added in v0.66.0

func (s *Sender) PipelineMonitor() metrics.PipelineMonitor

PipelineMonitor returns the pipeline monitor of the sender workers.

func (*Sender) Start

func (s *Sender) Start()

Start starts all sender workers.

func (*Sender) Stop

func (s *Sender) Stop()

Stop stops all sender workers
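How the queueCount and workersPerQueue parameters of NewSenderV2 might combine can be sketched as a set of queues, each served by a fixed number of worker goroutines. Everything here is hypothetical: `job`, `workerPool`, the round-robin choice of queue in `in`, and starting the workers in the constructor are assumptions of the sketch, not confirmed behavior of Sender.

```go
package main

import "sync"

// job is a hypothetical stand-in for message.Payload.
type job struct {
	id int
}

// workerPool is an illustrative set of queues, each consumed by a fixed
// number of workers. It is not the package's Sender.
type workerPool struct {
	queues []chan *job
	wg     sync.WaitGroup
	next   int
}

// newWorkerPool creates queueCount buffered queues and starts
// workersPerQueue goroutines per queue, each calling handle on every job.
func newWorkerPool(queueCount, workersPerQueue, bufferSize int, handle func(*job)) *workerPool {
	p := &workerPool{queues: make([]chan *job, queueCount)}
	for i := range p.queues {
		q := make(chan *job, bufferSize)
		p.queues[i] = q
		for w := 0; w < workersPerQueue; w++ {
			p.wg.Add(1)
			go func() {
				defer p.wg.Done()
				for j := range q {
					handle(j)
				}
			}()
		}
	}
	return p
}

// in returns the next queue in round-robin order (assumed policy).
// It is not safe for concurrent callers.
func (p *workerPool) in() chan *job {
	q := p.queues[p.next%len(p.queues)]
	p.next++
	return q
}

// stop closes every queue and waits for all workers to finish.
func (p *workerPool) stop() {
	for _, q := range p.queues {
		close(q)
	}
	p.wg.Wait()
}
```

With both counts left at 1, as the defaults DefaultQueuesCount and DefaultWorkersPerQueue suggest, the pool degenerates to a single channel read by a single goroutine.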

type Serializer

type Serializer interface {
	Serialize(messages []*message.Message, writer io.Writer) error
}

Serializer transforms a batch of messages into a payload. It is the one rendering the messages (i.e. either directly using raw []byte data from unstructured messages or turning structured messages into []byte data).

var (
	// LineSerializer is a shared line serializer.
	LineSerializer Serializer = &lineSerializer{}
	// ArraySerializer is a shared array serializer.
	ArraySerializer Serializer = &arraySerializer{}
)
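A custom implementation of the Serialize method can be sketched against a stand-in message type. The framing shown here is an assumption inferred from the names LineSerializer and ArraySerializer (newline-separated versus a bracketed, comma-separated list); `logMessage`, `newlineSerializer`, `jsonArraySerializer`, and `render` are hypothetical and do not reproduce the package's unexported types.

```go
package main

import (
	"bytes"
	"io"
)

// logMessage is a hypothetical stand-in for message.Message.
type logMessage struct {
	content []byte
}

// serializer mirrors the shape of the documented Serializer interface,
// using the stand-in message type.
type serializer interface {
	Serialize(messages []*logMessage, writer io.Writer) error
}

// newlineSerializer writes messages separated by a single '\n' (assumed framing).
type newlineSerializer struct{}

func (newlineSerializer) Serialize(messages []*logMessage, writer io.Writer) error {
	for i, m := range messages {
		if i > 0 {
			if _, err := writer.Write([]byte{'\n'}); err != nil {
				return err
			}
		}
		if _, err := writer.Write(m.content); err != nil {
			return err
		}
	}
	return nil
}

// jsonArraySerializer writes messages as "[m1,m2,...]" (assumed framing).
// Each message is expected to already be valid JSON.
type jsonArraySerializer struct{}

func (jsonArraySerializer) Serialize(messages []*logMessage, writer io.Writer) error {
	if _, err := writer.Write([]byte{'['}); err != nil {
		return err
	}
	for i, m := range messages {
		if i > 0 {
			if _, err := writer.Write([]byte{','}); err != nil {
				return err
			}
		}
		if _, err := writer.Write(m.content); err != nil {
			return err
		}
	}
	_, err := writer.Write([]byte{']'})
	return err
}

// render serializes messages into an in-memory buffer and returns the result.
func render(s serializer, messages []*logMessage) (string, error) {
	var buf bytes.Buffer
	if err := s.Serialize(messages, &buf); err != nil {
		return "", err
	}
	return buf.String(), nil
}
```

Writing to an io.Writer rather than returning a byte slice lets the caller stream the batch directly into a compressor or request body.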

type Strategy

type Strategy interface {
	Start()
	Stop()
}

Strategy should contain all logic to send logs to a remote destination and forward them to the next stage of the pipeline. In the logs pipeline, the strategy implementation should convert a stream of incoming Messages to a stream of Payloads that the sender can handle. A strategy is startable and stoppable so that the pipeline can manage its lifecycle.

func NewBatchStrategy

func NewBatchStrategy(inputChan chan *message.Message,
	outputChan chan *message.Payload,
	flushChan chan struct{},
	serverless bool,
	flushWg *sync.WaitGroup,
	serializer Serializer,
	batchWait time.Duration,
	maxBatchSize int,
	maxContentSize int,
	pipelineName string,
	compression compression.Compressor,
	pipelineMonitor metrics.PipelineMonitor) Strategy

NewBatchStrategy returns a new concurrent batch strategy with the specified batch and content size limits.
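The general idea of a batching strategy, turning a stream of messages into batches that are flushed on a size limit or a timer, can be sketched as a single loop. This is a simplified illustration, not the package's implementation: `entry` and `batchLoop` are hypothetical, and serialization, compression, the content size limit, and the flush channel are all left out.

```go
package main

import "time"

// entry is a hypothetical stand-in for message.Message.
type entry struct {
	content []byte
}

// batchLoop groups entries from input into slices of at most maxBatchSize
// and sends each group on output. A partial batch is flushed every
// batchWait and when input is closed. Both batchWait and maxBatchSize
// must be positive. The output channel is closed on return.
func batchLoop(input <-chan *entry, output chan<- []*entry, batchWait time.Duration, maxBatchSize int) {
	defer close(output)

	ticker := time.NewTicker(batchWait)
	defer ticker.Stop()

	batch := make([]*entry, 0, maxBatchSize)

	// flush forwards the current batch, if any, and starts a new one.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		output <- batch
		batch = make([]*entry, 0, maxBatchSize)
	}

	for {
		select {
		case e, ok := <-input:
			if !ok {
				// Input closed: forward what is left and stop.
				flush()
				return
			}
			batch = append(batch, e)
			if len(batch) >= maxBatchSize {
				flush()
			}
		case <-ticker.C:
			// Timer fired: forward a partial batch so entries do not wait indefinitely.
			flush()
		}
	}
}
```

In a pipeline this loop would run in its own goroutine, started and stopped through methods like the Start and Stop of the Strategy interface; five entries with a maxBatchSize of 2 and a long batchWait yield batches of 2, 2, and 1.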

func NewStreamStrategy

func NewStreamStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, compression compression.Compressor) Strategy

NewStreamStrategy creates a new stream strategy

Directories

Path Synopsis
Package http manages creation of http-based senders
Package tcp manages creation of tcp-based senders
