grpcbroker

package
v1.23.13 Latest
Warning

This package is not in the latest version of its module.
Published: Mar 26, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BidiStream

type BidiStream[TMessage any] interface {
	// Send sends a message on the stream
	Send(msg *TMessage) error

	// Recv receives a message from the stream
	Recv() (*TMessage, error)
}

BidiStream is a unified interface for bidirectional gRPC streams. Both grpc.BidiStreamingClient[T, T] and grpc.BidiStreamingServer[T, T] implement this interface, allowing the MessageBroker to work with either client-side or server-side streams.
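Because the interface has only two methods, it can also be satisfied by a test double. The following is a minimal sketch of an in-memory stream pair, useful for exercising code that depends on BidiStream without a real gRPC connection. The interface is copied locally so the example compiles on its own; Envelope, chanStream, and newPipe are hypothetical names that are not part of this package.

```go
package main

import (
	"fmt"
	"io"
)

// BidiStream mirrors the interface documented above so this sketch
// compiles without importing grpcbroker.
type BidiStream[TMessage any] interface {
	Send(msg *TMessage) error
	Recv() (*TMessage, error)
}

// Envelope is a hypothetical message type used only for illustration.
type Envelope struct {
	RequestId string
	Text      string
}

// chanStream is a hypothetical in-memory stream end: Send writes to out,
// Recv reads from in.
type chanStream[T any] struct {
	in  <-chan *T
	out chan<- *T
}

func (s *chanStream[T]) Send(msg *T) error {
	s.out <- msg
	return nil
}

// Recv reports io.EOF if the incoming channel has been closed.
func (s *chanStream[T]) Recv() (*T, error) {
	msg, ok := <-s.in
	if !ok {
		return nil, io.EOF
	}
	return msg, nil
}

// newPipe returns two connected ends; what one end sends, the other receives.
func newPipe[T any]() (BidiStream[T], BidiStream[T]) {
	aToB := make(chan *T, 8)
	bToA := make(chan *T, 8)
	return &chanStream[T]{in: bToA, out: aToB}, &chanStream[T]{in: aToB, out: bToA}
}

func main() {
	client, server := newPipe[Envelope]()
	_ = client.Send(&Envelope{RequestId: "1", Text: "ping"})
	got, _ := server.Recv()
	fmt.Println(got.RequestId, got.Text)
}
```

The channels are buffered so that a single Send does not block when no reader is waiting; a real gRPC stream has its own flow control.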

type MessageBroker

type MessageBroker[TMessage any] struct {
	// contains filtered or unexported fields
}

MessageBroker handles bidirectional message routing for gRPC streams. It supports both the client pattern (request/response correlation via RequestId) and the server pattern (handler registration for incoming requests).

TMessage is the raw message type used by the gRPC stream. The ops parameter passed to NewMessageBroker provides stateless operations for manipulating messages.

This broker works with both client-side (grpc.BidiStreamingClient) and server-side (grpc.BidiStreamingServer) streams through the unified BidiStream interface.

func NewMessageBroker

func NewMessageBroker[TMessage any](
	stream BidiStream[TMessage],
	ops MessageEnvelope[TMessage],
	name string,
	logger *log.Logger,
) *MessageBroker[TMessage]

NewMessageBroker creates a new message broker for the given stream. The stream parameter can be either a client stream (grpc.BidiStreamingClient) or a server stream (grpc.BidiStreamingServer), as both implement the BidiStream interface. The ops parameter provides stateless operations for message manipulation. The name parameter identifies the broker in log output. The logger parameter sets the broker's private logger for trace output:

  • Pass log.Default() on the server side (azd core CLI) to inherit --debug semantics.
  • Pass nil for silent operation (e.g., in extension processes where AZD_EXT_DEBUG controls logging).

func (*MessageBroker[TMessage]) Close

func (mb *MessageBroker[TMessage]) Close()

Close gracefully shuts down the broker. Calling it is optional and intended for cleanup.

func (*MessageBroker[TMessage]) On

func (mb *MessageBroker[TMessage]) On(handler any) error

On registers a handler for a specific message type. The handler function signature should be one of:

  • func(ctx context.Context, req *RequestType) (*TMessage, error)
  • func(ctx context.Context, req *RequestType, progress ProgressFunc) (*TMessage, error)

The handler must return a complete envelope message. The broker will automatically set the RequestId and Error fields before sending the response.

func (*MessageBroker[TMessage]) Ready

func (mb *MessageBroker[TMessage]) Ready(ctx context.Context) error

Ready blocks until the message broker starts receiving messages or the context is cancelled. Multiple goroutines can call Ready() simultaneously; they are all unblocked when Run() starts. Once the broker has started, subsequent calls to Ready() return immediately. Returns nil when ready, or the context error if the context is cancelled before the broker becomes ready.

func (*MessageBroker[TMessage]) Run

func (mb *MessageBroker[TMessage]) Run(ctx context.Context) error

Run begins receiving and dispatching messages. This method blocks until the context is cancelled, the stream encounters an error, or the stream is closed by the remote peer. Returns nil on graceful shutdown (context cancelled or EOF), or the error that terminated the stream.

func (*MessageBroker[TMessage]) Send

func (mb *MessageBroker[TMessage]) Send(ctx context.Context, msg *TMessage) error

Send sends a message without waiting for a response. This is useful for fire-and-forget scenarios like subscriptions or notifications where no response is expected or needed. Returns an error only if the send operation itself fails.

func (*MessageBroker[TMessage]) SendAndWait

func (mb *MessageBroker[TMessage]) SendAndWait(ctx context.Context, msg *TMessage) (*TMessage, error)

SendAndWait sends a message and waits for the response, which is correlated with the request via RequestId.

func (*MessageBroker[TMessage]) SendAndWaitWithProgress

func (mb *MessageBroker[TMessage]) SendAndWaitWithProgress(
	ctx context.Context,
	msg *TMessage,
	onProgress func(string),
) (*TMessage, error)

SendAndWaitWithProgress sends a message and waits for the response, passing any progress updates received in the meantime to onProgress.

type MessageEnvelope

type MessageEnvelope[T any] interface {
	// GetRequestId extracts or generates the request/correlation ID from a message.
	// For messages with a RequestId field, this returns that field.
	// For messages without RequestId, this can generate a correlation key from message content and context.
	GetRequestId(ctx context.Context, msg *T) string

	// SetRequestId sets the request ID on a message.
	// For messages without a RequestId field, this can be a no-op.
	SetRequestId(ctx context.Context, msg *T, id string)

	// GetError extracts the error from a message, if any
	GetError(msg *T) error

	// SetError sets an error on a message
	SetError(msg *T, err error)

	// GetInnerMessage extracts the inner message from the envelope's oneof field
	GetInnerMessage(msg *T) any

	// IsProgressMessage returns true if the message is a progress message
	IsProgressMessage(msg *T) bool

	// GetProgressMessage extracts the progress message text from a progress message.
	// Returns empty string if the message is not a progress message.
	GetProgressMessage(msg *T) string

	// CreateProgressMessage creates a new progress message envelope with the given text.
	// This is used by server-side handlers to send progress updates back to clients.
	CreateProgressMessage(requestId string, message string) *T
}

MessageEnvelope provides broker-specific operations on message types. This is a stateless service that knows how to extract and manipulate message fields. The methods work with pointers (*T) to avoid copying and to match gRPC's pointer-based APIs. Context is provided to allow envelopes to extract metadata (e.g., extension claims from gRPC context).

type ProgressFunc

type ProgressFunc func(message string)

ProgressFunc is a callback function that handlers call to send progress updates during their execution.
