Documentation
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type AckFunc
AckFunc is a function used to acknowledge receipt of a message batch from a buffer. The provided error indicates whether the message batch was successfully delivered. It returns an error if the acknowledgement was not propagated.
type Config
type Config struct {
	Type   string `json:"type" yaml:"type"`
	Plugin any    `json:"plugin,omitempty" yaml:"plugin,omitempty"`
}
Config is the all-encompassing configuration struct for all buffer types.
Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in ./internal/impl.
type ReaderWriter
type ReaderWriter interface {
	// Read the next oldest message batch. If the buffer has a persisted store
	// the message is preserved until the returned AckFunc is called. Some
	// temporal buffer implementations such as windowers will ignore the ack
	// func.
	Read(context.Context) (message.Batch, AckFunc, error)
	// Write a new message batch to the stack.
	Write(context.Context, message.Batch, AckFunc) error
	// EndOfInput indicates to the buffer that the input has ended and that once
	// the buffer is depleted it should return component.ErrTypeClosed from Read in
	// order to gracefully shut down the pipeline.
	//
	// EndOfInput should be idempotent as it may be called more than once.
	EndOfInput()
	// Close the buffer and all resources it has, messages should no longer be
	// written or read by the implementation and it should clean up all
	// resources.
	Close(context.Context) error
}
ReaderWriter is a read/write interface implemented by buffers.
type Stream
type Stream struct {
	// contains filtered or unexported fields
}
Stream wraps a read/write buffer implementation with a channel-based streaming component that satisfies the internal Benthos Consumer and Producer interfaces.
func (*Stream) Consume
func (m *Stream) Consume(msgs <-chan message.Transaction) error
Consume assigns the channel of message transactions that the buffer reads from.
func (*Stream) TransactionChan
func (m *Stream) TransactionChan() <-chan message.Transaction
TransactionChan returns the channel used for consuming messages from this buffer.
func (*Stream) TriggerCloseNow (added in v4.6.0)
func (m *Stream) TriggerCloseNow()
TriggerCloseNow shuts down the Stream and stops processing messages.
func (*Stream) TriggerStopConsuming (added in v4.6.0)
func (m *Stream) TriggerStopConsuming()
TriggerStopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.
type Streamed
type Streamed interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction
	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error
	// TriggerStopConsuming instructs the buffer to cut off the producer it is
	// consuming from. It will then enter a mode whereby messages can only be
	// read, and when the buffer is empty it will shut down.
	TriggerStopConsuming()
	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()
	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}
Streamed is an interface, implemented by all buffer types, that provides stream-based methods.
func NewStream
func NewStream(typeStr string, buffer ReaderWriter, mgr component.Observability) Streamed
NewStream creates a new Producer/Consumer around a buffer.