brokers

package
v0.8.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMemoryBroker

func NewMemoryBroker(logger forge.Logger, metrics forge.Metrics) core.MessageBroker

NewMemoryBroker creates a new memory broker.

Types

type MemoryBroker

type MemoryBroker struct {
	// contains filtered or unexported fields
}

MemoryBroker implements MessageBroker interface using in-memory channels.

func (*MemoryBroker) Close

func (mb *MemoryBroker) Close(ctx context.Context) error

Close implements MessageBroker.

func (*MemoryBroker) Connect

func (mb *MemoryBroker) Connect(ctx context.Context, config any) error

Connect implements MessageBroker.

func (*MemoryBroker) GetStats

func (mb *MemoryBroker) GetStats() map[string]any

GetStats implements MessageBroker.

func (*MemoryBroker) HealthCheck

func (mb *MemoryBroker) HealthCheck(ctx context.Context) error

HealthCheck implements MessageBroker.

func (*MemoryBroker) Publish

func (mb *MemoryBroker) Publish(ctx context.Context, topic string, event core.Event) error

Publish implements MessageBroker.

func (*MemoryBroker) Subscribe

func (mb *MemoryBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error

Subscribe implements MessageBroker.

func (*MemoryBroker) Unsubscribe

func (mb *MemoryBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error

Unsubscribe implements MessageBroker.

type NATSBroker

type NATSBroker struct {
	// contains filtered or unexported fields
}

NATSBroker implements MessageBroker for NATS.

func NewNATSBroker

func NewNATSBroker(config map[string]any, logger forge.Logger, metrics forge.Metrics) (*NATSBroker, error)

NewNATSBroker creates a new NATS broker.

func (*NATSBroker) Close

func (nb *NATSBroker) Close(ctx context.Context) error

Close implements MessageBroker.

func (*NATSBroker) Connect

func (nb *NATSBroker) Connect(ctx context.Context, config any) error

Connect implements MessageBroker.

func (*NATSBroker) GetStats

func (nb *NATSBroker) GetStats() map[string]any

GetStats implements MessageBroker.

func (*NATSBroker) HealthCheck

func (nb *NATSBroker) HealthCheck(ctx context.Context) error

HealthCheck implements MessageBroker.

func (*NATSBroker) Publish

func (nb *NATSBroker) Publish(ctx context.Context, topic string, event core.Event) error

Publish implements MessageBroker.

func (*NATSBroker) Subscribe

func (nb *NATSBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error

Subscribe implements MessageBroker.

func (*NATSBroker) Unsubscribe

func (nb *NATSBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error

Unsubscribe implements MessageBroker.

type NATSBrokerStats

type NATSBrokerStats struct {
	Connected         bool       `json:"connected"`
	Subscriptions     int        `json:"subscriptions"`
	MessagesPublished int64      `json:"messages_published"`
	MessagesReceived  int64      `json:"messages_received"`
	PublishErrors     int64      `json:"publish_errors"`
	ReceiveErrors     int64      `json:"receive_errors"`
	ConnectionErrors  int64      `json:"connection_errors"`
	LastConnected     *time.Time `json:"last_connected"`
	LastError         *time.Time `json:"last_error"`
	TotalPublishTime  time.Duration
	AvgPublishTime    time.Duration
}

NATSBrokerStats contains NATS broker statistics.

type NATSConfig

type NATSConfig struct {
	URL               string        `json:"url"                yaml:"url"`
	Name              string        `json:"name"               yaml:"name"`
	Username          string        `json:"username"           yaml:"username"`
	Password          string        `json:"password"           yaml:"password"`
	Token             string        `json:"token"              yaml:"token"`
	MaxReconnects     int           `json:"max_reconnects"     yaml:"max_reconnects"`
	ReconnectWait     time.Duration `json:"reconnect_wait"     yaml:"reconnect_wait"`
	ConnectTimeout    time.Duration `json:"connect_timeout"    yaml:"connect_timeout"`
	PingInterval      time.Duration `json:"ping_interval"      yaml:"ping_interval"`
	MaxPingsOut       int           `json:"max_pings_out"      yaml:"max_pings_out"`
	EnableCompression bool          `json:"enable_compression" yaml:"enable_compression"`
}

NATSConfig defines configuration for NATS broker.

func DefaultNATSConfig

func DefaultNATSConfig() *NATSConfig

DefaultNATSConfig returns default NATS configuration.

type RedisBroker

type RedisBroker struct {
	// contains filtered or unexported fields
}

RedisBroker implements MessageBroker for Redis pub/sub.

func NewRedisBroker

func NewRedisBroker(config map[string]any, logger forge.Logger, metrics forge.Metrics) (*RedisBroker, error)

NewRedisBroker creates a new Redis broker.

func (*RedisBroker) Close

func (rb *RedisBroker) Close(ctx context.Context) error

Close implements MessageBroker.

func (*RedisBroker) Connect

func (rb *RedisBroker) Connect(ctx context.Context, config any) error

Connect implements MessageBroker.

func (*RedisBroker) GetStats

func (rb *RedisBroker) GetStats() map[string]any

GetStats implements MessageBroker.

func (*RedisBroker) HealthCheck

func (rb *RedisBroker) HealthCheck(ctx context.Context) error

HealthCheck implements MessageBroker.

func (*RedisBroker) Publish

func (rb *RedisBroker) Publish(ctx context.Context, topic string, event core.Event) error

Publish implements MessageBroker.

func (*RedisBroker) Subscribe

func (rb *RedisBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error

Subscribe implements MessageBroker.

func (*RedisBroker) Unsubscribe

func (rb *RedisBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error

Unsubscribe implements MessageBroker.

type RedisBrokerStats

type RedisBrokerStats struct {
	Connected         bool       `json:"connected"`
	Subscriptions     int        `json:"subscriptions"`
	MessagesPublished int64      `json:"messages_published"`
	MessagesReceived  int64      `json:"messages_received"`
	PublishErrors     int64      `json:"publish_errors"`
	ReceiveErrors     int64      `json:"receive_errors"`
	ConnectionErrors  int64      `json:"connection_errors"`
	LastConnected     *time.Time `json:"last_connected"`
	LastError         *time.Time `json:"last_error"`
	TotalPublishTime  time.Duration
	AvgPublishTime    time.Duration
	PoolStats         *RedisPoolStats `json:"pool_stats"`
}

RedisBrokerStats contains Redis broker statistics.

type RedisConfig

type RedisConfig struct {
	Addresses       []string      `json:"addresses"          yaml:"addresses"`
	Username        string        `json:"username"           yaml:"username"`
	Password        string        `json:"password"           yaml:"password"`
	Database        int           `json:"database"           yaml:"database"`
	MasterName      string        `json:"master_name"        yaml:"master_name"`
	PoolSize        int           `json:"pool_size"          yaml:"pool_size"`
	MinIdleConns    int           `json:"min_idle_conns"     yaml:"min_idle_conns"`
	MaxIdleConns    int           `json:"max_idle_conns"     yaml:"max_idle_conns"`
	ConnMaxIdleTime time.Duration `json:"conn_max_idle_time" yaml:"conn_max_idle_time"`
	ConnMaxLifetime time.Duration `json:"conn_max_lifetime"  yaml:"conn_max_lifetime"`
	DialTimeout     time.Duration `json:"dial_timeout"       yaml:"dial_timeout"`
	ReadTimeout     time.Duration `json:"read_timeout"       yaml:"read_timeout"`
	WriteTimeout    time.Duration `json:"write_timeout"      yaml:"write_timeout"`
	MaxRetries      int           `json:"max_retries"        yaml:"max_retries"`
	MinRetryBackoff time.Duration `json:"min_retry_backoff"  yaml:"min_retry_backoff"`
	MaxRetryBackoff time.Duration `json:"max_retry_backoff"  yaml:"max_retry_backoff"`
	ChannelSize     int           `json:"channel_size"       yaml:"channel_size"`
	EnableStreams   bool          `json:"enable_streams"     yaml:"enable_streams"`
	StreamMaxLen    int64         `json:"stream_max_len"     yaml:"stream_max_len"`
	ConsumerGroup   string        `json:"consumer_group"     yaml:"consumer_group"`
	ConsumerName    string        `json:"consumer_name"      yaml:"consumer_name"`
}

RedisConfig defines configuration for Redis broker.

func DefaultRedisConfig

func DefaultRedisConfig() *RedisConfig

DefaultRedisConfig returns default Redis configuration.

type RedisPoolStats

type RedisPoolStats struct {
	TotalConns int `json:"total_conns"`
	IdleConns  int `json:"idle_conns"`
	StaleConns int `json:"stale_conns"`
	Hits       int `json:"hits"`
	Misses     int `json:"misses"`
	Timeouts   int `json:"timeouts"`
}

RedisPoolStats contains Redis connection pool statistics.

type RedisSubscription

type RedisSubscription struct {
	// contains filtered or unexported fields
}

RedisSubscription wraps a Redis pub/sub subscription.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL