nats

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package nats provides the NATS JetStream connection, subject routing, and event/queue subscribers shared by all Tupic services.

Index

Constants

This section is empty.

Variables

ConnectionModule provides the NATS connection and JetStreamContext. Requires a nats.Config in the graph, supplied by the service.

SubscriberModule wires the shared Router, the EventSubscriber (non-queue.* subjects), and exposes the Router as a MessageHandlerRegisterer.

WorkerModule wires the QueueSubscriber (queue.* subjects). Requires the shared Router already provided by SubscriberModule.

Functions

func NewConnection

func NewConnection(l logger.Logger, cfg Config) (*natsLib.Conn, natsLib.JetStreamContext, error)

func RegisterConnectionLifecycle

func RegisterConnectionLifecycle(lc fx.Lifecycle, nc *natsLib.Conn)

func RegisterSubscriberLifecycle

func RegisterSubscriberLifecycle(lc fx.Lifecycle, s *EventSubscriber)

func RegisterWorkerLifecycle

func RegisterWorkerLifecycle(lc fx.Lifecycle, w *QueueSubscriber)

Types

type Config

type Config struct {
	URL           string
	Token         string
	SubjectPrefix string
	AppSlug       string
}

Config carries NATS connection settings plus the service identity used for durable consumer naming and connection naming.

type EventSubscriber

type EventSubscriber struct {
	// contains filtered or unexported fields
}

EventSubscriber handles non-queue subjects (integration events from other services).

func NewEventSubscriber

func NewEventSubscriber(
	l logger.Logger,
	js natslib.JetStreamContext,
	handler *Router,
	cfg Config,
) *EventSubscriber

func (*EventSubscriber) Start

func (s *EventSubscriber) Start(_ context.Context) error

func (*EventSubscriber) Stop

func (s *EventSubscriber) Stop(ctx context.Context) error

Stop cancels the handler context, unsubscribes all consumers (collecting any errors), then waits for in-flight handlers to drain. All subscriptions are unsubscribed regardless of individual errors. If ctx expires before the drain completes a warning is logged and the function returns so downstream lifecycle hooks are not blocked.

type FailedMessage

type FailedMessage struct {
	ID        uuid.UUID      `gorm:"column:id;type:char(36);primaryKey"`
	Type      string         `gorm:"column:type;type:varchar(255);not null"`
	Version   string         `gorm:"column:version;type:varchar(20);not null"`
	Payload   datatypes.JSON `gorm:"column:payload;type:jsonb;not null"`
	Attempts  int            `gorm:"column:attempts;type:int;not null"`
	LastError string         `gorm:"column:last_error;type:text;not null"`
	FailedAt  time.Time      `gorm:"column:failed_at;type:timestamp;not null"`
}

FailedMessage is the GORM model for the failed_tasks DLQ table. It records every message that the Worker gave up on after MaxDeliver attempts or a terminal error from the handler.

The table name remains "failed_tasks" for backward compatibility with existing migrations. Operators query / re-publish from here; nothing inside the running app reads it on a hot path.

func (*FailedMessage) TableName

func (*FailedMessage) TableName() string

type Message

type Message struct {
	Version string
	Payload json.RawMessage
}

Message is the payload handed to subscription handlers after envelope unwrapping.

type MessageHandler

type MessageHandler func(ctx context.Context, m Message) error

MessageHandler handles a single message for a subject.

type MessageHandlerRegisterer

type MessageHandlerRegisterer interface {
	Register(subject string, h MessageHandler)
}

MessageHandlerRegisterer registers subject handlers; implemented by Router.

type QueueSubscriber

type QueueSubscriber struct {
	// contains filtered or unexported fields
}

QueueSubscriber subscribes to queue.* subjects and dispatches each task to the Router. Terminal failures (apperror) and exhausted MaxDeliver are written to the failed_messages DLQ and Term'd; transient failures are Nak'd for JetStream retry.

func NewQueueSubscriber

func NewQueueSubscriber(
	l logger.Logger,
	c clock.Clock,
	js natslib.JetStreamContext,
	router *Router,
	cfg Config,
	db *gorm.DB,
) (*QueueSubscriber, error)

func (*QueueSubscriber) Start

func (w *QueueSubscriber) Start(_ context.Context) error

Start subscribes to every registered queue.* subject. Each subject gets its own durable consumer so JetStream load-balances across replicas naturally.

func (*QueueSubscriber) Stop

func (w *QueueSubscriber) Stop(ctx context.Context) error

Stop cancels the handler context, unsubscribes all consumers (collecting any errors), then waits for in-flight handlers to drain. All subscriptions are unsubscribed regardless of individual errors. If ctx expires before the drain completes a warning is logged so downstream lifecycle hooks are not blocked.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router maps NATS subjects to MessageHandlers.

func NewRouter

func NewRouter(l logger.Logger) *Router

func (*Router) Handle

func (r *Router) Handle(ctx context.Context, subject string, m Message) error

func (*Router) Register

func (r *Router) Register(subject string, h MessageHandler)

func (*Router) Subjects

func (r *Router) Subjects() []string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL