sender

package
v0.80.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package sender provides log message sending functionality

Package sender provides log message sending functionality

Index

Constants

View Source
const (
	// DefaultWorkersPerQueue - By default most pipelines will only require a single sender worker, as the single worker itself can
	// concurrently transmit multiple http requests at once. This value is not intended to be configurable, but legacy
	// usages of the sender will override this value where necessary. If there is a desire to edit the concurrency of the senders
	// via config, see the BatchMaxConcurrentSend endpoint setting.
	DefaultWorkersPerQueue = 1

	// DefaultQueuesCount - By default most pipelines will only require a single queue, as the single queue itself can
	// concurrently transmit multiple http requests at once. Systems forced in to a legacy mode will override this value.
	DefaultQueuesCount = 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DestinationFactory

type DestinationFactory func(id string) *client.Destinations

DestinationFactory used to generate client destinations on each call.

type DestinationSender

type DestinationSender struct {
	// contains filtered or unexported fields
}

DestinationSender wraps a destination to send messages blocking on a full buffer, but not blocking when a destination is retrying

func NewDestinationSender

func NewDestinationSender(config pkgconfigmodel.Reader, destination client.Destination, output chan *message.Payload, bufferSize int) *DestinationSender

NewDestinationSender creates a new DestinationSender

func (*DestinationSender) NonBlockingSend

func (d *DestinationSender) NonBlockingSend(payload *message.Payload) bool

NonBlockingSend tries to send the payload and fails silently if the input is full. returns false if the buffer is full - true if successful.

func (*DestinationSender) Send

func (d *DestinationSender) Send(payload *message.Payload) bool

Send sends a payload and blocks if the input is full. It will not block if the destination is retrying payloads and will cancel the blocking attempt if the retry state changes

func (*DestinationSender) Stop

func (d *DestinationSender) Stop()

Stop stops the DestinationSender

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer accumulates message metadata to a buffer until the max capacity is reached.

func NewMessageBuffer

func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer

NewMessageBuffer returns a new MessageBuffer.

func (*MessageBuffer) AddMessage

func (p *MessageBuffer) AddMessage(message *message.Message) bool

AddMessage adds a message to the buffer if there is still some free space, returns true if the message was added.

func (*MessageBuffer) Clear

func (p *MessageBuffer) Clear()

Clear reinitializes the buffer.

func (*MessageBuffer) ContentSizeLimit

func (p *MessageBuffer) ContentSizeLimit() int

ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.

func (*MessageBuffer) GetMessages

func (p *MessageBuffer) GetMessages() []*message.MessageMetadata

GetMessages returns the messages stored in the buffer.

func (*MessageBuffer) IsBytesFull

func (p *MessageBuffer) IsBytesFull() bool

IsBytesFull returns true if the buffer is full due to the content size limit.

func (*MessageBuffer) IsCountFull

func (p *MessageBuffer) IsCountFull() bool

IsCountFull returns true if the buffer is full due to the message count limit.

func (*MessageBuffer) IsEmpty

func (p *MessageBuffer) IsEmpty() bool

IsEmpty returns true if the buffer is empty.

func (*MessageBuffer) IsFull

func (p *MessageBuffer) IsFull() bool

IsFull returns true if the buffer is full.

type Mock

type Mock struct {
	// contains filtered or unexported fields
}

Mock represents a mocked sender that fulfills the pipeline component interface

func NewMockSender

func NewMockSender() *Mock

NewMockSender generates a mock sender

func (*Mock) In

func (s *Mock) In() chan *message.Payload

In returns a self-emptying chan

func (*Mock) PipelineMonitor

func (s *Mock) PipelineMonitor() metrics.PipelineMonitor

PipelineMonitor returns an instance of NoopPipelineMonitor

func (*Mock) Start

func (s *Mock) Start()

Start begins the routine that empties the In channel

func (*Mock) Stop

func (s *Mock) Stop()

Stop closes the in channel

type MockServerlessMeta

type MockServerlessMeta struct {
	// contains filtered or unexported fields
}

MockServerlessMeta is a struct that contains essential control structures for serverless mode. Do not access any methods on this struct without checking IsEnabled first.

func NewMockServerlessMeta

func NewMockServerlessMeta(isEnabled bool) *MockServerlessMeta

NewMockServerlessMeta returns a new MockServerlessMeta

func (*MockServerlessMeta) IsEnabled

func (s *MockServerlessMeta) IsEnabled() bool

IsEnabled returns true if the serverless mode is enabled.

func (*MockServerlessMeta) Lock

func (s *MockServerlessMeta) Lock()

Lock is a no-op for the mock serverless meta.

func (*MockServerlessMeta) SenderDoneChan

func (s *MockServerlessMeta) SenderDoneChan() chan *sync.WaitGroup

SenderDoneChan returns the channel is used to transfer wait groups from the sync_destination to the sender.

func (*MockServerlessMeta) Unlock

func (s *MockServerlessMeta) Unlock()

Unlock is a no-op for the mock serverless meta.

func (*MockServerlessMeta) WaitGroup

func (s *MockServerlessMeta) WaitGroup() *sync.WaitGroup

WaitGroup returns the wait group for the serverless mode.

type NoopSink

type NoopSink struct{}

NoopSink is a Sink implementation that does nothing This is used when there is no need to hook an auditor to the sender

func (*NoopSink) Channel

func (t *NoopSink) Channel() chan *message.Payload

Channel returns a nil channel

type PipelineComponent

type PipelineComponent interface {
	In() chan *message.Payload
	PipelineMonitor() metrics.PipelineMonitor
	Start()
	Stop()
}

PipelineComponent abstracts a pipeline component

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender can distribute payloads on multiple underlying workers

func NewSender

func NewSender(
	config pkgconfigmodel.Reader,
	sink Sink,
	destinationFactory DestinationFactory,
	bufferSize int,
	serverlessMeta ServerlessMeta,
	queueCount int,
	workersPerQueue int,
	pipelineMonitor metrics.PipelineMonitor,
) *Sender

NewSender returns a new sender.

func (*Sender) In

func (s *Sender) In() chan *message.Payload

In is the input channel of a worker set.

func (*Sender) PipelineMonitor

func (s *Sender) PipelineMonitor() metrics.PipelineMonitor

PipelineMonitor returns the pipeline monitor of the sender workers.

func (*Sender) Start

func (s *Sender) Start()

Start starts all sender workers.

func (*Sender) Stop

func (s *Sender) Stop()

Stop stops all sender workers

type Serializer

type Serializer interface {
	Serialize(message *message.Message, writer io.Writer) error
	Finish(writer io.Writer) error
	Reset()
}

Serializer transforms a batch of messages into a payload. It is the one rendering the messages (i.e. either directly using raw []byte data from unstructured messages or turning structured messages into []byte data).

func NewArraySerializer

func NewArraySerializer() Serializer

NewArraySerializer creates a new arraySerializer

type ServerlessMeta

type ServerlessMeta interface {
	Lock()
	Unlock()
	WaitGroup() *sync.WaitGroup
	SenderDoneChan() chan *sync.WaitGroup
	IsEnabled() bool
}

ServerlessMeta is a struct that contains essential control structures for serverless mode. Do not access any methods on this interface without checking IsEnabled first.

func NewServerlessMeta

func NewServerlessMeta(isEnabled bool) ServerlessMeta

NewServerlessMeta creates a new ServerlessMeta instance.

type Sink

type Sink interface {
	Channel() chan *message.Payload
}

Sink is the component that messages are sent to once the sender has finished processing them.

type Strategy

type Strategy interface {
	Start()
	Stop()
}

Strategy should contain all logic to send logs to a remote destination and forward them the next stage of the pipeline. In the logs pipeline, the strategy implementation should convert a stream of incoming Messages to a stream of Payloads that the sender can handle. A strategy is startable and stoppable so that the pipeline can manage it's lifecycle.

func NewBatchStrategy

func NewBatchStrategy(
	inputChan chan *message.Message,
	outputChan chan *message.Payload,
	flushChan chan struct{},
	serverlessMeta ServerlessMeta,
	batchWait time.Duration,
	maxBatchSize int,
	maxContentSize int,
	pipelineName string,
	compression compression.Compressor,
	pipelineMonitor metrics.PipelineMonitor,
	instanceID string,
) Strategy

NewBatchStrategy returns a new batch concurrent strategy with the specified batch & content size limits

func NewStreamStrategy

func NewStreamStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, compression compression.Compressor) Strategy

NewStreamStrategy creates a new stream strategy

Directories

Path Synopsis
Package http manages creation of http-based senders
Package http manages creation of http-based senders
Package tcp manages creation of tcp-based senders
Package tcp manages creation of tcp-based senders

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL