Documentation
¶
Index ¶
- Variables
- func CheckVersion(ctx context.Context, nc *nats.Conn) error
- func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)
- func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
- func Create(ctx context.Context, wf jetstream.KeyValue, k string, v []byte) error
- func CreateObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error
- func CreateOrUpdateNatsWaitGroup(ctx context.Context, js jetstream.JetStream, name string, stream string, ...) (jetstream.ConsumeContext, error)
- func Delete(ctx context.Context, kv jetstream.KeyValue, key string) error
- func DeleteLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func DropStateParams(state *model.WorkflowState)
- func ElementTable(process *model.Workflow) map[string]*model.Element
- func EnsureBucket(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, ...) error
- func EnsureBuckets(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, ...) error
- func ExtendLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error
- func GetCallerInfo(skip int) (info string)
- func Handle(ctx context.Context, dependencies HandlerDependencies, messageSpec MessageSpec) error
- func History(ctx context.Context, wf jetstream.KeyValue, k string) ([]jetstream.KeyValueEntry, error)
- func IndexProcessElements(elements []*model.Element, el map[string]*model.Element)
- func KSuidTo64bit(k string) [8]byte
- func KSuidTo128bit(k string) [16]byte
- func KeyPrefixSearch(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, ...) ([]string, error)
- func KeyPrefixSearchMap(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, ...) (map[string]struct{}, error)
- func Load(ctx context.Context, wf jetstream.KeyValue, k string) ([]byte, error)
- func LoadLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) ([]byte, error)
- func LoadLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func LoadObj(ctx context.Context, kvStore jetstream.KeyValue, k string, v proto.Message) error
- func Lock(ctx context.Context, kv jetstream.KeyValue, lockID string) (bool, error)
- func Log(ctx context.Context, js nats.JetStream, trackingID string, ...) error
- func MapToSlice[K comparable, V any, U any](input map[K]V, f func(k K, v V) U) []U
- func NewContextVarAwareHandler(h slog.Handler, contextVarKeys map[any]struct{}) (slog.Handler, error)
- func Process(ctx context.Context, js jetstream.JetStream, streamName string, ...) error
- func PublishOnce(ctx context.Context, js jetstream.JetStream, lockingKV jetstream.KeyValue, ...) error
- func SafeDelete(ctx context.Context, kv jetstream.KeyValue, key string) error
- func Save(ctx context.Context, wf jetstream.KeyValue, k string, v []byte) error
- func SaveLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func SaveLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) error
- func SaveObj(ctx context.Context, wf jetstream.KeyValue, k string, v proto.Message) error
- func SeekElement(process *model.Workflow, id string) *model.Element
- func SliceToMap[K comparable, V any](input []V, f func(v V) K) map[K]V
- func StreamingReplyClient(ctx context.Context, nc *nats.Conn, msg *nats.Msg, ...) error
- func StreamingReplyServer(nc streamNatsReplyconnection, subject string, ...) (*nats.Subscription, error)
- func TransformSlice[T any, U any](input []T, f func(v T) U) []U
- func UnLock(ctx context.Context, kv jetstream.KeyValue, lockID string) error
- func UpdateLargeObj[T proto.Message](ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, ...) (T, error)
- func UpdateObj[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, ...) error
- func UpdateObjIsNew[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, ...) (bool, error)
- type BackoffFn
- type BackoffProcessOption
- type ConsumerCfgOption
- type ContextVarAwareHandler
- func (cvah *ContextVarAwareHandler) Enabled(ctx context.Context, level slog.Level) bool
- func (cvah *ContextVarAwareHandler) Handle(ctx context.Context, r slog.Record) error
- func (cvah *ContextVarAwareHandler) WithAttrs(attrs []slog.Attr) slog.Handler
- func (cvah *ContextVarAwareHandler) WithGroup(name string) slog.Handler
- type HandlerDependencies
- type KeyPrefixResultOpts
- type MessageSpec
- type NatsConn
- type NatsMsgWrapper
- func (w *NatsMsgWrapper) Ack() error
- func (w *NatsMsgWrapper) Data() []byte
- func (w *NatsMsgWrapper) DoubleAck(context.Context) error
- func (w *NatsMsgWrapper) Headers() nats.Header
- func (w *NatsMsgWrapper) InProgress() error
- func (w *NatsMsgWrapper) Metadata() (*jetstream.MsgMetadata, error)
- func (w *NatsMsgWrapper) Nak() error
- func (w *NatsMsgWrapper) NakWithDelay(delay time.Duration) error
- func (w *NatsMsgWrapper) Reply() string
- func (w *NatsMsgWrapper) SetData(b []byte)
- func (w *NatsMsgWrapper) Subject() string
- func (w *NatsMsgWrapper) Term() error
- type ProcessOption
- type ProcessOpts
- type TrackingID
- type WaitGroupOpt
Constants ¶
This section is empty.
Variables ¶
var ErrStreamCancel = errors.New("stream cancelled")
ErrStreamCancel is an error variable that represents a stream cancellation.
Functions ¶
func CheckVersion ¶ added in v1.1.927
CheckVersion checks the NATS server version against a minimum supported version
func ContextLoggerWithWfState ¶ added in v1.1.754
func ContextLoggerWithWfState(ctx context.Context, state *model.WorkflowState) (context.Context, *slog.Logger)
ContextLoggerWithWfState will populate a context with relevant fields from a WorkflowState model
func CopyWorkflowState ¶ added in v1.0.446
func CopyWorkflowState(state *model.WorkflowState) *model.WorkflowState
CopyWorkflowState - clones a proto model.WorkflowState for modification.
func Create ¶ added in v1.1.1492
Create saves a value to a key value store erroring if the key is present
func CreateObj ¶ added in v1.1.1492
CreateObj saves a protobuf message to a key value store, erroring if the key exists.
func CreateOrUpdateNatsWaitGroup ¶ added in v1.1.1532
func CreateOrUpdateNatsWaitGroup(ctx context.Context, js jetstream.JetStream, name string, stream string, groupPrefix string, rcvFn func(js jetstream.Stream, msg jetstream.Msg, n uint64) (bool, error), opts ...WaitGroupOpt) (jetstream.ConsumeContext, error)
CreateOrUpdateNatsWaitGroup allows a function to wait for a specific number of messages to be received by NATS JetStream. A distributed lock is used to prevent race conditions. The provided function must execute within the lock time (30 seconds). An optional completion buffer can be provided which prevents resubmission of completed subjects. Once constructed, messages matching the prefix will trigger the supplied function. If the function returns true, the message will be consumed, and any other messages with the same subject will be deleted.
func DeleteLarge ¶ added in v1.0.498
func DeleteLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string) error
DeleteLarge deletes a large binary from the object store
func DropStateParams ¶ added in v1.0.451
func DropStateParams(state *model.WorkflowState)
DropStateParams removes any parameters unsafe to send across a state transition.
func ElementTable ¶ added in v0.1.134
ElementTable indexes an entire process for quick ID lookups
func EnsureBucket ¶ added in v1.0.477
func EnsureBucket(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, name string, ttl time.Duration) error
EnsureBucket creates a bucket if it does not exist
func EnsureBuckets ¶
func EnsureBuckets(ctx context.Context, js jetstream.JetStream, storageType jetstream.StorageType, names []string) error
EnsureBuckets ensures that a list of key value stores exist
func ExtendLock ¶ added in v1.0.477
ExtendLock extends the lock past its stale time.
func GetCallerInfo ¶ added in v1.1.1056
GetCallerInfo returns caller function info. 1 for current function, 2 for caller.
func Handle ¶ added in v1.1.1807
func Handle(ctx context.Context, dependencies HandlerDependencies, messageSpec MessageSpec) error
Handle the messages defined in the message spec
func History ¶ added in v1.1.1687
func History(ctx context.Context, wf jetstream.KeyValue, k string) ([]jetstream.KeyValueEntry, error)
History gets the history for a key value store
func IndexProcessElements ¶ added in v0.1.134
IndexProcessElements produces an index of process elements by ID
func KSuidTo64bit ¶ added in v0.1.78
KSuidTo64bit takes the most variable 64 bits of a KSuid and returns them as bytes.
func KSuidTo128bit ¶ added in v0.1.78
KSuidTo128bit returns a KSuid as bytes.
func KeyPrefixSearch ¶ added in v1.1.927
func KeyPrefixSearch(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, prefix string, opts KeyPrefixResultOpts) ([]string, error)
KeyPrefixSearch searches for keys in a key-value store that have a specified prefix. It retrieves the keys by querying the JetStream stream associated with the key-value store. It returns a slice of strings containing the keys, and an error if any.
func KeyPrefixSearchMap ¶ added in v1.1.1349
func KeyPrefixSearchMap(ctx context.Context, js jetstream.JetStream, kv jetstream.KeyValue, prefix string, opts KeyPrefixResultOpts) (map[string]struct{}, error)
KeyPrefixSearchMap searches for keys in a key-value store that have a specified prefix. It retrieves the keys by querying the JetStream stream associated with the key-value store. It returns a map of strings containing the keys, and an error if any.
func LoadLarge ¶ added in v1.0.498
func LoadLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, opt ...jetstream.GetObjectOpt) ([]byte, error)
LoadLarge loads a large binary from the object store.
func LoadLargeObj ¶ added in v1.0.498
func LoadLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, v proto.Message, opt ...jetstream.GetObjectOpt) error
LoadLargeObj loads a protobuf message from an object store.
func Lock ¶ added in v1.0.477
Lock ensures a lock on a given ID. It returns true if a lock was granted.
func Log ¶ added in v1.0.271
func Log(ctx context.Context, js nats.JetStream, trackingID string, source model.LogSource, severity messages.WorkflowLogLevel, code int32, message string, attrs map[string]string) error
Log is the generic method to output to SHAR telemetry.
func MapToSlice ¶ added in v1.1.2120
func MapToSlice[K comparable, V any, U any](input map[K]V, f func(k K, v V) U) []U
MapToSlice applies a function to each element of a map and returns a new slice.
func NewContextVarAwareHandler ¶ added in v1.1.1500
func NewContextVarAwareHandler(h slog.Handler, contextVarKeys map[any]struct{}) (slog.Handler, error)
NewContextVarAwareHandler creates a new instance of ContextVarAwareHandler.
func Process ¶ added in v0.1.78
func Process(ctx context.Context, js jetstream.JetStream, streamName string, traceName string, closer chan struct{}, subject string, durableConsumerName string, concurrency int, middleware []middleware.Receive, fn func(ctx context.Context, log *slog.Logger, msg jetstream.Msg) (bool, error), signalFatalErrFn func(ctx context.Context, state *model.WorkflowState, log *slog.Logger), opts ...ProcessOption) error
Process processes messages from a nats consumer and executes a function against each one.
func PublishOnce ¶ added in v1.1.732
func PublishOnce(ctx context.Context, js jetstream.JetStream, lockingKV jetstream.KeyValue, streamName string, consumerName string, publishFn func(context.Context) error) error
PublishOnce sets up a single message to be used as a timer.
func SafeDelete ¶ added in v1.1.1441
SafeDelete deletes a key from a JetStream KV ensuring the value is present, and the operation is idempotent.
func SaveLarge ¶ added in v1.0.498
func SaveLarge(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, key string, data []byte) error
SaveLarge saves a large binary to the object store.
func SaveLargeObj ¶ added in v1.0.498
func SaveLargeObj(ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, v proto.Message, opt ...nats.ObjectOpt) error
SaveLargeObj saves a protobuf message to a document store.
func SeekElement ¶ added in v1.1.1307
SeekElement locates an element from a workflow by ID
func SliceToMap ¶ added in v1.1.2120
func SliceToMap[K comparable, V any](input []V, f func(v V) K) map[K]V
SliceToMap applies a function to each element of a slice and returns a new map.
func StreamingReplyClient ¶ added in v1.1.975
func StreamingReplyClient(ctx context.Context, nc *nats.Conn, msg *nats.Msg, fn func(msg *nats.Msg) error) error
StreamingReplyClient establishes a streaming reply client. It creates a subscription for replies and invokes a callback function for each received message.
func StreamingReplyServer ¶ added in v1.1.975
func StreamingReplyServer(nc streamNatsReplyconnection, subject string, fn func(req *nats.Msg, ret chan *nats.Msg, errs chan error)) (*nats.Subscription, error)
StreamingReplyServer is a function that sets up a NATS subscription to handle streaming reply messages. When a message is received, it begins streaming by creating channels for return messages and error messages. It then executes the provided function to process the request and send the response messages. The function runs in a separate goroutine. It continuously listens for return messages and error messages, and publishes them to the reply inbox. It exits when an error or cancellation occurs. The function returns the NATS subscription and any error that occurred during setup.
func TransformSlice ¶ added in v1.1.2120
TransformSlice applies a function to each element of a slice and returns a new slice.
func UpdateLargeObj ¶ added in v1.0.498
func UpdateLargeObj[T proto.Message](ctx context.Context, ds jetstream.ObjectStore, mutex jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (T, error)
UpdateLargeObj saves a protobuf message to a document store after using updateFn to update the message.
func UpdateObj ¶
func UpdateObj[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) error
UpdateObj saves a protobuf message to a key value store after using updateFn to update the message.
func UpdateObjIsNew ¶ added in v1.0.446
func UpdateObjIsNew[T proto.Message](ctx context.Context, wf jetstream.KeyValue, k string, msg T, updateFn func(v T) (T, error)) (bool, error)
UpdateObjIsNew saves a protobuf message to a key value store after using updateFn to update the message, and returns true if this is a new value.
Types ¶
type BackoffFn ¶ added in v1.0.623
BackoffFn represents a function that completely handles the backoff for a message including ACK/NAK
type BackoffProcessOption ¶ added in v1.0.623
type BackoffProcessOption struct {
// contains filtered or unexported fields
}
BackoffProcessOption holds the backoff function. Don't use this directly. Use the convenience function WithBackoffFn
func WithBackoffFn ¶ added in v1.0.623
func WithBackoffFn(fn BackoffFn) BackoffProcessOption
WithBackoffFn adds a back-off function to message processing
func (BackoffProcessOption) Set ¶ added in v1.0.623
func (b BackoffProcessOption) Set(opts *ProcessOpts)
Set the backoff function in the process settings
type ConsumerCfgOption ¶ added in v1.1.1807
type ConsumerCfgOption interface {
Apply(cfg *jetstream.ConsumerConfig)
}
ConsumerCfgOption defines interface for specifying a particular consumer cfg option
func ListenFromPriorTime ¶ added in v1.1.1807
func ListenFromPriorTime(ago time.Duration) ConsumerCfgOption
ListenFromPriorTime defines how long ago a consumer should start reading from
type ContextVarAwareHandler ¶ added in v1.1.1500
type ContextVarAwareHandler struct {
// contains filtered or unexported fields
}
ContextVarAwareHandler is a slog handler that looks for the specified context var keys and then adds them as log record attributes
func (*ContextVarAwareHandler) Enabled ¶ added in v1.1.1500
Enabled implementation of slog.Handler interface
func (*ContextVarAwareHandler) Handle ¶ added in v1.1.1500
Handle implementation of slog.Handler interface
type HandlerDependencies ¶ added in v1.1.1807
HandlerDependencies are components required by the message handler
type KeyPrefixResultOpts ¶ added in v1.1.927
type KeyPrefixResultOpts struct {
Sort bool // Sort the returned values
ExcludeDeleted bool // ExcludeDeleted filters deleted key-values from the result (cost penalty).
}
KeyPrefixResultOpts represents the options for KeyPrefixSearch function. Sort field indicates whether the returned values should be sorted. ExcludeDeleted field filters out deleted key-values from the result.
type MessageSpec ¶ added in v1.1.1807
type MessageSpec struct {
MessageDescription string
FilterSubject string
MaxConcurrency int
ConsumerCfgOptions []ConsumerCfgOption
PreProcessMessageFn func(context.Context, jetstream.Msg) context.Context
HandleMsgFn func(ctx context.Context, msg jetstream.Msg) error
}
MessageSpec defines the messages to be processed and how to process them
type NatsConn ¶
type NatsConn interface {
QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error)
Publish(subj string, bytes []byte) error
PublishMsg(msg *nats.Msg) error
Subscribe(subj string, cb nats.MsgHandler) (*nats.Subscription, error)
}
NatsConn is the trimmed-down NATS connection interface that only encompasses the methods used by SHAR. Why do we need this?
type NatsMsgWrapper ¶ added in v1.1.927
NatsMsgWrapper is a wrapper type that combines the jetstream.Msg and nats.Msg types.
func NewNatsMsgWrapper ¶ added in v1.1.927
func NewNatsMsgWrapper(msg *nats.Msg) *NatsMsgWrapper
NewNatsMsgWrapper is a function that creates a new instance of NatsMsgWrapper, which is a wrapper type that combines the jetstream.Msg and nats.Msg types.
func (*NatsMsgWrapper) Ack ¶ added in v1.1.927
func (w *NatsMsgWrapper) Ack() error
Ack is a method that acknowledges the receipt of the NATS message by calling the underlying nats.Msg's Ack method. If an error occurs while acknowledging the message, it returns an error with a message indicating the failure. Returns nil if the acknowledgement is successful.
func (*NatsMsgWrapper) Data ¶ added in v1.1.927
func (w *NatsMsgWrapper) Data() []byte
Data is a method that retrieves the data from the underlying nats.Msg.
func (*NatsMsgWrapper) DoubleAck ¶ added in v1.1.927
func (w *NatsMsgWrapper) DoubleAck(context.Context) error
DoubleAck is a method that simulates a double acknowledgement of the NATS message. It returns an error with a message indicating that double ack is not allowed.
func (*NatsMsgWrapper) Headers ¶ added in v1.1.927
func (w *NatsMsgWrapper) Headers() nats.Header
Headers is a method that retrieves the headers from the underlying nats.Msg.
func (*NatsMsgWrapper) InProgress ¶ added in v1.1.927
func (w *NatsMsgWrapper) InProgress() error
InProgress is a method that indicates that the message is still in progress.
func (*NatsMsgWrapper) Metadata ¶ added in v1.1.927
func (w *NatsMsgWrapper) Metadata() (*jetstream.MsgMetadata, error)
Metadata is a method that retrieves the metadata from the underlying nats.Msg.
func (*NatsMsgWrapper) Nak ¶ added in v1.1.927
func (w *NatsMsgWrapper) Nak() error
Nak is a method that nak's the message.
func (*NatsMsgWrapper) NakWithDelay ¶ added in v1.1.927
func (w *NatsMsgWrapper) NakWithDelay(delay time.Duration) error
NakWithDelay is a method that nak's the message, and will not re-process before delay.
func (*NatsMsgWrapper) Reply ¶ added in v1.1.927
func (w *NatsMsgWrapper) Reply() string
Reply is a method that retrieves the reply from the underlying nats.Msg.
func (*NatsMsgWrapper) SetData ¶ added in v1.1.927
func (w *NatsMsgWrapper) SetData(b []byte)
SetData is a method that sets the data of the underlying nats.Msg.
func (*NatsMsgWrapper) Subject ¶ added in v1.1.927
func (w *NatsMsgWrapper) Subject() string
Subject is a method that retrieves the subject from the underlying nats.Msg.
func (*NatsMsgWrapper) Term ¶ added in v1.1.927
func (w *NatsMsgWrapper) Term() error
Term is a method that calls the `Term` method on the underlying `nats.Msg` instance.
type ProcessOption ¶ added in v1.0.623
type ProcessOption interface {
Set(opts *ProcessOpts)
}
ProcessOption represents an option function that can be passed to message processing.
type ProcessOpts ¶ added in v1.0.623
type ProcessOpts struct {
BackoffCalc BackoffFn
}
ProcessOpts holds the settings for message processing.
type TrackingID ¶ added in v1.0.215
type TrackingID []string
TrackingID is an ID stack that maintains the callstack
func (TrackingID) Ancestor ¶ added in v1.0.215
func (t TrackingID) Ancestor(gen int) string
Ancestor provides the ID of the caller back <gen> generations.
func (TrackingID) NewID ¶ added in v1.1.1099
func (t TrackingID) NewID() TrackingID
NewID adds a new random ID to the trackingID
func (TrackingID) ParentID ¶ added in v1.0.215
func (t TrackingID) ParentID() string
ParentID provides the ID of the caller.
func (TrackingID) Pop ¶ added in v1.0.215
func (t TrackingID) Pop() TrackingID
Pop removes the current ID from the callstack.
func (TrackingID) Push ¶ added in v1.0.215
func (t TrackingID) Push(id string) TrackingID
Push adds a new ID to the callstack.
type WaitGroupOpt ¶ added in v1.1.1532
type WaitGroupOpt func(cfg *waitgroupOpts)
WaitGroupOpt provides options to CreateOrUpdateNatsWaitGroup.
func WithCompletionBuffer ¶ added in v1.1.1532
func WithCompletionBuffer(name string, duration time.Duration) WaitGroupOpt
WithCompletionBuffer prevents future calls to a WaitGroup subject for a specific duration.
func WithLockTimeout ¶ added in v1.1.1532
func WithLockTimeout(duration time.Duration) WaitGroupOpt
WithLockTimeout specifies the time to wait before unlocking a subject group in a WaitGroup.
func WithMemoryStorage ¶ added in v1.1.1532
func WithMemoryStorage() WaitGroupOpt
WithMemoryStorage specifies that the WaitGroup should use memory storage for ephemeral behaviour.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package base62 implements base62 encoding.
|
Package base62 implements base62 encoding. |
|
Code generated by build process.
|
Code generated by build process. |