bus

package
v0.0.2

This package is not in the latest version of its module.
Published: Dec 19, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package bus defines the message bus interface and implementations.

Constants

const (
	// ServiceRegistrationRequestTopic is the topic for service registration requests.
	ServiceRegistrationRequestTopic = "service_registration_requests"
	// ServiceRegistrationResultTopic is the topic for service registration results.
	ServiceRegistrationResultTopic = "service_registration_results"
	// ServiceListRequestTopic is the topic for service list requests.
	ServiceListRequestTopic = "service_list_requests"
	// ServiceListResultTopic is the topic for service list results.
	ServiceListResultTopic = "service_list_results"
	// ToolExecutionRequestTopic is the topic for tool execution requests.
	ToolExecutionRequestTopic = "tool_execution_requests"
	// ToolExecutionResultTopic is the topic for tool execution results.
	ToolExecutionResultTopic = "tool_execution_results"
)

Variables

var GetBusHook func(p *Provider, topic string) any

GetBusHook is a test hook for overriding the bus retrieval logic.

var NewProviderHook func(*bus.MessageBus) (*Provider, error)

NewProviderHook is a test hook for overriding the NewProvider logic.
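
A test hook of this kind is usually installed for the duration of one test and then restored. The sketch below illustrates that pattern with local stand-ins: `provider`, `newProviderHook`, `newProvider`, and `withHook` are hypothetical names, and the assumption that the constructor consults the hook before doing its normal work is not confirmed by this page.

```go
package main

import "fmt"

// provider is a hypothetical stand-in for the package's Provider type.
type provider struct{ name string }

// newProviderHook mirrors the shape of a package-level test hook.
// It is nil in production code.
var newProviderHook func() (*provider, error)

// newProvider consults the hook first. This ordering is an assumption
// made for the sketch, not documented behavior of NewProvider.
func newProvider() (*provider, error) {
	if newProviderHook != nil {
		return newProviderHook()
	}
	return &provider{name: "real"}, nil
}

// withHook installs h and returns a function that restores the
// previous hook, so a test can undo the override with defer.
func withHook(h func() (*provider, error)) (restore func()) {
	prev := newProviderHook
	newProviderHook = h
	return func() { newProviderHook = prev }
}

func main() {
	restore := withHook(func() (*provider, error) {
		return &provider{name: "fake"}, nil
	})
	p, _ := newProvider()
	fmt.Println(p.name) // fake

	restore()
	p, _ = newProvider()
	fmt.Println(p.name) // real
}
```

Restoring the previous value, rather than resetting to nil, keeps nested overrides from clobbering each other.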

Functions

This section is empty.

Types

type BaseMessage

type BaseMessage struct {
	CID string `json:"cid"`
}

BaseMessage provides a default implementation of the Message interface. It includes a correlation ID field (`CID`) and can be embedded in other message structs to provide a common mechanism for message tracking.

func (*BaseMessage) CorrelationID

func (m *BaseMessage) CorrelationID() string

CorrelationID returns the correlation ID of the message. This ID is used to associate requests with their corresponding responses in asynchronous workflows.

func (*BaseMessage) SetCorrelationID

func (m *BaseMessage) SetCorrelationID(id string)

SetCorrelationID sets the correlation ID for the message. This is typically called by the message publisher to assign a unique ID to a request.
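
As an illustration of embedding, the sketch below defines a custom message that picks up the correlation ID methods from BaseMessage. BaseMessage is redeclared locally so the example compiles on its own, and its method bodies are assumed to be simple accessors. `PingRequest` and `encodePing` are hypothetical names that do not exist in the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BaseMessage mirrors the struct documented above. The method bodies
// are an assumption: plain get and set on the CID field.
type BaseMessage struct {
	CID string `json:"cid"`
}

func (m *BaseMessage) CorrelationID() string      { return m.CID }
func (m *BaseMessage) SetCorrelationID(id string) { m.CID = id }

// PingRequest is a hypothetical message type. Embedding BaseMessage
// promotes CorrelationID and SetCorrelationID onto *PingRequest.
type PingRequest struct {
	BaseMessage
	Payload string `json:"payload"`
}

// encodePing stamps a request with a correlation ID and serializes it.
func encodePing(id, payload string) (string, error) {
	req := &PingRequest{Payload: payload}
	req.SetCorrelationID(id)
	b, err := json.Marshal(req)
	return string(b), err
}

func main() {
	out, err := encodePing("req-1", "hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // {"cid":"req-1","payload":"hello"}
}
```

Because the embedded struct has no JSON tag of its own, encoding/json flattens its `cid` field into the outer object.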

type Bus

type Bus[T any] interface {
	// Publish sends a message to all subscribers of a given topic. The message
	// is sent to each subscriber's channel, and the handler is invoked by a
	// dedicated goroutine for that subscriber.
	//
	// ctx is the context for the publish operation.
	// topic is the topic to publish the message to.
	// msg is the message to be sent.
	Publish(ctx context.Context, topic string, msg T) error

	// Subscribe registers a handler function for a given topic. It starts a
	// dedicated goroutine for the subscription to process messages from a
	// channel.
	//
	// ctx is the context for the subscribe operation.
	// topic is the topic to subscribe to.
	// handler is the function to be called with the message.
	// It returns a function that can be called to unsubscribe the handler.
	Subscribe(ctx context.Context, topic string, handler func(T)) (unsubscribe func())

	// SubscribeOnce registers a handler function that will be invoked only once
	// for a given topic. After the handler is called, the subscription is
	// automatically removed.
	//
	// ctx is the context for the subscribe operation.
	// topic is the topic to subscribe to.
	// handler is the function to be called with the message.
	// It returns a function that can be called to unsubscribe the handler
	// before it has been invoked.
	SubscribeOnce(ctx context.Context, topic string, handler func(T)) (unsubscribe func())
}

Bus defines the interface for a generic, type-safe event bus that facilitates communication between different parts of the application. The type parameter T specifies the type of message that the bus will handle.
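
To make the contract concrete, here is a toy implementation that satisfies the interface. It is a sketch under a simplifying assumption: handlers are invoked inline by Publish, not by a dedicated goroutine per subscriber as the interface comments describe. `syncBus` and `demoCounts` are hypothetical names; the real implementations live in the memory, nats, and redis subpackages.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Bus repeats the interface documented above so the sketch compiles alone.
type Bus[T any] interface {
	Publish(ctx context.Context, topic string, msg T) error
	Subscribe(ctx context.Context, topic string, handler func(T)) (unsubscribe func())
	SubscribeOnce(ctx context.Context, topic string, handler func(T)) (unsubscribe func())
}

// syncBus is a hypothetical toy bus that invokes handlers inline.
type syncBus[T any] struct {
	mu     sync.Mutex
	nextID int
	subs   map[string]map[int]func(T)
}

// add registers the handler built by wrap and returns its removal function.
func (b *syncBus[T]) add(topic string, wrap func(remove func()) func(T)) func() {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.nextID
	b.nextID++
	remove := func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.subs[topic], id)
	}
	if b.subs == nil {
		b.subs = make(map[string]map[int]func(T))
	}
	if b.subs[topic] == nil {
		b.subs[topic] = make(map[int]func(T))
	}
	b.subs[topic][id] = wrap(remove)
	return remove
}

func (b *syncBus[T]) Publish(ctx context.Context, topic string, msg T) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	// Snapshot handlers under the lock, then call them outside it so a
	// handler may unsubscribe without deadlocking.
	b.mu.Lock()
	handlers := make([]func(T), 0, len(b.subs[topic]))
	for _, h := range b.subs[topic] {
		handlers = append(handlers, h)
	}
	b.mu.Unlock()
	for _, h := range handlers {
		h(msg)
	}
	return nil
}

func (b *syncBus[T]) Subscribe(_ context.Context, topic string, handler func(T)) func() {
	return b.add(topic, func(func()) func(T) { return handler })
}

func (b *syncBus[T]) SubscribeOnce(_ context.Context, topic string, handler func(T)) func() {
	var once sync.Once
	return b.add(topic, func(remove func()) func(T) {
		return func(msg T) {
			// sync.Once guards against concurrent Publish calls that
			// both snapshotted this handler before it was removed.
			once.Do(func() {
				remove()
				handler(msg)
			})
		}
	})
}

// demoCounts publishes twice and reports how often each handler ran.
func demoCounts() (every, once int) {
	var bus Bus[string] = &syncBus[string]{}
	ctx := context.Background()
	stop := bus.Subscribe(ctx, "greetings", func(string) { every++ })
	defer stop()
	bus.SubscribeOnce(ctx, "greetings", func(string) { once++ })
	_ = bus.Publish(ctx, "greetings", "hello")
	_ = bus.Publish(ctx, "greetings", "again")
	return every, once
}

func main() {
	every, once := demoCounts()
	fmt.Println(every, once) // 2 1
}
```

The regular subscriber sees both messages, while the SubscribeOnce handler runs for the first message only and is then removed.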

func GetBus

func GetBus[T any](p *Provider, topic string) Bus[T]

GetBus retrieves a bus for the given topic. If a bus for the given topic already exists, it is returned; otherwise, a new one is created and stored for future use.

The type parameter T specifies the message type for the bus, ensuring type safety for each topic.
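
The get-or-create behavior described above can be sketched with a mutex-guarded map keyed by topic. This is only an illustration: `provider`, `topicBus`, and `getBus` are hypothetical stand-ins, and the page does not say how the real GetBus stores buses or what it does when a topic is requested with a different T than before. The sketch simply lets the type assertion panic in that case.

```go
package main

import (
	"fmt"
	"sync"
)

// topicBus is a hypothetical stand-in for a Bus[T] implementation.
type topicBus[T any] struct{ topic string }

// provider is a hypothetical stand-in for Provider. Buses are stored
// as any because each topic may use a different message type.
type provider struct {
	mu    sync.Mutex
	buses map[string]any
}

// getBus returns the bus for topic, creating it on first use.
func getBus[T any](p *provider, topic string) *topicBus[T] {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.buses == nil {
		p.buses = make(map[string]any)
	}
	if existing, ok := p.buses[topic]; ok {
		// Assumption of this sketch: a mismatched T panics here.
		return existing.(*topicBus[T])
	}
	b := &topicBus[T]{topic: topic}
	p.buses[topic] = b
	return b
}

func main() {
	p := &provider{}
	a := getBus[string](p, "tool_execution_requests")
	b := getBus[string](p, "tool_execution_requests")
	fmt.Println(a == b) // true: the second call reuses the first bus
}
```

Holding one lock across the lookup and the insert is what guarantees a single instance per topic when several goroutines ask at once.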

type Message

type Message interface {
	// CorrelationID returns the unique identifier used to correlate messages.
	CorrelationID() string
	// SetCorrelationID sets the correlation identifier for the message.
	SetCorrelationID(id string)
}

Message defines the interface that all messages exchanged on the event bus must implement. It provides a standard way to manage correlation IDs for tracking requests and their corresponding responses.
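
One common way to use correlation IDs is to keep a table of waiting callers and route each incoming response to the caller whose ID matches. The sketch below shows that idea against the Message interface. `reply`, `pending`, `expect`, and `deliver` are hypothetical names, and nothing on this page says the package itself matches responses this way.

```go
package main

import (
	"fmt"
	"sync"
)

// Message repeats the interface documented above.
type Message interface {
	CorrelationID() string
	SetCorrelationID(id string)
}

// reply is a hypothetical message type used only in this sketch.
type reply struct {
	cid  string
	body string
}

func (r *reply) CorrelationID() string      { return r.cid }
func (r *reply) SetCorrelationID(id string) { r.cid = id }

// pending tracks callers waiting for a response, keyed by correlation ID.
type pending struct {
	mu      sync.Mutex
	waiters map[string]chan Message
}

// expect registers interest in id and returns a channel that will
// receive the matching message.
func (p *pending) expect(id string) <-chan Message {
	ch := make(chan Message, 1) // buffered so deliver never blocks
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.waiters == nil {
		p.waiters = make(map[string]chan Message)
	}
	p.waiters[id] = ch
	return ch
}

// deliver routes msg to its waiter and reports whether one existed.
// The waiter is removed, so each ID is delivered at most once.
func (p *pending) deliver(msg Message) bool {
	p.mu.Lock()
	ch, ok := p.waiters[msg.CorrelationID()]
	delete(p.waiters, msg.CorrelationID())
	p.mu.Unlock()
	if ok {
		ch <- msg
	}
	return ok
}

func main() {
	var p pending
	ch := p.expect("req-1")
	p.deliver(&reply{cid: "req-2", body: "ignored"}) // no waiter for req-2
	p.deliver(&reply{cid: "req-1", body: "done"})
	got := (<-ch).(*reply)
	fmt.Println(got.CorrelationID(), got.body) // req-1 done
}
```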

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

Provider is a thread-safe container for managing multiple, type-safe bus instances, with each bus being dedicated to a specific topic. It ensures that for any given topic, there is only one bus instance, creating one on demand if it doesn't already exist.

This allows different parts of the application to get a bus for a specific message type and topic without needing to manage the lifecycle of the bus instances themselves.

func NewProvider

func NewProvider(messageBus *bus.MessageBus) (*Provider, error)

NewProvider creates and returns a new Provider, which is used to manage multiple topic-based bus instances.

type ServiceListRequest

type ServiceListRequest struct {
	BaseMessage
}

ServiceListRequest is a message sent to the bus to request a list of all registered services.

type ServiceListResult

type ServiceListResult struct {
	BaseMessage
	Services []*configv1.UpstreamServiceConfig
	Error    error
}

ServiceListResult is a message published in response to a ServiceListRequest. It contains a list of all registered services, or an error if the list could not be produced.

type ServiceRegistrationRequest

type ServiceRegistrationRequest struct {
	BaseMessage
	Context context.Context
	Config  *configv1.UpstreamServiceConfig
}

ServiceRegistrationRequest is a message sent to the bus to request the registration of a new upstream service. It contains the service's configuration and the context for the request.

type ServiceRegistrationResult

type ServiceRegistrationResult struct {
	BaseMessage
	ServiceKey          string
	DiscoveredTools     []*configv1.ToolDefinition
	DiscoveredResources []*configv1.ResourceDefinition
	Error               error
}

ServiceRegistrationResult is a message published in response to a ServiceRegistrationRequest. It contains the outcome of the registration process: the generated service key and any tools and resources that were discovered, or an error if the registration failed.

type ToolExecutionRequest

type ToolExecutionRequest struct {
	BaseMessage
	Context    context.Context
	ToolName   string
	ToolInputs json.RawMessage
}

ToolExecutionRequest is a message sent to the bus to request the execution of a specific tool on an upstream service. It includes the name of the tool and its inputs in raw JSON format.

type ToolExecutionResult

type ToolExecutionResult struct {
	BaseMessage
	Result json.RawMessage
	Error  error
}

ToolExecutionResult is a message published in response to a ToolExecutionRequest. It contains the result of the tool execution, in raw JSON format, or an error if the execution failed.
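
Since both the inputs and the result travel as json.RawMessage, callers encode inputs before publishing and decode the result after checking Error. The sketch below redeclares the two structs locally so it compiles on its own. `newToolRequest`, `decodeResult`, `demo`, the tool name "add", and the input and output shapes are all hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Local mirrors of the documented structs.
type BaseMessage struct {
	CID string `json:"cid"`
}

type ToolExecutionRequest struct {
	BaseMessage
	Context    context.Context
	ToolName   string
	ToolInputs json.RawMessage
}

type ToolExecutionResult struct {
	BaseMessage
	Result json.RawMessage
	Error  error
}

// newToolRequest encodes inputs as raw JSON and fills in the request.
func newToolRequest(ctx context.Context, cid, tool string, inputs any) (*ToolExecutionRequest, error) {
	raw, err := json.Marshal(inputs)
	if err != nil {
		return nil, fmt.Errorf("encode inputs for %q: %w", tool, err)
	}
	return &ToolExecutionRequest{
		BaseMessage: BaseMessage{CID: cid},
		Context:     ctx,
		ToolName:    tool,
		ToolInputs:  raw,
	}, nil
}

// decodeResult checks Error first, then unmarshals Result into out.
func decodeResult(res *ToolExecutionResult, out any) error {
	if res.Error != nil {
		return res.Error
	}
	return json.Unmarshal(res.Result, out)
}

// demo builds a request, decodes a canned success result, and checks
// that a failed result surfaces its error.
func demo() (inputs string, sum int, failed bool) {
	req, err := newToolRequest(context.Background(), "req-1", "add", map[string]int{"a": 1, "b": 2})
	if err != nil {
		panic(err)
	}
	var out struct {
		Sum int `json:"sum"`
	}
	ok := &ToolExecutionResult{BaseMessage: req.BaseMessage, Result: json.RawMessage(`{"sum":3}`)}
	if err := decodeResult(ok, &out); err != nil {
		panic(err)
	}
	bad := &ToolExecutionResult{BaseMessage: req.BaseMessage, Error: errors.New("tool failed")}
	failed = decodeResult(bad, &out) != nil
	return string(req.ToolInputs), out.Sum, failed
}

func main() {
	inputs, sum, failed := demo()
	fmt.Println(inputs, sum, failed) // {"a":1,"b":2} 3 true
}
```

Copying the request's BaseMessage into the result keeps the correlation ID intact, which lets the requester match the result to its request.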

Directories

Path Synopsis
Package memory provides in-memory implementations of the bus interface.
Package nats provides a NATS-based message bus implementation.
Package redis provides a Redis implementation of the bus.
