assembler

package
v0.0.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2025 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BATCH_CACHE_TAG     = "cache"
	FORCE_PUT_CACHE_TAG = "force put batches"
)

Variables

View Source
var (
	ErrBatchAlreadyExists = errors.New("batch already exists")
	ErrBatchDoesNotExist  = errors.New("batch does not exist")
	ErrBatchTooLarge      = errors.New("batch too large")
)

Functions

func BatchToString

func BatchToString(batchID types.BatchID) string

TODO: use stringer/formatter/gostringer for a more general solution

func ShortDigestString added in v0.0.19

func ShortDigestString(digest []byte) string

ShortDigestString provides a short string from a potentially long (32B) digest.

Types

type Assembler

type Assembler struct {
	// contains filtered or unexported fields
}

func NewAssembler

func NewAssembler(config *config.AssemblerNodeConfig, net NetStopper, genesisBlock *common.Block, logger types.Logger) *Assembler

func NewDefaultAssembler

func NewDefaultAssembler(
	logger types.Logger,
	net NetStopper,
	config *config.AssemblerNodeConfig,
	genesisBlock *common.Block,
	assemblerLedgerFactory node_ledger.AssemblerLedgerFactory,
	prefetchIndexFactory PrefetchIndexerFactory,
	prefetcherFactory PrefetcherFactory,
	batchBringerFactory BatchBringerFactory,
	consensusBringerFactory delivery.ConsensusBringerFactory,
) *Assembler

func (*Assembler) Broadcast

func (*Assembler) Deliver

func (*Assembler) GetTxCount

func (a *Assembler) GetTxCount() uint64

func (*Assembler) Stop

func (a *Assembler) Stop()

type AssemblerIndex added in v0.0.19

type AssemblerIndex interface {
	PopOrWait(batchId types.BatchID) (types.Batch, error)
	Put(batch types.Batch) error
	Stop()
}

type AssemblerLedgerWriter added in v0.0.19

type AssemblerLedgerWriter interface {
	Append(batch types.Batch, orderingInfo types.OrderingInfo)
	Close()
}

type AssemblerRole added in v0.0.19

type AssemblerRole struct {
	ShardCount                        int
	Ledger                            AssemblerLedgerWriter
	Logger                            types.Logger
	OrderedBatchAttestationReplicator OrderedBatchAttestationReplicator
	Replicator                        BatchReplicator
	Index                             AssemblerIndex
	Shards                            []types.ShardID
	// contains filtered or unexported fields
}

func (*AssemblerRole) Run added in v0.0.19

func (a *AssemblerRole) Run()

func (*AssemblerRole) WaitTermination added in v0.0.19

func (a *AssemblerRole) WaitTermination()

WaitTermination waits for the core goroutines to finish. It does not stop anything itself: the core Assembler is stopped by the node Assembler when the channels for batches and BAs are closed.

type BatchBringer

type BatchBringer interface {
	Replicate(shardID types.ShardID) <-chan types.Batch
	GetBatch(batchID types.BatchID) (types.Batch, error)
	Stop()
}

type BatchBringerFactory

type BatchBringerFactory interface {
	Create(initialBatchFrontier map[types.ShardID]map[types.PartyID]types.BatchSequence, config *config.AssemblerNodeConfig, logger types.Logger) BatchBringer
}

type BatchCache

type BatchCache struct {
	// contains filtered or unexported fields
}

func NewBatchCache

func NewBatchCache(partition ShardPrimary, tag string) *BatchCache

func (*BatchCache) Get

func (bc *BatchCache) Get(batchId types.BatchID) (types.Batch, error)

func (*BatchCache) Has

func (bc *BatchCache) Has(batchId types.BatchID) bool

func (*BatchCache) Pop

func (bc *BatchCache) Pop(batchId types.BatchID) (types.Batch, error)

func (*BatchCache) Put

func (bc *BatchCache) Put(batch types.Batch) error

func (*BatchCache) SizeBytes

func (bc *BatchCache) SizeBytes() int

type BatchCacheFactory

type BatchCacheFactory interface {
	CreateWithTag(partition ShardPrimary, tag string) *BatchCache
	Create(partition ShardPrimary) *BatchCache
}

type BatchFetcher

type BatchFetcher struct {
	// contains filtered or unexported fields
}

func NewBatchFetcher

func NewBatchFetcher(initialBatchFrontier map[types.ShardID]map[types.PartyID]types.BatchSequence, config *config.AssemblerNodeConfig, logger types.Logger) *BatchFetcher

func (*BatchFetcher) GetBatch

func (br *BatchFetcher) GetBatch(batchID types.BatchID) (types.Batch, error)

GetBatch polls every batcher in the shard for a batch that has a specific batchID. The ShardID is taken from the batchID. It polls all the batchers in parallel but cancels the requests as soon as the first match is found. The Arma protocol ensures that if the batchID is from consensus, at least one batcher in the shard has it.

func (*BatchFetcher) Replicate

func (br *BatchFetcher) Replicate(shardID types.ShardID) <-chan types.Batch

func (*BatchFetcher) Stop

func (br *BatchFetcher) Stop()

type BatchHeap

type BatchHeap[T any] struct {
	// contains filtered or unexported fields
}

BatchHeap is a min-heap for batches, ordered by the given less comparator; the top item is the batch with the minimum value. Not thread safe.

func NewBatchHeap

func NewBatchHeap[T any](partition ShardPrimary, less HeapLessComperator[T]) *BatchHeap[T]

func (*BatchHeap[T]) Has

func (h *BatchHeap[T]) Has(batchId types.BatchID) bool

Has returns true if batch exists

func (*BatchHeap[T]) Peek

func (h *BatchHeap[T]) Peek() *BatchHeapItem[T]

Peek retrieves next batch by order

func (*BatchHeap[T]) Pop

func (h *BatchHeap[T]) Pop() *BatchHeapItem[T]

Pop next batch by order

func (*BatchHeap[T]) Push

func (h *BatchHeap[T]) Push(item *BatchHeapItem[T]) error

Push adds new batch

func (*BatchHeap[T]) Remove

func (h *BatchHeap[T]) Remove(batchId types.BatchID) (*BatchHeapItem[T], error)

Remove batch from heap

type BatchHeapItem

type BatchHeapItem[T any] struct {
	Batch types.Batch
	Value T
}

type BatchMapper

type BatchMapper[K types.BatchID, V any] struct {
	// contains filtered or unexported fields
}

BatchMapper is an efficient generic mapper from a types.BatchID-derived type to any type. Note that the shard and party of all the batches must be the same, or else a panic will be raised. Not thread safe.

func NewBatchMapper

func NewBatchMapper[K types.BatchID, V any](partition ShardPrimary) *BatchMapper[K, V]

func (*BatchMapper[K, V]) Get

func (m *BatchMapper[K, V]) Get(batchId K) (V, error)

func (*BatchMapper[K, V]) Has

func (m *BatchMapper[K, V]) Has(batchId K) bool

func (*BatchMapper[K, V]) Insert

func (m *BatchMapper[K, V]) Insert(batchId K, value V) bool

func (*BatchMapper[K, V]) Put

func (m *BatchMapper[K, V]) Put(batchId K, value V)

func (*BatchMapper[K, V]) Remove

func (m *BatchMapper[K, V]) Remove(batchId K) (V, error)

type BatchReplicator added in v0.0.19

type BatchReplicator interface {
	Replicate(shardID types.ShardID) <-chan types.Batch
}

type DefaultBatchBringerFactory

type DefaultBatchBringerFactory struct{}

func (*DefaultBatchBringerFactory) Create

type DefaultBatchCacheFactory

type DefaultBatchCacheFactory struct{}

func (*DefaultBatchCacheFactory) Create

func (dbcf *DefaultBatchCacheFactory) Create(partition ShardPrimary) *BatchCache

func (*DefaultBatchCacheFactory) CreateWithTag

func (dbcf *DefaultBatchCacheFactory) CreateWithTag(partition ShardPrimary, tag string) *BatchCache

type DefaultPartitionPrefetchIndexerFactory

type DefaultPartitionPrefetchIndexerFactory struct{}

func (*DefaultPartitionPrefetchIndexerFactory) Create

func (f *DefaultPartitionPrefetchIndexerFactory) Create(partition ShardPrimary, logger types.Logger, defaultTtl time.Duration, maxSizeBytes int, timerFactory TimerFactory, batchCacheFactory BatchCacheFactory, batchRequestChan chan types.BatchID, popWaitMonitorTimeout time.Duration) PartitionPrefetchIndexer

type DefaultPrefetchIndexerFactory

type DefaultPrefetchIndexerFactory struct{}

func (*DefaultPrefetchIndexerFactory) Create

func (f *DefaultPrefetchIndexerFactory) Create(
	shards []types.ShardID,
	parties []types.PartyID,
	logger types.Logger,
	defaultTtl time.Duration,
	maxPartitionSizeBytes int,
	requestChannelSize int,
	timerFactory TimerFactory,
	batchCacheFactory BatchCacheFactory,
	partitionPrefetchIndexerFactory PartitionPrefetchIndexerFactory,
	popWaitMonitorTimeout time.Duration,
) PrefetchIndexer

type DefaultPrefetcherFactory

type DefaultPrefetcherFactory struct{}

func (*DefaultPrefetcherFactory) Create

func (f *DefaultPrefetcherFactory) Create(shards []types.ShardID, parties []types.PartyID, prefetchIndex PrefetchIndexer, batchFetcher BatchBringer, logger types.Logger) PrefetcherController

type DefaultTimerFactory

type DefaultTimerFactory struct{}

func (*DefaultTimerFactory) Create

func (dtf *DefaultTimerFactory) Create(d time.Duration, f func()) StoppableTimer

type HeapLessComperator

type HeapLessComperator[T any] func(item1, item2 *BatchHeapItem[T]) bool

type NetStopper

type NetStopper interface {
	Stop()
}

type OrderedBatchAttestationReplicator added in v0.0.19

type OrderedBatchAttestationReplicator interface {
	Replicate() <-chan types.OrderedBatchAttestation
}

type PartitionPrefetchIndex

type PartitionPrefetchIndex struct {
	// contains filtered or unexported fields
}

PartitionPrefetchIndex handles the prefetch actions for a specific partition (shard + primary).

The core purpose of this struct is to pair batch ids with full batches:

  1. An external module, with a separate goroutine, is getting batch ids from the Consensus and is calling PopOrWait to get the full batch. Please note that there is no constraint on the sequence of the batch.
  2. An external module, with a separate goroutine, "stream fetcher" is getting the full batch from the Batcher and using Put to insert the batch into the indexer. Please note that it is assumed that the batches are arriving and inserted with Put in a stream of increasing sequence numbers.
  3. An external module, with a separate goroutine, "unary fetcher" is fetching a single full batch upon specific request from the Batcher and using PutForce to insert the batch into the indexer.

Here is a detailed technical explanation:

Getting batches - PopOrWait:

  1. A batch id is extracted from a Consensus decision, PopOrWait is called with this batch id.

  2. PopOrWait updates the max requested batch sequence (maxPoppedCallSeq) and Broadcasts on stateCond. This is important for the operation of Put: the variable tells the Put method whether the batch it is trying to put is relevant or whether it should wait.

  3. PopOrWait tries to pop the relevant batch from its cache; if the batch already exists, the batch is removed from the indexer and returned.

  4. If the batch does not exist, we request the batch from the unary fetcher external module only if: a. This batch was not requested by the current call to the method (we avoid multiple requests). b. This batch sequence is less than the sequence of the most recently Put batch (since Put is called with increasing sequence, and we already passed it, we need to make a separate request). c. This batch sequence is equal to the sequence of the most recently Put batch - in this case this is a batch with the same <shard, primary, sequence> but with different digests (otherwise we would have returned it).

  5. We keep waiting on stateCond, waiting for Put or PutForce to add the desired batch.

    Note: a. If the sequence is higher than the sequence of the most recently Put batch - we wait for it to be added to the index using Put. b. If the context is cancelled, the method will return with an error.

Put:

  1. If the batch is too large, it will return ErrBatchTooLarge.

  2. If the batch is already in the index, it will return ErrBatchAlreadyExists.

  3. Acquire the lock.

  4. We update lastPutSeqRequest with the sequence of the batch.

  5. If there is space in the index - put the batch, Broadcast on stateCond and return.

  6. While there is no space for the batch: a. If the sequence of the batch is lower than the max popped sequence (stored in `maxPoppedCallSeq`) we are prefetching too fast, wait (sync.Cond wait) for the batches in the index to be popped prior to putting this batch. b. Else - we remove the oldest batch from the index.

  7. Once there is free space, insert the batch into the index.

  8. Release the lock.

    Note: If the context is cancelled, the method will return with an error.

PutForce:

If PopOrWait has requested a batch, this batch needs to be put into the index from an external module using PutForce. Unlike the regular Put, which has space constraints, this put always succeeds immediately, since this batch is needed now.

Auto Eviction: Each batch is put with a TTL timer; after the timer expires, we automatically remove the batch and Broadcast on stateCond.

func NewPartitionPrefetchIndex

func NewPartitionPrefetchIndex(partition ShardPrimary, logger types.Logger, defaultTtl time.Duration, maxSizeBytes int, timerFactory TimerFactory, batchCacheFactory BatchCacheFactory, batchRequestChan chan types.BatchID, popWaitMonitorTimeout time.Duration) *PartitionPrefetchIndex

func (*PartitionPrefetchIndex) PopOrWait

func (pi *PartitionPrefetchIndex) PopOrWait(batchId types.BatchID) (types.Batch, error)

func (*PartitionPrefetchIndex) Put

func (pi *PartitionPrefetchIndex) Put(batch types.Batch) error

func (*PartitionPrefetchIndex) PutForce

func (bs *PartitionPrefetchIndex) PutForce(batch types.Batch) error

func (*PartitionPrefetchIndex) Stop

func (bs *PartitionPrefetchIndex) Stop()

type PartitionPrefetchIndexer

type PartitionPrefetchIndexer interface {
	PopOrWait(batchId types.BatchID) (types.Batch, error)
	Put(batch types.Batch) error
	PutForce(batch types.Batch) error
	Stop()
}

type PartitionPrefetchIndexerFactory

type PartitionPrefetchIndexerFactory interface {
	Create(
		partition ShardPrimary,
		logger types.Logger,
		defaultTtl time.Duration,
		maxSizeBytes int,
		timerFactory TimerFactory,
		batchCacheFactory BatchCacheFactory,
		batchRequestChan chan types.BatchID,
		popWaitMonitorTimeout time.Duration) PartitionPrefetchIndexer
}

type PrefetchIndex

type PrefetchIndex struct {
	// contains filtered or unexported fields
}

func NewPrefetchIndex

func NewPrefetchIndex(
	shards []types.ShardID,
	parties []types.PartyID,
	logger types.Logger,
	defaultTtl time.Duration,
	maxPartitionSizeBytes int,
	requestChannelSize int,
	timerFactory TimerFactory,
	batchCacheFactory BatchCacheFactory,
	partitionPrefetchIndexerFactory PartitionPrefetchIndexerFactory,
	popWaitMonitorTimeout time.Duration,
) *PrefetchIndex

func (*PrefetchIndex) PopOrWait

func (pi *PrefetchIndex) PopOrWait(batchId types.BatchID) (types.Batch, error)

func (*PrefetchIndex) Put

func (pi *PrefetchIndex) Put(batch types.Batch) error

func (*PrefetchIndex) PutForce

func (pi *PrefetchIndex) PutForce(batch types.Batch) error

func (*PrefetchIndex) Requests

func (pi *PrefetchIndex) Requests() <-chan types.BatchID

func (*PrefetchIndex) Stop

func (pi *PrefetchIndex) Stop()

type PrefetchIndexer

type PrefetchIndexer interface {
	PopOrWait(batchId types.BatchID) (types.Batch, error)
	Put(batch types.Batch) error
	PutForce(batch types.Batch) error
	Requests() <-chan types.BatchID
	Stop()
}

type PrefetchIndexerFactory

type PrefetchIndexerFactory interface {
	Create(
		shards []types.ShardID,
		parties []types.PartyID,
		logger types.Logger,
		defaultTtl time.Duration,
		maxPartitionSizeBytes int,
		requestChannelSize int,
		timerFactory TimerFactory,
		batchCacheFactory BatchCacheFactory,
		partitionPrefetchIndexerFactory PartitionPrefetchIndexerFactory,
		popWaitMonitorTimeout time.Duration,
	) PrefetchIndexer
}

type Prefetcher

type Prefetcher struct {
	// contains filtered or unexported fields
}

func NewPrefetcher

func NewPrefetcher(
	shards []types.ShardID,
	parties []types.PartyID,
	prefetchIndex PrefetchIndexer,
	batchFetcher BatchBringer,
	logger types.Logger,
) *Prefetcher

func (*Prefetcher) Start

func (p *Prefetcher) Start()

Start starts the prefetcher.

func (*Prefetcher) Stop

func (p *Prefetcher) Stop()

Stop stops the prefetcher.

type PrefetcherController

type PrefetcherController interface {
	Start()
	Stop()
}

type PrefetcherFactory

type PrefetcherFactory interface {
	Create(shards []types.ShardID, parties []types.PartyID, prefetchIndex PrefetchIndexer, batchFetcher BatchBringer, logger types.Logger) PrefetcherController
}

type ShardPrimary

type ShardPrimary struct {
	Shard   types.ShardID
	Primary types.PartyID
}

func ShardPrimaryFromBatch

func ShardPrimaryFromBatch(batchId types.BatchID) ShardPrimary

type StoppableTimer

type StoppableTimer interface {
	Stop() bool
}

type TimerFactory

type TimerFactory interface {
	Create(time.Duration, func()) StoppableTimer
}

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL