internal

package
v0.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package internal provides the NATS JetStream broker module for cross-instance game event delivery.

Index

Constants

This section is empty.

Variables

View Source
var Version = "0.0.0"

Version is set at build time via -ldflags "-X github.com/GoCodeAlone/workflow-plugin-broker/internal.Version=X.Y.Z". Default is a bare semver so plugin loaders that validate semver accept unreleased dev builds; goreleaser overrides with the real release tag.

Functions

This section is empty.

Types

type Broker

type Broker interface {
	Publisher
	Subscriber
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

Broker combines Publisher and Subscriber.

type BrokerProvider added in v0.2.4

type BrokerProvider struct{}

BrokerProvider implements the workflow external plugin SDK interfaces so that the broker can be served as a proper gRPC plugin via sdk.Serve.

Implemented interfaces:

  • sdk.PluginProvider (Manifest)
  • sdk.ModuleProvider (ModuleTypes, CreateModule)
  • sdk.StepProvider (StepTypes, CreateStep)
  • sdk.SchemaProvider (ModuleSchemas)

func (*BrokerProvider) ContractRegistry added in v0.2.4

func (p *BrokerProvider) ContractRegistry() *pb.ContractRegistry

ContractRegistry returns the typed contract descriptors for all broker module and step types. The workflow engine calls this via the sdk.ContractProvider interface for strict validation.

func (*BrokerProvider) CreateModule added in v0.2.4

func (p *BrokerProvider) CreateModule(typeName, name string, config map[string]any) (sdk.ModuleInstance, error)

CreateModule creates a NATS broker module instance of the given type.

func (*BrokerProvider) CreateStep added in v0.2.4

func (p *BrokerProvider) CreateStep(typeName, name string, config map[string]any) (sdk.StepInstance, error)

CreateStep creates a step instance of the given type. The step's broker connection is created lazily on first Execute call using the config fields (url, stream).

func (*BrokerProvider) Manifest added in v0.2.4

func (p *BrokerProvider) Manifest() sdk.PluginManifest

Manifest returns the plugin metadata.

func (*BrokerProvider) ModuleSchemas added in v0.2.4

func (p *BrokerProvider) ModuleSchemas() []sdk.ModuleSchemaData

ModuleSchemas returns typed schema descriptions for all module types.

func (*BrokerProvider) ModuleTypes added in v0.2.4

func (p *BrokerProvider) ModuleTypes() []string

ModuleTypes returns the module type names provided by this plugin.

func (*BrokerProvider) StepTypes added in v0.2.4

func (p *BrokerProvider) StepTypes() []string

StepTypes returns the step type names provided by this plugin.

type NATSModule

type NATSModule struct {
	// contains filtered or unexported fields
}

NATSModule implements Broker using NATS JetStream. Thread-safe; multiple goroutines may call Publish/Subscribe concurrently.

func NewNATSModule

func NewNATSModule(name string, cfg map[string]any) *NATSModule

NewNATSModule constructs a NATSModule from config map. Recognised keys: url (string), stream (string), jetstream (bool).

func (*NATSModule) FetchOneDurable added in v0.2.4

func (m *NATSModule) FetchOneDurable(ctx context.Context, subject, consumerName string) ([]byte, error)

FetchOneDurable creates (or reuses) a durable pull consumer and fetches exactly one message, blocking until a message is available or ctx is cancelled. Only the single fetched message is acknowledged; no other messages are consumed or lost, avoiding the ACK-race present in push-subscribe approaches.

func (*NATSModule) Publish

func (m *NATSModule) Publish(subject string, data []byte) error

Publish sends data to the given NATS subject via JetStream.

func (*NATSModule) Start

func (m *NATSModule) Start(_ context.Context) error

Start connects to NATS and (if jetstream=true) ensures the stream exists.

func (*NATSModule) Stop

func (m *NATSModule) Stop(_ context.Context) error

Stop drains subscriptions and closes the NATS connection.

func (*NATSModule) Subscribe

func (m *NATSModule) Subscribe(subject string, handler func([]byte)) error

Subscribe creates a non-durable core NATS subscription. The handler is called on each message in a goroutine managed by the NATS client.

func (*NATSModule) SubscribeDurable

func (m *NATSModule) SubscribeDurable(subject, consumerName string, handler func([]byte)) error

SubscribeDurable creates a durable JetStream push-subscribe consumer. Late subscribers receive messages published before they connected (from sequence 1).

func (*NATSModule) SubscribeWithSubject

func (m *NATSModule) SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error

SubscribeWithSubject creates a non-durable core NATS subscription that delivers both the full subject and the message payload to the handler. This is needed for wildcard subscriptions where the subject suffix carries routing metadata (e.g. the connection ID in NodeRelay subjects).

type NATSModuleConfig

type NATSModuleConfig struct {
	// URL is the NATS server URL (default: nats://localhost:4222).
	URL string `yaml:"url" mapstructure:"url"`
	// Stream is the JetStream stream name (default: GAME_EVENTS).
	Stream string `yaml:"stream" mapstructure:"stream"`
	// JetStream enables JetStream persistence (default: false for plain pub/sub).
	JetStream bool `yaml:"jetstream" mapstructure:"jetstream"`
}

NATSModuleConfig holds typed configuration for the NATS broker module.

func (NATSModuleConfig) Validate

func (c NATSModuleConfig) Validate() error

Validate checks that required fields are present.

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

Plugin wires together the NATSModule with the publish/subscribe steps. A single Plugin instance is created per workflow-server process.

func NewPlugin

func NewPlugin(name string, cfg map[string]any) *Plugin

NewPlugin creates a broker plugin from config. Config keys: url (string), stream (string), jetstream (bool).

func (*Plugin) Name

func (p *Plugin) Name() string

Name returns the module name.

func (*Plugin) Publish

func (p *Plugin) Publish(subject string, data []byte) error

Publish sends data to the given subject.

func (*Plugin) PublishStep

func (p *Plugin) PublishStep() *PublishStep

PublishStep returns the step.broker_publish executor.

func (*Plugin) Start

func (p *Plugin) Start(ctx context.Context) error

Start initialises the NATS connection.

func (*Plugin) Stop

func (p *Plugin) Stop(ctx context.Context) error

Stop drains subscriptions and closes the NATS connection.

func (*Plugin) Subscribe

func (p *Plugin) Subscribe(subject string, handler func([]byte)) error

Subscribe creates a non-durable subscription.

func (*Plugin) SubscribeDurable

func (p *Plugin) SubscribeDurable(subject, consumerName string, handler func([]byte)) error

SubscribeDurable creates a durable JetStream subscription.

func (*Plugin) SubscribeStep

func (p *Plugin) SubscribeStep() *SubscribeStep

SubscribeStep returns the step.broker_subscribe executor.

func (*Plugin) SubscribeWithSubject

func (p *Plugin) SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error

SubscribeWithSubject creates a non-durable subscription that delivers both the full NATS subject and the message payload to the handler.

type PublishStep

type PublishStep struct {
	// contains filtered or unexported fields
}

PublishStep implements step.broker_publish. Config: topic (string), payload (string or map serialized to JSON).

func NewPublishStep

func NewPublishStep(broker Publisher) *PublishStep

NewPublishStep creates a step.broker_publish executor.

func (*PublishStep) Execute

func (s *PublishStep) Execute(ctx context.Context, params map[string]any) (map[string]any, error)

Execute publishes data to the broker topic extracted from params. Params:

topic   (string, required) — NATS subject to publish to.
payload (string|map|any)  — message body; maps are JSON-encoded.

type Publisher

type Publisher interface {
	Publish(subject string, data []byte) error
}

Publisher publishes a message to the given NATS subject.

type SubscribeStep

type SubscribeStep struct {
	// contains filtered or unexported fields
}

SubscribeStep implements step.broker_subscribe. It is a trigger-mode step: starts a durable JetStream subscription and invokes the handler callback for each inbound message.

func NewSubscribeStep

func NewSubscribeStep(broker Subscriber) *SubscribeStep

NewSubscribeStep creates a step.broker_subscribe executor.

func (*SubscribeStep) Subscribe

func (s *SubscribeStep) Subscribe(
	ctx context.Context,
	params map[string]any,
	handler func(data []byte),
) error

Subscribe sets up a durable subscription for the given topic and consumer name. Each message is delivered to handler as raw bytes. Params:

topic         (string, required) — NATS subject filter (wildcards supported).
consumer_name (string, required) — durable consumer name for JetStream replay.

type Subscriber

type Subscriber interface {
	Subscribe(subject string, handler func([]byte)) error
	// SubscribeWithSubject is like Subscribe but delivers the full NATS subject
	// alongside the payload. This is required for wildcard subscriptions where
	// the subject suffix carries routing information (e.g. NodeRelay).
	SubscribeWithSubject(subject string, handler func(subject string, data []byte)) error
	SubscribeDurable(subject, consumerName string, handler func([]byte)) error
}

Subscriber subscribes to a NATS subject.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL