memorystream

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 6, 2025 | License: MIT | Imports: 10 | Imported by: 0

Documentation

Overview

Package memorystream provides an in-memory implementation of eventstream.Stream.

Constants

const DefaultBufferSize = 100

DefaultBufferSize is the default number of recent events to buffer in memory.

Variables

This section is empty.

Functions

This section is empty.

Types

type Stream

type Stream struct {
	// App is the identity of the application that owns the stream.
	App configkit.Identity

	// Types is the set of supported event types.
	Types *sets.Set[message.Type]

	// FirstOffset is the first offset that will be kept in this stream.
	//
	// This value is used to quickly reject Open() calls that ask for events
	// that will never be buffered, rather than waiting until they time out or,
	// worse yet, blocking forever if their context has no deadline.
	//
	// When the stream is used as a recent event cache, this should be set to
	// the next unused offset in the event repository before new events are
	// allowed to be produced.
	FirstOffset uint64

	// BufferSize is the maximum number of messages to buffer in memory. If it
	// is non-positive, DefaultBufferSize is used.
	//
	// When the number of buffered events exceeds this limit, the oldest events
	// in the buffer are truncated until the size is within the limit again.
	BufferSize int
	// contains filtered or unexported fields
}

Stream is an implementation of eventstream.Stream that reads events from an in-memory buffer.

It is primarily intended as a cache of the most recent events from an application's event repository. Although the implementation details reflect and favor that use case, it is a well-behaved stream implementation that can also be used for testing and prototyping.

The stream is "self truncating", dropping the oldest events when the size exceeds a pre-defined limit.

The stream's buffer is implemented as an append-only singly-linked list. Each node in the list contains a single event. A linked list is used to allow the oldest events to be truncated from the buffer without invalidating cursors that may still be iterating through those events. Coupled with the use of atomic operations for reading the head and tail of the linked list, the stream's cursor implementation is lock-free.
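The buffer design described above can be illustrated with a minimal, self-contained sketch. It is not the package's source: the names node, buffer, cursor, appendEvent and next are hypothetical, events are reduced to strings, and a single writer is assumed. It shows why truncating from the head of an append-only linked list does not invalidate a cursor that is still positioned on a dropped node.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node is one entry in the append-only list. All names in this sketch are
// hypothetical; they are not taken from the package's source.
type node struct {
	offset uint64
	value  string
	next   atomic.Pointer[node] // nil until a later event is appended
}

// buffer keeps atomic references to the oldest and newest retained nodes.
// This sketch assumes a single writer; readers may run concurrently.
type buffer struct {
	head  atomic.Pointer[node]
	tail  atomic.Pointer[node]
	size  int // only touched by the writer
	limit int
}

// appendEvent links a new node at the tail, then drops the oldest nodes
// while the buffer is over its limit.
func (b *buffer) appendEvent(offset uint64, value string) {
	n := &node{offset: offset, value: value}
	if t := b.tail.Load(); t != nil {
		t.next.Store(n)
	} else {
		b.head.Store(n)
	}
	b.tail.Store(n)
	b.size++

	for b.size > b.limit {
		// Advancing head does not modify the dropped node, so a cursor
		// that still holds it can keep following its next pointer.
		b.head.Store(b.head.Load().next.Load())
		b.size--
	}
}

// cursor walks the list without taking any lock.
type cursor struct {
	n *node
}

// next returns the current event and advances, or reports false when the
// cursor has reached the end of the list. A real cursor would instead wait
// for new events; that is omitted here for brevity.
func (c *cursor) next() (string, bool) {
	if c.n == nil {
		return "", false
	}
	v := c.n.value
	c.n = c.n.next.Load()
	return v, true
}

func main() {
	b := &buffer{limit: 2}
	b.appendEvent(0, "a")
	c := &cursor{n: b.head.Load()} // cursor opened at the oldest event
	b.appendEvent(1, "b")
	b.appendEvent(2, "c") // pushes the buffer over its limit, dropping "a"

	fmt.Println("oldest retained:", b.head.Load().value)
	for v, ok := c.next(); ok; v, ok = c.next() {
		fmt.Println("cursor read:", v)
	}
}
```

In this sketch the cursor still reads "a", "b" and "c" even though "a" has been truncated, because the dropped node remains reachable from the cursor until the cursor moves past it, after which the garbage collector can reclaim it.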

func (*Stream) Add

func (s *Stream) Add(events []eventstream.Event)

Add events to the stream's buffer.

The events do not necessarily have to be in order. Any out-of-order events are held in the "reordering" queue until enough events have been added to close the "gap", at which point they are made visible to open cursors.
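One way to picture the reordering behavior is the following sketch. It is an illustration under stated assumptions rather than the package's implementation: the event and reorderer types are hypothetical, the queue is modeled as a map keyed by offset, and events older than the next expected offset are assumed to be ignored.

```go
package main

import "fmt"

// event is a hypothetical stand-in for eventstream.Event, reduced to the
// offset that determines ordering.
type event struct {
	offset uint64
	name   string
}

// reorderer holds out-of-order events until the gap before them closes.
type reorderer struct {
	next    uint64           // offset of the next event that may become visible
	pending map[uint64]event // events that arrived ahead of a gap
	visible []event          // events released in offset order
}

func newReorderer(first uint64) *reorderer {
	return &reorderer{next: first, pending: map[uint64]event{}}
}

// add accepts events in any order and releases every contiguous run that
// starts at r.next.
func (r *reorderer) add(events []event) {
	for _, ev := range events {
		// Assumption: events before r.next were already released, so
		// they are ignored here.
		if ev.offset >= r.next {
			r.pending[ev.offset] = ev
		}
	}
	for {
		ev, ok := r.pending[r.next]
		if !ok {
			return // there is still a gap at r.next
		}
		delete(r.pending, r.next)
		r.visible = append(r.visible, ev)
		r.next++
	}
}

func main() {
	r := newReorderer(0)

	r.add([]event{{offset: 2, name: "c"}})
	fmt.Println("visible after offset 2:", len(r.visible))

	r.add([]event{{offset: 1, name: "b"}, {offset: 0, name: "a"}})
	for _, ev := range r.visible {
		fmt.Println(ev.offset, ev.name)
	}
}
```

The event at offset 2 stays hidden until offsets 0 and 1 arrive, after which all three are released in offset order.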

func (*Stream) Append

func (s *Stream) Append(parcels ...parcel.Parcel)

Append adds events to the tail of the stream.

func (*Stream) Application

func (s *Stream) Application() configkit.Identity

Application returns the identity of the application that owns the stream.

func (*Stream) EventTypes

func (s *Stream) EventTypes(context.Context) (*sets.Set[message.Type], error)

EventTypes returns the set of event types that may appear on the stream.

func (*Stream) Open

func (s *Stream) Open(
	ctx context.Context,
	o uint64,
	f *sets.Set[message.Type],
) (eventstream.Cursor, error)

Open returns a cursor that reads events from the stream.

o is the offset of the first event to read. The first event on a stream is always at offset 0.

f is the set of "filter" event types to be returned by Cursor.Next(). Any other event types are ignored.

It returns an error if any of the event types in f are not supported, as indicated by EventTypes().
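The filter rules above can be sketched in a few lines. This is a simplified model, not the package's API: typedEvent and open are hypothetical names, event types are plain strings, and the result is returned eagerly as a slice, whereas the real method returns an eventstream.Cursor.

```go
package main

import "fmt"

// typedEvent is a hypothetical stand-in for an event and its message type.
type typedEvent struct {
	offset uint64
	typ    string
}

// open validates the filter against the supported types, then returns the
// events at or after offset o whose type is in the filter.
func open(
	supported map[string]bool,
	events []typedEvent,
	o uint64,
	filter []string,
) ([]typedEvent, error) {
	want := map[string]bool{}
	for _, t := range filter {
		if !supported[t] {
			return nil, fmt.Errorf("unsupported event type: %s", t)
		}
		want[t] = true
	}

	var out []typedEvent
	for _, ev := range events {
		// Events before o or outside the filter are skipped.
		if ev.offset >= o && want[ev.typ] {
			out = append(out, ev)
		}
	}
	return out, nil
}

func main() {
	supported := map[string]bool{"Deposited": true, "Withdrawn": true}
	events := []typedEvent{
		{0, "Deposited"},
		{1, "Withdrawn"},
		{2, "Deposited"},
	}

	got, err := open(supported, events, 1, []string{"Deposited"})
	fmt.Println(got, err)

	_, err = open(supported, events, 0, []string{"Closed"})
	fmt.Println(err)
}
```

The first call yields only the event at offset 2, since offset 0 precedes o and offset 1 is not in the filter. The second call fails because "Closed" is not a supported type.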
