common

package
v1.1.2155 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 2, 2026 License: MIT Imports: 33 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrStreamCancel = errors.New("stream cancelled")

ErrStreamCancel is an error variable that represents a stream cancellation.
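
Because ErrStreamCancel is a sentinel value, callers would normally test for it with errors.Is rather than comparing strings. The sketch below uses a local stand-in for the variable and a hypothetical readStream producer; neither is part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errStreamCancel is a local stand-in for common.ErrStreamCancel.
var errStreamCancel = errors.New("stream cancelled")

// readStream is a hypothetical producer that stops at item `limit`
// and wraps the sentinel so callers can still detect it.
func readStream(limit int) error {
	for i := 0; i < 10; i++ {
		if i == limit {
			return fmt.Errorf("stopped at item %d: %w", i, errStreamCancel)
		}
	}
	return nil
}

// isCancelled reports whether err is, or wraps, the cancellation sentinel.
func isCancelled(err error) bool {
	return errors.Is(err, errStreamCancel)
}

func main() {
	err := readStream(3)
	fmt.Println(isCancelled(err), err)
}
```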

Functions

func CheckVersion added in v1.1.927

func CheckVersion(ctx context.Context, nc *nats.Conn) error

CheckVersion checks the NATS server version against a minimum supported version

func ContextLoggerWithWfState added in v1.1.754

func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)

ContextLoggerWithWfState will populate a context with relevant fields from a WorkflowState model

func CopyWorkflowState added in v1.0.446

func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState

CopyWorkflowState - clones a proto model.WorkflowState for modification.

func Create added in v1.1.1492

func Create(ctx context.Context, wf jetstream.KeyValue, k string, v []byte) error

Create saves a value to a key value store, returning an error if the key is already present.

func CreateObj added in v1.1.1492

func CreateObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error

CreateObj saves a protobuf message to a key value store, returning an error if the key exists.

func CreateOrUpdateNatsWaitGroup added in v1.1.1532

func CreateOrUpdateNatsWaitGroup(ctx context.Context, js jetstream.JetStream, name string, stream string, groupPrefix string, rcvFn func(js jetstream.Stream, msg jetstream.Msg, n uint64) (bool, error), opts ...WaitGroupOpt) (jetstream.ConsumeContext, error)

CreateOrUpdateNatsWaitGroup allows a function to wait for a specific number of messages to be received by NATS JetStream. A distributed lock is used to prevent race conditions. The provided function must execute within the lock time (30 seconds). An optional completion buffer can be provided which prevents resubmission of completed subjects. Once constructed, messages matching the prefix will trigger the supplied function. If the function returns true, the message will be consumed, and any other messages with the same subject will be deleted.

func Delete added in v0.1.192

func Delete(ctx context.Context, kv jetstream.KeyValue, key string) error

Delete deletes an item from a key value store.

func DeleteLarge added in v1.0.498

func DeleteLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string) error

DeleteLarge deletes a large binary from the object store

func DropStateParams added in v1.0.451

func DropStateParams(state *model.WorkflowState)

DropStateParams removes any parameters unsafe to send across a state transition.

func ElementTable added in v0.1.134

func ElementTable(process *model.Workflow) map[string]*model.Element

ElementTable indexes an entire process for quick ID lookups

func EnsureBucket added in v1.0.477

func EnsureBucket(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, name string, ttl time.Duration) error

EnsureBucket creates a bucket if it does not exist

func EnsureBuckets

func EnsureBuckets(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, names []string) error

EnsureBuckets ensures that a list of key value stores exist

func ExtendLock added in v1.0.477

func ExtendLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error

ExtendLock extends the lock past its stale time.

func GetCallerInfo added in v1.1.1056

func GetCallerInfo(skip int) (info string)

GetCallerInfo returns caller function info. 1 for current function, 2 for caller.
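
The skip convention can be illustrated with the standard runtime package. This is a sketch only: callerInfo is a hypothetical analogue, and its "function:line" output format is an assumption, since the format returned by GetCallerInfo is not documented here.

```go
package main

import (
	"fmt"
	"runtime"
)

// callerInfo is a hypothetical analogue of GetCallerInfo.
// skip=1 describes the function that called callerInfo, skip=2 its caller.
func callerInfo(skip int) string {
	pcs := make([]uintptr, 1)
	// runtime.Callers counts itself as frame 0, so add 1 to make
	// skip=0 refer to callerInfo.
	if runtime.Callers(skip+1, pcs) == 0 {
		return "unknown"
	}
	frame, _ := runtime.CallersFrames(pcs).Next()
	if frame.Function == "" {
		return "unknown"
	}
	return fmt.Sprintf("%s:%d", frame.Function, frame.Line)
}

// whoAmI passes skip=1, so the reported frame is whoAmI itself.
func whoAmI() string { return callerInfo(1) }

func main() {
	fmt.Println(whoAmI())
}
```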

func Handle added in v1.1.1807

func Handle(ctx context.Context, dependencies HandlerDependencies, messageSpec MessageSpec) error

Handle the messages defined in the message spec

func History added in v1.1.1687

History gets the history for a key value store

func IndexProcessElements added in v0.1.134

func IndexProcessElements(elements []*model.Element, el map[string]*model.Element)

IndexProcessElements produces an index of process elements by ID
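
A minimal sketch of the indexing idea, assuming only that each element carries a unique ID. The element type below is a hypothetical stand-in for model.Element, and the real function may do more (for example, descend into nested processes).

```go
package main

import "fmt"

// element is a hypothetical stand-in for model.Element.
type element struct {
	ID   string
	Type string
}

// indexElements adds each element to table under its ID, mirroring the
// shape of IndexProcessElements(elements, el).
func indexElements(elements []*element, table map[string]*element) {
	for _, e := range elements {
		table[e.ID] = e
	}
}

func main() {
	table := make(map[string]*element)
	indexElements([]*element{
		{ID: "start", Type: "startEvent"},
		{ID: "task1", Type: "serviceTask"},
	}, table)
	fmt.Println(table["task1"].Type)
}
```

Passing the map in, rather than returning a new one, lets a caller accumulate elements from several slices into a single table.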

func KSuidTo64bit added in v0.1.78

func KSuidTo64bit(k string) [8]byte

KSuidTo64bit takes the most variable 64 bits of a KSuid and returns them as bytes.

func KSuidTo128bit added in v0.1.78

func KSuidTo128bit(k string) [16]byte

KSuidTo128bit returns a KSuid as bytes.

func KeyPrefixSearch added in v1.1.927

func KeyPrefixSearch(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, prefix string, opts KeyPrefixResultOpts) ([]string, error)

KeyPrefixSearch searches for keys in a key-value store that have a specified prefix. It retrieves the keys by querying the JetStream stream associated with the key-value store. It returns a slice of strings containing the keys, and an error if any.

func KeyPrefixSearchMap added in v1.1.1349

func KeyPrefixSearchMap(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, prefix string, opts KeyPrefixResultOpts) (map[string]struct{}, error)

KeyPrefixSearchMap searches for keys in a key-value store that have a specified prefix. It retrieves the keys by querying the JetStream stream associated with the key-value store. It returns a map of strings containing the keys, and an error if any.

func Load

func Load(ctx context.Context, wf jetstream.KeyValue, k string) ([]byte, error)

Load loads a value from a key value store

func LoadLarge added in v1.0.498

func LoadLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, opt ...jetstream.GetObjectOpt) ([]byte, error)

LoadLarge loads a large binary from the object store

func LoadLargeObj added in v1.0.498

LoadLargeObj loads a protobuf message from a key value store

func LoadObj

func LoadObj(ctx context.Context, kvStore jetstream.KeyValue, k string, v proto.Message) error

LoadObj loads a protobuf message from a key value store

func Lock added in v1.0.477

func Lock(ctx context.Context, kv jetstream.KeyValue, lockID string) (bool, error)

Lock ensures a lock on a given ID. It returns true if a lock was granted.

func Log added in v1.0.271

func Log(ctx context.Context, js nats.JetStream, trackingID string, source model.LogSource, severity messages.WorkflowLogLevel, code int32, message string, attrs map[string]string) error

Log is the generic method to output to SHAR telemetry.

func MapToSlice added in v1.1.2120

func MapToSlice[K comparable, V any, U any](input map[K]V, f func(k K, v V) U) []U

MapToSlice applies a function to each element of a map and returns a new slice.
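
The following is a sketch of a function with the same signature, written locally as mapToSlice to show how such a helper is typically used. It is not the package's source. Go map iteration order is unspecified, so the example sorts the result before printing.

```go
package main

import (
	"fmt"
	"sort"
)

// mapToSlice is a local sketch with the same shape as MapToSlice.
func mapToSlice[K comparable, V any, U any](input map[K]V, f func(k K, v V) U) []U {
	out := make([]U, 0, len(input))
	for k, v := range input {
		out = append(out, f(k, v))
	}
	return out
}

func main() {
	ports := map[string]int{"nats": 4222, "http": 8080}
	lines := mapToSlice(ports, func(k string, v int) string {
		return fmt.Sprintf("%s=%d", k, v)
	})
	sort.Strings(lines) // map order is not deterministic
	fmt.Println(lines)
}
```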

func NewContextVarAwareHandler added in v1.1.1500

func NewContextVarAwareHandler(h slog.Handler, contextVarKeys map[any]struct{}) (slog.Handler, error)

NewContextVarAwareHandler creates a new instance of ContextVarAwareHandler

func Process added in v0.1.78

func Process(ctx context.Context, js jetstream.JetStream, streamName string, traceName string, closer chan struct{}, subject string, durableConsumerName string, concurrency int, middleware []middleware.Receive, fn func(ctx context.Context, log *slog.Logger, msg jetstream.Msg) (bool, error), signalFatalErrFn func(ctx context.Context, state *model.WorkflowState, log *slog.Logger), opts ...ProcessOption) error

Process processes messages from a nats consumer and executes a function against each one.

func PublishOnce added in v1.1.732

func PublishOnce(ctx context.Context, js jetstream.JetStream, lockingKV jetstream.KeyValue, streamName string, consumerName string, publishFn func(context.Context) error) error

PublishOnce sets up a single message to be used as a timer.

func SafeDelete added in v1.1.1441

func SafeDelete(ctx context.Context, kv jetstream.KeyValue, key string) error

SafeDelete deletes a key from a JetStream KV ensuring the value is present, and the operation is idempotent.

func Save

func Save(ctx context.Context, wf jetstream.KeyValue, k string, v []byte) error

Save saves a value to a key value store

func SaveLarge added in v1.0.498

func SaveLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, data []byte) error

SaveLarge saves a large binary to the object store

func SaveLargeObj added in v1.0.498

func SaveLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, v proto.Message, opt ...nats.ObjectOpt) error

SaveLargeObj saves a protobuf message to a document store

func SaveObj

func SaveObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error

SaveObj saves a protobuf message to a key value store

func SeekElement added in v1.1.1307

func SeekElement(process *model.Workflow, id string) *model.Element

SeekElement locates an element from a workflow by ID

func SliceToMap added in v1.1.2120

func SliceToMap[K comparable, V any](input []V, f func(v V) K) map[K]V

SliceToMap applies a function to each element of a slice and returns a new map.
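
In this signature, f derives the map key for each slice element. The sketch below is a local re-implementation (sliceToMap) for illustration; letting the last element win when two elements produce the same key is an assumption of the sketch, not documented behaviour.

```go
package main

import "fmt"

// element is a hypothetical record type used only for this example.
type element struct {
	ID   string
	Name string
}

// sliceToMap is a local sketch with the same shape as SliceToMap.
// On duplicate keys the later element overwrites the earlier one.
func sliceToMap[K comparable, V any](input []V, f func(v V) K) map[K]V {
	out := make(map[K]V, len(input))
	for _, v := range input {
		out[f(v)] = v
	}
	return out
}

func main() {
	byID := sliceToMap(
		[]element{{ID: "e1", Name: "Start"}, {ID: "e2", Name: "Approve"}},
		func(e element) string { return e.ID },
	)
	fmt.Println(byID["e2"].Name)
}
```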

func StreamingReplyClient added in v1.1.975

func StreamingReplyClient(ctx context.Context, nc *nats.Conn, msg *nats.Msg, fn func(msg *nats.Msg) error) error

StreamingReplyClient establishes a streaming reply client. It creates a subscription for replies and invokes a callback function for each received message.

func StreamingReplyServer added in v1.1.975

func StreamingReplyServer(nc streamNatsReplyconnection, subject string, fn func(req *nats.Msg, ret chan *nats.Msg, errs chan error)) (*nats.Subscription, error)

StreamingReplyServer is a function that sets up a NATS subscription to handle streaming reply messages. When a message is received, it begins streaming by creating channels for return messages and error messages. It then executes the provided function to process the request and send the response messages. The function runs in a separate goroutine. It continuously listens for return messages and error messages, and publishes them to the reply inbox. It exits when an error or cancellation occurs. The function returns the NATS subscription and any error that occurred during setup.
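
The channel flow described above can be sketched without NATS. Everything here is hypothetical: plain strings replace *nats.Msg, results are collected into a slice instead of being published to a reply inbox, and treating the cancellation sentinel as a clean end of stream is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// errStreamCancel is a local stand-in for common.ErrStreamCancel.
var errStreamCancel = errors.New("stream cancelled")

// collectStream runs fn in its own goroutine and gathers what it sends on
// ret until something arrives on errs.
func collectStream(fn func(ret chan<- string, errs chan<- error)) ([]string, error) {
	ret := make(chan string)
	errs := make(chan error, 1)
	go fn(ret, errs)

	var out []string
	for {
		select {
		case m := <-ret:
			out = append(out, m)
		case err := <-errs:
			if errors.Is(err, errStreamCancel) {
				return out, nil // assumed: sentinel marks normal completion
			}
			return out, err
		}
	}
}

// countTo is a hypothetical producer that streams n replies, then signals
// completion on the error channel.
func countTo(n int) func(ret chan<- string, errs chan<- error) {
	return func(ret chan<- string, errs chan<- error) {
		for i := 1; i <= n; i++ {
			ret <- fmt.Sprintf("reply %d", i)
		}
		errs <- errStreamCancel
	}
}

func main() {
	replies, err := collectStream(countTo(3))
	fmt.Println(replies, err) // [reply 1 reply 2 reply 3] <nil>
}
```

Because ret is unbuffered, every reply is received before the producer can send the completion signal, so no reply is lost when the loop exits.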

func TransformSlice added in v1.1.2120

func TransformSlice[T any, U any](input []T, f func(v T) U) []U

TransformSlice applies a function to each element of a slice and returns a new slice.
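
A local sketch with the same signature (transformSlice) shows the two common cases: mapping to the same element type and mapping to a different one. It is an illustration, not the package's source.

```go
package main

import (
	"fmt"
	"strings"
)

// transformSlice is a local sketch with the same shape as TransformSlice.
func transformSlice[T any, U any](input []T, f func(v T) U) []U {
	out := make([]U, 0, len(input))
	for _, v := range input {
		out = append(out, f(v))
	}
	return out
}

func main() {
	// Same element type in and out.
	fmt.Println(transformSlice([]string{"start", "end"}, strings.ToUpper))
	// Different element type: string to int.
	fmt.Println(transformSlice([]string{"a", "bcd"}, func(s string) int { return len(s) }))
}
```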

func UnLock added in v1.0.477

func UnLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error

UnLock closes an existing lock.
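
The Lock and UnLock contract (true only for the caller that obtains the lock) can be sketched with an in-memory stand-in. The memKV type is hypothetical; the real functions operate on a jetstream.KeyValue bucket and also deal with stale locks, which this sketch omits.

```go
package main

import (
	"fmt"
	"sync"
)

// memKV is a hypothetical in-memory stand-in for a key value bucket.
type memKV struct {
	mu   sync.Mutex
	keys map[string]struct{}
}

func newMemKV() *memKV { return &memKV{keys: make(map[string]struct{})} }

// lock reports true when lockID was free and is now held by the caller.
func (kv *memKV) lock(lockID string) bool {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	if _, held := kv.keys[lockID]; held {
		return false
	}
	kv.keys[lockID] = struct{}{}
	return true
}

// unlock releases lockID; releasing a free lock is a no-op.
func (kv *memKV) unlock(lockID string) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	delete(kv.keys, lockID)
}

func main() {
	kv := newMemKV()
	fmt.Println(kv.lock("wf-1")) // true: first caller gets the lock
	fmt.Println(kv.lock("wf-1")) // false: already held
	kv.unlock("wf-1")
	fmt.Println(kv.lock("wf-1")) // true: free again after release
}
```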

func UpdateLargeObj added in v1.0.498

func UpdateLargeObj[T proto.Message](ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (T, error)

UpdateLargeObj saves a protobuf message to a document store after using updateFn to update the message.

func UpdateObj

func UpdateObj[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) error

UpdateObj saves a protobuf message to a key value store after using updateFn to update the message.

func UpdateObjIsNew added in v1.0.446

func UpdateObjIsNew[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (bool, error)

UpdateObjIsNew saves a protobuf message to a key value store after using updateFn to update the message, and returns true if this is a new value.
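
The read, update, save cycle behind the Update functions can be sketched against a plain map. This is an illustration under assumptions: a Go map replaces the key value store, any type replaces proto.Message, and using the supplied value as the starting point when the key is absent is a guess at the intended behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// updateIsNew loads the value at k (or starts from initial when k is
// absent), applies updateFn, stores the result, and reports whether the
// key was new. It is a hypothetical analogue of UpdateObjIsNew.
func updateIsNew[T any](store map[string]T, k string, initial T, updateFn func(v T) (T, error)) (bool, error) {
	current, found := store[k]
	if !found {
		current = initial
	}
	next, err := updateFn(current)
	if err != nil {
		// Nothing is written when the update function fails.
		return false, fmt.Errorf("update %q: %w", k, err)
	}
	store[k] = next
	return !found, nil
}

// increment and reject are example update functions.
func increment(v int) (int, error) { return v + 1, nil }
func reject(v int) (int, error)    { return v, errors.New("rejected") }

func main() {
	store := map[string]int{}
	isNew, _ := updateIsNew(store, "retries", 0, increment)
	fmt.Println(isNew, store["retries"]) // true 1
	isNew, _ = updateIsNew(store, "retries", 0, increment)
	fmt.Println(isNew, store["retries"]) // false 2
}
```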

Types

type BackoffFn added in v1.0.623

type BackoffFn func(ctx context.Context, msg jetstream.Msg) error

BackoffFn represents a function that completely handles the backoff for a message including ACK/NAK

type BackoffProcessOption added in v1.0.623

type BackoffProcessOption struct {
	// contains filtered or unexported fields
}

BackoffProcessOption holds the backoff function. Don't use this directly. Use the convenience function WithBackoffFn

func WithBackoffFn added in v1.0.623

func WithBackoffFn(fn BackoffFn) BackoffProcessOption

WithBackoffFn adds a back-off function to message processing

func (BackoffProcessOption) Set added in v1.0.623

func (b BackoffProcessOption) Set(opts *ProcessOpts)

Set the backoff function in the process settings

type ConsumerCfgOption added in v1.1.1807

type ConsumerCfgOption interface {
	Apply(cfg *jetstream.ConsumerConfig)
}

ConsumerCfgOption defines interface for specifying a particular consumer cfg option

func ListenFromPriorTime added in v1.1.1807

func ListenFromPriorTime(ago time.Duration) ConsumerCfgOption

ListenFromPriorTime defines how long ago a consumer should start reading from

type ContextVarAwareHandler added in v1.1.1500

type ContextVarAwareHandler struct {
	// contains filtered or unexported fields
}

ContextVarAwareHandler is a slog handler that looks for the specified context var keys and then adds them as log record attributes

func (*ContextVarAwareHandler) Enabled added in v1.1.1500

func (cvah *ContextVarAwareHandler) Enabled(ctx context.Context, level slog.Level) bool

Enabled implementation of slog.Handler interface

func (*ContextVarAwareHandler) Handle added in v1.1.1500

func (cvah *ContextVarAwareHandler) Handle(ctx context.Context, r slog.Record) error

Handle implementation of slog.Handler interface

func (*ContextVarAwareHandler) WithAttrs added in v1.1.1500

func (cvah *ContextVarAwareHandler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs implementation of slog.Handler interface

func (*ContextVarAwareHandler) WithGroup added in v1.1.1500

func (cvah *ContextVarAwareHandler) WithGroup(name string) slog.Handler

WithGroup implementation of slog.Handler interface
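
A handler of this kind can be sketched with the standard log/slog package. The ctxAttrHandler type below is a hypothetical re-creation of the idea, not the package's implementation; using the printed form of the context key as the attribute name is an assumption, and the timestamp is dropped only to keep the output stable.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
)

type ctxKey string

// ctxAttrHandler wraps another handler and copies selected context values
// onto each record as attributes.
type ctxAttrHandler struct {
	inner slog.Handler
	keys  map[any]struct{}
}

func (h *ctxAttrHandler) Enabled(ctx context.Context, level slog.Level) bool {
	return h.inner.Enabled(ctx, level)
}

func (h *ctxAttrHandler) Handle(ctx context.Context, r slog.Record) error {
	r = r.Clone() // avoid mutating state shared with other copies
	for k := range h.keys {
		if v := ctx.Value(k); v != nil {
			r.AddAttrs(slog.Any(fmt.Sprint(k), v))
		}
	}
	return h.inner.Handle(ctx, r)
}

func (h *ctxAttrHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	return &ctxAttrHandler{inner: h.inner.WithAttrs(attrs), keys: h.keys}
}

func (h *ctxAttrHandler) WithGroup(name string) slog.Handler {
	return &ctxAttrHandler{inner: h.inner.WithGroup(name), keys: h.keys}
}

// renderLine logs one message with key=value stored in the context and
// returns the text the wrapped handler produced.
func renderLine(key, value string) string {
	var buf bytes.Buffer
	text := slog.NewTextHandler(&buf, &slog.HandlerOptions{
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if len(groups) == 0 && a.Key == slog.TimeKey {
				return slog.Attr{} // drop the timestamp
			}
			return a
		},
	})
	h := &ctxAttrHandler{inner: text, keys: map[any]struct{}{ctxKey(key): {}}}
	ctx := context.WithValue(context.Background(), ctxKey(key), value)
	slog.New(h).InfoContext(ctx, "state loaded")
	return buf.String()
}

func main() {
	fmt.Print(renderLine("workflowId", "wf-42"))
}
```

With more than one key the attribute order would follow map iteration and so is not fixed in this sketch.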

type HandlerDependencies added in v1.1.1807

type HandlerDependencies struct {
	Js     jetstream.JetStream
	Closer chan struct{}
	Logger *slog.Logger
}

HandlerDependencies are components required by the message handler

type KeyPrefixResultOpts added in v1.1.927

type KeyPrefixResultOpts struct {
	Sort           bool // Sort the returned values
	ExcludeDeleted bool // ExcludeDeleted filters deleted key-values from the result (cost penalty).
}

KeyPrefixResultOpts represents the options for KeyPrefixSearch function. Sort field indicates whether the returned values should be sorted. ExcludeDeleted field filters out deleted key-values from the result.
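
The effect of the two options can be sketched over an in-memory set of keys. The bucket representation (a map from key to a "deleted" flag) and the prefixSearch function are hypothetical; the real KeyPrefixSearch queries the JetStream stream behind the bucket.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// resultOpts mirrors the two documented fields of KeyPrefixResultOpts.
type resultOpts struct {
	Sort           bool
	ExcludeDeleted bool
}

// prefixSearch scans a hypothetical in-memory bucket. The map value records
// whether the key's latest entry is a delete marker.
func prefixSearch(bucket map[string]bool, prefix string, opts resultOpts) []string {
	keys := make([]string, 0, len(bucket))
	for k, deleted := range bucket {
		if !strings.HasPrefix(k, prefix) {
			continue
		}
		if opts.ExcludeDeleted && deleted {
			continue
		}
		keys = append(keys, k)
	}
	if opts.Sort {
		sort.Strings(keys)
	}
	return keys
}

func main() {
	bucket := map[string]bool{"wf.b": false, "wf.a": false, "wf.old": true, "job.1": false}
	fmt.Println(prefixSearch(bucket, "wf.", resultOpts{Sort: true, ExcludeDeleted: true})) // [wf.a wf.b]
}
```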

type MessageSpec added in v1.1.1807

type MessageSpec struct {
	MessageDescription  string
	FilterSubject       string
	MaxConcurrency      int
	ConsumerCfgOptions  []ConsumerCfgOption
	PreProcessMessageFn func(context.Context, jetstream.Msg) context.Context
	HandleMsgFn         func(ctx context.Context, msg jetstream.Msg) error
}

MessageSpec defines the messages to be processed and how to process them

type NatsConn

type NatsConn interface {
	QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error)
	Publish(subj string, bytes []byte) error
	PublishMsg(msg *nats.Msg) error
	Subscribe(subj string, cb nats.MsgHandler) (*nats.Subscription, error)
}

NatsConn is the trimmed down NATS Connection interface that only encompasses the methods used by SHAR.

type NatsMsgWrapper added in v1.1.927

type NatsMsgWrapper struct {
	jetstream.Msg
	// contains filtered or unexported fields
}

NatsMsgWrapper is a wrapper type that combines the jetstream.Msg and nats.Msg types.

func NewNatsMsgWrapper added in v1.1.927

func NewNatsMsgWrapper(msg *nats.Msg) *NatsMsgWrapper

NewNatsMsgWrapper is a function that creates a new instance of NatsMsgWrapper, which is a wrapper type that combines the jetstream.Msg and nats.Msg types.

func (*NatsMsgWrapper) Ack added in v1.1.927

func (w *NatsMsgWrapper) Ack() error

Ack is a method that acknowledges the receipt of the NATS message by calling the underlying nats.Msg's Ack method. If an error occurs while acknowledging the message, it returns an error with a message indicating the failure. Returns nil if the acknowledgement is successful.

func (*NatsMsgWrapper) Data added in v1.1.927

func (w *NatsMsgWrapper) Data() []byte

Data is a method that retrieves the data from the underlying nats.Msg.

func (*NatsMsgWrapper) DoubleAck added in v1.1.927

func (w *NatsMsgWrapper) DoubleAck(context.Context) error

DoubleAck is a method that simulates a double acknowledgement of the NATS message. It returns an error with a message indicating that double ack is not allowed.

func (*NatsMsgWrapper) Headers added in v1.1.927

func (w *NatsMsgWrapper) Headers() nats.Header

Headers is a method that retrieves the headers from the underlying nats.Msg.

func (*NatsMsgWrapper) InProgress added in v1.1.927

func (w *NatsMsgWrapper) InProgress() error

InProgress is a method that indicates that the message is still in progress.

func (*NatsMsgWrapper) Metadata added in v1.1.927

func (w *NatsMsgWrapper) Metadata() (*jetstream.MsgMetadata, error)

Metadata is a method that retrieves the metadata from the underlying nats.Msg.

func (*NatsMsgWrapper) Nak added in v1.1.927

func (w *NatsMsgWrapper) Nak() error

Nak is a method that nak's the message.

func (*NatsMsgWrapper) NakWithDelay added in v1.1.927

func (w *NatsMsgWrapper) NakWithDelay(delay time.Duration) error

NakWithDelay is a method that nak's the message, and will not re-process before delay.

func (*NatsMsgWrapper) Reply added in v1.1.927

func (w *NatsMsgWrapper) Reply() string

Reply is a method that retrieves the reply from the underlying nats.Msg.

func (*NatsMsgWrapper) SetData added in v1.1.927

func (w *NatsMsgWrapper) SetData(b []byte)

SetData is a method that sets the data of the underlying nats.Msg.

func (*NatsMsgWrapper) Subject added in v1.1.927

func (w *NatsMsgWrapper) Subject() string

Subject is a method that retrieves the subject from the underlying nats.Msg.

func (*NatsMsgWrapper) Term added in v1.1.927

func (w *NatsMsgWrapper) Term() error

Term is a method that calls the `Term` method on the underlying nats.Msg.

type ProcessOption added in v1.0.623

type ProcessOption interface {
	Set(opts *ProcessOpts)
}

ProcessOption represents an option function that can be passed to message processing.

type ProcessOpts added in v1.0.623

type ProcessOpts struct {
	BackoffCalc BackoffFn
}

ProcessOpts holds the settings for message processing.
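
ProcessOption, ProcessOpts and WithBackoffFn together form an interface-based options pattern. The sketch below re-creates that pattern with local lowercase types so it runs on its own. A plain string replaces jetstream.Msg, and the point at which the backoff function is invoked (after the handler fails) is an assumption about how Process uses it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// backoffFn stands in for BackoffFn; a string replaces jetstream.Msg.
type backoffFn func(ctx context.Context, msg string) error

type processOpts struct{ BackoffCalc backoffFn }

type processOption interface{ Set(opts *processOpts) }

type backoffProcessOption struct{ fn backoffFn }

// Set stores the backoff function in the process settings.
func (b backoffProcessOption) Set(opts *processOpts) { opts.BackoffCalc = b.fn }

func withBackoffFn(fn backoffFn) backoffProcessOption { return backoffProcessOption{fn: fn} }

// process applies the options, runs handle, and hands a failed message to
// the backoff function when one was configured.
func process(ctx context.Context, msg string, handle func(string) error, opts ...processOption) error {
	settings := &processOpts{}
	for _, o := range opts {
		o.Set(settings)
	}
	err := handle(msg)
	if err != nil && settings.BackoffCalc != nil {
		return settings.BackoffCalc(ctx, msg)
	}
	return err
}

// runOnce is a hypothetical driver: the handler fails when fail is true,
// and the result reports whether the backoff function ran.
func runOnce(fail bool) (backoffRan bool, err error) {
	handle := func(string) error {
		if fail {
			return errors.New("handler failed")
		}
		return nil
	}
	backoff := withBackoffFn(func(ctx context.Context, msg string) error {
		backoffRan = true
		return nil
	})
	err = process(context.Background(), "msg-1", handle, backoff)
	return backoffRan, err
}

func main() {
	fmt.Println(runOnce(true))
	fmt.Println(runOnce(false))
}
```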

type TrackingID added in v1.0.215

type TrackingID []string

TrackingID is an ID stack that maintains the callstack

func (TrackingID) Ancestor added in v1.0.215

func (t TrackingID) Ancestor(gen int) string

Ancestor provides the ID of the caller back <gen> generations.

func (TrackingID) ID added in v1.0.215

func (t TrackingID) ID() string

ID provides the current ID

func (TrackingID) NewID added in v1.1.1099

func (t TrackingID) NewID() TrackingID

NewID adds a new random ID to the trackingID

func (TrackingID) ParentID added in v1.0.215

func (t TrackingID) ParentID() string

ParentID provides the ID of the caller.

func (TrackingID) Pop added in v1.0.215

func (t TrackingID) Pop() TrackingID

Pop removes the current ID from the callstack.

func (TrackingID) Push added in v1.0.215

func (t TrackingID) Push(id string) TrackingID

Push adds a new ID to the callstack.
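
The stack behaviour of TrackingID can be sketched as follows. This is a local re-creation (trackingID), not the package's source: returning an empty string for a missing ancestor, and copying on Push and Pop so the receiver is left unchanged, are assumptions of the sketch.

```go
package main

import "fmt"

// trackingID is a local stand-in for TrackingID; the last element is the
// current ID.
type trackingID []string

// Push returns a new stack with id on top.
func (t trackingID) Push(id string) trackingID {
	out := make(trackingID, len(t), len(t)+1)
	copy(out, t)
	return append(out, id)
}

// Pop returns a new stack without the current ID.
func (t trackingID) Pop() trackingID {
	if len(t) == 0 {
		return t
	}
	out := make(trackingID, len(t)-1)
	copy(out, t)
	return out
}

// Ancestor returns the ID gen generations back; 0 is the current ID.
func (t trackingID) Ancestor(gen int) string {
	i := len(t) - 1 - gen
	if gen < 0 || i < 0 {
		return ""
	}
	return t[i]
}

func (t trackingID) ID() string       { return t.Ancestor(0) }
func (t trackingID) ParentID() string { return t.Ancestor(1) }

func main() {
	id := trackingID{}.Push("workflow").Push("process").Push("task")
	fmt.Println(id.ID(), id.ParentID(), id.Ancestor(2)) // task process workflow
	fmt.Println(id.Pop().ID())                          // process
}
```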

type WaitGroupOpt added in v1.1.1532

type WaitGroupOpt func(cfg *waitgroupOpts)

WaitGroupOpt provides options to CreateOrUpdateNatsWaitGroup

func WithCompletionBuffer added in v1.1.1532

func WithCompletionBuffer(name string, duration time.Duration) WaitGroupOpt

WithCompletionBuffer prevents future calls to a WaitGroup subject for a specific duration.

func WithLockTimeout added in v1.1.1532

func WithLockTimeout(duration time.Duration) WaitGroupOpt

WithLockTimeout specifies the time to wait before unlocking a subject group in a WaitGroup.

func WithMemoryStorage added in v1.1.1532

func WithMemoryStorage() WaitGroupOpt

WithMemoryStorage specifies that the WaitGroup should use memory storage for ephemeral behaviour.

Directories

Path Synopsis
Package base62 implements base62 encoding.
Code generated by build process.