nats

package
v0.30.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewSnapshotter

func NewSnapshotter(cfg KvConfig) (*es.KeyValueSnapshotter, error)

NewSnapshotter creates a new snapshotter backed by a JetStream key-value store.

Types

type AggCpStore

type AggCpStore struct {
	// contains filtered or unexported fields
}

func NewAggCpStore

func NewAggCpStore(cfg AggCpStoreConfig) (*AggCpStore, error)

func (*AggCpStore) Get

func (c *AggCpStore) Get(ctx context.Context, projectionName, aggKey string) (lastVersion es.Version, err error)

func (*AggCpStore) Set

func (c *AggCpStore) Set(ctx context.Context, projectionName, aggKey string, lastVersion es.Version) error

type AggCpStoreConfig

// AggCpStoreConfig holds the settings passed to NewAggCpStore.
type AggCpStoreConfig struct {
	Connect   Connector     // Connect is the Connector used to obtain the NATS connection.
	Bucket    string        // Bucket names the bucket. NOTE(review): presumably a JetStream KV bucket — confirm.
	KeyPrefix string        // KeyPrefix — NOTE(review): presumably prepended to the per-aggregate checkpoint keys; confirm.
	Timeout   time.Duration // Timeout for checkpoint operations (default: 10s)
}

type Connector

// Connector is a factory function that returns a NATS connection or an error.
// ConnectDefault, ConnectURL and NewTestContainer each return a Connector.
type Connector func() (nc *natsgo.Conn, err error)

func ConnectDefault

func ConnectDefault() Connector

func ConnectURL

func ConnectURL(natsURL string) Connector

func NewTestContainer

func NewTestContainer(t Testing) Connector

type CpStore

type CpStore struct {
	// contains filtered or unexported fields
}

func NewCpStore

func NewCpStore(cfg CpStoreConfig) (*CpStore, error)

func (*CpStore) Get

func (s *CpStore) Get(ctx context.Context) (lastSeq uint64, err error)

func (*CpStore) Set

func (s *CpStore) Set(ctx context.Context, lastSeq uint64) error

type CpStoreConfig

// CpStoreConfig holds the settings passed to NewCpStore.
type CpStoreConfig struct {
	Connect Connector     // Connect is the Connector used to obtain the NATS connection.
	Bucket  string        // Bucket names the bucket. NOTE(review): presumably a JetStream KV bucket — confirm.
	Key     string        // Key — NOTE(review): presumably the key the checkpoint is stored under; confirm.
	Timeout time.Duration // Timeout for checkpoint operations (default: 10s)
}

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

func NewEventStore

func NewEventStore(cfg EventStoreConfig) (*EventStore, error)

func (*EventStore) Append

func (e *EventStore) Append(
	ctx context.Context,
	aggType string,
	aggID string,
	expectedVersion es.Version,
	events []es.Envelope,
) (res *es.StoreAppendResult, err error)

func (*EventStore) Close

func (e *EventStore) Close() error

func (*EventStore) Load

func (e *EventStore) Load(
	ctx context.Context,
	aggType string,
	aggID string,
	opts ...es.StoreLoadOption,
) (loadedEvents []es.Envelope, err error)

func (*EventStore) Subscribe

func (e *EventStore) Subscribe(ctx context.Context, opts ...es.SubscribeOption) (es.Subscription, error)

type EventStoreConfig

