Documentation ¶
Index ¶
- Constants
- Variables
- func AddDefaultEncodeHandler(handler EncodeHandler)
- func AddMessageBodyEncoder(encoding EncodingType, encoder MessageBodyEncoder)
- func AddOutputFactory(name string, factory OutputFactory)
- func BatchConsumerFactory(callbacks UntypedBatchConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
- func ByteChunkToInterfaces(chunk Chunk) []any
- func ByteChunkToStrings(chunk Chunk) []string
- func CompressMessage(compression CompressionType, body []byte) ([]byte, error)
- func ConfigurableConsumerKey(name string) string
- func ConfigurableConsumerRetryKey(name string) string
- func ConfigurableInputKey(name string) string
- func ConfigurableOutputKey(name string) string
- func ConfigurableProducerKey(name string) string
- func ConsumerFactory(callbacks UntypedConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
- func DecodeMessage(encoding EncodingType, data []byte, out any) error
- func DecompressMessage(compression CompressionType, body []byte) ([]byte, error)
- func EncodeMessage(encoding EncodingType, data any) ([]byte, error)
- func GetAllConsumerNames(config cfg.Config) ([]string, error)
- func KafkaHeadersToGosoAttributes(kafkaRecordHeaders []kgo.RecordHeader) map[string]string
- func KinsumerAutoscaleModuleFactory(kinsumerInputName string) ...
- func MessagesPerRunnerHandlerFactory(ctx context.Context, config cfg.Config, logger log.Logger, ...) (calculator.Handler, error)
- func NewBaseConsumer(ctx context.Context, config cfg.Config, logger log.Logger, name string, ...) (*baseConsumer, error)
- func NewBaseConsumerWithInterfaces(uuidGen uuid.Uuid, logger log.Logger, metricWriter metric.Writer, ...) *baseConsumer
- func NewBatchConsumerFactory[M any](callbacks BatchConsumerCallbackMap[M]) kernel.ModuleMultiFactory
- func NewConfigurableMultiOutput(ctx context.Context, config cfg.Config, logger log.Logger, base string) (Output, *OutputCapabilities, error)
- func NewConfigurableOutput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Output, *OutputCapabilities, error)
- func NewConsumer[M any](name string, callbackFactory ConsumerCallbackFactory[M]) kernel.ModuleFactory
- func NewConsumerFactory[M any](callbacks ConsumerCallbackMap[M]) kernel.ModuleMultiFactory
- func NewKafkaMessage(message WritableMessage) (*kgo.Record, error)
- func NewKafkaMessageAttrs(key string) map[string]any
- func NewKafkaMessageHandler(data chan *Message) kafkaConsumer.KafkaMessageHandler
- func NewKafkaMessages(messages []WritableMessage) ([]*kgo.Record, error)
- func NewKinesisMessageHandler(channel chan *Message) kinesis.MessageHandler
- func NewLifecycleManager(settings *SqsInputSettings, targets []SnsInputTarget) reslife.LifeCycleerFactory
- func NewMessageEncoder(config *MessageEncoderSettings) *messageEncoder
- func NewMessagesPerRunnerHandlerWithInterfaces(logger log.Logger, clock clock.Clock, cwClient gosoCloudwatch.Client, ...) calculator.Handler
- func NewOutputTracer(ctx context.Context, config cfg.Config, logger log.Logger, base Output, ...) (*outputTracer, error)
- func NewOutputTracerWithInterfaces(tracer tracing.Tracer, base Output, name string) *outputTracer
- func NewProducer(ctx context.Context, config cfg.Config, logger log.Logger, name string, ...) (*producer, error)
- func NewProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
- func NewProducerDaemonWithInterfaces(logger log.Logger, metric metric.Writer, aggregator ProducerDaemonAggregator, ...) *producerDaemon
- func NewProducerWithInterfaces(encoder MessageEncoder, output Output) *producer
- func NewRetryHandler(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Input, RetryHandler, error)
- func NewRetryHandlerNoop(context.Context, cfg.Config, log.Logger, string) (Input, RetryHandler, error)
- func NewRetryHandlerSqs(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, RetryHandler, error)
- func NewSnsInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (*snsInput, error)
- func NewSnsInputWithInterfaces(sqsInput *sqsInput) *snsInput
- func NewSqsInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (*sqsInput, error)
- func NewSqsInputWithInterfaces(logger log.Logger, queue sqs.Queue, unmarshaller UnmarshallerFunc, ...) *sqsInput
- func NewTypedBatchConsumer[M any](name string, callbackFactory BatchConsumerCallbackFactory[M]) kernel.ModuleFactory
- func NewUntypedBatchConsumer(name string, callbackFactory UntypedBatchConsumerCallbackFactory) kernel.ModuleFactory
- func NewUntypedBatchConsumerFactory(callbacks UntypedBatchConsumerCallbackMap) kernel.ModuleMultiFactory
- func NewUntypedConsumer(name string, callbackFactory UntypedConsumerCallbackFactory) kernel.ModuleFactory
- func NewUntypedConsumerFactory(callbacks UntypedConsumerCallbackMap) kernel.ModuleMultiFactory
- func ProducerDaemonFactory(ctx context.Context, config cfg.Config, logger log.Logger) (map[string]kernel.ModuleFactory, error)
- func ProvideProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
- func ResetInMemoryInputs()
- func ResetInMemoryOutputs()
- func SetInputFactory(typ string, factory InputFactory)
- func SnsMarshaller(msg *Message) (*string, error)
- func WithDefaultMessageBodyEncoding(encoding EncodingType)
- type AcknowledgeableInput
- type AggregateFlush
- type BaseConsumerCallback
- type BaseOutputConfiguration
- type BaseOutputConfigurationAware
- type BaseOutputConfigurationTracing
- type BatchConsumer
- type BatchConsumerCallback
- type BatchConsumerCallbackFactory
- type BatchConsumerCallbackMap
- type BatchConsumerSettings
- type Chunk
- type Chunks
- type CompressionType
- type Consumer
- type ConsumerCallback
- type ConsumerCallbackFactory
- type ConsumerCallbackMap
- type ConsumerMetadata
- type ConsumerRetrySettings
- type ConsumerSettings
- type EncodeHandler
- type EncodingType
- type FileOutputMode
- type FileOutputSettings
- type FileSettings
- type InMemoryInput
- type InMemoryOutput
- func (o *InMemoryOutput) Clear()
- func (o *InMemoryOutput) ContainsBody(body string) bool
- func (o *InMemoryOutput) Get(i int) (*Message, bool)
- func (o *InMemoryOutput) Len() int
- func (o *InMemoryOutput) Size() int
- func (o *InMemoryOutput) Write(_ context.Context, batch []WritableMessage) error
- func (o *InMemoryOutput) WriteOne(ctx context.Context, msg WritableMessage) error
- type InMemoryOutputConfiguration
- type InMemorySettings
- type InitializeableCallback
- type Input
- func NewConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
- func NewFileInput(_ cfg.Config, logger log.Logger, settings FileSettings) Input
- func NewFileInputWithInterfaces(logger log.Logger, settings FileSettings) Input
- func NewKafkaInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Input, error)
- func NewKafkaInputWithInterfaces(logger log.Logger, connection connection.Settings, ...) Input
- func NewKinesisInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Input, error)
- func NewNoopInput() Input
- func NewRedisListInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Input, error)
- func NewRedisListInputWithInterfaces(logger log.Logger, client redis.Client, mw metric.Writer, ...) Input
- func ProvideConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
- type InputFactory
- type KafkaInputConfiguration
- type KafkaOutputConfiguration
- type KafkaSourceMessage
- type KinesisInputConfiguration
- type KinesisOutputConfiguration
- type KinesisOutputSettings
- type KinsumerAutoscaleModule
- type KinsumerAutoscaleModuleDynamoDbNamingSettings
- type KinsumerAutoscaleModuleDynamoDbSettings
- type KinsumerAutoscaleModuleEcsSettings
- type KinsumerAutoscaleModuleSettings
- type KinsumerAutoscaleOrchestrator
- type KinsumerAutoscaleOrchestratorFactory
- type LifecycleManager
- type Message
- func BuildAggregateMessage(aggregateBody string, attributes ...map[string]string) *Message
- func KafkaToGosoMessage(kafkaRecord kgo.Record) *Message
- func MarshalJsonMessage(body any, attributes ...map[string]string) (*Message, error)
- func MarshalProtobufMessage(body ProtobufEncodable, attributes ...map[string]string) (*Message, error)
- func MessageUnmarshaller(data *string) (*Message, error)
- func NewJsonMessage(body string, attributes ...map[string]string) *Message
- func NewMessage(body string, attributes ...map[string]string) *Message
- func NewProtobufMessage(body string, attributes ...map[string]string) *Message
- func RawUnmarshaller(data *string) (*Message, error)
- func SnsUnmarshaller(data *string) (*Message, error)
- type MessageBodyCompressor
- type MessageBodyEncoder
- func InitKafkaSchemaRegistry(ctx context.Context, settings SchemaSettingsWithEncoding, ...) (MessageBodyEncoder, error)
- func NewAvroEncoder(schema string) (MessageBodyEncoder, error)
- func NewBase64LayeredProtobufEncoder() MessageBodyEncoder
- func NewJsonEncoder() MessageBodyEncoder
- func NewProtobufEncoder() MessageBodyEncoder
- type MessageEncoder
- type MessageEncoderSettings
- type ModelMsg
- type NoOpOutput
- type Output
- func NewFileOutput(_ cfg.Config, logger log.Logger, settings *FileOutputSettings) Output
- func NewKafkaOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewKafkaOutputWithInterfaces(logger log.Logger, connection connection.Settings, ...) Output
- func NewKinesisOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewKinesisOutputWithInterfaces(recordWriter gosoKinesis.RecordWriter) Output
- func NewRedisListOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewRedisListOutputWithInterfaces(logger log.Logger, mw metric.Writer, client redis.Client, ...) Output
- func NewSnsOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewSnsOutputWithInterfaces(logger log.Logger, topic sns.Topic) Output
- func NewSqsOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewSqsOutputWithInterfaces(logger log.Logger, queue sqs.Queue, settings *SqsOutputSettings) Output
- type OutputCapabilities
- type OutputChannel
- type OutputFactory
- type PartitionerRand
- type Producer
- type ProducerDaemonAggregator
- func NewProducerDaemonAggregator(settings ProducerDaemonSettings, compression CompressionType, ...) (ProducerDaemonAggregator, error)
- func NewProducerDaemonNoopAggregator() ProducerDaemonAggregator
- func NewProducerDaemonPartitionedAggregator(logger log.Logger, settings ProducerDaemonSettings, ...) (ProducerDaemonAggregator, error)
- func NewProducerDaemonPartitionedAggregatorWithInterfaces(logger log.Logger, rand PartitionerRand, aggregators int, ...) (ProducerDaemonAggregator, error)
- type ProducerDaemonBatcher
- type ProducerDaemonSettings
- type ProducerMetadata
- type ProducerOption
- type ProducerSettings
- type ProtobufEncodable
- type RawMessage
- type RedisListInputSettings
- type RedisListOutputSettings
- type RetryHandler
- type RetryHandlerFactory
- type RetryHandlerNoop
- type RetryHandlerSettings
- type RetryHandlerSqs
- type RetryHandlerSqsSettings
- type RetryingInput
- type RunnableBatchConsumerCallback
- type RunnableCallback
- type RunnableConsumerCallback
- type RunnableUntypedBatchConsumerCallback
- type RunnableUntypedConsumerCallback
- type SchemaRegistryAwareInput
- type SchemaRegistryAwareOutput
- type SchemaSettings
- type SchemaSettingsAwareCallback
- type SchemaSettingsWithEncoding
- type SnsInputConfiguration
- type SnsInputSettings
- type SnsInputTarget
- type SnsInputTargetConfiguration
- type SnsOutputConfiguration
- type SnsOutputSettings
- type SqsInputSettings
- type SqsOutputConfiguration
- type SqsOutputSettings
- type UnmarshallerFunc
- type UntypedBatchConsumerCallback
- type UntypedBatchConsumerCallbackFactory
- type UntypedBatchConsumerCallbackMap
- type UntypedConsumerCallback
- type UntypedConsumerCallbackFactory
- type UntypedConsumerCallbackMap
- type WritableMessage
Constants ¶
const (
	AggregateMessageModeAtLeastOnce = "atLeastOnce"
	AggregateMessageModeAtMostOnce  = "atMostOnce"
)
const (
	InputTypeFile     = "file"
	InputTypeInMemory = "inMemory"
	InputTypeKafka    = "kafka"
	InputTypeKinesis  = "kinesis"
	InputTypeRedis    = "redis"
	InputTypeSns      = "sns"
	InputTypeSqs      = "sqs"
)
const (
	AttributeSqsMessageId               = "sqsMessageId"
	AttributeSqsReceiptHandle           = "sqsReceiptHandle"
	AttributeSqsApproximateReceiveCount = "sqsApproximateReceiveCount"
)
const (
	AttributeEncoding    = "encoding"
	AttributeCompression = "compression"
)
const (
	AttributeKafkaKey            = "KafkaKey"
	MetaDataKafkaOriginalMessage = "KafkaOriginal"
)
const (
	PrmHandlerName              = "stream_messages_per_runner"
	PerRunnerMetricName         = "StreamMessages"
	MessagesAvailableMetricName = "StreamMessagesAvailable"
	MessagesSentMetricName      = "StreamMessagesSent"
)
const (
	OutputTypeFile     = "file"
	OutputTypeInMemory = "inMemory"
	OutputTypeKafka    = "kafka"
	OutputTypeKinesis  = "kinesis"
	OutputTypeMultiple = "multiple"
	OutputTypeNoOp     = "noop"
	OutputTypeRedis    = "redis"
	OutputTypeSns      = "sns"
	OutputTypeSqs      = "sqs"
)
const (
	AttributeKinesisPartitionKey    = "gosoline.kinesis.partitionKey"
	AttributeKinesisExplicitHashKey = "gosoline.kinesis.explicitHashKey"
)
const (
	AttributeAggregate      = "goso.aggregate"
	AttributeAggregateCount = "goso.aggregate.count"
)
const (
	AttributeRetry   = "goso.retry"
	AttributeRetryId = "goso.retry.id"
)
const (
	UnmarshallerMsg = "msg"
	UnmarshallerRaw = "raw"
	UnmarshallerSns = "sns"
)
const (
ConfigKeyStream = "stream"
)
const MetadataKeyProducers = "stream.producers"
const SqsOutputBatchSize = 10
Variables ¶
var DefaultOutputCapabilities = &OutputCapabilities{
	IsPartitionedOutput:               false,
	ProvidesCompression:               false,
	SupportsAggregation:               true,
	MaxBatchSize:                      nil,
	MaxMessageSize:                    nil,
	IgnoreProducerDaemonBatchSettings: false,
}
Functions ¶
func AddDefaultEncodeHandler ¶
func AddDefaultEncodeHandler(handler EncodeHandler)
func AddMessageBodyEncoder ¶
func AddMessageBodyEncoder(encoding EncodingType, encoder MessageBodyEncoder)
func AddOutputFactory ¶ added in v0.51.0
func AddOutputFactory(name string, factory OutputFactory)
func BatchConsumerFactory ¶ added in v0.50.0
func BatchConsumerFactory(callbacks UntypedBatchConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
func ByteChunkToInterfaces ¶
func ByteChunkToInterfaces(chunk Chunk) []any
func ByteChunkToStrings ¶
func ByteChunkToStrings(chunk Chunk) []string
func CompressMessage ¶
func CompressMessage(compression CompressionType, body []byte) ([]byte, error)
func ConfigurableConsumerKey ¶
func ConfigurableConsumerKey(name string) string
func ConfigurableConsumerRetryKey ¶
func ConfigurableConsumerRetryKey(name string) string
func ConfigurableInputKey ¶
func ConfigurableInputKey(name string) string
func ConfigurableOutputKey ¶
func ConfigurableOutputKey(name string) string
func ConfigurableProducerKey ¶
func ConfigurableProducerKey(name string) string
func ConsumerFactory ¶
func ConsumerFactory(callbacks UntypedConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
func DecodeMessage ¶
func DecodeMessage(encoding EncodingType, data []byte, out any) error
func DecompressMessage ¶
func DecompressMessage(compression CompressionType, body []byte) ([]byte, error)
func EncodeMessage ¶
func EncodeMessage(encoding EncodingType, data any) ([]byte, error)
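A minimal round-trip sketch using EncodeMessage and DecodeMessage with the JSON encoding; the import path and the event type are illustrative assumptions, not part of the documented API:

package main

import (
	"fmt"

	"github.com/justtrack/gosoline/pkg/stream"
)

type event struct {
	Id int `json:"id"`
}

func main() {
	// encode a value using the JSON message body encoder
	data, err := stream.EncodeMessage(stream.EncodingJson, event{Id: 1})
	if err != nil {
		panic(err)
	}

	// decode the bytes back into a typed value
	out := event{}
	if err := stream.DecodeMessage(stream.EncodingJson, data, &out); err != nil {
		panic(err)
	}

	fmt.Println(out.Id) // 1
}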
func GetAllConsumerNames ¶ added in v0.26.2
func GetAllConsumerNames(config cfg.Config) ([]string, error)
func KafkaHeadersToGosoAttributes ¶ added in v0.12.0
func KafkaHeadersToGosoAttributes(kafkaRecordHeaders []kgo.RecordHeader) map[string]string
func KinsumerAutoscaleModuleFactory ¶ added in v0.34.1
func MessagesPerRunnerHandlerFactory ¶ added in v0.22.0
func MessagesPerRunnerHandlerFactory(ctx context.Context, config cfg.Config, logger log.Logger, calculatorSettings *calculator.CalculatorSettings) (calculator.Handler, error)
func NewBaseConsumer ¶
func NewBaseConsumerWithInterfaces ¶
func NewBaseConsumerWithInterfaces(
	uuidGen uuid.Uuid,
	logger log.Logger,
	metricWriter metric.Writer,
	tracer tracing.Tracer,
	input Input,
	encoder MessageEncoder,
	retryInput Input,
	retryHandler RetryHandler,
	consumerCallback any,
	settings ConsumerSettings,
	name string,
	appId cfg.AppId,
) *baseConsumer
func NewBatchConsumerFactory ¶ added in v0.50.0
func NewBatchConsumerFactory[M any](callbacks BatchConsumerCallbackMap[M]) kernel.ModuleMultiFactory
func NewConfigurableMultiOutput ¶
func NewConfigurableMultiOutput(ctx context.Context, config cfg.Config, logger log.Logger, base string) (Output, *OutputCapabilities, error)
func NewConfigurableOutput ¶
func NewConfigurableOutput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Output, *OutputCapabilities, error)
func NewConsumer ¶
func NewConsumer[M any](name string, callbackFactory ConsumerCallbackFactory[M]) kernel.ModuleFactory
func NewConsumerFactory ¶
func NewConsumerFactory[M any](callbacks ConsumerCallbackMap[M]) kernel.ModuleMultiFactory
func NewKafkaMessage ¶
func NewKafkaMessage(message WritableMessage) (*kgo.Record, error)
func NewKafkaMessageAttrs ¶
func NewKafkaMessageAttrs(key string) map[string]any
func NewKafkaMessageHandler ¶ added in v0.51.0
func NewKafkaMessageHandler(data chan *Message) kafkaConsumer.KafkaMessageHandler
func NewKafkaMessages ¶
func NewKafkaMessages(messages []WritableMessage) ([]*kgo.Record, error)
func NewKinesisMessageHandler ¶
func NewKinesisMessageHandler(channel chan *Message) kinesis.MessageHandler
func NewLifecycleManager ¶ added in v0.37.0
func NewLifecycleManager(settings *SqsInputSettings, targets []SnsInputTarget) reslife.LifeCycleerFactory
func NewMessageEncoder ¶
func NewMessageEncoder(config *MessageEncoderSettings) *messageEncoder
func NewMessagesPerRunnerHandlerWithInterfaces ¶ added in v0.22.0
func NewMessagesPerRunnerHandlerWithInterfaces(
	logger log.Logger,
	clock clock.Clock,
	cwClient gosoCloudwatch.Client,
	baseHandler calculator.PerRunnerMetricHandler,
	calculatorSettings *calculator.CalculatorSettings,
	handlerSettings *calculator.PerRunnerMetricSettings,
	queueNames []string,
) calculator.Handler
func NewOutputTracer ¶
func NewOutputTracerWithInterfaces ¶
func NewOutputTracerWithInterfaces(tracer tracing.Tracer, base Output, name string) *outputTracer
func NewProducer ¶
func NewProducerDaemon ¶
func NewProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
func NewProducerDaemonWithInterfaces ¶
func NewProducerDaemonWithInterfaces(
	logger log.Logger,
	metric metric.Writer,
	aggregator ProducerDaemonAggregator,
	batcher ProducerDaemonBatcher,
	output Output,
	clock clock.Clock,
	name string,
	settings ProducerDaemonSettings,
	supportsAggregation bool,
) *producerDaemon
func NewProducerWithInterfaces ¶
func NewProducerWithInterfaces(encoder MessageEncoder, output Output) *producer
func NewRetryHandler ¶
func NewRetryHandlerNoop ¶
func NewRetryHandlerNoop(context.Context, cfg.Config, log.Logger, string) (Input, RetryHandler, error)
func NewRetryHandlerSqs ¶
func NewRetryHandlerSqs(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, RetryHandler, error)
func NewSnsInput ¶
func NewSnsInput(ctx context.Context, config cfg.Config, logger log.Logger, settings *SnsInputSettings, targets []SnsInputTarget) (*snsInput, error)
func NewSnsInputWithInterfaces ¶
func NewSnsInputWithInterfaces(sqsInput *sqsInput) *snsInput
func NewSqsInput ¶
func NewSqsInputWithInterfaces ¶
func NewSqsInputWithInterfaces(
	logger log.Logger,
	queue sqs.Queue,
	unmarshaller UnmarshallerFunc,
	healthCheckTimer clock.HealthCheckTimer,
	settings *SqsInputSettings,
) *sqsInput
func NewTypedBatchConsumer ¶ added in v0.41.0
func NewTypedBatchConsumer[M any](name string, callbackFactory BatchConsumerCallbackFactory[M]) kernel.ModuleFactory
func NewUntypedBatchConsumer ¶ added in v0.41.0
func NewUntypedBatchConsumer(name string, callbackFactory UntypedBatchConsumerCallbackFactory) kernel.ModuleFactory
func NewUntypedBatchConsumerFactory ¶ added in v0.50.0
func NewUntypedBatchConsumerFactory(callbacks UntypedBatchConsumerCallbackMap) kernel.ModuleMultiFactory
func NewUntypedConsumer ¶ added in v0.41.0
func NewUntypedConsumer(name string, callbackFactory UntypedConsumerCallbackFactory) kernel.ModuleFactory
func NewUntypedConsumerFactory ¶ added in v0.41.0
func NewUntypedConsumerFactory(callbacks UntypedConsumerCallbackMap) kernel.ModuleMultiFactory
func ProducerDaemonFactory ¶
func ProducerDaemonFactory(ctx context.Context, config cfg.Config, logger log.Logger) (map[string]kernel.ModuleFactory, error)
func ProvideProducerDaemon ¶
func ProvideProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
func ResetInMemoryInputs ¶
func ResetInMemoryInputs()
func ResetInMemoryOutputs ¶
func ResetInMemoryOutputs()
func SetInputFactory ¶
func SetInputFactory(typ string, factory InputFactory)
func SnsMarshaller ¶
func SnsMarshaller(msg *Message) (*string, error)
func WithDefaultMessageBodyEncoding ¶
func WithDefaultMessageBodyEncoding(encoding EncodingType)
Types ¶
type AcknowledgeableInput ¶
type AcknowledgeableInput interface {
Input
// Ack acknowledges a single message. If possible, prefer calling AckBatch as it is more efficient.
Ack(ctx context.Context, msg *Message, ack bool) error
// AckBatch does the same as calling Ack for every single message would, but it might use fewer calls to an external
// service.
AckBatch(ctx context.Context, msgs []*Message, acks []bool) error
}
An AcknowledgeableInput is an Input with the additional ability to mark messages as successfully consumed. For example, an SQS queue would deliver a message a second time after its visibility timeout expires if we didn't acknowledge it.
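A minimal sketch of draining an AcknowledgeableInput and acknowledging each message; the helper name, error handling, and import path are illustrative assumptions:

package example

import (
	"context"
	"fmt"

	"github.com/justtrack/gosoline/pkg/stream"
)

// ackAll reads messages from the input and acknowledges each one after processing.
func ackAll(ctx context.Context, input stream.AcknowledgeableInput) error {
	for msg := range input.Data() {
		// process msg here ...

		if err := input.Ack(ctx, msg, true); err != nil {
			return fmt.Errorf("can not ack message: %w", err)
		}
	}

	return nil
}

When acknowledging many messages at once, collect them and call AckBatch instead, as the interface documentation suggests.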
type AggregateFlush ¶
type BaseConsumerCallback ¶
type BaseOutputConfiguration ¶
type BaseOutputConfiguration struct {
Tracing BaseOutputConfigurationTracing `cfg:"tracing"`
}
func (*BaseOutputConfiguration) SetTracing ¶
func (b *BaseOutputConfiguration) SetTracing(enabled bool)
type BaseOutputConfigurationAware ¶
type BaseOutputConfigurationAware interface {
SetTracing(enabled bool)
}
type BaseOutputConfigurationTracing ¶
type BaseOutputConfigurationTracing struct {
Enabled bool `cfg:"enabled" default:"true"`
}
type BatchConsumer ¶
type BatchConsumer struct {
// contains filtered or unexported fields
}
func NewUntypedBatchConsumerWithInterfaces ¶ added in v0.41.0
func NewUntypedBatchConsumerWithInterfaces(base *baseConsumer, callback UntypedBatchConsumerCallback, ticker *time.Ticker, settings *BatchConsumerSettings) *BatchConsumer
type BatchConsumerCallback ¶
type BatchConsumerCallbackFactory ¶
type BatchConsumerCallbackMap ¶ added in v0.50.0
type BatchConsumerCallbackMap[M any] map[string]BatchConsumerCallbackFactory[M]
type BatchConsumerSettings ¶
type Chunk ¶
type Chunks ¶
type Chunks []Chunk
func BuildChunks ¶
func BuildChunks(batch []WritableMessage, size int) (Chunks, error)
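A short sketch of splitting a batch into chunks, assuming size is the maximum number of marshalled messages per chunk (as the SqsOutputBatchSize constant suggests); the message bodies and import path are illustrative:

package example

import (
	"fmt"

	"github.com/justtrack/gosoline/pkg/stream"
)

func printChunks() error {
	batch := []stream.WritableMessage{
		stream.NewMessage(`{"id":1}`),
		stream.NewMessage(`{"id":2}`),
		stream.NewMessage(`{"id":3}`),
	}

	// split the batch into chunks of at most 2 marshalled messages each
	chunks, err := stream.BuildChunks(batch, 2)
	if err != nil {
		return err
	}

	for _, chunk := range chunks {
		fmt.Println(stream.ByteChunkToStrings(chunk))
	}

	return nil
}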
type CompressionType ¶
type CompressionType string
const (
	CompressionNone CompressionType = "none"
	CompressionGZip CompressionType = "application/gzip"

	// compressors that are only provided externally (e.g. by kafka)
	CompressionSnappy CompressionType = "application/snappy"
	CompressionLZ4    CompressionType = "application/lz4"
	CompressionZstd   CompressionType = "application/zstd"
)
func GetCompressionAttribute ¶
func GetCompressionAttribute(attributes map[string]string) *CompressionType
GetCompressionAttribute returns the compression attribute if one is set, or nil if none is set.
func (CompressionType) String ¶
func (s CompressionType) String() string
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewUntypedConsumerWithInterfaces ¶ added in v0.41.0
func NewUntypedConsumerWithInterfaces(base *baseConsumer, callback UntypedConsumerCallback, healthCheckTimer clock.HealthCheckTimer) *Consumer
type ConsumerCallback ¶
type ConsumerCallbackFactory ¶
type ConsumerCallbackMap ¶
type ConsumerCallbackMap[M any] map[string]ConsumerCallbackFactory[M]
type ConsumerMetadata ¶
type ConsumerRetrySettings ¶
type ConsumerSettings ¶
type ConsumerSettings struct {
Input string `cfg:"input" default:"consumer" validate:"required"`
RunnerCount int `cfg:"runner_count" default:"1" validate:"min=1"`
Encoding EncodingType `cfg:"encoding" default:"application/json"`
IdleTimeout time.Duration `cfg:"idle_timeout" default:"10s"`
AcknowledgeGraceTime time.Duration `cfg:"acknowledge_grace_time" default:"10s"`
ConsumeGraceTime time.Duration `cfg:"consume_grace_time" default:"10s"`
Retry ConsumerRetrySettings `cfg:"retry"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
AggregateMessageMode string `cfg:"aggregate_message_mode" default:"atMostOnce" validate:"oneof=atLeastOnce atMostOnce"`
}
func ReadConsumerSettings ¶ added in v0.28.3
func ReadConsumerSettings(config cfg.Config, name string) (ConsumerSettings, error)
type EncodeHandler ¶
type EncodingType ¶
type EncodingType string
const (
	EncodingAvro     EncodingType = "application/avro"
	EncodingJson     EncodingType = "application/json"
	EncodingProtobuf EncodingType = "application/x-protobuf"
)
func GetEncodingAttribute ¶
func GetEncodingAttribute(attributes map[string]string) *EncodingType
GetEncodingAttribute returns the encoding attribute if one is set, or nil if none is set.
func (EncodingType) String ¶
func (s EncodingType) String() string
type FileOutputMode ¶
type FileOutputMode string
const (
	FileOutputModeAppend   FileOutputMode = "append"
	FileOutputModeSingle   FileOutputMode = "single"
	FileOutputModeTruncate FileOutputMode = "truncate"
)
type FileOutputSettings ¶
type FileOutputSettings struct {
Filename string `cfg:"filename"`
Mode FileOutputMode `cfg:"mode" default:"append"`
}
type FileSettings ¶
type InMemoryInput ¶
type InMemoryInput struct {
// contains filtered or unexported fields
}
func NewInMemoryInput ¶
func NewInMemoryInput(settings *InMemorySettings) *InMemoryInput
func ProvideInMemoryInput ¶
func ProvideInMemoryInput(name string, settings *InMemorySettings) *InMemoryInput
func (*InMemoryInput) Data ¶
func (i *InMemoryInput) Data() <-chan *Message
func (*InMemoryInput) IsHealthy ¶ added in v0.40.17
func (i *InMemoryInput) IsHealthy() bool
func (*InMemoryInput) Publish ¶
func (i *InMemoryInput) Publish(messages ...*Message)
func (*InMemoryInput) Reset ¶
func (i *InMemoryInput) Reset()
func (*InMemoryInput) Stop ¶
func (i *InMemoryInput) Stop(ctx context.Context)
type InMemoryOutput ¶
type InMemoryOutput struct {
// contains filtered or unexported fields
}
func NewInMemoryOutput ¶
func NewInMemoryOutput() *InMemoryOutput
func ProvideInMemoryOutput ¶
func ProvideInMemoryOutput(name string) *InMemoryOutput
func (*InMemoryOutput) Clear ¶
func (o *InMemoryOutput) Clear()
func (*InMemoryOutput) ContainsBody ¶
func (o *InMemoryOutput) ContainsBody(body string) bool
func (*InMemoryOutput) Get ¶
func (o *InMemoryOutput) Get(i int) (*Message, bool)
func (*InMemoryOutput) Len ¶
func (o *InMemoryOutput) Len() int
func (*InMemoryOutput) Size ¶
func (o *InMemoryOutput) Size() int
func (*InMemoryOutput) Write ¶
func (o *InMemoryOutput) Write(_ context.Context, batch []WritableMessage) error
func (*InMemoryOutput) WriteOne ¶
func (o *InMemoryOutput) WriteOne(ctx context.Context, msg WritableMessage) error
type InMemoryOutputConfiguration ¶
type InMemoryOutputConfiguration struct {
BaseOutputConfiguration
Type string `cfg:"type" default:"inMemory"`
}
type InMemorySettings ¶
type InMemorySettings struct {
Size int `cfg:"size" default:"1"`
}
type InitializeableCallback ¶ added in v0.37.0
type Input ¶
type Input interface {
// Run provides a steady stream of messages, returned via Data. Run does not return until Stop is called and thus
// should be called in its own goroutine. The only exceptions to this are if we fail to produce messages and return
// an error, or if the input is depleted (like an InMemoryInput).
//
// Run should only be called once; not all inputs can be resumed.
Run(ctx context.Context) error
// Stop causes Run to return as fast as possible. Calling Stop is preferable to canceling the context passed to Run
// as it allows Run to shut down cleaner (and might take a bit longer, e.g., to finish processing the current batch
// of messages).
Stop(ctx context.Context)
// Data returns a channel containing the messages produced by this input.
Data() <-chan *Message
// IsHealthy checks if the input is still able to produce data. An Input is healthy if it repeatedly produces zero or
// more messages. Producing zero messages would, for example, happen if the input requested data from an external
// queue, but the queue was empty. An Input is unhealthy if it is no longer able to produce any messages.
//
// If an input exhausts its source (a file, a finished stream, a fixed list, ...), it is still considered healthy.
IsHealthy() bool
}
An Input provides you with a steady stream of messages until you Stop it.
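A minimal sketch of driving an Input: run it in its own goroutine, process messages from Data, and stop it when done. The helper name, shutdown wiring, and import path are illustrative assumptions:

package example

import (
	"context"

	"github.com/justtrack/gosoline/pkg/stream"
)

// consume runs the input in its own goroutine, processes messages until the
// context is done, and then asks the input to stop.
func consume(ctx context.Context, input stream.Input) error {
	runErr := make(chan error, 1)
	go func() {
		runErr <- input.Run(ctx)
	}()

	for {
		select {
		case msg := <-input.Data():
			_ = msg // process the message here ...
		case <-ctx.Done():
			input.Stop(ctx)

			return <-runErr
		}
	}
}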
func NewConfigurableInput ¶
func NewConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
func NewFileInput ¶
func NewFileInput(_ cfg.Config, logger log.Logger, settings FileSettings) Input
func NewFileInputWithInterfaces ¶
func NewFileInputWithInterfaces(logger log.Logger, settings FileSettings) Input
func NewKafkaInput ¶
func NewKafkaInputWithInterfaces ¶
func NewKafkaInputWithInterfaces(
	logger log.Logger,
	connection connection.Settings,
	healthCheckTimer clock.HealthCheckTimer,
	partitionManager kafkaConsumer.PartitionManager,
	reader kafkaConsumer.Reader,
	schemaRegistryService schemaRegistry.Service,
	maxPollRecords int,
	data chan *Message,
) Input
func NewKinesisInput ¶
func NewNoopInput ¶ added in v0.14.0
func NewNoopInput() Input
func NewRedisListInput ¶
func NewRedisListInputWithInterfaces ¶
func NewRedisListInputWithInterfaces(
	logger log.Logger,
	client redis.Client,
	mw metric.Writer,
	settings *RedisListInputSettings,
	healthCheckTimer clock.HealthCheckTimer,
) Input
func ProvideConfigurableInput ¶
func ProvideConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
type InputFactory ¶
type KafkaInputConfiguration ¶ added in v0.51.0
type KafkaInputConfiguration struct {
kafkaConsumer.Settings
Type string `cfg:"type" default:"kafka"`
}
type KafkaOutputConfiguration ¶ added in v0.51.0
type KafkaOutputConfiguration struct {
BaseOutputConfiguration
Type string `cfg:"type" default:"kafka"`
Project string `cfg:"project"`
Family string `cfg:"family"`
Group string `cfg:"group"`
Application string `cfg:"application"`
TopicId string `cfg:"topic_id"`
Connection string `cfg:"connection" default:"default"`
// LingerTimeout is the maximum time the producer will wait for new records before flushing the current batch.
// When set to 0s, batches are sent out as fast as possible (or once the size limits are reached under enough back pressure).
// The kafka library recommends increasing this only when batching with low volume.
LingerTimeout time.Duration `cfg:"linger_timeout" default:"0s"`
RequestTimeout time.Duration `cfg:"request_timeout" default:"10s"`
MaxBatchSize int `cfg:"max_batch_size" default:"10000"`
MaxBatchBytes int32 `cfg:"max_batch_bytes" default:"1000012"`
}
type KafkaSourceMessage ¶
type KinesisInputConfiguration ¶
type KinesisOutputConfiguration ¶
type KinesisOutputConfiguration struct {
BaseOutputConfiguration
Type string `cfg:"type" default:"kinesis"`
Project string `cfg:"project"`
Family string `cfg:"family"`
Group string `cfg:"group"`
Application string `cfg:"application"`
ClientName string `cfg:"client_name" default:"default"`
StreamName string `cfg:"stream_name"`
}
type KinesisOutputSettings ¶
func (KinesisOutputSettings) GetAppId ¶
func (s KinesisOutputSettings) GetAppId() cfg.AppId
func (KinesisOutputSettings) GetClientName ¶
func (s KinesisOutputSettings) GetClientName() string
func (KinesisOutputSettings) GetStreamName ¶
func (s KinesisOutputSettings) GetStreamName() string
type KinsumerAutoscaleModule ¶ added in v0.34.1
type KinsumerAutoscaleModule struct {
kernel.BackgroundModule
kernel.ApplicationStage
// contains filtered or unexported fields
}
func NewKinsumerAutoscaleModuleWithInterfaces ¶ added in v0.34.1
func NewKinsumerAutoscaleModuleWithInterfaces(
	logger log.Logger,
	kinesisClient gosoKinesis.Client,
	kinesisStreamName string,
	leaderElection ddb.LeaderElection,
	memberId string,
	orchestrator KinsumerAutoscaleOrchestrator,
	settings KinsumerAutoscaleModuleSettings,
	ticker clock.Ticker,
) *KinsumerAutoscaleModule
type KinsumerAutoscaleModuleDynamoDbNamingSettings ¶ added in v0.34.1
type KinsumerAutoscaleModuleDynamoDbNamingSettings struct {
Pattern string `cfg:"pattern,nodecode" default:"{env}-kinsumer-autoscale-leaders"`
}
type KinsumerAutoscaleModuleDynamoDbSettings ¶ added in v0.34.1
type KinsumerAutoscaleModuleDynamoDbSettings struct {
Naming KinsumerAutoscaleModuleDynamoDbNamingSettings `cfg:"naming"`
}
type KinsumerAutoscaleModuleEcsSettings ¶ added in v0.34.1
type KinsumerAutoscaleModuleSettings ¶ added in v0.34.1
type KinsumerAutoscaleModuleSettings struct {
Ecs KinsumerAutoscaleModuleEcsSettings `cfg:"ecs"`
Enabled bool `cfg:"enabled" default:"true"`
DynamoDb KinsumerAutoscaleModuleDynamoDbSettings `cfg:"dynamodb"`
LeaderElection string `cfg:"leader_election" default:"kinsumer-autoscale"`
Orchestrator string `cfg:"orchestrator" default:"ecs"`
Period time.Duration `cfg:"period" default:"1m"`
}
type KinsumerAutoscaleOrchestrator ¶ added in v0.34.1
type KinsumerAutoscaleOrchestratorFactory ¶ added in v0.34.1
type LifecycleManager ¶ added in v0.37.0
type LifecycleManager interface {
reslife.LifeCycleer
reslife.Creator
}
type Message ¶
type Message struct {
Attributes map[string]string `json:"attributes"`
Body string `json:"body"`
// contains filtered or unexported fields
}
func BuildAggregateMessage ¶
func BuildAggregateMessage(aggregateBody string, attributes ...map[string]string) *Message
func KafkaToGosoMessage ¶
func KafkaToGosoMessage(kafkaRecord kgo.Record) *Message
func MarshalJsonMessage ¶
func MarshalJsonMessage(body any, attributes ...map[string]string) (*Message, error)
func MarshalProtobufMessage ¶
func MarshalProtobufMessage(body ProtobufEncodable, attributes ...map[string]string) (*Message, error)
func MessageUnmarshaller ¶
func MessageUnmarshaller(data *string) (*Message, error)
func NewJsonMessage ¶
func NewJsonMessage(body string, attributes ...map[string]string) *Message
func NewMessage ¶
func NewMessage(body string, attributes ...map[string]string) *Message
func NewProtobufMessage ¶
func NewProtobufMessage(body string, attributes ...map[string]string) *Message
func RawUnmarshaller ¶
func RawUnmarshaller(data *string) (*Message, error)
func SnsUnmarshaller ¶
func SnsUnmarshaller(data *string) (*Message, error)
func (*Message) GetAttributes ¶
func (*Message) MarshalToBytes ¶
func (*Message) MarshalToString ¶
func (*Message) UnmarshalFromBytes ¶
func (*Message) UnmarshalFromString ¶
type MessageBodyCompressor ¶
type MessageBodyEncoder ¶
type MessageBodyEncoder interface {
Encode(data any) ([]byte, error)
Decode(data []byte, out any) error
}
func InitKafkaSchemaRegistry ¶ added in v0.51.0
func InitKafkaSchemaRegistry(
	ctx context.Context,
	settings SchemaSettingsWithEncoding,
	schemaRegistryService schemaRegistry.Service,
) (MessageBodyEncoder, error)
func NewAvroEncoder ¶ added in v0.51.0
func NewAvroEncoder(schema string) (MessageBodyEncoder, error)
func NewBase64LayeredProtobufEncoder ¶ added in v0.51.0
func NewBase64LayeredProtobufEncoder() MessageBodyEncoder
func NewJsonEncoder ¶
func NewJsonEncoder() MessageBodyEncoder
func NewProtobufEncoder ¶
func NewProtobufEncoder() MessageBodyEncoder
type MessageEncoder ¶
type MessageEncoderSettings ¶
type MessageEncoderSettings struct {
Encoding EncodingType
Compression CompressionType
EncodeHandlers []EncodeHandler
ExternalEncoder MessageBodyEncoder
}
type ModelMsg ¶
func CreateModelMsg ¶
type NoOpOutput ¶
type NoOpOutput struct{}
func (*NoOpOutput) Write ¶
func (o *NoOpOutput) Write(_ context.Context, _ []WritableMessage) error
func (*NoOpOutput) WriteOne ¶
func (o *NoOpOutput) WriteOne(_ context.Context, _ WritableMessage) error
type Output ¶
type Output interface {
WriteOne(ctx context.Context, msg WritableMessage) error
Write(ctx context.Context, batch []WritableMessage) error
}
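The InMemoryOutput is a handy Output implementation for tests; a minimal sketch (the message body, helper name, and import path are illustrative):

package example

import (
	"context"
	"fmt"

	"github.com/justtrack/gosoline/pkg/stream"
)

func writeExample() error {
	var output stream.Output = stream.NewInMemoryOutput()

	if err := output.WriteOne(context.Background(), stream.NewJsonMessage(`{"id":1}`)); err != nil {
		return err
	}

	// the concrete type offers assertions on what was written
	inMemory := output.(*stream.InMemoryOutput)
	fmt.Println(inMemory.Len(), inMemory.ContainsBody(`{"id":1}`))

	return nil
}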
func NewFileOutput ¶
func NewFileOutput(_ cfg.Config, logger log.Logger, settings *FileOutputSettings) Output
func NewKafkaOutput ¶
func NewKafkaOutputWithInterfaces ¶
func NewKafkaOutputWithInterfaces(
	logger log.Logger,
	connection connection.Settings,
	schemaRegistryService schemaRegistry.Service,
	writer kafkaProducer.Writer,
	maxBatchBytes int32,
	maxBatchSize int,
) Output
func NewKinesisOutput ¶
func NewKinesisOutputWithInterfaces ¶
func NewKinesisOutputWithInterfaces(recordWriter gosoKinesis.RecordWriter) Output
func NewRedisListOutput ¶
func NewRedisListOutputWithInterfaces ¶
func NewSnsOutput ¶
func NewSnsOutputWithInterfaces ¶
func NewSnsOutputWithInterfaces(logger log.Logger, topic sns.Topic) Output
func NewSqsOutput ¶
func NewSqsOutputWithInterfaces ¶
func NewSqsOutputWithInterfaces(logger log.Logger, queue sqs.Queue, settings *SqsOutputSettings) Output
type OutputCapabilities ¶ added in v0.51.0
type OutputCapabilities struct {
// IsPartitionedOutput should be true if the output writes to more than one shard/partition/bucket and we need to
// take care to write messages to the correct partition.
IsPartitionedOutput bool
// ProvidesCompression should be true if the Output natively handles compression.
ProvidesCompression bool
// SupportsAggregation should be false if the Output can not handle aggregated messages, e.g. when using a schema registry.
SupportsAggregation bool
// MaxBatchSize is the maximum number of messages we can write at once to the output (or nil if there is no limit).
MaxBatchSize *int
// MaxMessageSize is the maximum size of a message for this output (or nil if there is no limit on message size).
MaxMessageSize *int
// IgnoreProducerDaemonBatchSettings should be true if only the size restrictions specified on the output capabilities should be used.
// Otherwise, they are only used if lower than the restrictions specified on the producer daemon batch settings.
IgnoreProducerDaemonBatchSettings bool
}
type OutputChannel ¶
type OutputChannel interface {
Read() ([]WritableMessage, bool)
Write(ctx context.Context, msg []WritableMessage)
Close(ctx context.Context)
IsClosed() bool
}
func NewOutputChannel ¶
func NewOutputChannel(logger log.Logger, bufferSize int) OutputChannel
type OutputFactory ¶
type PartitionerRand ¶
type Producer ¶
type Producer interface {
WriteOne(ctx context.Context, model any, attributeSets ...map[string]string) error
Write(ctx context.Context, models any, attributeSets ...map[string]string) error
}
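A minimal sketch of writing a model through a Producer, assuming the trailing arguments elided from the NewProducer signature above are optional; the producer name, model type, and import paths are illustrative:

package example

import (
	"context"
	"fmt"

	"github.com/justtrack/gosoline/pkg/cfg"
	"github.com/justtrack/gosoline/pkg/log"
	"github.com/justtrack/gosoline/pkg/stream"
)

type event struct {
	Id int `json:"id"`
}

func produce(ctx context.Context, config cfg.Config, logger log.Logger) error {
	producer, err := stream.NewProducer(ctx, config, logger, "myProducer")
	if err != nil {
		return fmt.Errorf("can not create producer: %w", err)
	}

	// the producer encodes the model according to its configured encoding and compression
	return producer.WriteOne(ctx, event{Id: 1})
}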
func NewMultiProducer ¶
type ProducerDaemonAggregator ¶
type ProducerDaemonAggregator interface {
Write(ctx context.Context, msg *Message) ([]AggregateFlush, error)
Flush() ([]AggregateFlush, error)
}
func NewProducerDaemonAggregator ¶
func NewProducerDaemonAggregator(settings ProducerDaemonSettings, compression CompressionType, attributeSets ...map[string]string) (ProducerDaemonAggregator, error)
func NewProducerDaemonNoopAggregator ¶ added in v0.51.0
func NewProducerDaemonNoopAggregator() ProducerDaemonAggregator
func NewProducerDaemonPartitionedAggregator ¶
func NewProducerDaemonPartitionedAggregator(logger log.Logger, settings ProducerDaemonSettings, compression CompressionType) (ProducerDaemonAggregator, error)
func NewProducerDaemonPartitionedAggregatorWithInterfaces ¶
func NewProducerDaemonPartitionedAggregatorWithInterfaces(logger log.Logger, rand PartitionerRand, aggregators int, createAggregator func(attributes map[string]string) (ProducerDaemonAggregator, error)) (ProducerDaemonAggregator, error)
type ProducerDaemonBatcher ¶
type ProducerDaemonBatcher interface {
Append(msg *Message) ([]WritableMessage, error)
Flush() []WritableMessage
}
func NewProducerDaemonBatcher ¶
func NewProducerDaemonBatcher(settings ProducerDaemonSettings) ProducerDaemonBatcher
func NewProducerDaemonBatcherWithoutJsonEncoding ¶ added in v0.51.0
func NewProducerDaemonBatcherWithoutJsonEncoding(settings ProducerDaemonSettings) ProducerDaemonBatcher
type ProducerDaemonSettings ¶
type ProducerDaemonSettings struct {
Enabled bool `cfg:"enabled" default:"false"`
// Amount of time spent waiting for messages before sending out a batch.
Interval time.Duration `cfg:"interval" default:"1m"`
// Size of the buffer channel, i.e., how many messages can be in-flight at once? Generally it is a good idea to match
// this with the number of runners.
BufferSize int `cfg:"buffer_size" default:"10" validate:"min=1"`
// Number of daemons running in the background, writing complete batches to the output.
RunnerCount int `cfg:"runner_count" default:"10" validate:"min=1"`
// How many SQS messages do we submit in a single batch? SQS can accept up to 10 messages per batch.
// SNS doesn't support batching, so the value doesn't matter for SNS.
BatchSize int `cfg:"batch_size" default:"10" validate:"min=1"`
// How large may the sum of all messages in the aggregation be? For SQS you can't send more than 256 KB in one batch,
// for SNS a single message can't be larger than 256 KB. We use 252 KB as default to leave some room for request
// encoding and overhead.
BatchMaxSize int `cfg:"batch_max_size" default:"258048" validate:"min=0"`
// How many stream.Messages do we pack together in a single batch (one message in SQS) at once?
AggregationSize int `cfg:"aggregation_size" default:"1" validate:"min=1"`
// Maximum size in bytes of a batch. Defaults to 64 KB to leave some room for encoding overhead.
// Set to 0 to disable limiting the maximum size for a batch (it will still not put more than BatchSize messages
// in a batch).
//
// Note: Gosoline can't ensure your messages stay below this size if your messages are quite large (especially when
// using compression). Imagine you already aggregated 40 KB of compressed messages (around 53 KB when base64 encoded)
// and are now writing a message that compresses to 20 KB. Now your buffer reaches 60 KB, or 80 KB base64 encoded.
// Gosoline will not already output a 53 KB message if you requested 64 KB messages (it would accept a 56 KB message),
// but after writing the next message it has to flush the aggregate, which by then exceeds the requested size.
AggregationMaxSize int `cfg:"aggregation_max_size" default:"65536" validate:"min=0"`
// If you are writing to an output using a partition key, we ensure messages are still distributed to a partition
// according to their partition key (although not necessarily the same partition as without the producer daemon).
// For this, we split the messages into buckets while collecting them, thus potentially aggregating more messages in
// memory (depending on the number of buckets you configure).
//
// Note: This still does not guarantee that your messages are perfectly ordered - this is impossible as soon as you
// have more than one producer. However, messages with the same partition key will end up in the same shard, so if
// you are reading two different shards and one is much further behind than the other, you will not see messages
// *massively* out of order - it should be roughly bounded by the time you buffer messages (the Interval setting) and
// thus be not much more than a minute (using the default setting) instead of hours (if one shard is half a day behind
// while the other is up-to-date).
//
// Second note: If you change the number of partition buckets, messages might move between buckets and thus end up in
// different shards than before. Thus, only do this if you can handle it (e.g., because no shard is currently lagging behind).
PartitionBucketCount int `cfg:"partition_bucket_count" default:"128" validate:"min=1"`
// Additional attributes we append to each message
MessageAttributes map[string]string `cfg:"message_attributes"`
}
type ProducerMetadata ¶
type ProducerOption ¶ added in v0.51.0
type ProducerOption func(p *producerOptions)
func WithEncodeHandlers ¶ added in v0.51.0
func WithEncodeHandlers(encodeHandlers []EncodeHandler) ProducerOption
func WithSchemaSettings ¶ added in v0.51.0
func WithSchemaSettings(schemaSettings SchemaSettings) ProducerOption
type ProducerSettings ¶
type ProducerSettings struct {
Output string `cfg:"output"`
Encoding EncodingType `cfg:"encoding"`
Compression CompressionType `cfg:"compression" default:"none"`
Daemon ProducerDaemonSettings `cfg:"daemon"`
}
type ProtobufEncodable ¶
type RawMessage ¶
type RawMessage struct {
Body any
Encoder MessageBodyEncoder
}
func NewRawMessage ¶
func NewRawMessage(body any, encoder MessageBodyEncoder) *RawMessage
NewRawMessage creates a new RawMessage. It uses the provided encoder to encode the message body.
func NewRawMessageWithJsonEncoder ¶ added in v0.51.0
func NewRawMessageWithJsonEncoder(body any) *RawMessage
NewRawMessageWithJsonEncoder works like NewRawMessage with the encoder set to marshal the body as JSON.
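A small sketch contrasting the two constructors; the body value, helper name, and import path are illustrative:

package example

import (
	"fmt"

	"github.com/justtrack/gosoline/pkg/stream"
)

func rawExample() error {
	// marshal the body as JSON without the usual stream.Message envelope
	raw := stream.NewRawMessageWithJsonEncoder(map[string]any{"id": 1})

	body, err := raw.MarshalToString()
	if err != nil {
		return err
	}

	fmt.Println(body) // {"id":1}

	return nil
}

Calling stream.NewRawMessage(body, stream.NewJsonEncoder()) should behave the same; other MessageBodyEncoder implementations plug in the same way.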
func (*RawMessage) MarshalToBytes ¶
func (m *RawMessage) MarshalToBytes() ([]byte, error)
func (*RawMessage) MarshalToString ¶
func (m *RawMessage) MarshalToString() (string, error)
type RedisListInputSettings ¶
type RedisListOutputSettings ¶
type RetryHandler ¶
func NewManualSqsRetryHandler ¶ added in v0.14.0
func NewManualSqsRetryHandler(logger log.Logger, queue sqs.Queue, settings *SqsOutputSettings) RetryHandler
func NewManualSqsRetryHandlerFromInterfaces ¶ added in v0.14.0
func NewManualSqsRetryHandlerFromInterfaces(output Output) RetryHandler
type RetryHandlerFactory ¶
type RetryHandlerNoop ¶
type RetryHandlerNoop struct{}
func NewRetryHandlerNoopWithInterfaces ¶
func NewRetryHandlerNoopWithInterfaces() RetryHandlerNoop
type RetryHandlerSettings ¶
type RetryHandlerSqs ¶
type RetryHandlerSqs struct {
// contains filtered or unexported fields
}
func NewRetryHandlerSqsWithInterfaces ¶
func NewRetryHandlerSqsWithInterfaces(output Output, settings *RetryHandlerSqsSettings) *RetryHandlerSqs
type RetryHandlerSqsSettings ¶
type RetryHandlerSqsSettings struct {
cfg.AppId
RetryHandlerSettings
ClientName string `cfg:"client_name" default:"default"`
MaxNumberOfMessages int32 `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
WaitTime int32 `cfg:"wait_time" default:"10"`
RunnerCount int `cfg:"runner_count" default:"1"`
QueueId string `cfg:"queue_id"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
}
type RetryingInput ¶ added in v0.14.0
type RetryingInput interface {
GetRetryHandler() (Input, RetryHandler)
}
type RunnableBatchConsumerCallback ¶
type RunnableBatchConsumerCallback[M any] interface {
	BatchConsumerCallback[M]
	RunnableCallback
}
type RunnableCallback ¶
type RunnableConsumerCallback ¶
type RunnableConsumerCallback[M any] interface {
	ConsumerCallback[M]
	RunnableCallback
}
type RunnableUntypedBatchConsumerCallback ¶ added in v0.41.0
type RunnableUntypedBatchConsumerCallback interface {
UntypedBatchConsumerCallback
RunnableCallback
}
type RunnableUntypedConsumerCallback ¶ added in v0.41.0
type RunnableUntypedConsumerCallback interface {
UntypedConsumerCallback
RunnableCallback
}
type SchemaRegistryAwareInput ¶ added in v0.51.0
type SchemaRegistryAwareInput interface {
Input
// InitSchemaRegistry initializes the schema registry and returns the encoder/decoder corresponding to the schema
InitSchemaRegistry(ctx context.Context, settings SchemaSettingsWithEncoding) (MessageBodyEncoder, error)
}
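A hedged sketch of initializing the schema registry on an input that supports it; the subject, schema, helper name, and import path are illustrative assumptions:

package example

import (
	"context"

	"github.com/justtrack/gosoline/pkg/stream"
)

func initRegistry(ctx context.Context, input stream.Input) (stream.MessageBodyEncoder, error) {
	aware, ok := input.(stream.SchemaRegistryAwareInput)
	if !ok {
		return nil, nil // input does not use a schema registry
	}

	settings := stream.SchemaSettingsWithEncoding{
		Subject:  "myTopic-value",
		Schema:   `{"type":"record","name":"event","fields":[{"name":"id","type":"int"}]}`,
		Encoding: stream.EncodingAvro,
	}

	return aware.InitSchemaRegistry(ctx, settings)
}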
type SchemaRegistryAwareOutput ¶ added in v0.51.0
type SchemaRegistryAwareOutput interface {
Output
// InitSchemaRegistry initializes the schema registry and returns the encoder/decoder corresponding to the schema
InitSchemaRegistry(ctx context.Context, settings SchemaSettingsWithEncoding) (MessageBodyEncoder, error)
}
type SchemaSettings ¶ added in v0.51.0
func (SchemaSettings) WithEncoding ¶ added in v0.51.0
func (s SchemaSettings) WithEncoding(encoding EncodingType) SchemaSettingsWithEncoding
type SchemaSettingsAwareCallback ¶ added in v0.51.0
type SchemaSettingsAwareCallback interface {
GetSchemaSettings() (*SchemaSettings, error)
}
type SchemaSettingsWithEncoding ¶ added in v0.51.0
type SchemaSettingsWithEncoding struct {
Subject string
Schema string
Encoding EncodingType
ProtobufMessageIndex []int
Model any
}
type SnsInputConfiguration ¶
type SnsInputConfiguration struct {
Type string `cfg:"type" default:"sns"`
ConsumerId string `cfg:"id" validate:"required"`
Family string `cfg:"family" default:""`
Group string `cfg:"group" default:""`
Application string `cfg:"application" default:""`
Targets []SnsInputTargetConfiguration `cfg:"targets" validate:"min=1"`
MaxNumberOfMessages int32 `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
WaitTime int32 `cfg:"wait_time" default:"3" validate:"min=1"`
VisibilityTimeout int `cfg:"visibility_timeout" default:"30" validate:"min=1"`
RunnerCount int `cfg:"runner_count" default:"1" validate:"min=1"`
RedrivePolicy sqs.RedrivePolicy `cfg:"redrive_policy"`
ClientName string `cfg:"client_name" default:"default"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
}
type SnsInputSettings ¶
type SnsInputSettings struct {
cfg.AppId
QueueId string `cfg:"queue_id"`
MaxNumberOfMessages int32 `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
WaitTime int32 `cfg:"wait_time"`
RedrivePolicy sqs.RedrivePolicy `cfg:"redrive_policy"`
VisibilityTimeout int `cfg:"visibility_timeout"`
RunnerCount int `cfg:"runner_count"`
ClientName string `cfg:"client_name"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
}
func (SnsInputSettings) GetAppId ¶
func (s SnsInputSettings) GetAppId() cfg.AppId
func (SnsInputSettings) GetClientName ¶
func (s SnsInputSettings) GetClientName() string
func (SnsInputSettings) GetQueueId ¶
func (s SnsInputSettings) GetQueueId() string
func (SnsInputSettings) IsFifoEnabled ¶
func (s SnsInputSettings) IsFifoEnabled() bool
type SnsInputTarget ¶
type SnsInputTarget struct {
cfg.AppId
TopicId string
Attributes map[string]string
ClientName string
}
func (SnsInputTarget) GetAppId ¶
func (t SnsInputTarget) GetAppId() cfg.AppId
func (SnsInputTarget) GetClientName ¶
func (t SnsInputTarget) GetClientName() string
func (SnsInputTarget) GetTopicId ¶
func (t SnsInputTarget) GetTopicId() string
type SnsInputTargetConfiguration ¶
type SnsInputTargetConfiguration struct {
Family string `cfg:"family"`
Group string `cfg:"group" validate:"required"`
Application string `cfg:"application" validate:"required"`
TopicId string `cfg:"topic_id" validate:"required"`
Attributes map[string]string `cfg:"attributes"`
ClientName string `cfg:"client_name" default:"default"`
}
type SnsOutputConfiguration ¶
type SnsOutputConfiguration struct {
BaseOutputConfiguration
Type string `cfg:"type" default:"sns"`
Project string `cfg:"project"`
Family string `cfg:"family"`
Group string `cfg:"group"`
Application string `cfg:"application"`
TopicId string `cfg:"topic_id" validate:"required"`
ClientName string `cfg:"client_name" default:"default"`
}
type SnsOutputSettings ¶
func (SnsOutputSettings) GetAppId ¶
func (s SnsOutputSettings) GetAppId() cfg.AppId
func (SnsOutputSettings) GetClientName ¶
func (s SnsOutputSettings) GetClientName() string
func (SnsOutputSettings) GetTopicId ¶
func (s SnsOutputSettings) GetTopicId() string
type SqsInputSettings ¶
type SqsInputSettings struct {
cfg.AppId
QueueId string `cfg:"queue_id"`
MaxNumberOfMessages int32 `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
WaitTime int32 `cfg:"wait_time"`
VisibilityTimeout int `cfg:"visibility_timeout"`
RunnerCount int `cfg:"runner_count"`
Fifo sqs.FifoSettings `cfg:"fifo"`
RedrivePolicy sqs.RedrivePolicy `cfg:"redrive_policy"`
ClientName string `cfg:"client_name"`
Unmarshaller string `cfg:"unmarshaller" default:"msg"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
}
func (SqsInputSettings) GetAppId ¶
func (s SqsInputSettings) GetAppId() cfg.AppId
func (SqsInputSettings) GetClientName ¶
func (s SqsInputSettings) GetClientName() string
func (SqsInputSettings) GetQueueId ¶
func (s SqsInputSettings) GetQueueId() string
func (SqsInputSettings) IsFifoEnabled ¶
func (s SqsInputSettings) IsFifoEnabled() bool
type SqsOutputConfiguration ¶
type SqsOutputConfiguration struct {
BaseOutputConfiguration
Type string `cfg:"type" default:"sqs"`
Project string `cfg:"project"`
Family string `cfg:"family"`
Group string `cfg:"group"`
Application string `cfg:"application"`
QueueId string `cfg:"queue_id" validate:"required"`
VisibilityTimeout int `cfg:"visibility_timeout" default:"30" validate:"gt=0"`
RedrivePolicy sqs.RedrivePolicy `cfg:"redrive_policy"`
Fifo sqs.FifoSettings `cfg:"fifo"`
ClientName string `cfg:"client_name" default:"default"`
}
type SqsOutputSettings ¶
type SqsOutputSettings struct {
cfg.AppId
ClientName string
Fifo sqs.FifoSettings
QueueId string
RedrivePolicy sqs.RedrivePolicy
VisibilityTimeout int
}
func (SqsOutputSettings) GetAppId ¶
func (s SqsOutputSettings) GetAppId() cfg.AppId
func (SqsOutputSettings) GetClientName ¶
func (s SqsOutputSettings) GetClientName() string
func (SqsOutputSettings) GetQueueId ¶
func (s SqsOutputSettings) GetQueueId() string
func (SqsOutputSettings) IsFifoEnabled ¶
func (s SqsOutputSettings) IsFifoEnabled() bool
type UnmarshallerFunc ¶
type UntypedBatchConsumerCallback ¶ added in v0.41.0
type UntypedBatchConsumerCallback interface {
BaseConsumerCallback
Consume(ctx context.Context, models []any, attributes []map[string]string) ([]bool, error)
}
func EraseBatchConsumerCallbackTypes ¶ added in v0.41.0
func EraseBatchConsumerCallbackTypes[M any](consumerCallback BatchConsumerCallback[M]) UntypedBatchConsumerCallback
type UntypedBatchConsumerCallbackFactory ¶ added in v0.41.0
type UntypedBatchConsumerCallbackFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (UntypedBatchConsumerCallback, error)
func EraseBatchConsumerCallbackFactoryTypes ¶ added in v0.41.0
func EraseBatchConsumerCallbackFactoryTypes[M any](callbackFactory BatchConsumerCallbackFactory[M]) UntypedBatchConsumerCallbackFactory
type UntypedBatchConsumerCallbackMap ¶ added in v0.50.0
type UntypedBatchConsumerCallbackMap map[string]UntypedBatchConsumerCallbackFactory
type UntypedConsumerCallback ¶ added in v0.41.0
type UntypedConsumerCallback interface {
BaseConsumerCallback
Consume(ctx context.Context, model any, attributes map[string]string) (bool, error)
}
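A minimal callback sketch, assuming BaseConsumerCallback only requires a GetModel(attributes map[string]string) any method (its body is not shown above, so this is an assumption based on earlier gosoline releases); the event type and import path are illustrative:

package example

import (
	"context"

	"github.com/justtrack/gosoline/pkg/stream"
)

type event struct {
	Id int `json:"id"`
}

type eventCallback struct{}

// GetModel returns a fresh model for the consumer to unmarshal the message body into.
// Assumption: this is the only method BaseConsumerCallback requires.
func (c *eventCallback) GetModel(_ map[string]string) any {
	return &event{}
}

// Consume handles one decoded message; returning true acknowledges it.
func (c *eventCallback) Consume(_ context.Context, model any, _ map[string]string) (bool, error) {
	evt := model.(*event)
	_ = evt // handle the event here ...

	return true, nil
}

// compile-time check that the sketch satisfies the interface
var _ stream.UntypedConsumerCallback = &eventCallback{}

Such a callback would be wired into a kernel via NewUntypedConsumer("myConsumer", factory), with the factory returning &eventCallback{}.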
func EraseConsumerCallbackTypes ¶ added in v0.41.0
func EraseConsumerCallbackTypes[M any](consumerCallback ConsumerCallback[M]) UntypedConsumerCallback
type UntypedConsumerCallbackFactory ¶ added in v0.41.0
type UntypedConsumerCallbackFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (UntypedConsumerCallback, error)
func EraseConsumerCallbackFactoryTypes ¶ added in v0.41.0
func EraseConsumerCallbackFactoryTypes[M any](callbackFactory ConsumerCallbackFactory[M]) UntypedConsumerCallbackFactory
type UntypedConsumerCallbackMap ¶ added in v0.41.0
type UntypedConsumerCallbackMap map[string]UntypedConsumerCallbackFactory
type WritableMessage ¶
type WritableMessage interface {
MarshalToBytes() ([]byte, error)
MarshalToString() (string, error)
}
func MessagesToWritableMessages ¶
func MessagesToWritableMessages(batch []*Message) []WritableMessage
func NewRawJsonMessage ¶
func NewRawJsonMessage(attributes map[string]string, body []byte) WritableMessage
Source Files ¶
- chunking.go
- compression.go
- config.go
- consumer.go
- consumer_acknowledge.go
- consumer_base.go
- consumer_batch.go
- consumer_batch_module_factory.go
- consumer_module_factory.go
- consumer_settings.go
- encoding.go
- encoding_avro.go
- encoding_json.go
- encoding_protobuf.go
- encoding_protobuf_base64_layered.go
- input.go
- input_configurable.go
- input_file.go
- input_in_memory.go
- input_kafka.go
- input_kinesis.go
- input_noop.go
- input_redis_list.go
- input_sns.go
- input_sns_lifecycle.go
- input_sqs.go
- kinsumer_autoscale_module.go
- kinsumer_autoscale_module_settings.go
- kinsumer_autoscale_orchestrator.go
- kinsumer_autoscale_orchestrator_ecs.go
- manual_sqs_retry_handler.go
- message.go
- message_builder.go
- message_encoding.go
- message_kafka.go
- message_model.go
- mpr_handler.go
- mpr_queue_names.go
- multi_producer.go
- output.go
- output_channel.go
- output_configurable.go
- output_configurable_multiple.go
- output_file.go
- output_in_memory.go
- output_kafka.go
- output_kinesis.go
- output_noop.go
- output_redis_list.go
- output_sns.go
- output_sqs.go
- output_tracer.go
- producer.go
- producer_daemon.go
- producer_daemon_aggregator.go
- producer_daemon_batcher.go
- producer_daemon_config_postprocessor.go
- producer_daemon_factory.go
- producer_daemon_noop_aggregator.go
- producer_daemon_partitioned_aggregator.go
- producer_options.go
- raw_message.go
- retry.go
- retry_noop.go
- retry_sqs.go
- schema_registry_kafka.go
- schema_settings.go
- typed_batch_consumer.go
- typed_consumer.go
- unmarshal.go