Documentation ¶
Index ¶
- Constants
- Variables
- func BatchToString(batchID types.BatchID) string
- func ShortDigestString(digest []byte) string
- type Assembler
- type AssemblerIndex
- type AssemblerLedgerWriter
- type AssemblerRole
- type BatchBringer
- type BatchBringerFactory
- type BatchCache
- type BatchCacheFactory
- type BatchFetcher
- type BatchHeap
- type BatchHeapItem
- type BatchMapper
- type BatchReplicator
- type DefaultBatchBringerFactory
- type DefaultBatchCacheFactory
- type DefaultPartitionPrefetchIndexerFactory
- type DefaultPrefetchIndexerFactory
- type DefaultPrefetcherFactory
- type DefaultTimerFactory
- type HeapLessComperator
- type NetStopper
- type OrderedBatchAttestationReplicator
- type PartitionPrefetchIndex
- type PartitionPrefetchIndexer
- type PartitionPrefetchIndexerFactory
- type PrefetchIndex
- type PrefetchIndexer
- type PrefetchIndexerFactory
- type Prefetcher
- type PrefetcherController
- type PrefetcherFactory
- type ShardPrimary
- type StoppableTimer
- type TimerFactory
Constants ¶
const (
    BATCH_CACHE_TAG     = "cache"
    FORCE_PUT_CACHE_TAG = "force put batches"
)
Variables ¶
Functions ¶
func BatchToString ¶
TODO: use stringer/formatter/gostringer for a more general solution
func ShortDigestString ¶ added in v0.0.19
ShortDigestString provides a short string from a potentially long (32B) digest.
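The exact formatting is not shown here; the sketch below only illustrates what such a helper typically does, assuming (as a guess, not the package's actual choice) that a fixed-length prefix of the digest is hex-encoded:

package main

import (
    "encoding/hex"
    "fmt"
)

// shortDigest is a hypothetical stand-in for ShortDigestString: it
// hex-encodes only a prefix of the (possibly 32-byte) digest. The real
// prefix length and encoding used by the package are not shown here.
func shortDigest(digest []byte) string {
    const prefixLen = 8
    if len(digest) > prefixLen {
        digest = digest[:prefixLen]
    }
    return hex.EncodeToString(digest)
}

func main() {
    d := make([]byte, 32) // e.g. a SHA-256 digest
    d[0] = 0xab
    fmt.Println(shortDigest(d)) // ab00000000000000
}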
Types ¶
type Assembler ¶
type Assembler struct {
// contains filtered or unexported fields
}
func NewAssembler ¶
func NewAssembler(config *config.AssemblerNodeConfig, net NetStopper, genesisBlock *common.Block, logger types.Logger) *Assembler
func NewDefaultAssembler ¶
func NewDefaultAssembler(
    logger types.Logger,
    net NetStopper,
    config *config.AssemblerNodeConfig,
    genesisBlock *common.Block,
    assemblerLedgerFactory node_ledger.AssemblerLedgerFactory,
    prefetchIndexFactory PrefetchIndexerFactory,
    prefetcherFactory PrefetcherFactory,
    batchBringerFactory BatchBringerFactory,
    consensusBringerFactory delivery.ConsensusBringerFactory,
) *Assembler
func (*Assembler) Broadcast ¶
func (a *Assembler) Broadcast(server orderer.AtomicBroadcast_BroadcastServer) error
func (*Assembler) Deliver ¶
func (a *Assembler) Deliver(server orderer.AtomicBroadcast_DeliverServer) error
func (*Assembler) GetTxCount ¶
type AssemblerIndex ¶ added in v0.0.19
type AssemblerLedgerWriter ¶ added in v0.0.19
type AssemblerLedgerWriter interface {
    Append(batch types.Batch, orderingInfo types.OrderingInfo)
    Close()
}
type AssemblerRole ¶ added in v0.0.19
type AssemblerRole struct {
    ShardCount int
    Ledger AssemblerLedgerWriter
    Logger types.Logger
    OrderedBatchAttestationReplicator OrderedBatchAttestationReplicator
    Replicator BatchReplicator
    Index AssemblerIndex
    Shards []types.ShardID
    // contains filtered or unexported fields
}
func (*AssemblerRole) Run ¶ added in v0.0.19
func (a *AssemblerRole) Run()
func (*AssemblerRole) WaitTermination ¶ added in v0.0.19
func (a *AssemblerRole) WaitTermination()
WaitTermination waits for the core goroutines to finish. The core Assembler is stopped by the node Assembler when the channels for batches and BAs are closed; this method only waits for those goroutines to exit.
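For illustration, a minimal usage sketch of the run/termination pattern described above, written as if it lived alongside this package. Construction of the AssemblerRole and its collaborators is omitted, and whether Run blocks or only spawns the core goroutines is not specified in this documentation:

// Sketch only: role construction (ledger, replicators, index) is omitted.
func runAndWait(role *AssemblerRole) {
    role.Run()
    // The node Assembler closes the batch/BA channels to stop the role;
    // WaitTermination then blocks until the core goroutines finish.
    role.WaitTermination()
}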
type BatchBringer ¶
type BatchBringerFactory ¶
type BatchBringerFactory interface {
Create(initialBatchFrontier map[types.ShardID]map[types.PartyID]types.BatchSequence, config *config.AssemblerNodeConfig, logger types.Logger) BatchBringer
}
type BatchCache ¶
type BatchCache struct {
// contains filtered or unexported fields
}
func NewBatchCache ¶
func NewBatchCache(partition ShardPrimary, tag string) *BatchCache
func (*BatchCache) SizeBytes ¶
func (bc *BatchCache) SizeBytes() int
type BatchCacheFactory ¶
type BatchCacheFactory interface {
    CreateWithTag(partition ShardPrimary, tag string) *BatchCache
    Create(partition ShardPrimary) *BatchCache
}
type BatchFetcher ¶
type BatchFetcher struct {
// contains filtered or unexported fields
}
func NewBatchFetcher ¶
func NewBatchFetcher(initialBatchFrontier map[types.ShardID]map[types.PartyID]types.BatchSequence, config *config.AssemblerNodeConfig, logger types.Logger) *BatchFetcher
func (*BatchFetcher) GetBatch ¶
GetBatch polls every batcher in the shard for a batch that has a specific batchID. The ShardID is taken from the batchID. It polls all the batchers in parallel but cancels the requests as soon as the first match is found. The Arma protocol ensures that if the batchID is from consensus, at least one batcher in the shard has it.
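The poll-all-and-cancel-on-first-hit behaviour described above can be sketched generically. Everything below is a hypothetical, self-contained illustration; batch, pollOne, and the endpoint names are placeholders, not types or functions of this package:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

type batch struct{ seq uint64 }

// pollOne simulates querying a single batcher endpoint for a batch.
func pollOne(ctx context.Context, endpoint string, seq uint64) (*batch, error) {
    select {
    case <-time.After(time.Duration(len(endpoint)) * 10 * time.Millisecond): // simulated latency
        return &batch{seq: seq}, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

// fetchFromAny polls all endpoints in parallel and cancels the remaining
// requests as soon as the first endpoint returns the batch.
func fetchFromAny(ctx context.Context, endpoints []string, seq uint64) (*batch, error) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // cancels the losers once a winner is found

    results := make(chan *batch, len(endpoints))
    for _, ep := range endpoints {
        go func(ep string) {
            b, err := pollOne(ctx, ep, seq)
            if err != nil {
                b = nil
            }
            results <- b
        }(ep)
    }
    for range endpoints {
        if b := <-results; b != nil {
            return b, nil // first match wins
        }
    }
    return nil, errors.New("no batcher returned the batch")
}

func main() {
    b, err := fetchFromAny(context.Background(), []string{"batcher-1", "batcher-2", "batcher-3"}, 42)
    fmt.Println(b.seq, err)
}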
func (*BatchFetcher) Replicate ¶
func (br *BatchFetcher) Replicate(shardID types.ShardID) <-chan types.Batch
func (*BatchFetcher) Stop ¶
func (br *BatchFetcher) Stop()
type BatchHeap ¶
type BatchHeap[T any] struct {
    // contains filtered or unexported fields
}
BatchHeap is a min-heap (according to the given less comparator) for batches; the top item is the batch with the minimum value. Not thread safe.
func NewBatchHeap ¶
func NewBatchHeap[T any](partition ShardPrimary, less HeapLessComperator[T]) *BatchHeap[T]
func (*BatchHeap[T]) Peek ¶
func (h *BatchHeap[T]) Peek() *BatchHeapItem[T]
Peek retrieves the next batch in order.
func (*BatchHeap[T]) Push ¶
func (h *BatchHeap[T]) Push(item *BatchHeapItem[T]) error
Push adds a new batch.
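Since BatchHeapItem's fields are unexported and not shown here, the sketch below illustrates the same pattern (a min-heap ordered by a caller-supplied less function, with Push and a Peek equivalent) using the standard container/heap package and placeholder types rather than this package's own:

package main

import (
    "container/heap"
    "fmt"
)

type item struct{ seq uint64 }

type lessFunc func(a, b *item) bool

// minHeap mirrors the BatchHeap/HeapLessComperator pattern: a min-heap
// ordered by a user-supplied less function.
type minHeap struct {
    items []*item
    less  lessFunc
}

func (h *minHeap) Len() int           { return len(h.items) }
func (h *minHeap) Less(i, j int) bool { return h.less(h.items[i], h.items[j]) }
func (h *minHeap) Swap(i, j int)      { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *minHeap) Push(x any)         { h.items = append(h.items, x.(*item)) }
func (h *minHeap) Pop() any {
    old := h.items
    n := len(old)
    x := old[n-1]
    h.items = old[:n-1]
    return x
}

func main() {
    h := &minHeap{less: func(a, b *item) bool { return a.seq < b.seq }}
    heap.Init(h)
    heap.Push(h, &item{seq: 7})
    heap.Push(h, &item{seq: 3})
    fmt.Println(h.items[0].seq) // Peek equivalent: the minimum, 3
}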
type BatchHeapItem ¶
type BatchMapper ¶
BatchMapper is an efficient generic mapper from a types.BatchID-derived key type to any value type. Note that the shard and party of all batches must be the same, otherwise a panic is raised. Not thread safe.
func NewBatchMapper ¶
func NewBatchMapper[K types.BatchID, V any](partition ShardPrimary) *BatchMapper[K, V]
func (*BatchMapper[K, V]) Get ¶
func (m *BatchMapper[K, V]) Get(batchId K) (V, error)
func (*BatchMapper[K, V]) Has ¶
func (m *BatchMapper[K, V]) Has(batchId K) bool
func (*BatchMapper[K, V]) Insert ¶
func (m *BatchMapper[K, V]) Insert(batchId K, value V) bool
func (*BatchMapper[K, V]) Put ¶
func (m *BatchMapper[K, V]) Put(batchId K, value V)
func (*BatchMapper[K, V]) Remove ¶
func (m *BatchMapper[K, V]) Remove(batchId K) (V, error)
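The concrete key layout of BatchMapper is not shown here; the sketch below only illustrates the documented contract with placeholder types (batchID, partition, partitionMapper are hypothetical, not this package's types): keys carry shard, party, sequence, and digest, all entries must share the partition's shard and party, and a mismatch panics.

package main

import "fmt"

type batchID struct {
    shard, party uint16
    seq          uint64
    digest       string
}

type partition struct{ shard, party uint16 }

// partitionMapper mirrors the documented BatchMapper contract:
// all inserted batch IDs must belong to the mapper's partition.
type partitionMapper[V any] struct {
    p partition
    m map[uint64]map[string]V // seq -> digest -> value
}

func newPartitionMapper[V any](p partition) *partitionMapper[V] {
    return &partitionMapper[V]{p: p, m: map[uint64]map[string]V{}}
}

func (pm *partitionMapper[V]) Put(id batchID, v V) {
    if id.shard != pm.p.shard || id.party != pm.p.party {
        panic("batch from a different shard/party")
    }
    if pm.m[id.seq] == nil {
        pm.m[id.seq] = map[string]V{}
    }
    pm.m[id.seq][id.digest] = v
}

func (pm *partitionMapper[V]) Get(id batchID) (V, bool) {
    v, ok := pm.m[id.seq][id.digest]
    return v, ok
}

func main() {
    pm := newPartitionMapper[string](partition{shard: 1, party: 2})
    pm.Put(batchID{shard: 1, party: 2, seq: 10, digest: "d1"}, "payload")
    fmt.Println(pm.Get(batchID{shard: 1, party: 2, seq: 10, digest: "d1"}))
}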
type BatchReplicator ¶ added in v0.0.19
type DefaultBatchBringerFactory ¶
type DefaultBatchBringerFactory struct{}
func (*DefaultBatchBringerFactory) Create ¶
func (f *DefaultBatchBringerFactory) Create(initialBatchFrontier map[types.ShardID]map[types.PartyID]types.BatchSequence, config *config.AssemblerNodeConfig, logger types.Logger) BatchBringer
type DefaultBatchCacheFactory ¶
type DefaultBatchCacheFactory struct{}
func (*DefaultBatchCacheFactory) Create ¶
func (dbcf *DefaultBatchCacheFactory) Create(partition ShardPrimary) *BatchCache
func (*DefaultBatchCacheFactory) CreateWithTag ¶
func (dbcf *DefaultBatchCacheFactory) CreateWithTag(partition ShardPrimary, tag string) *BatchCache
type DefaultPartitionPrefetchIndexerFactory ¶
type DefaultPartitionPrefetchIndexerFactory struct{}
func (*DefaultPartitionPrefetchIndexerFactory) Create ¶
func (f *DefaultPartitionPrefetchIndexerFactory) Create(partition ShardPrimary, logger types.Logger, defaultTtl time.Duration, maxSizeBytes int, timerFactory TimerFactory, batchCacheFactory BatchCacheFactory, batchRequestChan chan types.BatchID, popWaitMonitorTimeout time.Duration) PartitionPrefetchIndexer
type DefaultPrefetchIndexerFactory ¶
type DefaultPrefetchIndexerFactory struct{}
func (*DefaultPrefetchIndexerFactory) Create ¶
func (f *DefaultPrefetchIndexerFactory) Create(
    shards []types.ShardID,
    parties []types.PartyID,
    logger types.Logger,
    defaultTtl time.Duration,
    maxPartitionSizeBytes int,
    requestChannelSize int,
    timerFactory TimerFactory,
    batchCacheFactory BatchCacheFactory,
    partitionPrefetchIndexerFactory PartitionPrefetchIndexerFactory,
    popWaitMonitorTimeout time.Duration,
) PrefetchIndexer
type DefaultPrefetcherFactory ¶
type DefaultPrefetcherFactory struct{}
func (*DefaultPrefetcherFactory) Create ¶
func (f *DefaultPrefetcherFactory) Create(shards []types.ShardID, parties []types.PartyID, prefetchIndex PrefetchIndexer, batchFetcher BatchBringer, logger types.Logger) PrefetcherController
type DefaultTimerFactory ¶
type DefaultTimerFactory struct{}
func (*DefaultTimerFactory) Create ¶
func (dtf *DefaultTimerFactory) Create(d time.Duration, f func()) StoppableTimer
type HeapLessComperator ¶
type HeapLessComperator[T any] func(item1, item2 *BatchHeapItem[T]) bool
type NetStopper ¶
type NetStopper interface {
Stop()
}
type OrderedBatchAttestationReplicator ¶ added in v0.0.19
type OrderedBatchAttestationReplicator interface {
Replicate() <-chan types.OrderedBatchAttestation
}
type PartitionPrefetchIndex ¶
type PartitionPrefetchIndex struct {
// contains filtered or unexported fields
}
PartitionPrefetchIndex handles the prefetch actions for a specific partition (shard + primary).
The core purpose of this struct is to pair batch IDs with full batches:
- An external module, running in a separate goroutine, gets batch IDs from Consensus and calls PopOrWait to get the full batch. Note that there is no constraint on the sequence of these batches.
- An external module, running in a separate goroutine, the "stream fetcher", gets full batches from the Batcher and uses Put to insert them into the indexer. It is assumed that batches arrive and are inserted with Put in a stream of increasing sequence numbers.
- An external module, running in a separate goroutine, the "unary fetcher", fetches a single full batch from the Batcher upon a specific request and uses PutForce to insert it into the indexer.
Here is a detailed technical explanation:
Getting batches - PopOrWait:
A batch ID is extracted from a Consensus decision, and PopOrWait is called with this batch ID.
PopOrWait updates the maximum requested batch sequence (maxPoppedCallSeq) and broadcasts on stateCond. This is important for the operation of Put: the variable tells Put whether the batch it is trying to insert is still relevant or whether it should wait.
PopOrWait tries to pop the requested batch from its cache; if the batch already exists, it is removed from the indexer and returned.
If the batch does not exist, we request it from the unary fetcher external module only if: a. the batch was not already requested by the current call to the method (we avoid multiple requests); b. the batch sequence is less than the sequence of the most recently Put batch (since Put is called with increasing sequences and has already passed it, a separate request is needed); or c. the batch sequence equals the sequence of the most recently Put batch, in which case it is a batch with the same <shard, primary, sequence> but a different digest (otherwise we would have returned it).
We then keep waiting on stateCond for Put or PutForce to add the desired batch.
Note: a. If the sequence is higher than the sequence of the most recently Put batch, we simply wait for it to be added to the index by Put. b. If the context is cancelled, the method returns with an error.
Put:
If the batch is too large, it will return ErrBatchTooLarge.
If the batch is already in the index, it will return ErrBatchAlreadyExists.
Acquire the lock.
We update lastPutSeqRequest with the sequence of the batch.
If there is space in the index - put the batch, Broadcast on stateCond and return.
While there is no space for the batch: a. If the sequence of the batch is lower than the max popped sequence (stored in `maxPoppedCallSeq`), we are prefetching too fast; wait (sync.Cond wait) for batches in the index to be popped before putting this batch. b. Otherwise, remove the oldest batch from the index.
Once there is free space, insert the batch into the index.
Release the lock.
Note: If the context is cancelled, the method will return with an error.
PutForce:
If PopOrWait has requested a batch, that batch needs to be put into the index by an external module using PutForce. Unlike the regular Put, which has space constraints, this put always succeeds immediately, since the batch is needed now.
Auto eviction: Each batch is put with a TTL timer; after it expires, the batch is automatically removed and a Broadcast on stateCond is issued.
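The mechanics above amount to a producer/consumer pairing protected by a mutex and a sync.Cond. The sketch below is a heavily simplified, hypothetical illustration of that coordination: it counts items instead of bytes, omits TTL-based auto eviction and the unary-fetch request channel, only checks context cancellation when a waiter is woken, and none of its names are the package's own.

package main

import (
    "context"
    "fmt"
    "sync"
)

type pairIndex struct {
    mu               sync.Mutex
    cond             *sync.Cond
    batches          map[uint64]string // seq -> payload (placeholder for a batch)
    maxItems         int
    maxPoppedCallSeq uint64 // highest sequence requested via PopOrWait
    lastPutSeq       uint64 // sequence of the most recent Put
}

func newPairIndex(maxItems int) *pairIndex {
    p := &pairIndex{batches: map[uint64]string{}, maxItems: maxItems}
    p.cond = sync.NewCond(&p.mu)
    return p
}

// Put inserts batches arriving in increasing sequence order, waiting or
// evicting the oldest entry when the index is full.
func (p *pairIndex) Put(seq uint64, batch string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.lastPutSeq = seq
    for len(p.batches) >= p.maxItems {
        if seq < p.maxPoppedCallSeq {
            p.cond.Wait() // prefetching too fast; wait for pops to free space
            continue
        }
        p.evictOldestLocked()
    }
    p.batches[seq] = batch
    p.cond.Broadcast()
}

// PutForce inserts a batch unconditionally (it was explicitly requested).
func (p *pairIndex) PutForce(seq uint64, batch string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.batches[seq] = batch
    p.cond.Broadcast()
}

// PopOrWait returns and removes the batch with the given sequence,
// blocking until Put or PutForce delivers it (or the context is cancelled).
func (p *pairIndex) PopOrWait(ctx context.Context, seq uint64) (string, error) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if seq > p.maxPoppedCallSeq {
        p.maxPoppedCallSeq = seq
        p.cond.Broadcast() // let a blocked Put re-check relevance
    }
    for {
        if b, ok := p.batches[seq]; ok {
            delete(p.batches, seq)
            p.cond.Broadcast()
            return b, nil
        }
        if ctx.Err() != nil {
            return "", ctx.Err()
        }
        p.cond.Wait()
    }
}

func (p *pairIndex) evictOldestLocked() {
    var oldest uint64
    first := true
    for seq := range p.batches {
        if first || seq < oldest {
            oldest, first = seq, false
        }
    }
    delete(p.batches, oldest)
}

func main() {
    idx := newPairIndex(4)
    go func() { // "stream fetcher": increasing sequences
        for seq := uint64(1); seq <= 3; seq++ {
            idx.Put(seq, fmt.Sprintf("batch-%d", seq))
        }
    }()
    for _, seq := range []uint64{2, 1, 3} { // consensus order may differ
        b, _ := idx.PopOrWait(context.Background(), seq)
        fmt.Println(b)
    }
}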
func NewPartitionPrefetchIndex ¶
func NewPartitionPrefetchIndex(partition ShardPrimary, logger types.Logger, defaultTtl time.Duration, maxSizeBytes int, timerFactory TimerFactory, batchCacheFactory BatchCacheFactory, batchRequestChan chan types.BatchID, popWaitMonitorTimeout time.Duration) *PartitionPrefetchIndex
func (*PartitionPrefetchIndex) PutForce ¶
func (bs *PartitionPrefetchIndex) PutForce(batch types.Batch) error
func (*PartitionPrefetchIndex) Stop ¶
func (bs *PartitionPrefetchIndex) Stop()
type PartitionPrefetchIndexerFactory ¶
type PartitionPrefetchIndexerFactory interface {
    Create(
        partition ShardPrimary,
        logger types.Logger,
        defaultTtl time.Duration,
        maxSizeBytes int,
        timerFactory TimerFactory,
        batchCacheFactory BatchCacheFactory,
        batchRequestChan chan types.BatchID,
        popWaitMonitorTimeout time.Duration) PartitionPrefetchIndexer
}
type PrefetchIndex ¶
type PrefetchIndex struct {
// contains filtered or unexported fields
}
func NewPrefetchIndex ¶
func NewPrefetchIndex(
    shards []types.ShardID,
    parties []types.PartyID,
    logger types.Logger,
    defaultTtl time.Duration,
    maxPartitionSizeBytes int,
    requestChannelSize int,
    timerFactory TimerFactory,
    batchCacheFactory BatchCacheFactory,
    partitionPrefetchIndexerFactory PartitionPrefetchIndexerFactory,
    popWaitMonitorTimeout time.Duration,
) *PrefetchIndex
func (*PrefetchIndex) Requests ¶
func (pi *PrefetchIndex) Requests() <-chan types.BatchID
func (*PrefetchIndex) Stop ¶
func (pi *PrefetchIndex) Stop()
type PrefetchIndexer ¶
type PrefetchIndexerFactory ¶
type PrefetchIndexerFactory interface {
    Create(
        shards []types.ShardID,
        parties []types.PartyID,
        logger types.Logger,
        defaultTtl time.Duration,
        maxPartitionSizeBytes int,
        requestChannelSize int,
        timerFactory TimerFactory,
        batchCacheFactory BatchCacheFactory,
        partitionPrefetchIndexerFactory PartitionPrefetchIndexerFactory,
        popWaitMonitorTimeout time.Duration,
    ) PrefetchIndexer
}
type Prefetcher ¶
type Prefetcher struct {
// contains filtered or unexported fields
}
func NewPrefetcher ¶
func NewPrefetcher(
    shards []types.ShardID,
    parties []types.PartyID,
    prefetchIndex PrefetchIndexer,
    batchFetcher BatchBringer,
    logger types.Logger,
) *Prefetcher
type PrefetcherController ¶
type PrefetcherController interface {
    Start()
    Stop()
}
type PrefetcherFactory ¶
type PrefetcherFactory interface {
Create(shards []types.ShardID, parties []types.PartyID, prefetchIndex PrefetchIndexer, batchFetcher BatchBringer, logger types.Logger) PrefetcherController
}
type ShardPrimary ¶
func ShardPrimaryFromBatch ¶
func ShardPrimaryFromBatch(batchId types.BatchID) ShardPrimary
type StoppableTimer ¶
type StoppableTimer interface {
Stop() bool
}
type TimerFactory ¶
type TimerFactory interface {
Create(time.Duration, func()) StoppableTimer
}
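StoppableTimer matches the Stop() bool method of *time.Timer, so a factory can plausibly be backed by time.AfterFunc. The following is a minimal sketch under that assumption; it is not necessarily how DefaultTimerFactory is implemented, and it redefines the two interfaces locally to stay self-contained:

package main

import (
    "fmt"
    "time"
)

// Local copies of the two interfaces for a self-contained example.
type StoppableTimer interface {
    Stop() bool
}

type TimerFactory interface {
    Create(time.Duration, func()) StoppableTimer
}

// afterFuncFactory backs the factory with time.AfterFunc; *time.Timer
// already provides Stop() bool, so it satisfies StoppableTimer.
type afterFuncFactory struct{}

func (afterFuncFactory) Create(d time.Duration, f func()) StoppableTimer {
    return time.AfterFunc(d, f)
}

func main() {
    var tf TimerFactory = afterFuncFactory{}
    done := make(chan struct{})
    t := tf.Create(10*time.Millisecond, func() { close(done) })
    <-done
    fmt.Println(t.Stop()) // false: the timer already fired
}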