// EventStoreConfig holds the settings passed to NewEventStore.
type EventStoreConfig struct {
	Connect        Connector    // Connect is used to create the underlying NATS connection. If nil, ConnectDefault() is used.
	Log            *slog.Logger // Log for diagnostics (optional)
	SubjectPrefix  string       // SubjectPrefix is the prefix used to store events
	StreamSubjects []string     // StreamSubjects is the list of subjects the stream is fed with

	// StreamName names the stream.
	// NOTE(review): presumably the stream that the Retention/MaxAge/MaxBytes/MaxMsgs
	// settings below apply to — confirm against NewEventStore.
	StreamName string

	// RenameType maps one string to another.
	// NOTE(review): presumably used to translate type names before they are
	// stored or looked up; its exact use is not visible here — confirm.
	RenameType func(string) string

	// Retention defines the retention policy for the stream (default: RetentionLimits).
	Retention RetentionPolicy

	// MaxAge is the maximum age of messages in the stream.
	MaxAge time.Duration

	// MaxBytes is the maximum total size of messages in the stream.
	MaxBytes int64

	// MaxMsgs is the maximum number of messages in the stream.
	MaxMsgs int64
}

type KvConfig

// KvConfig holds the settings accepted by NewKvStore and NewSnapshotter.
type KvConfig struct {
	Connect      Connector     // Connect is the Connector used to obtain the NATS connection.
	Bucket       string        // Bucket names the bucket. NOTE(review): presumably a JetStream KV bucket — confirm.
	TTL          time.Duration // NOTE(review): presumably the time-to-live applied to stored entries — confirm.
	MaxBytes     int           // NOTE(review): presumably a size limit for the bucket, in bytes — confirm.
	MaxValueSize int           // NOTE(review): presumably the largest allowed value, in bytes — confirm.
	KeyPrefix    string        // NOTE(review): presumably prepended to every key — confirm.
}

type KvStore

type KvStore struct {
	// contains filtered or unexported fields
}

func NewKvStore

func NewKvStore(cfg KvConfig) (*KvStore, error)

func (*KvStore) Delete

func (k *KvStore) Delete(ctx context.Context, key string) error

func (*KvStore) Get

func (k *KvStore) Get(ctx context.Context, key string) (entry kv.Entry, err error)

func (*KvStore) Put

func (k *KvStore) Put(ctx context.Context, key string, entry kv.Entry, opts kv.PutOptions) (err error)

type RetentionPolicy

type RetentionPolicy int

RetentionPolicy defines how messages are retained in the stream.

// Retention policies selectable via EventStoreConfig.Retention.
const (
	// RetentionLimits keeps messages until limits (MaxMsgs, MaxBytes, MaxAge) are reached.
	// This is the default NATS behavior: messages stay in the stream until one of
	// those limits removes them. As the first constant (iota) it is also the zero
	// value of RetentionPolicy.
	RetentionLimits RetentionPolicy = iota

	// RetentionInterest keeps messages only while there are consumers with interest.
	// Messages are deleted once all interested consumers have acknowledged.
	RetentionInterest

	// RetentionWorkQueue makes each message available to only one consumer (queue semantics).
	// Messages are deleted after being consumed by any one consumer.
	RetentionWorkQueue
)

type Testing

// Testing is the test handle accepted by NewTestContainer. It embeds
// require.TestingT and adds access to a context, formatted logging and
// cleanup registration.
// NOTE(review): the method set looks intended to be satisfied by *testing.T — confirm.
type Testing interface {
	require.TestingT
	Context() context.Context
	Logf(format string, args ...any)
	Cleanup(func())
}

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

func NewTransport

func NewTransport(cfg TransportConfig) (*Transport, error)

func (*Transport) Close

func (t *Transport) Close() error

func (*Transport) Request

func (t *Transport) Request(ctx context.Context, env cluster.Envelope) ([]byte, error)

func (*Transport) SubscribeShard

func (t *Transport) SubscribeShard(ctx context.Context, shardID uint32, h cluster.ServerHandlerFunc) (cluster.Subscription, error)

SubscribeShard subscribes to messages for a specific shard.

type TransportConfig

// TransportConfig holds the settings passed to NewTransport.
type TransportConfig struct {
	Connect       Connector    // Connect is used to create the underlying NATS connection. If nil, ConnectDefault() is used.
	Log           *slog.Logger // Log for diagnostics (optional)
	SubjectPrefix string       // SubjectPrefix for shard subjects, e.g. "clstr" -> clstr.shard.<id>
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL