Documentation ¶
Overview ¶
Package persistence provides abstractions for data persistence.
Index ¶
- Variables
- type AggregateMetaData
- type AggregateRepository
- type Batch
- type ConflictError
- type DataStore
- type DataStoreSet
- type Event
- type EventRepository
- type EventResult
- type OffsetRepository
- type Operation
- type OperationVisitor
- type Persister
- type ProcessInstance
- type ProcessRepository
- type Provider
- type QueueMessage
- type QueueRepository
- type RemoveProcessInstance
- type RemoveQueueMessage
- type Result
- type SaveAggregateMetaData
- type SaveEvent
- type SaveOffset
- type SaveProcessInstance
- type SaveQueueMessage
- type UnknownMessageError
Constants ¶
This section is empty.
Variables ¶
var ErrDataStoreClosed = errors.New("data store is closed")
ErrDataStoreClosed is returned when performing any persistence operation on a closed data-store.
var ErrDataStoreLocked = errors.New("data store is locked")
ErrDataStoreLocked indicates that an application's data store cannot be opened because it is locked by another engine instance.
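Because both variables are plain sentinel errors, a caller can compare against them with errors.Is, which also matches wrapped errors. The following is a minimal sketch under stated assumptions: ErrDataStoreLocked is redeclared locally so the example compiles on its own, and openFunc and openWithRetry are hypothetical helpers that are not part of this package. Real code would normally wait between attempts.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrDataStoreLocked is a local stand-in for persistence.ErrDataStoreLocked.
var ErrDataStoreLocked = errors.New("data store is locked")

// openFunc is a hypothetical stand-in for a call to Provider.Open() that is
// already bound to a context and an application key.
type openFunc func() error

// openWithRetry calls open up to maxAttempts times. It retries only while the
// data store is reported as locked; any other result is returned immediately.
// It returns the number of attempts made and the final error.
func openWithRetry(open openFunc, maxAttempts int) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = open()
		if !errors.Is(err, ErrDataStoreLocked) {
			return attempt, err
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	open := func() error {
		calls++
		if calls < 3 {
			// Wrapped sentinel errors are still matched by errors.Is.
			return fmt.Errorf("open application: %w", ErrDataStoreLocked)
		}
		return nil
	}

	attempts, err := openWithRetry(open, 5)
	fmt.Println(attempts, err)
}
```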
Functions ¶
This section is empty.
Types ¶
type AggregateMetaData ¶
type AggregateMetaData struct {
// HandlerKey is the identity key of the aggregate message handler.
HandlerKey string
// InstanceID is the aggregate instance ID.
InstanceID string
// Revision is the instance's current version, used to enforce optimistic
// concurrency control.
Revision uint64
// LastEventID is the ID of the most recent event message recorded against
// the instance.
LastEventID string
}
AggregateMetaData contains meta-data about an aggregate instance.
type AggregateRepository ¶
type AggregateRepository interface {
// LoadAggregateMetaData loads the meta-data for an aggregate instance.
//
// hk is the aggregate handler's identity key, id is the instance ID.
LoadAggregateMetaData(
ctx context.Context,
hk, id string,
) (AggregateMetaData, error)
}
AggregateRepository is an interface for reading aggregate state.
type Batch ¶
type Batch []Operation
Batch is a set of operations that are committed to the data store atomically using a Persister.
func (Batch) AcceptVisitor ¶
func (b Batch) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor visits each operation in the batch.
func (Batch) MustValidate ¶
func (b Batch) MustValidate()
MustValidate panics if the batch contains any operations that operate on the same entity.
type ConflictError ¶
type ConflictError struct {
// Cause is the operation that caused the conflict.
Cause Operation
}
ConflictError is an error indicating one or more operations within a batch caused an optimistic concurrency conflict.
func (ConflictError) Error ¶
func (e ConflictError) Error() string
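Since ConflictError is a struct type with a value receiver, a caller would typically detect it with errors.As and inspect Cause to decide what to reload. The sketch below uses local stand-ins for Operation, ConflictError and SaveOffset so that it is self-contained; the text produced by Error() and the conflictCause helper are assumptions for illustration, not the package's actual behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Operation is a local stand-in for persistence.Operation.
type Operation interface{}

// ConflictError is a local stand-in for persistence.ConflictError.
type ConflictError struct {
	Cause Operation
}

// Error returns a hypothetical message; the real wording is not documented here.
func (e ConflictError) Error() string {
	return fmt.Sprintf("optimistic concurrency conflict in %T operation", e.Cause)
}

// SaveOffset is a pared-down stand-in for persistence.SaveOffset.
type SaveOffset struct {
	ApplicationKey string
}

// conflictCause returns the operation that caused a conflict if err is, or
// wraps, a ConflictError. The second result is false for any other error.
func conflictCause(err error) (Operation, bool) {
	var ce ConflictError
	if errors.As(err, &ce) {
		return ce.Cause, true
	}
	return nil, false
}

func main() {
	err := fmt.Errorf(
		"persist failed: %w",
		ConflictError{Cause: SaveOffset{ApplicationKey: "billing"}},
	)

	if op, ok := conflictCause(err); ok {
		fmt.Printf("conflict caused by %T, reload state and retry\n", op)
	}
}
```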
type DataStore ¶
type DataStore interface {
AggregateRepository
EventRepository
OffsetRepository
ProcessRepository
QueueRepository
Persister
// Close closes the data store.
//
// Closing a data-store causes any future calls to Persist() to return
// ErrDataStoreClosed.
//
// The behavior of read operations on a closed data-store is
// implementation-defined.
//
// In general use it is expected that all pending calls to Persist() will
// have finished before a data-store is closed. Close() may block until any
// in-flight calls to Persist() return, or may prevent any such calls from
// succeeding.
Close() error
}
DataStore is an interface used by the engine to persist and retrieve data for a specific application.
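The Close() contract above allows an implementation to block until in-flight Persist() calls return. One way to get that behavior, shown here only as a sketch, is a read/write mutex: Persist() holds the read lock for its duration and Close() takes the write lock. The store type and the commit callback are hypothetical, and ErrDataStoreClosed is redeclared locally so the example stands alone.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrDataStoreClosed is a local stand-in for persistence.ErrDataStoreClosed.
var ErrDataStoreClosed = errors.New("data store is closed")

// store is a hypothetical data store that only models the close semantics.
type store struct {
	mu     sync.RWMutex
	closed bool
}

// Persist runs commit while holding the read lock, so several calls may be in
// flight at once, but none can overlap with Close().
func (s *store) Persist(commit func() error) error {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.closed {
		return ErrDataStoreClosed
	}
	return commit()
}

// Close waits for in-flight Persist() calls by acquiring the write lock, then
// marks the store as closed. Returning nil on repeated calls is an assumption.
func (s *store) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.closed = true
	return nil
}

func main() {
	s := &store{}
	noop := func() error { return nil }

	fmt.Println(s.Persist(noop)) // succeeds while the store is open
	fmt.Println(s.Close())
	fmt.Println(s.Persist(noop)) // rejected after Close()
}
```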
type DataStoreSet ¶
type DataStoreSet struct {
Provider Provider
// contains filtered or unexported fields
}
DataStoreSet is a collection of data-stores for several applications.
func (*DataStoreSet) Close ¶
func (s *DataStoreSet) Close() error
Close closes all data-stores in the set.
type Event ¶
type Event struct {
Offset uint64
Envelope *envelopespec.Envelope
}
Event is a persisted event message.
type EventRepository ¶
type EventRepository interface {
// NextEventOffset returns the next "unused" offset.
NextEventOffset(ctx context.Context) (uint64, error)
// LoadEventsByType loads events that match a specific set of message types.
//
// f is the set of message types to include in the result. The keys of f are
// the "portable type name" produced when the events are marshaled.
//
// o specifies the (inclusive) lower-bound of the offset range to include in
// the results.
LoadEventsByType(
ctx context.Context,
f map[string]struct{},
o uint64,
) (EventResult, error)
// LoadEventsBySource loads the events produced by a specific handler.
//
// hk is the handler's identity key.
//
// id is the instance ID, which must be empty if the handler type does not
// use instances.
LoadEventsBySource(
ctx context.Context,
hk, id string,
) (EventResult, error)
}
EventRepository is an interface for reading event messages.
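The filtering rules documented for LoadEventsByType can be illustrated with an in-memory sketch: an event is included when its type name is a key of f and its offset is at least o. The Event type below is a simplified stand-in; the PortableName field is hypothetical, since the real type name travels inside the envelope, and filterByType is not part of this package.

```go
package main

import "fmt"

// Event is a simplified stand-in for persistence.Event. PortableName is a
// hypothetical field standing in for the type name carried by the envelope.
type Event struct {
	Offset       uint64
	PortableName string
}

// filterByType returns the events whose type name is a key of f and whose
// offset is greater than or equal to o (the lower bound is inclusive).
func filterByType(events []Event, f map[string]struct{}, o uint64) []Event {
	var out []Event
	for _, ev := range events {
		if ev.Offset < o {
			continue
		}
		if _, ok := f[ev.PortableName]; ok {
			out = append(out, ev)
		}
	}
	return out
}

func main() {
	events := []Event{
		{Offset: 0, PortableName: "AccountOpened"},
		{Offset: 1, PortableName: "FundsDeposited"},
		{Offset: 2, PortableName: "AccountOpened"},
		{Offset: 3, PortableName: "AccountClosed"},
	}

	// A set is expressed as a map with empty-struct values.
	f := map[string]struct{}{
		"AccountOpened": {},
		"AccountClosed": {},
	}

	for _, ev := range filterByType(events, f, 2) {
		fmt.Println(ev.Offset, ev.PortableName)
	}
}
```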
type EventResult ¶
type EventResult interface {
// Next returns the next event in the result.
//
// It returns false if there are no more events in the result.
Next(ctx context.Context) (Event, bool, error)
// Close closes the cursor.
Close() error
}
EventResult is the result of a query made using an EventRepository.
EventResult values are not safe for concurrent use.
type OffsetRepository ¶
type OffsetRepository interface {
// LoadOffset loads the offset associated with a specific application.
//
// ak is the application's identity key.
LoadOffset(ctx context.Context, ak string) (uint64, error)
}
OffsetRepository is an interface for reading event stream offsets associated with remote applications.
type Operation ¶
type Operation interface {
// AcceptVisitor calls the appropriate visit method on the given visitor.
AcceptVisitor(context.Context, OperationVisitor) error
// contains filtered or unexported methods
}
Operation is a persistence operation that can be performed as part of an atomic batch.
type OperationVisitor ¶
type OperationVisitor interface {
VisitSaveAggregateMetaData(context.Context, SaveAggregateMetaData) error
VisitSaveEvent(context.Context, SaveEvent) error
VisitSaveProcessInstance(context.Context, SaveProcessInstance) error
VisitRemoveProcessInstance(context.Context, RemoveProcessInstance) error
VisitSaveQueueMessage(context.Context, SaveQueueMessage) error
VisitRemoveQueueMessage(context.Context, RemoveQueueMessage) error
VisitSaveOffset(context.Context, SaveOffset) error
}
OperationVisitor visits persistence operations.
type Persister ¶
type Persister interface {
// Persist commits a batch of operations atomically.
//
// If any one of the operations causes an optimistic concurrency conflict
// the entire batch is aborted and a ConflictError is returned.
Persist(context.Context, Batch) (Result, error)
}
A Persister is an interface for atomically committing batches of operations to the data store.
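The all-or-nothing behavior described for Persist() can be sketched with an in-memory store that checks every revision before changing anything. This is only an illustration under stated assumptions: the types are local stand-ins, the batch holds process instances only, a never-saved instance is treated as revision 0, and a successful save increments the stored revision by one. None of those details are confirmed by the documentation above.

```go
package main

import "fmt"

// ProcessInstance is a pared-down stand-in for persistence.ProcessInstance.
type ProcessInstance struct {
	HandlerKey string
	InstanceID string
	Revision   uint64
}

// ConflictError is a stand-in whose Cause is narrowed to ProcessInstance.
type ConflictError struct {
	Cause ProcessInstance
}

func (e ConflictError) Error() string {
	return fmt.Sprintf(
		"conflict saving %s/%s at revision %d",
		e.Cause.HandlerKey, e.Cause.InstanceID, e.Cause.Revision,
	)
}

// memoryStore is a hypothetical store that tracks only revisions.
type memoryStore struct {
	revisions map[string]uint64
}

func key(in ProcessInstance) string {
	return in.HandlerKey + "/" + in.InstanceID
}

// Persist saves every instance in the batch, or none of them.
func (s *memoryStore) Persist(batch []ProcessInstance) error {
	// First pass: verify all revisions before mutating any state, so that a
	// conflict on a later operation cannot leave earlier ones applied.
	for _, in := range batch {
		if s.revisions[key(in)] != in.Revision {
			return ConflictError{Cause: in}
		}
	}

	// Second pass: apply the changes (assumed to bump the revision by one).
	for _, in := range batch {
		s.revisions[key(in)] = in.Revision + 1
	}
	return nil
}

func main() {
	s := &memoryStore{revisions: map[string]uint64{}}
	a := ProcessInstance{HandlerKey: "proc", InstanceID: "a"}
	b := ProcessInstance{HandlerKey: "proc", InstanceID: "b"}

	fmt.Println(s.Persist([]ProcessInstance{a, b}))

	// a is reloaded at its new revision, but b is stale, so nothing is saved.
	a.Revision = 1
	fmt.Println(s.Persist([]ProcessInstance{a, b}))
	fmt.Println(s.revisions["proc/a"], s.revisions["proc/b"])
}
```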
type ProcessInstance ¶
type ProcessInstance struct {
// HandlerKey is the identity key of the process message handler.
HandlerKey string
// InstanceID is the process instance ID.
InstanceID string
// Revision is the instance's current version, used to enforce optimistic
// concurrency control.
Revision uint64
// Packet contains the binary representation of the process state.
Packet marshaler.Packet
}
ProcessInstance contains the state of a process instance.
type ProcessRepository ¶
type ProcessRepository interface {
// LoadProcessInstance loads a process instance.
//
// hk is the process handler's identity key, id is the instance ID.
LoadProcessInstance(
ctx context.Context,
hk, id string,
) (ProcessInstance, error)
}
ProcessRepository is an interface for reading process state.
type Provider ¶
type Provider interface {
// Open returns a data-store for a specific application.
//
// k is the identity key of the application.
//
// Data stores are opened for exclusive use. If another engine instance has
// already opened this application's data-store, ErrDataStoreLocked is
// returned.
Open(ctx context.Context, k string) (DataStore, error)
}
Provider is an interface used by the engine to obtain application-specific DataStore instances.
type QueueMessage ¶
type QueueMessage struct {
Revision uint64
FailureCount uint
NextAttemptAt time.Time
Envelope *envelopespec.Envelope
}
QueueMessage is a message persisted in the message queue.
type QueueRepository ¶
type QueueRepository interface {
// LoadQueueMessages loads the next n messages from the queue.
LoadQueueMessages(ctx context.Context, n int) ([]QueueMessage, error)
}
QueueRepository is an interface for reading queued messages.
type RemoveProcessInstance ¶
type RemoveProcessInstance struct {
// Instance is the instance to remove.
//
// Instance.Revision must be the revision of the process instance as
// currently persisted, otherwise an optimistic concurrency conflict occurs
// and the entire batch of operations is rejected.
Instance ProcessInstance
}
RemoveProcessInstance is an Operation that removes a process instance.
The instance's pending timeout messages are removed from the message queue.
func (RemoveProcessInstance) AcceptVisitor ¶
func (op RemoveProcessInstance) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitRemoveProcessInstance().
type RemoveQueueMessage ¶
type RemoveQueueMessage struct {
// Message is the message to remove from the queue.
//
// The message's revision field must be the revision of the message as
// currently persisted, otherwise an optimistic concurrency conflict occurs
// and the entire batch of operations is rejected.
Message QueueMessage
}
RemoveQueueMessage is an Operation that removes a message from the queue.
func (RemoveQueueMessage) AcceptVisitor ¶
func (op RemoveQueueMessage) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitRemoveQueueMessage().
type Result ¶
type Result struct {
// EventOffsets contains the offsets of the events saved within the batch,
// keyed by their message ID.
EventOffsets map[string]uint64
}
Result is the result of a successfully persisted batch of operations.
type SaveAggregateMetaData ¶
type SaveAggregateMetaData struct {
// MetaData is the meta-data to persist.
//
// MetaData.Revision must be the revision of the aggregate instance as
// currently persisted, otherwise an optimistic concurrency conflict occurs
// and the entire batch of operations is rejected.
MetaData AggregateMetaData
}
SaveAggregateMetaData is an Operation that creates or updates meta-data about an aggregate instance.
func (SaveAggregateMetaData) AcceptVisitor ¶
func (op SaveAggregateMetaData) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveAggregateMetaData().
type SaveEvent ¶
type SaveEvent struct {
// Envelope is the envelope containing the event to persist.
Envelope *envelopespec.Envelope
}
SaveEvent is an Operation that persists an event message.
func (SaveEvent) AcceptVisitor ¶
func (op SaveEvent) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveEvent().
type SaveOffset ¶
type SaveOffset struct {
// ApplicationKey is the identity key of the source application.
ApplicationKey string
// CurrentOffset must be the offset currently associated with this
// application, otherwise an optimistic concurrency conflict occurs and the
// entire batch of operations is rejected.
CurrentOffset uint64
// NextOffset is the next offset to consume from this application.
NextOffset uint64
}
SaveOffset is an Operation that persists the offset of the next event to be consumed from a specific application.
func (SaveOffset) AcceptVisitor ¶
func (op SaveOffset) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveOffset().
type SaveProcessInstance ¶
type SaveProcessInstance struct {
// Instance is the instance to persist.
//
// Instance.Revision must be the revision of the process instance as
// currently persisted, otherwise an optimistic concurrency conflict occurs
// and the entire batch of operations is rejected.
Instance ProcessInstance
}
SaveProcessInstance is an Operation that creates or updates a process instance.
func (SaveProcessInstance) AcceptVisitor ¶
func (op SaveProcessInstance) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveProcessInstance().
type SaveQueueMessage ¶
type SaveQueueMessage struct {
// Message is the message to persist to the queue.
//
// The message's revision field must be the revision of the message as
// currently persisted, otherwise an optimistic concurrency conflict occurs
// and the entire batch of operations is rejected.
Message QueueMessage
}
SaveQueueMessage is an Operation that saves a message to the queue, or updates a message that is already on the queue.
func (SaveQueueMessage) AcceptVisitor ¶
func (op SaveQueueMessage) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveQueueMessage().
type UnknownMessageError ¶
type UnknownMessageError struct {
MessageID string
}
UnknownMessageError is the error returned when a message referenced by its ID does not exist.
func (UnknownMessageError) Error ¶
func (e UnknownMessageError) Error() string
Error returns a string representation of UnknownMessageError.
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| boltpersistence | Package boltpersistence is a BoltDB (bbolt) persistence provider. |
| internal | |
| internal/providertest | Package providertest contains a common test suite for persistence.Provider implementations. |
| memorypersistence | Package memorypersistence is an in-memory persistence provider. |
| sqlpersistence | Package sqlpersistence is an SQL-based persistence provider with drivers for several popular SQL database systems. |
| sqlpersistence/mysql | Package mysql is a MySQL driver for the SQL persistence provider. |
| sqlpersistence/postgres | Package postgres is a PostgreSQL driver for the SQL persistence provider. |
| sqlpersistence/sqlite | Package sqlite is an SQLite v3 driver for the SQL persistence provider. |