logpoller

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: MIT Imports: 27 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultConfigSet = Config{
	PollPeriod:                config.MustNewDuration(5 * time.Second),
	PageSize:                  100,
	LogPollerStartingLookback: config.MustNewDuration(24 * time.Hour),
	BlockTime:                 config.MustNewDuration(2500 * time.Millisecond),

	BatchInsertSize: 3500,
	MinBatchSize:    500,
	SaveThreshold:   7000,

	MCBlockCacheSize: 1000,

	MCBlockResolveMaxRetries: 3,
	MCBlockResolveBaseDelay:  config.MustNewDuration(100 * time.Millisecond),

	PruningInterval:   config.MustNewDuration(defaultPruningInterval),
	PruningBatchSize:  defaultPruningBatchSize,
	PruningStartDelay: config.MustNewDuration(defaultPruningStartDelay),
}

Functions

This section is empty.

Types

type Config

type Config struct {
	// PollPeriod is the target interval between tick starts.
	PollPeriod *config.Duration
	// PageSize is the number of transactions fetched per API call. Larger values increase
	// throughput but also increase per-tick processing time. Tune based on expected volume.
	PageSize                  uint32
	LogPollerStartingLookback *config.Duration
	BlockTime                 *config.Duration
	MCBlockCacheSize          int // LRU cache maps shard block keys to masterchain seqno

	// Database configuration - simple values with defaults
	BatchInsertSize uint32
	MinBatchSize    uint32
	SaveThreshold   uint32 // Number of logs to buffer in memory before saving

	// MC block resolution retry configuration
	MCBlockResolveMaxRetries uint32           // Max retry attempts for masterchain block resolution
	MCBlockResolveBaseDelay  *config.Duration // Base delay for exponential backoff

	// Pruning configuration
	PruningInterval   *config.Duration // How often to run pruning (default 10 minutes)
	PruningBatchSize  int64            // Max rows to delete per batch (default 1000)
	PruningStartDelay *config.Duration // Delay before first pruning cycle (default 5 minutes)
}

Config holds the configuration for the log poller. NOTE: when adding new fields, please update ApplyDefaults, DefaultConfigSet, and ValidateConfig accordingly. Also check toml_test.go TestNewDecodedTOMLConfig() to ensure new fields are tested there.

Performance Note: The service loop is synchronous - each tick blocks until processing completes, so concurrent ticks cannot occur. However, if processing takes longer than PollPeriod, the poller falls behind chain head (a warning is logged when this happens). Processing time is primarily driven by PageSize and transaction volume. If processing consistently exceeds PollPeriod, reduce PageSize or increase PollPeriod. Monitor ton_logpoller_poll_duration_seconds metric.

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

func (*Config) ValidateConfig

func (c *Config) ValidateConfig() (err error)

type FilterStore

type FilterStore interface {
	// RegisterFilter adds a new filter or overwrites an existing one with the same name.
	// Returns the ID of the created filter.
	RegisterFilter(ctx context.Context, flt models.Filter) (int64, error)
	// UnregisterFilter removes a filter by its unique name.
	UnregisterFilter(ctx context.Context, name string) error
	// HasFilter checks if a filter with the given name exists.
	HasFilter(ctx context.Context, name string) (bool, error)
	// GetDistinctAddresses returns a slice of unique addresses that are being monitored.
	GetDistinctAddresses(ctx context.Context) ([]*address.Address, error)
	// GetFiltersByAddress returns all filters for a specific address.
	GetFiltersByAddress(ctx context.Context, addr *address.Address) ([]models.Filter, error)
	// GetAllActiveFilters returns all non-deleted filters for the chain.
	// Used to populate filter cache on startup.
	GetAllActiveFilters(ctx context.Context) ([]models.Filter, error)
	// DeleteEmptyFilters removes filter rows that are marked is_deleted=true
	// and have no remaining logs in the logs table.
	// This is the final cleanup step after DeleteLogsForDeletedFilters has removed all logs.
	// Returns number of filter rows deleted.
	DeleteEmptyFilters(ctx context.Context) (int64, error)
}

FilterStore defines an interface for storing and retrieving log filter specifications. Note: Filter changes at the store level are immediate, but the LogPoller service reads filters once per tick, so changes take effect on the next loop tick.

type LogStore

type LogStore interface {
	// SaveLogs saves logs to storage with configurable batching behavior(with transaction support in PostgreSQL).
	// batchInsertSize controls the maximum number of logs per database batch operation.
	// minBatchSize sets the minimum batch size for retry attempts on timeout errors.
	// Returns the number of logs successfully saved.
	SaveLogs(ctx context.Context, logs []models.Log, batchInsertSize, minBatchSize uint32) (int64, error)
	// QueryLogs retrieves logs with TON-specific filtering capabilities including byte-level filtering,
	// sorting, and pagination. This method handles all filtering, sorting, and pagination.
	// The LogStore is responsible for translating parameters to its optimal execution strategy.
	// Uses chainlink-common's LimitAndSort for standardized pagination and sorting.
	QueryLogs(ctx context.Context, query *query.LogQuery) (logs []models.Log, hasMore bool, nextCursor string, err error)
	// GetHighestMCBlockSeqno retrieves the highest masterchain block sequence number
	// from stored logs. Returns (seqno, exists, err) where exists indicates whether any
	// logs are stored. This is used for resuming processing from the last known state
	// after a service restart.
	GetHighestMCBlockSeqno(ctx context.Context) (seqno uint32, exists bool, err error)
	// DeleteExpiredLogs removes logs that have passed their pre-computed expiration time.
	// Uses the expires_at column (set at insert time as tx_timestamp + retention).
	// Logs with expires_at = NULL (retention = 0, "keep forever") are never deleted.
	// limit controls batch size per DELETE operation (use 0 for unlimited, not recommended).
	// Returns total number of rows deleted across all batches.
	DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
	// DeleteExcessLogs removes logs exceeding max_logs_kept for each filter.
	// Uses tx_lt + msg_index ordering (descending) to keep newest logs.
	// Only processes filters with max_logs_kept > 0 (0 = unlimited).
	// limit controls batch size (use 0 for unlimited, not recommended).
	// Returns number of rows deleted.
	DeleteExcessLogs(ctx context.Context, limit int64) (int64, error)
	// DeleteLogsForDeletedFilters removes logs for filters marked is_deleted=true.
	// Uses batched deletion with LIMIT for safe removal without table locks.
	// Note: Filter row cleanup is handled separately by FilterStore.DeleteEmptyFilters.
	// Returns number of log rows deleted.
	DeleteLogsForDeletedFilters(ctx context.Context, limit int64) (int64, error)
}

LogStore defines the interface for storing and retrieving logs.

type ObservedFilterStore

type ObservedFilterStore struct {
	FilterStore
	// contains filtered or unexported fields
}

ObservedFilterStore wraps a FilterStore with metrics instrumentation

func NewObservedFilterStore

func NewObservedFilterStore(store FilterStore, metrics *logPollerMetrics, lggr logger.Logger) *ObservedFilterStore

NewObservedFilterStore creates a new observed filter store wrapper

func (*ObservedFilterStore) GetAllActiveFilters

func (o *ObservedFilterStore) GetAllActiveFilters(ctx context.Context) ([]models.Filter, error)

GetAllActiveFilters wraps the underlying GetAllActiveFilters with metrics

func (*ObservedFilterStore) GetDistinctAddresses

func (o *ObservedFilterStore) GetDistinctAddresses(ctx context.Context) ([]*address.Address, error)

GetDistinctAddresses wraps the underlying GetDistinctAddresses with metrics

func (*ObservedFilterStore) GetFiltersByAddress

func (o *ObservedFilterStore) GetFiltersByAddress(ctx context.Context, addr *address.Address) ([]models.Filter, error)

GetFiltersByAddress wraps the underlying GetFiltersByAddress with metrics

func (*ObservedFilterStore) HasFilter

func (o *ObservedFilterStore) HasFilter(ctx context.Context, name string) (bool, error)

HasFilter wraps the underlying HasFilter with metrics

func (*ObservedFilterStore) RegisterFilter

func (o *ObservedFilterStore) RegisterFilter(ctx context.Context, flt models.Filter) (int64, error)

RegisterFilter wraps the underlying RegisterFilter with metrics

func (*ObservedFilterStore) UnregisterFilter

func (o *ObservedFilterStore) UnregisterFilter(ctx context.Context, name string) error

UnregisterFilter wraps the underlying UnregisterFilter with metrics

type ObservedLogStore

type ObservedLogStore struct {
	LogStore
	// contains filtered or unexported fields
}

ObservedLogStore wraps a LogStore with metrics instrumentation

func NewObservedLogStore

func NewObservedLogStore(store LogStore, metrics *logPollerMetrics, lggr logger.Logger) *ObservedLogStore

NewObservedLogStore creates a new observed log store wrapper

func (*ObservedLogStore) GetHighestMCBlockSeqno

func (o *ObservedLogStore) GetHighestMCBlockSeqno(ctx context.Context) (uint32, bool, error)

GetHighestMCBlockSeqno wraps the underlying GetHighestMCBlockSeqno with metrics

func (*ObservedLogStore) QueryLogs

func (o *ObservedLogStore) QueryLogs(ctx context.Context, logQuery *query.LogQuery) ([]models.Log, bool, string, error)

QueryLogs wraps the underlying QueryLogs with metrics

func (*ObservedLogStore) SaveLogs

func (o *ObservedLogStore) SaveLogs(ctx context.Context, logs []models.Log, batchInsertSize, minBatchSize uint32) (int64, error)

SaveLogs wraps the underlying SaveLogs with metrics

type RawLogProvider

type RawLogProvider interface {
	// GetLogs retrieves all external message outputs for an address between fromBlockSeqNo (exclusive) and toBlock (inclusive).
	GetLogs(ctx context.Context, addr *address.Address, from uint32, to *ton.BlockIDExt) ([]models.RawLog, error)
}

RawLogProvider provides raw logs leveraging LogPoller libs without running the full service (o11y use case)

func NewTonO11yLogProvider

func NewTonO11yLogProvider(client ton.APIClientWrapped, loader TxLoader, lggr logger.Logger) RawLogProvider

NewTonO11yLogProvider creates a new RawLogProvider backed by a TON o11y client.

type Service

type Service interface {
	services.Service
	// RegisterFilter adds a new filter. Changes take effect on the next loop tick (up to pollPeriod delay).
	RegisterFilter(ctx context.Context, flt models.Filter) (int64, error)
	// UnregisterFilter removes a filter. Changes take effect on the next loop tick (up to pollPeriod delay).
	// If called during an active tick, the old filter continues processing for that tick.
	UnregisterFilter(ctx context.Context, name string) error
	HasFilter(ctx context.Context, name string) (bool, error)
	Replay(ctx context.Context, fromBlock uint32) error
	ReplayStatus() models.ReplayStatus
	NewQuery() query.Builder
}

Service defines the public interface for the TON log polling service.

func NewService

func NewService(
	lggr logger.Logger,
	chainID string,
	clientProvider func(context.Context) (ton.APIClientWrapped, error),
	opts *ServiceOptions,
) (Service, error)

NewService creates a new TON log polling service instance

func NewServiceWith

func NewServiceWith(
	ctx context.Context,
	lggr logger.Logger,
	chainID string,
	clientProvider func(context.Context) (ton.APIClientWrapped, error),
	opts *ServiceOptions,
	filters []models.Filter,
) (Service, error)

NewServiceWith creates a new TON log polling service and registers the provided filters. This is a convenience constructor for cases where filters are known upfront. The caller is responsible for starting the service with Start().

type ServiceOptions

type ServiceOptions struct {
	Config      Config
	FilterStore FilterStore
	TxLoader    TxLoader
	LogStore    LogStore
}

type TxLoader

type TxLoader interface {
	// LoadTxsForAddress retrieves transactions for a specific address within a block range.
	// pageSize controls the number of transactions to fetch per TON API call for pagination.
	// Transactions and runtime errors are written to the provided channels synchronously.
	// Returns immediate validation/setup errors. The caller is responsible for spawning goroutines
	// and managing channel lifecycle.
	LoadTxsForAddress(ctx context.Context, blockRange *models.BlockRange, addr *address.Address, pageSize uint32, txOut chan<- models.Tx, errOut chan<- error) error

	// GetTxsForAddress is a convenience wrapper around LoadTxsForAddress that returns
	// transactions as a slice instead of streaming to a channel. This is suitable for
	// bounded result sets where memory is not a concern.
	// Use LoadTxsForAddress for streaming large result sets or when you need fine-grained
	// control over concurrent processing.
	//
	// Warning: Be cautious about memory pressure when querying large ranges of blocks.
	// For large ranges, consider using LoadTxsForAddress with streaming to process
	// transactions incrementally.
	GetTxsForAddress(ctx context.Context, blockRange *models.BlockRange, addr *address.Address, pageSize uint32) ([]models.Tx, error)
}

TxLoader defines the interface for loading transactions from the TON blockchain.

Directories

Path Synopsis
store

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL