Documentation
¶
Overview ¶
Package eventstream provides abstractions for consuming ordered streams of event messages.
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // ErrCursorClosed is returned by Cursor.Next() and Close() if the // stream is closed. ErrCursorClosed = errors.New("stream cursor is closed") // ErrTruncated indicates that a cursor can not be opened because the requested // offset is on a portion of the event stream that has been truncated. ErrTruncated = errors.New("can not open cursor, stream is truncated") )
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
// Stream is the event stream to consume.
Stream Stream
// EventTypes is the set of event types that the handler consumes.
EventTypes *sets.Set[message.Type]
// Handler is the target for the events from the stream.
Handler Handler
// Semaphore is used to limit the number of messages being handled
// concurrently.
Semaphore *semaphore.Weighted
// BackoffStrategy is the strategy used to delay restarting the consumer
// after a failure. If it is nil, backoff.DefaultStrategy is used.
BackoffStrategy backoff.Strategy
// Logger is the target for log messages from the consumer.
// If it is nil, logging.DefaultLogger is used.
Logger logging.Logger
// contains filtered or unexported fields
}
Consumer reads events from a stream in order to handle them.
type Cursor ¶
type Cursor interface {
// Next returns the next event in the stream that matches the filter.
//
// If the end of the stream is reached it blocks until a relevant event is
// appended to the stream or ctx is canceled.
//
// If the stream is closed before or during a call to Next(), it returns
// ErrCursorClosed.
//
// It returns ErrTruncated if the next event can not be obtained because it
// occupies a portion of the stream that has been truncated.
Next(ctx context.Context) (Event, error)
// Close discards the cursor.
//
// It returns ErrCursorClosed if the cursor is already closed.
// Any current or future calls to Next() return ErrCursorClosed.
Close() error
}
A Cursor reads events from a stream.
Cursors are not safe for concurrent use.
type Event ¶
type Event struct {
// Offset is the 0-based index of the event on the stream.
Offset uint64
// Parcel contains the event from the stream.
Parcel parcel.Parcel
}
Event is a container for an envelope and event stream specific meta-data.
type Handler ¶
type Handler interface {
// NextOffset returns the offset of the next event to be consumed from a
// specific application's event stream.
//
// id is the identity of the source application.
NextOffset(ctx context.Context, id configkit.Identity) (uint64, error)
// HandleEvent handles an event obtained from the event stream.
//
// o must be the offset that would be returned by NextOffset(). On success,
// the next call to NextOffset() will return ev.Offset + 1.
HandleEvent(ctx context.Context, o uint64, ev Event) error
}
Handler handles events consumed from a stream.
type Stream ¶
type Stream interface {
// Application returns the identity of the application that owns the stream.
Application() configkit.Identity
// EventTypes returns the set of event types that may appear on the stream.
EventTypes(ctx context.Context) (*sets.Set[message.Type], error)
// Open returns a cursor that reads events from the stream.
//
// o is the offset of the first event to read. The first event on a stream
// is always at offset 0.
//
// f is the set of "filter" event types to be returned by Cursor.Next(). Any
// other event types are ignored.
//
// It returns an error if any of the event types in f are not supported, as
// indicated by EventTypes().
Open(ctx context.Context, o uint64, f *sets.Set[message.Type]) (Cursor, error)
}
A Stream is an ordered sequence of event messages.
Directories
¶
| Path | Synopsis |
|---|---|
|
internal
|
|
|
streamtest
Package streamtest contains a common test suite for eventstream.Stream implementations.
|
Package streamtest contains a common test suite for eventstream.Stream implementations. |
|
Package memorystream provides an in-memory implementation of eventstream.Stream.
|
Package memorystream provides an in-memory implementation of eventstream.Stream. |
|
Package networkstream presents a persistence.EventRepository as an eventstream.Stream via the dogma.messaging.v1 EventStream service.
|
Package networkstream presents a persistence.EventRepository as an eventstream.Stream via the dogma.messaging.v1 EventStream service. |
|
Package persistedstream provides an implementation of eventstream.Stream that streams persisted events from an event repository.
|
Package persistedstream provides an implementation of eventstream.Stream that streams persisted events from an event repository. |
Click to show internal directories.
Click to hide internal directories.