Documentation ¶
Index ¶
- func NewMemoryBroker(logger forge.Logger, metrics forge.Metrics) core.MessageBroker
- type MemoryBroker
- func (mb *MemoryBroker) Close(ctx context.Context) error
- func (mb *MemoryBroker) Connect(ctx context.Context, config interface{}) error
- func (mb *MemoryBroker) GetStats() map[string]interface{}
- func (mb *MemoryBroker) HealthCheck(ctx context.Context) error
- func (mb *MemoryBroker) Publish(ctx context.Context, topic string, event core.Event) error
- func (mb *MemoryBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
- func (mb *MemoryBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error
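- type NATSBroker
- func NewNATSBroker(config map[string]interface{}, logger forge.Logger, metrics forge.Metrics) (*NATSBroker, error)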
- func (nb *NATSBroker) Close(ctx context.Context) error
- func (nb *NATSBroker) Connect(ctx context.Context, config interface{}) error
- func (nb *NATSBroker) GetStats() map[string]interface{}
- func (nb *NATSBroker) HealthCheck(ctx context.Context) error
- func (nb *NATSBroker) Publish(ctx context.Context, topic string, event core.Event) error
- func (nb *NATSBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
- func (nb *NATSBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error
- type NATSBrokerStats
- type NATSConfig
- func DefaultNATSConfig() *NATSConfig
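- type RedisBroker
- func NewRedisBroker(config map[string]interface{}, logger forge.Logger, metrics forge.Metrics) (*RedisBroker, error)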
- func (rb *RedisBroker) Close(ctx context.Context) error
- func (rb *RedisBroker) Connect(ctx context.Context, config interface{}) error
- func (rb *RedisBroker) GetStats() map[string]interface{}
- func (rb *RedisBroker) HealthCheck(ctx context.Context) error
- func (rb *RedisBroker) Publish(ctx context.Context, topic string, event core.Event) error
- func (rb *RedisBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
- func (rb *RedisBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error
- type RedisBrokerStats
- type RedisConfig
- func DefaultRedisConfig() *RedisConfig
- type RedisPoolStats
- type RedisSubscription
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewMemoryBroker ¶
func NewMemoryBroker(logger forge.Logger, metrics forge.Metrics) core.MessageBroker
NewMemoryBroker creates a new memory broker
Types ¶
type MemoryBroker ¶
type MemoryBroker struct {
// contains filtered or unexported fields
}
MemoryBroker implements MessageBroker interface using in-memory channels
func (*MemoryBroker) Close ¶
func (mb *MemoryBroker) Close(ctx context.Context) error
Close implements MessageBroker
func (*MemoryBroker) Connect ¶
func (mb *MemoryBroker) Connect(ctx context.Context, config interface{}) error
Connect implements MessageBroker
func (*MemoryBroker) GetStats ¶
func (mb *MemoryBroker) GetStats() map[string]interface{}
GetStats implements MessageBroker
func (*MemoryBroker) HealthCheck ¶
func (mb *MemoryBroker) HealthCheck(ctx context.Context) error
HealthCheck implements MessageBroker
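func (*MemoryBroker) Publish ¶
func (mb *MemoryBroker) Publish(ctx context.Context, topic string, event core.Event) error
func (*MemoryBroker) Subscribe ¶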
func (mb *MemoryBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
Subscribe implements MessageBroker
func (*MemoryBroker) Unsubscribe ¶
func (mb *MemoryBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error
Unsubscribe implements MessageBroker
type NATSBroker ¶
type NATSBroker struct {
// contains filtered or unexported fields
}
NATSBroker implements MessageBroker for NATS
func NewNATSBroker ¶
func NewNATSBroker(config map[string]interface{}, logger forge.Logger, metrics forge.Metrics) (*NATSBroker, error)
NewNATSBroker creates a new NATS broker
func (*NATSBroker) Close ¶
func (nb *NATSBroker) Close(ctx context.Context) error
Close implements MessageBroker
func (*NATSBroker) Connect ¶
func (nb *NATSBroker) Connect(ctx context.Context, config interface{}) error
Connect implements MessageBroker
func (*NATSBroker) GetStats ¶
func (nb *NATSBroker) GetStats() map[string]interface{}
GetStats implements MessageBroker
func (*NATSBroker) HealthCheck ¶
func (nb *NATSBroker) HealthCheck(ctx context.Context) error
HealthCheck implements MessageBroker
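func (*NATSBroker) Publish ¶
func (nb *NATSBroker) Publish(ctx context.Context, topic string, event core.Event) error
func (*NATSBroker) Subscribe ¶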
func (nb *NATSBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
Subscribe implements MessageBroker
func (*NATSBroker) Unsubscribe ¶
func (nb *NATSBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error
Unsubscribe implements MessageBroker
type NATSBrokerStats ¶
type NATSBrokerStats struct {
Connected bool `json:"connected"`
Subscriptions int `json:"subscriptions"`
MessagesPublished int64 `json:"messages_published"`
MessagesReceived int64 `json:"messages_received"`
PublishErrors int64 `json:"publish_errors"`
ReceiveErrors int64 `json:"receive_errors"`
ConnectionErrors int64 `json:"connection_errors"`
LastConnected *time.Time `json:"last_connected"`
LastError *time.Time `json:"last_error"`
TotalPublishTime time.Duration
AvgPublishTime time.Duration
}
NATSBrokerStats contains NATS broker statistics
type NATSConfig ¶
type NATSConfig struct {
URL string `yaml:"url" json:"url"`
Name string `yaml:"name" json:"name"`
Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"`
Token string `yaml:"token" json:"token"`
MaxReconnects int `yaml:"max_reconnects" json:"max_reconnects"`
ReconnectWait time.Duration `yaml:"reconnect_wait" json:"reconnect_wait"`
ConnectTimeout time.Duration `yaml:"connect_timeout" json:"connect_timeout"`
PingInterval time.Duration `yaml:"ping_interval" json:"ping_interval"`
MaxPingsOut int `yaml:"max_pings_out" json:"max_pings_out"`
EnableCompression bool `yaml:"enable_compression" json:"enable_compression"`
}
NATSConfig defines configuration for NATS broker
func DefaultNATSConfig ¶
func DefaultNATSConfig() *NATSConfig
DefaultNATSConfig returns default NATS configuration
type RedisBroker ¶
type RedisBroker struct {
// contains filtered or unexported fields
}
RedisBroker implements MessageBroker for Redis pub/sub
func NewRedisBroker ¶
func NewRedisBroker(config map[string]interface{}, logger forge.Logger, metrics forge.Metrics) (*RedisBroker, error)
NewRedisBroker creates a new Redis broker
func (*RedisBroker) Close ¶
func (rb *RedisBroker) Close(ctx context.Context) error
Close implements MessageBroker
func (*RedisBroker) Connect ¶
func (rb *RedisBroker) Connect(ctx context.Context, config interface{}) error
Connect implements MessageBroker
func (*RedisBroker) GetStats ¶
func (rb *RedisBroker) GetStats() map[string]interface{}
GetStats implements MessageBroker
func (*RedisBroker) HealthCheck ¶
func (rb *RedisBroker) HealthCheck(ctx context.Context) error
HealthCheck implements MessageBroker
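func (*RedisBroker) Publish ¶
func (rb *RedisBroker) Publish(ctx context.Context, topic string, event core.Event) error
func (*RedisBroker) Subscribe ¶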
func (rb *RedisBroker) Subscribe(ctx context.Context, topic string, handler core.EventHandler) error
Subscribe implements MessageBroker
func (*RedisBroker) Unsubscribe ¶
func (rb *RedisBroker) Unsubscribe(ctx context.Context, topic string, handlerName string) error
Unsubscribe implements MessageBroker
type RedisBrokerStats ¶
type RedisBrokerStats struct {
Connected bool `json:"connected"`
Subscriptions int `json:"subscriptions"`
MessagesPublished int64 `json:"messages_published"`
MessagesReceived int64 `json:"messages_received"`
PublishErrors int64 `json:"publish_errors"`
ReceiveErrors int64 `json:"receive_errors"`
ConnectionErrors int64 `json:"connection_errors"`
LastConnected *time.Time `json:"last_connected"`
LastError *time.Time `json:"last_error"`
TotalPublishTime time.Duration
AvgPublishTime time.Duration
PoolStats *RedisPoolStats `json:"pool_stats"`
}
RedisBrokerStats contains Redis broker statistics
type RedisConfig ¶
type RedisConfig struct {
Addresses []string `yaml:"addresses" json:"addresses"`
Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"`
Database int `yaml:"database" json:"database"`
MasterName string `yaml:"master_name" json:"master_name"`
PoolSize int `yaml:"pool_size" json:"pool_size"`
MinIdleConns int `yaml:"min_idle_conns" json:"min_idle_conns"`
MaxIdleConns int `yaml:"max_idle_conns" json:"max_idle_conns"`
ConnMaxIdleTime time.Duration `yaml:"conn_max_idle_time" json:"conn_max_idle_time"`
ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime" json:"conn_max_lifetime"`
DialTimeout time.Duration `yaml:"dial_timeout" json:"dial_timeout"`
ReadTimeout time.Duration `yaml:"read_timeout" json:"read_timeout"`
WriteTimeout time.Duration `yaml:"write_timeout" json:"write_timeout"`
MaxRetries int `yaml:"max_retries" json:"max_retries"`
MinRetryBackoff time.Duration `yaml:"min_retry_backoff" json:"min_retry_backoff"`
MaxRetryBackoff time.Duration `yaml:"max_retry_backoff" json:"max_retry_backoff"`
ChannelSize int `yaml:"channel_size" json:"channel_size"`
EnableStreams bool `yaml:"enable_streams" json:"enable_streams"`
StreamMaxLen int64 `yaml:"stream_max_len" json:"stream_max_len"`
ConsumerGroup string `yaml:"consumer_group" json:"consumer_group"`
ConsumerName string `yaml:"consumer_name" json:"consumer_name"`
}
RedisConfig defines configuration for Redis broker
func DefaultRedisConfig ¶
func DefaultRedisConfig() *RedisConfig
DefaultRedisConfig returns default Redis configuration
type RedisPoolStats ¶
type RedisPoolStats struct {
TotalConns int `json:"total_conns"`
IdleConns int `json:"idle_conns"`
StaleConns int `json:"stale_conns"`
Hits int `json:"hits"`
Misses int `json:"misses"`
Timeouts int `json:"timeouts"`
}
RedisPoolStats contains Redis connection pool statistics
type RedisSubscription ¶
type RedisSubscription struct {
// contains filtered or unexported fields
}
RedisSubscription wraps a Redis pub/sub subscription