Documentation
¶
Index ¶
- Variables
- func AcquireConn(ctx context.Context, db *sql.DB) (*sql.Conn, error)
- type AggregateProjector
- type AggregateProjectorStorage
- type EventStreamLoader
- type Execer
- type Listener
- type MessageFactory
- type Metrics
- type NotificationQueue
- func (nq *NotificationQueue) Empty() bool
- func (nq *NotificationQueue) Next(ctx context.Context) (*ProjectionNotification, bool)
- func (nq *NotificationQueue) Open() func()
- func (nq *NotificationQueue) Queue(ctx context.Context, notification *ProjectionNotification) error
- func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *ProjectionNotification) error
- type NotificationQueuer
- type PersistenceStrategy
- type ProcessHandler
- type ProjectionErrorAction
- type ProjectionErrorCallback
- type ProjectionHandlerError
- type ProjectionNotification
- type ProjectionNotificationProcessor
- func (b *ProjectionNotificationProcessor) Execute(ctx context.Context, handler ProcessHandler, ...) error
- func (b *ProjectionNotificationProcessor) Queue(ctx context.Context, notification *ProjectionNotification) error
- func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler ProcessHandler) func()
- type ProjectionRawState
- type ProjectionState
- type ProjectionStateSerialization
- type ProjectionTrigger
- type ProjectorStorage
- type ProjectorTransaction
- type Queryer
- type ReadOnlyEventStore
- type StreamProjector
- type StreamProjectorStorage
Constants ¶
This section is empty.
Variables ¶
var ( // ErrConnFailedToAcquire occurs when a connection cannot be acquired within the time limit ErrConnFailedToAcquire = errors.New("goengine: unable to acquire projection lock") // ErrProjectionFailedToLock occurs when the projector cannot acquire the projection lock ErrProjectionFailedToLock = errors.New("goengine: unable to acquire projection lock") // ErrProjectionPreviouslyLocked occurs when a projection lock was acquired but a previous lock is still in place ErrProjectionPreviouslyLocked = errors.New("goengine: unable to lock projection due to a previous lock being in place") // ErrNoProjectionRequired occurs when a notification was being acquired but the projection was already at the indicated position ErrNoProjectionRequired = errors.New("goengine: no projection acquisition required") )
Functions ¶
Types ¶
type AggregateProjector ¶
AggregateProjector is a postgres projector used to execute a projection per aggregate instance against an event stream
func NewAggregateProjector ¶
func NewAggregateProjector( db *sql.DB, eventLoader EventStreamLoader, resolver goengine.MessagePayloadResolver, projection goengine.Projection, projectorStorage AggregateProjectorStorage, projectionErrorHandler ProjectionErrorCallback, logger goengine.Logger, metrics Metrics, retryDelay time.Duration, ) (*AggregateProjector, error)
NewAggregateProjector creates a new projector for a projection
func (*AggregateProjector) Run ¶
func (a *AggregateProjector) Run(ctx context.Context) error
Run executes the projection and manages the state of the projection
func (*AggregateProjector) RunAndListen ¶
func (a *AggregateProjector) RunAndListen(ctx context.Context, listener Listener) error
RunAndListen executes the projection and listens to any changes to the event store
type AggregateProjectorStorage ¶
type AggregateProjectorStorage interface {
ProjectorStorage
LoadOutOfSync(ctx context.Context, conn Queryer) (*sql.Rows, error)
PersistFailure(conn Execer, notification *ProjectionNotification) error
}
AggregateProjectorStorage is the storage interface that will persist and load the projection state
type EventStreamLoader ¶
type EventStreamLoader func(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification, position int64) (goengine.EventStream, error)
EventStreamLoader loads an event stream based on the provided notification and state
func AggregateProjectionEventStreamLoader ¶
func AggregateProjectionEventStreamLoader(eventStore ReadOnlyEventStore, streamName goengine.StreamName, aggregateTypeName string) EventStreamLoader
AggregateProjectionEventStreamLoader returns an EventStreamLoader for the AggregateProjector
func StreamProjectionEventStreamLoader ¶
func StreamProjectionEventStreamLoader(eventStore ReadOnlyEventStore, streamName goengine.StreamName) EventStreamLoader
StreamProjectionEventStreamLoader returns an EventStreamLoader for the StreamProjector
type Execer ¶
type Execer interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
Execer is an interface used to execute a query on a sql.DB, sql.Conn or sql.Tx
type Listener ¶
type Listener interface {
// Listen starts listening to the event stream and calls the trigger when an event was appended
Listen(ctx context.Context, trigger ProjectionTrigger) error
}
Listener listens to an event stream and triggers a notification when an event was appended
type MessageFactory ¶
type MessageFactory interface {
// CreateEventStream reconstructs the message from the provided rows
CreateEventStream(rows *sql.Rows) (goengine.EventStream, error)
}
MessageFactory reconstructs messages from the database
type Metrics ¶
type Metrics interface {
// ReceivedNotification sends the metric to keep count of notifications received by goengine
ReceivedNotification(isNotification bool)
// QueueNotification is called when a notification is queued.
// It saves start time for an event on aggregate when it's queued
QueueNotification(notification *ProjectionNotification)
// StartNotificationProcessing is called when a notification processing is started
// It saves start time for an event on aggregate when it's picked to be processed by background processor
StartNotificationProcessing(notification *ProjectionNotification)
// FinishNotificationProcessing is called when a notification processing is finished
// It sends metrics with the duration the notification spent in the queue and the duration it took the background processor to process it
FinishNotificationProcessing(notification *ProjectionNotification, success bool)
}
Metrics is a structured metrics interface
var NopMetrics Metrics = &nopMetrics{}
NopMetrics is the default Metrics handler in case nil is passed
type NotificationQueue ¶
type NotificationQueue struct {
// contains filtered or unexported fields
}
NotificationQueue implements a smart queue
func (*NotificationQueue) Empty ¶
func (nq *NotificationQueue) Empty() bool
Empty returns whether the queue is empty
func (*NotificationQueue) Next ¶
func (nq *NotificationQueue) Next(ctx context.Context) (*ProjectionNotification, bool)
Next yields the next notification on the queue, or indicates that it has stopped when the processor has stopped
func (*NotificationQueue) Open ¶
func (nq *NotificationQueue) Open() func()
Open enables the queue for business
func (*NotificationQueue) Queue ¶
func (nq *NotificationQueue) Queue(ctx context.Context, notification *ProjectionNotification) error
Queue sends a notification to the queue
func (*NotificationQueue) ReQueue ¶
func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *ProjectionNotification) error
ReQueue sends a notification to the queue after setting the ValidAfter property
type NotificationQueuer ¶
type NotificationQueuer interface {
Open() func()
Empty() bool
Next(context.Context) (*ProjectionNotification, bool)
Queue(context.Context, *ProjectionNotification) error
ReQueue(context.Context, *ProjectionNotification) error
}
NotificationQueuer describes a smart queue for projection notifications
type PersistenceStrategy ¶
type PersistenceStrategy interface {
CreateSchema(tableName string) []string
// EventColumnNames represents the event store columns selected from the event stream table. Used by PrepareSearch
EventColumnNames() []string
// InsertColumnNames represent the ordered event store columns that are used to insert data into the event stream.
InsertColumnNames() []string
PrepareData([]goengine.Message) ([]interface{}, error)
PrepareSearch(metadata.Matcher) ([]byte, []interface{})
GenerateTableName(streamName goengine.StreamName) (string, error)
}
PersistenceStrategy is an interface that describes the strategy for persisting messages in the database
type ProcessHandler ¶
type ProcessHandler func(context.Context, *ProjectionNotification, ProjectionTrigger) error
ProcessHandler is a func used to trigger a notification but with the addition of providing a Trigger func so the original notification can trigger other notifications
type ProjectionErrorAction ¶
type ProjectionErrorAction int
ProjectionErrorAction is a type containing the action that the projector should take after an error
const ( // ProjectionFail indicates that the projection failed and cannot be recovered // This means that human intervention is needed ProjectionFail ProjectionErrorAction = iota // ProjectionRetry indicates that the notification should be retried // This can be used in combination with a retry mechanism in the ProjectionErrorCallback ProjectionRetry ProjectionErrorAction = iota // ProjectionIgnoreError indicates that the projection failed but the failure can be ignored // This can be used when the ProjectionErrorCallback recovered the system from the error ProjectionIgnoreError ProjectionErrorAction = iota )
type ProjectionErrorCallback ¶
type ProjectionErrorCallback func(err error, notification *ProjectionNotification) ProjectionErrorAction
ProjectionErrorCallback is a function used to determine what action to take based on a failed projection
type ProjectionHandlerError ¶
type ProjectionHandlerError struct {
// contains filtered or unexported fields
}
ProjectionHandlerError is an error indicating that a projection handler failed
func NewProjectionHandlerError ¶
func NewProjectionHandlerError(err error) *ProjectionHandlerError
NewProjectionHandlerError returns a ProjectionHandlerError with the cause being the provided error
func (*ProjectionHandlerError) Cause ¶
func (e *ProjectionHandlerError) Cause() error
Cause returns the actual projection error. This also adds support for github.com/pkg/errors.Cause
func (*ProjectionHandlerError) Error ¶
func (e *ProjectionHandlerError) Error() string
Error returns the error message
type ProjectionNotification ¶
type ProjectionNotification struct {
No int64 `json:"no"`
AggregateID string `json:"aggregate_id"`
ValidAfter time.Time `json:"valid_after"`
}
ProjectionNotification is a representation of the data provided by database notify
func (*ProjectionNotification) MarshalEasyJSON ¶
func (p *ProjectionNotification) MarshalEasyJSON(w *jwriter.Writer)
MarshalEasyJSON supports easyjson.Marshaler interface
func (*ProjectionNotification) UnmarshalEasyJSON ¶
func (p *ProjectionNotification) UnmarshalEasyJSON(in *jlexer.Lexer)
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*ProjectionNotification) UnmarshalJSON ¶
func (p *ProjectionNotification) UnmarshalJSON(data []byte) error
UnmarshalJSON supports json.Unmarshaler interface
type ProjectionNotificationProcessor ¶
type ProjectionNotificationProcessor struct {
// contains filtered or unexported fields
}
ProjectionNotificationProcessor provides a way to Trigger a notification using a set of background processes.
func NewBackgroundProcessor ¶
func NewBackgroundProcessor( queueProcessors, queueBuffer int, logger goengine.Logger, metrics Metrics, notificationQueue NotificationQueuer, ) (*ProjectionNotificationProcessor, error)
NewBackgroundProcessor creates a new ProjectionNotificationProcessor
func (*ProjectionNotificationProcessor) Execute ¶
func (b *ProjectionNotificationProcessor) Execute(ctx context.Context, handler ProcessHandler, notification *ProjectionNotification) error
Execute starts the background worker and waits for the notification to be executed
func (*ProjectionNotificationProcessor) Queue ¶
func (b *ProjectionNotificationProcessor) Queue(ctx context.Context, notification *ProjectionNotification) error
Queue puts the notification on the queue to be processed
func (*ProjectionNotificationProcessor) Start ¶
func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler ProcessHandler) func()
Start starts the background processes that will call the ProcessHandler for each queued notification
type ProjectionRawState ¶
ProjectionRawState is the raw projection state returned by ProjectorStorage.Acquire
type ProjectionState ¶
type ProjectionState struct {
Position int64
ProjectionState interface{}
}
ProjectionState holds the position and the state of a projection
type ProjectionStateSerialization ¶
type ProjectionStateSerialization interface {
// Init initializes the state
Init(ctx context.Context) (interface{}, error)
// DecodeState reconstitute the projection state based on the provided state data
DecodeState(data []byte) (interface{}, error)
// EncodeState encode the given object for storage
EncodeState(obj interface{}) ([]byte, error)
}
ProjectionStateSerialization is an interface describing how a projection state can be initialized, serialized/encoded and deserialized/decoded
func GetProjectionStateSerialization ¶
func GetProjectionStateSerialization(projection goengine.Projection) ProjectionStateSerialization
GetProjectionStateSerialization returns a ProjectionStateSerialization based on the provided projection
type ProjectionTrigger ¶
type ProjectionTrigger func(ctx context.Context, notification *ProjectionNotification) error
ProjectionTrigger triggers the notification for processing
type ProjectorStorage ¶
type ProjectorStorage interface {
// Acquire is used to acquire the projection and its state
// A projection can only be acquired once and must be released using the returned func
Acquire(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification) (ProjectorTransaction, int64, error)
}
ProjectorStorage is an interface for handling the projection storage
type ProjectorTransaction ¶
type ProjectorTransaction interface {
AcquireState(ctx context.Context) (ProjectionState, error)
CommitState(ProjectionState) error
Close() error
}
ProjectorTransaction is a transaction type object returned by the ProjectorStorage
type Queryer ¶
type Queryer interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}
Queryer is an interface used to query a sql.DB, sql.Conn or sql.Tx
type ReadOnlyEventStore ¶
type ReadOnlyEventStore interface {
// LoadWithConnection returns an event stream based on the provided constraints using the provided Queryer
LoadWithConnection(ctx context.Context, conn Queryer, streamName goengine.StreamName, fromNumber int64, count *uint, metadataMatcher metadata.Matcher) (goengine.EventStream, error)
}
ReadOnlyEventStore is an interface describing a read-only event store that supports providing a SQL conn
type StreamProjector ¶
StreamProjector is a postgres projector used to execute a projection against an event stream.
func NewStreamProjector ¶
func NewStreamProjector( db *sql.DB, eventLoader EventStreamLoader, resolver goengine.MessagePayloadResolver, projection goengine.Projection, projectorStorage StreamProjectorStorage, projectionErrorHandler ProjectionErrorCallback, logger goengine.Logger, ) (*StreamProjector, error)
NewStreamProjector creates a new projector for a projection
func (*StreamProjector) Run ¶
func (s *StreamProjector) Run(ctx context.Context) error
Run executes the projection and manages the state of the projection
func (*StreamProjector) RunAndListen ¶
func (s *StreamProjector) RunAndListen(ctx context.Context, listener Listener) error
RunAndListen executes the projection and listens to any changes to the event store
type StreamProjectorStorage ¶
type StreamProjectorStorage interface {
ProjectorStorage
CreateProjection(ctx context.Context, conn Execer) error
}
StreamProjectorStorage is the storage interface that will persist and load the projection state