events

package
v0.19.0
Published: Jul 12, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrPublishTimeout = errors.New("publish timeout")

ErrPublishTimeout is used when publishing of an event does not meet the deadline.

var ErrUnboundSubject = errors.New("unbound subject, no stream found for subject")

ErrUnboundSubject is used when a subject is not bound to a stream.

var ErrWrongSequence = errors.New("wrong sequence")

ErrWrongSequence is used when the expected sequence number does not match the actual sequence number.
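
Because these variables are sentinel values created with errors.New, callers can test for them with errors.Is, which also matches when the error has been wrapped. The following is a minimal sketch: the three variables are redeclared locally so it compiles on its own, and the classify helper and the reactions it returns are hypothetical, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package-level sentinel errors documented above,
// declared here only so that the sketch is self-contained.
var (
	ErrPublishTimeout = errors.New("publish timeout")
	ErrUnboundSubject = errors.New("unbound subject, no stream found for subject")
	ErrWrongSequence  = errors.New("wrong sequence")
)

// classify maps an error to a short description of how a caller might react.
// The reactions are illustrative assumptions, not documented behavior.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrPublishTimeout):
		return "retry later"
	case errors.Is(err, ErrUnboundSubject):
		return "create or fix the stream"
	case errors.Is(err, ErrWrongSequence):
		return "reload state and retry"
	default:
		return "unknown"
	}
}

func main() {
	// errors.Is still matches after the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("publishing order event: %w", ErrWrongSequence)
	fmt.Println(classify(wrapped))
}
```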

Module for FX that enables events.

Functions

func IsValidConsumerName

func IsValidConsumerName(name string) bool

IsValidConsumerName checks if the consumer name is valid.

NATS provides the guidance that durable names of consumers cannot contain whitespace, `.`, `*`, `>`, path separators (forward or backwards slash), and non-printable characters.

In Windshift, consumer names are aligned with subject names and allow only the characters `a`-`z`, `A`-`Z`, `0`-`9`, `_`, and `-`.

Empty consumer names are not allowed.

func IsValidStreamName

func IsValidStreamName(name string) bool

IsValidStreamName checks if the stream name is valid.

NATS provides the guidance that stream names cannot contain whitespace, `.`, `*`, `>`, path separators (forward or backwards slash), and non-printable characters.

In Windshift, stream names are aligned with subject names and allow only the characters `a`-`z`, `A`-`Z`, `0`-`9`, `_`, and `-`.

Empty stream names are not allowed.
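
The rule documented for both consumer names and stream names (a non-empty string of `a`-`z`, `A`-`Z`, `0`-`9`, `_`, and `-`) can be expressed as a single regular expression. This is only a sketch of that rule: isValidName is a hypothetical stand-in, and the package's actual implementation may differ.

```go
package main

import (
	"fmt"
	"regexp"
)

// namePattern accepts one or more of a-z, A-Z, 0-9, '_' and '-'.
// Requiring at least one character also rejects the empty name.
var namePattern = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)

// isValidName is a hypothetical helper mirroring the documented rule for
// consumer and stream names. It is not the package's own function.
func isValidName(name string) bool {
	return namePattern.MatchString(name)
}

func main() {
	for _, name := range []string{"orders-worker", "orders.worker", "orders worker", ""} {
		fmt.Printf("%q valid: %v\n", name, isValidName(name))
	}
}
```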

func IsValidSubject

func IsValidSubject(subject string, allowWildcards bool) bool

IsValidSubject checks if the subject is valid, either allowing or disallowing wildcards.

NATS recommends ASCII characters for subjects, but does not enforce it. In Windshift, subjects are limited to the characters `a`-`z`, `A`-`Z`, `0`-`9`, `_`, `-`, and `.`.

If wildcards are allowed, then `*` and `>` are also allowed. `*` can be used to match a single token, and `>` can be used to match one or more tokens but only at the end of the subject.

Period is a special character that indicates a subject hierarchy. We validate that the subject does not start or end with a period, and that periods are not adjacent.

Empty subjects are not allowed.
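
The subject rules above can be sketched by splitting on periods and checking each token. isValidSubject below is a hypothetical reimplementation written from the description, not the package's code. It additionally assumes that a wildcard must make up a whole token (so `ord*ers` is rejected), which follows NATS convention but is not stated in this documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// isTokenChar reports whether c may appear inside a subject token.
func isTokenChar(c rune) bool {
	return c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z' ||
		c >= '0' && c <= '9' || c == '_' || c == '-'
}

// isValidSubject is a hypothetical helper following the documented rules.
func isValidSubject(subject string, allowWildcards bool) bool {
	if subject == "" {
		return false
	}
	tokens := strings.Split(subject, ".")
	for i, token := range tokens {
		switch {
		case token == "":
			// A leading, trailing or doubled period yields an empty token.
			return false
		case token == "*":
			if !allowWildcards {
				return false
			}
		case token == ">":
			// '>' is only accepted as the final token.
			if !allowWildcards || i != len(tokens)-1 {
				return false
			}
		default:
			for _, c := range token {
				if !isTokenChar(c) {
					return false
				}
			}
		}
	}
	return true
}

func main() {
	fmt.Println(isValidSubject("orders.created", false))
	fmt.Println(isValidSubject("orders.*.created", true))
	fmt.Println(isValidSubject("orders.>.created", true))
}
```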

func IsValidationError

func IsValidationError(err error) bool

Types

type Consumer

type Consumer struct {
	// ID is the ID of the consumer.
	ID string
}

Consumer describes a consumer of events from a stream.

type ConsumerConfig

type ConsumerConfig struct {
	// Name of the consumer. If empty, an ephemeral consumer will be created.
	Name string
	// Stream to consume events from. Must be specified and should be created
	// using Manager.EnsureStream().
	Stream string
	// Subjects to consume events from. At least one subject must be specified,
	// and multiple subjects are supported from NATS 2.10+.
	Subjects []string

	// Timeout is the timeout for processing an event. If not specified, the
	// default timeout of 30 seconds will be used.
	Timeout time.Duration

	// MaxDeliveryAttempts is the maximum number of times an event will be
	// delivered before it is considered failed.
	MaxDeliveryAttempts uint

	// From describes where to start consuming from. If not specified, the
	// default policy is to only consume new events.
	From *StreamPointer
}

ConsumerConfig is the configuration for creating a consumer.

type DiscardPolicy

type DiscardPolicy int

DiscardPolicy controls the policy for discarding messages when the stream reaches its maximum size.

const (
	// DiscardPolicyOld discards old messages when the stream reaches its
	// maximum size.
	DiscardPolicyOld DiscardPolicy = iota
	// DiscardPolicyNew discards new messages when the stream reaches its
	// maximum size.
	DiscardPolicyNew
)

type Event

type Event struct {

	// Context is the context of this event. It will be valid until the event
	// expires, is acknowledged or rejected.
	Context context.Context

	// Subject is the subject the event was published to.
	Subject string

	// ConsumerSeq is the sequence number of the event within the consumer.
	ConsumerSeq uint64

	// StreamSeq is the sequence number of the event in the event stream. Can
	// be used for resuming from a certain point in time. For example with an
	// ephemeral consumer, the consumer can store the last seen StreamSeq and
	// resume from there on the next run.
	StreamSeq uint64

	// DeliveryAttempt is the number of times the event has been delivered to
	// a consumer. The first delivery is 1.
	DeliveryAttempt uint64

	// Headers contains the headers of the event.
	Headers *Headers

	// Data is the protobuf message published by the producer.
	Data *anypb.Any
	// contains filtered or unexported fields
}

Event represents a single event consumed from a stream. It is received via NATS and should be processed within a certain deadline, using Ack(), Reject(), RejectWithDelay(delay) or RejectPermanently() to settle the event. If the deadline is exceeded, the event will be redelivered. To extend the deadline, use Ping().

func (*Event) Ack

func (e *Event) Ack() error

Ack acknowledges the event. The event will be removed from the consumer.

func (*Event) DiscardData

func (e *Event) DiscardData()

DiscardData discards the data of the event. This should be called if the event data is not needed anymore. Acknowledging or rejecting the event will continue working after this.

func (*Event) Ping

func (e *Event) Ping() error

Ping extends the deadline of the event. This should be called periodically to prevent the event from being redelivered.
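
One way to call Ping periodically is a ticker loop that runs alongside a long-running handler. The sketch below is written against a small local interface rather than *Event so that it is self-contained; keepAlive, fakeEvent and demo are hypothetical names. In real use the interval would presumably be some fraction of the consumer's Timeout, which is an assumption and not a documented recommendation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pinger is the subset of *Event used here, declared locally for the sketch.
type pinger interface {
	Ping() error
}

// keepAlive calls Ping every interval until ctx is done or Ping fails.
// It returns the number of successful pings and the first Ping error, if any.
func keepAlive(ctx context.Context, p pinger, interval time.Duration) (int, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	pings := 0
	for {
		select {
		case <-ctx.Done():
			return pings, nil
		case <-ticker.C:
			if err := p.Ping(); err != nil {
				return pings, err
			}
			pings++
		}
	}
}

// fakeEvent is a test double that starts failing after failAfter pings.
type fakeEvent struct {
	calls     int
	failAfter int
}

func (f *fakeEvent) Ping() error {
	if f.calls >= f.failAfter {
		return errors.New("event is no longer pending")
	}
	f.calls++
	return nil
}

// demo runs keepAlive against a fake that accepts two pings and then fails.
func demo() (int, error) {
	return keepAlive(context.Background(), &fakeEvent{failAfter: 2}, time.Millisecond)
}

func main() {
	pings, err := demo()
	fmt.Println(pings, err)
}
```

A handler would typically start keepAlive in its own goroutine with a context that it cancels just before settling the event.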

func (*Event) Reject

func (e *Event) Reject() error

Reject rejects the event.

func (*Event) RejectPermanently

func (e *Event) RejectPermanently() error

RejectPermanently permanently rejects the event. The event will be removed and no redelivery will be attempted.

func (*Event) RejectWithDelay

func (e *Event) RejectWithDelay(delay time.Duration) error

RejectWithDelay rejects the event with a delay. The event will be redelivered after the delay.
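
The delay passed to RejectWithDelay is chosen by the caller. A common choice, shown here purely as an illustration, is exponential backoff derived from Event.DeliveryAttempt. The helper delayFor and the constants base and limit are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

const (
	base  = time.Second      // delay after the first failed delivery
	limit = 30 * time.Second // upper bound for any delay
)

// delayFor returns base * 2^(attempt-1), capped at limit. attempt is meant
// to be Event.DeliveryAttempt, which is 1 for the first delivery.
func delayFor(attempt uint64) time.Duration {
	delay := base
	for i := uint64(1); i < attempt; i++ {
		// Stop before doubling would exceed the limit.
		if delay > limit/2 {
			return limit
		}
		delay *= 2
	}
	return delay
}

func main() {
	for attempt := uint64(1); attempt <= 7; attempt++ {
		fmt.Printf("attempt %d: retry after %v\n", attempt, delayFor(attempt))
	}
}
```

With an event in hand, the result would be passed along as `event.RejectWithDelay(delayFor(event.DeliveryAttempt))`.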

type EventConsumeConfig

type EventConsumeConfig struct {
	// Stream is the name of the stream to consume events from.
	Stream string
	// Name is the name of the consumer to consume events from.
	Name string
	// MaxPendingEvents is the maximum number of events that can be pending
	// before the consumer will stop receiving events. If set to 0, a default
	// value of 50 will be used.
	MaxPendingEvents uint
}

EventConsumeConfig is the configuration for consuming events from a stream.

type Events

type Events struct {

	// Timeout is the timeout for processing an event. Will be fetched
	// from the consumer configuration.
	Timeout time.Duration
	// contains filtered or unexported fields
}

Events is used to receive events from a stream. Events are tied to a consumer that has previously been defined using Manager.EnsureConsumer().

func (*Events) Close

func (q *Events) Close() error

Close closes the event consumer. Will stop receiving events and wait for pending events to be processed.

func (*Events) Incoming

func (q *Events) Incoming() <-chan *Event

Incoming returns the channel that events will be sent to.

type Headers

type Headers struct {
	// PublishedAt is the time the event was published by the producer.
	PublishedAt time.Time
	// IdempotencyKey is the idempotency key of the event. An idempotency key
	// is used to ensure that an event is only published once. May be empty.
	IdempotencyKey *string
	// TraceParent is the trace parent of the event. May be empty.
	TraceParent *string
	// TraceState is the trace state of the event. May be empty.
	TraceState *string
}

Headers contains information about an event.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is used to manage everything related to events, streams, consumers, event publishing, and event consuming.

func NewManager

func NewManager(
	logger *zap.Logger,
	tracer trace.Tracer,
	js jetstream.JetStream,
) (*Manager, error)

NewManager creates a new event manager.

func (*Manager) EnsureConsumer

func (m *Manager) EnsureConsumer(ctx context.Context, config *ConsumerConfig) (*Consumer, error)

EnsureConsumer ensures that a consumer exists for the specified stream and name. Name can be empty, in which case an ephemeral consumer will be created. If the consumer already exists, it will be updated with the specified configuration.

func (*Manager) EnsureStream

func (m *Manager) EnsureStream(ctx context.Context, config *StreamConfig) (*Stream, error)

EnsureStream ensures that a JetStream stream exists with the given configuration. If the stream already exists, it will be updated with the new configuration.

func (*Manager) Events

func (m *Manager) Events(ctx context.Context, config *EventConsumeConfig) (*Events, error)

Events creates a new event consumer for the specified stream and consumer. The consumer must have been previously created using Manager.EnsureConsumer().

The returned Events instance must be closed when it is no longer needed. This will stop the event consumer and wait for any pending events to be processed.

This will honor the context passed in, and will stop the event consumer when the context is done.

Example:

events, err := manager.Events(ctx, &events.EventConsumeConfig{
	Stream: "my-stream",
	Name:   "my-consumer",
})
if err != nil {
	// Handle error
}
defer events.Close()

for event := range events.Incoming() {
	// Handle event
}
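
The loop body in the example above is left open. One way to fill it in is to settle each event based on the handler's result: Ack on success, RejectPermanently for errors that can never succeed, and Reject otherwise. The sketch below uses a local interface instead of *Event so that it runs on its own. The settle function, both error values, and the assumption that a plain Reject leads to redelivery are illustrative and not taken from the package documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// settler is the subset of *Event used here, declared locally for the sketch.
type settler interface {
	Ack() error
	Reject() error
	RejectPermanently() error
}

// Hypothetical application errors used to pick a settlement.
var (
	errInvalidPayload = errors.New("invalid payload")
	errTemporary      = errors.New("temporary failure")
)

// settle acknowledges or rejects an event depending on the handler's error.
func settle(e settler, handlerErr error) error {
	switch {
	case handlerErr == nil:
		return e.Ack()
	case errors.Is(handlerErr, errInvalidPayload):
		// Retrying cannot help, so give up on this event.
		return e.RejectPermanently()
	default:
		return e.Reject()
	}
}

// recorder is a test double that remembers which method was called.
type recorder struct{ called string }

func (r *recorder) Ack() error               { r.called = "ack"; return nil }
func (r *recorder) Reject() error            { r.called = "reject"; return nil }
func (r *recorder) RejectPermanently() error { r.called = "reject-permanently"; return nil }

// outcome reports which settlement was chosen for a given handler error.
func outcome(handlerErr error) string {
	r := &recorder{}
	if err := settle(r, handlerErr); err != nil {
		return "error: " + err.Error()
	}
	return r.called
}

func main() {
	for _, err := range []error{nil, errTemporary, errInvalidPayload} {
		fmt.Printf("%v -> %s\n", err, outcome(err))
	}
}
```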

func (*Manager) Publish

func (m *Manager) Publish(ctx context.Context, config *PublishConfig) (*PublishedEvent, error)

Publish publishes an event to a stream.

type PublishConfig

type PublishConfig struct {
	// Subject to publish event to
	Subject string
	// Data to publish
	Data *anypb.Any
	// ExpectedSubjectSeq is the expected sequence number of the subject.
	ExpectedSubjectSeq *uint64
	// PublishedTime is the time the event was published. If nil, the current time will be used.
	PublishedTime *time.Time
	// IdempotencyKey is the idempotency key for the event. If empty, the event will not be idempotent.
	IdempotencyKey string
}

PublishConfig is the configuration for publishing an event.

type PublishedEvent

type PublishedEvent struct {
	// ID is the sequence number of the event.
	ID uint64
}

PublishedEvent contains information about a published event.

type StorageType

type StorageType int

StorageType controls the type of storage to use for a stream.

const (
	// StorageTypeFile uses file storage for the stream.
	StorageTypeFile StorageType = iota
	// StorageTypeMemory uses memory storage for the stream.
	StorageTypeMemory
)

type Stream

type Stream struct{}

Stream contains information about a stream.

type StreamConfig

type StreamConfig struct {
	// Name of the stream.
	Name string

	// MaxAge is the maximum age of a message before it is considered stale.
	MaxAge time.Duration
	// MaxMsgs is the maximum number of messages to retain in the stream.
	MaxMsgs uint
	// MaxBytes is the maximum number of bytes to retain in the stream.
	MaxBytes uint
	// MaxMsgsPerSubject is the maximum number of messages to retain per subject.
	MaxMsgsPerSubject uint

	// DiscardPolicy controls the policy for discarding messages when the
	// stream reaches its maximum size.
	DiscardPolicy DiscardPolicy
	// DiscardNewPerSubject controls whether the discard policy applies to
	// each subject individually, or to the stream as a whole. Only used when
	// DiscardPolicy is set to DiscardPolicyNew.
	DiscardNewPerSubject bool

	// Subjects that the stream will receive events from.
	Subjects []string
	// Mirror makes it so this stream mirrors messages from another stream.
	Mirror *StreamSource
	// Sources is a list of streams to receive events from.
	Sources []*StreamSource

	// StorageType is the type of storage to use for the stream.
	StorageType StorageType
	// Replicas is the number of replicas to keep of the stream.
	Replicas *uint

	// DeduplicationWindow is the amount of time to keep idempotency keys.
	DeduplicationWindow *time.Duration
	// MaxEventSize is the maximum size of an event.
	MaxEventSize *uint
}

StreamConfig is the configuration for creating or updating a stream.

type StreamPointer

type StreamPointer struct {
	// ID is the ID of the message to start consuming from.
	ID uint64
	// Time is the time to start consuming from.
	Time time.Time
	// First indicates that the consumer should start consuming from the
	// first message in the stream.
	First bool
}

StreamPointer is a pointer to a position in a stream. It is used when creating a consumer to specify where to start consuming from.

Only one of the fields should be set. If no field is set the policy is interpreted as an intent to only consume new events.
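
The "only one field" rule can be checked before a pointer is handed to a consumer or source configuration. The sketch below copies the struct locally so that it is self-contained. startPolicy and the policy names it returns are hypothetical, and treating an ID of 0 as "not set" is an assumption based on JetStream stream sequences starting at 1.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// StreamPointer is a local copy of the documented struct.
type StreamPointer struct {
	ID    uint64
	Time  time.Time
	First bool
}

// startPolicy returns a label for where consumption would start, or an error
// if more than one field is set. A nil or empty pointer means "new events".
func startPolicy(p *StreamPointer) (string, error) {
	if p == nil {
		return "new", nil
	}
	set := 0
	policy := "new"
	if p.ID != 0 { // assumption: 0 means the ID was not set
		set++
		policy = "by-id"
	}
	if !p.Time.IsZero() {
		set++
		policy = "by-time"
	}
	if p.First {
		set++
		policy = "first"
	}
	if set > 1 {
		return "", errors.New("only one of ID, Time and First should be set")
	}
	return policy, nil
}

func main() {
	fmt.Println(startPolicy(nil))
	fmt.Println(startPolicy(&StreamPointer{ID: 42}))
	fmt.Println(startPolicy(&StreamPointer{ID: 42, First: true}))
}
```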

type StreamSource

type StreamSource struct {
	// Name of the stream to mirror.
	Name string
	// From is the position in the stream to start mirroring from.
	From *StreamPointer
	// FilterSubjects is a list of subjects to filter messages from.
	FilterSubjects []string
}

StreamSource defines a source for events in a stream. It is used for mirroring and sourcing events from other streams.

type ZapConsumerConfig

type ZapConsumerConfig jetstream.ConsumerConfig

ZapConsumerConfig is a wrapper around jetstream.ConsumerConfig that makes it loggable in a structured way.

func (*ZapConsumerConfig) MarshalLogObject

func (c *ZapConsumerConfig) MarshalLogObject(enc zapcore.ObjectEncoder) error

type ZapStreamConfig

type ZapStreamConfig jetstream.StreamConfig

func (*ZapStreamConfig) MarshalLogObject

func (c *ZapStreamConfig) MarshalLogObject(enc zapcore.ObjectEncoder) error

type ZapStreamSource

type ZapStreamSource jetstream.StreamSource

func (*ZapStreamSource) MarshalLogObject

func (s *ZapStreamSource) MarshalLogObject(enc zapcore.ObjectEncoder) error
