 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Overview ¶
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Index ¶
- Constants
- func WithLogger(logger *slog.Logger) interface{ ... }
- type CloudEvent
- type CommandExecutor
- type CommandExecutorOption
- type CommandExecutorOptions
- type CommandHandler
- type CommandInvoker
- type CommandInvokerOption
- type CommandInvokerOptions
- type CommandRequest
- type CommandResponse
- type Empty
- type Encoding
- type InvocationError
- type InvokeOption
- type InvokeOptions
- type JSON
- type Listener
- type Listeners
- type Message
- type MqttClient
- type Option
- type Raw
- type RespondOption
- type RespondOptions
- type SendOption
- type SendOptions
- type TelemetryHandler
- type TelemetryMessage
- type TelemetryReceiver
- type TelemetryReceiverOption
- type TelemetryReceiverOptions
- type TelemetrySender
- type TelemetrySenderOption
- type TelemetrySenderOptions
- type WithCacheTTL
- type WithConcurrency
- type WithFencingToken
- type WithIdempotent
- type WithManualAck
- type WithMetadata
- type WithResponseTopic
- type WithResponseTopicPrefix
- type WithResponseTopicSuffix
- type WithRetain
- type WithShareName
- type WithTimeout
- type WithTopicNamespace
- type WithTopicTokenNamespace
- type WithTopicTokens
Constants ¶
const ( DefaultCloudEventSpecVersion = "1.0" DefaultCloudEventType = "ms.aio.telemetry" )
const DefaultTimeout = 10 * time.Second
    DefaultTimeout is the timeout applied to Invoke or Send if none is specified.
Variables ¶
This section is empty.
Functions ¶
func WithLogger ¶
func WithLogger(logger *slog.Logger) interface { Option CommandExecutorOption CommandInvokerOption TelemetryReceiverOption TelemetrySenderOption }
WithLogger enables logging with the provided slog logger.
Types ¶
type CloudEvent ¶ added in v0.2.0
type CloudEvent struct {
	ID          string
	Source      *url.URL
	SpecVersion string
	Type        string
	DataContentType string
	DataSchema      *url.URL
	Subject         string
	Time            time.Time
}
    CloudEvent provides an implementation of the CloudEvents 1.0 spec; see: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
func (*CloudEvent) Attrs ¶ added in v0.2.0
func (ce *CloudEvent) Attrs() []slog.Attr
Attrs returns additional attributes for slog.
type CommandExecutor ¶
CommandExecutor provides the ability to execute a single command.
func NewCommandExecutor ¶
func NewCommandExecutor[Req, Res any]( client MqttClient, requestEncoding Encoding[Req], responseEncoding Encoding[Res], requestTopicPattern string, handler CommandHandler[Req, Res], opt ...CommandExecutorOption, ) (ce *CommandExecutor[Req, Res], err error)
NewCommandExecutor creates a new command executor.
func (*CommandExecutor[Req, Res]) Close ¶ added in v0.2.0
func (ce *CommandExecutor[Req, Res]) Close()
Close the command executor to free its resources.
type CommandExecutorOption ¶
type CommandExecutorOption interface {
	// contains filtered or unexported methods
}
    CommandExecutorOption represents a single command executor option.
type CommandExecutorOptions ¶
type CommandExecutorOptions struct {
	Idempotent bool
	CacheTTL   time.Duration
	Concurrency uint
	Timeout     time.Duration
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
    CommandExecutorOptions are the resolved command executor options.
func (*CommandExecutorOptions) Apply ¶
func (o *CommandExecutorOptions) Apply( opts []CommandExecutorOption, rest ...CommandExecutorOption, )
Apply resolves the provided list of options.
func (*CommandExecutorOptions) ApplyOptions ¶
func (o *CommandExecutorOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type CommandHandler ¶
type CommandHandler[Req any, Res any] func( context.Context, *CommandRequest[Req], ) (*CommandResponse[Res], error)
CommandHandler is the user-provided implementation of a single command execution. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.
type CommandInvoker ¶
CommandInvoker provides the ability to invoke a single command.
func NewCommandInvoker ¶
func NewCommandInvoker[Req, Res any]( client MqttClient, requestEncoding Encoding[Req], responseEncoding Encoding[Res], requestTopicPattern string, opt ...CommandInvokerOption, ) (ci *CommandInvoker[Req, Res], err error)
NewCommandInvoker creates a new command invoker.
func (*CommandInvoker[Req, Res]) Close ¶ added in v0.2.0
func (ci *CommandInvoker[Req, Res]) Close()
Close the command invoker to free its resources.
func (*CommandInvoker[Req, Res]) Invoke ¶
func (ci *CommandInvoker[Req, Res]) Invoke( ctx context.Context, req Req, opt ...InvokeOption, ) (res *CommandResponse[Res], err error)
Invoke calls the command. This call will block until the command returns; any desired parallelism between invocations should be handled by the caller using normal Go constructs.
type CommandInvokerOption ¶
type CommandInvokerOption interface {
	// contains filtered or unexported methods
}
    CommandInvokerOption represents a single command invoker option.
type CommandInvokerOptions ¶
type CommandInvokerOptions struct {
	ResponseTopic       func(string) string
	ResponseTopicPrefix string
	ResponseTopicSuffix string
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
    CommandInvokerOptions are the resolved command invoker options.
func (*CommandInvokerOptions) Apply ¶
func (o *CommandInvokerOptions) Apply( opts []CommandInvokerOption, rest ...CommandInvokerOption, )
Apply resolves the provided list of options.
func (*CommandInvokerOptions) ApplyOptions ¶
func (o *CommandInvokerOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type CommandRequest ¶
type CommandRequest[Req any] struct { Message[Req] FencingToken hlc.HybridLogicalClock }
CommandRequest contains per-message data and methods that are exposed to the command handlers.
type CommandResponse ¶
CommandResponse contains per-message data and methods that are returned by the command handlers.
type Empty ¶
type Empty struct{}
    Empty represents an encoding that contains no value.
func (Empty) ContentType ¶
func (Empty) ContentType() string
ContentType returns the empty MIME type.
func (Empty) Deserialize ¶
Deserialize validates that the payload is empty.
func (Empty) PayloadFormat ¶ added in v0.2.0
func (Empty) PayloadFormat() byte
PayloadFormat indicates that empty is not (meaningfully) valid UTF8.
type Encoding ¶
type Encoding[T any] interface { ContentType() string PayloadFormat() byte Serialize(T) ([]byte, error) Deserialize([]byte) (T, error) }
Encoding is a translation between a concrete Go type T and byte data. All methods *must* be thread-safe.
type InvocationError ¶
InvocationError represents an error intentionally returned by a handler to indicate incorrect invocation.
type InvokeOption ¶
type InvokeOption interface {
	// contains filtered or unexported methods
}
    InvokeOption represent a single per-invoke option.
type InvokeOptions ¶
type InvokeOptions struct {
	FencingToken hlc.HybridLogicalClock
	Timeout     time.Duration
	TopicTokens map[string]string
	Metadata    map[string]string
}
    InvokeOptions are the resolved per-invoke options.
type JSON ¶
type JSON[T any] struct{}
JSON is a simple implementation of a JSON encoding.
func (JSON[T]) ContentType ¶
func (JSON[T]) ContentType() string
ContentType returns the JSON MIME type.
func (JSON[T]) Deserialize ¶
Deserialize translates JSON bytes into the Go type T.
func (JSON[T]) PayloadFormat ¶ added in v0.2.0
func (JSON[T]) PayloadFormat() byte
PayloadFormat indicates that JSON is valid UTF8.
type Listeners ¶ added in v0.2.0
type Listeners []Listener
Listeners represents a collection of MQTT listeners.
type Message ¶
type Message[T any] struct { // The message payload. Payload T // The ID of the calling MQTT client. ClientID string // The data that identifies a single unique request. CorrelationData string // The timestamp of when the message was sent. Timestamp hlc.HybridLogicalClock // All topic tokens resolved from the incoming topic. TopicTokens map[string]string // Any user-provided metadata values. Metadata map[string]string }
Message contains common message data that is exposed to message handlers.
type MqttClient ¶ added in v0.2.0
type MqttClient interface {
	ID() string
	Publish(
		context.Context,
		string,
		[]byte,
		...mqtt.PublishOption,
	) (*mqtt.Ack, error)
	RegisterMessageHandler(mqtt.MessageHandler) func()
	Subscribe(
		context.Context,
		string,
		...mqtt.SubscribeOption,
	) (*mqtt.Ack, error)
	Unsubscribe(
		context.Context,
		string,
		...mqtt.UnsubscribeOption,
	) (*mqtt.Ack, error)
}
    MqttClient is the client used for the underlying MQTT connection.
type Option ¶
type Option interface {
	// contains filtered or unexported methods
}
    Option represents any of the option types, and can be filtered and applied by the ApplyOptions methods on the option structs.
type Raw ¶ added in v0.1.1
type Raw struct{}
    Raw represents no encoding.
func (Raw) ContentType ¶ added in v0.1.1
func (Raw) ContentType() string
ContentType returns the raw MIME type.
func (Raw) Deserialize ¶ added in v0.1.1
Deserialize returns the bytes unchanged.
func (Raw) PayloadFormat ¶ added in v0.2.0
func (Raw) PayloadFormat() byte
PayloadFormat indicates that raw is not known to be valid UTF8.
type RespondOption ¶
type RespondOption interface {
	// contains filtered or unexported methods
}
    RespondOption represent a single per-response option.
type RespondOptions ¶
RespondOptions are the resolved per-response options.
type SendOption ¶
type SendOption interface {
	// contains filtered or unexported methods
}
    SendOption represent a single per-send option.
func WithCloudEvent ¶ added in v0.2.0
func WithCloudEvent(ce *CloudEvent) SendOption
WithCloudEvent adds a cloud event payload to the telemetry message.
type SendOptions ¶
type SendOptions struct {
	CloudEvent *CloudEvent
	Retain     bool
	Timeout     time.Duration
	TopicTokens map[string]string
	Metadata    map[string]string
}
    SendOptions are the resolved per-send options.
type TelemetryHandler ¶
TelemetryHandler is the user-provided implementation of a single telemetry event handler. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.
type TelemetryMessage ¶
type TelemetryMessage[T any] struct { Message[T] // CloudEvent will be present if the message was sent with cloud events. *CloudEvent // Ack provides a function to manually ack if enabled and if possible; // it will be nil otherwise. Note that, since QoS 0 messages cannot be // acked, this will be nil in this case even if manual ack is enabled. Ack func() }
TelemetryMessage contains per-message data and methods that are exposed to the telemetry handlers.
type TelemetryReceiver ¶
type TelemetryReceiver[T any] struct { // contains filtered or unexported fields }
TelemetryReceiver provides the ability to handle the receipt of a single telemetry.
func NewTelemetryReceiver ¶
func NewTelemetryReceiver[T any]( client MqttClient, encoding Encoding[T], topicPattern string, handler TelemetryHandler[T], opt ...TelemetryReceiverOption, ) (tr *TelemetryReceiver[T], err error)
NewTelemetryReceiver creates a new telemetry receiver.
func (*TelemetryReceiver[T]) Close ¶ added in v0.2.0
func (tr *TelemetryReceiver[T]) Close()
Close the telemetry receiver to free its resources.
type TelemetryReceiverOption ¶
type TelemetryReceiverOption interface {
	// contains filtered or unexported methods
}
    TelemetryReceiverOption represents a single telemetry receiver option.
type TelemetryReceiverOptions ¶
type TelemetryReceiverOptions struct {
	ManualAck bool
	Concurrency uint
	Timeout     time.Duration
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
    TelemetryReceiverOptions are the resolved telemetry receiver options.
func (*TelemetryReceiverOptions) Apply ¶
func (o *TelemetryReceiverOptions) Apply( opts []TelemetryReceiverOption, rest ...TelemetryReceiverOption, )
Apply resolves the provided list of options.
func (*TelemetryReceiverOptions) ApplyOptions ¶
func (o *TelemetryReceiverOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type TelemetrySender ¶
type TelemetrySender[T any] struct { // contains filtered or unexported fields }
TelemetrySender provides the ability to send a single telemetry.
func NewTelemetrySender ¶
func NewTelemetrySender[T any]( client MqttClient, encoding Encoding[T], topicPattern string, opt ...TelemetrySenderOption, ) (ts *TelemetrySender[T], err error)
NewTelemetrySender creates a new telemetry sender.
type TelemetrySenderOption ¶
type TelemetrySenderOption interface {
	// contains filtered or unexported methods
}
    TelemetrySenderOption represents a single telemetry sender option.
type TelemetrySenderOptions ¶
type TelemetrySenderOptions struct {
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
    TelemetrySenderOptions are the resolved telemetry sender options.
func (*TelemetrySenderOptions) Apply ¶
func (o *TelemetrySenderOptions) Apply( opts []TelemetrySenderOption, rest ...TelemetrySenderOption, )
Apply resolves the provided list of options.
func (*TelemetrySenderOptions) ApplyOptions ¶
func (o *TelemetrySenderOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type WithCacheTTL ¶
WithCacheTTL indicates how long results of this command will live in the cache. This is only valid for idempotent commands.
type WithConcurrency ¶
type WithConcurrency uint
WithConcurrency indicates how many handlers can execute in parallel.
type WithFencingToken ¶
type WithFencingToken hlc.HybridLogicalClock
WithFencingToken provides a fencing token to be used by the executor.
type WithManualAck ¶
type WithManualAck bool
WithManualAck indicates that the handler is responsible for manually acking the telemetry message.
type WithMetadata ¶
WithMetadata specifies user-provided metadata values.
type WithResponseTopic ¶
WithResponseTopic specifies a translation function from the request topic to the response topic. Note that this overrides any provided response topic prefix or suffix.
type WithResponseTopicPrefix ¶
type WithResponseTopicPrefix string
WithResponseTopicPrefix specifies a custom prefix for the response topic.
type WithResponseTopicSuffix ¶
type WithResponseTopicSuffix string
WithResponseTopicSuffix specifies a custom suffix for the response topic.
type WithRetain ¶
type WithRetain bool
WithRetain indicates that the telemetry event should be retained by the broker.
type WithShareName ¶
type WithShareName string
WithShareName connects this listener to a shared MQTT subscription.
type WithTimeout ¶ added in v0.2.0
WithTimeout applies a context timeout to the message invocation or handler execution, as appropriate.
type WithTopicNamespace ¶
type WithTopicNamespace string
WithTopicNamespace specifies a namespace that will be prepended to the topic.
type WithTopicTokenNamespace ¶
type WithTopicTokenNamespace string
WithTopicTokenNamespace specifies a namespace that will be prepended to all previously-specified topic tokens. Topic tokens specified after this option will not be namespaced, allowing this to differentiate user tokens from system tokens.
type WithTopicTokens ¶
WithTopicTokens specifies topic token values.