log

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 25, 2022 License: MIT Imports: 24 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBroadcaster

func NewBroadcaster(orm ORM, ethClient eth.Client, config Config, lggr logger.Logger, highestSavedHead *eth.Head) *broadcaster

NewBroadcaster creates a new instance of the broadcaster.

func NewORM added in v0.10.3

func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.LogConfig, evmChainID big.Int) *orm

Types

type Broadcast

type Broadcast interface {
	DecodedLog() interface{}
	RawLog() types.Log
	String() string
	LatestBlockNumber() uint64
	LatestBlockHash() common.Hash
	JobID() int32
	EVMChainID() big.Int
}

The Broadcast type wraps a types.Log and provides additional functionality for determining whether the log has been consumed and for marking it as consumed.

func NewLogBroadcast added in v0.10.8

func NewLogBroadcast(rawLog types.Log, evmChainID big.Int, decodedLog interface{}) Broadcast

type Broadcaster

type Broadcaster interface {
	utils.DependentAwaiter
	service.Service
	httypes.HeadTrackable
	ReplayFromBlock(number int64)

	IsConnected() bool
	Register(listener Listener, opts ListenerOpts) (unsubscribe func())

	WasAlreadyConsumed(lb Broadcast, qopts ...pg.QOpt) (bool, error)
	MarkConsumed(lb Broadcast, qopts ...pg.QOpt) error
}

The Broadcaster manages log subscription requests for the Chainlink node. Instead of creating a new subscription for each request, it multiplexes all subscriptions to all of the relevant contracts over a single connection and forwards the logs to the relevant subscribers.
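The multiplexing idea can be sketched as follows. This is a minimal illustration, not the package's implementation: `Address`, `Log`, `mux`, `register`, and `dispatch` are hypothetical stand-ins for the real types and for `Register`, and the single shared connection is modeled as a plain slice of logs.

```go
package main

import "fmt"

// Address and Log are hypothetical stand-ins for common.Address and types.Log.
type Address string

type Log struct {
	Address Address
	Data    string
}

// mux fans logs from one shared stream out to per-contract handlers.
type mux struct {
	subs map[Address][]func(Log)
}

func newMux() *mux { return &mux{subs: make(map[Address][]func(Log))} }

// register adds a handler for logs emitted by one contract address.
func (m *mux) register(addr Address, h func(Log)) {
	m.subs[addr] = append(m.subs[addr], h)
}

// dispatch forwards a log to every handler registered for its address
// and reports how many handlers received it.
func (m *mux) dispatch(l Log) int {
	for _, h := range m.subs[l.Address] {
		h(l)
	}
	return len(m.subs[l.Address])
}

func main() {
	m := newMux()
	m.register("0xA", func(l Log) { fmt.Println("job 1 got", l.Data) })
	m.register("0xA", func(l Log) { fmt.Println("job 2 got", l.Data) })
	m.register("0xB", func(l Log) { fmt.Println("job 3 got", l.Data) })

	// One stream carries logs for all contracts; logs for unknown addresses are dropped.
	for _, l := range []Log{{"0xA", "first"}, {"0xB", "second"}, {"0xC", "ignored"}} {
		m.dispatch(l)
	}
}
```
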

After a node crash or restart, logs are backfilled for subscribers that are added before all dependents of the Broadcaster are done.

The backfill starts from the earlier of these two blocks:

  • Latest DB head minus BlockBackfillDepth and the maximum number of confirmations.
  • Earliest pending or unconsumed log broadcast from DB.
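The choice between the two candidate blocks can be sketched as below. This is an illustration under stated assumptions: `backfillStart` and its parameters are hypothetical names, a nil `pendingMin` models "no pending or unconsumed broadcasts in the DB", and clamping at block 0 is an assumption rather than documented behavior.

```go
package main

import "fmt"

// backfillStart returns the earlier of:
//   - latestHead minus backfillDepth minus maxConfirmations (clamped at 0, an assumption), and
//   - pendingMin, the earliest pending or unconsumed broadcast block, if there is one.
func backfillStart(latestHead int64, backfillDepth, maxConfirmations uint64, pendingMin *int64) int64 {
	from := latestHead - int64(backfillDepth) - int64(maxConfirmations)
	if from < 0 {
		from = 0
	}
	if pendingMin != nil && *pendingMin < from {
		from = *pendingMin
	}
	return from
}

func main() {
	pending := int64(900)
	fmt.Println(backfillStart(1000, 10, 5, nil))      // 985
	fmt.Println(backfillStart(1000, 10, 5, &pending)) // 900
}
```
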

If a subscriber is added after the Broadcaster has done the initial backfill, the backfill will likely have a depth of only 1 (from the latest head).

These backfilled logs, and any new logs, are sent to a subscriber only after they have reached the number of confirmations that subscriber requested (MinIncomingConfirmations in ListenerOpts).
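A confirmation gate of this kind might look like the sketch below. The helper `oldEnough` is hypothetical, and the counting convention (the block containing the log counts as its first confirmation, and zero confirmations means immediate delivery) is an assumption made for illustration.

```go
package main

import "fmt"

// oldEnough reports whether a log in block logBlock has at least
// minConfirmations confirmations when the chain head is at latestBlock.
// Assumption: the log's own block counts as confirmation number one.
func oldEnough(logBlock, latestBlock uint64, minConfirmations uint32) bool {
	if minConfirmations == 0 {
		return true
	}
	return logBlock+uint64(minConfirmations)-1 <= latestBlock
}

func main() {
	// A log in block 100 with 3 required confirmations becomes
	// deliverable once the head reaches block 102.
	for latest := uint64(100); latest <= 103; latest++ {
		fmt.Println(latest, oldEnough(100, latest, 3))
	}
}
```
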

type BroadcasterInTest added in v0.10.11

type BroadcasterInTest interface {
	Broadcaster
	BackfillBlockNumber() null.Int64
	TrackedAddressesCount() uint32
	// Pause pauses the eventLoop until Resume is called.
	Pause()
	// Resume resumes the eventLoop after calling Pause.
	Resume()
	LogsFromBlock(bh common.Hash) int
}

type Config added in v0.10.3

type Config interface {
	BlockBackfillDepth() uint64
	BlockBackfillSkip() bool
	EvmFinalityDepth() uint32
	EvmLogBackfillBatchSize() uint32
}

type Listener

type Listener interface {
	HandleLog(b Broadcast)
	JobID() int32
}

The Listener responds to log events through HandleLog.
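A listener typically checks whether a broadcast was already consumed before acting on it, then marks it consumed. The sketch below shows that pattern with local stand-ins: `Broadcast` is trimmed to two methods, `consumptionTracker` is a hypothetical subset of Broadcaster without the `pg.QOpt` arguments, and `memTracker` and `fakeBroadcast` exist only for the demonstration.

```go
package main

import "fmt"

// Broadcast is a trimmed-down, hypothetical stand-in for the package's Broadcast interface.
type Broadcast interface {
	String() string
	JobID() int32
}

// consumptionTracker is a hypothetical subset of Broadcaster (qopts omitted).
type consumptionTracker interface {
	WasAlreadyConsumed(b Broadcast) (bool, error)
	MarkConsumed(b Broadcast) error
}

// jobListener handles each broadcast at most once.
type jobListener struct {
	jobID   int32
	tracker consumptionTracker
	handled []string
}

func (l *jobListener) JobID() int32 { return l.jobID }

func (l *jobListener) HandleLog(b Broadcast) {
	consumed, err := l.tracker.WasAlreadyConsumed(b)
	if err != nil || consumed {
		return // skip on lookup failure or duplicate delivery
	}
	l.handled = append(l.handled, b.String())
	if err := l.tracker.MarkConsumed(b); err != nil {
		fmt.Println("mark consumed failed:", err)
	}
}

// memTracker keeps consumption state in memory, for the demo only.
type memTracker struct{ seen map[string]bool }

func (m *memTracker) WasAlreadyConsumed(b Broadcast) (bool, error) { return m.seen[b.String()], nil }
func (m *memTracker) MarkConsumed(b Broadcast) error               { m.seen[b.String()] = true; return nil }

type fakeBroadcast struct {
	id  string
	job int32
}

func (f fakeBroadcast) String() string { return f.id }
func (f fakeBroadcast) JobID() int32   { return f.job }

func main() {
	l := &jobListener{jobID: 7, tracker: &memTracker{seen: map[string]bool{}}}
	b := fakeBroadcast{id: "log-1", job: 7}
	l.HandleLog(b)
	l.HandleLog(b) // redelivery, for example after a backfill
	fmt.Println(len(l.handled))
}
```
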

type ListenerOpts added in v0.10.3

type ListenerOpts struct {
	Contract common.Address

	// Event types to receive, with a value filter for each field in the event.
	// No filter or an empty filter for a given field position means all values are allowed.
	// The key should be the result of an AbigenLog.Topic() call.
	LogsWithTopics map[common.Hash][][]Topic

	ParseLog ParseLogFunc

	// Minimum number of block confirmations before the log is received
	MinIncomingConfirmations uint32
}
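The filtering rule described in the LogsWithTopics comment can be sketched as below. `Hash` is a local stand-in for common.Hash, and `matches` and `h` are hypothetical helpers. Two assumptions are made here: filter position i is compared against the log topic at index i+1 (index 0 being the event signature), and positions with no corresponding log topic are ignored.

```go
package main

import "fmt"

// Hash is a local stand-in for common.Hash; Topic mirrors the package's Topic type.
type Hash [32]byte
type Topic Hash

// matches reports whether a log's topics pass the filters registered for its event.
// logTopics[0] is the event signature; logTopics[1:] are the indexed field values.
func matches(filters map[Hash][][]Topic, logTopics []Hash) bool {
	if len(logTopics) == 0 {
		return false
	}
	fieldFilters, ok := filters[logTopics[0]]
	if !ok {
		return false // event type not requested
	}
	values := logTopics[1:]
	for i := 0; i < len(values) && i < len(fieldFilters); i++ {
		if len(fieldFilters[i]) == 0 {
			continue // empty filter: all values allowed at this position
		}
		found := false
		for _, allowed := range fieldFilters[i] {
			if Hash(allowed) == values[i] {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// h builds a distinct demo hash from a single byte.
func h(b byte) Hash { return Hash{b} }

func main() {
	event := h(1)
	filters := map[Hash][][]Topic{
		event: {{Topic(h(7))}, {}}, // first field must equal h(7); second field is unrestricted
	}
	fmt.Println(matches(filters, []Hash{event, h(7), h(9)})) // true
	fmt.Println(matches(filters, []Hash{event, h(8), h(9)})) // false
}
```
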

type LogBroadcast added in v0.10.10

type LogBroadcast struct {
	BlockHash common.Hash
	Consumed  bool
	LogIndex  uint
	JobID     int32
}

LogBroadcast holds data from the columns of the log_broadcasts table.

func (LogBroadcast) AsKey added in v0.10.10

func (b LogBroadcast) AsKey() LogBroadcastAsKey

type LogBroadcastAsKey added in v0.10.10

type LogBroadcastAsKey struct {
	BlockHash common.Hash
	LogIndex  uint
	JobId     int32
}

LogBroadcastAsKey is used as a map key to filter out already consumed logs.
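Because the key struct contains only comparable fields, it can index a Go map directly. The sketch below shows one way such a filter could work. The two structs are local copies of the documented types with `Hash` standing in for common.Hash, the body of `AsKey` is a guess at the obvious field copy, and `consumedSet` and `filterUnconsumed` are hypothetical helpers.

```go
package main

import "fmt"

// Hash is a local stand-in for common.Hash.
type Hash [32]byte

type LogBroadcast struct {
	BlockHash Hash
	Consumed  bool
	LogIndex  uint
	JobID     int32
}

type LogBroadcastAsKey struct {
	BlockHash Hash
	LogIndex  uint
	JobId     int32
}

// AsKey copies the identifying fields; assumed behavior, not taken from the source.
func (b LogBroadcast) AsKey() LogBroadcastAsKey {
	return LogBroadcastAsKey{BlockHash: b.BlockHash, LogIndex: b.LogIndex, JobId: b.JobID}
}

// consumedSet indexes the consumed broadcasts by key.
func consumedSet(bs []LogBroadcast) map[LogBroadcastAsKey]struct{} {
	set := make(map[LogBroadcastAsKey]struct{}, len(bs))
	for _, b := range bs {
		if b.Consumed {
			set[b.AsKey()] = struct{}{}
		}
	}
	return set
}

// filterUnconsumed keeps only the candidates that are not in the consumed set.
func filterUnconsumed(candidates []LogBroadcastAsKey, consumed map[LogBroadcastAsKey]struct{}) []LogBroadcastAsKey {
	var out []LogBroadcastAsKey
	for _, c := range candidates {
		if _, done := consumed[c]; !done {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	stored := []LogBroadcast{
		{BlockHash: Hash{1}, LogIndex: 0, JobID: 1, Consumed: true},
		{BlockHash: Hash{1}, LogIndex: 1, JobID: 1, Consumed: false},
	}
	candidates := []LogBroadcastAsKey{stored[0].AsKey(), stored[1].AsKey()}
	fmt.Println(len(filterUnconsumed(candidates, consumedSet(stored)))) // 1
}
```
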

func NewLogBroadcastAsKey added in v0.10.10

func NewLogBroadcastAsKey(log types.Log, listener Listener) LogBroadcastAsKey

type NullBroadcaster added in v0.10.8

type NullBroadcaster struct{ ErrMsg string }

func (*NullBroadcaster) AddDependents added in v0.10.8

func (n *NullBroadcaster) AddDependents(int)

func (*NullBroadcaster) AwaitDependents added in v0.10.8

func (n *NullBroadcaster) AwaitDependents() <-chan struct{}

func (*NullBroadcaster) BackfillBlockNumber added in v0.10.11

func (n *NullBroadcaster) BackfillBlockNumber() null.Int64

func (*NullBroadcaster) Close added in v0.10.8

func (n *NullBroadcaster) Close() error

func (*NullBroadcaster) DependentReady added in v0.10.8

func (n *NullBroadcaster) DependentReady()

func (*NullBroadcaster) Healthy added in v0.10.8

func (n *NullBroadcaster) Healthy() error

func (*NullBroadcaster) IsConnected added in v0.10.8

func (n *NullBroadcaster) IsConnected() bool

func (*NullBroadcaster) LogsFromBlock added in v1.1.0

func (n *NullBroadcaster) LogsFromBlock(common.Hash) int

func (*NullBroadcaster) MarkConsumed added in v0.10.8

func (n *NullBroadcaster) MarkConsumed(lb Broadcast, qopts ...pg.QOpt) error

func (*NullBroadcaster) OnNewLongestChain added in v0.10.8

func (n *NullBroadcaster) OnNewLongestChain(context.Context, *eth.Head)

func (*NullBroadcaster) Pause added in v1.1.0

func (n *NullBroadcaster) Pause()

func (*NullBroadcaster) Ready added in v0.10.8

func (n *NullBroadcaster) Ready() error

func (*NullBroadcaster) Register added in v0.10.8

func (n *NullBroadcaster) Register(listener Listener, opts ListenerOpts) (unsubscribe func())

func (*NullBroadcaster) ReplayFromBlock added in v0.10.11

func (n *NullBroadcaster) ReplayFromBlock(number int64)

func (*NullBroadcaster) Resume added in v1.1.0

func (n *NullBroadcaster) Resume()

func (*NullBroadcaster) Start added in v0.10.8

func (n *NullBroadcaster) Start() error

func (*NullBroadcaster) TrackedAddressesCount added in v0.10.8

func (n *NullBroadcaster) TrackedAddressesCount() uint32

func (*NullBroadcaster) WasAlreadyConsumed added in v0.10.8

func (n *NullBroadcaster) WasAlreadyConsumed(lb Broadcast, qopts ...pg.QOpt) (bool, error)

type ORM added in v0.10.3

type ORM interface {
	// FindBroadcasts returns broadcasts for a range of block numbers, both consumed and unconsumed.
	FindBroadcasts(fromBlockNum int64, toBlockNum int64) ([]LogBroadcast, error)
	// CreateBroadcast inserts an unconsumed log broadcast for jobID.
	CreateBroadcast(blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32, qopts ...pg.QOpt) error
	// WasBroadcastConsumed returns true if jobID consumed the log broadcast.
	WasBroadcastConsumed(blockHash common.Hash, logIndex uint, jobID int32, qopts ...pg.QOpt) (bool, error)
	// MarkBroadcastConsumed marks the log broadcast as consumed by jobID.
	MarkBroadcastConsumed(blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32, qopts ...pg.QOpt) error

	// SetPendingMinBlock sets the minimum block number for which there are pending broadcasts in the pool, or nil if empty.
	SetPendingMinBlock(blockNum *int64, qopts ...pg.QOpt) error
	// GetPendingMinBlock returns the minimum block number for which there were pending broadcasts in the pool, or nil if it was empty.
	GetPendingMinBlock(qopts ...pg.QOpt) (blockNumber *int64, err error)

	// Reinitialize cleans up the database by removing any unconsumed broadcasts, then updating (if necessary) and
	// returning the pending minimum block number.
	Reinitialize(qopts ...pg.QOpt) (blockNumber *int64, err error)
}

ORM is the interface for log broadcasts.

  • Unconsumed broadcasts are created just before notifying subscribers, who are responsible for marking them consumed.
  • The pending broadcast block number is synced to the minimum block number in the pool (or deleted when the pool is empty).
  • On reboot, backfill considers the minimum block number from unconsumed and pending broadcasts. Additionally, unconsumed entries are removed and the pending minimum block number is updated.
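The lifecycle in the list above can be illustrated with an in-memory model. This is a sketch, not the package's database-backed ORM: `memORM`, `key`, and `entry` are hypothetical, block hashes are plain strings, and the `pg.QOpt` arguments and error returns are omitted for brevity.

```go
package main

import "fmt"

// key and entry model one row of log broadcasts; block hashes are plain strings here.
type key struct {
	blockHash string
	logIndex  uint
	jobID     int32
}

type entry struct {
	blockNumber uint64
	consumed    bool
}

// memORM is a hypothetical in-memory model of the ORM's behavior.
type memORM struct {
	broadcasts map[key]entry
	pendingMin *int64
}

func newMemORM() *memORM { return &memORM{broadcasts: make(map[key]entry)} }

// CreateBroadcast records an unconsumed broadcast for jobID.
func (o *memORM) CreateBroadcast(blockHash string, blockNumber uint64, logIndex uint, jobID int32) {
	o.broadcasts[key{blockHash, logIndex, jobID}] = entry{blockNumber: blockNumber}
}

// MarkBroadcastConsumed marks the broadcast as consumed by jobID.
func (o *memORM) MarkBroadcastConsumed(blockHash string, blockNumber uint64, logIndex uint, jobID int32) {
	o.broadcasts[key{blockHash, logIndex, jobID}] = entry{blockNumber: blockNumber, consumed: true}
}

// WasBroadcastConsumed reports whether jobID consumed the broadcast.
func (o *memORM) WasBroadcastConsumed(blockHash string, logIndex uint, jobID int32) bool {
	return o.broadcasts[key{blockHash, logIndex, jobID}].consumed
}

// SetPendingMinBlock stores the minimum pending block number, or nil if the pool is empty.
func (o *memORM) SetPendingMinBlock(n *int64) { o.pendingMin = n }

// Reinitialize removes unconsumed broadcasts, lowers the pending minimum to the
// smallest removed block number when that is earlier, and returns the result.
func (o *memORM) Reinitialize() *int64 {
	for k, e := range o.broadcasts {
		if e.consumed {
			continue
		}
		n := int64(e.blockNumber)
		if o.pendingMin == nil || n < *o.pendingMin {
			o.pendingMin = &n
		}
		delete(o.broadcasts, k)
	}
	return o.pendingMin
}

func main() {
	o := newMemORM()
	o.CreateBroadcast("0xaa", 100, 0, 1)
	o.CreateBroadcast("0xbb", 90, 0, 1) // never consumed before the restart
	o.MarkBroadcastConsumed("0xaa", 100, 0, 1)

	pending := int64(95)
	o.SetPendingMinBlock(&pending)

	// After a restart, backfill should begin no later than block 90.
	fmt.Println(*o.Reinitialize())
}
```
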

type ParseLogFunc added in v0.10.9

type ParseLogFunc func(log types.Log) (generated.AbigenLog, error)

type Topic added in v0.10.6

type Topic common.Hash

type Uint64 added in v0.10.10

type Uint64 uint64

func (Uint64) Compare added in v0.10.10

func (a Uint64) Compare(b heaps.Item) int
