Documentation
¶
Overview ¶
Package ioswarm provides integration points for embedding the IOSwarm coordinator into an iotex-core node.
Integration into iotex-core requires:
1. Add to config/config.go:
type Config struct {
...
IOSwarm ioswarm.Config `yaml:"ioswarm"`
}
2. Add to server/itx/server.go — newServer():
if cfg.IOSwarm.Enabled {
svr.ioswarmCoord = ioswarm.NewCoordinator(
cfg.IOSwarm,
ioswarm.NewActPoolAdapter(cs.ActionPool(), cs.Blockchain()),
ioswarm.NewStateReaderAdapter(cs.StateFactory()),
)
}
3. Add to server/itx/server.go — Start():
if svr.ioswarmCoord != nil {
svr.ioswarmCoord.Start(ctx)
}
4. Add to server/itx/server.go — Stop():
if svr.ioswarmCoord != nil {
svr.ioswarmCoord.Stop()
}
5. Add to config YAML:
ioswarm: enabled: true grpcPort: 14689 maxAgents: 100 taskLevel: "L2" shadowMode: true pollIntervalMs: 1000
Production adapters (ActPoolAdapter, StateReaderAdapter) are in adapter.go.
Index ¶
- func AgentIDFromContext(ctx context.Context) string
- func DeriveAgentToken(masterSecret, agentID string) string
- func FormatIOTX(rau *big.Int) string
- func IOTXToWei(iotx float64) *big.Int
- func PrintPayoutTable(summary *EpochSummary) string
- func RegisterIOSwarmServer(s *grpc.Server, srv IOSwarmServer)
- func SetupStateDiffCallback(sf factory.Factory, coord *Coordinator)
- func ValidateAgentToken(masterSecret, agentID, token string) bool
- type ActPoolAdapter
- type ActPoolReader
- type AgentAccuracy
- type AgentInfo
- type AgentWork
- type Config
- type Coordinator
- func (c *Coordinator) CompareEVMShadow(agentResults []*pb.TaskResult, actualResults map[uint32]*EVMActualResult, ...)
- func (c *Coordinator) DiffBroadcaster() *StateDiffBroadcaster
- func (c *Coordinator) DiffStore() *DiffStore
- func (c *Coordinator) DrainRecentResults() []*pb.TaskResult
- func (c *Coordinator) LastTaskID() uint32
- func (c *Coordinator) OnBlockExecuted(blockHeight uint64, actualResults map[uint32]bool, ...)
- func (c *Coordinator) ReceiveBlock(blk *block.Block) error
- func (c *Coordinator) ReceiveStateDiff(height uint64, entries []StateDiffEntry, digest []byte)
- func (c *Coordinator) ShadowStats() ShadowStats
- func (c *Coordinator) Start(ctx context.Context) error
- func (c *Coordinator) Stop()
- type DiffStore
- func (ds *DiffStore) Append(diff *StateDiff) error
- func (ds *DiffStore) Close() error
- func (ds *DiffStore) Get(height uint64) (*StateDiff, error)
- func (ds *DiffStore) GetRange(from, to uint64) ([]*StateDiff, error)
- func (ds *DiffStore) LatestHeight() uint64
- func (ds *DiffStore) OldestHeight() uint64
- func (ds *DiffStore) Prune(keepAfter uint64) (int, error)
- type EVMActualResult
- type EpochSummary
- type IOSwarmServer
- type IOSwarm_GetTasksServer
- type IOSwarm_StreamStateDiffsServer
- type OnChainSettler
- type Option
- type Payout
- type PendingTx
- type Prefetcher
- type Registry
- func (r *Registry) Count() int
- func (r *Registry) EvictStale(threshold time.Duration) []string
- func (r *Registry) GetAgent(agentID string) (*AgentInfo, bool)
- func (r *Registry) Heartbeat(req *pb.HeartbeatRequest) bool
- func (r *Registry) LiveAgents(threshold time.Duration) []*AgentInfo
- func (r *Registry) Register(req *pb.RegisterRequest) (*AgentInfo, bool)
- func (r *Registry) Unregister(agentID string)
- type RewardConfig
- type RewardDistributor
- func (r *RewardDistributor) CurrentEpoch() uint64
- func (r *RewardDistributor) CurrentWork() map[string]*AgentWork
- func (r *RewardDistributor) Distribute(totalReward *big.Int) *EpochSummary
- func (r *RewardDistributor) EpochElapsed() time.Duration
- func (r *RewardDistributor) EpochHistory() []EpochSummary
- func (r *RewardDistributor) RecordShadowAccuracy(agentID string, matched uint64)
- func (r *RewardDistributor) RecordWork(agentID string, tasksProcessed, tasksCorrect uint64, totalLatencyUs uint64)
- func (r *RewardDistributor) SetAgentWallet(agentID, walletAddress string)
- type RewardSettler
- type Scheduler
- type ShadowComparator
- func (s *ShadowComparator) CompareEVMResults(agentResults []*pb.TaskResult, actualResults map[uint32]*EVMActualResult, ...)
- func (s *ShadowComparator) CompareWithActual(actualResults map[uint32]bool, taskSenders map[uint32]string, ...) ([]ShadowResult, map[string]*AgentAccuracy)
- func (s *ShadowComparator) RecordAgentResults(agentID string, batch *pb.BatchResult)
- func (s *ShadowComparator) ResetStats()
- func (s *ShadowComparator) Stats() ShadowStats
- type ShadowResult
- type ShadowStats
- type StateDiff
- type StateDiffBroadcaster
- func (b *StateDiffBroadcaster) BufferedCount() int
- func (b *StateDiffBroadcaster) DroppedCount(agentID string) int64
- func (b *StateDiffBroadcaster) GetRange(from, to uint64) []*StateDiff
- func (b *StateDiffBroadcaster) LatestHeight() uint64
- func (b *StateDiffBroadcaster) Publish(diff *StateDiff)
- func (b *StateDiffBroadcaster) Subscribe(agentID string) <-chan *StateDiff
- func (b *StateDiffBroadcaster) SubscriberCount() int
- func (b *StateDiffBroadcaster) Unsubscribe(agentID string)
- type StateDiffEntry
- type StateReader
- type StateReaderAdapter
- func (s *StateReaderAdapter) AccountState(addr string) (snap *pb.AccountSnapshot, err error)
- func (s *StateReaderAdapter) GetCode(addr string) (code []byte, err error)
- func (s *StateReaderAdapter) GetStorageAt(addr, slot string) (val string, err error)
- func (s *StateReaderAdapter) SimulateAccessList(from, to string, data []byte, value string, gasLimit uint64) (result map[string][]string, err error)
- type SwarmAPI
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AgentIDFromContext ¶
AgentIDFromContext extracts the verified agent ID from the context. Returns empty string if not present (e.g., auth disabled).
func DeriveAgentToken ¶
DeriveAgentToken generates an API key for the given agent using HMAC-SHA256. The key format is "iosw_" + hex(HMAC-SHA256(masterSecret, agentID)).
func FormatIOTX ¶
FormatIOTX formats a rau amount as a human-readable IOTX string.
func PrintPayoutTable ¶
func PrintPayoutTable(summary *EpochSummary) string
PrintPayoutTable formats a payout summary as a readable table.
func RegisterIOSwarmServer ¶
func RegisterIOSwarmServer(s *grpc.Server, srv IOSwarmServer)
RegisterIOSwarmServer registers the gRPC service handler. In production, this would be generated by protoc-gen-go-grpc.
func SetupStateDiffCallback ¶
func SetupStateDiffCallback(sf factory.Factory, coord *Coordinator)
SetupStateDiffCallback wires the state diff callback from the state factory to the coordinator's diff broadcaster. This enables L4 agents to receive per-block state diffs for stateful validation.
func ValidateAgentToken ¶
ValidateAgentToken checks that the token matches HMAC-SHA256(masterSecret, agentID). Uses constant-time comparison to prevent timing attacks.
Types ¶
type ActPoolAdapter ¶
type ActPoolAdapter struct {
// contains filtered or unexported fields
}
ActPoolAdapter wraps iotex-core's actpool.ActPool and blockchain.Blockchain to implement ActPoolReader for production use.
func NewActPoolAdapter ¶
func NewActPoolAdapter(pool actpool.ActPool, bc blockchain.Blockchain) *ActPoolAdapter
NewActPoolAdapter creates a new ActPoolAdapter.
func (*ActPoolAdapter) BlockHeight ¶
func (a *ActPoolAdapter) BlockHeight() uint64
BlockHeight returns the current tip block height.
func (*ActPoolAdapter) PendingActions ¶
func (a *ActPoolAdapter) PendingActions() (txs []*PendingTx)
PendingActions reads all pending transactions from the actpool and converts them to the ioswarm PendingTx format.
type ActPoolReader ¶
ActPoolReader is the interface for reading pending transactions. In production, this wraps iotex-core's ActPool.
type AgentAccuracy ¶
AgentAccuracy holds per-agent shadow match counts for a comparison batch.
type AgentInfo ¶
type AgentInfo struct {
ID string
Capability pb.TaskLevel
Region string
Version string
RegisteredAt time.Time
LastHeartbeat time.Time
TasksProcessed uint32
TasksPending uint32
CPUUsage float64
MemUsage float64
TaskChan chan *pb.TaskBatch // channel to push tasks to this agent's stream
// contains filtered or unexported fields
}
AgentInfo tracks a connected agent's state.
func (*AgentInfo) CloseTaskChan ¶
func (a *AgentInfo) CloseTaskChan()
CloseTaskChan safely closes the TaskChan exactly once.
type AgentWork ¶
type AgentWork struct {
AgentID string
WalletAddress string // agent's IOTX address for payout
TasksProcessed uint64
TasksCorrect uint64 // matched in shadow mode
TotalLatencyUs uint64
Uptime time.Duration // time since registration
Level string // L1, L2, L3
}
AgentWork tracks an agent's contribution in the current epoch.
type Config ¶
type Config struct {
Enabled bool `yaml:"enabled"`
GRPCPort int `yaml:"grpcPort"` // default 14689
SwarmAPIPort int `yaml:"swarmApiPort"` // default 14690 (0 to disable)
MaxAgents int `yaml:"maxAgents"` // default 100
TaskLevel string `yaml:"taskLevel"` // "L1", "L2", "L3", "L4"
ShadowMode bool `yaml:"shadowMode"` // default true
PollIntervalMS int `yaml:"pollIntervalMs"` // default 1000
MasterSecret string `yaml:"masterSecret"` // HMAC master secret for agent auth (empty = no auth)
DelegateAddress string `yaml:"delegateAddress"` // delegate's IOTX address for reward payout
EpochRewardIOTX float64 `yaml:"epochRewardIOTX"` // IOTX per epoch for reward distribution (default 800)
// On-chain reward pool settlement
RewardContract string `yaml:"rewardContract"` // AgentRewardPool contract address (empty = disabled)
RewardSignerKey string `yaml:"rewardSignerKey"` // hex private key for signing depositAndSettle txs
RewardRPCURL string `yaml:"rewardRpcUrl"` // RPC endpoint (default: https://babel-api.mainnet.iotex.io)
RewardChainID int64 `yaml:"rewardChainId"` // chain ID (default: 4689 for IoTeX mainnet)
DiffBufferSize int `yaml:"diffBufferSize"` // ring buffer size for state diff broadcaster (default 100)
DiffStoreEnabled bool `yaml:"diffStoreEnabled"` // persist state diffs to disk (default true for L4)
DiffStorePath string `yaml:"diffStorePath"` // path to statediffs.db (default: <datadir>/statediffs.db)
DiffRetainHeight uint64 `yaml:"diffRetainHeight"` // prune diffs older than this many blocks (default 10000 ≈ 27h; 0 = keep all)
Reward RewardConfig `yaml:"reward"`
}
Config holds IOSwarm coordinator configuration.
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator is the central orchestrator that polls the actpool, prefetches account state, and dispatches validation tasks to agents.
func NewCoordinator ¶
func NewCoordinator(cfg Config, actPool ActPoolReader, stateReader StateReader, opts ...Option) *Coordinator
NewCoordinator creates a new IOSwarm coordinator. actPool and stateReader are in-memory interfaces from iotex-core.
func (*Coordinator) CompareEVMShadow ¶
func (c *Coordinator) CompareEVMShadow(agentResults []*pb.TaskResult, actualResults map[uint32]*EVMActualResult, blockHeight uint64)
CompareEVMShadow feeds EVM reference results to the shadow comparator for comparison against agent-submitted EVM results.
func (*Coordinator) DiffBroadcaster ¶
func (c *Coordinator) DiffBroadcaster() *StateDiffBroadcaster
DiffBroadcaster returns the state diff broadcaster for gRPC streaming.
func (*Coordinator) DiffStore ¶
func (c *Coordinator) DiffStore() *DiffStore
DiffStore returns the persistent diff store (may be nil if disabled).
func (*Coordinator) DrainRecentResults ¶
func (c *Coordinator) DrainRecentResults() []*pb.TaskResult
DrainRecentResults removes and returns all recent task results (for EVM shadow comparison).
func (*Coordinator) LastTaskID ¶
func (c *Coordinator) LastTaskID() uint32
LastTaskID returns the most recently assigned task ID.
func (*Coordinator) OnBlockExecuted ¶
func (c *Coordinator) OnBlockExecuted(blockHeight uint64, actualResults map[uint32]bool, taskSenders map[uint32]string)
OnBlockExecuted should be called after iotex-core executes a block. It compares agent results against actual execution and invalidates the prefetcher cache so stale state isn't served to agents.
actualResults maps task_id → whether the tx was actually valid. In production, this is wired from iotex-core's block executor. In simulation, pass generated results.
func (*Coordinator) ReceiveBlock ¶
func (c *Coordinator) ReceiveBlock(blk *block.Block) error
ReceiveBlock implements blockchain.BlockCreationSubscriber. Called by iotex-core after each block is committed to chain. It maps on-chain tx results back to dispatched task IDs for shadow comparison.
func (*Coordinator) ReceiveStateDiff ¶
func (c *Coordinator) ReceiveStateDiff(height uint64, entries []StateDiffEntry, digest []byte)
ReceiveStateDiff is called by the stateDB diff callback after each block commit. It converts WriteQueueEntry to StateDiffEntry and publishes to the broadcaster.
func (*Coordinator) ShadowStats ¶
func (c *Coordinator) ShadowStats() ShadowStats
ShadowStats returns the current shadow comparison statistics.
func (*Coordinator) Start ¶
func (c *Coordinator) Start(ctx context.Context) error
Start launches the coordinator's gRPC server and main polling loop.
func (*Coordinator) Stop ¶
func (c *Coordinator) Stop()
Stop gracefully shuts down the coordinator with a 5-second timeout. If GracefulStop doesn't complete in time, it falls back to a hard Stop to avoid blocking the node's shutdown sequence.
type DiffStore ¶
type DiffStore struct {
// contains filtered or unexported fields
}
DiffStore persists state diffs to a BoltDB file so that StreamStateDiffs can serve any historical range (not just the last N blocks from the ring buffer).
func OpenDiffStore ¶
OpenDiffStore opens or creates a DiffStore at the given path.
func (*DiffStore) Append ¶
Append writes a state diff to the store. Idempotent by height — if a diff for this height already exists, it is silently overwritten.
func (*DiffStore) GetRange ¶
GetRange returns state diffs for heights [from, to] inclusive, ordered by height. Uses cursor seeking for efficient range scans.
func (*DiffStore) LatestHeight ¶
LatestHeight returns the highest stored height, or 0 if empty.
func (*DiffStore) OldestHeight ¶
OldestHeight returns the lowest stored height, or 0 if empty.
type EVMActualResult ¶
type EVMActualResult struct {
TaskID uint32
GasUsed uint64
StateChanges []*pb.StateChange
ExecError string
}
EVMActualResult holds the reference EVM execution result for shadow comparison.
type EpochSummary ¶
type EpochSummary struct {
Epoch uint64
TotalReward *big.Int
DelegateCut *big.Int
DelegateAddress string
AgentPool *big.Int
Payouts []Payout
AgentCount int
TotalTasks uint64
Timestamp time.Time
}
EpochSummary records a completed epoch's distribution.
type IOSwarmServer ¶
type IOSwarmServer interface {
Register(context.Context, *pb.RegisterRequest) (*pb.RegisterResponse, error)
GetTasks(*pb.GetTasksRequest, IOSwarm_GetTasksServer) error
SubmitResults(context.Context, *pb.BatchResult) (*pb.SubmitResponse, error)
Heartbeat(context.Context, *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
StreamStateDiffs(*pb.StreamStateDiffsRequest, IOSwarm_StreamStateDiffsServer) error
}
IOSwarmServer is the gRPC service interface.
func NewGRPCHandler ¶
func NewGRPCHandler(c *Coordinator) IOSwarmServer
NewGRPCHandler creates a gRPC handler for this coordinator. Use with RegisterIOSwarmServer to register on a grpc.Server.
type IOSwarm_GetTasksServer ¶
type IOSwarm_GetTasksServer interface {
Send(*pb.TaskBatch) error
grpc.ServerStream
}
IOSwarm_GetTasksServer is the server-side streaming interface for GetTasks.
type IOSwarm_StreamStateDiffsServer ¶
type IOSwarm_StreamStateDiffsServer interface {
Send(*pb.StateDiffResponse) error
grpc.ServerStream
}
IOSwarm_StreamStateDiffsServer is the server-side streaming interface for StreamStateDiffs.
type OnChainSettler ¶
type OnChainSettler struct {
// contains filtered or unexported fields
}
OnChainSettler implements RewardSettler using go-ethereum's ethclient.
func NewOnChainSettler ¶
func NewOnChainSettler(cfg Config, logger *zap.Logger) (*OnChainSettler, error)
NewOnChainSettler creates a settler from Config. Returns (nil, nil) if RewardContract or RewardSignerKey is empty.
type Option ¶
type Option func(*options)
Option configures the coordinator.
func WithDataDir ¶
WithDataDir sets the directory for persistent stores (e.g. statediffs.db).
func WithRewardSettler ¶
func WithRewardSettler(s RewardSettler) Option
WithRewardSettler sets the on-chain reward settler. When set, distributeEpochReward will call depositAndSettle on the contract.
type Payout ¶
type Payout struct {
AgentID string
WalletAddress string
Amount *big.Int // in rau (1 IOTX = 10^18 rau)
AmountIOTX float64 // human-readable
TasksDone uint64
Accuracy float64
BonusApplied bool
}
Payout represents a single agent's reward for an epoch.
type PendingTx ¶
type PendingTx struct {
Hash string
From string
To string
Nonce uint64
Amount string // big.Int as string
GasLimit uint64
GasPrice string
Data []byte
RawBytes []byte // serialized envelope
}
PendingTx represents a pending transaction from the actpool.
type Prefetcher ¶
type Prefetcher struct {
// contains filtered or unexported fields
}
Prefetcher batch-reads account state for pending transactions.
func NewPrefetcher ¶
func NewPrefetcher(sr StateReader, logger *zap.Logger) *Prefetcher
NewPrefetcher creates a new state prefetcher.
func (*Prefetcher) InvalidateCache ¶
func (p *Prefetcher) InvalidateCache()
InvalidateCache clears cached state (call on new block). Uses atomic pointer swap to avoid races with concurrent Prefetch.
func (*Prefetcher) Prefetch ¶
func (p *Prefetcher) Prefetch(txs []*PendingTx) map[string]*pb.AccountSnapshot
Prefetch gathers account state for all addresses involved in the pending txs. Returns a map of address → AccountSnapshot.
func (*Prefetcher) PrefetchCode ¶
func (p *Prefetcher) PrefetchCode(addresses []string) map[string][]byte
PrefetchCode fetches contract bytecode for the given addresses with a 5s timeout.
func (*Prefetcher) PrefetchStorage ¶
func (p *Prefetcher) PrefetchStorage(address string, slots []string) map[string]string
PrefetchStorage fetches storage slots for a contract address with a 5s timeout.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry tracks all connected agents.
func NewRegistry ¶
NewRegistry creates a new agent registry.
func (*Registry) EvictStale ¶
EvictStale removes agents that haven't sent a heartbeat within the threshold.
func (*Registry) Heartbeat ¶
func (r *Registry) Heartbeat(req *pb.HeartbeatRequest) bool
Heartbeat updates an agent's last heartbeat time and stats.
func (*Registry) LiveAgents ¶
LiveAgents returns all agents with a heartbeat within the given threshold.
func (*Registry) Register ¶
func (r *Registry) Register(req *pb.RegisterRequest) (*AgentInfo, bool)
Register adds an agent to the registry. Returns false if at capacity.
func (*Registry) Unregister ¶
Unregister removes an agent from the registry.
type RewardConfig ¶
type RewardConfig struct {
// DelegateCutPct is the percentage the delegate keeps (0-100).
// Default: 10 (delegate keeps 10%, agents split 90%)
DelegateCutPct float64 `yaml:"delegateCutPct"`
// EpochBlocks is how many blocks per reward epoch.
// Default: 360 (IoTeX epoch = 1 hour at 10s blocks)
EpochBlocks uint64 `yaml:"epochBlocks"`
// MinTasksForReward is the minimum tasks an agent must process
// in an epoch to receive rewards. Prevents freeloading.
// Default: 10
MinTasksForReward uint64 `yaml:"minTasksForReward"`
// BonusAccuracyPct: agents with shadow accuracy above this get a bonus.
// Default: 99.5
BonusAccuracyPct float64 `yaml:"bonusAccuracyPct"`
// BonusMultiplier: bonus multiplier for high-accuracy agents.
// Default: 1.2 (20% bonus)
BonusMultiplier float64 `yaml:"bonusMultiplier"`
}
RewardConfig controls reward distribution parameters.
func DefaultRewardConfig ¶
func DefaultRewardConfig() RewardConfig
DefaultRewardConfig returns sane defaults.
type RewardDistributor ¶
type RewardDistributor struct {
// contains filtered or unexported fields
}
RewardDistributor tracks agent work and calculates reward distribution.
Flow:
- Coordinator calls RecordWork() on every SubmitResults
- At epoch end (every N blocks), delegate calls Distribute()
- Distribute() returns a payout list: agent_address → IOTX amount
- Delegate signs a batch transfer tx (or calls a RewardPool contract)
The delegate keeps a configurable cut (e.g. 10%) for operating costs, and distributes the rest proportionally by tasks validated.
func NewRewardDistributor ¶
func NewRewardDistributor(cfg RewardConfig, delegateAddress string, logger *zap.Logger) *RewardDistributor
NewRewardDistributor creates a new reward distributor.
func (*RewardDistributor) CurrentEpoch ¶
func (r *RewardDistributor) CurrentEpoch() uint64
CurrentEpoch returns the current epoch number.
func (*RewardDistributor) CurrentWork ¶
func (r *RewardDistributor) CurrentWork() map[string]*AgentWork
CurrentWork returns a snapshot of current epoch work stats.
func (*RewardDistributor) Distribute ¶
func (r *RewardDistributor) Distribute(totalReward *big.Int) *EpochSummary
Distribute computes payouts, freezes the snapshot, and advances the epoch atomically under a single lock hold. The returned summary is the exact snapshot that should be used for on-chain settlement.
func (*RewardDistributor) EpochElapsed ¶
func (r *RewardDistributor) EpochElapsed() time.Duration
EpochElapsed returns the time since the current epoch started.
func (*RewardDistributor) EpochHistory ¶
func (r *RewardDistributor) EpochHistory() []EpochSummary
EpochHistory returns all past epoch summaries.
func (*RewardDistributor) RecordShadowAccuracy ¶
func (r *RewardDistributor) RecordShadowAccuracy(agentID string, matched uint64)
RecordShadowAccuracy adds verified correct tasks from shadow comparison. Called from OnBlockExecuted after ground-truth comparison.
func (*RewardDistributor) RecordWork ¶
func (r *RewardDistributor) RecordWork(agentID string, tasksProcessed, tasksCorrect uint64, totalLatencyUs uint64)
RecordWork records an agent's completed work. Called on every SubmitResults.
func (*RewardDistributor) SetAgentWallet ¶
func (r *RewardDistributor) SetAgentWallet(agentID, walletAddress string)
SetAgentWallet sets the payout address for an agent.
type RewardSettler ¶
type RewardSettler interface {
// Settle calls depositSettleAndClaim on the reward pool contract.
// agents: list of 0x-prefixed agent wallet addresses
// weights: corresponding weight for each agent (same length)
// value: total IOTX to deposit in wei/rau
// claimees: optional list of agent addresses to auto-claim for (can be nil)
Settle(ctx context.Context, agents []string, weights []*big.Int, value *big.Int, claimees []string) error
}
RewardSettler sends on-chain settlement transactions to the AgentRewardPool contract.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler assigns task batches to agents using least-loaded dispatch with capability filtering and a bounded retry queue.
func NewScheduler ¶
NewScheduler creates a new task scheduler.
func (*Scheduler) Dispatch ¶
func (s *Scheduler) Dispatch(tasks []*pb.TaskPackage, batchSize int) int
Dispatch sends tasks to available agents using least-loaded selection. Tasks are split into batches and dispatched with capability filtering. Returns the number of tasks dispatched.
func (*Scheduler) RetryQueueLen ¶
RetryQueueLen returns the number of batches in the retry queue. For testing.
type ShadowComparator ¶
type ShadowComparator struct {
// contains filtered or unexported fields
}
ShadowComparator compares agent validation results against iotex-core's actual block execution. In shadow mode, agent results never affect block production — they're purely observational.
func NewShadowComparator ¶
func NewShadowComparator(logger *zap.Logger) *ShadowComparator
NewShadowComparator creates a new shadow comparator.
func (*ShadowComparator) CompareEVMResults ¶
func (s *ShadowComparator) CompareEVMResults(agentResults []*pb.TaskResult, actualResults map[uint32]*EVMActualResult, blockHeight uint64)
CompareEVMResults compares agent EVM results against reference results.
func (*ShadowComparator) CompareWithActual ¶
func (s *ShadowComparator) CompareWithActual(actualResults map[uint32]bool, taskSenders map[uint32]string, blockHeight uint64) ([]ShadowResult, map[string]*AgentAccuracy)
CompareWithActual compares stored agent results against actual execution. actualResults maps task_id → whether the tx was actually valid. taskSenders maps task_id → sender address (for nonce race detection). Returns mismatches and per-agent accuracy counts.
func (*ShadowComparator) RecordAgentResults ¶
func (s *ShadowComparator) RecordAgentResults(agentID string, batch *pb.BatchResult)
RecordAgentResults stores agent results for later comparison.
func (*ShadowComparator) ResetStats ¶
func (s *ShadowComparator) ResetStats()
ResetStats resets the statistics counters.
func (*ShadowComparator) Stats ¶
func (s *ShadowComparator) Stats() ShadowStats
Stats returns current shadow comparison statistics.
type ShadowResult ¶
type ShadowResult struct {
TaskID uint32
AgentResult *pb.TaskResult
ActualValid bool
Match bool
AgentID string
BlockHeight uint64
}
ShadowResult holds the comparison between agent and actual results.
type ShadowStats ¶
type ShadowStats struct {
TotalCompared uint64
TotalMatched uint64
TotalMismatched uint64
FalsePositives uint64 // agent said valid, actual invalid
FalseNegatives uint64 // agent said invalid, actual valid
NonceRaceExcluded uint64 // false positives excused due to nonce race
// EVM-specific shadow stats (L3)
EVMGasMatches uint64
EVMGasMismatches uint64
EVMStateMatches uint64
EVMStateMismatches uint64
}
ShadowStats tracks shadow mode accuracy metrics.
type StateDiff ¶
type StateDiff struct {
Height uint64
Entries []StateDiffEntry
DigestBytes []byte // raw SerializeQueue output hash for verification
}
StateDiff contains all state changes for a single block.
type StateDiffBroadcaster ¶
type StateDiffBroadcaster struct {
// contains filtered or unexported fields
}
StateDiffBroadcaster buffers recent state diffs and fans them out to subscribers.
func NewStateDiffBroadcaster ¶
func NewStateDiffBroadcaster(maxBuffer int, logger *zap.Logger) *StateDiffBroadcaster
NewStateDiffBroadcaster creates a new broadcaster with the given buffer size.
func (*StateDiffBroadcaster) BufferedCount ¶
func (b *StateDiffBroadcaster) BufferedCount() int
BufferedCount returns the number of diffs currently in the ring buffer.
func (*StateDiffBroadcaster) DroppedCount ¶
func (b *StateDiffBroadcaster) DroppedCount(agentID string) int64
DroppedCount returns the number of diffs dropped for a subscriber.
func (*StateDiffBroadcaster) GetRange ¶
func (b *StateDiffBroadcaster) GetRange(from, to uint64) []*StateDiff
GetRange returns state diffs for heights [from, to] inclusive. Returns nil entries for heights that have been evicted from the buffer.
func (*StateDiffBroadcaster) LatestHeight ¶
func (b *StateDiffBroadcaster) LatestHeight() uint64
LatestHeight returns the height of the most recent state diff, or 0 if empty.
func (*StateDiffBroadcaster) Publish ¶
func (b *StateDiffBroadcaster) Publish(diff *StateDiff)
Publish adds a state diff to the ring buffer and fans out to all subscribers.
func (*StateDiffBroadcaster) Subscribe ¶
func (b *StateDiffBroadcaster) Subscribe(agentID string) <-chan *StateDiff
Subscribe returns a channel that receives new state diffs. The channel has a buffer of 64 to absorb brief bursts. If an agent is already subscribed, the old subscription is closed first.
func (*StateDiffBroadcaster) SubscriberCount ¶
func (b *StateDiffBroadcaster) SubscriberCount() int
SubscriberCount returns the number of active subscribers.
func (*StateDiffBroadcaster) Unsubscribe ¶
func (b *StateDiffBroadcaster) Unsubscribe(agentID string)
Unsubscribe removes a subscriber and closes its channel.
type StateDiffEntry ¶
type StateDiffEntry struct {
Type uint8 // 0=Put, 1=Delete
Namespace string
Key []byte
Value []byte
}
StateDiffEntry represents a single state mutation (Put or Delete).
type StateReader ¶
type StateReader interface {
AccountState(address string) (*pb.AccountSnapshot, error)
GetCode(address string) ([]byte, error) // contract bytecode
GetStorageAt(address, slot string) (string, error) // storage slot value (hex)
// SimulateAccessList runs a read-only EVM simulation and returns all storage
// slots accessed by the transaction. Returns nil map if not supported.
SimulateAccessList(from, to string, data []byte, value string, gasLimit uint64) (map[string][]string, error)
}
StateReader is the interface for reading account state. In production, this wraps iotex-core's StateFactory.
type StateReaderAdapter ¶
type StateReaderAdapter struct {
// contains filtered or unexported fields
}
StateReaderAdapter wraps iotex-core's factory.Factory to implement StateReader for production use.
func NewStateReaderAdapter ¶
func NewStateReaderAdapter(sf factory.Factory, bc blockchain.Blockchain, g genesis.Genesis) *StateReaderAdapter
NewStateReaderAdapter creates a new StateReaderAdapter.
func (*StateReaderAdapter) AccountState ¶
func (s *StateReaderAdapter) AccountState(addr string) (snap *pb.AccountSnapshot, err error)
AccountState reads the confirmed account state from the stateDB.
func (*StateReaderAdapter) GetCode ¶
func (s *StateReaderAdapter) GetCode(addr string) (code []byte, err error)
GetCode returns contract bytecode from the stateDB.
func (*StateReaderAdapter) GetStorageAt ¶
func (s *StateReaderAdapter) GetStorageAt(addr, slot string) (val string, err error)
GetStorageAt returns a storage slot value from the stateDB. slot is a hex-encoded 32-byte key (e.g. "0x0000...0000").
func (*StateReaderAdapter) SimulateAccessList ¶
func (s *StateReaderAdapter) SimulateAccessList(from, to string, data []byte, value string, gasLimit uint64) (result map[string][]string, err error)
SimulateAccessList runs a read-only EVM simulation and returns all storage slots accessed during execution, keyed by contract address (io1 format).
type SwarmAPI ¶
type SwarmAPI struct {
// contains filtered or unexported fields
}
SwarmAPI serves HTTP endpoints for monitoring the IOSwarm. Endpoints:
GET /api/stats — public network stats (CORS-enabled, no auth) GET /swarm/status — overall swarm status GET /swarm/agents — list all connected agents GET /swarm/leaderboard — agents ranked by tasks processed GET /swarm/epoch — current epoch work stats GET /swarm/shadow — shadow mode comparison stats GET /healthz — health check
func NewSwarmAPI ¶
func NewSwarmAPI(coord *Coordinator, reward *RewardDistributor) *SwarmAPI
NewSwarmAPI creates a new swarm API.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
keygen
command
Command keygen generates HMAC-based API keys for IOSwarm agents.
|
Command keygen generates HMAC-based API keys for IOSwarm agents. |
|
l4baseline
command
l4baseline converts a mainnet trie.db (BoltDB) into an IOSWSNAP baseline snapshot for L4 agent state sync.
|
l4baseline converts a mainnet trie.db (BoltDB) into an IOSWSNAP baseline snapshot for L4 agent state sync. |
|
l4sim
command
l4sim is a multi-agent stress simulation for the IOSwarm L4 state diff pipeline.
|
l4sim is a multi-agent stress simulation for the IOSwarm L4 state diff pipeline. |
|
l4test
command
l4test connects to a live delegate and validates StreamStateDiffs.
|
l4test connects to a live delegate and validates StreamStateDiffs. |
|
sim
command
sim runs a full IOSwarm simulation: 1 delegate coordinator + N agents.
|
sim runs a full IOSwarm simulation: 1 delegate coordinator + N agents. |
|
Package proto contains the IOSwarm gRPC types.
|
Package proto contains the IOSwarm gRPC types. |