engine

package
v0.42.4-pebble.4-fix-a... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2025 License: AGPL-3.0 Imports: 9 Imported by: 42

README

Notifier

The Notifier implements the following state machine Notifier State Machine

The intended usage pattern is:

  • there are goroutines, aka Producers, that append work to a queue pendingWorkQueue
  • there is a number of goroutines, aka Consumers, that pull work from the pendingWorkQueue
    • they consume work until they have drained the pendingWorkQueue
    • when they find that the pendingWorkQueue contains no more work, they go back to the notifier and await notification

Notifier Usage Pattern

Note that the consumer / producer interact in a different order with the pendingWorkQueue vs the notifier:

  • the producer first drops its work into the queue and subsequently sends the notification
  • the consumer first processes elements from the queue and subsequently checks for a notification Thereby, it is guaranteed that at least one consumer routine will be notified when work is added

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// IncompatibleInputTypeError indicates that the input has an incompatible type
	IncompatibleInputTypeError = errors.New("incompatible input type")
)

Functions

func IsDuplicatedEntryError added in v0.14.0

func IsDuplicatedEntryError(err error) bool

func IsIncompatibleInputTypeError added in v0.23.2

func IsIncompatibleInputTypeError(err error) bool

func IsInvalidInputError

func IsInvalidInputError(err error) bool

IsInvalidInputError returns whether the given error is an InvalidInputError error

func IsNetworkTransmissionError added in v0.29.0

func IsNetworkTransmissionError(err error) bool

IsNetworkTransmissionError returns whether the given error is a NetworkTransmissionError error

func IsOutdatedInputError

func IsOutdatedInputError(err error) bool

func IsUnverifiableInputError added in v0.15.0

func IsUnverifiableInputError(err error) bool

func LogError

func LogError(log zerolog.Logger, err error)

LogError logs the engine processing error

func LogErrorWithMsg added in v0.15.0

func LogErrorWithMsg(log zerolog.Logger, msg string, err error)

func MatchType added in v0.31.0

func MatchType[T any](m *Message) bool

func NewDuplicatedEntryErrorf added in v0.14.0

func NewDuplicatedEntryErrorf(msg string, args ...interface{}) error

func NewInvalidInputErrorf

func NewInvalidInputErrorf(msg string, args ...interface{}) error

func NewNetworkTransmissionErrorf added in v0.29.0

func NewNetworkTransmissionErrorf(msg string, args ...interface{}) error

func NewOutdatedInputErrorf

func NewOutdatedInputErrorf(msg string, args ...interface{}) error

func NewUnverifiableInputError added in v0.15.0

func NewUnverifiableInputError(msg string, args ...interface{}) error

Types

type Broadcaster added in v0.30.2

type Broadcaster struct {
	// contains filtered or unexported fields
}

Broadcaster is a distributor for Notifier objects. It implements a simple generic pub/sub pattern. Callers can subscribe to single-channel notifications by passing a Notifier object to the Subscribe method. When Publish is called, all subscribers are notified.

func NewBroadcaster added in v0.30.2

func NewBroadcaster() *Broadcaster

NewBroadcaster creates a new Broadcaster

func (*Broadcaster) Publish added in v0.30.2

func (b *Broadcaster) Publish()

Publish sends notifications to all subscribers

func (*Broadcaster) Subscribe added in v0.30.2

func (b *Broadcaster) Subscribe(n Notifiable)

Subscribe adds a Notifier to the list of subscribers to be notified when Publish is called

type DuplicatedEntryError added in v0.14.0

type DuplicatedEntryError struct {
	// contains filtered or unexported fields
}

func (DuplicatedEntryError) Error added in v0.14.0

func (e DuplicatedEntryError) Error() string

func (DuplicatedEntryError) Unwrap added in v0.14.0

func (e DuplicatedEntryError) Unwrap() error

type FifoMessageStore added in v0.17.2

type FifoMessageStore struct {
	*fifoqueue.FifoQueue
}

FifoMessageStore wraps a FiFo Queue to implement the MessageStore interface.

func (*FifoMessageStore) Get added in v0.17.2

func (s *FifoMessageStore) Get() (*Message, bool)

func (*FifoMessageStore) Put added in v0.17.2

func (s *FifoMessageStore) Put(msg *Message) bool

type FilterFunc added in v0.17.2

type FilterFunc func(*Message) bool

type InvalidInputError

type InvalidInputError struct {
	// contains filtered or unexported fields
}

InvalidInputError are errors for caused by invalid inputs. It's useful to distinguish these known errors from exceptions. By distinguishing errors from exceptions, we can log them differently. For instance, log InvalidInputError error as a warn log, and log other error as an error log.

func (InvalidInputError) Error

func (e InvalidInputError) Error() string

func (InvalidInputError) Unwrap

func (e InvalidInputError) Unwrap() error

type MapFunc added in v0.17.2

type MapFunc func(*Message) (*Message, bool)

type MatchFunc added in v0.17.2

type MatchFunc func(*Message) bool

type Message added in v0.17.2

type Message struct {
	OriginID flow.Identifier
	Payload  interface{}
}

type MessageHandler added in v0.17.2

type MessageHandler struct {
	// contains filtered or unexported fields
}

func NewMessageHandler added in v0.17.2

func NewMessageHandler(log zerolog.Logger, notifier Notifier, patterns ...Pattern) *MessageHandler

func (*MessageHandler) GetNotifier added in v0.17.2

func (e *MessageHandler) GetNotifier() <-chan struct{}

func (*MessageHandler) Process added in v0.17.2

func (e *MessageHandler) Process(originID flow.Identifier, payload interface{}) error

Process iterates over the internal processing patterns and determines if the payload matches. The _first_ matching pattern processes the payload. Returns

  • IncompatibleInputTypeError if no matching processor was found
  • All other errors are potential symptoms of internal state corruption or bugs (fatal).

type MessageStore added in v0.17.2

type MessageStore interface {
	// Put adds the message to the message store. It returns true if the message
	// is stored, and false if it is immediately dropped.
	// Note: depending on the implementation, message stores might drop messages
	// later according to their internal ejection policy. In other words, a return
	// value of `true` does _not imply_ that the message is eventually processed.
	Put(*Message) bool
	// Get retrieves the next message from the message store. It returns true if
	// a message is retrieved, and false if the message store is empty.
	Get() (*Message, bool)
}

MessageStore is the interface to abstract how messages are buffered in memory while waiting to be processed.

type NetworkTransmissionError added in v0.29.0

type NetworkTransmissionError struct {
	// contains filtered or unexported fields
}

NetworkTransmissionError captures the general sentinel errors upon network transmission. It is used to distinguish network transmission errors from other errors.

func (NetworkTransmissionError) Error added in v0.29.0

func (e NetworkTransmissionError) Error() string

func (NetworkTransmissionError) Unwrap added in v0.29.0

func (e NetworkTransmissionError) Unwrap() error

type Notifiable added in v0.30.2

type Notifiable interface {
	// Notify sends a notification. This method must be concurrency safe and non-blocking.
	// It is expected to be a Notifier object, but does not have to be.
	Notify()
}

Notifiable is an interface for objects that can be notified

type Notifier added in v0.17.6

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier is a concurrency primitive for informing worker routines about the arrival of new work unit(s). Notifiers essentially behave like channels in that they can be passed by value and still allow concurrent updates of the same internal state.

func NewNotifier added in v0.17.6

func NewNotifier() Notifier

NewNotifier instantiates a Notifier. Notifiers essentially behave like channels in that they can be passed by value and still allow concurrent updates of the same internal state.

func (Notifier) Channel added in v0.17.6

func (n Notifier) Channel() <-chan struct{}

Channel returns a channel for receiving notifications

func (Notifier) Notify added in v0.17.6

func (n Notifier) Notify()

Notify sends a notification

type OutdatedInputError

type OutdatedInputError struct {
	// contains filtered or unexported fields
}

OutdatedInputError are for inputs that are outdated. An outdated input doesn't mean whether the input was invalid or not, knowing that would take more computation that isn't necessary. An outdated input could also for a duplicated input: the duplication is outdated.

func (OutdatedInputError) Error

func (e OutdatedInputError) Error() string

func (OutdatedInputError) Unwrap

func (e OutdatedInputError) Unwrap() error

type Pattern added in v0.17.2

type Pattern struct {
	// Match is a function to match a message to this pattern, typically by payload type.
	Match MatchFunc
	// Map is a function to apply to messages before storing them. If not provided, then the message is stored in its original form.
	Map MapFunc
	// Store is an abstract message store where we will store the message upon receipt.
	Store MessageStore
}

type Unit

type Unit struct {
	sync.Mutex // can be used to synchronize the engine
	// contains filtered or unexported fields
}

Unit handles synchronization management, startup, and shutdown for engines. New components should use component.ComponentManager rather than Unit.

func NewUnit

func NewUnit() *Unit

NewUnit returns a new unit.

func (*Unit) Ctx

func (u *Unit) Ctx() context.Context

Ctx returns a context with the same lifecycle scope as the unit. In particular, it is cancelled when Done is called, so it can be used as the parent context for processes spawned by any engine whose lifecycle is managed by a unit.

func (*Unit) Do

func (u *Unit) Do(f func() error) error

Do synchronously executes the input function f unless the unit has shut down. It returns the result of f. If f is executed, the unit will not shut down until after f returns.

func (*Unit) Done

func (u *Unit) Done(actions ...func()) <-chan struct{}

Done returns a channel that is closed when the unit is done. A unit is done when (i) the series of "action" functions are executed and (ii) all pending functions invoked with `Do` or `Launch` have completed.

The engine using the unit is responsible for defining these action functions as required.

func (*Unit) Launch

func (u *Unit) Launch(f func())

Launch asynchronously executes the input function unless the unit has shut down. If f is executed, the unit will not shut down until after f returns.

func (*Unit) LaunchAfter

func (u *Unit) LaunchAfter(delay time.Duration, f func())

LaunchAfter asynchronously executes the input function after a certain delay unless the unit has shut down.

func (*Unit) LaunchPeriodically

func (u *Unit) LaunchPeriodically(f func(), interval time.Duration, delay time.Duration)

LaunchPeriodically asynchronously executes the input function on `interval` periods unless the unit has shut down. If f is executed, the unit will not shut down until after f returns.

func (*Unit) Quit

func (u *Unit) Quit() <-chan struct{}

Quit returns a channel that is closed when the unit begins to shut down.

func (*Unit) Ready

func (u *Unit) Ready(checks ...func()) <-chan struct{}

Ready returns a channel that is closed when the unit is ready. A unit is ready when the series of "check" functions are executed.

The engine using the unit is responsible for defining these check functions as required.

type UnverifiableInputError added in v0.15.0

type UnverifiableInputError struct {
	// contains filtered or unexported fields
}

UnverifiableInputError are for inputs that cannot be verified at this moment. Usually it means that we don't have enough data to verify it. A good example is missing data in DB to process input.

func (UnverifiableInputError) Error added in v0.15.0

func (e UnverifiableInputError) Error() string

func (UnverifiableInputError) Unwrap added in v0.15.0

func (e UnverifiableInputError) Unwrap() error

Directories

Path Synopsis
access
ingestion2
Package ingestion2 implements a modular ingestion engine responsible for orchestrating the processing of finalized blockchain data and receiving execution receipts from the network.
Package ingestion2 implements a modular ingestion engine responsible for orchestrating the processing of finalized blockchain data and receiving execution receipts from the network.
rest/common/models
* Access API * * No description provided (generated by Swagger Codegen https://github.com/swagger-api/swagger-codegen) * * API version: 1.0.0 * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
* Access API * * No description provided (generated by Swagger Codegen https://github.com/swagger-api/swagger-codegen) * * API version: 1.0.0 * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
rest/http/models
* Access API * * No description provided (generated by Swagger Codegen https://github.com/swagger-api/swagger-codegen) * * API version: 1.0.0 * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
* Access API * * No description provided (generated by Swagger Codegen https://github.com/swagger-api/swagger-codegen) * * API version: 1.0.0 * Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
rpc
ingest
Package ingest implements an engine for receiving transactions that need to be packaged into a collection.
Package ingest implements an engine for receiving transactions that need to be packaged into a collection.
pusher
Package pusher implements an engine for providing access to resources held by the collection node, including collections, collection guarantees, and transactions.
Package pusher implements an engine for providing access to resources held by the collection node, including collections, collection guarantees, and transactions.
rpc
Package rpc implements accepting transactions into the system.
Package rpc implements accepting transactions into the system.
common
grpc/compressor/deflate
Package deflate implements and registers the DEFLATE compressor during initialization.
Package deflate implements and registers the DEFLATE compressor during initialization.
rpc
dkg
Package dkg implements engines for the DKG protocol.
Package dkg implements engines for the DKG protocol.
rpc
ghost
verification

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL