manager

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2026 License: Apache-2.0, MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	QueueName   = "manager"
	HandlerName = "add_roots"
)
View Source
const (
	// DefaultMaxBatchSizeBytes is the maximum number of pieces that may be submitted to be added as roots in a single operation
	DefaultMaxBatchSizeBytes = 10
	// DefaultPollInterval is the frequency the manager will flush its buffer to submit roots
	DefaultPollInterval = 30 * time.Second
)

DefaultMaxBatchSizeBytes is the maximum size of batch.

View Source
const ManagerKey = "manager/"

Variables

Functions

func GenerateReceipts

func GenerateReceipts(ctx context.Context, issuer ucan.Signer, aggregate types.Aggregate, resolver apitypes.PieceResolverAPI) ([]receipt.AnyReceipt, error)

func GenerateReceiptsForAggregates

func GenerateReceiptsForAggregates(ctx context.Context, issuer ucan.Signer, aggregates []types.Aggregate, resolver apitypes.PieceResolverAPI) ([]receipt.AnyReceipt, error)

func NewAddRootsTaskHandler

func NewAddRootsTaskHandler(
	api pdptypes.ProofSetAPI,
	proofSet pdptypes.ProofSetIDProvider,
	store types.Store,
	accepter *PieceAcceptor,
) jobqueue.TaskHandler[[]datamodel.Link]

NewAddRootsTaskHandler creates a TaskHandler that submits aggregate roots to the PDP Service

func NewQueue

func NewQueue(params QueueParams) (jobqueue.Service[[]datamodel.Link], error)

Types

type AddRootsTaskHandler

type AddRootsTaskHandler struct {
	// contains filtered or unexported fields
}

func (*AddRootsTaskHandler) Handle

func (a *AddRootsTaskHandler) Handle(ctx context.Context, links []datamodel.Link) (retErr error)

func (*AddRootsTaskHandler) Name

func (a *AddRootsTaskHandler) Name() string

type Aggregation

type Aggregation struct {
	Roots []datamodel.Link
}

type BufferStore

type BufferStore interface {
	// Aggregation retrieves the pending pieces aggregation.
	Aggregation(context.Context) (Aggregation, error)
	// AppendRoots adds roots to the pending aggregation
	AppendRoots(context.Context, []datamodel.Link) error
	// ClearRoots removes all roots from the current aggregation.
	ClearRoots(context.Context) error
}

BufferStore provides persistent storage for submission state

func NewSubmissionWorkspace

func NewSubmissionWorkspace(params SubmissionWorkspaceParams) (BufferStore, error)

NewSubmissionWorkspace creates a new submission workspace backed by the provided store

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles batched submission of aggregates to the blockchain

func NewManager

func NewManager(lc fx.Lifecycle, params ManagerParams) (*Manager, error)

NewManager creates a new submission manager

func (*Manager) Start

func (m *Manager) Start() error

Start begins background processing

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) error

Stop gracefully shuts down the manager

func (*Manager) Submit

func (m *Manager) Submit(ctx context.Context, aggregateLinks ...datamodel.Link) error

Submit adds aggregates to the buffer for submission

type ManagerOption

type ManagerOption func(*Manager)

func WithClock

func WithClock(clock clock.Clock) ManagerOption

func WithMaxBatchSize

func WithMaxBatchSize(maxBatchSize int) ManagerOption

func WithPollInterval

func WithPollInterval(pollInterval time.Duration) ManagerOption

type ManagerParams

type ManagerParams struct {
	fx.In

	Queue       jobqueue.Service[[]datamodel.Link]
	TaskHandler jobqueue.TaskHandler[[]datamodel.Link]
	Buffer      BufferStore
	Options     []ManagerOption `optional:"true"`
}

type PieceAcceptor

type PieceAcceptor struct {
	// contains filtered or unexported fields
}

func NewPieceAccepter

func NewPieceAccepter(issuer principal.Signer, aggregateStore types.Store, receiptStore receiptstore.ReceiptStore, resolver apitypes.PieceResolverAPI) *PieceAcceptor

func (*PieceAcceptor) AcceptPieces

func (pa *PieceAcceptor) AcceptPieces(ctx context.Context, aggregateLinks []datamodel.Link) error

type QueueParams

type QueueParams struct {
	fx.In
	DB *sql.DB `name:"aggregator_db"`
}

type SubmissionWorkspaceParams

type SubmissionWorkspaceParams struct {
	fx.In
	Datastore datastore.Datastore `name:"aggregator_datastore"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL