internal

package
v0.2.1
Published: Apr 22, 2026 | License: MIT | Imports: 5 | Imported by: 0

Documentation

Overview

Package internal provides the NATS JetStream broker module for cross-instance game event delivery.

Constants

This section is empty.

Variables

var Version = "0.0.0"

Version is set at build time via -ldflags "-X github.com/GoCodeAlone/workflow-plugin-broker/internal.Version=X.Y.Z". The default is a bare semver string ("0.0.0") so that plugin loaders that validate semver accept unreleased dev builds; goreleaser overrides it with the real release tag.
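To illustrate why the default is a bare semver, the sketch below pairs a link-time-overridable variable with a strict MAJOR.MINOR.PATCH check. The isBareSemver helper is hypothetical and not part of this package; a real plugin loader may accept a wider grammar (pre-release tags, build metadata).

```go
package main

import (
	"fmt"
	"strings"
)

// Version mirrors the documented variable. In this standalone sketch it could
// be overridden with:
//
//	go build -ldflags "-X main.Version=1.2.3"
//
// The documented flag targets .../internal.Version rather than main.Version.
var Version = "0.0.0"

// isBareSemver is a hypothetical validator: exactly three dot-separated
// decimal numbers, no "v" prefix, no leading zeros, no suffixes.
func isBareSemver(v string) bool {
	parts := strings.Split(v, ".")
	if len(parts) != 3 {
		return false
	}
	for _, p := range parts {
		if p == "" || (len(p) > 1 && p[0] == '0') {
			return false
		}
		for _, r := range p {
			if r < '0' || r > '9' {
				return false
			}
		}
	}
	return true
}

func main() {
	for _, v := range []string{Version, "v0.2.1", "dev"} {
		fmt.Printf("%q bare semver: %v\n", v, isBareSemver(v))
	}
}
```

Under this assumed check, a placeholder such as "dev" would be rejected, which is the situation the "0.0.0" default avoids.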

Functions

This section is empty.

Types

type Broker

type Broker interface {
	Publisher
	Subscriber
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

Broker combines Publisher and Subscriber and adds the Start and Stop lifecycle methods.
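Because Broker is a plain interface, code that depends on it can be exercised without a NATS server. The following is a minimal sketch of an in-memory stand-in; memBroker and newMemBroker are hypothetical names, the interfaces are copied from this page, and the behaviour (exact subject match, synchronous delivery, no replay) is a simplification rather than what NATSModule does.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Interfaces as documented on this page.
type Publisher interface {
	Publish(subject string, data []byte) error
}

type Subscriber interface {
	Subscribe(subject string, handler func([]byte)) error
	SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error
	SubscribeDurable(subject, consumerName string, handler func([]byte)) error
}

type Broker interface {
	Publisher
	Subscriber
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

// memBroker is a hypothetical in-memory Broker for tests.
type memBroker struct {
	mu       sync.Mutex
	running  bool
	handlers map[string][]func(subject string, data []byte)
}

// Compile-time check that memBroker satisfies Broker.
var _ Broker = (*memBroker)(nil)

func newMemBroker() *memBroker {
	return &memBroker{handlers: make(map[string][]func(string, []byte))}
}

func (b *memBroker) Start(context.Context) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.running = true
	return nil
}

func (b *memBroker) Stop(context.Context) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.running = false
	b.handlers = make(map[string][]func(string, []byte))
	return nil
}

func (b *memBroker) Publish(subject string, data []byte) error {
	b.mu.Lock()
	if !b.running {
		b.mu.Unlock()
		return errors.New("broker not started")
	}
	// Copy the handler slice so handlers run outside the lock.
	hs := append([]func(string, []byte){}, b.handlers[subject]...)
	b.mu.Unlock()
	for _, h := range hs {
		h(subject, data)
	}
	return nil
}

func (b *memBroker) SubscribeWithSubject(subject string, handler func(string, []byte)) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[subject] = append(b.handlers[subject], handler)
	return nil
}

func (b *memBroker) Subscribe(subject string, handler func([]byte)) error {
	return b.SubscribeWithSubject(subject, func(_ string, d []byte) { handler(d) })
}

// SubscribeDurable ignores consumerName; durability is not modelled here.
func (b *memBroker) SubscribeDurable(subject, _ string, handler func([]byte)) error {
	return b.Subscribe(subject, handler)
}

func main() {
	var br Broker = newMemBroker()
	ctx := context.Background()
	_ = br.Start(ctx)
	_ = br.Subscribe("game.events", func(d []byte) { fmt.Printf("got %s\n", d) })
	_ = br.Publish("game.events", []byte("hello"))
	_ = br.Stop(ctx)
}
```

The compile-time assertion (var _ Broker = ...) is a common Go idiom for catching a missing method as soon as the interface changes.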

type NATSModule

type NATSModule struct {
	// contains filtered or unexported fields
}

NATSModule implements Broker using NATS JetStream. It is thread-safe: multiple goroutines may call Publish and Subscribe concurrently.

func NewNATSModule

func NewNATSModule(name string, cfg map[string]any) *NATSModule

NewNATSModule constructs a NATSModule from a config map. Recognised keys: url (string), stream (string), jetstream (bool).
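The sketch below shows one way the recognised keys could be read from a map[string]any, falling back to the defaults documented on NATSModuleConfig. configFromMap is a hypothetical helper; how the constructor actually decodes the map (for example via mapstructure) is not stated on this page.

```go
package main

import "fmt"

// NATSModuleConfig mirrors the documented struct (struct tags omitted).
type NATSModuleConfig struct {
	URL       string
	Stream    string
	JetStream bool
}

// configFromMap is a hypothetical decoder. Missing, empty, or wrongly typed
// values fall back to the documented defaults.
func configFromMap(cfg map[string]any) NATSModuleConfig {
	out := NATSModuleConfig{
		URL:    "nats://localhost:4222", // documented default
		Stream: "GAME_EVENTS",           // documented default
	}
	if v, ok := cfg["url"].(string); ok && v != "" {
		out.URL = v
	}
	if v, ok := cfg["stream"].(string); ok && v != "" {
		out.Stream = v
	}
	if v, ok := cfg["jetstream"].(bool); ok {
		out.JetStream = v
	}
	return out
}

func main() {
	c := configFromMap(map[string]any{"url": "nats://nats:4222", "jetstream": true})
	fmt.Printf("%+v\n", c)
}
```

Using the two-value form of the type assertion keeps a wrongly typed value (say, jetstream: "yes") from panicking; whether such input should instead be reported as an error is a design choice this sketch leaves open.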

func (*NATSModule) Publish

func (m *NATSModule) Publish(subject string, data []byte) error

Publish sends data to the given NATS subject via JetStream.

func (*NATSModule) Start

func (m *NATSModule) Start(_ context.Context) error

Start connects to NATS and (if jetstream=true) ensures the stream exists.

func (*NATSModule) Stop

func (m *NATSModule) Stop(_ context.Context) error

Stop drains subscriptions and closes the NATS connection.

func (*NATSModule) Subscribe

func (m *NATSModule) Subscribe(subject string, handler func([]byte)) error

Subscribe creates a non-durable core NATS subscription. The handler is called on each message in a goroutine managed by the NATS client.

func (*NATSModule) SubscribeDurable

func (m *NATSModule) SubscribeDurable(subject, consumerName string, handler func([]byte)) error

SubscribeDurable creates a durable JetStream push consumer. Delivery starts at stream sequence 1, so late subscribers also receive messages published before they connected.
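The replay behaviour can be pictured with a toy model: an append-only log plus a per-consumer cursor keyed by the durable name. Everything in this sketch (memStream, its methods, the single implicit subject, synchronous delivery, no locking) is hypothetical and only approximates JetStream semantics.

```go
package main

import "fmt"

// memStream is a hypothetical single-subject stream. The message at index i
// has sequence number i+1.
type memStream struct {
	log     [][]byte
	cursors map[string]int          // durable name -> messages already delivered
	live    map[string]func([]byte) // currently attached consumers
}

func newMemStream() *memStream {
	return &memStream{
		cursors: map[string]int{},
		live:    map[string]func([]byte){},
	}
}

// Publish appends to the log and delivers to every attached consumer.
func (s *memStream) Publish(data []byte) {
	s.log = append(s.log, data)
	for name, h := range s.live {
		h(data)
		s.cursors[name] = len(s.log)
	}
}

// SubscribeDurable first replays everything the named consumer has not yet
// seen (from sequence 1 for a new name), then attaches it for live delivery.
func (s *memStream) SubscribeDurable(name string, h func([]byte)) {
	for _, m := range s.log[s.cursors[name]:] {
		h(m)
	}
	s.cursors[name] = len(s.log)
	s.live[name] = h
}

// Unsubscribe detaches the consumer but keeps its cursor.
func (s *memStream) Unsubscribe(name string) {
	delete(s.live, name)
}

func main() {
	s := newMemStream()
	s.Publish([]byte("spawn"))
	s.Publish([]byte("move"))

	// A late subscriber still sees both earlier messages.
	s.SubscribeDurable("instance-a", func(d []byte) { fmt.Printf("instance-a: %s\n", d) })
}
```

In this model, keeping the cursor after Unsubscribe lets a consumer that reconnects under the same name resume where it left off instead of replaying from the start.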

func (*NATSModule) SubscribeWithSubject

func (m *NATSModule) SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error

SubscribeWithSubject creates a non-durable core NATS subscription that delivers both the full subject and the message payload to the handler. This is needed for wildcard subscriptions where the subject suffix carries routing metadata (e.g. the connection ID in NodeRelay subjects).
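A short sketch of why the full subject matters for wildcard subscriptions: with a pattern such as "noderelay.*", the payload alone does not say which concrete subject matched. The matcher below follows the standard NATS wildcard rules ("*" matches exactly one token, ">" matches one or more trailing tokens). The subject layout noderelay.<connID> and the helper names are assumptions made for illustration; the real NodeRelay subject format is not given on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches a NATS-style pattern.
// It does not validate subjects (empty tokens are not rejected).
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one
			// subject token left to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// lastToken returns the final dot-separated token of a subject.
func lastToken(subject string) string {
	return subject[strings.LastIndex(subject, ".")+1:]
}

func main() {
	pattern := "noderelay.*" // hypothetical layout: noderelay.<connID>

	// Same shape as the handler accepted by SubscribeWithSubject.
	handler := func(subject string, data []byte) {
		fmt.Printf("conn=%s payload=%s\n", lastToken(subject), data)
	}

	for _, subj := range []string{"noderelay.conn-42", "other.conn-7"} {
		if matchSubject(pattern, subj) {
			handler(subj, []byte("ping"))
		}
	}
}
```

A handler of type func([]byte), as taken by Subscribe, would have no way to recover the connection ID in this situation.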

type NATSModuleConfig

type NATSModuleConfig struct {
	// URL is the NATS server URL (default: nats://localhost:4222).
	URL string `yaml:"url" mapstructure:"url"`
	// Stream is the JetStream stream name (default: GAME_EVENTS).
	Stream string `yaml:"stream" mapstructure:"stream"`
	// JetStream enables JetStream persistence (default: false for plain pub/sub).
	JetStream bool `yaml:"jetstream" mapstructure:"jetstream"`
}

NATSModuleConfig holds typed configuration for the NATS broker module.

func (NATSModuleConfig) Validate

func (c NATSModuleConfig) Validate() error

Validate checks that required fields are present.
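The page does not say which fields Validate treats as required. Purely as an illustration, the sketch below assumes that url must be a parseable URL with a scheme and host, and that stream is required only when jetstream is enabled. Both rules are assumptions, and the type is a local mirror of the documented struct.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// NATSModuleConfig mirrors the documented struct (struct tags omitted).
type NATSModuleConfig struct {
	URL       string
	Stream    string
	JetStream bool
}

// Validate applies hypothetical rules; the package's real checks may differ.
func (c NATSModuleConfig) Validate() error {
	if c.URL == "" {
		return errors.New("nats: url is required")
	}
	u, err := url.Parse(c.URL)
	if err != nil {
		return fmt.Errorf("nats: invalid url: %w", err)
	}
	if u.Scheme == "" || u.Host == "" {
		return fmt.Errorf("nats: url %q needs a scheme and host", c.URL)
	}
	if c.JetStream && c.Stream == "" {
		return errors.New("nats: stream is required when jetstream is enabled")
	}
	return nil
}

func main() {
	configs := []NATSModuleConfig{
		{URL: "nats://localhost:4222"},
		{URL: "nats://localhost:4222", JetStream: true},
		{},
	}
	for _, c := range configs {
		fmt.Printf("%+v -> %v\n", c, c.Validate())
	}
}
```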

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

Plugin wires together the NATSModule with the publish/subscribe steps. A single Plugin instance is created per workflow-server process.

func NewPlugin

func NewPlugin(name string, cfg map[string]any) *Plugin

NewPlugin creates a broker plugin from config. Config keys: url (string), stream (string), jetstream (bool).

func (*Plugin) Name

func (p *Plugin) Name() string

Name returns the module name.

func (*Plugin) Publish

func (p *Plugin) Publish(subject string, data []byte) error

Publish sends data to the given subject.

func (*Plugin) PublishStep

func (p *Plugin) PublishStep() *PublishStep

PublishStep returns the step.broker_publish executor.

func (*Plugin) Start

func (p *Plugin) Start(ctx context.Context) error

Start initialises the NATS connection.

func (*Plugin) Stop

func (p *Plugin) Stop(ctx context.Context) error

Stop drains subscriptions and closes the NATS connection.

func (*Plugin) Subscribe

func (p *Plugin) Subscribe(subject string, handler func([]byte)) error

Subscribe creates a non-durable subscription.

func (*Plugin) SubscribeDurable

func (p *Plugin) SubscribeDurable(subject, consumerName string, handler func([]byte)) error

SubscribeDurable creates a durable JetStream subscription.

func (*Plugin) SubscribeStep

func (p *Plugin) SubscribeStep() *SubscribeStep

SubscribeStep returns the step.broker_subscribe executor.

func (*Plugin) SubscribeWithSubject

func (p *Plugin) SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error

SubscribeWithSubject creates a non-durable subscription that delivers both the full NATS subject and the message payload to the handler.

type PublishStep

type PublishStep struct {
	// contains filtered or unexported fields
}

PublishStep implements step.broker_publish. Config: topic (string), payload (a string, or a map that is serialized to JSON).

func NewPublishStep

func NewPublishStep(broker Publisher) *PublishStep

NewPublishStep creates a step.broker_publish executor.

func (*PublishStep) Execute

func (s *PublishStep) Execute(ctx context.Context, params map[string]any) (map[string]any, error)

Execute publishes data to the broker topic extracted from params. Params:

topic   (string, required) — NATS subject to publish to.
payload (string|map|any)  — message body; maps are JSON-encoded.
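The parameter handling described above might look roughly like the following. executePublish and recordingPublisher are hypothetical; the treatment of a missing payload, of []byte values, and of non-map values (JSON-encoded here), as well as the shape of the returned map, are assumptions that the page does not confirm.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Publisher as documented on this page.
type Publisher interface {
	Publish(subject string, data []byte) error
}

// recordingPublisher is a test double that remembers the last message.
type recordingPublisher struct {
	subject string
	data    []byte
}

func (r *recordingPublisher) Publish(subject string, data []byte) error {
	r.subject, r.data = subject, data
	return nil
}

// executePublish is a hypothetical stand-in for PublishStep.Execute.
func executePublish(_ context.Context, pub Publisher, params map[string]any) (map[string]any, error) {
	topic, ok := params["topic"].(string)
	if !ok || topic == "" {
		return nil, errors.New("broker_publish: topic is required")
	}

	var body []byte
	switch p := params["payload"].(type) {
	case nil:
		// No payload: publish an empty body (assumption).
	case string:
		body = []byte(p)
	case []byte:
		body = p
	default:
		// Maps and any other value are JSON-encoded.
		b, err := json.Marshal(p)
		if err != nil {
			return nil, fmt.Errorf("broker_publish: encode payload: %w", err)
		}
		body = b
	}

	if err := pub.Publish(topic, body); err != nil {
		return nil, err
	}
	// The result shape is an assumption.
	return map[string]any{"topic": topic, "bytes": len(body)}, nil
}

func main() {
	rec := &recordingPublisher{}
	_, err := executePublish(context.Background(), rec, map[string]any{
		"topic":   "game.events",
		"payload": map[string]any{"type": "spawn"},
	})
	fmt.Println(rec.subject, string(rec.data), err)
}
```

Accepting a Publisher rather than a concrete broker mirrors the NewPublishStep signature and keeps the step testable with a double like the one above.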

type Publisher

type Publisher interface {
	Publish(subject string, data []byte) error
}

Publisher publishes a message to the given NATS subject.

type SubscribeStep

type SubscribeStep struct {
	// contains filtered or unexported fields
}

SubscribeStep implements step.broker_subscribe. It is a trigger-mode step: it starts a durable JetStream subscription and invokes the handler callback for each inbound message.

func NewSubscribeStep

func NewSubscribeStep(broker Subscriber) *SubscribeStep

NewSubscribeStep creates a step.broker_subscribe executor.

func (*SubscribeStep) Subscribe

func (s *SubscribeStep) Subscribe(
	ctx context.Context,
	params map[string]any,
	handler func(data []byte),
) error

Subscribe sets up a durable subscription for the given topic and consumer name. Each message is delivered to handler as raw bytes. Params:

topic         (string, required) — NATS subject filter (wildcards supported).
consumer_name (string, required) — durable consumer name for JetStream replay.
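A minimal sketch of the parameter checks implied above, delegating to SubscribeDurable once both required values are present. subscribeStep, durableSubscriber, and fakeSubscriber are hypothetical names; the error messages and the nil-handler check are assumptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// durableSubscriber is the single Subscriber method this sketch needs.
type durableSubscriber interface {
	SubscribeDurable(subject, consumerName string, handler func([]byte)) error
}

// fakeSubscriber records the subscription so it can be driven by hand.
type fakeSubscriber struct {
	subject  string
	consumer string
	handler  func([]byte)
}

func (f *fakeSubscriber) SubscribeDurable(subject, consumerName string, handler func([]byte)) error {
	f.subject, f.consumer, f.handler = subject, consumerName, handler
	return nil
}

// subscribeStep is a hypothetical stand-in for SubscribeStep.Subscribe.
// The context is accepted for signature parity but unused here.
func subscribeStep(_ context.Context, sub durableSubscriber, params map[string]any, handler func(data []byte)) error {
	topic, _ := params["topic"].(string)
	if topic == "" {
		return errors.New("broker_subscribe: topic is required")
	}
	name, _ := params["consumer_name"].(string)
	if name == "" {
		return errors.New("broker_subscribe: consumer_name is required")
	}
	if handler == nil {
		return errors.New("broker_subscribe: handler must not be nil")
	}
	return sub.SubscribeDurable(topic, name, handler)
}

func main() {
	sub := &fakeSubscriber{}
	err := subscribeStep(context.Background(), sub, map[string]any{
		"topic":         "game.events.>",
		"consumer_name": "instance-a",
	}, func(data []byte) { fmt.Printf("received %s\n", data) })
	if err != nil {
		fmt.Println("subscribe failed:", err)
		return
	}
	// Simulate an inbound message.
	sub.handler([]byte("spawn"))
}
```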

type Subscriber

type Subscriber interface {
	Subscribe(subject string, handler func([]byte)) error
	// SubscribeWithSubject is like Subscribe but delivers the full NATS subject
	// alongside the payload. This is required for wildcard subscriptions where
	// the subject suffix carries routing information (e.g. NodeRelay).
	SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error
	SubscribeDurable(subject, consumerName string, handler func([]byte)) error
}

Subscriber subscribes to a NATS subject.
