Documentation ¶
Index ¶
- Variables
- func IsValidConsumerName(name string) bool
- func IsValidStreamName(name string) bool
- func IsValidSubject(subject string, allowWildcards bool) bool
- func IsValidationError(err error) bool
- type Consumer
- type ConsumerConfig
- type DiscardPolicy
- type Event
- type EventConsumeConfig
- type Events
- type Headers
- type Manager
- func (m *Manager) EnsureConsumer(ctx context.Context, config *ConsumerConfig) (*Consumer, error)
- func (m *Manager) EnsureStream(ctx context.Context, config *StreamConfig) (*Stream, error)
- func (m *Manager) Events(ctx context.Context, config *EventConsumeConfig) (*Events, error)
- func (m *Manager) Publish(ctx context.Context, config *PublishConfig) (*PublishedEvent, error)
- type PublishConfig
- type PublishedEvent
- type StorageType
- type Stream
- type StreamConfig
- type StreamPointer
- type StreamSource
- type ZapConsumerConfig
- type ZapStreamConfig
- type ZapStreamSource
Constants ¶
This section is empty.
Variables ¶
var ErrPublishTimeout = errors.New("publish timeout")
ErrPublishTimeout is used when publishing of an event does not meet the deadline.
var ErrUnboundSubject = errors.New("unbound subject, no stream found for subject")
ErrUnboundSubject is used when a subject is not bound to a stream.
var ErrWrongSequence = errors.New("wrong sequence")
ErrWrongSequence is used when the expected sequence number does not match the actual sequence number.
var Module = fx.Module(
	"events",
	fx.Provide(sprout.Logger("events"), fx.Private),
	fx.Provide(sprout.ServiceTracer(), fx.Private),
	fx.Provide(NewManager),
)
Module for FX that enables events.
Functions ¶
func IsValidConsumerName ¶
IsValidConsumerName checks if the consumer name is valid.
NATS provides the guidance that durable names of consumers cannot contain whitespace, `.`, `*`, `>`, path separators (forward or backwards slash), and non-printable characters.
In Windshift consumer names are aligned with subject names and allow only the characters `a`-`z`, `A`-`Z`, `0`-`9`, `_`, and `-`.
Empty consumer names are not allowed.
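The rule described above can be sketched as a simple byte scan. This is an illustrative reimplementation under the stated character set, not the package's own code; isValidName is a hypothetical name.

```go
package main

import "fmt"

// isValidName is a hypothetical stand-in that accepts only non-empty names
// made of a-z, A-Z, 0-9, '_' and '-'.
func isValidName(name string) bool {
	if name == "" {
		return false
	}
	for i := 0; i < len(name); i++ {
		c := name[i]
		switch {
		case c >= 'a' && c <= 'z',
			c >= 'A' && c <= 'Z',
			c >= '0' && c <= '9',
			c == '_', c == '-':
			// Allowed character, keep scanning.
		default:
			// Whitespace, '.', '*', '>', slashes, and any non-ASCII byte
			// all end up here.
			return false
		}
	}
	return true
}

func main() {
	for _, name := range []string{"orders-worker_1", "orders.worker", "has space", ""} {
		fmt.Printf("%q valid: %v\n", name, isValidName(name))
	}
}
```

Scanning bytes rather than runes is enough here because every byte of a multi-byte UTF-8 character falls outside the allowed ASCII ranges.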
func IsValidStreamName ¶
IsValidStreamName checks if the stream name is valid.
NATS provides the guidance that stream names cannot contain whitespace, `.`, `*`, `>`, path separators (forward or backwards slash), and non-printable characters.
In Windshift stream names are aligned with subject names and allow only the characters `a`-`z`, `A`-`Z`, `0`-`9`, `_`, and `-`.
Empty stream names are not allowed.
func IsValidSubject ¶
IsValidSubject checks if the subject is valid, either allowing or disallowing wildcards.
NATS recommends ASCII characters for subjects, but does not enforce it. In Windshift subjects are limited to the characters `a`-`z`, `A`-`Z`, `0`-`9`, `_`, `-`, and `.`.
If wildcards are allowed, then `*` and `>` are also allowed. `*` can be used to match a single token, and `>` can be used to match one or more tokens but only at the end of the subject.
Period is a special character that indicates a subject hierarchy. We validate that the subject does not start or end with a period, and that periods are not adjacent.
Empty subjects are not allowed.
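As a rough illustration of the rules above, a validator can split the subject on periods and check each token. This is a sketch, not the package's implementation: isValidSubject (lowercase) and isPlainToken are hypothetical names, and the sketch assumes that a wildcard must occupy a whole token, so a token such as `cre*ted` is rejected.

```go
package main

import (
	"fmt"
	"strings"
)

// isPlainToken reports whether a token is non-empty and uses only
// a-z, A-Z, 0-9, '_' and '-'.
func isPlainToken(token string) bool {
	if token == "" {
		return false
	}
	for i := 0; i < len(token); i++ {
		c := token[i]
		ok := (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
			(c >= '0' && c <= '9') || c == '_' || c == '-'
		if !ok {
			return false
		}
	}
	return true
}

// isValidSubject is a hypothetical stand-in for the documented check.
func isValidSubject(subject string, allowWildcards bool) bool {
	if subject == "" {
		return false
	}
	tokens := strings.Split(subject, ".")
	for i, token := range tokens {
		switch {
		case token == "":
			// A leading, trailing, or doubled period yields an empty token.
			return false
		case token == "*":
			if !allowWildcards {
				return false
			}
		case token == ">":
			// '>' is only accepted as the final token.
			if !allowWildcards || i != len(tokens)-1 {
				return false
			}
		case !isPlainToken(token):
			return false
		}
	}
	return true
}

func main() {
	for _, s := range []string{"orders.created", "orders.*", "orders.>", "orders..created"} {
		fmt.Printf("%q strict=%v wildcards=%v\n", s, isValidSubject(s, false), isValidSubject(s, true))
	}
}
```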
func IsValidationError ¶
Types ¶
type Consumer ¶
type Consumer struct {
// ID is the ID of the consumer.
ID string
}
Consumer describes a consumer of events from a stream.
type ConsumerConfig ¶
type ConsumerConfig struct {
// Name of the consumer. If empty, an ephemeral consumer will be created.
Name string
// Stream to consume events from. Must be specified and should be created
// using Manager.EnsureStream().
Stream string
// Subjects to consume events from. At least one subject must be specified,
// and multiple subjects are supported from NATS 2.10+.
Subjects []string
// Timeout is the timeout for processing an event. If not specified, the
// default timeout of 30 seconds will be used.
Timeout time.Duration
// MaxDeliveryAttempts is the maximum number of times an event will be
// delivered before it is considered failed.
MaxDeliveryAttempts uint
// From describes where to start consuming from. If not specified, the
// default policy is to only consume new events.
From *StreamPointer
}
ConsumerConfig is the configuration for creating a consumer.
type DiscardPolicy ¶
type DiscardPolicy int
DiscardPolicy controls the policy for discarding messages when the stream reaches its maximum size.
const (
	// DiscardPolicyOld discards old messages when the stream reaches its
	// maximum size.
	DiscardPolicyOld DiscardPolicy = iota
	// DiscardPolicyNew discards new messages when the stream reaches its
	// maximum size.
	DiscardPolicyNew
)
type Event ¶
type Event struct {
// Context is the context of this event. It will be valid until the event
// expires, is acknowledged or rejected.
Context context.Context
// Subject is the subject the event was published to.
Subject string
// ConsumerSeq is the sequence number of the event.
ConsumerSeq uint64
// StreamSeq is the sequence number of the event in the event stream. Can
// be used for resuming from a certain point in time. For example with an
// ephemeral consumer, the consumer can store the last seen StreamSeq and
// resume from there on the next run.
StreamSeq uint64
// DeliveryAttempt is the number of times the event has been delivered to
// a consumer. The first delivery is 1.
DeliveryAttempt uint64
// Headers contains the headers of the event.
Headers *Headers
// Data is the protobuf message published by the producer.
Data *anypb.Any
// contains filtered or unexported fields
}
Event represents a single event consumed from a stream. It is received via NATS and should be processed within a certain deadline, using Ack() or Reject(shouldRetry) to acknowledge the event. If the deadline is exceeded, the event will be redelivered. To extend the deadline, use Ping().
func (*Event) DiscardData ¶
func (e *Event) DiscardData()
DiscardData discards the data of the event. This should be called if the event data is not needed anymore. Acknowledging or rejecting the event will continue working after this.
func (*Event) Ping ¶
Ping extends the deadline of the event. This should be called periodically to prevent the event from being redelivered.
func (*Event) RejectPermanently ¶
RejectPermanently permanently rejects the event. The event will be removed and no redelivery will be attempted.
type EventConsumeConfig ¶
type EventConsumeConfig struct {
// Stream is the name of the stream to consume events from.
Stream string
// Name is the name of the consumer to consume events from.
Name string
// MaxPendingEvents is the maximum number of events that can be pending
// before the consumer will stop receiving events. If set to 0, a default
// value of 50 will be used.
MaxPendingEvents uint
}
EventConsumeConfig is the configuration for consuming events from a stream.
type Events ¶
type Events struct {
// Timeout is the timeout for processing an event. Will be fetched
// from the consumer configuration.
Timeout time.Duration
// contains filtered or unexported fields
}
Events is used to receive events from a stream. It is tied to a consumer that was previously defined using Manager.EnsureConsumer().
type Headers ¶
type Headers struct {
// PublishedAt is the time the event was published by the producer.
PublishedAt time.Time
// IdempotencyKey is the idempotency key of the event. An idempotency key
// is used to ensure that an event is only published once. May be empty.
IdempotencyKey *string
// TraceParent is the trace parent of the event. May be empty.
TraceParent *string
// TraceState is the trace state of the event. May be empty.
TraceState *string
}
Headers contains information about an event.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager is used to manage everything related to events, streams, consumers, event publishing, and event consuming.
func NewManager ¶
func NewManager(
	logger *zap.Logger,
	tracer trace.Tracer,
	js jetstream.JetStream,
) (*Manager, error)
NewManager creates a new event manager.
func (*Manager) EnsureConsumer ¶
EnsureConsumer ensures that a consumer exists for the specified stream and name. Name can be empty, in which case an ephemeral consumer will be created. If the consumer already exists, it will be updated with the specified configuration.
func (*Manager) EnsureStream ¶
EnsureStream ensures that a JetStream stream exists with the given configuration. If the stream already exists, it will be updated with the new configuration.
func (*Manager) Events ¶
Events creates a new event consumer for the specified stream and consumer. The consumer must have been previously created using Manager.EnsureConsumer().
The returned Events instance must be closed when it is no longer needed. This will stop the event consumer and wait for any pending events to be processed.
This will honor the context passed in, and will stop the event consumer when the context is done.
Example:
events, err := manager.Events(ctx, &events.EventConsumeConfig{
	Stream: "my-stream",
	Name:   "my-consumer",
})
if err != nil {
	// Handle error
}
defer events.Close()
for event := range events.Incoming() {
	// Handle event
}
func (*Manager) Publish ¶
func (m *Manager) Publish(ctx context.Context, config *PublishConfig) (*PublishedEvent, error)
Publish publishes an event to a stream.
type PublishConfig ¶
type PublishConfig struct {
// Subject to publish event to
Subject string
// Data to publish
Data *anypb.Any
// ExpectedSubjectSeq is the expected sequence number of the subject.
ExpectedSubjectSeq *uint64
// PublishedTime is the time the event was published. If nil, the current time will be used.
PublishedTime *time.Time
// IdempotencyKey is the idempotency key for the event. If empty, the event will not be idempotent.
IdempotencyKey string
}
PublishConfig is the configuration for publishing an event.
type PublishedEvent ¶
type PublishedEvent struct {
// ID is the sequence number of the event.
ID uint64
}
PublishedEvent contains information about a published event.
type StorageType ¶
type StorageType int
StorageType controls the type of storage to use for a stream.
const (
	// StorageTypeFile uses file storage for the stream.
	StorageTypeFile StorageType = iota
	// StorageTypeMemory uses memory storage for the stream.
	StorageTypeMemory
)
type StreamConfig ¶
type StreamConfig struct {
// Name of the stream.
Name string
// MaxAge is the maximum age of a message before it is considered stale.
MaxAge time.Duration
// MaxMsgs is the maximum number of messages to retain in the stream.
MaxMsgs uint
// MaxBytes is the maximum number of bytes to retain in the stream.
MaxBytes uint
// MaxMsgsPerSubject is the maximum number of messages to retain per subject.
MaxMsgsPerSubject uint
// DiscardPolicy controls the policy for discarding messages when the
// stream reaches its maximum size.
DiscardPolicy DiscardPolicy
// DiscardNewPerSubject controls whether the discard policy applies to
// each subject individually, or to the stream as a whole. Only used when
// DiscardPolicy is set to DiscardPolicyNew.
DiscardNewPerSubject bool
// Subjects that the stream will receive events from.
Subjects []string
// Mirror makes it so this stream mirrors messages from another stream.
Mirror *StreamSource
// Sources is a list of streams to receive events from.
Sources []*StreamSource
// StorageType is the type of storage to use for the stream.
StorageType StorageType
// Replicas is the number of replicas to keep of the stream.
Replicas *uint
// DeduplicationWindow is the amount of time to keep idempotency keys.
DeduplicationWindow *time.Duration
// MaxEventSize is the maximum size of an event.
MaxEventSize *uint
}
StreamConfig is the configuration for creating or updating a stream.
type StreamPointer ¶
type StreamPointer struct {
// ID is the ID of the message to start consuming from.
ID uint64
// Time is the time to start consuming from.
Time time.Time
// First indicates that the consumer should start consuming from the
// first message in the stream.
First bool
}
StreamPointer is a pointer to a position in a stream. It is used when creating a consumer to specify where to start consuming from.
Only one of the fields should be set. If no field is set the policy is interpreted as an intent to only consume new events.
type StreamSource ¶
type StreamSource struct {
// Name of the stream to mirror or source events from.
Name string
// From is the position in the stream to start mirroring from.
From *StreamPointer
// FilterSubjects is a list of subjects to filter messages from.
FilterSubjects []string
}
StreamSource defines a source for events in a stream. It is used for mirroring and sourcing events from other streams.
type ZapConsumerConfig ¶
type ZapConsumerConfig jetstream.ConsumerConfig
ZapConsumerConfig is a wrapper around jetstream.ConsumerConfig that makes it loggable in a structured way.
func (*ZapConsumerConfig) MarshalLogObject ¶
func (c *ZapConsumerConfig) MarshalLogObject(enc zapcore.ObjectEncoder) error
type ZapStreamConfig ¶
type ZapStreamConfig jetstream.StreamConfig
func (*ZapStreamConfig) MarshalLogObject ¶
func (c *ZapStreamConfig) MarshalLogObject(enc zapcore.ObjectEncoder) error
type ZapStreamSource ¶
type ZapStreamSource jetstream.StreamSource
func (*ZapStreamSource) MarshalLogObject ¶
func (s *ZapStreamSource) MarshalLogObject(enc zapcore.ObjectEncoder) error