de

package
v0.1.15-0...-5f9f748 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2026 License: MIT Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DeleteQueueMaxSize caps the delete queue so that memory use stays
	// bounded. Normal operation is expected to stay far below this limit.
	DeleteQueueMaxSize = 1_000_000
	// DeleteBatchSize is the upper bound on transmission records removed by
	// a single delete query. Larger values may lower the total transaction
	// load on the DB, at the cost of blocking inserts for longer.
	DeleteBatchSize = 1_000
	// FlushDeletesFrequency is the interval between wake-ups that inspect
	// the delete queue; when records are queued, an attempt is made to
	// drain the queue and delete all of them.
	FlushDeletesFrequency = 15 * time.Second

	// PruneFrequency is the interval between wake-ups that check whether
	// the transmissions table has grown past its allowed size and, if so,
	// truncate it. The transmission queue calling AsyncDelete should
	// already handle this automatically; pruning exists as a safety net.
	PruneFrequency = time.Hour
	// PruneBatchSize is the upper bound on transmission records removed by
	// a single query while pruning the table.
	PruneBatchSize = 10_000

	// OvertimeDeleteTimeout bounds how long, after the exit signal, we keep
	// trying to delete queued transmissions before giving up and logging an
	// error.
	OvertimeDeleteTimeout = 2 * time.Second
)
View Source
const (
	// Mercury server error codes

	// DuplicateReport is the Mercury server error code 2.
	// NOTE(review): presumably returned when the server already holds the
	// submitted report — confirm against the server's error definitions.
	DuplicateReport = 2
)

Variables

This section is empty.

Functions

func NewPersistenceManager

func NewPersistenceManager(lggr logger.Logger, orm ORM, serverURL string, maxTransmitQueueSize int, flushDeletesFrequency, pruneFrequency, maxAge time.Duration) *persistenceManager

Types

type Config

// Config is the configuration type of Opts.Cfg. Its method set equals
// MercuryTransmitter's minus ReaperFrequency.
type Config interface {
	// Protocol returns the configured MercuryTransmitterProtocol.
	Protocol() MercuryTransmitterProtocol
	// ReaperMaxAge returns a duration.
	// NOTE(review): presumably the age past which stored transmissions are
	// discarded — confirm.
	ReaperMaxAge() time.Duration
	// TransmitConcurrency returns an unsigned count.
	// NOTE(review): presumably the number of concurrent transmit workers —
	// confirm.
	TransmitConcurrency() uint32
	// TransmitQueueMaxSize returns an unsigned size.
	// NOTE(review): presumably the per-server transmit queue limit — confirm.
	TransmitQueueMaxSize() uint32
	// TransmitTimeout returns a duration.
	// NOTE(review): presumably the deadline applied to one transmit call —
	// confirm.
	TransmitTimeout() time.Duration
}

type Mercury

// Mercury exposes the Mercury-related settings: a credentials lookup by
// name, the cache, TLS and transmitter sub-configurations, and a
// verbose-logging flag.
type Mercury interface {
	// Credentials returns the *types.MercuryCredentials for credName.
	// NOTE(review): behaviour for an unknown credName (nil?) is not visible
	// here — confirm.
	Credentials(credName string) *types.MercuryCredentials
	// Cache returns the MercuryCache sub-configuration.
	Cache() MercuryCache
	// TLS returns the MercuryTLS sub-configuration.
	TLS() MercuryTLS
	// Transmitter returns the MercuryTransmitter sub-configuration.
	Transmitter() MercuryTransmitter
	// VerboseLogging returns the verbose-logging flag.
	VerboseLogging() bool
}

type MercuryCache

// MercuryCache exposes three duration-valued cache settings. It is the type
// returned by Mercury.Cache.
type MercuryCache interface {
	// LatestReportTTL returns a duration.
	// NOTE(review): presumably how long a cached latest report stays fresh —
	// confirm.
	LatestReportTTL() time.Duration
	// MaxStaleAge returns a duration.
	// NOTE(review): presumably the maximum age of a stale cache entry before
	// it is dropped — confirm.
	MaxStaleAge() time.Duration
	// LatestReportDeadline returns a duration.
	// NOTE(review): presumably the time limit for fetching the latest
	// report — confirm.
	LatestReportDeadline() time.Duration
}

type MercuryTLS

// MercuryTLS exposes the TLS setting. It is the type returned by
// Mercury.TLS.
type MercuryTLS interface {
	// CertFile returns a string.
	// NOTE(review): presumably a filesystem path to a certificate file;
	// whether empty means "unset" is not visible here — confirm.
	CertFile() string
}

type MercuryTransmitter

// MercuryTransmitter exposes the transmitter settings. It is the type
// returned by Mercury.Transmitter, and its method set is Config's plus
// ReaperFrequency.
type MercuryTransmitter interface {
	// Protocol returns the configured MercuryTransmitterProtocol.
	Protocol() MercuryTransmitterProtocol
	// TransmitQueueMaxSize returns an unsigned size.
	// NOTE(review): presumably the per-server transmit queue limit — confirm.
	TransmitQueueMaxSize() uint32
	// TransmitTimeout returns a duration.
	// NOTE(review): presumably the deadline applied to one transmit call —
	// confirm.
	TransmitTimeout() time.Duration
	// TransmitConcurrency returns an unsigned count.
	// NOTE(review): presumably the number of concurrent transmit workers —
	// confirm.
	TransmitConcurrency() uint32
	// ReaperFrequency returns a duration.
	// NOTE(review): presumably how often old transmissions are reaped —
	// confirm.
	ReaperFrequency() time.Duration
	// ReaperMaxAge returns a duration.
	// NOTE(review): presumably the age past which stored transmissions are
	// discarded — confirm.
	ReaperMaxAge() time.Duration
}

type MercuryTransmitterProtocol

// MercuryTransmitterProtocol is a string-typed identifier for the
// transmitter protocol. The values declared here are "wsrpc" and "grpc".
type MercuryTransmitterProtocol string
const (
	// MercuryTransmitterProtocolWSRPC is the protocol value "wsrpc".
	MercuryTransmitterProtocolWSRPC MercuryTransmitterProtocol = "wsrpc"
	// MercuryTransmitterProtocolGRPC is the protocol value "grpc".
	MercuryTransmitterProtocolGRPC  MercuryTransmitterProtocol = "grpc"
)

func (MercuryTransmitterProtocol) String

func (*MercuryTransmitterProtocol) UnmarshalText

func (m *MercuryTransmitterProtocol) UnmarshalText(text []byte) error

type ORM

// ORM is the persistence interface for transmissions. An ORM is scoped to a
// single DON ID.
type ORM interface {
	// DonID returns the DON ID as a uint32.
	DonID() uint32
	// Insert takes a batch of transmissions to persist.
	Insert(ctx context.Context, transmissions []*Transmission) error
	// Delete takes a batch of 32-byte hashes.
	// NOTE(review): presumably the values produced by Transmission.Hash —
	// confirm.
	Delete(ctx context.Context, hashes [][32]byte) error
	// Get returns transmissions for serverURL.
	// NOTE(review): limit presumably caps the result count and maxAge
	// presumably excludes older records; ordering is not visible here —
	// confirm.
	Get(ctx context.Context, serverURL string, limit int, maxAge time.Duration) ([]*Transmission, error)
	// Prune takes a serverURL, a maxSize and a batchSize.
	// NOTE(review): presumably trims the stored transmissions for serverURL
	// down to maxSize, deleting at most batchSize per query, and returns the
	// number of rows removed — confirm.
	Prune(ctx context.Context, serverURL string, maxSize, batchSize int) (int64, error)
}

ORM is scoped to a single DON ID

func NewORM

func NewORM(ds sqlutil.DataSource, donID uint32) ORM

type Opts

// Opts bundles the arguments accepted by New: a logger, a verbose-logging
// flag, the Config, a map of rpc.Client values keyed by string, the sending
// account, the DON ID, the ORM and a capabilities registry.
// NOTE(review): the Clients map key is presumably the server URL, and which
// fields are mandatory is not visible here — confirm.
type Opts struct {
	Lggr                 logger.Logger
	VerboseLogging       bool
	Cfg                  Config
	Clients              map[string]rpc.Client
	FromAccount          string
	DonID                uint32
	ORM                  ORM
	CapabilitiesRegistry coretypes.CapabilitiesRegistry
}

type QueueConfig

// QueueConfig is the subset of Config covering ReaperMaxAge,
// TransmitQueueMaxSize and TransmitTimeout; any Config also satisfies it.
type QueueConfig interface {
	// ReaperMaxAge returns a duration.
	// NOTE(review): presumably the age past which stored transmissions are
	// discarded — confirm.
	ReaperMaxAge() time.Duration
	// TransmitQueueMaxSize returns an unsigned size.
	// NOTE(review): presumably the per-server transmit queue limit — confirm.
	TransmitQueueMaxSize() uint32
	// TransmitTimeout returns a duration.
	// NOTE(review): presumably the deadline applied to one transmit call —
	// confirm.
	TransmitTimeout() time.Duration
}

type ReportPacker

// ReportPacker turns a report and its accompanying metadata into bytes.
type ReportPacker interface {
	// Pack takes a config digest, a sequence number, a report and its
	// attributed on-chain signatures, and returns a byte payload or an error.
	// NOTE(review): the encoding of the returned payload is not visible
	// here — confirm against the implementations.
	Pack(digest types.ConfigDigest, seqNr uint64, report ocr2types.Report, sigs []ocr2types.AttributedOnchainSignature) ([]byte, error)
}

type Transmission

// Transmission holds a report (with its LLO report info) together with the
// ServerURL, ConfigDigest, SeqNr and Sigs that accompany it. It is the unit
// stored by ORM and queued by TransmitQueue.
// NOTE(review): ServerURL is presumably the destination the report is sent
// to — confirm.
type Transmission struct {
	ServerURL    string
	ConfigDigest types.ConfigDigest
	SeqNr        uint64
	Report       ocr3types.ReportWithInfo[llotypes.ReportInfo]
	Sigs         []types.AttributedOnchainSignature
}

func (Transmission) Hash

func (t Transmission) Hash() [32]byte

Hash returns the SHA-256 hash of all fields.

type TransmitQueue

// TransmitQueue is a queue of *Transmission that is also a services.Service.
// Instances are created with NewTransmitQueue.
type TransmitQueue interface {
	services.Service

	// BlockingPop returns a transmission from the queue.
	// NOTE(review): presumably blocks while the queue is empty, and may
	// return nil once the queue is closed — confirm.
	BlockingPop() (t *Transmission)
	// Push offers t to the queue.
	// NOTE(review): ok presumably reports whether t was accepted; what
	// happens when the queue is at its max length is not visible here —
	// confirm.
	Push(t *Transmission) (ok bool)
	// Init takes an initial batch of transmissions.
	// NOTE(review): presumably must be called before the queue is used —
	// confirm.
	Init(ts []*Transmission) error
	// IsEmpty returns a bool.
	// NOTE(review): presumably true when no transmissions are queued —
	// confirm.
	IsEmpty() bool
}

func NewTransmitQueue

func NewTransmitQueue(lggr logger.Logger, serverURL string, maxlen int, asyncDeleter asyncDeleter) TransmitQueue

maxlen controls how many items will be stored in the queue. 0 means unlimited - be careful, this can cause memory leaks.

type Transmitter

// Transmitter combines llotypes.Transmitter with services.Service. It is the
// type returned by New.
type Transmitter interface {
	llotypes.Transmitter
	services.Service
}

func New

func New(opts Opts) Transmitter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL