Documentation
¶
Index ¶
Constants ¶
const ( // DefaultWorkersPerQueue - By default most pipelines will only require a single sender worker, as the single worker itself can // concurrently transmit multiple http requests at once. This value is not intended to be configurable, but legacy // usages of the sender will override this value where necessary. If there is a desire to edit the concurrency of the senders // via config, see the BatchMaxConcurrentSend endpoint setting. DefaultWorkersPerQueue = 1 // DefaultQueuesCount - By default most pipelines will only require a single queue, as the single queue itself can // concurrently transmit multiple http requests at once. Systems forced in to a legacy mode will override this value. DefaultQueuesCount = 1 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DestinationFactory ¶ added in v0.66.0
type DestinationFactory func() *client.Destinations
DestinationFactory used to generate client destinations on each call.
type DestinationSender ¶
type DestinationSender struct {
// contains filtered or unexported fields
}
DestinationSender wraps a destination to send messages blocking on a full buffer, but not blocking when a destination is retrying
func NewDestinationSender ¶
func NewDestinationSender(config pkgconfigmodel.Reader, destination client.Destination, output chan *message.Payload, bufferSize int) *DestinationSender
NewDestinationSender creates a new DestinationSender
func (*DestinationSender) NonBlockingSend ¶
func (d *DestinationSender) NonBlockingSend(payload *message.Payload) bool
NonBlockingSend tries to send the payload and fails silently if the input is full. returns false if the buffer is full - true if successful.
func (*DestinationSender) Send ¶
func (d *DestinationSender) Send(payload *message.Payload) bool
Send sends a payload and blocks if the input is full. It will not block if the destination is retrying payloads and will cancel the blocking attempt if the retry state changes
func (*DestinationSender) Stop ¶
func (d *DestinationSender) Stop()
Stop stops the DestinationSender
type MessageBuffer ¶
type MessageBuffer struct {
// contains filtered or unexported fields
}
MessageBuffer accumulates messages to a buffer until the max capacity is reached.
func NewMessageBuffer ¶
func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer
NewMessageBuffer returns a new MessageBuffer.
func (*MessageBuffer) AddMessage ¶
func (p *MessageBuffer) AddMessage(message *message.Message) bool
AddMessage adds a message to the buffer if there is still some free space, returns true if the message was added.
func (*MessageBuffer) ContentSizeLimit ¶
func (p *MessageBuffer) ContentSizeLimit() int
ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.
func (*MessageBuffer) GetMessages ¶
func (p *MessageBuffer) GetMessages() []*message.Message
GetMessages returns the messages stored in the buffer.
func (*MessageBuffer) IsEmpty ¶
func (p *MessageBuffer) IsEmpty() bool
IsEmpty returns true if the buffer is empty.
func (*MessageBuffer) IsFull ¶
func (p *MessageBuffer) IsFull() bool
IsFull returns true if the buffer is full.
type Mock ¶ added in v0.66.0
type Mock struct {
// contains filtered or unexported fields
}
Mock represents a mocked sender that fulfills the pipeline component interface
func NewMockSender ¶ added in v0.66.0
func NewMockSender() *Mock
NewMockSender generates a mock sender
func (*Mock) PipelineMonitor ¶ added in v0.66.0
func (s *Mock) PipelineMonitor() metrics.PipelineMonitor
PipelineMonitor returns an instance of NoopPipelineMonitor
type PipelineComponent ¶ added in v0.66.0
type PipelineComponent interface {
In() chan *message.Payload
PipelineMonitor() metrics.PipelineMonitor
Start()
Stop()
}
PipelineComponent abstracts a pipeline component
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender can distribute payloads on multiple underlying workers
func NewSender ¶
func NewSender( config pkgconfigmodel.Reader, inputChan chan *message.Payload, outputChan chan *message.Payload, destinations *client.Destinations, bufferSize int, senderDoneChan chan *sync.WaitGroup, flushWg *sync.WaitGroup, pipelineMonitor metrics.PipelineMonitor, ) *Sender
NewSender is the legacy sender.
func NewSenderV2 ¶ added in v0.66.0
func NewSenderV2( config pkgconfigmodel.Reader, auditor auditor.Auditor, destinationFactory DestinationFactory, bufferSize int, senderDoneChan chan *sync.WaitGroup, flushWg *sync.WaitGroup, queueCount int, workersPerQueue int, pipelineMonitor metrics.PipelineMonitor, ) *Sender
NewSenderV2 is the default implementation of the sender factory.
func (*Sender) PipelineMonitor ¶ added in v0.66.0
func (s *Sender) PipelineMonitor() metrics.PipelineMonitor
PipelineMonitor returns the pipeline monitor of the sender workers.
type Serializer ¶
Serializer transforms a batch of messages into a payload. It is the one rendering the messages (i.e. either directly using raw []byte data from unstructured messages or turning structured messages into []byte data).
var ( // LineSerializer is a shared line serializer. LineSerializer Serializer = &lineSerializer{} // ArraySerializer is a shared line serializer. ArraySerializer Serializer = &arraySerializer{} )
type Strategy ¶
type Strategy interface {
Start()
Stop()
}
Strategy should contain all logic to send logs to a remote destination and forward them the next stage of the pipeline. In the logs pipeline, the strategy implementation should convert a stream of incoming Messages to a stream of Payloads that the sender can handle. A strategy is startable and stoppable so that the pipeline can manage it's lifecycle.
func NewBatchStrategy ¶
func NewBatchStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, serverless bool, flushWg *sync.WaitGroup, serializer Serializer, batchWait time.Duration, maxBatchSize int, maxContentSize int, pipelineName string, compression compression.Compressor, pipelineMonitor metrics.PipelineMonitor) Strategy
NewBatchStrategy returns a new batch concurrent strategy with the specified batch & content size limits
func NewStreamStrategy ¶
func NewStreamStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, compression compression.Compressor) Strategy
NewStreamStrategy creates a new stream strategy