Documentation ¶
Overview ¶
Package memorystream provides an in-memory implementation of eventstream.Stream.
Index ¶
- Constants
- type Stream
- func (s *Stream) Add(events []eventstream.Event)
- func (s *Stream) Append(parcels ...parcel.Parcel)
- func (s *Stream) Application() configkit.Identity
- func (s *Stream) EventTypes(context.Context) (*sets.Set[message.Type], error)
- func (s *Stream) Open(ctx context.Context, o uint64, f *sets.Set[message.Type]) (eventstream.Cursor, error)
Constants ¶
const DefaultBufferSize = 100
DefaultBufferSize is the default number of recent events to buffer in memory.
Types ¶
type Stream ¶
type Stream struct {
// App is the identity of the application that owns the stream.
App configkit.Identity
// Types is the set of supported event types.
Types *sets.Set[message.Type]
// FirstOffset is the first offset that will be kept in this stream.
//
// This value is used to quickly reject Open() calls that ask for events
// that will never be buffered, rather than waiting until they time out or,
// worse yet, blocking forever if their context has no deadline.
//
// When the stream is used as a recent event cache, this should be set to
// the next unused offset in the event repository before new events are
// allowed to be produced.
FirstOffset uint64
// BufferSize is the maximum number of messages to buffer in memory. If it
// is non-positive, DefaultBufferSize is used.
//
// When the number of buffered events exceeds this limit, the oldest nodes
// in the buffer are truncated until the size falls below the limit again.
BufferSize int
// contains filtered or unexported fields
}
Stream is an implementation of eventstream.Stream that reads events from an in-memory buffer.
It is primarily intended as a cache of the most recent events from an application's event repository. Although the implementation details reflect and favor that use case, it is a well-behaved stream implementation that can also be used for testing and prototyping.
The stream is "self-truncating": it drops the oldest events when the number of buffered events exceeds the limit set by BufferSize.
The stream's buffer is implemented as an append-only singly-linked list. Each node in the list contains a single event. A linked list is used to allow the oldest events to be truncated from the buffer without invalidating cursors that may still be iterating through those events. Coupled with the use of atomic operations for reading the head and tail of the linked list, the stream's cursor implementation is lock-free.
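The buffer design described above can be illustrated with a minimal, standard-library-only sketch. It is not the package's actual implementation: the names node, buffer and cursor are hypothetical, events are reduced to strings, and a single writer is assumed. It shows why advancing the head of an append-only list leaves a cursor that still points at a dropped node free to keep reading.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node is one element of the append-only list (hypothetical type). Once
// linked, a node is never modified apart from having its next pointer set.
type node struct {
	offset uint64
	value  string
	next   atomic.Pointer[node]
}

// buffer is a hypothetical stand-in for the stream's internal buffer.
// It assumes a single writer; readers only perform atomic loads.
type buffer struct {
	head  atomic.Pointer[node] // oldest retained node
	tail  atomic.Pointer[node] // newest node
	size  int                  // touched by the writer only
	limit int
}

// append links a new node after the tail, then drops the oldest nodes
// while the buffer is over its limit.
func (b *buffer) append(offset uint64, value string) {
	n := &node{offset: offset, value: value}
	if t := b.tail.Load(); t != nil {
		t.next.Store(n)
	} else {
		b.head.Store(n)
	}
	b.tail.Store(n)
	b.size++

	for b.size > b.limit {
		// Advancing head does not touch the dropped node, so a cursor
		// that still points at it can keep following its next pointer.
		b.head.Store(b.head.Load().next.Load())
		b.size--
	}
}

// cursor walks the list without taking any locks.
type cursor struct{ at *node }

// next returns the value at the cursor and advances it. ok is false once
// the cursor has run off the end of the list.
func (c *cursor) next() (value string, ok bool) {
	if c.at == nil {
		return "", false
	}
	value = c.at.value
	c.at = c.at.next.Load()
	return value, true
}

func main() {
	b := &buffer{limit: 2}
	b.append(0, "a")
	c := &cursor{at: b.head.Load()} // opened while "a" is still buffered
	b.append(1, "b")
	b.append(2, "c") // pushes "a" out of the buffer

	for v, ok := c.next(); ok; v, ok = c.next() {
		fmt.Println(v)
	}
	fmt.Println("head offset:", b.head.Load().offset)
}
```

The cursor still reads "a" even though the head has moved on to offset 1, because truncation only changes the head pointer. In this simplified sketch a cursor that reaches the end does not wait for new events; a real cursor would block until more events arrive or its context is canceled.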
func (*Stream) Add ¶
func (s *Stream) Add(events []eventstream.Event)
Add events to the stream's buffer.
The events do not have to be in order. Any out-of-order events are held in a "reordering" queue until enough events have been added to close the "gap", at which point they are made visible to open cursors.
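The reordering behavior can be sketched roughly as follows. This is an illustration only, not the package's code: the event and reorderer types are hypothetical, and silently skipping events whose offset is already visible is an assumption made to keep the example small.

```go
package main

import "fmt"

// event is a hypothetical stand-in for eventstream.Event.
type event struct {
	offset uint64
	name   string
}

// reorderer releases events in offset order and holds back any that
// arrive before their predecessors.
type reorderer struct {
	next    uint64           // offset of the next event to make visible
	pending map[uint64]event // out-of-order events waiting for a gap to close
	visible []event          // events that cursors would be able to see
}

// add queues the given events, then releases every event that is now
// contiguous with the visible ones.
func (r *reorderer) add(events []event) {
	if r.pending == nil {
		r.pending = map[uint64]event{}
	}
	for _, ev := range events {
		if ev.offset < r.next {
			continue // assumption: events that are already visible are ignored
		}
		r.pending[ev.offset] = ev
	}
	for {
		ev, ok := r.pending[r.next]
		if !ok {
			return // there is still a gap at r.next
		}
		delete(r.pending, r.next)
		r.visible = append(r.visible, ev)
		r.next++
	}
}

func main() {
	r := &reorderer{}

	// Offsets 2 and 1 arrive first; nothing is visible until 0 arrives.
	r.add([]event{{2, "third"}, {1, "second"}})
	fmt.Println("visible after first add:", len(r.visible))

	r.add([]event{{0, "first"}})
	for _, ev := range r.visible {
		fmt.Println(ev.offset, ev.name)
	}
}
```

A map keyed by offset keeps the sketch short; an ordered structure such as a heap would serve the same purpose.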
func (*Stream) Application ¶
func (s *Stream) Application() configkit.Identity
Application returns the identity of the application that owns the stream.
func (*Stream) EventTypes ¶
func (s *Stream) EventTypes(context.Context) (*sets.Set[message.Type], error)
EventTypes returns the set of event types that may appear on the stream.
func (*Stream) Open ¶
func (s *Stream) Open(ctx context.Context, o uint64, f *sets.Set[message.Type]) (eventstream.Cursor, error)
Open returns a cursor that reads events from the stream.
o is the offset of the first event to read. The first event on a stream is always at offset 0.
f is the set of "filter" event types to be returned by Cursor.Next(). Any other event types are ignored.
It returns an error if any of the event types in f are not supported, as indicated by EventTypes().
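The checks that Open is documented to perform can be sketched as below, using only the standard library. The checkOpen function, the errTruncated value and the use of plain strings for event types are all hypothetical. How the real implementation reports an offset earlier than FirstOffset, and the order in which it performs these checks, are assumptions here.

```go
package main

import (
	"errors"
	"fmt"
)

// errTruncated is a hypothetical error for offsets that will never be buffered.
var errTruncated = errors.New("offset is before the first buffered offset")

// checkOpen mimics the validation described for Open: every filter type
// must be supported, and the requested offset must not precede firstOffset.
func checkOpen(supported map[string]struct{}, firstOffset, o uint64, filter []string) error {
	for _, t := range filter {
		if _, ok := supported[t]; !ok {
			return fmt.Errorf("unsupported event type: %s", t)
		}
	}
	if o < firstOffset {
		// Failing fast avoids a caller waiting for events that never arrive.
		return errTruncated
	}
	return nil
}

func main() {
	supported := map[string]struct{}{"AccountOpened": {}, "AccountClosed": {}}

	fmt.Println(checkOpen(supported, 10, 12, []string{"AccountOpened"}))
	fmt.Println(checkOpen(supported, 10, 12, []string{"Unknown"}))
	fmt.Println(checkOpen(supported, 10, 3, []string{"AccountClosed"}))
}
```

Rejecting the request up front matches the rationale given for FirstOffset: a caller whose context has no deadline would otherwise block forever.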