Documentation ¶
Overview ¶
Package nats provides the NATS JetStream connection, subject routing, and event/queue subscribers shared by all Tupic services.
Index ¶
- Variables
- func NewConnection(l logger.Logger, cfg Config) (*natsLib.Conn, natsLib.JetStreamContext, error)
- func RegisterConnectionLifecycle(lc fx.Lifecycle, nc *natsLib.Conn)
- func RegisterSubscriberLifecycle(lc fx.Lifecycle, s *EventSubscriber)
- func RegisterWorkerLifecycle(lc fx.Lifecycle, w *QueueSubscriber)
- type Config
- type EventSubscriber
- type FailedMessage
- type Message
- type MessageHandler
- type MessageHandlerRegisterer
- type QueueSubscriber
- type Router
Constants ¶
This section is empty.
Variables ¶
var ConnectionModule = fx.Options(
	fx.Provide(NewConnection),
	fx.Invoke(RegisterConnectionLifecycle),
)
ConnectionModule provides the NATS connection and JetStreamContext. Requires a nats.Config in the graph, supplied by the service.
var SubscriberModule = fx.Options(
	fx.Provide(
		NewRouter,
		NewEventSubscriber,
		func(r *Router) MessageHandlerRegisterer { return r },
	),
	fx.Invoke(RegisterSubscriberLifecycle),
)
SubscriberModule wires the shared Router, the EventSubscriber (non-queue.* subjects), and exposes the Router as a MessageHandlerRegisterer.
var WorkerModule = fx.Options(
	fx.Provide(NewQueueSubscriber),
	fx.Invoke(RegisterWorkerLifecycle),
)
WorkerModule wires the QueueSubscriber (queue.* subjects). Requires the shared Router already provided by SubscriberModule.
Functions ¶
func NewConnection ¶
func NewConnection(l logger.Logger, cfg Config) (*natsLib.Conn, natsLib.JetStreamContext, error)
func RegisterConnectionLifecycle ¶
func RegisterConnectionLifecycle(lc fx.Lifecycle, nc *natsLib.Conn)
func RegisterSubscriberLifecycle ¶
func RegisterSubscriberLifecycle(lc fx.Lifecycle, s *EventSubscriber)
func RegisterWorkerLifecycle ¶
func RegisterWorkerLifecycle(lc fx.Lifecycle, w *QueueSubscriber)
Types ¶
type Config ¶
Config carries NATS connection settings plus the service identity used for durable consumer naming and connection naming.
type EventSubscriber ¶
type EventSubscriber struct {
// contains filtered or unexported fields
}
EventSubscriber handles non-queue subjects (integration events from other services).
func NewEventSubscriber ¶
func NewEventSubscriber(
	l logger.Logger,
	js natslib.JetStreamContext,
	handler *Router,
	cfg Config,
) *EventSubscriber
func (*EventSubscriber) Stop ¶
func (s *EventSubscriber) Stop(ctx context.Context) error
Stop cancels the handler context, unsubscribes all consumers (collecting any errors), then waits for in-flight handlers to drain. All subscriptions are unsubscribed regardless of individual errors. If ctx expires before the drain completes, a warning is logged and the function returns so downstream lifecycle hooks are not blocked.
type FailedMessage ¶
type FailedMessage struct {
ID uuid.UUID `gorm:"column:id;type:char(36);primaryKey"`
Type string `gorm:"column:type;type:varchar(255);not null"`
Version string `gorm:"column:version;type:varchar(20);not null"`
Payload datatypes.JSON `gorm:"column:payload;type:jsonb;not null"`
Attempts int `gorm:"column:attempts;type:int;not null"`
LastError string `gorm:"column:last_error;type:text;not null"`
FailedAt time.Time `gorm:"column:failed_at;type:timestamp;not null"`
}
FailedMessage is the GORM model for the failed_tasks DLQ table. It records every message that the QueueSubscriber gave up on, either after MaxDeliver attempts or after a terminal error from the handler.
The table name remains "failed_tasks" for backward compatibility with existing migrations. Operators query and re-publish from this table; nothing inside the running app reads it on a hot path.
func (*FailedMessage) TableName ¶
func (*FailedMessage) TableName() string
type Message ¶
type Message struct {
Version string
Payload json.RawMessage
}
Message is the payload handed to subscription handlers after envelope unwrapping.
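As an illustration of envelope unwrapping, the sketch below decodes a wire envelope and hands on only the version and the raw payload. The envelope type and its JSON field names are assumptions made for this example; the actual wire format is not documented here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope is a hypothetical wire format; the field names are assumptions.
type envelope struct {
	Type    string          `json:"type"`
	Version string          `json:"version"`
	Payload json.RawMessage `json:"payload"`
}

// Message mirrors the documented type.
type Message struct {
	Version string
	Payload json.RawMessage
}

// unwrap decodes the envelope but leaves the payload as raw JSON, so each
// handler can decode it into the struct that matches its own subject.
func unwrap(data []byte) (Message, error) {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return Message{}, fmt.Errorf("decode envelope: %w", err)
	}
	if len(env.Payload) == 0 {
		return Message{}, errors.New("envelope has no payload")
	}
	return Message{Version: env.Version, Payload: env.Payload}, nil
}
```

Keeping Payload as json.RawMessage defers decoding to the handler, which is the only place that knows the concrete payload type for a given Version.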
type MessageHandler ¶
MessageHandler handles a single message for a subject.
type MessageHandlerRegisterer ¶
type MessageHandlerRegisterer interface {
Register(subject string, h MessageHandler)
}
MessageHandlerRegisterer registers subject handlers; implemented by Router.
type QueueSubscriber ¶
type QueueSubscriber struct {
// contains filtered or unexported fields
}
QueueSubscriber subscribes to queue.* subjects and dispatches each task to the Router. Terminal failures (apperror) and messages that have exhausted MaxDeliver are written to the failed_tasks DLQ table and Term'd; transient failures are Nak'd for JetStream retry.
func NewQueueSubscriber ¶
func (*QueueSubscriber) Start ¶
func (w *QueueSubscriber) Start(_ context.Context) error
Start subscribes to every registered queue.* subject. Each subject gets its own durable consumer so JetStream load-balances across replicas naturally.
func (*QueueSubscriber) Stop ¶
func (w *QueueSubscriber) Stop(ctx context.Context) error
Stop cancels the handler context, unsubscribes all consumers (collecting any errors), then waits for in-flight handlers to drain. All subscriptions are unsubscribed regardless of individual errors. If ctx expires before the drain completes, a warning is logged and the function returns so downstream lifecycle hooks are not blocked.