Documentation
¶
Index ¶
- type BidiStream
- type MessageBroker
- func (mb *MessageBroker[TMessage]) Close()
- func (mb *MessageBroker[TMessage]) On(handler any) error
- func (mb *MessageBroker[TMessage]) Ready(ctx context.Context) error
- func (mb *MessageBroker[TMessage]) Run(ctx context.Context) error
- func (mb *MessageBroker[TMessage]) Send(ctx context.Context, msg *TMessage) error
- func (mb *MessageBroker[TMessage]) SendAndWait(ctx context.Context, msg *TMessage) (*TMessage, error)
- func (mb *MessageBroker[TMessage]) SendAndWaitWithProgress(ctx context.Context, msg *TMessage, onProgress func(string)) (*TMessage, error)
- type MessageEnvelope
- type ProgressFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BidiStream ¶
type BidiStream[TMessage any] interface { // Send sends a message on the stream Send(msg *TMessage) error // Recv receives a message from the stream Recv() (*TMessage, error) }
BidiStream is a unified interface for bidirectional gRPC streams. Both grpc.BidiStreamingClient[T, T] and grpc.BidiStreamingServer[T, T] implement this interface, allowing the MessageBroker to work with either client-side or server-side streams.
type MessageBroker ¶
type MessageBroker[TMessage any] struct { // contains filtered or unexported fields }
MessageBroker handles bidirectional message routing for gRPC streams. It supports both client pattern (request/response correlation via RequestId) and server pattern (handler registration for incoming requests).
TMessage is the raw message type used by the gRPC stream. The ops parameter provides stateless operations for manipulating messages.
This broker works with both client-side (grpc.BidiStreamingClient) and server-side (grpc.BidiStreamingServer) streams through the unified BidiStream interface.
func NewMessageBroker ¶
func NewMessageBroker[TMessage any]( stream BidiStream[TMessage], ops MessageEnvelope[TMessage], name string, logger *log.Logger, ) *MessageBroker[TMessage]
NewMessageBroker creates a new message broker for the given stream. The stream parameter can be either a client stream (grpc.BidiStreamingClient) or a server stream (grpc.BidiStreamingServer) as both implement the BidiStream interface. The ops parameter provides stateless operations for message manipulation. The name parameter is used for logging identification. The logger parameter sets the broker's private logger for trace output: Pass log.Default on the server side (azd core CLI) to inherit --debug semantics. Pass nil for silent operation (e.g., in extension processes where AZD_EXT_DEBUG controls logging).
func (*MessageBroker[TMessage]) Close ¶
func (mb *MessageBroker[TMessage]) Close()
Close gracefully shuts down the broker (optional, for cleanup)
func (*MessageBroker[TMessage]) On ¶
func (mb *MessageBroker[TMessage]) On(handler any) error
On registers a handler for a specific message type. The handler function signature should be one of:
- func(ctx context.Context, req *RequestType) (*TMessage, error)
- func(ctx context.Context, req *RequestType, progress ProgressFunc) (*TMessage, error)
The handler must return a complete envelope message. The broker will automatically set the RequestId and Error fields before sending the response.
func (*MessageBroker[TMessage]) Ready ¶
func (mb *MessageBroker[TMessage]) Ready(ctx context.Context) error
Ready blocks until the message broker starts receiving messages or the context is cancelled. Multiple goroutines can call Ready() simultaneously - they will all be unblocked when Run() starts. Once the broker has started, subsequent calls to Ready() return immediately. Returns nil when ready, or context error if the context is cancelled before the broker becomes ready.
func (*MessageBroker[TMessage]) Run ¶
func (mb *MessageBroker[TMessage]) Run(ctx context.Context) error
Run begins receiving and dispatching messages. This method blocks until the context is cancelled, the stream encounters an error, or the stream is closed by the remote peer. Returns nil on graceful shutdown (context cancelled or EOF), or the error that terminated the stream.
func (*MessageBroker[TMessage]) Send ¶
func (mb *MessageBroker[TMessage]) Send(ctx context.Context, msg *TMessage) error
Send sends a message without waiting for a response. This is useful for fire-and-forget scenarios like subscriptions or notifications where no response is expected or needed. Returns an error only if the send operation itself fails.
func (*MessageBroker[TMessage]) SendAndWait ¶
func (mb *MessageBroker[TMessage]) SendAndWait(ctx context.Context, msg *TMessage) (*TMessage, error)
SendAndWait sends a message and waits for the response
func (*MessageBroker[TMessage]) SendAndWaitWithProgress ¶
func (mb *MessageBroker[TMessage]) SendAndWaitWithProgress( ctx context.Context, msg *TMessage, onProgress func(string), ) (*TMessage, error)
SendAndWaitWithProgress sends a message and waits for the response, handling progress updates
type MessageEnvelope ¶
type MessageEnvelope[T any] interface { // GetRequestId extracts or generates the request/correlation ID from a message. // For messages with a RequestId field, this returns that field. // For messages without RequestId, this can generate a correlation key from message content and context. GetRequestId(ctx context.Context, msg *T) string // SetRequestId sets the request ID on a message. // For messages without a RequestId field, this can be a no-op. SetRequestId(ctx context.Context, msg *T, id string) // GetError extracts the error from a message, if any GetError(msg *T) error // SetError sets an error on a message SetError(msg *T, err error) // GetInnerMessage extracts the inner message from the envelope's oneof field GetInnerMessage(msg *T) any // IsProgressMessage returns true if the message is a progress message IsProgressMessage(msg *T) bool // GetProgressMessage extracts the progress message text from a progress message. // Returns empty string if the message is not a progress message. GetProgressMessage(msg *T) string // CreateProgressMessage creates a new progress message envelope with the given text. // This is used by server-side handlers to send progress updates back to clients. CreateProgressMessage(requestId string, message string) *T }
MessageEnvelope provides broker-specific operations on message types. This is a stateless service that knows how to extract and manipulate message fields. The methods work with pointers (*T) to avoid copying and to match gRPC's pointer-based APIs. Context is provided to allow envelopes to extract metadata (e.g., extension claims from gRPC context).
type ProgressFunc ¶
type ProgressFunc func(message string)
ProgressFunc is a callback function for sending progress updates during handler execution