Documentation
Overview ¶
Package bus defines the message bus interface and implementations.
Constants ¶
const (
	// ServiceRegistrationRequestTopic is the topic for service registration requests.
	ServiceRegistrationRequestTopic = "service_registration_requests"
	// ServiceRegistrationResultTopic is the topic for service registration results.
	ServiceRegistrationResultTopic = "service_registration_results"
	// ServiceListRequestTopic is the topic for service list requests.
	ServiceListRequestTopic = "service_list_requests"
	// ServiceListResultTopic is the topic for service list results.
	ServiceListResultTopic = "service_list_results"
	// ToolExecutionRequestTopic is the topic for tool execution requests.
	ToolExecutionRequestTopic = "tool_execution_requests"
	// ToolExecutionResultTopic is the topic for tool execution results.
	ToolExecutionResultTopic = "tool_execution_results"
)
Variables ¶
var GetBusHook func(p *Provider, topic string) any
GetBusHook is a test hook for overriding the bus retrieval logic.
var NewProviderHook func(*bus.MessageBus) (*Provider, error)
NewProviderHook is a test hook for overriding the NewProvider logic.
Functions ¶
This section is empty.
Types ¶
type BaseMessage ¶
type BaseMessage struct {
CID string `json:"cid"`
}
BaseMessage provides a default implementation of the Message interface. It includes a correlation ID field (`CID`) and can be embedded in other message structs to provide a common mechanism for message tracking.
func (*BaseMessage) CorrelationID ¶
func (m *BaseMessage) CorrelationID() string
CorrelationID returns the correlation ID of the message. This ID is used to associate requests with their corresponding responses in asynchronous workflows.
func (*BaseMessage) SetCorrelationID ¶
func (m *BaseMessage) SetCorrelationID(id string)
SetCorrelationID sets the correlation ID for the message. This is typically called by the message publisher to assign a unique ID to a request.
type Bus ¶
type Bus[T any] interface {
	// Publish sends a message to all subscribers of a given topic. The message
	// is sent to each subscriber's channel, and the handler is invoked by a
	// dedicated goroutine for that subscriber.
	//
	// ctx is the context for the publish operation.
	// topic is the topic to publish the message to.
	// msg is the message to be sent.
	Publish(ctx context.Context, topic string, msg T) error

	// Subscribe registers a handler function for a given topic. It starts a
	// dedicated goroutine for the subscription to process messages from a
	// channel.
	//
	// ctx is the context for the subscribe operation.
	// topic is the topic to subscribe to.
	// handler is the function to be called with the message.
	// It returns a function that can be called to unsubscribe the handler.
	Subscribe(ctx context.Context, topic string, handler func(T)) (unsubscribe func())

	// SubscribeOnce registers a handler function that will be invoked only once
	// for a given topic. After the handler is called, the subscription is
	// automatically removed.
	//
	// ctx is the context for the subscribe operation.
	// topic is the topic to subscribe to.
	// handler is the function to be called with the message.
	// It returns a function that can be called to unsubscribe the handler
	// before it has been invoked.
	SubscribeOnce(ctx context.Context, topic string, handler func(T)) (unsubscribe func())
}
Bus defines the interface for a generic, type-safe event bus that facilitates communication between different parts of the application. The type parameter T specifies the type of message that the bus will handle.
type Message ¶
type Message interface {
// CorrelationID returns the unique identifier used to correlate messages.
CorrelationID() string
// SetCorrelationID sets the correlation identifier for the message.
SetCorrelationID(id string)
}
Message defines the interface that all messages exchanged on the event bus must implement. It provides a standard way to manage correlation IDs for tracking requests and their corresponding responses.
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
Provider is a thread-safe container for managing multiple, type-safe bus instances, with each bus being dedicated to a specific topic. It ensures that for any given topic, there is only one bus instance, creating one on demand if it doesn't already exist.
This allows different parts of the application to get a bus for a specific message type and topic without needing to manage the lifecycle of the bus instances themselves.
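The "one bus per topic, created on demand" behavior can be sketched with a mutex-guarded map. Everything in this example is hypothetical (`registry`, `topicBus`, `getBus`); the documentation above does not show how Provider stores or hands out buses, so this illustrates the general technique rather than the package's implementation. A free generic function is used because Go methods cannot declare their own type parameters.

```go
package main

import (
	"fmt"
	"sync"
)

// topicBus is a hypothetical stand-in for a typed bus instance.
type topicBus[T any] struct {
	topic string
}

// registry is a hypothetical analogue of Provider: a thread-safe map from
// topic name to a lazily created bus.
type registry struct {
	mu    sync.Mutex
	buses map[string]any
}

func newRegistry() *registry {
	return &registry{buses: make(map[string]any)}
}

// getBus returns the bus stored for topic, creating it on first use. It
// reports an error if the topic already holds a bus for another message type.
func getBus[T any](r *registry, topic string) (*topicBus[T], error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if existing, ok := r.buses[topic]; ok {
		b, ok := existing.(*topicBus[T])
		if !ok {
			return nil, fmt.Errorf("topic %q already holds a %T", topic, existing)
		}
		return b, nil
	}
	b := &topicBus[T]{topic: topic}
	r.buses[topic] = b
	return b, nil
}

func main() {
	r := newRegistry()

	first, _ := getBus[string](r, "tool_execution_requests")
	second, _ := getBus[string](r, "tool_execution_requests")
	fmt.Println(first == second) // true: the same instance is reused

	_, err := getBus[int](r, "tool_execution_requests")
	fmt.Println(err != nil) // true: the topic is bound to another type
}
```

Holding the lock across both the lookup and the insert is what guarantees a single instance per topic when several goroutines ask for the same topic at once.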
func NewProvider ¶
func NewProvider(messageBus *bus.MessageBus) (*Provider, error)
NewProvider creates and returns a new Provider, which is used to manage multiple topic-based bus instances.
type ServiceListRequest ¶
type ServiceListRequest struct {
BaseMessage
}
ServiceListRequest is a message sent to the bus to request a list of all registered services.
type ServiceListResult ¶
type ServiceListResult struct {
BaseMessage
Services []*configv1.UpstreamServiceConfig
Error error
}
ServiceListResult is a message published in response to a ServiceListRequest. It contains a list of all registered services, or an error if the list could not be produced.
type ServiceRegistrationRequest ¶
type ServiceRegistrationRequest struct {
BaseMessage
Context context.Context
Config *configv1.UpstreamServiceConfig
}
ServiceRegistrationRequest is a message sent to the bus to request the registration of a new upstream service. It contains the service's configuration and the context for the request.
type ServiceRegistrationResult ¶
type ServiceRegistrationResult struct {
BaseMessage
ServiceKey string
DiscoveredTools []*configv1.ToolDefinition
DiscoveredResources []*configv1.ResourceDefinition
Error error
}
ServiceRegistrationResult is a message published in response to a ServiceRegistrationRequest. It contains the outcome of the registration process: the generated service key and the lists of any tools and resources that were discovered, or an error if the registration failed.
type ToolExecutionRequest ¶
type ToolExecutionRequest struct {
BaseMessage
Context context.Context
ToolName string
ToolInputs json.RawMessage
}
ToolExecutionRequest is a message sent to the bus to request the execution of a specific tool on an upstream service. It includes the name of the tool and its inputs in raw JSON format.
type ToolExecutionResult ¶
type ToolExecutionResult struct {
BaseMessage
Result json.RawMessage
Error error
}
ToolExecutionResult is a message published in response to a ToolExecutionRequest. It contains the result of the tool execution, in raw JSON format, or an error if the execution failed.
Directories
| Path | Synopsis |
|---|---|
| memory | Package memory provides in-memory implementations of the bus interface. |
| nats | Package nats provides a NATS-based message bus implementation. |
| redis | Package redis provides a Redis implementation of the bus. |