Versions in this module Expand all Collapse all v2 v2.0.0 Feb 20, 2026 Changes in this version + const DefaultKinesisPollInterval + const EventTypeBusStarted + const EventTypeBusStopped + const EventTypeConfigLoaded + const EventTypeMessageFailed + const EventTypeMessagePublished + const EventTypeMessageReceived + const EventTypeSubscriptionCreated + const EventTypeSubscriptionRemoved + const EventTypeTopicCreated + const EventTypeTopicDeleted + const ModuleName + const ServiceName + var ErrDuplicateEngineName = errors.New("duplicate engine name") + var ErrEngineNotFound = errors.New("engine not found") + var ErrEventBusNotStarted = errors.New("event bus not started") + var ErrEventBusShutdownTimeout = errors.New("event bus shutdown timed out") + var ErrEventHandlerNil = errors.New("event handler cannot be nil") + var ErrInvalidPollInterval = errors.New("pollInterval must be positive") + var ErrInvalidShardCount = errors.New("invalid shard count") + var ErrInvalidSubscriptionType = errors.New("invalid subscription type") + var ErrNATSConnectionNotEstablished = errors.New("NATS connection is not established") + var ErrNoSubjectForEventEmission = errors.New("no subject available for event emission") + var ErrSubscriptionNotFound = errors.New("subscription not found in any engine") + var ErrUnknownEngineRef = errors.New("routing rule references unknown engine") + var ErrUnknownEngineType = errors.New("unknown engine type") + func GetRegisteredEngines() []string + func NewModule() modular.Module + func PartitionKeyFromContext(ctx context.Context) (string, bool) + func RegisterEngine(engineType string, factory EngineFactory) + func WithPartitionKey(ctx context.Context, key string) context.Context + type CustomMemoryConfig struct + DefaultEventBufferSize int + EnableMetrics bool + EventFilters []map[string]interface{} + MaxEventQueueSize int + MetricsInterval time.Duration + type CustomMemoryEventBus struct + func (c *CustomMemoryEventBus) GetMetrics() *EventMetrics + func (c *CustomMemoryEventBus) Publish(ctx context.Context, event Event) error + func (c *CustomMemoryEventBus) Start(ctx context.Context) error + func (c *CustomMemoryEventBus) Stop(ctx context.Context) error + func (c *CustomMemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (c *CustomMemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (c *CustomMemoryEventBus) SubscriberCount(topic string) int + func (c *CustomMemoryEventBus) Topics() []string + func (c *CustomMemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error + type DatadogStatsdExporter struct + func NewDatadogStatsdExporter(eventBus *EventBusModule, prefix, addr string, interval time.Duration, ...) (*DatadogStatsdExporter, error) + func (e *DatadogStatsdExporter) Close() error + func (e *DatadogStatsdExporter) Run(ctx context.Context) + type DeliveryStats struct + Delivered uint64 + Dropped uint64 + type EngineConfig struct + Config map[string]interface{} + Name string + Type string + type EngineFactory func(config map[string]interface{}) (EventBus, error) + type EngineRouter struct + func NewEngineRouter(config *EventBusConfig) (*EngineRouter, error) + func (r *EngineRouter) CollectPerEngineStats() map[string]DeliveryStats + func (r *EngineRouter) CollectStats() (delivered uint64, dropped uint64) + func (r *EngineRouter) GetEngineForTopic(topic string) string + func (r *EngineRouter) GetEngineNames() []string + func (r *EngineRouter) Publish(ctx context.Context, event Event) error + func (r *EngineRouter) SetModuleReference(module *EventBusModule) + func (r *EngineRouter) Start(ctx context.Context) error + func (r *EngineRouter) Stop(ctx context.Context) error + func (r *EngineRouter) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (r *EngineRouter) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (r *EngineRouter) SubscriberCount(topic string) int + func (r *EngineRouter) Topics() []string + func (r *EngineRouter) Unsubscribe(ctx context.Context, subscription Subscription) error + type Event = cloudevents.Event + type EventBus interface + Publish func(ctx context.Context, event Event) error + Start func(ctx context.Context) error + Stop func(ctx context.Context) error + Subscribe func(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + SubscribeAsync func(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + SubscriberCount func(topic string) int + Topics func() []string + Unsubscribe func(ctx context.Context, subscription Subscription) error + func NewCustomMemoryEventBus(config map[string]interface{}) (EventBus, error) + func NewKafkaEventBus(config map[string]interface{}) (EventBus, error) + func NewKinesisEventBus(config map[string]interface{}) (EventBus, error) + func NewNatsEventBus(config map[string]interface{}) (EventBus, error) + func NewRedisEventBus(config map[string]interface{}) (EventBus, error) + type EventBusConfig struct + DefaultEventBufferSize int + DeliveryMode string + Engine string + Engines []EngineConfig + EventTTL time.Duration + ExternalBrokerPassword string + ExternalBrokerURL string + ExternalBrokerUser string + MaxEventQueueSize int + PublishBlockTimeout time.Duration + RetentionDays int + RotateSubscriberOrder bool + Routing []RoutingRule + Source string + WorkerCount int + func (c *EventBusConfig) GetDefaultEngine() string + func (c *EventBusConfig) IsMultiEngine() bool + func (c *EventBusConfig) ValidateConfig() error + type EventBusModule struct + func (m *EventBusModule) Constructor() modular.ModuleConstructor + func (m *EventBusModule) Dependencies() []string + func (m *EventBusModule) EmitEvent(ctx context.Context, event cloudevents.Event) error + func (m *EventBusModule) GetRegisteredEventTypes() []string + func (m *EventBusModule) GetRouter() *EngineRouter + func (m *EventBusModule) Init(app modular.Application) error + func (m *EventBusModule) Name() string + func (m *EventBusModule) PerEngineStats() map[string]DeliveryStats + func (m *EventBusModule) ProvidesServices() []modular.ServiceProvider + func (m *EventBusModule) Publish(ctx context.Context, topic string, payload interface{}) error + func (m *EventBusModule) RegisterConfig(app modular.Application) error + func (m *EventBusModule) RegisterObservers(subject modular.Subject) error + func (m *EventBusModule) RequiresServices() []modular.ServiceDependency + func (m *EventBusModule) Start(ctx context.Context) error + func (m *EventBusModule) Stats() (delivered uint64, dropped uint64) + func (m *EventBusModule) Stop(ctx context.Context) error + func (m *EventBusModule) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (m *EventBusModule) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (m *EventBusModule) SubscriberCount(topic string) int + func (m *EventBusModule) Topics() []string + func (m *EventBusModule) Unsubscribe(ctx context.Context, subscription Subscription) error + type EventFilter interface + Name func() string + ShouldProcess func(event Event) bool + type EventHandler func(ctx context.Context, event Event) error + type EventMetrics struct + AverageProcessingTime time.Duration + EventsPerTopic map[string]int64 + LastResetTime time.Time + TotalEvents int64 + type KafkaConfig struct + Brokers []string + ConsumerConfig map[string]string + GroupID string + ProducerConfig map[string]string + SecurityConfig map[string]string + type KafkaConsumerGroupHandler struct + func (h *KafkaConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error + func (h *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error + func (h *KafkaConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error + type KafkaEventBus struct + func (k *KafkaEventBus) Publish(ctx context.Context, event Event) error + func (k *KafkaEventBus) Start(ctx context.Context) error + func (k *KafkaEventBus) Stop(ctx context.Context) error + func (k *KafkaEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (k *KafkaEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (k *KafkaEventBus) SubscriberCount(topic string) int + func (k *KafkaEventBus) Topics() []string + func (k *KafkaEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error + type KinesisClient interface + CreateStream func(ctx context.Context, params *kinesis.CreateStreamInput, ...) (*kinesis.CreateStreamOutput, error) + DescribeStream func(ctx context.Context, params *kinesis.DescribeStreamInput, ...) (*kinesis.DescribeStreamOutput, error) + GetRecords func(ctx context.Context, params *kinesis.GetRecordsInput, ...) (*kinesis.GetRecordsOutput, error) + GetShardIterator func(ctx context.Context, params *kinesis.GetShardIteratorInput, ...) (*kinesis.GetShardIteratorOutput, error) + PutRecord func(ctx context.Context, params *kinesis.PutRecordInput, ...) (*kinesis.PutRecordOutput, error) + type KinesisConfig struct + AccessKeyID string + PollInterval time.Duration + Region string + SecretAccessKey string + SessionToken string + ShardCount int32 + StreamName string + type KinesisEventBus struct + func (k *KinesisEventBus) Publish(ctx context.Context, event Event) error + func (k *KinesisEventBus) Start(ctx context.Context) error + func (k *KinesisEventBus) Stop(ctx context.Context) error + func (k *KinesisEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (k *KinesisEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (k *KinesisEventBus) SubscriberCount(topic string) int + func (k *KinesisEventBus) Topics() []string + func (k *KinesisEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error + type MemoryEventBus struct + func NewMemoryEventBus(config *EventBusConfig) *MemoryEventBus + func (m *MemoryEventBus) Publish(ctx context.Context, event Event) error + func (m *MemoryEventBus) SetModule(module *EventBusModule) + func (m *MemoryEventBus) Start(ctx context.Context) error + func (m *MemoryEventBus) Stats() (delivered uint64, dropped uint64) + func (m *MemoryEventBus) Stop(ctx context.Context) error + func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (m *MemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (m *MemoryEventBus) SubscriberCount(topic string) int + func (m *MemoryEventBus) Topics() []string + func (m *MemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error + type NatsConfig struct + AllowReconnect bool + ConnectionName string + MaxPingsOut int + MaxReconnects int + Password string + PingInterval int + ReconnectWait int + SubscribeTimeout int + Token string + URL string + Username string + type NatsEventBus struct + func (n *NatsEventBus) Publish(ctx context.Context, event Event) error + func (n *NatsEventBus) Start(ctx context.Context) error + func (n *NatsEventBus) Stop(ctx context.Context) error + func (n *NatsEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (n *NatsEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (n *NatsEventBus) SubscriberCount(topic string) int + func (n *NatsEventBus) Topics() []string + func (n *NatsEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error + type PrometheusCollector struct + func NewPrometheusCollector(eventBus *EventBusModule, namespace string) *PrometheusCollector + func (c *PrometheusCollector) Collect(ch chan<- prometheus.Metric) + func (c *PrometheusCollector) Describe(ch chan<- *prometheus.Desc) + type RedisConfig struct + DB int + Password string + PoolSize int + URL string + Username string + type RedisEventBus struct + func (r *RedisEventBus) Publish(ctx context.Context, event Event) error + func (r *RedisEventBus) Start(ctx context.Context) error + func (r *RedisEventBus) Stop(ctx context.Context) error + func (r *RedisEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (r *RedisEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error) + func (r *RedisEventBus) SubscriberCount(topic string) int + func (r *RedisEventBus) Topics() []string + func (r *RedisEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error + type RoutingRule struct + Engine string + Topics []string + type Subscription interface + Cancel func() error + ID func() string + IsAsync func() bool + Topic func() string + type TopicPrefixFilter struct + AllowedPrefixes []string + func (f *TopicPrefixFilter) Name() string + func (f *TopicPrefixFilter) ShouldProcess(event Event) bool Other modules containing this package github.com/CrisisTextLine/modular/modules/eventbus