 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Index ¶
- Constants
- func Listen(ctx context.Context, listeners ...Listener) (func(), error)
- type CommandExecutor
- type CommandExecutorOption
- type CommandExecutorOptions
- type CommandHandler
- type CommandInvoker
- type CommandInvokerOption
- type CommandInvokerOptions
- type CommandRequest
- type CommandResponse
- type Empty
- type Encoding
- type InvocationError
- type InvokeOption
- type InvokeOptions
- type JSON
- type Listener
- type Message
- type Option
- type RespondOption
- type RespondOptions
- type SendOption
- type SendOptions
- type TelemetryHandler
- type TelemetryMessage
- type TelemetryReceiver
- type TelemetryReceiverOption
- type TelemetryReceiverOptions
- type TelemetrySender
- type TelemetrySenderOption
- type TelemetrySenderOptions
- type WithCacheTTL
- type WithConcurrency
- type WithExecutionTimeout
- type WithFencingToken
- type WithIdempotent
- type WithManualAck
- type WithMessageExpiry
- type WithMetadata
- type WithResponseTopic
- type WithResponseTopicPrefix
- type WithResponseTopicSuffix
- type WithRetain
- type WithShareName
- type WithTopicNamespace
- type WithTopicTokenNamespace
- type WithTopicTokens
Constants ¶
const DefaultMessageExpiry = 10
    DefaultMessageExpiry is the MessageExpiry applied to Invoke or Send if none is specified (10 seconds).
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CommandExecutor ¶
CommandExecutor provides the ability to execute a single command.
func NewCommandExecutor ¶
func NewCommandExecutor[Req, Res any]( client mqtt.Client, requestEncoding Encoding[Req], responseEncoding Encoding[Res], requestTopic string, handler CommandHandler[Req, Res], opt ...CommandExecutorOption, ) (ce *CommandExecutor[Req, Res], err error)
NewCommandExecutor creates a new command executor.
type CommandExecutorOption ¶
type CommandExecutorOption interface {
	// contains filtered or unexported methods
}
    CommandExecutorOption represents a single command executor option.
type CommandExecutorOptions ¶
type CommandExecutorOptions struct {
	Idempotent bool
	CacheTTL   time.Duration
	Concurrency      uint
	ExecutionTimeout time.Duration
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
    CommandExecutorOptions are the resolved command executor options.
func (*CommandExecutorOptions) Apply ¶
func (o *CommandExecutorOptions) Apply( opts []CommandExecutorOption, rest ...CommandExecutorOption, )
Apply resolves the provided list of options.
func (*CommandExecutorOptions) ApplyOptions ¶
func (o *CommandExecutorOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type CommandHandler ¶
type CommandHandler[Req any, Res any] func( context.Context, *CommandRequest[Req], ) (*CommandResponse[Res], error)
CommandHandler is the user-provided implementation of a single command execution. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.
type CommandInvoker ¶
CommandInvoker provides the ability to invoke a single command.
func NewCommandInvoker ¶
func NewCommandInvoker[Req, Res any]( client mqtt.Client, requestEncoding Encoding[Req], responseEncoding Encoding[Res], requestTopic string, opt ...CommandInvokerOption, ) (ci *CommandInvoker[Req, Res], err error)
NewCommandInvoker creates a new command invoker.
func (*CommandInvoker[Req, Res]) Invoke ¶
func (ci *CommandInvoker[Req, Res]) Invoke( ctx context.Context, req Req, opt ...InvokeOption, ) (res *CommandResponse[Res], err error)
Invoke calls the command. This call will block until the command returns; any desired parallelism between invocations should be handled by the caller using normal Go constructs.
func (*CommandInvoker[Req, Res]) Listen ¶
func (ci *CommandInvoker[Req, Res]) Listen( ctx context.Context, ) (func(), error)
Listen to the response topic(s). Returns a function to stop listening. Must be called before any calls to Invoke. Note that cancelling this context will cause the unsubscribe call to fail.
type CommandInvokerOption ¶
type CommandInvokerOption interface {
	// contains filtered or unexported methods
}
    CommandInvokerOption represents a single command invoker option.
type CommandInvokerOptions ¶
type CommandInvokerOptions struct {
	ResponseTopic       func(string) string
	ResponseTopicPrefix string
	ResponseTopicSuffix string
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
    CommandInvokerOptions are the resolved command invoker options.
func (*CommandInvokerOptions) Apply ¶
func (o *CommandInvokerOptions) Apply( opts []CommandInvokerOption, rest ...CommandInvokerOption, )
Apply resolves the provided list of options.
func (*CommandInvokerOptions) ApplyOptions ¶
func (o *CommandInvokerOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type CommandRequest ¶
type CommandRequest[Req any] struct { Message[Req] FencingToken hlc.HybridLogicalClock }
CommandRequest contains per-message data and methods that are exposed to the command handlers.
type CommandResponse ¶
CommandResponse contains per-message data and methods that are returned by the command handlers.
type Empty ¶
type Empty struct{}
    Empty represents an encoding that contains no value.
func (Empty) ContentType ¶
func (Empty) ContentType() string
ContentType returns the empty MIME type.
func (Empty) Deserialize ¶
Deserialize validates that the payload is empty.
type Encoding ¶
type Encoding[T any] interface { ContentType() string IsUTF8() bool Serialize(T) ([]byte, error) Deserialize([]byte) (T, error) }
Encoding is a translation between a concrete Go type T and byte data. All methods *must* be thread-safe.
type InvocationError ¶
InvocationError represents an error intentionally returned by a handler to indicate incorrect invocation.
type InvokeOption ¶
type InvokeOption interface {
	// contains filtered or unexported methods
}
    InvokeOption represents a single per-invoke option.
type InvokeOptions ¶
type InvokeOptions struct {
	FencingToken hlc.HybridLogicalClock
	MessageExpiry uint32
	TopicTokens   map[string]string
	Metadata      map[string]string
}
    InvokeOptions are the resolved per-invoke options.
type JSON ¶
type JSON[T any] struct{}
JSON is a simple implementation of a JSON encoding.
func (JSON[T]) ContentType ¶
func (JSON[T]) ContentType() string
ContentType returns the JSON MIME type.
func (JSON[T]) Deserialize ¶
Deserialize translates JSON bytes into the Go type T.
type Message ¶
type Message[T any] struct { // The message payload. Payload T // The ID of the calling MQTT client. // TODO: Rename to "source" to align to Cloud Events spec? ClientID string // The data that identifies a single unique request. CorrelationData string // The timestamp of when the message was sent. Timestamp hlc.HybridLogicalClock // Any user-provided metadata values. Metadata map[string]string }
Message contains common message data that is exposed to message handlers.
type Option ¶
type Option interface {
	// contains filtered or unexported methods
}
    Option represents any of the option types, and can be filtered and applied by the ApplyOptions methods on the option structs.
func WithLogger ¶
WithLogger enables logging with the provided slog logger.
type RespondOption ¶
type RespondOption interface {
	// contains filtered or unexported methods
}
    RespondOption represents a single per-response option.
type RespondOptions ¶
RespondOptions are the resolved per-response options.
type SendOption ¶
type SendOption interface {
	// contains filtered or unexported methods
}
    SendOption represents a single per-send option.
type SendOptions ¶
type SendOptions struct {
	Retain bool
	MessageExpiry uint32
	TopicTokens   map[string]string
	Metadata      map[string]string
}
    SendOptions are the resolved per-send options.
type TelemetryHandler ¶
TelemetryHandler is the user-provided implementation of a single telemetry event handler. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.
type TelemetryMessage ¶
type TelemetryMessage[T any] struct { Message[T] // Ack provides a function to manually ack if enabled; it will be nil // otherwise. Ack func() error }
TelemetryMessage contains per-message data and methods that are exposed to the telemetry handlers.
type TelemetryReceiver ¶
type TelemetryReceiver[T any] struct { // contains filtered or unexported fields }
TelemetryReceiver provides the ability to handle the receipt of a single telemetry.
func NewTelemetryReceiver ¶
func NewTelemetryReceiver[T any]( client mqtt.Client, encoding Encoding[T], topic string, handler TelemetryHandler[T], opt ...TelemetryReceiverOption, ) (tr *TelemetryReceiver[T], err error)
NewTelemetryReceiver creates a new telemetry receiver.
type TelemetryReceiverOption ¶
type TelemetryReceiverOption interface {
	// contains filtered or unexported methods
}
    TelemetryReceiverOption represents a single telemetry receiver option.
type TelemetryReceiverOptions ¶
type TelemetryReceiverOptions struct {
	ManualAck bool
	Concurrency      uint
	ExecutionTimeout time.Duration
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
    TelemetryReceiverOptions are the resolved telemetry receiver options.
func (*TelemetryReceiverOptions) Apply ¶
func (o *TelemetryReceiverOptions) Apply( opts []TelemetryReceiverOption, rest ...TelemetryReceiverOption, )
Apply resolves the provided list of options.
func (*TelemetryReceiverOptions) ApplyOptions ¶
func (o *TelemetryReceiverOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type TelemetrySender ¶
type TelemetrySender[T any] struct { // contains filtered or unexported fields }
TelemetrySender provides the ability to send a single telemetry.
func NewTelemetrySender ¶
func NewTelemetrySender[T any]( client mqtt.Client, encoding Encoding[T], topic string, opt ...TelemetrySenderOption, ) (ts *TelemetrySender[T], err error)
NewTelemetrySender creates a new telemetry sender.
type TelemetrySenderOption ¶
type TelemetrySenderOption interface {
	// contains filtered or unexported methods
}
    TelemetrySenderOption represents a single telemetry sender option.
type TelemetrySenderOptions ¶
type TelemetrySenderOptions struct {
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
    TelemetrySenderOptions are the resolved telemetry sender options.
func (*TelemetrySenderOptions) Apply ¶
func (o *TelemetrySenderOptions) Apply( opts []TelemetrySenderOption, rest ...TelemetrySenderOption, )
Apply resolves the provided list of options.
func (*TelemetrySenderOptions) ApplyOptions ¶
func (o *TelemetrySenderOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type WithCacheTTL ¶
WithCacheTTL indicates how long results of this command will live in the cache. This is only valid for idempotent commands.
type WithConcurrency ¶
type WithConcurrency uint
WithConcurrency indicates how many handlers can execute in parallel.
type WithExecutionTimeout ¶
WithExecutionTimeout applies a context timeout to the handler execution.
type WithFencingToken ¶
type WithFencingToken hlc.HybridLogicalClock
WithFencingToken provides a fencing token to be used by the executor.
type WithManualAck ¶
type WithManualAck bool
WithManualAck indicates that the handler is responsible for manually acking the telemetry message.
type WithMessageExpiry ¶
type WithMessageExpiry uint32
WithMessageExpiry applies an MQTT message expiry (in seconds).
type WithMetadata ¶
WithMetadata specifies user-provided metadata values.
type WithResponseTopic ¶
WithResponseTopic specifies a translation function from the request topic to the response topic. Note that this overrides any provided response topic prefix or suffix.
type WithResponseTopicPrefix ¶
type WithResponseTopicPrefix string
WithResponseTopicPrefix specifies a custom prefix for the response topic.
type WithResponseTopicSuffix ¶
type WithResponseTopicSuffix string
WithResponseTopicSuffix specifies a custom suffix for the response topic.
type WithRetain ¶
type WithRetain bool
WithRetain indicates that the telemetry event should be retained by the broker.
type WithShareName ¶
type WithShareName string
WithShareName connects this listener to a shared MQTT subscription.
type WithTopicNamespace ¶
type WithTopicNamespace string
WithTopicNamespace specifies a namespace that will be prepended to the topic.
type WithTopicTokenNamespace ¶
type WithTopicTokenNamespace string
WithTopicTokenNamespace specifies a namespace that will be prepended to all previously-specified topic tokens. Topic tokens specified after this option will not be namespaced, allowing this to differentiate user tokens from system tokens.
type WithTopicTokens ¶
WithTopicTokens specifies topic token values.