sqs

package
v1.48.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ContextKeyResponseQueueURL is the context key that allows fetching
	// and setting the response queue URL from and into context.
	ContextKeyResponseQueueURL contextKey = iota
)

Variables

This section is empty.

Functions

func DefaultErrorEncoder

func DefaultErrorEncoder(context.Context, error, types.Message, SQSClient)

DefaultErrorEncoder simply ignores the message.

func EncodeJSONRequest

func EncodeJSONRequest(_ context.Context, msg *sqs.SendMessageInput, request any) error

EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a JSON object and loads it as the MessageBody of the sqs.SendMessageInput. This can be enough for most JSON over SQS communications.

func EncodeJSONResponse

func EncodeJSONResponse(_ context.Context, input *sqs.SendMessageInput, response any) error

EncodeJSONResponse marshals response as json and loads it into an sqs.SendMessageInput MessageBody.

func NoResponseDecode

func NoResponseDecode(_ context.Context, _ types.Message) (any, error)

NoResponseDecode is a DecodeResponseFunc that can be used when no response is needed. It returns nil value and nil error.

Types

type DecodeRequestFunc

type DecodeRequestFunc func(context.Context, types.Message) (request any, err error)

DecodeRequestFunc extracts a user-domain request object from an SQS message object. It is designed to be used in Subscribers.

type DecodeResponseFunc

type DecodeResponseFunc func(context.Context, types.Message) (response any, err error)

DecodeResponseFunc extracts a user-domain response object from an SQS message object. It is designed to be used in Publishers.

type EncodeRequestFunc

type EncodeRequestFunc func(context.Context, *sqs.SendMessageInput, any) error

EncodeRequestFunc encodes the passed payload object into an SQS message object. It is designed to be used in Publishers.

type EncodeResponseFunc

type EncodeResponseFunc func(context.Context, *sqs.SendMessageInput, any) error

EncodeResponseFunc encodes the passed response object to an SQS message object. It is designed to be used in Subscribers.

type ErrorEncoder

type ErrorEncoder func(ctx context.Context, err error, req types.Message, sqsClient SQSClient)

ErrorEncoder is responsible for encoding an error to the subscriber's reply. Users are encouraged to use custom ErrorEncoders to encode errors to their replies, and will likely want to pass and check for their own error types.

type Publisher

type Publisher struct {
	Handler SQSPublisher
	// contains filtered or unexported fields
}

Publisher wraps an Publisher client, and provides a method that implements endpoint.Endpoint.

func NewPublisher

func NewPublisher(
	handler SQSPublisher,
	queueURL string,
	enc EncodeRequestFunc,
	dec DecodeResponseFunc,
	options ...PublisherOption,
) *Publisher

NewPublisher constructs a usable Publisher for a single remote method.

func (Publisher) Endpoint

func (p Publisher) Endpoint() endpoint.Endpoint

Endpoint returns a usable endpoint that invokes the remote endpoint.

type PublisherOption

type PublisherOption func(*Publisher)

PublisherOption sets an optional parameter for clients.

func PublisherAfter

func PublisherAfter(after ...PublisherResponseFunc) PublisherOption

PublisherAfter sets the ClientResponseFuncs applied to the incoming SQS request prior to it being decoded. This is useful for obtaining the response and adding any information onto the context prior to decoding.

func PublisherBefore

func PublisherBefore(before ...PublisherRequestFunc) PublisherOption

PublisherBefore sets the RequestFuncs that are applied to the outgoing SQS request before it's invoked.

type PublisherRequestFunc

type PublisherRequestFunc func(ctx context.Context, input *sqs.SendMessageInput) context.Context

PublisherRequestFunc may take information from a producer request and put it into a request context, or add some informations to SendMessageInput. In Publishers, RequestFuncs are executed prior to publishing the message but after encoding. use cases eg. in Publisher : enforce some message attributes to SendMessageInput.

func SetPublisherLogger

func SetPublisherLogger(l sdklogger.Logger) PublisherRequestFunc

SetPublisherLogger returns PublisherRequestFunc that sets SDK Logger to the request context. It will also try to setup context values to the logger fields.

func SetPublisherMetrics

func SetPublisherMetrics(m sdkmetrics.Metrics) PublisherRequestFunc

SetPublisherMetrics returns PublisherRequestFunc that sets the Metrics client to the request context.

func SetPublisherResponseQueueURL

func SetPublisherResponseQueueURL(url string) PublisherRequestFunc

SetPublisherResponseQueueURL can be used as a before function to add provided url as responseQueueURL in context.

type PublisherResponseFunc

type PublisherResponseFunc func(
	context.Context, SQSPublisher, *sqs.SendMessageOutput) (context.Context, types.Message, error)

PublisherResponseFunc may take information from an sqs.SendMessageOutput and fetch response using the Client. SQS is not req-reply out-of-the-box. Responses need to be fetched. PublisherResponseFunc are only executed in producers, after a request has been made, but prior to its response being decoded. So this is the perfect place to fetch actual response.

type SQSClient

type SQSClient interface {
	ChangeMessageVisibility(
		ctx context.Context,
		input *sqs.ChangeMessageVisibilityInput,
		optFns ...func(opts *sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
	DeleteMessage(
		ctx context.Context,
		input *sqs.DeleteMessageInput,
		optFns ...func(opts *sqs.Options)) (*sqs.DeleteMessageOutput, error)
}

type SQSPublisher

type SQSPublisher interface {
	Publish(ctx context.Context, message *sqs.SendMessageInput) (*sqs.SendMessageOutput, error)
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber wraps an endpoint and provides a handler for SQS messages.

func NewSubscriber

func NewSubscriber(
	sqsClient SQSClient,
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	enc EncodeResponseFunc,
	queueURL string,
	options ...SubscriberOption,
) *Subscriber

NewSubscriber constructs a new Subscriber, which provides a ServeMessage method and message handlers that wrap the provided endpoint.

func (Subscriber) ServeMessage

func (s Subscriber) ServeMessage(ctx context.Context) func(msg types.Message) error

ServeMessage serves an SQS message.

type SubscriberFinalizerFunc

type SubscriberFinalizerFunc func(ctx context.Context, msg types.Message)

SubscriberFinalizerFunc can be used to perform work at the end of a request from a producer, after the response has been written to the producer. The principal intended use is for request logging. Can also be used to delete messages once fully proccessed.

type SubscriberOption

type SubscriberOption func(*Subscriber)

SubscriberOption sets an optional parameter for subscribers.

func SubscriberAfter

func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption

SubscriberAfter functions are executed on the subscriber reply after the endpoint is invoked.

func SubscriberBefore

func SubscriberBefore(before ...SubscriberRequestFunc) SubscriberOption

SubscriberBefore functions are executed on the producer request object before the request is decoded.

func SubscriberDeleteMessageAfter

func SubscriberDeleteMessageAfter() SubscriberOption

SubscriberDeleteMessageAfter returns a SubscriberOption that appends a function that delete a message from queue to the list of subscriber's after functions.

func SubscriberDeleteMessageBefore

func SubscriberDeleteMessageBefore() SubscriberOption

SubscriberDeleteMessageBefore returns a SubscriberOption that appends a function that delete the message from queue to the list of subscriber's before functions.

func SubscriberErrorEncoder

func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption

SubscriberErrorEncoder is used to encode errors to the subscriber reply whenever they're encountered in the processing of a request. Clients can use this to provide custom error formatting. By default, errors will be published with the DefaultErrorEncoder.

func SubscriberErrorHandler

func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption

SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context.

func SubscriberFinalizer

func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption

SubscriberFinalizer is executed once all the received SQS messages are done being processed. By default, no finalizer is registered.

func SubscriberNackMessageErrorEncoder

func SubscriberNackMessageErrorEncoder() SubscriberOption

SubscriberNackMessageErrorEncoder can be used to perform an immediate nack on the message.

func SubscriberSetContextTimeout

func SubscriberSetContextTimeout(timeout time.Duration) SubscriberOption

SubscriberSetContextTimeout returns a SubscriberOption that sets the context timeout.

type SubscriberRequestFunc

type SubscriberRequestFunc func(
	ctx context.Context, cancel context.CancelFunc, message types.Message) context.Context

SubscriberRequestFunc may take information from a subscriber request result and put it into a request context. In Subscribers, RequestFuncs are executed prior to invoking the endpoint. use cases eg. in Subscriber : extract message information into context.

func SetSubscriberDatabase added in v1.40.0

func SetSubscriberDatabase(db *gorm.DB) SubscriberRequestFunc

SetSubscriberDatabase returns SubscriberRequestFunc that sets the GORM database to the request context.

func SetSubscriberDatabaseLogging added in v1.40.0

func SetSubscriberDatabaseLogging() SubscriberRequestFunc

SetSubscriberDatabaseLogging returns SubscriberRequestFunc that sets the SDL Logger to GORM database and sets new gorm DB to the request context.

func SetSubscriberLogger

func SetSubscriberLogger(l sdklogger.Logger) SubscriberRequestFunc

SetSubscriberLogger returns SubscriberRequestFunc that sets SDK Logger to the request context. It will also try to setup context values to the logger fields.

func SetSubscriberMetrics

func SetSubscriberMetrics(m sdkmetrics.Metrics) SubscriberRequestFunc

SetSubscriberMetrics returns SubscriberRequestFunc that sets the Metrics client to the request context.

type SubscriberResponseFunc

type SubscriberResponseFunc func(
	ctx context.Context, cancel context.CancelFunc, message types.Message, resp any) context.Context

SubscriberResponseFunc may take information from a request context and use it to manipulate a Publisher. SubscriberResponseFunc are only executed in subscriber, after invoking the endpoint. use cases eg. : Pipe information from request message, delete msg from queue, etc.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL