protocol

v0.1.1
Published: Oct 10, 2024 License: MIT Imports: 17 Imported by: 2

README

protocol

import "github.com/Azure/iot-operations-sdks/go/protocol"

Constants

DefaultMessageExpiry is the MessageExpiry applied to Invoke or Send if none is specified (10 seconds).

const DefaultMessageExpiry = 10

func Listen

func Listen(ctx context.Context, listeners ...Listener) (func(), error)

Listen starts all of the provided listeners.

func WithLogger

func WithLogger(logger *slog.Logger) interface {
    Option
    CommandExecutorOption
    CommandInvokerOption
    TelemetryReceiverOption
    TelemetrySenderOption
}

WithLogger enables logging with the provided slog logger.

type CommandExecutor

CommandExecutor provides the ability to execute a single command.

type CommandExecutor[Req any, Res any] struct {
    // contains filtered or unexported fields
}

func NewCommandExecutor
func NewCommandExecutor[Req, Res any](client mqtt.Client, requestEncoding Encoding[Req], responseEncoding Encoding[Res], requestTopic string, handler CommandHandler[Req, Res], opt ...CommandExecutorOption) (ce *CommandExecutor[Req, Res], err error)

NewCommandExecutor creates a new command executor.

func (*CommandExecutor[Req, Res]) Listen
func (ce *CommandExecutor[Req, Res]) Listen(ctx context.Context) (func(), error)

Listen to the MQTT request topic. Returns a function to stop listening. Note that cancelling this context will cause the unsubscribe call to fail.

type CommandExecutorOption

CommandExecutorOption represents a single command executor option.

type CommandExecutorOption interface {
    // contains filtered or unexported methods
}

type CommandExecutorOptions

CommandExecutorOptions are the resolved command executor options.

type CommandExecutorOptions struct {
    Idempotent bool
    CacheTTL   time.Duration

    Concurrency      uint
    ExecutionTimeout time.Duration
    ShareName        string

    TopicNamespace string
    TopicTokens    map[string]string
    Logger         *slog.Logger
}

func (*CommandExecutorOptions) Apply
func (o *CommandExecutorOptions) Apply(opts []CommandExecutorOption, rest ...CommandExecutorOption)

Apply resolves the provided list of options.

func (*CommandExecutorOptions) ApplyOptions
func (o *CommandExecutorOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type CommandHandler

CommandHandler is the user-provided implementation of a single command execution. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.

type CommandHandler[Req any, Res any] func(
    context.Context,
    *CommandRequest[Req],
) (*CommandResponse[Res], error)

type CommandInvoker

CommandInvoker provides the ability to invoke a single command.

type CommandInvoker[Req any, Res any] struct {
    // contains filtered or unexported fields
}

func NewCommandInvoker
func NewCommandInvoker[Req, Res any](client mqtt.Client, requestEncoding Encoding[Req], responseEncoding Encoding[Res], requestTopic string, opt ...CommandInvokerOption) (ci *CommandInvoker[Req, Res], err error)

NewCommandInvoker creates a new command invoker.

func (*CommandInvoker[Req, Res]) Invoke
func (ci *CommandInvoker[Req, Res]) Invoke(ctx context.Context, req Req, opt ...InvokeOption) (res *CommandResponse[Res], err error)

Invoke calls the command. This call will block until the command returns; any desired parallelism between invocations should be handled by the caller using normal Go constructs.

func (*CommandInvoker[Req, Res]) Listen
func (ci *CommandInvoker[Req, Res]) Listen(ctx context.Context) (func(), error)

Listen to the response topic(s). Returns a function to stop listening. Must be called before any calls to Invoke. Note that cancelling this context will cause the unsubscribe call to fail.

type CommandInvokerOption

CommandInvokerOption represents a single command invoker option.

type CommandInvokerOption interface {
    // contains filtered or unexported methods
}

type CommandInvokerOptions

CommandInvokerOptions are the resolved command invoker options.

type CommandInvokerOptions struct {
    ResponseTopic       func(string) string
    ResponseTopicPrefix string
    ResponseTopicSuffix string

    TopicNamespace string
    TopicTokens    map[string]string
    Logger         *slog.Logger
}

func (*CommandInvokerOptions) Apply
func (o *CommandInvokerOptions) Apply(opts []CommandInvokerOption, rest ...CommandInvokerOption)

Apply resolves the provided list of options.

func (*CommandInvokerOptions) ApplyOptions
func (o *CommandInvokerOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type CommandRequest

CommandRequest contains per-message data and methods that are exposed to the command handlers.

type CommandRequest[Req any] struct {
    FencingToken hlc.HybridLogicalClock
    // contains filtered or unexported fields
}

type CommandResponse

CommandResponse contains per-message data and methods that are returned by the command handlers.

type CommandResponse[Res any] struct {
    // contains filtered or unexported fields
}

func Respond
func Respond[Res any](payload Res, opt ...RespondOption) (*CommandResponse[Res], error)

Respond is a shorthand to create a command response with required values and options set appropriately. Note that the response may be incomplete and will be filled out by the library after being returned.

type Empty

Empty represents an encoding that contains no value.

type Empty struct{}

func (Empty) ContentType
func (Empty) ContentType() string

ContentType returns the empty MIME type.

func (Empty) Deserialize
func (Empty) Deserialize(data []byte) (any, error)

Deserialize validates that the payload is empty.

func (Empty) IsUTF8
func (Empty) IsUTF8() bool

IsUTF8 indicates that empty is not (meaningfully) valid UTF8.

func (Empty) Serialize
func (Empty) Serialize(t any) ([]byte, error)

Serialize validates that the payload is empty.

type Encoding

Encoding is a translation between a concrete Go type T and byte data. All methods *must* be thread-safe.

type Encoding[T any] interface {
    ContentType() string
    IsUTF8() bool
    Serialize(T) ([]byte, error)
    Deserialize([]byte) (T, error)
}

type InvocationError

InvocationError represents an error intentionally returned by a handler to indicate incorrect invocation.

type InvocationError struct {
    Message       string
    PropertyName  string
    PropertyValue any
}

func (InvocationError) Error
func (e InvocationError) Error() string

Error returns the invocation error as a string.

type InvokeOption

InvokeOption represents a single per-invoke option.

type InvokeOption interface {
    // contains filtered or unexported methods
}

type InvokeOptions

InvokeOptions are the resolved per-invoke options.

type InvokeOptions struct {
    FencingToken hlc.HybridLogicalClock

    MessageExpiry uint32
    TopicTokens   map[string]string
    Metadata      map[string]string
}

func (*InvokeOptions) Apply
func (o *InvokeOptions) Apply(opts []InvokeOption, rest ...InvokeOption)

Apply resolves the provided list of options.

type JSON

JSON is a simple implementation of a JSON encoding.

type JSON[T any] struct{}

func (JSON[T]) ContentType
func (JSON[T]) ContentType() string

ContentType returns the JSON MIME type.

func (JSON[T]) Deserialize
func (JSON[T]) Deserialize(data []byte) (T, error)

Deserialize translates JSON bytes into the Go type T.

func (JSON[T]) IsUTF8
func (JSON[T]) IsUTF8() bool

IsUTF8 indicates that JSON is valid UTF8.

func (JSON[T]) Serialize
func (JSON[T]) Serialize(t T) ([]byte, error)

Serialize translates the Go type T into JSON bytes.

type Listener

Listener represents an object which will listen to an MQTT topic.

type Listener interface {
    Listen(context.Context) (func(), error)
}

type Message

Message contains common message data that is exposed to message handlers.

type Message[T any] struct {
    // The message payload.
    Payload T

    // The ID of the calling MQTT client.
    // TODO: Rename to "source" to align to Cloud Events spec?
    ClientID string

    // The data that identifies a single unique request.
    CorrelationData string

    // The timestamp of when the message was sent.
    Timestamp hlc.HybridLogicalClock

    // Any user-provided metadata values.
    Metadata map[string]string
}

type Option

Option represents any of the option types, and can be filtered and applied by the ApplyOptions methods on the option structs.

type Option interface {
    // contains filtered or unexported methods
}

type Raw

Raw represents no encoding.

type Raw struct{}

func (Raw) ContentType
func (Raw) ContentType() string

ContentType returns the raw MIME type.

func (Raw) Deserialize
func (Raw) Deserialize(data []byte) ([]byte, error)

Deserialize returns the bytes unchanged.

func (Raw) IsUTF8
func (Raw) IsUTF8() bool

IsUTF8 indicates that raw is not known to be valid UTF8.

func (Raw) Serialize
func (Raw) Serialize(t []byte) ([]byte, error)

Serialize returns the bytes unchanged.

type RespondOption

RespondOption represents a single per-response option.

type RespondOption interface {
    // contains filtered or unexported methods
}

type RespondOptions

RespondOptions are the resolved per-response options.

type RespondOptions struct {
    Metadata map[string]string
}

func (*RespondOptions) Apply
func (o *RespondOptions) Apply(opts []RespondOption, rest ...RespondOption)

Apply resolves the provided list of options.

type SendOption

SendOption represents a single per-send option.

type SendOption interface {
    // contains filtered or unexported methods
}

type SendOptions

SendOptions are the resolved per-send options.

type SendOptions struct {
    Retain bool

    MessageExpiry uint32
    TopicTokens   map[string]string
    Metadata      map[string]string
}

func (*SendOptions) Apply
func (o *SendOptions) Apply(opts []SendOption, rest ...SendOption)

Apply resolves the provided list of options.

type TelemetryHandler

TelemetryHandler is the user-provided implementation of a single telemetry event handler. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.

type TelemetryHandler[T any] func(context.Context, *TelemetryMessage[T]) error

type TelemetryMessage

TelemetryMessage contains per-message data and methods that are exposed to the telemetry handlers.

type TelemetryMessage[T any] struct {

    // Ack provides a function to manually ack if enabled; it will be nil
    // otherwise.
    Ack func() error
    // contains filtered or unexported fields
}

type TelemetryReceiver

TelemetryReceiver provides the ability to handle the receipt of a single telemetry.

type TelemetryReceiver[T any] struct {
    // contains filtered or unexported fields
}

func NewTelemetryReceiver
func NewTelemetryReceiver[T any](client mqtt.Client, encoding Encoding[T], topic string, handler TelemetryHandler[T], opt ...TelemetryReceiverOption) (tr *TelemetryReceiver[T], err error)

NewTelemetryReceiver creates a new telemetry receiver.

func (*TelemetryReceiver[T]) Listen
func (tr *TelemetryReceiver[T]) Listen(ctx context.Context) (func(), error)

Listen to the MQTT telemetry topic. Returns a function to stop listening. Note that cancelling this context will cause the unsubscribe call to fail.

type TelemetryReceiverOption

TelemetryReceiverOption represents a single telemetry receiver option.

type TelemetryReceiverOption interface {
    // contains filtered or unexported methods
}

type TelemetryReceiverOptions

TelemetryReceiverOptions are the resolved telemetry receiver options.

type TelemetryReceiverOptions struct {
    ManualAck bool

    Concurrency      uint
    ExecutionTimeout time.Duration
    ShareName        string

    TopicNamespace string
    TopicTokens    map[string]string
    Logger         *slog.Logger
}

func (*TelemetryReceiverOptions) Apply
func (o *TelemetryReceiverOptions) Apply(opts []TelemetryReceiverOption, rest ...TelemetryReceiverOption)

Apply resolves the provided list of options.

func (*TelemetryReceiverOptions) ApplyOptions
func (o *TelemetryReceiverOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type TelemetrySender

TelemetrySender provides the ability to send a single telemetry.

type TelemetrySender[T any] struct {
    // contains filtered or unexported fields
}

func NewTelemetrySender
func NewTelemetrySender[T any](client mqtt.Client, encoding Encoding[T], topic string, opt ...TelemetrySenderOption) (ts *TelemetrySender[T], err error)

NewTelemetrySender creates a new telemetry sender.

func (*TelemetrySender[T]) Send
func (ts *TelemetrySender[T]) Send(ctx context.Context, val T, opt ...SendOption) (err error)

Send emits the telemetry. This will block until the message is ack'd.

type TelemetrySenderOption

TelemetrySenderOption represents a single telemetry sender option.

type TelemetrySenderOption interface {
    // contains filtered or unexported methods
}

type TelemetrySenderOptions

TelemetrySenderOptions are the resolved telemetry sender options.

type TelemetrySenderOptions struct {
    TopicNamespace string
    TopicTokens    map[string]string
    Logger         *slog.Logger
}

func (*TelemetrySenderOptions) Apply
func (o *TelemetrySenderOptions) Apply(opts []TelemetrySenderOption, rest ...TelemetrySenderOption)

Apply resolves the provided list of options.

func (*TelemetrySenderOptions) ApplyOptions
func (o *TelemetrySenderOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type WithCacheTTL

WithCacheTTL indicates how long results of this command will live in the cache. This is only valid for idempotent commands.

type WithCacheTTL time.Duration

type WithConcurrency

WithConcurrency indicates how many handlers can execute in parallel.

type WithConcurrency uint

type WithExecutionTimeout

WithExecutionTimeout applies a context timeout to the handler execution.

type WithExecutionTimeout time.Duration

type WithFencingToken

WithFencingToken provides a fencing token to be used by the executor.

type WithFencingToken hlc.HybridLogicalClock

type WithIdempotent

WithIdempotent marks the command as idempotent.

type WithIdempotent bool

type WithManualAck

WithManualAck indicates that the handler is responsible for manually acking the telemetry message.

type WithManualAck bool

type WithMessageExpiry

WithMessageExpiry applies an MQTT message expiry (in seconds).

type WithMessageExpiry uint32

type WithMetadata

WithMetadata specifies user-provided metadata values.

type WithMetadata map[string]string

type WithResponseTopic

WithResponseTopic specifies a translation function from the request topic to the response topic. Note that this overrides any provided response topic prefix or suffix.

type WithResponseTopic func(string) string

type WithResponseTopicPrefix

WithResponseTopicPrefix specifies a custom prefix for the response topic.

type WithResponseTopicPrefix string

type WithResponseTopicSuffix

WithResponseTopicSuffix specifies a custom suffix for the response topic.

type WithResponseTopicSuffix string

type WithRetain

WithRetain indicates that the telemetry event should be retained by the broker.

type WithRetain bool

type WithShareName

WithShareName connects this listener to a shared MQTT subscription.

type WithShareName string

type WithTopicNamespace

WithTopicNamespace specifies a namespace that will be prepended to the topic.

type WithTopicNamespace string

type WithTopicTokenNamespace

WithTopicTokenNamespace specifies a namespace that will be prepended to all previously-specified topic tokens. Topic tokens specified after this option will not be namespaced, allowing this to differentiate user tokens from system tokens.

type WithTopicTokenNamespace string

type WithTopicTokens

WithTopicTokens specifies topic token values.

type WithTopicTokens map[string]string

Generated by gomarkdoc

Documentation

Constants

const DefaultMessageExpiry = 10

DefaultMessageExpiry is the MessageExpiry applied to Invoke or Send if none is specified (10 seconds).

Variables

This section is empty.

Functions

func Listen

func Listen(ctx context.Context, listeners ...Listener) (func(), error)

Listen starts all of the provided listeners.
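
The contract of Listen (start several listeners, get back one stop function) can be sketched with the standard library alone. The package's own implementation is not shown on this page, so `listenAll` and `fakeListener` below are hypothetical stand-ins, and stopping already-started listeners when a later one fails is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Listener mirrors the interface documented in this package.
type Listener interface {
	Listen(context.Context) (func(), error)
}

// fakeListener is a hypothetical stand-in that records start and stop calls.
type fakeListener struct {
	name string
	fail bool
	log  *[]string
}

func (f fakeListener) Listen(context.Context) (func(), error) {
	if f.fail {
		return nil, errors.New(f.name + ": listen failed")
	}
	*f.log = append(*f.log, "start "+f.name)
	return func() { *f.log = append(*f.log, "stop "+f.name) }, nil
}

// listenAll starts each listener in order. If one fails, the listeners that
// already started are stopped (an assumption of this sketch) and the error
// is returned. The returned function stops listeners in reverse order.
func listenAll(ctx context.Context, listeners ...Listener) (func(), error) {
	stops := make([]func(), 0, len(listeners))
	stopAll := func() {
		for i := len(stops) - 1; i >= 0; i-- {
			stops[i]()
		}
	}
	for _, l := range listeners {
		stop, err := l.Listen(ctx)
		if err != nil {
			stopAll()
			return nil, err
		}
		stops = append(stops, stop)
	}
	return stopAll, nil
}

// demoListenAll starts two fake listeners, stops them, and returns the log.
func demoListenAll() []string {
	var log []string
	stop, err := listenAll(context.Background(),
		fakeListener{name: "a", log: &log},
		fakeListener{name: "b", log: &log},
	)
	if err != nil {
		return nil
	}
	stop()
	return log
}

func main() {
	fmt.Println(demoListenAll())
}
```

With the real package, the listeners passed in would be values such as a *CommandExecutor or *TelemetryReceiver, since each exposes a Listen method with this signature.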

func WithLogger

func WithLogger(logger *slog.Logger) interface {
	Option
	CommandExecutorOption
	CommandInvokerOption
	TelemetryReceiverOption
	TelemetrySenderOption
}

WithLogger enables logging with the provided slog logger.

Types

type CommandExecutor

type CommandExecutor[Req any, Res any] struct {
	// contains filtered or unexported fields
}

CommandExecutor provides the ability to execute a single command.

func NewCommandExecutor

func NewCommandExecutor[Req, Res any](
	client mqtt.Client,
	requestEncoding Encoding[Req],
	responseEncoding Encoding[Res],
	requestTopic string,
	handler CommandHandler[Req, Res],
	opt ...CommandExecutorOption,
) (ce *CommandExecutor[Req, Res], err error)

NewCommandExecutor creates a new command executor.

func (*CommandExecutor[Req, Res]) Listen

func (ce *CommandExecutor[Req, Res]) Listen(
	ctx context.Context,
) (func(), error)

Listen to the MQTT request topic. Returns a function to stop listening. Note that cancelling this context will cause the unsubscribe call to fail.
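
The note about cancellation suggests calling the returned stop function before cancelling the context passed to Listen. The sketch below illustrates that ordering with a hypothetical `fakeListener` whose stop function only succeeds while the context is still live; it models the documented caveat and is not the library's code.

```go
package main

import (
	"context"
	"fmt"
)

// fakeListener is a hypothetical stand-in; its stop function only
// "unsubscribes" while the context given to Listen has not been cancelled.
type fakeListener struct {
	unsubscribed bool
}

func (f *fakeListener) Listen(ctx context.Context) (func(), error) {
	return func() {
		if ctx.Err() == nil {
			f.unsubscribed = true
		}
	}, nil
}

// stopBeforeCancel stops the listener while the context is still live.
func stopBeforeCancel() bool {
	l := &fakeListener{}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // deferred first, so it runs after stop below
	stop, err := l.Listen(ctx)
	if err != nil {
		return false
	}
	stop()
	return l.unsubscribed
}

// cancelBeforeStop cancels first, so the stand-in cannot unsubscribe.
func cancelBeforeStop() bool {
	l := &fakeListener{}
	ctx, cancel := context.WithCancel(context.Background())
	stop, err := l.Listen(ctx)
	if err != nil {
		cancel()
		return false
	}
	cancel()
	stop()
	return l.unsubscribed
}

func main() {
	fmt.Println("stop then cancel:", stopBeforeCancel())
	fmt.Println("cancel then stop:", cancelBeforeStop())
}
```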

type CommandExecutorOption

type CommandExecutorOption interface {
	// contains filtered or unexported methods
}

CommandExecutorOption represents a single command executor option.

type CommandExecutorOptions

type CommandExecutorOptions struct {
	Idempotent bool
	CacheTTL   time.Duration

	Concurrency      uint
	ExecutionTimeout time.Duration
	ShareName        string

	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}

CommandExecutorOptions are the resolved command executor options.

func (*CommandExecutorOptions) Apply

func (o *CommandExecutorOptions) Apply(
	opts []CommandExecutorOption,
	rest ...CommandExecutorOption,
)

Apply resolves the provided list of options.

func (*CommandExecutorOptions) ApplyOptions

func (o *CommandExecutorOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type CommandHandler

type CommandHandler[Req any, Res any] func(
	context.Context,
	*CommandRequest[Req],
) (*CommandResponse[Res], error)

CommandHandler is the user-provided implementation of a single command execution. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.

type CommandInvoker

type CommandInvoker[Req any, Res any] struct {
	// contains filtered or unexported fields
}

CommandInvoker provides the ability to invoke a single command.

func NewCommandInvoker

func NewCommandInvoker[Req, Res any](
	client mqtt.Client,
	requestEncoding Encoding[Req],
	responseEncoding Encoding[Res],
	requestTopic string,
	opt ...CommandInvokerOption,
) (ci *CommandInvoker[Req, Res], err error)

NewCommandInvoker creates a new command invoker.

func (*CommandInvoker[Req, Res]) Invoke

func (ci *CommandInvoker[Req, Res]) Invoke(
	ctx context.Context,
	req Req,
	opt ...InvokeOption,
) (res *CommandResponse[Res], err error)

Invoke calls the command. This call will block until the command returns; any desired parallelism between invocations should be handled by the caller using normal Go constructs.
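
Because Invoke blocks, concurrent invocations are left to the caller. A minimal sketch of one way to do that with goroutines and a sync.WaitGroup follows; `invokeFunc` is a hypothetical stand-in for a bound Invoke call, not a type from this package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// invokeFunc is a hypothetical stand-in for a blocking call such as
// CommandInvoker.Invoke with the request and response types fixed to int.
type invokeFunc func(ctx context.Context, req int) (int, error)

// invokeAll issues one blocking call per request on its own goroutine and
// collects the results in request order. The first error found is returned.
func invokeAll(ctx context.Context, invoke invokeFunc, reqs []int) ([]int, error) {
	results := make([]int, len(reqs))
	errs := make([]error, len(reqs))
	var wg sync.WaitGroup
	for i, req := range reqs {
		wg.Add(1)
		go func(i, req int) {
			defer wg.Done()
			results[i], errs[i] = invoke(ctx, req)
		}(i, req)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}

// demoInvokeAll doubles each request through a fake invoker.
func demoInvokeAll() []int {
	double := func(_ context.Context, req int) (int, error) { return req * 2, nil }
	out, err := invokeAll(context.Background(), double, []int{1, 2, 3})
	if err != nil {
		return nil
	}
	return out
}

func main() {
	fmt.Println(demoInvokeAll())
}
```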

func (*CommandInvoker[Req, Res]) Listen

func (ci *CommandInvoker[Req, Res]) Listen(
	ctx context.Context,
) (func(), error)

Listen to the response topic(s). Returns a function to stop listening. Must be called before any calls to Invoke. Note that cancelling this context will cause the unsubscribe call to fail.

type CommandInvokerOption

type CommandInvokerOption interface {
	// contains filtered or unexported methods
}

CommandInvokerOption represents a single command invoker option.

type CommandInvokerOptions

type CommandInvokerOptions struct {
	ResponseTopic       func(string) string
	ResponseTopicPrefix string
	ResponseTopicSuffix string

	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}

CommandInvokerOptions are the resolved command invoker options.

func (*CommandInvokerOptions) Apply

func (o *CommandInvokerOptions) Apply(
	opts []CommandInvokerOption,
	rest ...CommandInvokerOption,
)

Apply resolves the provided list of options.

func (*CommandInvokerOptions) ApplyOptions

func (o *CommandInvokerOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type CommandRequest

type CommandRequest[Req any] struct {
	Message[Req]

	FencingToken hlc.HybridLogicalClock
}

CommandRequest contains per-message data and methods that are exposed to the command handlers.

type CommandResponse

type CommandResponse[Res any] struct {
	Message[Res]
}

CommandResponse contains per-message data and methods that are returned by the command handlers.

func Respond

func Respond[Res any](
	payload Res,
	opt ...RespondOption,
) (*CommandResponse[Res], error)

Respond is a shorthand to create a command response with required values and options set appropriately. Note that the response may be incomplete and will be filled out by the library after being returned.

type Empty

type Empty struct{}

Empty represents an encoding that contains no value.

func (Empty) ContentType

func (Empty) ContentType() string

ContentType returns the empty MIME type.

func (Empty) Deserialize

func (Empty) Deserialize(data []byte) (any, error)

Deserialize validates that the payload is empty.

func (Empty) IsUTF8

func (Empty) IsUTF8() bool

IsUTF8 indicates that empty is not (meaningfully) valid UTF8.

func (Empty) Serialize

func (Empty) Serialize(t any) ([]byte, error)

Serialize validates that the payload is empty.

type Encoding

type Encoding[T any] interface {
	ContentType() string
	IsUTF8() bool
	Serialize(T) ([]byte, error)
	Deserialize([]byte) (T, error)
}

Encoding is a translation between a concrete Go type T and byte data. All methods *must* be thread-safe.
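
As an illustration of what satisfying this interface involves, the sketch below redeclares Encoding locally and implements it with encoding/json. `jsonEncoding`, `reading`, and `roundTrip` are hypothetical names, and the content type string is an assumption; the package's own JSON type may differ. The implementation holds no state, which is one simple way to meet the thread-safety requirement.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoding mirrors the interface documented above.
type Encoding[T any] interface {
	ContentType() string
	IsUTF8() bool
	Serialize(T) ([]byte, error)
	Deserialize([]byte) (T, error)
}

// jsonEncoding is an illustrative, stateless implementation. It is not the
// package's JSON type.
type jsonEncoding[T any] struct{}

func (jsonEncoding[T]) ContentType() string { return "application/json" }

func (jsonEncoding[T]) IsUTF8() bool { return true }

func (jsonEncoding[T]) Serialize(v T) ([]byte, error) { return json.Marshal(v) }

func (jsonEncoding[T]) Deserialize(data []byte) (T, error) {
	var v T
	err := json.Unmarshal(data, &v)
	return v, err
}

// reading is a hypothetical payload type.
type reading struct {
	Celsius float64 `json:"celsius"`
}

// roundTrip serializes and then deserializes a value through any Encoding.
func roundTrip[T any](enc Encoding[T], v T) (T, error) {
	data, err := enc.Serialize(v)
	if err != nil {
		var zero T
		return zero, err
	}
	return enc.Deserialize(data)
}

// demoRoundTrip returns the temperature after a JSON round trip.
func demoRoundTrip() float64 {
	out, err := roundTrip[reading](jsonEncoding[reading]{}, reading{Celsius: 21.5})
	if err != nil {
		return 0
	}
	return out.Celsius
}

func main() {
	fmt.Println(demoRoundTrip())
}
```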

type InvocationError

type InvocationError struct {
	Message       string
	PropertyName  string
	PropertyValue any
}

InvocationError represents an error intentionally returned by a handler to indicate incorrect invocation.

func (InvocationError) Error

func (e InvocationError) Error() string

Error returns the invocation error as a string.
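
A handler can report a bad argument by returning an InvocationError, and calling code can recover the details with errors.As. The sketch below redeclares the struct locally so it runs without the package; the Error format and the `validatePercent` helper are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// InvocationError mirrors the exported fields documented above.
type InvocationError struct {
	Message       string
	PropertyName  string
	PropertyValue any
}

// Error returns only the message here; the package's format may differ.
func (e InvocationError) Error() string { return e.Message }

// validatePercent is a hypothetical argument check a handler might perform.
func validatePercent(percent int) error {
	if percent < 0 || percent > 100 {
		return InvocationError{
			Message:       "percent must be between 0 and 100",
			PropertyName:  "percent",
			PropertyValue: percent,
		}
	}
	return nil
}

// offendingProperty wraps the validation error and then recovers the
// property name with errors.As. It returns "" when the input is valid.
func offendingProperty(percent int) string {
	if err := validatePercent(percent); err != nil {
		wrapped := fmt.Errorf("handler failed: %w", err)
		var ie InvocationError
		if errors.As(wrapped, &ie) {
			return ie.PropertyName
		}
	}
	return ""
}

func main() {
	fmt.Println(offendingProperty(150))
}
```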

type InvokeOption

type InvokeOption interface {
	// contains filtered or unexported methods
}

InvokeOption represents a single per-invoke option.


type InvokeOptions

type InvokeOptions struct {
	FencingToken hlc.HybridLogicalClock

	MessageExpiry uint32
	TopicTokens   map[string]string
	Metadata      map[string]string
}

InvokeOptions are the resolved per-invoke options.

func (*InvokeOptions) Apply

func (o *InvokeOptions) Apply(
	opts []InvokeOption,
	rest ...InvokeOption,
)

Apply resolves the provided list of options.

type JSON

type JSON[T any] struct{}

JSON is a simple implementation of a JSON encoding.

func (JSON[T]) ContentType

func (JSON[T]) ContentType() string

ContentType returns the JSON MIME type.

func (JSON[T]) Deserialize

func (JSON[T]) Deserialize(data []byte) (T, error)

Deserialize translates JSON bytes into the Go type T.

func (JSON[T]) IsUTF8

func (JSON[T]) IsUTF8() bool

IsUTF8 indicates that JSON is valid UTF8.

func (JSON[T]) Serialize

func (JSON[T]) Serialize(t T) ([]byte, error)

Serialize translates the Go type T into JSON bytes.

type Listener

type Listener interface {
	Listen(context.Context) (func(), error)
}

Listener represents an object which will listen to an MQTT topic.

type Message

type Message[T any] struct {
	// The message payload.
	Payload T

	// The ID of the calling MQTT client.
	// TODO: Rename to "source" to align to Cloud Events spec?
	ClientID string

	// The data that identifies a single unique request.
	CorrelationData string

	// The timestamp of when the message was sent.
	Timestamp hlc.HybridLogicalClock

	// Any user-provided metadata values.
	Metadata map[string]string
}

Message contains common message data that is exposed to message handlers.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option represents any of the option types, and can be filtered and applied by the ApplyOptions methods on the option structs.
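
The option types in this package are plain named types (for example, WithConcurrency is a uint), so a conversion such as WithConcurrency(4) yields an option value. How they are resolved internally is unexported; the sketch below shows one common way such a pattern can work. The `apply` method name and the lowercase types are hypothetical, and "later options win" is a property of this sketch, not a documented guarantee.

```go
package main

import "fmt"

// executorOptions is a trimmed, hypothetical resolved-options struct.
type executorOptions struct {
	Concurrency uint
	ShareName   string
}

// executorOption is satisfied by any type with the unexported apply method.
// The method name is an assumption; the real interfaces hide theirs.
type executorOption interface {
	apply(*executorOptions)
}

// withConcurrency and withShareName are named types that act as options.
type withConcurrency uint

func (o withConcurrency) apply(opts *executorOptions) { opts.Concurrency = uint(o) }

type withShareName string

func (o withShareName) apply(opts *executorOptions) { opts.ShareName = string(o) }

// Apply resolves a slice of options followed by any trailing options,
// in order, so later values overwrite earlier ones.
func (o *executorOptions) Apply(opts []executorOption, rest ...executorOption) {
	for _, opt := range opts {
		if opt != nil {
			opt.apply(o)
		}
	}
	for _, opt := range rest {
		if opt != nil {
			opt.apply(o)
		}
	}
}

// resolveDemo applies three options and returns the resolved struct.
func resolveDemo() executorOptions {
	var opts executorOptions
	opts.Apply(
		[]executorOption{withConcurrency(2), withShareName("workers")},
		withConcurrency(4),
	)
	return opts
}

func main() {
	fmt.Printf("%+v\n", resolveDemo())
}
```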

type Raw added in v0.1.1

type Raw struct{}

Raw represents no encoding.

func (Raw) ContentType added in v0.1.1

func (Raw) ContentType() string

ContentType returns the raw MIME type.

func (Raw) Deserialize added in v0.1.1

func (Raw) Deserialize(data []byte) ([]byte, error)

Deserialize returns the bytes unchanged.

func (Raw) IsUTF8 added in v0.1.1

func (Raw) IsUTF8() bool

IsUTF8 indicates that raw is not known to be valid UTF8.

func (Raw) Serialize added in v0.1.1

func (Raw) Serialize(t []byte) ([]byte, error)

Serialize returns the bytes unchanged.

type RespondOption

type RespondOption interface {
	// contains filtered or unexported methods
}

RespondOption represents a single per-response option.

type RespondOptions

type RespondOptions struct {
	Metadata map[string]string
}

RespondOptions are the resolved per-response options.

func (*RespondOptions) Apply

func (o *RespondOptions) Apply(
	opts []RespondOption,
	rest ...RespondOption,
)

Apply resolves the provided list of options.

type SendOption

type SendOption interface {
	// contains filtered or unexported methods
}

SendOption represents a single per-send option.

type SendOptions

type SendOptions struct {
	Retain bool

	MessageExpiry uint32
	TopicTokens   map[string]string
	Metadata      map[string]string
}

SendOptions are the resolved per-send options.

func (*SendOptions) Apply

func (o *SendOptions) Apply(
	opts []SendOption,
	rest ...SendOption,
)

Apply resolves the provided list of options.

type TelemetryHandler

type TelemetryHandler[T any] func(context.Context, *TelemetryMessage[T]) error

TelemetryHandler is the user-provided implementation of a single telemetry event handler. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.

type TelemetryMessage

type TelemetryMessage[T any] struct {
	Message[T]

	// Ack provides a function to manually ack if enabled; it will be nil
	// otherwise.
	Ack func() error
}

TelemetryMessage contains per-message data and methods that are exposed to the telemetry handlers.
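
Since Ack is nil unless manual acknowledgement is enabled, a handler that may run in either mode should check it before calling. A minimal sketch follows, using a simplified local `telemetryMessage` (the real struct embeds Message[T]) and a hypothetical `handleReading` handler.

```go
package main

import "fmt"

// telemetryMessage is a simplified local mirror of TelemetryMessage.
type telemetryMessage[T any] struct {
	Payload T
	Ack     func() error
}

// handleReading records the payload, then acks only when Ack is set.
func handleReading(msg *telemetryMessage[float64], seen *[]float64) error {
	*seen = append(*seen, msg.Payload)
	if msg.Ack != nil {
		return msg.Ack()
	}
	return nil
}

// ackDemo handles one manually acked and one automatically acked message.
// It returns the number of payloads seen and the number of manual acks.
func ackDemo() (int, int) {
	var seen []float64
	acks := 0
	manual := &telemetryMessage[float64]{
		Payload: 1.5,
		Ack:     func() error { acks++; return nil },
	}
	auto := &telemetryMessage[float64]{Payload: 2.5}
	_ = handleReading(manual, &seen)
	_ = handleReading(auto, &seen)
	return len(seen), acks
}

func main() {
	fmt.Println(ackDemo())
}
```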

type TelemetryReceiver

type TelemetryReceiver[T any] struct {
	// contains filtered or unexported fields
}

TelemetryReceiver provides the ability to handle the receipt of a single telemetry.

func NewTelemetryReceiver

func NewTelemetryReceiver[T any](
	client mqtt.Client,
	encoding Encoding[T],
	topic string,
	handler TelemetryHandler[T],
	opt ...TelemetryReceiverOption,
) (tr *TelemetryReceiver[T], err error)

NewTelemetryReceiver creates a new telemetry receiver.

func (*TelemetryReceiver[T]) Listen

func (tr *TelemetryReceiver[T]) Listen(
	ctx context.Context,
) (func(), error)

Listen to the MQTT telemetry topic. Returns a function to stop listening. Note that cancelling this context will cause the unsubscribe call to fail.

type TelemetryReceiverOption

type TelemetryReceiverOption interface {
	// contains filtered or unexported methods
}

TelemetryReceiverOption represents a single telemetry receiver option.

type TelemetryReceiverOptions

type TelemetryReceiverOptions struct {
	ManualAck bool

	Concurrency      uint
	ExecutionTimeout time.Duration
	ShareName        string

	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}

TelemetryReceiverOptions are the resolved telemetry receiver options.

func (*TelemetryReceiverOptions) Apply

func (o *TelemetryReceiverOptions) Apply(
	opts []TelemetryReceiverOption,
	rest ...TelemetryReceiverOption,
)

Apply resolves the provided list of options.

func (*TelemetryReceiverOptions) ApplyOptions

func (o *TelemetryReceiverOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type TelemetrySender

type TelemetrySender[T any] struct {
	// contains filtered or unexported fields
}

TelemetrySender provides the ability to send a single telemetry.

func NewTelemetrySender

func NewTelemetrySender[T any](
	client mqtt.Client,
	encoding Encoding[T],
	topic string,
	opt ...TelemetrySenderOption,
) (ts *TelemetrySender[T], err error)

NewTelemetrySender creates a new telemetry sender.

func (*TelemetrySender[T]) Send

func (ts *TelemetrySender[T]) Send(
	ctx context.Context,
	val T,
	opt ...SendOption,
) (err error)

Send emits the telemetry. This will block until the message is ack'd.

type TelemetrySenderOption

type TelemetrySenderOption interface {
	// contains filtered or unexported methods
}

TelemetrySenderOption represents a single telemetry sender option.

type TelemetrySenderOptions

type TelemetrySenderOptions struct {
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}

TelemetrySenderOptions are the resolved telemetry sender options.

func (*TelemetrySenderOptions) Apply

func (o *TelemetrySenderOptions) Apply(
	opts []TelemetrySenderOption,
	rest ...TelemetrySenderOption,
)

Apply resolves the provided list of options.

func (*TelemetrySenderOptions) ApplyOptions

func (o *TelemetrySenderOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type WithCacheTTL

type WithCacheTTL time.Duration

WithCacheTTL indicates how long results of this command will live in the cache. This is only valid for idempotent commands.

type WithConcurrency

type WithConcurrency uint

WithConcurrency indicates how many handlers can execute in parallel.
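
To make the idea of bounded handler parallelism concrete, here is a sketch that caps concurrent jobs with a buffered channel used as a semaphore. It illustrates the general technique only; how the library enforces its limit is not shown on this page, and `runLimited` and `peakConcurrency` are hypothetical names. The limit must be positive.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited runs every job, allowing at most limit jobs at the same time.
func runLimited(limit int, jobs []func()) {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for _, job := range jobs {
		sem <- struct{}{} // blocks while limit jobs are already running
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the job ends
			job()
		}(job)
	}
	wg.Wait()
}

// peakConcurrency runs count short jobs under the given limit and reports
// the highest number observed running at once.
func peakConcurrency(limit, count int) int {
	var mu sync.Mutex
	running, peak := 0, 0
	job := func() {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()
		time.Sleep(5 * time.Millisecond)
		mu.Lock()
		running--
		mu.Unlock()
	}
	jobs := make([]func(), count)
	for i := range jobs {
		jobs[i] = job
	}
	runLimited(limit, jobs)
	return peak
}

func main() {
	fmt.Println("peak:", peakConcurrency(2, 6))
}
```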

type WithExecutionTimeout

type WithExecutionTimeout time.Duration

WithExecutionTimeout applies a context timeout to the handler execution.
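
Applying a context timeout to a handler can be pictured as wrapping the call in context.WithTimeout. The sketch below shows that wrapping with a hypothetical `runWithTimeout` helper and a handler that respects ctx.Done(); a handler that ignores its context would not be interrupted by this approach.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout runs handler with a context that expires after timeout.
func runWithTimeout(ctx context.Context, timeout time.Duration, handler func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return handler(ctx)
}

// slowHandler waits one second unless its context ends first.
func slowHandler(ctx context.Context) error {
	select {
	case <-time.After(time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether the slow handler hit a 10 ms deadline.
func timedOut() bool {
	err := runWithTimeout(context.Background(), 10*time.Millisecond, slowHandler)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("timed out:", timedOut())
}
```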

type WithFencingToken

type WithFencingToken hlc.HybridLogicalClock

WithFencingToken provides a fencing token to be used by the executor.

type WithIdempotent

type WithIdempotent bool

WithIdempotent marks the command as idempotent.

type WithManualAck

type WithManualAck bool

WithManualAck indicates that the handler is responsible for manually acking the telemetry message.

type WithMessageExpiry

type WithMessageExpiry uint32

WithMessageExpiry applies an MQTT message expiry (in seconds).
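
Because the expiry is a uint32 count of seconds rather than a time.Duration, callers holding a duration need a conversion. The helper below is a hypothetical sketch that rounds up to whole seconds and clamps to the uint32 range; returning 0 for non-positive durations is a choice made here, and how the library treats 0 is not stated on this page.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// expirySeconds converts a duration to whole seconds, rounding up so that a
// message never expires earlier than requested. Non-positive durations
// yield 0 and very large durations are clamped to the uint32 maximum.
func expirySeconds(d time.Duration) uint32 {
	if d <= 0 {
		return 0
	}
	secs := d / time.Second
	if d%time.Second != 0 {
		secs++
	}
	if secs > math.MaxUint32 {
		return math.MaxUint32
	}
	return uint32(secs)
}

// expiryDemo converts 1.5 s, 10 s, and -1 s.
func expiryDemo() [3]uint32 {
	return [3]uint32{
		expirySeconds(1500 * time.Millisecond),
		expirySeconds(10 * time.Second),
		expirySeconds(-time.Second),
	}
}

func main() {
	fmt.Println(expiryDemo())
}
```

With the package imported, the result could then be passed as WithMessageExpiry(expirySeconds(d)).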

type WithMetadata

type WithMetadata map[string]string

WithMetadata specifies user-provided metadata values.

type WithResponseTopic

type WithResponseTopic func(string) string

WithResponseTopic specifies a translation function from the request topic to the response topic. Note that this overrides any provided response topic prefix or suffix.
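
The precedence described here (a translation function overrides any prefix or suffix) can be sketched as follows. `responseTopic` is a hypothetical helper, and joining the prefix, request topic, and suffix with "/" is an assumption of this sketch; the library's exact composition is not documented on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// responseTopic derives a response topic from a request topic. A non-nil
// translate function takes precedence over prefix and suffix.
func responseTopic(request string, translate func(string) string, prefix, suffix string) string {
	if translate != nil {
		return translate(request)
	}
	parts := make([]string, 0, 3)
	if prefix != "" {
		parts = append(parts, prefix)
	}
	parts = append(parts, request)
	if suffix != "" {
		parts = append(parts, suffix)
	}
	return strings.Join(parts, "/") // "/" separator is an assumption
}

// responseTopicDemo resolves the same request topic both ways.
func responseTopicDemo() (string, string) {
	translate := func(req string) string { return "replies/" + req }
	withFunc := responseTopic("rpc/reboot", translate, "clients/me", "response")
	withAffixes := responseTopic("rpc/reboot", nil, "clients/me", "response")
	return withFunc, withAffixes
}

func main() {
	fmt.Println(responseTopicDemo())
}
```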

type WithResponseTopicPrefix

type WithResponseTopicPrefix string

WithResponseTopicPrefix specifies a custom prefix for the response topic.

type WithResponseTopicSuffix

type WithResponseTopicSuffix string

WithResponseTopicSuffix specifies a custom suffix for the response topic.

type WithRetain

type WithRetain bool

WithRetain indicates that the telemetry event should be retained by the broker.

type WithShareName

type WithShareName string

WithShareName connects this listener to a shared MQTT subscription.

type WithTopicNamespace

type WithTopicNamespace string

WithTopicNamespace specifies a namespace that will be prepended to the topic.

type WithTopicTokenNamespace

type WithTopicTokenNamespace string

WithTopicTokenNamespace specifies a namespace that will be prepended to all previously-specified topic tokens. Topic tokens specified after this option will not be namespaced, allowing this to differentiate user tokens from system tokens.
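
The order sensitivity described above can be illustrated with a small model: options are applied in sequence, and the namespace option rewrites only the token keys that exist at that point. Everything below is hypothetical, including the `tokenOption` type and the plain string concatenation used to form the namespaced key.

```go
package main

import "fmt"

// tokenOption transforms the token map resolved so far.
type tokenOption func(map[string]string) map[string]string

// withTokens adds or overwrites token values.
func withTokens(add map[string]string) tokenOption {
	return func(cur map[string]string) map[string]string {
		for k, v := range add {
			cur[k] = v
		}
		return cur
	}
}

// withTokenNamespace prefixes every key present so far with ns.
// Concatenating ns and the key directly is an assumption of this sketch.
func withTokenNamespace(ns string) tokenOption {
	return func(cur map[string]string) map[string]string {
		out := make(map[string]string, len(cur))
		for k, v := range cur {
			out[ns+k] = v
		}
		return out
	}
}

// resolveTokens applies the options in order, starting from an empty map.
func resolveTokens(opts ...tokenOption) map[string]string {
	cur := map[string]string{}
	for _, opt := range opts {
		cur = opt(cur)
	}
	return cur
}

// tokenDemo namespaces the first token but not the one added afterwards.
func tokenDemo() map[string]string {
	return resolveTokens(
		withTokens(map[string]string{"device": "d1"}),
		withTokenNamespace("ex:"),
		withTokens(map[string]string{"commandName": "reboot"}),
	)
}

func main() {
	fmt.Println(tokenDemo())
}
```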

type WithTopicTokens

type WithTopicTokens map[string]string

WithTopicTokens specifies topic token values.

Directories

Path Synopsis
log
