Documentation ¶
Overview ¶
Package internal provides the NATS JetStream broker module for cross-instance game event delivery.
Index ¶
- type Broker
- type NATSModule
- func (m *NATSModule) Publish(subject string, data []byte) error
- func (m *NATSModule) Start(_ context.Context) error
- func (m *NATSModule) Stop(_ context.Context) error
- func (m *NATSModule) Subscribe(subject string, handler func([]byte)) error
- func (m *NATSModule) SubscribeDurable(subject, consumerName string, handler func([]byte)) error
- func (m *NATSModule) SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error
- type NATSModuleConfig
- type Plugin
- func (p *Plugin) Name() string
- func (p *Plugin) Publish(subject string, data []byte) error
- func (p *Plugin) PublishStep() *PublishStep
- func (p *Plugin) Start(ctx context.Context) error
- func (p *Plugin) Stop(ctx context.Context) error
- func (p *Plugin) Subscribe(subject string, handler func([]byte)) error
- func (p *Plugin) SubscribeDurable(subject, consumerName string, handler func([]byte)) error
- func (p *Plugin) SubscribeStep() *SubscribeStep
- func (p *Plugin) SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error
- type PublishStep
- type Publisher
- type SubscribeStep
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker interface {
Publisher
Subscriber
Start(ctx context.Context) error
Stop(ctx context.Context) error
}
Broker combines Publisher and Subscriber and adds the Start and Stop lifecycle methods.
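For unit tests that should not depend on a running NATS server, a small in-memory stand-in can satisfy the same interface. The following is a minimal sketch, not part of this package: memBroker and newMemBroker are hypothetical names, the Publisher method set is assumed from the signature of (*NATSModule).Publish (the page lists the type but does not show its methods), and the fake matches subjects exactly, with no wildcard support and no durable replay.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Publisher is an assumption: the index lists the type, but its method set
// is not shown. Publish mirrors the signature of (*NATSModule).Publish.
type Publisher interface {
	Publish(subject string, data []byte) error
}

// Subscriber is copied from the documented interface.
type Subscriber interface {
	Subscribe(subject string, handler func([]byte)) error
	SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error
	SubscribeDurable(subject, consumerName string, handler func([]byte)) error
}

// Broker is copied from the documented interface.
type Broker interface {
	Publisher
	Subscriber
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

// memBroker is a hypothetical in-memory Broker for unit tests. It matches
// subjects exactly (no wildcards), delivers synchronously, and does not
// replay past messages to durable subscribers.
type memBroker struct {
	mu       sync.Mutex
	handlers map[string][]func(subject string, data []byte)
}

func newMemBroker() *memBroker {
	return &memBroker{handlers: make(map[string][]func(string, []byte))}
}

// Start is a no-op because there is no connection to open.
func (b *memBroker) Start(context.Context) error { return nil }

// Stop drops all registered handlers.
func (b *memBroker) Stop(context.Context) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers = make(map[string][]func(string, []byte))
	return nil
}

// Publish calls every handler registered for exactly this subject.
func (b *memBroker) Publish(subject string, data []byte) error {
	b.mu.Lock()
	// Copy the slice so handlers run without holding the lock.
	hs := append([]func(string, []byte){}, b.handlers[subject]...)
	b.mu.Unlock()
	for _, h := range hs {
		h(subject, data)
	}
	return nil
}

func (b *memBroker) SubscribeWithSubject(subject string, handler func(string, []byte)) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[subject] = append(b.handlers[subject], handler)
	return nil
}

func (b *memBroker) Subscribe(subject string, handler func([]byte)) error {
	return b.SubscribeWithSubject(subject, func(_ string, data []byte) { handler(data) })
}

// SubscribeDurable ignores the consumer name; this fake keeps no history.
func (b *memBroker) SubscribeDurable(subject, _ string, handler func([]byte)) error {
	return b.Subscribe(subject, handler)
}

// Compile-time check that memBroker satisfies Broker.
var _ Broker = (*memBroker)(nil)

func main() {
	var b Broker = newMemBroker()
	_ = b.Start(context.Background())
	_ = b.Subscribe("game.events", func(data []byte) { fmt.Printf("got %s\n", data) })
	_ = b.Publish("game.events", []byte("player-joined"))
	_ = b.Stop(context.Background())
}
```

Because the fake delivers synchronously inside Publish, tests can assert on handler side effects immediately after publishing. The real module delivers on goroutines managed by the NATS client, so ordering and timing will differ.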
type NATSModule ¶
type NATSModule struct {
// contains filtered or unexported fields
}
NATSModule implements Broker using NATS JetStream. Thread-safe; multiple goroutines may call Publish/Subscribe concurrently.
func NewNATSModule ¶
func NewNATSModule(name string, cfg map[string]any) *NATSModule
NewNATSModule constructs a NATSModule from a config map. Recognised keys: url (string), stream (string), jetstream (bool).
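To illustrate how an untyped config map with these keys might be turned into typed settings, here is a sketch under stated assumptions. moduleConfig and configFromMap are hypothetical names, not the package's implementation; the defaults are the ones documented on NATSModuleConfig. Unlike NewNATSModule, which returns no error, this sketch reports a wrong value type so that the failure mode is visible.

```go
package main

import "fmt"

// moduleConfig is a local copy of the documented NATSModuleConfig fields.
type moduleConfig struct {
	URL       string
	Stream    string
	JetStream bool
}

// configFromMap is a hypothetical helper that reads the recognised keys
// (url, stream, jetstream) and falls back to the documented defaults.
func configFromMap(cfg map[string]any) (moduleConfig, error) {
	out := moduleConfig{
		URL:    "nats://localhost:4222", // documented default
		Stream: "GAME_EVENTS",           // documented default
	}
	if v, ok := cfg["url"]; ok {
		s, isString := v.(string)
		if !isString {
			return out, fmt.Errorf("url: want string, got %T", v)
		}
		out.URL = s
	}
	if v, ok := cfg["stream"]; ok {
		s, isString := v.(string)
		if !isString {
			return out, fmt.Errorf("stream: want string, got %T", v)
		}
		out.Stream = s
	}
	if v, ok := cfg["jetstream"]; ok {
		b, isBool := v.(bool)
		if !isBool {
			return out, fmt.Errorf("jetstream: want bool, got %T", v)
		}
		out.JetStream = b
	}
	return out, nil
}

func main() {
	cfg, err := configFromMap(map[string]any{"stream": "MATCH_EVENTS", "jetstream": true})
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```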
func (*NATSModule) Publish ¶
func (m *NATSModule) Publish(subject string, data []byte) error
Publish sends data to the given NATS subject via JetStream.
func (*NATSModule) Start ¶
func (m *NATSModule) Start(_ context.Context) error
Start connects to NATS and (if jetstream=true) ensures the stream exists.
func (*NATSModule) Stop ¶
func (m *NATSModule) Stop(_ context.Context) error
Stop drains subscriptions and closes the NATS connection.
func (*NATSModule) Subscribe ¶
func (m *NATSModule) Subscribe(subject string, handler func([]byte)) error
Subscribe creates a non-durable core NATS subscription. The handler is called for each message on a goroutine managed by the NATS client.
func (*NATSModule) SubscribeDurable ¶
func (m *NATSModule) SubscribeDurable(subject, consumerName string, handler func([]byte)) error
SubscribeDurable creates a durable JetStream push-subscribe consumer. Late subscribers receive messages published before they connected (from sequence 1).
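The replay behaviour described above can be pictured with a small in-memory model. This is only an illustration of the semantics (a late subscriber first receives the backlog from the first message, then live messages); it does not use NATS at all. replayLog, Publish, and SubscribeFromStart are hypothetical names, and the model ignores consumer names, acknowledgements, and the ordering guarantees a real stream provides under concurrency.

```go
package main

import (
	"fmt"
	"sync"
)

// replayLog is a hypothetical in-memory stand-in for a persisted stream.
type replayLog struct {
	mu   sync.Mutex
	msgs [][]byte
	subs []func([]byte)
}

// Publish stores the message, then delivers it to current subscribers.
func (l *replayLog) Publish(data []byte) {
	l.mu.Lock()
	l.msgs = append(l.msgs, data)
	subs := append([]func([]byte){}, l.subs...)
	l.mu.Unlock()
	for _, h := range subs {
		h(data)
	}
}

// SubscribeFromStart replays every stored message to the handler, then
// registers it for live delivery. With concurrent publishers, backlog and
// live messages could interleave; a real stream handles that ordering.
func (l *replayLog) SubscribeFromStart(handler func([]byte)) {
	l.mu.Lock()
	backlog := append([][]byte{}, l.msgs...)
	l.subs = append(l.subs, handler)
	l.mu.Unlock()
	for _, m := range backlog {
		handler(m)
	}
}

func main() {
	l := &replayLog{}
	l.Publish([]byte("match-started")) // published before anyone subscribed
	l.SubscribeFromStart(func(d []byte) { fmt.Printf("received %s\n", d) })
	l.Publish([]byte("match-ended")) // delivered live
}
```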
func (*NATSModule) SubscribeWithSubject ¶
func (m *NATSModule) SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error
SubscribeWithSubject creates a non-durable core NATS subscription that delivers both the full subject and the message payload to the handler. This is needed for wildcard subscriptions where the subject suffix carries routing metadata (e.g. the connection ID in NodeRelay subjects).
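As a sketch of why the full subject matters, the handler below recovers a connection ID from the subject suffix. The subject layout "node.relay.<connID>" and the helper connIDFromSubject are assumptions made for illustration; the actual NodeRelay subject scheme is not documented on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// connIDFromSubject returns the part of subject that follows "prefix.".
// It reports false when the subject does not start with the prefix or
// carries no suffix. The subject layout is hypothetical.
func connIDFromSubject(prefix, subject string) (string, bool) {
	p := prefix + "."
	if !strings.HasPrefix(subject, p) {
		return "", false
	}
	id := strings.TrimPrefix(subject, p)
	if id == "" {
		return "", false
	}
	return id, true
}

func main() {
	// A handler with the shape SubscribeWithSubject expects. With a real
	// broker it would be registered on a wildcard such as "node.relay.*".
	handler := func(subject string, data []byte) {
		id, ok := connIDFromSubject("node.relay", subject)
		if !ok {
			fmt.Println("ignoring unexpected subject:", subject)
			return
		}
		fmt.Printf("route %d bytes to connection %s\n", len(data), id)
	}
	handler("node.relay.conn-42", []byte("ping"))
}
```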
type NATSModuleConfig ¶
type NATSModuleConfig struct {
// URL is the NATS server URL (default: nats://localhost:4222).
URL string `yaml:"url" mapstructure:"url"`
// Stream is the JetStream stream name (default: GAME_EVENTS).
Stream string `yaml:"stream" mapstructure:"stream"`
// JetStream enables JetStream persistence (default: false for plain pub/sub).
JetStream bool `yaml:"jetstream" mapstructure:"jetstream"`
}
NATSModuleConfig holds typed configuration for the NATS broker module.
func (NATSModuleConfig) Validate ¶
func (c NATSModuleConfig) Validate() error
Validate checks that required fields are present.
type Plugin ¶
type Plugin struct {
// contains filtered or unexported fields
}
Plugin wires together the NATSModule with the publish/subscribe steps. A single Plugin instance is created per workflow-server process.
func NewPlugin ¶
NewPlugin creates a broker plugin from config. Config keys: url (string), stream (string), jetstream (bool).
func (*Plugin) PublishStep ¶
func (p *Plugin) PublishStep() *PublishStep
PublishStep returns the step.broker_publish executor.
func (*Plugin) SubscribeDurable ¶
func (p *Plugin) SubscribeDurable(subject, consumerName string, handler func([]byte)) error
SubscribeDurable creates a durable JetStream subscription.
func (*Plugin) SubscribeStep ¶
func (p *Plugin) SubscribeStep() *SubscribeStep
SubscribeStep returns the step.broker_subscribe executor.
type PublishStep ¶
type PublishStep struct {
// contains filtered or unexported fields
}
PublishStep implements step.broker_publish. Config: topic (string), payload (string or map serialized to JSON).
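The payload rule (a string is sent as-is, a map is serialized to JSON) could look roughly like the sketch below. encodePayload is a hypothetical helper, not the step's actual code, and rejecting other types is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodePayload is a hypothetical helper: strings become raw bytes,
// maps are serialized to JSON, and anything else is rejected.
func encodePayload(payload any) ([]byte, error) {
	switch v := payload.(type) {
	case string:
		return []byte(v), nil
	case map[string]any:
		return json.Marshal(v)
	default:
		return nil, fmt.Errorf("payload: unsupported type %T", payload)
	}
}

func main() {
	for _, p := range []any{"hello", map[string]any{"event": "join", "player": 7}} {
		b, err := encodePayload(p)
		if err != nil {
			fmt.Println("encode error:", err)
			continue
		}
		fmt.Printf("%s\n", b)
	}
}
```

json.Marshal writes map keys in sorted order, so the encoded bytes are stable across runs for the same map contents.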
func NewPublishStep ¶
func NewPublishStep(broker Publisher) *PublishStep
NewPublishStep creates a step.broker_publish executor.
type SubscribeStep ¶
type SubscribeStep struct {
// contains filtered or unexported fields
}
SubscribeStep implements step.broker_subscribe. It is a trigger-mode step: starts a durable JetStream subscription and invokes the handler callback for each inbound message.
func NewSubscribeStep ¶
func NewSubscribeStep(broker Subscriber) *SubscribeStep
NewSubscribeStep creates a step.broker_subscribe executor.
func (*SubscribeStep) Subscribe ¶
func (s *SubscribeStep) Subscribe(ctx context.Context, params map[string]any, handler func(data []byte)) error
Subscribe sets up a durable subscription for the given topic and consumer name. Each message is delivered to handler as raw bytes. Params:
- topic (string, required): NATS subject filter (wildcards supported).
- consumer_name (string, required): durable consumer name for JetStream replay.
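A caller or implementer will typically want to check the two required params before subscribing. The sketch below shows one way to do that; subscribeParams and requiredString are hypothetical helpers, and treating an empty string as missing is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// requiredString is a hypothetical helper that fetches a non-empty string
// value from params or returns a descriptive error.
func requiredString(params map[string]any, key string) (string, error) {
	v, ok := params[key]
	if !ok {
		return "", fmt.Errorf("missing required param %q", key)
	}
	s, ok := v.(string)
	if !ok || s == "" {
		return "", fmt.Errorf("param %q must be a non-empty string", key)
	}
	return s, nil
}

// subscribeParams extracts the documented topic and consumer_name params.
func subscribeParams(params map[string]any) (topic, consumer string, err error) {
	if topic, err = requiredString(params, "topic"); err != nil {
		return "", "", err
	}
	if consumer, err = requiredString(params, "consumer_name"); err != nil {
		return "", "", err
	}
	return topic, consumer, nil
}

func main() {
	topic, consumer, err := subscribeParams(map[string]any{
		"topic":         "game.events.*",
		"consumer_name": "scoreboard",
	})
	if err != nil {
		fmt.Println("invalid params:", err)
		return
	}
	fmt.Println(topic, consumer)
}
```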
type Subscriber ¶
type Subscriber interface {
Subscribe(subject string, handler func([]byte)) error
// SubscribeWithSubject is like Subscribe but delivers the full NATS subject
// alongside the payload. This is required for wildcard subscriptions where
// the subject suffix carries routing information (e.g. NodeRelay).
SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error
SubscribeDurable(subject, consumerName string, handler func([]byte)) error
}
Subscriber registers message handlers on NATS subjects, either as non-durable core subscriptions or as durable JetStream consumers.