jetstream

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package jetstream provides a NATS JetStream transport for protoflow.

Index

Constants

View Source
const (
	// DefaultMaxDeliver is the default max delivery attempts.
	DefaultMaxDeliver = 3

	// DefaultAckWait is the default ack wait timeout.
	DefaultAckWait = 30 * time.Second

	// MetadataDelay is the metadata key for delay.
	MetadataDelay = "pf_delay_ms"
)
View Source
const TransportName = "nats-jetstream"

TransportName is the name used to register this transport.

Variables

This section is empty.

Functions

func Build

Build creates a new NATS JetStream transport.

func Capabilities

func Capabilities() transport.Capabilities

Capabilities returns the capabilities of this transport.

func Register added in v0.5.0

func Register()

Register registers the JetStream transport with the default registry. This should be called from an init() function in an importing package, or explicitly before using the transport.

Types

type Config

type Config struct {
	// URL is the NATS server URL.
	URL string

	// StreamName is the name of the JetStream stream to use.
	// If empty, defaults to "PROTOFLOW".
	StreamName string

	// MaxDeliver is the maximum number of delivery attempts.
	MaxDeliver int

	// AckWait is the duration to wait for acknowledgment.
	AckWait time.Duration

	// Replicas is the number of stream replicas (for clustering).
	Replicas int

	// RetentionPolicy: "limits" (default), "interest", or "workqueue"
	RetentionPolicy string
}

Config holds NATS JetStream-specific configuration.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport implements Publisher and Subscriber for NATS JetStream.

func New

func New(cfg Config, logger watermill.LoggerAdapter) (*Transport, error)

New creates a new NATS JetStream transport.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the JetStream transport.

func (*Transport) GetCapabilities

func (t *Transport) GetCapabilities() transport.Capabilities

GetCapabilities returns the JetStream transport capabilities.

func (*Transport) Publish

func (t *Transport) Publish(topic string, messages ...*message.Message) error

Publish publishes messages to the JetStream stream.

func (*Transport) Subscribe

func (t *Transport) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribes to a topic and returns a channel of messages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL