Documentation
¶
Index ¶
- Constants
- func AddDefaultEncodeHandler(handler EncodeHandler)
- func AddMessageBodyEncoder(encoding EncodingType, encoder MessageBodyEncoder)
- func BatchConsumerFactory(callbacks UntypedBatchConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
- func ByteChunkToInterfaces(chunk Chunk) []any
- func ByteChunkToStrings(chunk Chunk) []string
- func CompressMessage(compression CompressionType, body []byte) ([]byte, error)
- func ConfigurableConsumerKey(name string) string
- func ConfigurableConsumerRetryKey(name string) string
- func ConfigurableInputKey(name string) string
- func ConfigurableOutputKey(name string) string
- func ConfigurableProducerKey(name string) string
- func ConsumerFactory(callbacks UntypedConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
- func DecodeMessage(encoding EncodingType, data []byte, out any) error
- func DecompressMessage(compression CompressionType, body []byte) ([]byte, error)
- func EncodeMessage(encoding EncodingType, data any) ([]byte, error)
- func GetAllConsumerNames(config cfg.Config) ([]string, error)
- func GosoToKafkaMessage(msg *Message) kafka.Message
- func GosoToKafkaMessages(msgs ...*Message) []kafka.Message
- func KafkaHeadersToGosoAttributes(headers []kafka.Header) map[string]string
- func KinsumerAutoscaleModuleFactory(kinsumerInputName string) ...
- func MessagesPerRunnerHandlerFactory(ctx context.Context, config cfg.Config, logger log.Logger, ...) (calculator.Handler, error)
- func NewBaseConsumer(ctx context.Context, config cfg.Config, logger log.Logger, name string, ...) (*baseConsumer, error)
- func NewBaseConsumerWithInterfaces(uuidGen uuid.Uuid, logger log.Logger, metricWriter metric.Writer, ...) *baseConsumer
- func NewBatchConsumerFactory[M any](callbacks BatchConsumerCallbackMap[M]) kernel.ModuleMultiFactory
- func NewConsumer[M any](name string, callbackFactory ConsumerCallbackFactory[M]) kernel.ModuleFactory
- func NewConsumerFactory[M any](callbacks ConsumerCallbackMap[M]) kernel.ModuleMultiFactory
- func NewKafkaMessage(writable WritableMessage) kafka.Message
- func NewKafkaMessageAttrs(key string) map[string]any
- func NewKafkaMessages(ms []WritableMessage) []kafka.Message
- func NewKinesisMessageHandler(channel chan *Message) kinesis.MessageHandler
- func NewLifecycleManager(settings *SqsInputSettings, targets []SnsInputTarget) reslife.LifeCycleerFactory
- func NewMessageEncoder(config *MessageEncoderSettings) *messageEncoder
- func NewMessagesPerRunnerHandlerWithInterfaces(logger log.Logger, clock clock.Clock, cwClient gosoCloudwatch.Client, ...) calculator.Handler
- func NewOutputTracer(ctx context.Context, config cfg.Config, logger log.Logger, base Output, ...) (*outputTracer, error)
- func NewOutputTracerWithInterfaces(tracer tracing.Tracer, base Output, name string) *outputTracer
- func NewProducer(ctx context.Context, config cfg.Config, logger log.Logger, name string, ...) (*producer, error)
- func NewProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
- func NewProducerDaemonWithInterfaces(logger log.Logger, metric metric.Writer, aggregator ProducerDaemonAggregator, ...) *producerDaemon
- func NewProducerWithInterfaces(encoder MessageEncoder, output Output) *producer
- func NewRetryHandler(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Input, RetryHandler, error)
- func NewRetryHandlerNoop(context.Context, cfg.Config, log.Logger, string) (Input, RetryHandler, error)
- func NewRetryHandlerSqs(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, RetryHandler, error)
- func NewSnsInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (*snsInput, error)
- func NewSnsInputWithInterfaces(sqsInput *sqsInput) *snsInput
- func NewSqsInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (*sqsInput, error)
- func NewSqsInputWithInterfaces(logger log.Logger, queue sqs.Queue, unmarshaller UnmarshallerFunc, ...) *sqsInput
- func NewTypedBatchConsumer[M any](name string, callbackFactory BatchConsumerCallbackFactory[M]) kernel.ModuleFactory
- func NewUntypedBatchConsumer(name string, callbackFactory UntypedBatchConsumerCallbackFactory) kernel.ModuleFactory
- func NewUntypedBatchConsumerFactory(callbacks UntypedBatchConsumerCallbackMap) kernel.ModuleMultiFactory
- func NewUntypedConsumer(name string, callbackFactory UntypedConsumerCallbackFactory) kernel.ModuleFactory
- func NewUntypedConsumerFactory(callbacks UntypedConsumerCallbackMap) kernel.ModuleMultiFactory
- func ProducerDaemonFactory(ctx context.Context, config cfg.Config, logger log.Logger) (map[string]kernel.ModuleFactory, error)
- func ProvideProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
- func ResetInMemoryInputs()
- func ResetInMemoryOutputs()
- func SetInputFactory(typ string, factory InputFactory)
- func SnsMarshaller(msg *Message) (*string, error)
- func WithDefaultMessageBodyEncoding(encoding EncodingType)
- type AcknowledgeableInput
- type AggregateFlush
- type BaseConsumerCallback
- type BaseOutputConfiguration
- type BaseOutputConfigurationAware
- type BaseOutputConfigurationTracing
- type BatchConsumer
- type BatchConsumerCallback
- type BatchConsumerCallbackFactory
- type BatchConsumerCallbackMap
- type BatchConsumerSettings
- type Chunk
- type Chunks
- type CompressionType
- type Consumer
- type ConsumerCallback
- type ConsumerCallbackFactory
- type ConsumerCallbackMap
- type ConsumerMetadata
- type ConsumerRetrySettings
- type ConsumerSettings
- type EncodeHandler
- type EncodingType
- type FileOutputMode
- type FileOutputSettings
- type FileSettings
- type InMemoryInput
- type InMemoryOutput
- func (o *InMemoryOutput) Clear()
- func (o *InMemoryOutput) ContainsBody(body string) bool
- func (o *InMemoryOutput) Get(i int) (*Message, bool)
- func (o *InMemoryOutput) Len() int
- func (o *InMemoryOutput) Size() int
- func (o *InMemoryOutput) Write(_ context.Context, batch []WritableMessage) error
- func (o *InMemoryOutput) WriteOne(ctx context.Context, msg WritableMessage) error
- type InMemoryOutputConfiguration
- type InMemorySettings
- type InitializeableCallback
- type Input
- func NewConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
- func NewFileInput(_ cfg.Config, logger log.Logger, settings FileSettings) Input
- func NewFileInputWithInterfaces(logger log.Logger, settings FileSettings) Input
- func NewKinesisInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Input, error)
- func NewNoopInput() Input
- func NewRedisListInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Input, error)
- func NewRedisListInputWithInterfaces(logger log.Logger, client redis.Client, mw metric.Writer, ...) Input
- func ProvideConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
- type InputFactory
- type KafkaInput
- func (i *KafkaInput) Ack(ctx context.Context, msg *Message, _ bool) error
- func (i *KafkaInput) AckBatch(ctx context.Context, msgs []*Message, _ []bool) error
- func (i *KafkaInput) Data() <-chan *Message
- func (i *KafkaInput) IsHealthy() bool
- func (i *KafkaInput) Run(ctx context.Context) error
- func (i *KafkaInput) Stop(ctx context.Context)
- type KafkaOutput
- type KafkaSourceMessage
- type KinesisInputConfiguration
- type KinesisOutputConfiguration
- type KinesisOutputSettings
- type KinsumerAutoscaleModule
- type KinsumerAutoscaleModuleDynamoDbNamingSettings
- type KinsumerAutoscaleModuleDynamoDbSettings
- type KinsumerAutoscaleModuleEcsSettings
- type KinsumerAutoscaleModuleSettings
- type KinsumerAutoscaleOrchestrator
- type KinsumerAutoscaleOrchestratorFactory
- type LifecycleManager
- type Message
- func BuildAggregateMessage(aggregateBody string, attributes ...map[string]string) *Message
- func KafkaToGosoMessage(k kafka.Message) *Message
- func MarshalJsonMessage(body any, attributes ...map[string]string) (*Message, error)
- func MarshalProtobufMessage(body ProtobufEncodable, attributes ...map[string]string) (*Message, error)
- func MessageUnmarshaller(data *string) (*Message, error)
- func NewJsonMessage(body string, attributes ...map[string]string) *Message
- func NewMessage(body string, attributes ...map[string]string) *Message
- func NewProtobufMessage(body string, attributes ...map[string]string) *Message
- func RawUnmarshaller(data *string) (*Message, error)
- func SnsUnmarshaller(data *string) (*Message, error)
- type MessageBodyCompressor
- type MessageBodyEncoder
- type MessageEncoder
- type MessageEncoderSettings
- type ModelMsg
- type NoOpOutput
- type Output
- func NewConfigurableMultiOutput(ctx context.Context, config cfg.Config, logger log.Logger, base string) (Output, error)
- func NewConfigurableOutput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Output, error)
- func NewFileOutput(_ cfg.Config, logger log.Logger, settings *FileOutputSettings) Output
- func NewKinesisOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewKinesisOutputWithInterfaces(recordWriter gosoKinesis.RecordWriter) Output
- func NewRedisListOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewRedisListOutputWithInterfaces(logger log.Logger, mw metric.Writer, client redis.Client, ...) Output
- func NewSnsOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewSnsOutputWithInterfaces(logger log.Logger, topic sns.Topic) Output
- func NewSqsOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewSqsOutputWithInterfaces(logger log.Logger, queue sqs.Queue, settings *SqsOutputSettings) Output
- type OutputChannel
- type OutputFactory
- type PartitionedOutput
- type PartitionerRand
- type Producer
- type ProducerDaemonAggregator
- func NewProducerDaemonAggregator(settings ProducerDaemonSettings, compression CompressionType, ...) (ProducerDaemonAggregator, error)
- func NewProducerDaemonPartitionedAggregator(logger log.Logger, settings ProducerDaemonSettings, ...) (ProducerDaemonAggregator, error)
- func NewProducerDaemonPartitionedAggregatorWithInterfaces(logger log.Logger, rand PartitionerRand, aggregators int, ...) (ProducerDaemonAggregator, error)
- type ProducerDaemonBatcher
- type ProducerDaemonSettings
- type ProducerMetadata
- type ProducerSettings
- type ProtobufEncodable
- type RawMessage
- type RedisListInputSettings
- type RedisListOutputSettings
- type RetryHandler
- type RetryHandlerFactory
- type RetryHandlerNoop
- type RetryHandlerSettings
- type RetryHandlerSqs
- type RetryHandlerSqsSettings
- type RetryingInput
- type RunnableBatchConsumerCallback
- type RunnableCallback
- type RunnableConsumerCallback
- type RunnableUntypedBatchConsumerCallback
- type RunnableUntypedConsumerCallback
- type SizeRestrictedOutput
- type SnsInputConfiguration
- type SnsInputSettings
- type SnsInputTarget
- type SnsInputTargetConfiguration
- type SnsOutputConfiguration
- type SnsOutputSettings
- type SqsInputSettings
- type SqsOutputConfiguration
- type SqsOutputSettings
- type UnmarshallerFunc
- type UntypedBatchConsumerCallback
- type UntypedBatchConsumerCallbackFactory
- type UntypedBatchConsumerCallbackMap
- type UntypedConsumerCallback
- type UntypedConsumerCallbackFactory
- type UntypedConsumerCallbackMap
- type WritableMessage
Constants ¶
const ( AggregateMessageModeAtLeastOnce = "atLeastOnce" AggregateMessageModeAtMostOnce = "atMostOnce" )
const ( InputTypeFile = "file" InputTypeInMemory = "inMemory" InputTypeKinesis = "kinesis" InputTypeRedis = "redis" InputTypeSns = "sns" InputTypeSqs = "sqs" InputTypeKafka = "kafka" )
const ( AttributeSqsMessageId = "sqsMessageId" AttributeSqsReceiptHandle = "sqsReceiptHandle" AttributeSqsApproximateReceiveCount = "sqsApproximateReceiveCount" )
const ( AttributeEncoding = "encoding" AttributeCompression = "compression" )
const ( AttributeKafkaKey = "KafkaKey" MetaDataKafkaOriginalMessage = "KafkaOriginal" )
const ( PrmHandlerName = "stream_messages_per_runner" PerRunnerMetricName = "StreamMessages" MessagesAvailableMetricName = "StreamMessagesAvailable" MessagesSentMetricName = "StreamMessagesSent" )
const ( OutputTypeFile = "file" OutputTypeInMemory = "inMemory" OutputTypeKinesis = "kinesis" OutputTypeMultiple = "multiple" OutputTypeNoOp = "noop" OutputTypeRedis = "redis" OutputTypeSns = "sns" OutputTypeSqs = "sqs" OutputTypeKafka = "kafka" )
const ( AttributeKinesisPartitionKey = "gosoline.kinesis.partitionKey" AttributeKinesisExplicitHashKey = "gosoline.kinesis.explicitHashKey" )
const ( AttributeAggregate = "goso.aggregate" AttributeAggregateCount = "goso.aggregate.count" )
const ( AttributeRetry = "goso.retry" AttributeRetryId = "goso.retry.id" )
const ( UnmarshallerMsg = "msg" UnmarshallerRaw = "raw" UnmarshallerSns = "sns" )
const (
ConfigKeyStream = "stream"
)
const MetadataKeyProducers = "stream.producers"
const SqsOutputBatchSize = 10
Variables ¶
This section is empty.
Functions ¶
func AddDefaultEncodeHandler ¶
func AddDefaultEncodeHandler(handler EncodeHandler)
func AddMessageBodyEncoder ¶
func AddMessageBodyEncoder(encoding EncodingType, encoder MessageBodyEncoder)
func BatchConsumerFactory ¶ added in v0.50.0
func BatchConsumerFactory(callbacks UntypedBatchConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
func ByteChunkToInterfaces ¶
func ByteChunkToStrings ¶
func CompressMessage ¶
func CompressMessage(compression CompressionType, body []byte) ([]byte, error)
func ConfigurableConsumerKey ¶
func ConfigurableInputKey ¶
func ConfigurableOutputKey ¶
func ConfigurableProducerKey ¶
func ConsumerFactory ¶
func ConsumerFactory(callbacks UntypedConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
func DecodeMessage ¶
func DecodeMessage(encoding EncodingType, data []byte, out any) error
func DecompressMessage ¶
func DecompressMessage(compression CompressionType, body []byte) ([]byte, error)
func EncodeMessage ¶
func EncodeMessage(encoding EncodingType, data any) ([]byte, error)
func GetAllConsumerNames ¶ added in v0.26.2
func GosoToKafkaMessage ¶
func GosoToKafkaMessages ¶
func KafkaHeadersToGosoAttributes ¶ added in v0.12.0
func KinsumerAutoscaleModuleFactory ¶ added in v0.34.1
func MessagesPerRunnerHandlerFactory ¶ added in v0.22.0
func MessagesPerRunnerHandlerFactory(ctx context.Context, config cfg.Config, logger log.Logger, calculatorSettings *calculator.CalculatorSettings) (calculator.Handler, error)
func NewBaseConsumer ¶
func NewBaseConsumerWithInterfaces ¶
func NewBaseConsumerWithInterfaces( uuidGen uuid.Uuid, logger log.Logger, metricWriter metric.Writer, tracer tracing.Tracer, input Input, encoder MessageEncoder, retryInput Input, retryHandler RetryHandler, consumerCallback any, settings ConsumerSettings, name string, appId cfg.AppId, ) *baseConsumer
func NewBatchConsumerFactory ¶ added in v0.50.0
func NewBatchConsumerFactory[M any](callbacks BatchConsumerCallbackMap[M]) kernel.ModuleMultiFactory
func NewConsumer ¶
func NewConsumer[M any](name string, callbackFactory ConsumerCallbackFactory[M]) kernel.ModuleFactory
func NewConsumerFactory ¶
func NewConsumerFactory[M any](callbacks ConsumerCallbackMap[M]) kernel.ModuleMultiFactory
func NewKafkaMessage ¶
func NewKafkaMessage(writable WritableMessage) kafka.Message
func NewKafkaMessageAttrs ¶
func NewKafkaMessages ¶
func NewKafkaMessages(ms []WritableMessage) []kafka.Message
func NewKinesisMessageHandler ¶
func NewKinesisMessageHandler(channel chan *Message) kinesis.MessageHandler
func NewLifecycleManager ¶ added in v0.37.0
func NewLifecycleManager(settings *SqsInputSettings, targets []SnsInputTarget) reslife.LifeCycleerFactory
func NewMessageEncoder ¶
func NewMessageEncoder(config *MessageEncoderSettings) *messageEncoder
func NewMessagesPerRunnerHandlerWithInterfaces ¶ added in v0.22.0
func NewMessagesPerRunnerHandlerWithInterfaces( logger log.Logger, clock clock.Clock, cwClient gosoCloudwatch.Client, baseHandler calculator.PerRunnerMetricHandler, calculatorSettings *calculator.CalculatorSettings, handlerSettings *calculator.PerRunnerMetricSettings, queueNames []string, ) calculator.Handler
func NewOutputTracer ¶
func NewProducer ¶
func NewProducerDaemon ¶
func NewProducerDaemonWithInterfaces ¶
func NewProducerDaemonWithInterfaces( logger log.Logger, metric metric.Writer, aggregator ProducerDaemonAggregator, output Output, clock clock.Clock, name string, settings ProducerDaemonSettings, ) *producerDaemon
func NewProducerWithInterfaces ¶
func NewProducerWithInterfaces(encoder MessageEncoder, output Output) *producer
func NewRetryHandler ¶
func NewRetryHandlerNoop ¶
func NewRetryHandlerSqs ¶
func NewSnsInput ¶
func NewSnsInput(ctx context.Context, config cfg.Config, logger log.Logger, settings *SnsInputSettings, targets []SnsInputTarget) (*snsInput, error)
func NewSnsInputWithInterfaces ¶
func NewSnsInputWithInterfaces(sqsInput *sqsInput) *snsInput
func NewSqsInput ¶
func NewSqsInputWithInterfaces ¶
func NewSqsInputWithInterfaces( logger log.Logger, queue sqs.Queue, unmarshaller UnmarshallerFunc, healthCheckTimer clock.HealthCheckTimer, settings *SqsInputSettings, ) *sqsInput
func NewTypedBatchConsumer ¶ added in v0.41.0
func NewTypedBatchConsumer[M any](name string, callbackFactory BatchConsumerCallbackFactory[M]) kernel.ModuleFactory
func NewUntypedBatchConsumer ¶ added in v0.41.0
func NewUntypedBatchConsumer(name string, callbackFactory UntypedBatchConsumerCallbackFactory) kernel.ModuleFactory
func NewUntypedBatchConsumerFactory ¶ added in v0.50.0
func NewUntypedBatchConsumerFactory(callbacks UntypedBatchConsumerCallbackMap) kernel.ModuleMultiFactory
func NewUntypedConsumer ¶ added in v0.41.0
func NewUntypedConsumer(name string, callbackFactory UntypedConsumerCallbackFactory) kernel.ModuleFactory
func NewUntypedConsumerFactory ¶ added in v0.41.0
func NewUntypedConsumerFactory(callbacks UntypedConsumerCallbackMap) kernel.ModuleMultiFactory
func ProducerDaemonFactory ¶
func ProvideProducerDaemon ¶
func ResetInMemoryInputs ¶
func ResetInMemoryInputs()
func ResetInMemoryOutputs ¶
func ResetInMemoryOutputs()
func SetInputFactory ¶
func SetInputFactory(typ string, factory InputFactory)
func SnsMarshaller ¶
func WithDefaultMessageBodyEncoding ¶
func WithDefaultMessageBodyEncoding(encoding EncodingType)
Types ¶
type AcknowledgeableInput ¶
type AcknowledgeableInput interface {
Input
// Ack acknowledges a single message. If possible, prefer calling AckBatch as it is more efficient.
Ack(ctx context.Context, msg *Message, ack bool) error
// AckBatch does the same as calling Ack for every single message would, but it might use fewer calls to an external
// service.
AckBatch(ctx context.Context, msgs []*Message, acks []bool) error
}
An AcknowledgeableInput is an Input with the additional ability to mark messages as successfully consumed. For example, an SQS queue would deliver a message a second time once its visibility timeout expires if we didn't acknowledge it.
type AggregateFlush ¶
type BaseConsumerCallback ¶
type BaseOutputConfiguration ¶
type BaseOutputConfiguration struct {
Tracing BaseOutputConfigurationTracing `cfg:"tracing"`
}
func (*BaseOutputConfiguration) SetTracing ¶
func (b *BaseOutputConfiguration) SetTracing(enabled bool)
type BaseOutputConfigurationAware ¶
type BaseOutputConfigurationAware interface {
SetTracing(enabled bool)
}
type BaseOutputConfigurationTracing ¶
type BaseOutputConfigurationTracing struct {
Enabled bool `cfg:"enabled" default:"true"`
}
type BatchConsumer ¶
type BatchConsumer struct {
// contains filtered or unexported fields
}
func NewUntypedBatchConsumerWithInterfaces ¶ added in v0.41.0
func NewUntypedBatchConsumerWithInterfaces(base *baseConsumer, callback UntypedBatchConsumerCallback, ticker *time.Ticker, settings *BatchConsumerSettings) *BatchConsumer
type BatchConsumerCallback ¶
type BatchConsumerCallbackMap ¶ added in v0.50.0
type BatchConsumerCallbackMap[M any] map[string]BatchConsumerCallbackFactory[M]
type BatchConsumerSettings ¶
type Chunks ¶
type Chunks []Chunk
func BuildChunks ¶
func BuildChunks(batch []WritableMessage, size int) (Chunks, error)
type CompressionType ¶
type CompressionType string
const ( CompressionNone CompressionType = "none" CompressionGZip CompressionType = "application/gzip" )
func GetCompressionAttribute ¶
func GetCompressionAttribute(attributes map[string]string) *CompressionType
GetCompressionAttribute returns the compression attribute if one is set and nil if none is set.
func (CompressionType) String ¶
func (s CompressionType) String() string
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewUntypedConsumerWithInterfaces ¶ added in v0.41.0
func NewUntypedConsumerWithInterfaces(base *baseConsumer, callback UntypedConsumerCallback, healthCheckTimer clock.HealthCheckTimer) *Consumer
type ConsumerCallback ¶
type ConsumerCallbackFactory ¶
type ConsumerCallbackMap ¶
type ConsumerCallbackMap[M any] map[string]ConsumerCallbackFactory[M]
type ConsumerMetadata ¶
type ConsumerRetrySettings ¶
type ConsumerSettings ¶
type ConsumerSettings struct {
Input string `cfg:"input" default:"consumer" validate:"required"`
RunnerCount int `cfg:"runner_count" default:"1" validate:"min=1"`
Encoding EncodingType `cfg:"encoding" default:"application/json"`
IdleTimeout time.Duration `cfg:"idle_timeout" default:"10s"`
AcknowledgeGraceTime time.Duration `cfg:"acknowledge_grace_time" default:"10s"`
ConsumeGraceTime time.Duration `cfg:"consume_grace_time" default:"10s"`
Retry ConsumerRetrySettings `cfg:"retry"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
AggregateMessageMode string `cfg:"aggregate_message_mode" default:"atMostOnce" validate:"oneof=atLeastOnce atMostOnce"`
}
func ReadConsumerSettings ¶ added in v0.28.3
func ReadConsumerSettings(config cfg.Config, name string) (ConsumerSettings, error)
type EncodeHandler ¶
type EncodingType ¶
type EncodingType string
const ( EncodingJson EncodingType = "application/json" EncodingProtobuf EncodingType = "application/x-protobuf" )
func GetEncodingAttribute ¶
func GetEncodingAttribute(attributes map[string]string) *EncodingType
GetEncodingAttribute returns the encoding attribute if one is set and nil if none is set.
func (EncodingType) String ¶
func (s EncodingType) String() string
type FileOutputMode ¶
type FileOutputMode string
const ( FileOutputModeAppend FileOutputMode = "append" FileOutputModeSingle FileOutputMode = "single" FileOutputModeTruncate FileOutputMode = "truncate" )
type FileOutputSettings ¶
type FileOutputSettings struct {
Filename string `cfg:"filename"`
Mode FileOutputMode `cfg:"mode" default:"append"`
}
type FileSettings ¶
type InMemoryInput ¶
type InMemoryInput struct {
// contains filtered or unexported fields
}
func NewInMemoryInput ¶
func NewInMemoryInput(settings *InMemorySettings) *InMemoryInput
func ProvideInMemoryInput ¶
func ProvideInMemoryInput(name string, settings *InMemorySettings) *InMemoryInput
func (*InMemoryInput) Data ¶
func (i *InMemoryInput) Data() <-chan *Message
func (*InMemoryInput) IsHealthy ¶ added in v0.40.17
func (i *InMemoryInput) IsHealthy() bool
func (*InMemoryInput) Publish ¶
func (i *InMemoryInput) Publish(messages ...*Message)
func (*InMemoryInput) Reset ¶
func (i *InMemoryInput) Reset()
func (*InMemoryInput) Stop ¶
func (i *InMemoryInput) Stop(ctx context.Context)
type InMemoryOutput ¶
type InMemoryOutput struct {
// contains filtered or unexported fields
}
func NewInMemoryOutput ¶
func NewInMemoryOutput() *InMemoryOutput
func ProvideInMemoryOutput ¶
func ProvideInMemoryOutput(name string) *InMemoryOutput
func (*InMemoryOutput) Clear ¶
func (o *InMemoryOutput) Clear()
func (*InMemoryOutput) ContainsBody ¶
func (o *InMemoryOutput) ContainsBody(body string) bool
func (*InMemoryOutput) Len ¶
func (o *InMemoryOutput) Len() int
func (*InMemoryOutput) Size ¶
func (o *InMemoryOutput) Size() int
func (*InMemoryOutput) Write ¶
func (o *InMemoryOutput) Write(_ context.Context, batch []WritableMessage) error
func (*InMemoryOutput) WriteOne ¶
func (o *InMemoryOutput) WriteOne(ctx context.Context, msg WritableMessage) error
type InMemoryOutputConfiguration ¶
type InMemoryOutputConfiguration struct {
BaseOutputConfiguration
Type string `cfg:"type" default:"inMemory"`
}
type InMemorySettings ¶
type InMemorySettings struct {
Size int `cfg:"size" default:"1"`
}
type InitializeableCallback ¶ added in v0.37.0
type Input ¶
type Input interface {
// Run provides a steady stream of messages, returned via Data. Run does not return until Stop is called and thus
// should be called in its own goroutine. The only exception to this is if we either fail to produce messages and
// return an error or if the input is depleted (like an InMemoryInput).
//
// Run should only be called once; not all inputs can be resumed.
Run(ctx context.Context) error
// Stop causes Run to return as fast as possible. Calling Stop is preferable to canceling the context passed to Run
// as it allows Run to shut down more cleanly (and might take a bit longer, e.g., to finish processing the current batch
// of messages).
Stop(ctx context.Context)
// Data returns a channel containing the messages produced by this input.
Data() <-chan *Message
// IsHealthy checks if the input is still able to produce data. An Input is healthy if it produces zero or more
// messages repeatedly. Producing zero messages would for example happen if the input requested data from an
// external queue, but the queue was empty. An Input is unhealthy if it is no longer able to produce any messages.
//
// If an input exhausts its source (file, finished stream, fixed list, ...), it is still considered healthy.
IsHealthy() bool
}
An Input provides you with a steady stream of messages until you Stop it.
func NewConfigurableInput ¶
func NewFileInput ¶
func NewFileInputWithInterfaces ¶
func NewFileInputWithInterfaces(logger log.Logger, settings FileSettings) Input
func NewKinesisInput ¶
func NewNoopInput ¶ added in v0.14.0
func NewNoopInput() Input
func NewRedisListInput ¶
func NewRedisListInputWithInterfaces ¶
func NewRedisListInputWithInterfaces( logger log.Logger, client redis.Client, mw metric.Writer, settings *RedisListInputSettings, healthCheckTimer clock.HealthCheckTimer, ) Input
type InputFactory ¶
type KafkaInput ¶
type KafkaInput struct {
// contains filtered or unexported fields
}
func NewKafkaInput ¶
func NewKafkaInputWithInterfaces ¶
func NewKafkaInputWithInterfaces(consumer *kafkaConsumer.Consumer) (*KafkaInput, error)
func (*KafkaInput) Ack ¶
Ack acknowledges a message. If possible, prefer calling AckBatch as it is more efficient.
func (*KafkaInput) AckBatch ¶
AckBatch does the same as calling Ack for every single message would, but it might use fewer calls to an external service.
func (*KafkaInput) Data ¶
func (i *KafkaInput) Data() <-chan *Message
Data returns a channel containing the messages produced by this input.
func (*KafkaInput) IsHealthy ¶ added in v0.40.17
func (i *KafkaInput) IsHealthy() bool
func (*KafkaInput) Run ¶
func (i *KafkaInput) Run(ctx context.Context) error
Run provides a steady stream of messages, returned via Data. Run does not return until Stop is called and thus should be called in its own goroutine. The only exception to this is if we either fail to produce messages and return an error or if the input is depleted (like an InMemoryInput).
Run should only be called once; not all inputs can be resumed.
func (*KafkaInput) Stop ¶
func (i *KafkaInput) Stop(ctx context.Context)
Stop causes Run to return as fast as possible. Calling Stop is preferable to canceling the context passed to Run as it allows Run to shut down more cleanly (and might take a bit longer, e.g., to finish processing the current batch of messages).
type KafkaOutput ¶
type KafkaOutput struct {
// contains filtered or unexported fields
}
func NewKafkaOutput ¶
func NewKafkaOutputWithInterfaces ¶
func NewKafkaOutputWithInterfaces(ctx context.Context, producer *kafkaProducer.Producer) (*KafkaOutput, error)
func (*KafkaOutput) Write ¶
func (o *KafkaOutput) Write(ctx context.Context, ms []WritableMessage) error
func (*KafkaOutput) WriteOne ¶
func (o *KafkaOutput) WriteOne(ctx context.Context, m WritableMessage) error
type KafkaSourceMessage ¶
func (KafkaSourceMessage) MarshalJSON ¶
func (k KafkaSourceMessage) MarshalJSON() ([]byte, error)
type KinesisOutputConfiguration ¶
type KinesisOutputConfiguration struct {
BaseOutputConfiguration
Type string `cfg:"type" default:"kinesis"`
Project string `cfg:"project"`
Family string `cfg:"family"`
Group string `cfg:"group"`
Application string `cfg:"application"`
ClientName string `cfg:"client_name" default:"default"`
StreamName string `cfg:"stream_name"`
}
type KinesisOutputSettings ¶
func (KinesisOutputSettings) GetAppId ¶
func (s KinesisOutputSettings) GetAppId() cfg.AppId
func (KinesisOutputSettings) GetClientName ¶
func (s KinesisOutputSettings) GetClientName() string
func (KinesisOutputSettings) GetStreamName ¶
func (s KinesisOutputSettings) GetStreamName() string
type KinsumerAutoscaleModule ¶ added in v0.34.1
type KinsumerAutoscaleModule struct {
kernel.BackgroundModule
kernel.ApplicationStage
// contains filtered or unexported fields
}
func NewKinsumerAutoscaleModuleWithInterfaces ¶ added in v0.34.1
func NewKinsumerAutoscaleModuleWithInterfaces( logger log.Logger, kinesisClient gosoKinesis.Client, kinesisStreamName string, leaderElection ddb.LeaderElection, memberId string, orchestrator KinsumerAutoscaleOrchestrator, settings KinsumerAutoscaleModuleSettings, ticker clock.Ticker, ) *KinsumerAutoscaleModule
type KinsumerAutoscaleModuleDynamoDbNamingSettings ¶ added in v0.34.1
type KinsumerAutoscaleModuleDynamoDbNamingSettings struct {
Pattern string `cfg:"pattern,nodecode" default:"{env}-kinsumer-autoscale-leaders"`
}
type KinsumerAutoscaleModuleDynamoDbSettings ¶ added in v0.34.1
type KinsumerAutoscaleModuleDynamoDbSettings struct {
Naming KinsumerAutoscaleModuleDynamoDbNamingSettings `cfg:"naming"`
}
type KinsumerAutoscaleModuleEcsSettings ¶ added in v0.34.1
type KinsumerAutoscaleModuleSettings ¶ added in v0.34.1
type KinsumerAutoscaleModuleSettings struct {
Ecs KinsumerAutoscaleModuleEcsSettings `cfg:"ecs"`
Enabled bool `cfg:"enabled" default:"true"`
DynamoDb KinsumerAutoscaleModuleDynamoDbSettings `cfg:"dynamodb"`
LeaderElection string `cfg:"leader_election" default:"kinsumer-autoscale"`
Orchestrator string `cfg:"orchestrator" default:"ecs"`
Period time.Duration `cfg:"period" default:"1m"`
}
type KinsumerAutoscaleOrchestrator ¶ added in v0.34.1
type KinsumerAutoscaleOrchestratorFactory ¶ added in v0.34.1
type LifecycleManager ¶ added in v0.37.0
type LifecycleManager interface {
reslife.LifeCycleer
reslife.Creator
}
type Message ¶
type Message struct {
Attributes map[string]string `json:"attributes"`
Body string `json:"body"`
// contains filtered or unexported fields
}
func BuildAggregateMessage ¶
func KafkaToGosoMessage ¶
func MarshalJsonMessage ¶
func MarshalProtobufMessage ¶
func MarshalProtobufMessage(body ProtobufEncodable, attributes ...map[string]string) (*Message, error)
func MessageUnmarshaller ¶
func NewProtobufMessage ¶
func RawUnmarshaller ¶
func SnsUnmarshaller ¶
func (*Message) GetAttributes ¶
func (*Message) MarshalToBytes ¶
func (*Message) MarshalToString ¶
func (*Message) UnmarshalFromBytes ¶
func (*Message) UnmarshalFromString ¶
type MessageBodyCompressor ¶
type MessageBodyEncoder ¶
type MessageBodyEncoder interface {
Encode(data any) ([]byte, error)
Decode(data []byte, out any) error
}
func NewJsonEncoder ¶
func NewJsonEncoder() MessageBodyEncoder
func NewProtobufEncoder ¶
func NewProtobufEncoder() MessageBodyEncoder
type MessageEncoder ¶
type MessageEncoderSettings ¶
type MessageEncoderSettings struct {
Encoding EncodingType
Compression CompressionType
EncodeHandlers []EncodeHandler
}
type ModelMsg ¶
func CreateModelMsg ¶
type NoOpOutput ¶
type NoOpOutput struct{}
func (*NoOpOutput) Write ¶
func (o *NoOpOutput) Write(_ context.Context, _ []WritableMessage) error
func (*NoOpOutput) WriteOne ¶
func (o *NoOpOutput) WriteOne(_ context.Context, _ WritableMessage) error
type Output ¶
type Output interface {
WriteOne(ctx context.Context, msg WritableMessage) error
Write(ctx context.Context, batch []WritableMessage) error
}
func NewConfigurableOutput ¶
func NewFileOutput ¶
func NewKinesisOutput ¶
func NewKinesisOutputWithInterfaces ¶
func NewKinesisOutputWithInterfaces(recordWriter gosoKinesis.RecordWriter) Output
func NewRedisListOutput ¶
func NewSnsOutput ¶
func NewSqsOutput ¶
type OutputChannel ¶
type OutputChannel interface {
Read() ([]WritableMessage, bool)
Write(ctx context.Context, msg []WritableMessage)
Close(ctx context.Context)
IsClosed() bool
}
func NewOutputChannel ¶
func NewOutputChannel(logger log.Logger, bufferSize int) OutputChannel
type OutputFactory ¶
type PartitionedOutput ¶
type PartitionerRand ¶
type Producer ¶
type Producer interface {
WriteOne(ctx context.Context, model any, attributeSets ...map[string]string) error
Write(ctx context.Context, models any, attributeSets ...map[string]string) error
}
func NewMultiProducer ¶
type ProducerDaemonAggregator ¶
type ProducerDaemonAggregator interface {
Write(ctx context.Context, msg *Message) ([]AggregateFlush, error)
Flush() ([]AggregateFlush, error)
}
func NewProducerDaemonAggregator ¶
func NewProducerDaemonAggregator(settings ProducerDaemonSettings, compression CompressionType, attributeSets ...map[string]string) (ProducerDaemonAggregator, error)
func NewProducerDaemonPartitionedAggregator ¶
func NewProducerDaemonPartitionedAggregator(logger log.Logger, settings ProducerDaemonSettings, compression CompressionType) (ProducerDaemonAggregator, error)
func NewProducerDaemonPartitionedAggregatorWithInterfaces ¶
func NewProducerDaemonPartitionedAggregatorWithInterfaces(logger log.Logger, rand PartitionerRand, aggregators int, createAggregator func(attributes map[string]string) (ProducerDaemonAggregator, error)) (ProducerDaemonAggregator, error)
type ProducerDaemonBatcher ¶
type ProducerDaemonBatcher interface {
Append(msg *Message) ([]WritableMessage, error)
Flush() []WritableMessage
}
func NewProducerDaemonBatcher ¶
func NewProducerDaemonBatcher(settings ProducerDaemonSettings) ProducerDaemonBatcher
type ProducerDaemonSettings ¶
type ProducerDaemonSettings struct {
Enabled bool `cfg:"enabled" default:"false"`
// Amount of time spent waiting for messages before sending out a batch.
Interval time.Duration `cfg:"interval" default:"1m"`
// Size of the buffer channel, i.e., how many messages can be in-flight at once? Generally it is a good idea to match
// this with the number of runners.
BufferSize int `cfg:"buffer_size" default:"10" validate:"min=1"`
// Number of daemons running in the background, writing complete batches to the output.
RunnerCount int `cfg:"runner_count" default:"10" validate:"min=1"`
// How many SQS messages do we submit in a single batch? SQS can accept up to 10 messages per batch.
// SNS doesn't support batching, so the value doesn't matter for SNS.
BatchSize int `cfg:"batch_size" default:"10" validate:"min=1"`
// How large may the sum of all messages in the aggregation be? For SQS you can't send more than 256 KB in one batch,
// for SNS a single message can't be larger than 256 KB. We use 252 KB as default to leave some room for request
// encoding and overhead.
BatchMaxSize int `cfg:"batch_max_size" default:"258048" validate:"min=0"`
// How many stream.Messages do we pack together in a single batch (one message in SQS) at once?
AggregationSize int `cfg:"aggregation_size" default:"1" validate:"min=1"`
// Maximum size in bytes of a batch. Defaults to 64 KB to leave some room for encoding overhead.
// Set to 0 to disable limiting the maximum size for a batch (it will still not put more than BatchSize messages
// in a batch).
//
// Note: Gosoline can't ensure your messages stay below this size if your messages are quite large (especially when
// using compression). Imagine you already aggregated 40 kb of compressed messages (around 53 kb when base64 encoded)
// and are now writing a message that compresses to 20 kb. Now your buffer reaches 60 kb, or 80 kb base64 encoded.
// Gosoline will not output the 53 kb message early if you requested 64 kb messages (it would still accept a 56 kb
// message), but after writing the next message the aggregate is already at 80 kb and thus exceeds the requested size.
AggregationMaxSize int `cfg:"aggregation_max_size" default:"65536" validate:"min=0"`
// If you are writing to an output using a partition key, we ensure messages are still distributed to a partition
// according to their partition key (although not necessarily the same partition as without the producer daemon).
// For this, we split the messages into buckets while collecting them, thus potentially aggregating more messages in
// memory (depending on the number of buckets you configure).
//
// Note: This still does not guarantee that your messages are perfectly ordered - this is impossible as soon as you
// have more than one producer. However, messages with the same partition key will end up in the same shard, so if
// you are reading two different shards and one is much further behind than the other, you will not see messages
// *massively* out of order - it should be roughly bounded by the time you buffer messages (the Interval setting) and
// thus be not much more than a minute (using the default setting) instead of hours (if one shard is half a day behind
// while the other is up-to-date).
//
// Second note: If you change the number of partitions, messages might move between buckets and thus end up in different
// shards than before. Thus, only do this if you can handle it (e.g., because no shard is currently lagging behind).
PartitionBucketCount int `cfg:"partition_bucket_count" default:"128" validate:"min=1"`
// Additional attributes we append to each message
MessageAttributes map[string]string `cfg:"message_attributes"`
}
type ProducerMetadata ¶
type ProducerSettings ¶
type ProducerSettings struct {
Output string `cfg:"output"`
Encoding EncodingType `cfg:"encoding"`
Compression CompressionType `cfg:"compression" default:"none"`
Daemon ProducerDaemonSettings `cfg:"daemon"`
}
type ProtobufEncodable ¶
type RawMessage ¶
type RawMessage struct {
Body any
Encoder MessageBodyEncoder
}
func NewRawJsonMessage ¶
func NewRawJsonMessage(body any) *RawMessage
NewRawJsonMessage works like NewRawMessage with the encoder set to marshal the body as JSON.
func NewRawMessage ¶
func NewRawMessage(body any, encoder MessageBodyEncoder) *RawMessage
NewRawMessage creates a new RawMessage. It uses the provided encoder to encode the message body.
func (*RawMessage) MarshalToBytes ¶
func (m *RawMessage) MarshalToBytes() ([]byte, error)
func (*RawMessage) MarshalToString ¶
func (m *RawMessage) MarshalToString() (string, error)
type RedisListInputSettings ¶
type RedisListOutputSettings ¶
type RetryHandler ¶
func NewManualSqsRetryHandler ¶ added in v0.14.0
func NewManualSqsRetryHandler(logger log.Logger, queue sqs.Queue, settings *SqsOutputSettings) RetryHandler
func NewManualSqsRetryHandlerFromInterfaces ¶ added in v0.14.0
func NewManualSqsRetryHandlerFromInterfaces(output Output) RetryHandler
type RetryHandlerFactory ¶
type RetryHandlerNoop ¶
type RetryHandlerNoop struct{}
func NewRetryHandlerNoopWithInterfaces ¶
func NewRetryHandlerNoopWithInterfaces() RetryHandlerNoop
type RetryHandlerSettings ¶
type RetryHandlerSqs ¶
type RetryHandlerSqs struct {
// contains filtered or unexported fields
}
func NewRetryHandlerSqsWithInterfaces ¶
func NewRetryHandlerSqsWithInterfaces(output Output, settings *RetryHandlerSqsSettings) *RetryHandlerSqs
type RetryHandlerSqsSettings ¶
type RetryHandlerSqsSettings struct {
cfg.AppId
RetryHandlerSettings
ClientName string `cfg:"client_name" default:"default"`
MaxNumberOfMessages int32 `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
WaitTime int32 `cfg:"wait_time" default:"10"`
RunnerCount int `cfg:"runner_count" default:"1"`
QueueId string `cfg:"queue_id"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
}
type RetryingInput ¶ added in v0.14.0
type RetryingInput interface {
GetRetryHandler() (Input, RetryHandler)
}
type RunnableBatchConsumerCallback ¶
type RunnableBatchConsumerCallback[M any] interface {
BatchConsumerCallback[M]
RunnableCallback
}
type RunnableCallback ¶
type RunnableConsumerCallback ¶
type RunnableConsumerCallback[M any] interface {
ConsumerCallback[M]
RunnableCallback
}
type RunnableUntypedBatchConsumerCallback ¶ added in v0.41.0
type RunnableUntypedBatchConsumerCallback interface {
UntypedBatchConsumerCallback
RunnableCallback
}
type RunnableUntypedConsumerCallback ¶ added in v0.41.0
type RunnableUntypedConsumerCallback interface {
UntypedConsumerCallback
RunnableCallback
}
type SizeRestrictedOutput ¶
type SizeRestrictedOutput interface {
Output
// GetMaxMessageSize returns the maximum size of a message for this output (or nil if there is no limit on message size).
GetMaxMessageSize() *int
// GetMaxBatchSize returns the maximum number of messages we can write at once to the output (or nil if there is no limit).
GetMaxBatchSize() *int
}
type SnsInputConfiguration ¶
type SnsInputConfiguration struct {
Type string `cfg:"type" default:"sns"`
ConsumerId string `cfg:"id" validate:"required"`
Family string `cfg:"family" default:""`
Group string `cfg:"group" default:""`
Application string `cfg:"application" default:""`
Targets []SnsInputTargetConfiguration `cfg:"targets" validate:"min=1"`
MaxNumberOfMessages int32 `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
WaitTime int32 `cfg:"wait_time" default:"3" validate:"min=1"`
VisibilityTimeout int `cfg:"visibility_timeout" default:"30" validate:"min=1"`
RunnerCount int `cfg:"runner_count" default:"1" validate:"min=1"`
RedrivePolicy sqs.RedrivePolicy `cfg:"redrive_policy"`
ClientName string `cfg:"client_name" default:"default"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
}
type SnsInputSettings ¶
type SnsInputSettings struct {
cfg.AppId
QueueId string `cfg:"queue_id"`
MaxNumberOfMessages int32 `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
WaitTime int32 `cfg:"wait_time"`
RedrivePolicy sqs.RedrivePolicy `cfg:"redrive_policy"`
VisibilityTimeout int `cfg:"visibility_timeout"`
RunnerCount int `cfg:"runner_count"`
ClientName string `cfg:"client_name"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
}
func (SnsInputSettings) GetAppId ¶
func (s SnsInputSettings) GetAppId() cfg.AppId
func (SnsInputSettings) GetClientName ¶
func (s SnsInputSettings) GetClientName() string
func (SnsInputSettings) GetQueueId ¶
func (s SnsInputSettings) GetQueueId() string
func (SnsInputSettings) IsFifoEnabled ¶
func (s SnsInputSettings) IsFifoEnabled() bool
type SnsInputTarget ¶
type SnsInputTarget struct {
cfg.AppId
TopicId string
Attributes map[string]string
ClientName string
}
func (SnsInputTarget) GetAppId ¶
func (t SnsInputTarget) GetAppId() cfg.AppId
func (SnsInputTarget) GetClientName ¶
func (t SnsInputTarget) GetClientName() string
func (SnsInputTarget) GetTopicId ¶
func (t SnsInputTarget) GetTopicId() string
type SnsInputTargetConfiguration ¶
type SnsInputTargetConfiguration struct {
Family string `cfg:"family"`
Group string `cfg:"group" validate:"required"`
Application string `cfg:"application" validate:"required"`
TopicId string `cfg:"topic_id" validate:"required"`
Attributes map[string]string `cfg:"attributes"`
ClientName string `cfg:"client_name" default:"default"`
}
type SnsOutputConfiguration ¶
type SnsOutputConfiguration struct {
BaseOutputConfiguration
Type string `cfg:"type" default:"sns"`
Project string `cfg:"project"`
Family string `cfg:"family"`
Group string `cfg:"group"`
Application string `cfg:"application"`
TopicId string `cfg:"topic_id" validate:"required"`
ClientName string `cfg:"client_name" default:"default"`
}
type SnsOutputSettings ¶
func (SnsOutputSettings) GetAppId ¶
func (s SnsOutputSettings) GetAppId() cfg.AppId
func (SnsOutputSettings) GetClientName ¶
func (s SnsOutputSettings) GetClientName() string
func (SnsOutputSettings) GetTopicId ¶
func (s SnsOutputSettings) GetTopicId() string
type SqsInputSettings ¶
type SqsInputSettings struct {
cfg.AppId
QueueId string `cfg:"queue_id"`
MaxNumberOfMessages int32 `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
WaitTime int32 `cfg:"wait_time"`
VisibilityTimeout int `cfg:"visibility_timeout"`
RunnerCount int `cfg:"runner_count"`
Fifo sqs.FifoSettings `cfg:"fifo"`
RedrivePolicy sqs.RedrivePolicy `cfg:"redrive_policy"`
ClientName string `cfg:"client_name"`
Unmarshaller string `cfg:"unmarshaller" default:"msg"`
Healthcheck health.HealthCheckSettings `cfg:"healthcheck"`
}
func (SqsInputSettings) GetAppId ¶
func (s SqsInputSettings) GetAppId() cfg.AppId
func (SqsInputSettings) GetClientName ¶
func (s SqsInputSettings) GetClientName() string
func (SqsInputSettings) GetQueueId ¶
func (s SqsInputSettings) GetQueueId() string
func (SqsInputSettings) IsFifoEnabled ¶
func (s SqsInputSettings) IsFifoEnabled() bool
type SqsOutputConfiguration ¶
type SqsOutputConfiguration struct {
BaseOutputConfiguration
Type string `cfg:"type" default:"sqs"`
Project string `cfg:"project"`
Family string `cfg:"family"`
Group string `cfg:"group"`
Application string `cfg:"application"`
QueueId string `cfg:"queue_id" validate:"required"`
VisibilityTimeout int `cfg:"visibility_timeout" default:"30" validate:"gt=0"`
RedrivePolicy sqs.RedrivePolicy `cfg:"redrive_policy"`
Fifo sqs.FifoSettings `cfg:"fifo"`
ClientName string `cfg:"client_name" default:"default"`
}
type SqsOutputSettings ¶
type SqsOutputSettings struct {
cfg.AppId
ClientName string
Fifo sqs.FifoSettings
QueueId string
RedrivePolicy sqs.RedrivePolicy
VisibilityTimeout int
}
func (SqsOutputSettings) GetAppId ¶
func (s SqsOutputSettings) GetAppId() cfg.AppId
func (SqsOutputSettings) GetClientName ¶
func (s SqsOutputSettings) GetClientName() string
func (SqsOutputSettings) GetQueueId ¶
func (s SqsOutputSettings) GetQueueId() string
func (SqsOutputSettings) IsFifoEnabled ¶
func (s SqsOutputSettings) IsFifoEnabled() bool
type UnmarshallerFunc ¶
type UntypedBatchConsumerCallback ¶ added in v0.41.0
type UntypedBatchConsumerCallback interface {
BaseConsumerCallback
Consume(ctx context.Context, models []any, attributes []map[string]string) ([]bool, error)
}
func EraseBatchConsumerCallbackTypes ¶ added in v0.41.0
func EraseBatchConsumerCallbackTypes[M any](consumerCallback BatchConsumerCallback[M]) UntypedBatchConsumerCallback
type UntypedBatchConsumerCallbackFactory ¶ added in v0.41.0
type UntypedBatchConsumerCallbackFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (UntypedBatchConsumerCallback, error)
func EraseBatchConsumerCallbackFactoryTypes ¶ added in v0.41.0
func EraseBatchConsumerCallbackFactoryTypes[M any](callbackFactory BatchConsumerCallbackFactory[M]) UntypedBatchConsumerCallbackFactory
type UntypedBatchConsumerCallbackMap ¶ added in v0.50.0
type UntypedBatchConsumerCallbackMap map[string]UntypedBatchConsumerCallbackFactory
type UntypedConsumerCallback ¶ added in v0.41.0
type UntypedConsumerCallback interface {
BaseConsumerCallback
Consume(ctx context.Context, model any, attributes map[string]string) (bool, error)
}
func EraseConsumerCallbackTypes ¶ added in v0.41.0
func EraseConsumerCallbackTypes[M any](consumerCallback ConsumerCallback[M]) UntypedConsumerCallback
type UntypedConsumerCallbackFactory ¶ added in v0.41.0
type UntypedConsumerCallbackFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (UntypedConsumerCallback, error)
func EraseConsumerCallbackFactoryTypes ¶ added in v0.41.0
func EraseConsumerCallbackFactoryTypes[M any](callbackFactory ConsumerCallbackFactory[M]) UntypedConsumerCallbackFactory
type UntypedConsumerCallbackMap ¶ added in v0.41.0
type UntypedConsumerCallbackMap map[string]UntypedConsumerCallbackFactory
type WritableMessage ¶
type WritableMessage interface {
MarshalToBytes() ([]byte, error)
MarshalToString() (string, error)
}
func MessagesToWritableMessages ¶
func MessagesToWritableMessages(batch []*Message) []WritableMessage
Source Files
¶
- chunking.go
- compression.go
- config.go
- consumer.go
- consumer_acknowledge.go
- consumer_base.go
- consumer_batch.go
- consumer_batch_module_factory.go
- consumer_module_factory.go
- consumer_settings.go
- encoding.go
- encoding_json.go
- encoding_protobuf.go
- input.go
- input_configurable.go
- input_file.go
- input_in_memory.go
- input_kafka.go
- input_kinesis.go
- input_noop.go
- input_redis_list.go
- input_sns.go
- input_sns_lifecycle.go
- input_sqs.go
- kinsumer_autoscale_module.go
- kinsumer_autoscale_module_settings.go
- kinsumer_autoscale_orchestrator.go
- kinsumer_autoscale_orchestrator_ecs.go
- manual_sqs_retry_handler.go
- message.go
- message_builder.go
- message_encoding.go
- message_kafka.go
- message_model.go
- mpr_handler.go
- mpr_queue_names.go
- multi_producer.go
- output.go
- output_channel.go
- output_configurable.go
- output_configurable_multiple.go
- output_file.go
- output_in_memory.go
- output_kafka.go
- output_kinesis.go
- output_noop.go
- output_redis_list.go
- output_sns.go
- output_sqs.go
- output_tracer.go
- producer.go
- producer_daemon.go
- producer_daemon_aggregator.go
- producer_daemon_batcher.go
- producer_daemon_config_postprocessor.go
- producer_daemon_factory.go
- producer_daemon_partitioned_aggregator.go
- raw_message.go
- retry.go
- retry_noop.go
- retry_sqs.go
- typed_batch_consumer.go
- typed_consumer.go
- unmarshal.go