brokers

package
v0.1.13 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 | License: MIT | Imports: 9 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMemoryBroker

func NewMemoryBroker(logger forge.Logger, metrics forge.Metrics) core.MessageBroker

NewMemoryBroker creates a new memory broker

Types

type MemoryBroker

// MemoryBroker implements the MessageBroker interface using in-memory
// channels. Instances are created with NewMemoryBroker, which returns the
// broker as a core.MessageBroker.
type MemoryBroker struct {
	// contains filtered or unexported fields
}

MemoryBroker implements MessageBroker interface using in-memory channels

func (*MemoryBroker) Close

func (mb *MemoryBroker) Close(ctx context.Context) error

Close implements MessageBroker

func (*MemoryBroker) Connect

func (mb *MemoryBroker) Connect(ctx context.Context, config interface{}) error

Connect implements MessageBroker

func (*MemoryBroker) GetStats

func (mb *MemoryBroker) GetStats() map[string]interface{}

GetStats implements MessageBroker

func (*MemoryBroker) HealthCheck

func (mb *MemoryBroker) HealthCheck(ctx context.Context) error

HealthCheck implements MessageBroker

func (*MemoryBroker) Publish

func (mb *MemoryBroker) Publish(ctx context.Context, topic string, event core.Event) error

Publish implements MessageBroker

func (*MemoryBroker) Subscribe

func (mb *MemoryBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error

Subscribe implements MessageBroker

func (*MemoryBroker) Unsubscribe

func (mb *MemoryBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error

Unsubscribe implements MessageBroker

type NATSBroker

// NATSBroker implements the MessageBroker interface for NATS.
// Instances are created with NewNATSBroker.
type NATSBroker struct {
	// contains filtered or unexported fields
}

NATSBroker implements MessageBroker for NATS

func NewNATSBroker

func NewNATSBroker(config map[string]interface{}, logger forge.Logger, metrics forge.Metrics) (*NATSBroker, error)

NewNATSBroker creates a new NATS broker

func (*NATSBroker) Close

func (nb *NATSBroker) Close(ctx context.Context) error

Close implements MessageBroker

func (*NATSBroker) Connect

func (nb *NATSBroker) Connect(ctx context.Context, config interface{}) error

Connect implements MessageBroker

func (*NATSBroker) GetStats

func (nb *NATSBroker) GetStats() map[string]interface{}

GetStats implements MessageBroker

func (*NATSBroker) HealthCheck

func (nb *NATSBroker) HealthCheck(ctx context.Context) error

HealthCheck implements MessageBroker

func (*NATSBroker) Publish

func (nb *NATSBroker) Publish(ctx context.Context, topic string, event core.Event) error

Publish implements MessageBroker

func (*NATSBroker) Subscribe

func (nb *NATSBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error

Subscribe implements MessageBroker

func (*NATSBroker) Unsubscribe

func (nb *NATSBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error

Unsubscribe implements MessageBroker

type NATSBrokerStats

// NATSBrokerStats contains NATS broker statistics.
//
// Every field except TotalPublishTime and AvgPublishTime carries an explicit
// snake_case JSON tag.
type NATSBrokerStats struct {
	Connected         bool       `json:"connected"`
	Subscriptions     int        `json:"subscriptions"`
	MessagesPublished int64      `json:"messages_published"`
	MessagesReceived  int64      `json:"messages_received"`
	PublishErrors     int64      `json:"publish_errors"`
	ReceiveErrors     int64      `json:"receive_errors"`
	ConnectionErrors  int64      `json:"connection_errors"`
	// Pointer timestamps without omitempty: a nil value marshals as JSON null.
	LastConnected     *time.Time `json:"last_connected"`
	LastError         *time.Time `json:"last_error"`
	// NOTE(review): the two durations below have no json tag, so encoding/json
	// emits them under their Go field names ("TotalPublishTime",
	// "AvgPublishTime") as integer nanoseconds, unlike the snake_case keys
	// above. Confirm this is intentional before adding tags, since that would
	// change the serialized keys.
	// AvgPublishTime is presumably TotalPublishTime / MessagesPublished — verify
	// against the code that fills this struct.
	TotalPublishTime  time.Duration
	AvgPublishTime    time.Duration
}

NATSBrokerStats contains NATS broker statistics

type NATSConfig

// NATSConfig defines configuration for the NATS broker. Each field is mapped
// to the same snake_case key in both YAML and JSON. DefaultNATSConfig returns
// a pre-populated instance.
type NATSConfig struct {
	URL               string        `yaml:"url" json:"url"`
	Name              string        `yaml:"name" json:"name"`
	// NOTE(review): Username, Password and Token are credentials with plain
	// tags, so marshaling this struct (e.g. for logging or a debug endpoint)
	// writes them out in clear text.
	Username          string        `yaml:"username" json:"username"`
	Password          string        `yaml:"password" json:"password"`
	Token             string        `yaml:"token" json:"token"`
	MaxReconnects     int           `yaml:"max_reconnects" json:"max_reconnects"`
	// time.Duration fields: encoding/json reads and writes these as integer
	// nanoseconds, not strings such as "2s". YAML handling depends on the
	// decoder in use — TODO confirm.
	ReconnectWait     time.Duration `yaml:"reconnect_wait" json:"reconnect_wait"`
	ConnectTimeout    time.Duration `yaml:"connect_timeout" json:"connect_timeout"`
	PingInterval      time.Duration `yaml:"ping_interval" json:"ping_interval"`
	MaxPingsOut       int           `yaml:"max_pings_out" json:"max_pings_out"`
	EnableCompression bool          `yaml:"enable_compression" json:"enable_compression"`
}

NATSConfig defines configuration for NATS broker

func DefaultNATSConfig

func DefaultNATSConfig() *NATSConfig

DefaultNATSConfig returns default NATS configuration

type RedisBroker

// RedisBroker implements the MessageBroker interface for Redis pub/sub.
// Instances are created with NewRedisBroker.
type RedisBroker struct {
	// contains filtered or unexported fields
}

RedisBroker implements MessageBroker for Redis pub/sub

func NewRedisBroker

func NewRedisBroker(config map[string]interface{}, logger forge.Logger, metrics forge.Metrics) (*RedisBroker, error)

NewRedisBroker creates a new Redis broker

func (*RedisBroker) Close

func (rb *RedisBroker) Close(ctx context.Context) error

Close implements MessageBroker

func (*RedisBroker) Connect

func (rb *RedisBroker) Connect(ctx context.Context, config interface{}) error

Connect implements MessageBroker

func (*RedisBroker) GetStats

func (rb *RedisBroker) GetStats() map[string]interface{}

GetStats implements MessageBroker

func (*RedisBroker) HealthCheck

func (rb *RedisBroker) HealthCheck(ctx context.Context) error

HealthCheck implements MessageBroker

func (*RedisBroker) Publish

func (rb *RedisBroker) Publish(ctx context.Context, topic string, event core.Event) error

Publish implements MessageBroker

func (*RedisBroker) Subscribe

func (rb *RedisBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error

Subscribe implements MessageBroker

func (*RedisBroker) Unsubscribe

func (rb *RedisBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error

Unsubscribe implements MessageBroker

type RedisBrokerStats

// RedisBrokerStats contains Redis broker statistics.
//
// Every field except TotalPublishTime and AvgPublishTime carries an explicit
// snake_case JSON tag.
type RedisBrokerStats struct {
	Connected         bool       `json:"connected"`
	Subscriptions     int        `json:"subscriptions"`
	MessagesPublished int64      `json:"messages_published"`
	MessagesReceived  int64      `json:"messages_received"`
	PublishErrors     int64      `json:"publish_errors"`
	ReceiveErrors     int64      `json:"receive_errors"`
	ConnectionErrors  int64      `json:"connection_errors"`
	// Pointer timestamps without omitempty: a nil value marshals as JSON null.
	LastConnected     *time.Time `json:"last_connected"`
	LastError         *time.Time `json:"last_error"`
	// NOTE(review): the two durations below have no json tag, so encoding/json
	// emits them under their Go field names ("TotalPublishTime",
	// "AvgPublishTime") as integer nanoseconds, unlike the snake_case keys
	// above. Confirm this is intentional before adding tags, since that would
	// change the serialized keys.
	TotalPublishTime  time.Duration
	AvgPublishTime    time.Duration
	// Connection-pool snapshot; a nil pointer marshals as JSON null.
	PoolStats         *RedisPoolStats `json:"pool_stats"`
}

RedisBrokerStats contains Redis broker statistics

type RedisConfig

// RedisConfig defines configuration for the Redis broker. Each field is mapped
// to the same snake_case key in both YAML and JSON. DefaultRedisConfig returns
// a pre-populated instance.
type RedisConfig struct {
	Addresses       []string      `yaml:"addresses" json:"addresses"`
	// NOTE(review): Username and Password are credentials with plain tags, so
	// marshaling this struct writes them out in clear text.
	Username        string        `yaml:"username" json:"username"`
	Password        string        `yaml:"password" json:"password"`
	Database        int           `yaml:"database" json:"database"`
	// Presumably the Redis Sentinel master name — verify against the code
	// that builds the client.
	MasterName      string        `yaml:"master_name" json:"master_name"`
	PoolSize        int           `yaml:"pool_size" json:"pool_size"`
	MinIdleConns    int           `yaml:"min_idle_conns" json:"min_idle_conns"`
	MaxIdleConns    int           `yaml:"max_idle_conns" json:"max_idle_conns"`
	// time.Duration fields: encoding/json reads and writes these as integer
	// nanoseconds, not strings such as "5s". YAML handling depends on the
	// decoder in use — TODO confirm.
	ConnMaxIdleTime time.Duration `yaml:"conn_max_idle_time" json:"conn_max_idle_time"`
	ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime" json:"conn_max_lifetime"`
	DialTimeout     time.Duration `yaml:"dial_timeout" json:"dial_timeout"`
	ReadTimeout     time.Duration `yaml:"read_timeout" json:"read_timeout"`
	WriteTimeout    time.Duration `yaml:"write_timeout" json:"write_timeout"`
	MaxRetries      int           `yaml:"max_retries" json:"max_retries"`
	MinRetryBackoff time.Duration `yaml:"min_retry_backoff" json:"min_retry_backoff"`
	MaxRetryBackoff time.Duration `yaml:"max_retry_backoff" json:"max_retry_backoff"`
	ChannelSize     int           `yaml:"channel_size" json:"channel_size"`
	// Stream-related settings. Presumably StreamMaxLen, ConsumerGroup and
	// ConsumerName only take effect when EnableStreams is true — confirm.
	EnableStreams   bool          `yaml:"enable_streams" json:"enable_streams"`
	StreamMaxLen    int64         `yaml:"stream_max_len" json:"stream_max_len"`
	ConsumerGroup   string        `yaml:"consumer_group" json:"consumer_group"`
	ConsumerName    string        `yaml:"consumer_name" json:"consumer_name"`
}

RedisConfig defines configuration for Redis broker

func DefaultRedisConfig

func DefaultRedisConfig() *RedisConfig

DefaultRedisConfig returns default Redis configuration

type RedisPoolStats

// RedisPoolStats contains Redis connection pool statistics. All counters are
// plain ints serialized under snake_case JSON keys.
//
// NOTE(review): the field names look like they mirror the pool statistics
// exposed by the Redis client library — confirm the source and whether the
// int conversions can truncate.
type RedisPoolStats struct {
	TotalConns int `json:"total_conns"`
	IdleConns  int `json:"idle_conns"`
	StaleConns int `json:"stale_conns"`
	Hits       int `json:"hits"`
	Misses     int `json:"misses"`
	Timeouts   int `json:"timeouts"`
}

RedisPoolStats contains Redis connection pool statistics

type RedisSubscription

// RedisSubscription wraps a Redis pub/sub subscription.
type RedisSubscription struct {
	// contains filtered or unexported fields
}

RedisSubscription wraps a Redis pub/sub subscription

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL