Documentation
¶
Index ¶
- func NewMemoryBroker(logger forge.Logger, metrics forge.Metrics) core.MessageBroker
- type MemoryBroker
- func (mb *MemoryBroker) Close(ctx context.Context) error
- func (mb *MemoryBroker) Connect(ctx context.Context, config any) error
- func (mb *MemoryBroker) GetStats() map[string]any
- func (mb *MemoryBroker) HealthCheck(ctx context.Context) error
- func (mb *MemoryBroker) Publish(ctx context.Context, topic string, event core.Event) error
- func (mb *MemoryBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
- func (mb *MemoryBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error
- type NATSBroker
- func (nb *NATSBroker) Close(ctx context.Context) error
- func (nb *NATSBroker) Connect(ctx context.Context, config any) error
- func (nb *NATSBroker) GetStats() map[string]any
- func (nb *NATSBroker) HealthCheck(ctx context.Context) error
- func (nb *NATSBroker) Publish(ctx context.Context, topic string, event core.Event) error
- func (nb *NATSBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
- func (nb *NATSBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error
- type NATSBrokerStats
- type NATSConfig
- type RedisBroker
- func (rb *RedisBroker) Close(ctx context.Context) error
- func (rb *RedisBroker) Connect(ctx context.Context, config any) error
- func (rb *RedisBroker) GetStats() map[string]any
- func (rb *RedisBroker) HealthCheck(ctx context.Context) error
- func (rb *RedisBroker) Publish(ctx context.Context, topic string, event core.Event) error
- func (rb *RedisBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
- func (rb *RedisBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error
- type RedisBrokerStats
- type RedisConfig
- type RedisPoolStats
- type RedisSubscription
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewMemoryBroker ¶
NewMemoryBroker creates a new memory broker.
Types ¶
type MemoryBroker ¶
type MemoryBroker struct {
// contains filtered or unexported fields
}
MemoryBroker implements MessageBroker interface using in-memory channels.
func (*MemoryBroker) Close ¶
func (mb *MemoryBroker) Close(ctx context.Context) error
Close implements MessageBroker.
func (*MemoryBroker) Connect ¶
func (mb *MemoryBroker) Connect(ctx context.Context, config any) error
Connect implements MessageBroker.
func (*MemoryBroker) GetStats ¶
func (mb *MemoryBroker) GetStats() map[string]any
GetStats implements MessageBroker.
func (*MemoryBroker) HealthCheck ¶
func (mb *MemoryBroker) HealthCheck(ctx context.Context) error
HealthCheck implements MessageBroker.
func (*MemoryBroker) Subscribe ¶
func (mb *MemoryBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
Subscribe implements MessageBroker.
func (*MemoryBroker) Unsubscribe ¶
Unsubscribe implements MessageBroker.
type NATSBroker ¶
type NATSBroker struct {
// contains filtered or unexported fields
}
NATSBroker implements MessageBroker for NATS.
func NewNATSBroker ¶
func NewNATSBroker(config map[string]any, logger forge.Logger, metrics forge.Metrics) (*NATSBroker, error)
NewNATSBroker creates a new NATS broker.
func (*NATSBroker) Close ¶
func (nb *NATSBroker) Close(ctx context.Context) error
Close implements MessageBroker.
func (*NATSBroker) Connect ¶
func (nb *NATSBroker) Connect(ctx context.Context, config any) error
Connect implements MessageBroker.
func (*NATSBroker) GetStats ¶
func (nb *NATSBroker) GetStats() map[string]any
GetStats implements MessageBroker.
func (*NATSBroker) HealthCheck ¶
func (nb *NATSBroker) HealthCheck(ctx context.Context) error
HealthCheck implements MessageBroker.
func (*NATSBroker) Subscribe ¶
func (nb *NATSBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
Subscribe implements MessageBroker.
func (*NATSBroker) Unsubscribe ¶
Unsubscribe implements MessageBroker.
type NATSBrokerStats ¶
type NATSBrokerStats struct {
Connected bool `json:"connected"`
Subscriptions int `json:"subscriptions"`
MessagesPublished int64 `json:"messages_published"`
MessagesReceived int64 `json:"messages_received"`
PublishErrors int64 `json:"publish_errors"`
ReceiveErrors int64 `json:"receive_errors"`
ConnectionErrors int64 `json:"connection_errors"`
LastConnected *time.Time `json:"last_connected"`
LastError *time.Time `json:"last_error"`
TotalPublishTime time.Duration
AvgPublishTime time.Duration
}
NATSBrokerStats contains NATS broker statistics.
type NATSConfig ¶
type NATSConfig struct {
URL string `json:"url" yaml:"url"`
Name string `json:"name" yaml:"name"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
Token string `json:"token" yaml:"token"`
MaxReconnects int `json:"max_reconnects" yaml:"max_reconnects"`
ReconnectWait time.Duration `json:"reconnect_wait" yaml:"reconnect_wait"`
ConnectTimeout time.Duration `json:"connect_timeout" yaml:"connect_timeout"`
PingInterval time.Duration `json:"ping_interval" yaml:"ping_interval"`
MaxPingsOut int `json:"max_pings_out" yaml:"max_pings_out"`
EnableCompression bool `json:"enable_compression" yaml:"enable_compression"`
}
NATSConfig defines configuration for NATS broker.
func DefaultNATSConfig ¶
func DefaultNATSConfig() *NATSConfig
DefaultNATSConfig returns default NATS configuration.
type RedisBroker ¶
type RedisBroker struct {
// contains filtered or unexported fields
}
RedisBroker implements MessageBroker for Redis pub/sub.
func NewRedisBroker ¶
func NewRedisBroker(config map[string]any, logger forge.Logger, metrics forge.Metrics) (*RedisBroker, error)
NewRedisBroker creates a new Redis broker.
func (*RedisBroker) Close ¶
func (rb *RedisBroker) Close(ctx context.Context) error
Close implements MessageBroker.
func (*RedisBroker) Connect ¶
func (rb *RedisBroker) Connect(ctx context.Context, config any) error
Connect implements MessageBroker.
func (*RedisBroker) GetStats ¶
func (rb *RedisBroker) GetStats() map[string]any
GetStats implements MessageBroker.
func (*RedisBroker) HealthCheck ¶
func (rb *RedisBroker) HealthCheck(ctx context.Context) error
HealthCheck implements MessageBroker.
func (*RedisBroker) Subscribe ¶
func (rb *RedisBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
Subscribe implements MessageBroker.
func (*RedisBroker) Unsubscribe ¶
Unsubscribe implements MessageBroker.
type RedisBrokerStats ¶
type RedisBrokerStats struct {
Connected bool `json:"connected"`
Subscriptions int `json:"subscriptions"`
MessagesPublished int64 `json:"messages_published"`
MessagesReceived int64 `json:"messages_received"`
PublishErrors int64 `json:"publish_errors"`
ReceiveErrors int64 `json:"receive_errors"`
ConnectionErrors int64 `json:"connection_errors"`
LastConnected *time.Time `json:"last_connected"`
LastError *time.Time `json:"last_error"`
TotalPublishTime time.Duration
AvgPublishTime time.Duration
PoolStats *RedisPoolStats `json:"pool_stats"`
}
RedisBrokerStats contains Redis broker statistics.
type RedisConfig ¶
type RedisConfig struct {
Addresses []string `json:"addresses" yaml:"addresses"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
Database int `json:"database" yaml:"database"`
MasterName string `json:"master_name" yaml:"master_name"`
PoolSize int `json:"pool_size" yaml:"pool_size"`
MinIdleConns int `json:"min_idle_conns" yaml:"min_idle_conns"`
MaxIdleConns int `json:"max_idle_conns" yaml:"max_idle_conns"`
ConnMaxIdleTime time.Duration `json:"conn_max_idle_time" yaml:"conn_max_idle_time"`
ConnMaxLifetime time.Duration `json:"conn_max_lifetime" yaml:"conn_max_lifetime"`
DialTimeout time.Duration `json:"dial_timeout" yaml:"dial_timeout"`
ReadTimeout time.Duration `json:"read_timeout" yaml:"read_timeout"`
WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`
MaxRetries int `json:"max_retries" yaml:"max_retries"`
MinRetryBackoff time.Duration `json:"min_retry_backoff" yaml:"min_retry_backoff"`
MaxRetryBackoff time.Duration `json:"max_retry_backoff" yaml:"max_retry_backoff"`
ChannelSize int `json:"channel_size" yaml:"channel_size"`
EnableStreams bool `json:"enable_streams" yaml:"enable_streams"`
StreamMaxLen int64 `json:"stream_max_len" yaml:"stream_max_len"`
ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"`
ConsumerName string `json:"consumer_name" yaml:"consumer_name"`
}
RedisConfig defines configuration for Redis broker.
func DefaultRedisConfig ¶
func DefaultRedisConfig() *RedisConfig
DefaultRedisConfig returns default Redis configuration.
type RedisPoolStats ¶
type RedisPoolStats struct {
TotalConns int `json:"total_conns"`
IdleConns int `json:"idle_conns"`
StaleConns int `json:"stale_conns"`
Hits int `json:"hits"`
Misses int `json:"misses"`
Timeouts int `json:"timeouts"`
}
RedisPoolStats contains Redis connection pool statistics.
type RedisSubscription ¶
type RedisSubscription struct {
// contains filtered or unexported fields
}
RedisSubscription wraps a Redis pub/sub subscription.