Documentation
¶
Index ¶
- Constants
- func AddHex(s string) string
- func Base58Decode(b string) []byte
- func Base58Encode(b []byte) string
- func BytesToHex(b []byte) string
- func CalculateAverageDurationFromSlice(durations []time.Duration) time.Duration
- func CalculatePercentile(durations []time.Duration, percentile int) time.Duration
- func DBUrl(config MgrConfig) string
- func DBUrlSecure(config MgrConfig) string
- func EventProcess(data interface{}, matcher Matcher) (interface{}, bool, error)
- func ExtractTimestamp(extrinsics []byte) (ts string, err error)
- func GetAddressTableName(relayChain, chain string) string
- func GetBlocksPrimaryKeyName(relayChain, chain string) string
- func GetBlocksTableName(relayChain, chain string) string
- func GetListOfRegisteredQueries() (iter.Seq[NamedQuery], error)
- func GetStatsPerMonthTableName(relayChain, chain string) string
- func GetSystemMemoryGB() (int, error)
- func HexToBytes(s string) []byte
- func IntInSlice(a int, list []int) bool
- func IntToHex(i interface{}) string
- func IsValidAddress(address string) bool
- func ParseTimestamp(timestamp string) (time.Time, error)
- func ProcessBlockBatch(ctx context.Context, blockIDs []int, relayChain, chain string, db Database, ...)
- func ProcessSingleBlock(ctx context.Context, blockID int, relayChain, chain string, db Database, ...)
- func RegisterQuery(name, sqlTemplate, description string) error
- func SS58Decode(address string, addressType int) string
- func SS58Encode(address string, addressType int) string
- func SetupSignalHandler(cancel context.CancelFunc)
- func TrimHex(s string) string
- type BlockData
- type Bucket
- type BucketStats
- type ChainReader
- type Config
- type DBDialect
- type DBPoolConfig
- type Database
- type DatabaseInfo
- type DotidxBatch
- type DotidxDB
- type DotidxFE
- type Duration
- type EncodedBlock
- type EncodedDigest
- type EncodedHeader
- type EventsBalance
- type EventsStaking
- type FallbackChainReader
- func (f *FallbackChainReader) FetchBlock(ctx context.Context, id int) (BlockData, error)
- func (f *FallbackChainReader) FetchBlockRange(ctx context.Context, blockIDs []int) ([]BlockData, error)
- func (f *FallbackChainReader) GetChainHeadID() (int, error)
- func (f *FallbackChainReader) GetStats() *MetricsStats
- func (f *FallbackChainReader) Ping() error
- type FilesystemConfig
- type Filter
- type Matcher
- type Metrics
- type MetricsStats
- type MgrConfig
- type MonitoringConfig
- type NamedQuery
- type NamedQueryParameters
- type OrchestratorConfig
- type ParaChainConfig
- type RuntimeVersion
- type SQLDatabase
- func (s *SQLDatabase) Close() error
- func (s *SQLDatabase) CreateDotidxTable(relayChain, chain string) error
- func (s *SQLDatabase) CreateIndex(relayChain, chain string) error
- func (s *SQLDatabase) CreateTable(relayChain, chain, firstTimestamp, lastTimestamp string) error
- func (s *SQLDatabase) CreateTableAddress2Blocks(relayChain, chain string) error
- func (s *SQLDatabase) CreateTableAddress2BlocksPartitions(relayChain, chain string) error
- func (s *SQLDatabase) CreateTableBlocks(relayChain, chain string) error
- func (s *SQLDatabase) CreateTableBlocksPartitions(relayChain, chain, firstTimestamp, lastTimestamp string) error
- func (s *SQLDatabase) CreateTableMonthlyQueryResults() error
- func (s *SQLDatabase) DoUpgrade() error
- func (s *SQLDatabase) ExecuteAndStoreNamedQuery(ctx context.Context, relayChain, chain, queryName string, year, month int) error
- func (s *SQLDatabase) ExecuteNamedQuery(ctx context.Context, relayChain, chain, queryName string, year, month int) (SqlResult, error)
- func (s *SQLDatabase) GetDatabaseInfo() ([]DatabaseInfo, error)
- func (s *SQLDatabase) GetExistingBlocks(relayChain, chain string, startRange, endRange int) (map[int]bool, error)
- func (s *SQLDatabase) GetStats() *MetricsStats
- func (s *SQLDatabase) Ping() error
- func (s *SQLDatabase) ReadTimeNamedQuery(ctx context.Context, relayChain, chain, queryName string, year, month int) (t time.Time, err error)
- func (s *SQLDatabase) Save(items []BlockData, relayChain, chain string) error
- func (s *SQLDatabase) StoreMonthlyQueryResult(ctx context.Context, relayChain, chain, queryName string, year, month int, ...) error
- type ServiceHealthStatus
- type ServiceManager
- type ServiceNode
- type Sidecar
- type SqlResult
- type SubstrateRPCReader
- func (r *SubstrateRPCReader) FetchBlock(ctx context.Context, id int) (BlockData, error)
- func (r *SubstrateRPCReader) FetchBlockRange(ctx context.Context, blockIDs []int) ([]BlockData, error)
- func (r *SubstrateRPCReader) GetChainHeadID() (int, error)
- func (r *SubstrateRPCReader) GetStats() *MetricsStats
- func (r *SubstrateRPCReader) Ping() error
- type TemporalConfig
Constants ¶
const SQLDatabaseSchemaVersion = 2
Variables ¶
This section is empty.
Functions ¶
func Base58Decode ¶
Decode decodes a modified base58 string to a byte slice.
func Base58Encode ¶
Encode encodes a byte slice to a modified base58 string.
func BytesToHex ¶
func CalculatePercentile ¶
func DBUrlSecure ¶
func EventProcess ¶
func ExtractTimestamp ¶
func GetAddressTableName ¶
func GetBlocksPrimaryKeyName ¶
func GetBlocksTableName ¶
func GetListOfRegisteredQueries ¶
func GetListOfRegisteredQueries() (iter.Seq[NamedQuery], error)
func GetSystemMemoryGB ¶
GetSystemMemoryGB detects the system's total memory in GB
func HexToBytes ¶
func IntInSlice ¶
func IsValidAddress ¶
func ParseTimestamp ¶
ParseTimestamp parses a timestamp string in different formats and returns a time.Time. It tries multiple formats: - Unix timestamp (seconds since epoch) - RFC3339 format - ISO8601 format
func ProcessBlockBatch ¶
func ProcessBlockBatch( ctx context.Context, blockIDs []int, relayChain, chain string, db Database, reader ChainReader, )
ProcessBlockBatch fetches and processes a batch of blocks using fetchBlockRange
func ProcessSingleBlock ¶
func ProcessSingleBlock( ctx context.Context, blockID int, relayChain, chain string, db Database, reader ChainReader, )
ProcessSingleBlock fetches and processes a single block using fetchBlock
func RegisterQuery ¶
func SS58Decode ¶
func SS58Encode ¶
func SetupSignalHandler ¶
func SetupSignalHandler(cancel context.CancelFunc)
Types ¶
type BlockData ¶
type BlockData struct {
ID string `json:"number"`
Timestamp time.Time `json:"timestamp"`
Hash string `json:"hash"`
ParentHash string `json:"parentHash"`
StateRoot string `json:"stateRoot"`
ExtrinsicsRoot string `json:"extrinsicsRoot"`
AuthorID string `json:"authorId"`
Finalized bool `json:"finalized"`
OnInitialize json.RawMessage `json:"onInitialize"`
OnFinalize json.RawMessage `json:"onFinalize"`
Logs json.RawMessage `json:"logs"`
Extrinsics json.RawMessage `json:"extrinsics"`
// Elastic scaling fields (available when useRcBlock parameter is used)
RcBlockNumber *string `json:"rcBlockNumber,omitempty"`
RcBlockHash *string `json:"rcBlockHash,omitempty"`
}
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
Bucket tracks performance metrics for sidecar API calls
func (*Bucket) GetStats ¶
func (m *Bucket) GetStats() (bs BucketStats)
func (*Bucket) PrintStats ¶
PrintStats prints the current metrics statistics
type BucketStats ¶
func NewBucketStats ¶
func NewBucketStats() BucketStats
func (BucketStats) RateSinceStart ¶
func (bs BucketStats) RateSinceStart() (rate float64)
type ChainReader ¶
type ChainReader interface {
GetChainHeadID() (int, error)
FetchBlockRange(ctx context.Context, blockIDs []int) ([]BlockData, error)
FetchBlock(ctx context.Context, id int) (BlockData, error)
Ping() error
GetStats() *MetricsStats
}
func NewChainReader ¶
func NewChainReader(relay, chain, wsUrl, httpUrl string) ChainReader
NewChainReader creates a ChainReader with fallback support It uses SubstrateRPC as primary and HTTP Sidecar as fallback
func NewChainReaderFromConfig ¶
func NewChainReaderFromConfig(relay, chain string, config ParaChainConfig) ChainReader
NewChainReaderFromConfig creates a ChainReader from ParaChainConfig It automatically constructs the WebSocket and HTTP URLs from config
type Config ¶
type Config struct {
StartRange int
EndRange int
ChainReaderURL string
DatabaseURL string
BatchSize int
MaxWorkers int
FlushTimeout time.Duration
Relaychain string
Chain string
Live bool
FrontendIP string
FrontendPort int
FrontendStatic string
}
func ParseFlags ¶
type DBPoolConfig ¶
type DBPoolConfig struct {
MaxOpenConns int // Maximum number of open connections
MaxIdleConns int // Maximum number of idle connections
ConnMaxLifetime time.Duration // Maximum lifetime of a connection
ConnMaxIdleTime time.Duration // Maximum idle time of a connection
}
DBPoolConfig contains the configuration for the database connection pool
func DefaultDBPoolConfig ¶
func DefaultDBPoolConfig() DBPoolConfig
type Database ¶
type Database interface {
CreateTable(relayChain, chain, firstTimestamp, lastTimestamp string) error
CreateIndex(relayChain, chain string) error
Save(items []BlockData, relayChain, chain string) error
GetExistingBlocks(relayChain, chain string, startRange, endRange int) (map[int]bool, error)
Ping() error
GetStats() *MetricsStats
DoUpgrade() error
Close() error
GetDatabaseInfo() ([]DatabaseInfo, error)
ReadTimeNamedQuery(ctx context.Context, relayChain, chain, queryName string, year, month int) (time.Time, error)
ExecuteNamedQuery(ctx context.Context, relayChain, chain, queryName string, year, month int) (SqlResult, error)
ExecuteAndStoreNamedQuery(ctx context.Context, relayChain, chain, queryName string, year, month int) error
}
Database defines the interface for database operations
type DatabaseInfo ¶
type DotidxBatch ¶
type DotidxDB ¶
type DotidxDB struct {
Type string `toml:"type"`
Version int `toml:"version"`
Name string `toml:"name"`
IP string `toml:"ip"`
User string `toml:"user"`
Port int `toml:"port"`
Password string `toml:"password"`
Memory string `toml:"memory"`
Data string `toml:"data"`
Run string `toml:"run"`
WhitelistedIP []string `toml:"whitelisted_ip"`
}
type Duration ¶
func (*Duration) UnmarshalText ¶
type EncodedBlock ¶
type EncodedBlock struct {
Block struct {
Header EncodedHeader `json:"header"`
Extrinsics []string `json:"extrinsics"`
} `json:"block"`
Justifications any `json:"justifications"`
}
EncodedBlock represents a block received via RPC/WS
type EncodedDigest ¶
type EncodedDigest struct {
Logs []string `json:"logs"`
}
EncodedDigest represents the digest in a block header
type EncodedHeader ¶
type EncodedHeader struct {
Number string `json:"number"`
ParentHash string `json:"parentHash"`
StateRoot string `json:"stateRoot"`
ExtrinsicsRoot string `json:"extrinsicsRoot"`
Digest EncodedDigest `json:"digest"`
}
EncodedHeader represents the block header
type EventsBalance ¶
type EventsBalance struct {
// contains filtered or unexported fields
}
func NewEventsBalance ¶
func NewEventsBalance(address string) *EventsBalance
func (*EventsBalance) Process ¶
func (eb *EventsBalance) Process(extrinsics json.RawMessage) (filtered json.RawMessage, found bool, err error)
func (*EventsBalance) ProcessGJson ¶
func (eb *EventsBalance) ProcessGJson(extrinsics json.RawMessage) (filtered json.RawMessage, err error)
type EventsStaking ¶
type EventsStaking struct {
// contains filtered or unexported fields
}
func NewEventsStaking ¶
func NewEventsStaking(address string) *EventsStaking
func (*EventsStaking) Process ¶
func (es *EventsStaking) Process(extrinsics json.RawMessage) (filtered json.RawMessage, found bool, err error)
func (*EventsStaking) ProcessGJson ¶
func (es *EventsStaking) ProcessGJson(extrinsics json.RawMessage) (filtered json.RawMessage, err error)
type FallbackChainReader ¶
type FallbackChainReader struct {
// contains filtered or unexported fields
}
FallbackChainReader implements ChainReader with a fallback mechanism It tries the primary reader first, and falls back to the secondary reader on failure
func NewFallbackChainReader ¶
func NewFallbackChainReader(relay, chain, wsUrl, httpUrl string) *FallbackChainReader
NewFallbackChainReader creates a new FallbackChainReader wsUrl: WebSocket URL for the primary SubstrateRPC reader httpUrl: HTTP URL for the fallback Sidecar reader
func (*FallbackChainReader) FetchBlock ¶
FetchBlock implements ChainReader interface with fallback
func (*FallbackChainReader) FetchBlockRange ¶
func (f *FallbackChainReader) FetchBlockRange(ctx context.Context, blockIDs []int) ([]BlockData, error)
FetchBlockRange implements ChainReader interface with fallback
func (*FallbackChainReader) GetChainHeadID ¶
func (f *FallbackChainReader) GetChainHeadID() (int, error)
GetChainHeadID implements ChainReader interface with fallback
func (*FallbackChainReader) GetStats ¶
func (f *FallbackChainReader) GetStats() *MetricsStats
GetStats implements ChainReader interface Returns stats from the primary reader (SubstrateRPC)
func (*FallbackChainReader) Ping ¶
func (f *FallbackChainReader) Ping() error
Ping implements ChainReader interface with fallback
type FilesystemConfig ¶
type FilesystemConfig struct {
ZFS bool `toml:"zfs"`
}
type Filter ¶
type Filter interface {
ProcessGJson(extrinsics json.RawMessage) (filtered json.RawMessage, err error)
Process(extrinsics json.RawMessage) (filtered json.RawMessage, found bool, err error)
}
type Metrics ¶
type Metrics struct {
Buckets []*Bucket
}
Metrics tracks performance metrics for API calls
func (*Metrics) GetStats ¶
func (m *Metrics) GetStats() (s *MetricsStats)
GetStats returns the current metrics statistics
func (*Metrics) PrintStats ¶
PrintStats prints the current metrics statistics
type MetricsStats ¶
type MetricsStats struct {
BucketsStats [4]BucketStats
}
func NewMetricsStats ¶
func NewMetricsStats() *MetricsStats
type MgrConfig ¶
type MgrConfig struct {
TargetDir string `toml:"target_dir"`
Name string `toml:"name"`
UnixUser string // Runtime: set from environment
SystemMemoryGB int // Runtime: detected system memory in GB
MaintenanceWorkMemory string // Runtime: calculated maintenance_work_mem
MaxWalSize string // Runtime: calculated max_wal_size
DbCache int // Runtime: calculated db_cache
RpcMaxConnections int // Runtime: calculated rpc_max_connections
DotidxRoot string `toml:"dotidx_root"`
DotidxBackup string `toml:"dotidx_backup"`
DotidxRun string `toml:"dotidx_run"`
DotidxRuntime string `toml:"dotidx_runtime"`
DotidxLogs string `toml:"dotidx_logs"`
DotidxBin string `toml:"dotidx_bin"`
DotidxStatic string `toml:"dotidx_static"`
DotidxBatch DotidxBatch `toml:"dotidx_batch"`
DotidxDB DotidxDB `toml:"dotidx_db"`
DotidxFE DotidxFE `toml:"dotidx_fe"`
Parachains map[string]map[string]ParaChainConfig `toml:"parachains"`
Filesystem FilesystemConfig `toml:"filesystem"`
Monitoring MonitoringConfig `toml:"monitoring"`
Watcher OrchestratorConfig `toml:"watcher"`
Temporal TemporalConfig `toml:"temporal"`
}
func LoadMgrConfig ¶
func (*MgrConfig) CalculateMemorySettings ¶
func (c *MgrConfig) CalculateMemorySettings()
CalculateMemorySettings calculates PostgreSQL and Node memory settings based on system memory
type MonitoringConfig ¶
type NamedQuery ¶
type NamedQueryParameters ¶
type OrchestratorConfig ¶
type ParaChainConfig ¶
type ParaChainConfig struct {
Name string `toml:"name"`
Bin string `toml:"bin"`
PortRPC int `toml:"port_rpc"`
PortWS int `toml:"port_ws"`
Basepath string `toml:"basepath"`
ChainreaderIP string `toml:"chainreader_ip"`
ChainreaderPort int `toml:"chainreader_port"`
SidecarIP string `toml:"sidecar_ip"`
SidecarPort int `toml:"sidecar_port"`
SidecarPrometheusPort int `toml:"sidecar_prometheus_port"`
SidecarCount int `toml:"sidecar_count"`
PrometheusPort int `toml:"prometheus_port"`
RelayIP string `toml:"relay_ip"`
NodeIP string `toml:"node_ip"`
BootNodes string `toml:"bootnodes"`
}
func (ParaChainConfig) ComputePort ¶
func (ParaChainConfig) ComputePort(i, j int) int
type RuntimeVersion ¶
type RuntimeVersion struct {
SpecName string `json:"specName"`
SpecVersion int `json:"specVersion"`
ImplName string `json:"implName"`
ImplVersion int `json:"implVersion"`
AuthoringVersion int `json:"authoringVersion"`
TransactionVersion int `json:"transactionVersion"`
StateVersion int `json:"stateVersion"`
APIs [][]any `json:"apis"`
}
RuntimeVersion represents the runtime version information
type SQLDatabase ¶
type SQLDatabase struct {
// contains filtered or unexported fields
}
func NewSQLDatabase ¶
func NewSQLDatabase(config MgrConfig) *SQLDatabase
NewSQLDatabase creates a new Database instance
func NewSQLDatabaseWithDB ¶
func NewSQLDatabaseWithDB(db *sql.DB) *SQLDatabase
func NewSQLDatabaseWithPool ¶
func NewSQLDatabaseWithPool(db *sql.DB, poolCfg DBPoolConfig) *SQLDatabase
NewSQLDatabaseWithPool creates a new Database instance with custom connection pool settings Defaults to Postgres dialect for backward compatibility
func NewSQLDatabaseWithPoolAndDialect ¶
func NewSQLDatabaseWithPoolAndDialect(db *sql.DB, poolCfg DBPoolConfig, dialect DBDialect) *SQLDatabase
NewSQLDatabaseWithPoolAndDialect creates a new Database instance with custom connection pool settings and dialect
func (*SQLDatabase) Close ¶
func (s *SQLDatabase) Close() error
func (*SQLDatabase) CreateDotidxTable ¶
func (s *SQLDatabase) CreateDotidxTable(relayChain, chain string) error
func (*SQLDatabase) CreateIndex ¶
func (s *SQLDatabase) CreateIndex(relayChain, chain string) error
TODO: adapt to the new partionning when tables are full (a month) they are immutable so we can write the index once and forall this index is very large and costly, currently on hold it is significanlty faster to grep in the FE
func (*SQLDatabase) CreateTable ¶
func (s *SQLDatabase) CreateTable(relayChain, chain, firstTimestamp, lastTimestamp string) error
func (*SQLDatabase) CreateTableAddress2Blocks ¶
func (s *SQLDatabase) CreateTableAddress2Blocks(relayChain, chain string) error
func (*SQLDatabase) CreateTableAddress2BlocksPartitions ¶
func (s *SQLDatabase) CreateTableAddress2BlocksPartitions(relayChain, chain string) error
func (*SQLDatabase) CreateTableBlocks ¶
func (s *SQLDatabase) CreateTableBlocks(relayChain, chain string) error
func (*SQLDatabase) CreateTableBlocksPartitions ¶
func (s *SQLDatabase) CreateTableBlocksPartitions(relayChain, chain, firstTimestamp, lastTimestamp string) error
func (*SQLDatabase) CreateTableMonthlyQueryResults ¶
func (s *SQLDatabase) CreateTableMonthlyQueryResults() error
func (*SQLDatabase) DoUpgrade ¶
func (s *SQLDatabase) DoUpgrade() error
func (*SQLDatabase) ExecuteAndStoreNamedQuery ¶
func (*SQLDatabase) ExecuteNamedQuery ¶
func (*SQLDatabase) GetDatabaseInfo ¶
func (s *SQLDatabase) GetDatabaseInfo() ([]DatabaseInfo, error)
func (*SQLDatabase) GetExistingBlocks ¶
func (*SQLDatabase) GetStats ¶
func (s *SQLDatabase) GetStats() *MetricsStats
func (*SQLDatabase) Ping ¶
func (s *SQLDatabase) Ping() error
func (*SQLDatabase) ReadTimeNamedQuery ¶
func (*SQLDatabase) Save ¶
func (s *SQLDatabase) Save(items []BlockData, relayChain, chain string) error
func (*SQLDatabase) StoreMonthlyQueryResult ¶
type ServiceHealthStatus ¶
type ServiceHealthStatus string
ServiceHealthStatus represents the health of a managed service
const ( StatusUnknown ServiceHealthStatus = "UNKNOWN" StatusOk ServiceHealthStatus = "OK" StatusKo ServiceHealthStatus = "KO" // "Knocked Out" / Not OK )
type ServiceManager ¶
type ServiceManager struct {
RootNodes []*ServiceNode
// contains filtered or unexported fields
}
func NewServiceManager ¶
func NewServiceManager(rootNodes []*ServiceNode, config OrchestratorConfig) (*ServiceManager, error)
func (*ServiceManager) StartTree ¶
func (sm *ServiceManager) StartTree(ctx context.Context)
StartTree initiates the management of the service tree.
func (*ServiceManager) StopTree ¶
func (sm *ServiceManager) StopTree()
StopTree gracefully stops all watchers and closes the D-Bus connection.
type ServiceNode ¶
type ServiceNode struct {
Name string // Logical name for this node
SystemdUnit string // Actual systemd unit name (if this node is a manageable service)
IsLeaf bool // True if this is a leaf node that should be directly managed/watched
Children []*ServiceNode
// contains filtered or unexported fields
}
ServiceNode represents a node in the service tree
type Sidecar ¶
type Sidecar struct {
// contains filtered or unexported fields
}
Sidecar implements the ChainReader interface using Substrate API Sidecar Supports both regular blocks and elastic scaling enabled parachains Note: Elastic scaling support (v20.9.0+) allows multiple blocks per block height The database schema uses (hash, created_at) as primary key to handle this
func NewSidecar ¶
func (*Sidecar) FetchBlock ¶
fetchBlock makes a call to the sidecar API to fetch a single block Note: With elastic scaling, multiple blocks may exist at the same height This function returns the canonical block. For multi-block queries, use useRcBlock parameter
func (*Sidecar) FetchBlockRange ¶
fetchBlockRange fetches blocks with the specified IDs from the sidecar API
func (*Sidecar) GetChainHeadID ¶
fetchHeadBlock fetches the current head block from the sidecar API
func (*Sidecar) GetStats ¶
func (s *Sidecar) GetStats() *MetricsStats
type SubstrateRPCReader ¶
type SubstrateRPCReader struct {
// contains filtered or unexported fields
}
SubstrateRPCReader implements ChainReader using the Go substrate-rpc-api library This provides a native Go alternative to the HTTP-based Sidecar service
func NewSubstrateRPCReader ¶
func NewSubstrateRPCReader(relay, chain, wsUrl string) *SubstrateRPCReader
NewSubstrateRPCReader creates a new SubstrateRPCReader instance
func (*SubstrateRPCReader) FetchBlock ¶
FetchBlock implements ChainReader interface
func (*SubstrateRPCReader) FetchBlockRange ¶
func (r *SubstrateRPCReader) FetchBlockRange(ctx context.Context, blockIDs []int) ([]BlockData, error)
FetchBlockRange implements ChainReader interface
func (*SubstrateRPCReader) GetChainHeadID ¶
func (r *SubstrateRPCReader) GetChainHeadID() (int, error)
GetChainHeadID implements ChainReader interface
func (*SubstrateRPCReader) GetStats ¶
func (r *SubstrateRPCReader) GetStats() *MetricsStats
GetStats implements ChainReader interface
func (*SubstrateRPCReader) Ping ¶
func (r *SubstrateRPCReader) Ping() error
Ping implements ChainReader interface