Documentation
Overview ¶
Package journal provides an abstraction of an append-only log with optimistic concurrency control.
Index ¶
- func IgnoreNotFound(err error) error
- func IsConflict(err error) bool
- func IsEmpty[T any](ctx context.Context, j Journal[T]) (bool, error)
- func IsFresh[T any](ctx context.Context, j Journal[T]) (bool, error)
- func IsNotFound(err error) bool
- func RangeFromSearchResult[T any](ctx context.Context, j Journal[T], i Interval, cmp CompareFunc[T], ...) error
- func RunBenchmarks(b *testing.B, store BinaryStore)
- func RunTests(t *testing.T, store BinaryStore)
- func Scan[T, V any](ctx context.Context, j Journal[T], pos Position, scan ScanFunc[T, V]) (V, error)
- func ScanFromSearchResult[T, V any](ctx context.Context, j Journal[T], i Interval, cmp CompareFunc[T], ...) (V, error)
- type AdaptiveProbeFunc
- type BinaryInterceptor
- type BinaryJournal
- type BinaryRangeFunc
- type BinaryStore
- type CompareFunc
- type ConflictError
- type Interceptor
- type Interval
- type Journal
- type Position
- func AdaptiveSearch[T any](ctx context.Context, j Journal[T], i Interval, probe Position, ...) (Position, T, error)
- func AppendWithConflictResolution[T any](ctx context.Context, j Journal[T], end Position, rec T, ...) (Position, error)
- func FirstRecord[T any](ctx context.Context, j Journal[T]) (Position, T, bool, error)
- func LastRecord[T any](ctx context.Context, j Journal[T]) (Position, T, bool, error)
- func Search[T any](ctx context.Context, j Journal[T], i Interval, cmp CompareFunc[T]) (pos Position, rec T, err error)
- type RangeFunc
- type RecordNotFoundError
- type ScanFunc
- type Store
- type ValueNotFoundError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IgnoreNotFound ¶ added in v0.9.3
IgnoreNotFound returns nil if err is caused by a RecordNotFoundError or a ValueNotFoundError. Otherwise it returns err unchanged.
func IsConflict ¶ added in v0.10.0
IsConflict returns true if err is caused by ConflictError.
func IsNotFound ¶ added in v0.10.0
IsNotFound returns true if err is caused by RecordNotFoundError or ValueNotFoundError.
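The phrase "caused by" suggests that these helpers look through wrapped errors. The following is a minimal sketch of that idea using the standard errors package. The types recordNotFound and valueNotFound and the functions isNotFound and ignoreNotFound are hypothetical local stand-ins, not the package's own implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// recordNotFound and valueNotFound are hypothetical stand-ins for the
// package's RecordNotFoundError and ValueNotFoundError types.
type recordNotFound struct{}

func (recordNotFound) Error() string { return "record not found" }

type valueNotFound struct{}

func (valueNotFound) Error() string { return "value not found" }

// isNotFound reports whether either stand-in error appears anywhere in
// err's chain of wrapped errors.
func isNotFound(err error) bool {
	var r recordNotFound
	var v valueNotFound
	return errors.As(err, &r) || errors.As(err, &v)
}

// ignoreNotFound swallows not-found errors and passes every other error
// through unchanged.
func ignoreNotFound(err error) error {
	if isNotFound(err) {
		return nil
	}
	return err
}

func main() {
	wrapped := fmt.Errorf("loading checkpoint: %w", valueNotFound{})
	other := errors.New("disk failure")

	fmt.Println(isNotFound(wrapped))            // true
	fmt.Println(ignoreNotFound(wrapped) == nil) // true
	fmt.Println(ignoreNotFound(other) == other) // true
}
```

A helper like ignoreNotFound is convenient when a missing record is an expected outcome, for example when reading optional state that may not have been written yet.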
func RangeFromSearchResult ¶ added in v0.9.2
func RangeFromSearchResult[T any]( ctx context.Context, j Journal[T], i Interval, cmp CompareFunc[T], fn RangeFunc[T], ) error
RangeFromSearchResult invokes fn for each record in the journal, in order, beginning with the record within the interval i for which cmp() returns zero.
It returns a ValueNotFoundError if there is no such record.
func RunBenchmarks ¶ added in v0.6.0
func RunBenchmarks( b *testing.B, store BinaryStore, )
RunBenchmarks runs benchmarks against a BinaryStore implementation.
func RunTests ¶
func RunTests( t *testing.T, store BinaryStore, )
RunTests runs tests that confirm a journal implementation behaves correctly.
func Scan ¶ added in v0.8.0
func Scan[T, V any]( ctx context.Context, j Journal[T], pos Position, scan ScanFunc[T, V], ) (V, error)
Scan finds a value of type V within j by scanning all records beginning with the record at the given position.
It returns a ValueNotFoundError if the value is not found.
This function is useful when the records are not ordered by the value being searched for, or when there are only a small number of records to scan. If the records are structured in such a way that it's possible to know whether the value appears before or after a specific record, use Search instead.
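To illustrate the linear scan described above, here is a minimal sketch over an in-memory slice. The scan function, the errValueNotFound sentinel, the sizeOf predicate, and the shape of the callback (position and record in; value, ok, and error out) are assumptions made for this example, since the ScanFunc declaration is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errValueNotFound is a hypothetical stand-in for ValueNotFoundError.
var errValueNotFound = errors.New("value not found")

// scan visits recs in order, starting at start, and returns the first value
// that fn manages to produce. The callback shape is an assumption.
func scan[T, V any](
	recs []T,
	start int,
	fn func(pos int, rec T) (v V, ok bool, err error),
) (V, error) {
	var zero V
	for pos := start; pos < len(recs); pos++ {
		v, ok, err := fn(pos, recs[pos])
		if err != nil {
			return zero, err
		}
		if ok {
			return v, nil
		}
	}
	return zero, errValueNotFound
}

// sizeOf extracts the value from records of the form "set size=<value>".
func sizeOf(_ int, rec string) (string, bool, error) {
	v, ok := strings.CutPrefix(rec, "set size=")
	return v, ok, nil
}

func main() {
	recs := []string{"noop", "set color=red", "noop", "set size=9"}

	size, err := scan(recs, 0, sizeOf)
	fmt.Println(size, err) // 9 <nil>

	_, err = scan(recs[:3], 0, sizeOf)
	fmt.Println(err) // value not found
}
```

Every record may need to be visited, so the cost grows linearly with the number of records scanned.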
func ScanFromSearchResult ¶ added in v0.8.0
func ScanFromSearchResult[T, V any]( ctx context.Context, j Journal[T], i Interval, cmp CompareFunc[T], scan ScanFunc[T, V], ) (V, error)
ScanFromSearchResult finds a value within j by scanning all records beginning with the record located by a binary search of the interval i, that is, the record for which cmp returns zero. See Scan and Search.
It returns a ValueNotFoundError if the value is not found.
Types ¶
type AdaptiveProbeFunc ¶ added in v0.16.0
type AdaptiveProbeFunc[T any] func( ctx context.Context, i Interval, pos Position, rec T, ) (next Position, found bool, err error)
AdaptiveProbeFunc is a function that determines the next probe position for an AdaptiveSearch.
i is the current search bracket — the half-open interval of journal positions still under consideration.
pos and rec are the position and record of the most recent probe.
If found is true, the record at pos is the target and next is ignored.
When found is false, next must be within i and must differ from pos. If either condition is violated, AdaptiveSearch returns a ValueNotFoundError.
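As a rough illustration of the contract above, the sketch below builds a probe function that guesses the next position by interpolation. Position and Interval are local copies of the documented declarations. The record type, the interpolate constructor, and the assumption that keys grow by roughly step per position (with step greater than zero) are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Position and Interval are local stand-ins mirroring the documented types.
type Position uint64

type Interval struct {
	Begin Position
	End   Position
}

// record is a hypothetical record type whose Key grows with its position.
type record struct{ Key int64 }

// interpolate returns a function with the documented AdaptiveProbeFunc
// shape. It assumes keys grow by about step (> 0) per position.
func interpolate(target, step int64) func(context.Context, Interval, Position, record) (Position, bool, error) {
	return func(_ context.Context, i Interval, pos Position, rec record) (Position, bool, error) {
		if rec.Key == target {
			return pos, true, nil
		}
		if i.Begin >= i.End {
			// Nothing left to probe. Returning pos breaks the "must differ"
			// rule, which is documented to end in a ValueNotFoundError.
			return pos, false, nil
		}
		// Estimate how many positions away the target is, moving by at
		// least one position toward it.
		delta := (target - rec.Key) / step
		if delta == 0 {
			if target > rec.Key {
				delta = 1
			} else {
				delta = -1
			}
		}
		// Clamp the estimate into the half-open bracket [Begin, End).
		next := int64(pos) + delta
		if next < int64(i.Begin) {
			next = int64(i.Begin)
		}
		if next >= int64(i.End) {
			next = int64(i.End) - 1
		}
		return Position(next), false, nil
	}
}

func main() {
	probe := interpolate(1500, 10)
	bracket := Interval{Begin: 0, End: 100}

	next, found, err := probe(context.Background(), bracket, 10, record{Key: 1000})
	fmt.Println(next, found, err) // 60 false <nil>
}
```

If clamping leaves next equal to pos, the target lies outside the bracket, and the documented rule turns that into a ValueNotFoundError.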
type BinaryInterceptor ¶ added in v0.13.0
type BinaryInterceptor = Interceptor[[]byte]
BinaryInterceptor is an Interceptor that can be used to intercept operations on a BinaryJournal.
type BinaryJournal ¶ added in v0.8.0
A BinaryJournal is an append-only log of binary records.
type BinaryRangeFunc ¶ added in v0.8.0
A BinaryRangeFunc is a function used to range over the records in a BinaryJournal.
If err is non-nil, ranging stops and err is propagated up the stack. Otherwise, if ok is false, ranging stops without any error being propagated.
type BinaryStore ¶ added in v0.8.0
BinaryStore is a Store of journals that contain opaque binary records.
func WithTelemetry ¶ added in v0.2.1
func WithTelemetry( s BinaryStore, t trace.TracerProvider, m metric.MeterProvider, l log.LoggerProvider, ) BinaryStore
WithTelemetry returns a BinaryStore that adds telemetry to s.
type CompareFunc ¶ added in v0.3.0
CompareFunc is a function that compares a record to some datum.
If the record is less than the datum, cmp is negative. If the record is greater than the datum, cmp is positive. Otherwise, the record is considered equal to the datum.
type ConflictError ¶ added in v0.10.0
ConflictError is returned by [Journal.Append] if there is already a record at the specified position.
func (ConflictError) Error ¶ added in v0.10.0
func (e ConflictError) Error() string
type Interceptor ¶ added in v0.13.0
type Interceptor[T any] struct {
	// contains filtered or unexported fields
}
Interceptor defines functions that are invoked around journal operations.
func (*Interceptor[T]) AfterAppend ¶ added in v0.13.0
func (i *Interceptor[T]) AfterAppend(fn func(journal string, rec T) error)
AfterAppend sets the function that is invoked after a record is appended to the Journal.
func (*Interceptor[T]) BeforeAppend ¶ added in v0.13.0
func (i *Interceptor[T]) BeforeAppend(fn func(journal string, rec T) error)
BeforeAppend sets the function that is invoked before a record is appended to the Journal.
func (*Interceptor[T]) BeforeOpen ¶ added in v0.13.0
func (i *Interceptor[T]) BeforeOpen(fn func(name string) error)
BeforeOpen sets the function that is invoked before a Journal is opened.
type Interval ¶ added in v0.10.0
type Interval struct {
// Begin is the position of the first record in the interval.
Begin Position
// End is the position immediately after the last record in the interval.
End Position
}
Interval describes a half-open interval of positions in a Journal.
func (Interval) Contains ¶ added in v0.10.0
Contains returns true if the interval contains the given position.
func (Interval) IsEmpty ¶ added in v0.10.0
IsEmpty returns true if the interval contains no records.
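The half-open convention means Begin is inside the interval and End is not. A minimal sketch of how Contains and IsEmpty might behave under that convention follows. The free functions contains and isEmpty are hypothetical helpers written for this example, not the package's methods.

```go
package main

import "fmt"

// Position and Interval are local stand-ins mirroring the documented types.
type Position uint64

type Interval struct {
	Begin Position // position of the first record in the interval
	End   Position // position immediately after the last record
}

// contains treats Begin as inclusive and End as exclusive.
func contains(i Interval, pos Position) bool {
	return i.Begin <= pos && pos < i.End
}

// isEmpty reports whether no position falls within the interval.
func isEmpty(i Interval) bool {
	return i.Begin >= i.End
}

func main() {
	i := Interval{Begin: 3, End: 6} // covers positions 3, 4 and 5

	fmt.Println(contains(i, 3), contains(i, 5), contains(i, 6)) // true true false
	fmt.Println(isEmpty(i), isEmpty(Interval{Begin: 6, End: 6})) // false true
}
```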
type Journal ¶
type Journal[T any] interface {
	// Name returns the name of the journal.
	Name() string

	// Bounds returns the half-open interval [begin, end) describing the
	// positions of the first and last records in the journal.
	Bounds(ctx context.Context) (Interval, error)

	// Get returns the record at the given position.
	//
	// It returns a [RecordNotFoundError] if there is no record at the given
	// position.
	Get(ctx context.Context, pos Position) (rec T, err error)

	// Range invokes fn for each record in the journal, in order, starting with
	// the record at the given position.
	//
	// It returns a [RecordNotFoundError] if there is no record at the given
	// position.
	Range(ctx context.Context, pos Position, fn RangeFunc[T]) error

	// Append adds a record to the journal at the given position.
	//
	// The record is stored at the given position and the end of the journal
	// becomes pos + 1.
	//
	// pos must be the end of the journal, as returned by [Bounds]. If pos < end
	// then a [ConflictError] is returned, indicating that there is already a
	// record at the given position. The behavior is undefined if pos > end.
	Append(ctx context.Context, pos Position, rec T) error

	// Truncate removes journal records in the half-open interval [begin, pos),
	// such that pos becomes the new beginning of the journal.
	//
	// If it returns an error the truncation may have been partially applied.
	//
	// The behavior is undefined if pos > end.
	Truncate(ctx context.Context, pos Position) error

	// Close closes the journal.
	Close() error
}
A Journal is an append-only log containing records of type T.
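To make the Append and Truncate rules concrete, here is a minimal single-goroutine sketch of an in-memory log that follows them. The memJournal type and the sentinel errors are hypothetical and omit the context parameters. Where the documentation leaves behavior undefined (pos > end), the sketch makes its own choice, noted in the comments.

```go
package main

import (
	"errors"
	"fmt"
)

type Position uint64

// Hypothetical stand-ins for ConflictError and RecordNotFoundError, plus an
// error for the case the documentation leaves undefined.
var (
	errConflict = errors.New("conflict: position already occupied")
	errNotFound = errors.New("record not found")
	errGap      = errors.New("position is beyond the end of the journal")
)

// memJournal is a hypothetical in-memory log. begin is the position of the
// oldest retained record, and recs holds the records in [begin, end).
type memJournal struct {
	begin Position
	recs  []string
}

func (j *memJournal) Bounds() (begin, end Position) {
	return j.begin, j.begin + Position(len(j.recs))
}

func (j *memJournal) Get(pos Position) (string, error) {
	begin, end := j.Bounds()
	if pos < begin || pos >= end {
		return "", errNotFound
	}
	return j.recs[pos-begin], nil
}

// Append succeeds only when pos is the current end of the journal.
func (j *memJournal) Append(pos Position, rec string) error {
	_, end := j.Bounds()
	if pos < end {
		return errConflict
	}
	if pos > end {
		return errGap // undefined in the documentation; a sketch-only choice
	}
	j.recs = append(j.recs, rec)
	return nil
}

// Truncate drops the records in [begin, pos). Clamping pos to end is a
// sketch-only choice for the undefined pos > end case.
func (j *memJournal) Truncate(pos Position) {
	begin, end := j.Bounds()
	if pos <= begin {
		return
	}
	if pos > end {
		pos = end
	}
	j.recs = j.recs[pos-begin:]
	j.begin = pos
}

func main() {
	j := &memJournal{}
	_ = j.Append(0, "a")
	_ = j.Append(1, "b")

	err := j.Append(1, "c") // position 1 is already occupied
	fmt.Println(errors.Is(err, errConflict)) // true

	j.Truncate(1)
	_, err = j.Get(0) // truncated away
	fmt.Println(errors.Is(err, errNotFound)) // true

	begin, end := j.Bounds()
	fmt.Println(begin, end) // 1 2
}
```

Truncation does not renumber the remaining records, so positions held by callers stay valid for as long as the record exists.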
type Position ¶
type Position uint64
Position is the index of a record within a Journal. The first record is always at position 0.
func AdaptiveSearch ¶ added in v0.16.0
func AdaptiveSearch[T any]( ctx context.Context, j Journal[T], i Interval, probe Position, fn AdaptiveProbeFunc[T], ) (Position, T, error)
AdaptiveSearch searches j within the interval i for a target record, using fn to determine which position to probe at each iteration.
Unlike Search, which always probes the midpoint of the remaining interval, AdaptiveSearch allows the caller to choose the next probe position based on the content of the previously probed record. This makes it suitable for implementing interpolation search and similar algorithms that use record content to estimate where the target is likely to be.
probe is the position of the first record to fetch. The caller is expected to compute this using any out-of-band knowledge available, such as a previously fetched record from the same journal.
It returns a ValueNotFoundError if the target record is not found.
func AppendWithConflictResolution ¶ added in v0.9.3
func AppendWithConflictResolution[T any]( ctx context.Context, j Journal[T], end Position, rec T, fn func(context.Context, Position) (Position, error), ) (Position, error)
AppendWithConflictResolution appends a record to j using fn to resolve optimistic concurrency conflicts. It returns the new journal end position.
If a conflict occurs, fn is called and the append is retried at the position it returns. If fn returns an error, the append is not retried and the error is returned.
func FirstRecord ¶ added in v0.4.0
FirstRecord returns the oldest record in a journal.
func LastRecord ¶ added in v0.4.0
LastRecord returns the newest record in a journal.
func Search ¶ added in v0.8.0
func Search[T any]( ctx context.Context, j Journal[T], i Interval, cmp CompareFunc[T], ) (pos Position, rec T, err error)
Search performs a binary search of j within the interval i to find the position of the record for which cmp() returns zero.
It returns a ValueNotFoundError if there is no such record.
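The following sketch shows how a comparator that follows the CompareFunc convention (negative when the record is less than the datum, positive when greater, zero when equal) can drive a binary search. It runs over an in-memory slice. The search function, the event type, and errValueNotFound are hypothetical, and the comparator here omits the context and position arguments the real type may take.

```go
package main

import (
	"errors"
	"fmt"
)

// errValueNotFound is a hypothetical stand-in for ValueNotFoundError.
var errValueNotFound = errors.New("value not found")

// search looks for the record in recs[begin:end) for which cmp returns
// zero. The records must be ordered consistently with cmp.
func search[T any](recs []T, begin, end int, cmp func(rec T) int) (int, T, error) {
	for begin < end {
		mid := begin + (end-begin)/2
		rec := recs[mid]

		switch c := cmp(rec); {
		case c < 0:
			begin = mid + 1 // record is less than the datum, look later
		case c > 0:
			end = mid // record is greater than the datum, look earlier
		default:
			return mid, rec, nil
		}
	}
	var zero T
	return 0, zero, errValueNotFound
}

// event is a hypothetical record type ordered by Seq.
type event struct {
	Seq  int
	Name string
}

func main() {
	recs := []event{{10, "a"}, {20, "b"}, {30, "c"}, {40, "d"}}

	pos, rec, err := search(recs, 0, len(recs), func(e event) int { return e.Seq - 30 })
	fmt.Println(pos, rec.Name, err) // 2 c <nil>

	_, _, err = search(recs, 0, len(recs), func(e event) int { return e.Seq - 25 })
	fmt.Println(err) // value not found
}
```

Each iteration halves the remaining interval, so the number of records fetched grows logarithmically with the size of the interval.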
type RangeFunc ¶
A RangeFunc is a function used to range over the records in a Journal.
If err is non-nil, ranging stops and err is propagated up the stack. Otherwise, if ok is false, ranging stops without any error being propagated.
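The stopping rules above can be sketched with a loop over an in-memory slice. The rangeFrom function, errRecordNotFound, and the callback shape (position and record in; ok and err out) are assumptions made for this example, since the RangeFunc declaration is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// errRecordNotFound is a hypothetical stand-in for RecordNotFoundError.
var errRecordNotFound = errors.New("record not found")

// rangeFrom calls fn for each record starting at start. It stops early,
// without an error, when fn returns ok == false, and stops with the error
// when fn returns a non-nil err.
func rangeFrom[T any](
	recs []T,
	start int,
	fn func(pos int, rec T) (ok bool, err error),
) error {
	if start < 0 || start >= len(recs) {
		return errRecordNotFound
	}
	for pos := start; pos < len(recs); pos++ {
		ok, err := fn(pos, recs[pos])
		if err != nil {
			return err
		}
		if !ok {
			return nil
		}
	}
	return nil
}

func main() {
	recs := []string{"a", "b", "stop", "c"}

	var seen []string
	err := rangeFrom(recs, 0, func(_ int, rec string) (bool, error) {
		if rec == "stop" {
			return false, nil // stop ranging, no error
		}
		seen = append(seen, rec)
		return true, nil
	})
	fmt.Println(seen, err) // [a b] <nil>
}
```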
type RecordNotFoundError ¶ added in v0.10.0
RecordNotFoundError is returned by [Journal.Get] and [Journal.Range] if the requested record does not exist, either because it has been truncated or because the given position has not been written yet.
func (RecordNotFoundError) Error ¶ added in v0.10.0
func (e RecordNotFoundError) Error() string
type ScanFunc ¶ added in v0.8.0
ScanFunc is a predicate function that produces a value of type V from a record of type T.
If the record cannot be used to produce a value of type V, ok is false.
type Store ¶
type Store[T any] interface {
	// Open returns the journal with the given name.
	Open(ctx context.Context, name string) (Journal[T], error)

	// Provision creates the infrastructure used by the store if it does not
	// already exist.
	Provision(ctx context.Context) error
}
Store is a collection of journals containing records of type T.
func NewMarshalingStore ¶ added in v0.8.0
func NewMarshalingStore[T any]( s BinaryStore, m marshaler.Marshaler[T], ) Store[T]
NewMarshalingStore returns a new Store that marshals/unmarshals records of type T to/from an underlying BinaryStore.
func WithInterceptor ¶ added in v0.13.0
func WithInterceptor[T any](s Store[T], in *Interceptor[T]) Store[T]
WithInterceptor returns a Store that invokes the functions defined by the given Interceptor when performing operations on s.
type ValueNotFoundError ¶ added in v0.10.0
type ValueNotFoundError struct{}
ValueNotFoundError is returned by search and scan operations if the target value is not found.
func (ValueNotFoundError) Error ¶ added in v0.10.0
func (e ValueNotFoundError) Error() string