Documentation ¶
Index ¶
- Variables
- type Config
- type FilterStore
- type LogStore
- type ObservedFilterStore
- func (o *ObservedFilterStore) GetAllActiveFilters(ctx context.Context) ([]models.Filter, error)
- func (o *ObservedFilterStore) GetDistinctAddresses(ctx context.Context) ([]*address.Address, error)
- func (o *ObservedFilterStore) GetFiltersByAddress(ctx context.Context, addr *address.Address) ([]models.Filter, error)
- func (o *ObservedFilterStore) HasFilter(ctx context.Context, name string) (bool, error)
- func (o *ObservedFilterStore) RegisterFilter(ctx context.Context, flt models.Filter) (int64, error)
- func (o *ObservedFilterStore) UnregisterFilter(ctx context.Context, name string) error
- type ObservedLogStore
- func (o *ObservedLogStore) GetHighestMCBlockSeqno(ctx context.Context) (uint32, bool, error)
- func (o *ObservedLogStore) QueryLogs(ctx context.Context, logQuery *query.LogQuery) ([]models.Log, bool, string, error)
- func (o *ObservedLogStore) SaveLogs(ctx context.Context, logs []models.Log, batchInsertSize, minBatchSize uint32) (int64, error)
- type RawLogProvider
- type Service
- type ServiceOptions
- type TxLoader
Constants ¶
This section is empty.
Variables ¶
var DefaultConfigSet = Config{
	PollPeriod:                config.MustNewDuration(5 * time.Second),
	PageSize:                  100,
	LogPollerStartingLookback: config.MustNewDuration(24 * time.Hour),
	BlockTime:                 config.MustNewDuration(2500 * time.Millisecond),
	BatchInsertSize:           3500,
	MinBatchSize:              500,
	SaveThreshold:             7000,
	MCBlockCacheSize:          1000,
	MCBlockResolveMaxRetries:  3,
	MCBlockResolveBaseDelay:   config.MustNewDuration(100 * time.Millisecond),
	PruningInterval:           config.MustNewDuration(defaultPruningInterval),
	PruningBatchSize:          defaultPruningBatchSize,
	PruningStartDelay:         config.MustNewDuration(defaultPruningStartDelay),
}
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// PollPeriod is the target interval between tick starts.
PollPeriod *config.Duration
// PageSize is the number of transactions fetched per API call. Larger values increase
// throughput but also increase per-tick processing time. Tune based on expected volume.
PageSize uint32
// LogPollerStartingLookback bounds how far back polling starts when the store has no prior state.
LogPollerStartingLookback *config.Duration
// BlockTime is the expected interval between chain blocks.
BlockTime *config.Duration
MCBlockCacheSize int // LRU cache maps shard block keys to masterchain seqno
// Database configuration - simple values with defaults
BatchInsertSize uint32
MinBatchSize uint32
SaveThreshold uint32 // Number of logs to buffer in memory before saving
// MC block resolution retry configuration
MCBlockResolveMaxRetries uint32 // Max retry attempts for masterchain block resolution
MCBlockResolveBaseDelay *config.Duration // Base delay for exponential backoff
// Pruning configuration
PruningInterval *config.Duration // How often to run pruning (default 10 minutes)
PruningBatchSize int64 // Max rows to delete per batch (default 1000)
PruningStartDelay *config.Duration // Delay before first pruning cycle (default 5 minutes)
}
Config holds the configuration for the log poller. NOTE: when adding new fields, please update ApplyDefaults, DefaultConfigSet, and ValidateConfig accordingly. Also check toml_test.go TestNewDecodedTOMLConfig() to ensure new fields are tested there.
Performance Note: The service loop is synchronous: each tick blocks until processing completes, so concurrent ticks cannot occur. However, if processing takes longer than PollPeriod, the poller falls behind the chain head (a warning is logged when this happens). Processing time is driven primarily by PageSize and transaction volume. If processing consistently exceeds PollPeriod, reduce PageSize or increase PollPeriod, and monitor the ton_logpoller_poll_duration_seconds metric.
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
func (*Config) ValidateConfig ¶
type FilterStore ¶
type FilterStore interface {
// RegisterFilter adds a new filter or overwrites an existing one with the same name.
// Returns the ID of the created filter.
RegisterFilter(ctx context.Context, flt models.Filter) (int64, error)
// UnregisterFilter removes a filter by its unique name.
UnregisterFilter(ctx context.Context, name string) error
// HasFilter checks if a filter with the given name exists.
HasFilter(ctx context.Context, name string) (bool, error)
// GetDistinctAddresses returns a slice of unique addresses that are being monitored.
GetDistinctAddresses(ctx context.Context) ([]*address.Address, error)
// GetFiltersByAddress returns all filters for a specific address.
GetFiltersByAddress(ctx context.Context, addr *address.Address) ([]models.Filter, error)
// GetAllActiveFilters returns all non-deleted filters for the chain.
// Used to populate filter cache on startup.
GetAllActiveFilters(ctx context.Context) ([]models.Filter, error)
// DeleteEmptyFilters removes filter rows that are marked is_deleted=true
// and have no remaining logs in the logs table.
// This is the final cleanup step after DeleteLogsForDeletedFilters has removed all logs.
// Returns number of filter rows deleted.
DeleteEmptyFilters(ctx context.Context) (int64, error)
}
FilterStore defines an interface for storing and retrieving log filter specifications. Note: Filter changes at the store level are immediate, but the LogPoller service reads filters once per tick, so changes take effect on the next loop tick.
type LogStore ¶
type LogStore interface {
// SaveLogs saves logs to storage with configurable batching behavior (with transaction support in PostgreSQL).
// batchInsertSize controls the maximum number of logs per database batch operation.
// minBatchSize sets the minimum batch size for retry attempts on timeout errors.
// Returns the number of logs successfully saved.
SaveLogs(ctx context.Context, logs []models.Log, batchInsertSize, minBatchSize uint32) (int64, error)
// QueryLogs retrieves logs with TON-specific filtering capabilities including byte-level filtering,
// sorting, and pagination. This method handles all filtering, sorting, and pagination.
// The LogStore is responsible for translating parameters to its optimal execution strategy.
// Uses chainlink-common's LimitAndSort for standardized pagination and sorting.
QueryLogs(ctx context.Context, query *query.LogQuery) (logs []models.Log, hasMore bool, nextCursor string, err error)
// GetHighestMCBlockSeqno retrieves the highest masterchain block sequence number
// from stored logs. Returns (seqno, exists, err) where exists indicates whether any
// logs are stored. This is used for resuming processing from the last known state
// after a service restart.
GetHighestMCBlockSeqno(ctx context.Context) (seqno uint32, exists bool, err error)
// DeleteExpiredLogs removes logs that have passed their pre-computed expiration time.
// Uses the expires_at column (set at insert time as tx_timestamp + retention).
// Logs with expires_at = NULL (retention = 0, "keep forever") are never deleted.
// limit controls batch size per DELETE operation (use 0 for unlimited, not recommended).
// Returns total number of rows deleted across all batches.
DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
// DeleteExcessLogs removes logs exceeding max_logs_kept for each filter.
// Uses tx_lt + msg_index ordering (descending) to keep newest logs.
// Only processes filters with max_logs_kept > 0 (0 = unlimited).
// limit controls batch size (use 0 for unlimited, not recommended).
// Returns number of rows deleted.
DeleteExcessLogs(ctx context.Context, limit int64) (int64, error)
// DeleteLogsForDeletedFilters removes logs for filters marked is_deleted=true.
// Uses batched deletion with LIMIT for safe removal without table locks.
// Note: Filter row cleanup is handled separately by FilterStore.DeleteEmptyFilters.
// Returns number of log rows deleted.
DeleteLogsForDeletedFilters(ctx context.Context, limit int64) (int64, error)
}
LogStore defines the interface for storing and retrieving logs.
type ObservedFilterStore ¶
type ObservedFilterStore struct {
FilterStore
// contains filtered or unexported fields
}
ObservedFilterStore wraps a FilterStore with metrics instrumentation
func NewObservedFilterStore ¶
func NewObservedFilterStore(store FilterStore, metrics *logPollerMetrics, lggr logger.Logger) *ObservedFilterStore
NewObservedFilterStore creates a new observed filter store wrapper
func (*ObservedFilterStore) GetAllActiveFilters ¶
func (o *ObservedFilterStore) GetAllActiveFilters(ctx context.Context) ([]models.Filter, error)
GetAllActiveFilters wraps the underlying GetAllActiveFilters with metrics
func (*ObservedFilterStore) GetDistinctAddresses ¶
func (o *ObservedFilterStore) GetDistinctAddresses(ctx context.Context) ([]*address.Address, error)
GetDistinctAddresses wraps the underlying GetDistinctAddresses with metrics
func (*ObservedFilterStore) GetFiltersByAddress ¶
func (o *ObservedFilterStore) GetFiltersByAddress(ctx context.Context, addr *address.Address) ([]models.Filter, error)
GetFiltersByAddress wraps the underlying GetFiltersByAddress with metrics
func (*ObservedFilterStore) RegisterFilter ¶
func (o *ObservedFilterStore) RegisterFilter(ctx context.Context, flt models.Filter) (int64, error)
RegisterFilter wraps the underlying RegisterFilter with metrics
func (*ObservedFilterStore) UnregisterFilter ¶
func (o *ObservedFilterStore) UnregisterFilter(ctx context.Context, name string) error
UnregisterFilter wraps the underlying UnregisterFilter with metrics
type ObservedLogStore ¶
type ObservedLogStore struct {
LogStore
// contains filtered or unexported fields
}
ObservedLogStore wraps a LogStore with metrics instrumentation
func NewObservedLogStore ¶
func NewObservedLogStore(store LogStore, metrics *logPollerMetrics, lggr logger.Logger) *ObservedLogStore
NewObservedLogStore creates a new observed log store wrapper
func (*ObservedLogStore) GetHighestMCBlockSeqno ¶
func (o *ObservedLogStore) GetHighestMCBlockSeqno(ctx context.Context) (uint32, bool, error)
GetHighestMCBlockSeqno wraps the underlying GetHighestMCBlockSeqno with metrics
func (*ObservedLogStore) QueryLogs ¶
func (o *ObservedLogStore) QueryLogs(ctx context.Context, logQuery *query.LogQuery) ([]models.Log, bool, string, error)
QueryLogs wraps the underlying QueryLogs with metrics
func (*ObservedLogStore) SaveLogs ¶
func (o *ObservedLogStore) SaveLogs(ctx context.Context, logs []models.Log, batchInsertSize, minBatchSize uint32) (int64, error)
SaveLogs wraps the underlying SaveLogs with metrics
type RawLogProvider ¶
type RawLogProvider interface {
// GetLogs retrieves all external message outputs for an address between from (exclusive) and to (inclusive).
GetLogs(ctx context.Context, addr *address.Address, from uint32, to *ton.BlockIDExt) ([]models.RawLog, error)
}
RawLogProvider provides raw logs by reusing LogPoller components without running the full service (o11y use case).
func NewTonO11yLogProvider ¶
func NewTonO11yLogProvider(client ton.APIClientWrapped, loader TxLoader, lggr logger.Logger) RawLogProvider
NewTonO11yLogProvider creates a new RawLogProvider backed by a TON o11y client.
type Service ¶
type Service interface {
services.Service
// RegisterFilter adds a new filter. Changes take effect on the next loop tick (up to pollPeriod delay).
RegisterFilter(ctx context.Context, flt models.Filter) (int64, error)
// UnregisterFilter removes a filter. Changes take effect on the next loop tick (up to pollPeriod delay).
// If called during an active tick, the old filter continues processing for that tick.
UnregisterFilter(ctx context.Context, name string) error
// HasFilter checks if a filter with the given name exists.
HasFilter(ctx context.Context, name string) (bool, error)
// Replay reprocesses logs starting from the given masterchain block seqno.
Replay(ctx context.Context, fromBlock uint32) error
// ReplayStatus reports the status of the most recent replay.
ReplayStatus() models.ReplayStatus
// NewQuery returns a builder for constructing log queries.
NewQuery() query.Builder
}
Service defines the public interface for the TON log polling service.
func NewService ¶
func NewService(
	lggr logger.Logger,
	chainID string,
	clientProvider func(context.Context) (ton.APIClientWrapped, error),
	opts *ServiceOptions,
) (Service, error)
NewService creates a new TON log polling service instance.
func NewServiceWith ¶
func NewServiceWith(
	ctx context.Context,
	lggr logger.Logger,
	chainID string,
	clientProvider func(context.Context) (ton.APIClientWrapped, error),
	opts *ServiceOptions,
	filters []models.Filter,
) (Service, error)
NewServiceWith creates a new TON log polling service and registers the provided filters. This is a convenience constructor for cases where filters are known upfront. The caller is responsible for starting the service with Start().
type ServiceOptions ¶
type ServiceOptions struct {
Config Config
FilterStore FilterStore
TxLoader TxLoader
LogStore LogStore
}
type TxLoader ¶
type TxLoader interface {
// LoadTxsForAddress retrieves transactions for a specific address within a block range.
// pageSize controls the number of transactions to fetch per TON API call for pagination.
// Transactions and runtime errors are written to the provided channels synchronously.
// Returns immediate validation/setup errors. The caller is responsible for spawning goroutines
// and managing channel lifecycle.
LoadTxsForAddress(ctx context.Context, blockRange *models.BlockRange, addr *address.Address, pageSize uint32, txOut chan<- models.Tx, errOut chan<- error) error
// GetTxsForAddress is a convenience wrapper around LoadTxsForAddress that returns
// transactions as a slice instead of streaming to a channel. This is suitable for
// bounded result sets where memory is not a concern.
// Use LoadTxsForAddress for streaming large result sets or when you need fine-grained
// control over concurrent processing.
//
// Warning: Be cautious about memory pressure when querying large ranges of blocks.
// For large ranges, consider using LoadTxsForAddress with streaming to process
// transactions incrementally.
GetTxsForAddress(ctx context.Context, blockRange *models.BlockRange, addr *address.Address, pageSize uint32) ([]models.Tx, error)
}
TxLoader defines the interface for loading transactions from the TON blockchain.