sink

package
v1.18.1
Warning

This package is not in the latest version of its module.

Published: Feb 20, 2026 License: Apache-2.0 Imports: 37 Imported by: 2

README

Substreams Sink

This is a Substreams sink library. You can use it to build any sink application that consumes Substreams in Go.

Features

What you get by using this library:

  • Handles connections and reconnections
  • Throughput logging (block rates, etc.)
  • Best-practice error handling

Usage

The library provides a Sinker type that connects to the Substreams API. Sinker wraps the lower-level substreams client library and adds connection management, throughput logging and error handling on top of it.

The user's primary responsibility when creating a custom sink is to implement handlers for processing Substreams data. The sink library provides two main patterns for creating handlers:

Basic Handlers

For simple use cases, you can use NewSinkerHandlers which requires only the essential handlers:

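import (
	"context"

	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
)

func handleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *sink.Cursor) error {
	return nil // decode and store data, then persist cursor
}

func handleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) error {
	return nil // discard data above undoSignal.LastValidBlock, then persist cursor
}

handlers := sink.NewSinkerHandlers(handleBlockScopedData, handleBlockUndoSignal)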

Full Handlers

For advanced use cases that need session initialization, progress tracking, and debug capabilities, use NewSinkerFullHandlers:

handlers := sink.NewSinkerFullHandlers(
	handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error,
	handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error,
	handleSessionInit func(ctx context.Context, req *pbsubstreamsrpc.Request, session *pbsubstreamsrpc.SessionInit) error,
	handleProgress func(ctx context.Context, progress *pbsubstreamsrpc.ModulesProgress),
	handleInitialSnapshotData func(ctx context.Context, snapshotData *pbsubstreamsrpc.InitialSnapshotData) error,
	handleInitialSnapshotComplete func(ctx context.Context, complete *pbsubstreamsrpc.InitialSnapshotComplete) error,
	handleError func(ctx context.Context, err *pbsubstreamsrpc.Error),
)

We invite you to take a look at our:

Note: We highly recommend using the Advanced Example for any serious sink implementation.

Handler Details

Required Handlers
handleBlockScopedData (Required)
  • ctx context.Context is the sink.Sinker's own context.Context.
  • data *pbsubstreamsrpc.BlockScopedData contains the data received from the Substreams API; refer to its definition for proper usage.
  • isLive *bool will be non-nil if a LivenessChecker has been configured on the Sinker instance. When non-nil, *isLive indicates whether the block being processed is considered "live" (recently produced) or historical.
  • cursor *Cursor is the cursor at the given block; save it regularly as a checkpoint in case the process is interrupted.
handleBlockUndoSignal (Required)
  • ctx context.Context is the sink.Sinker's own context.Context.
  • undoSignal *pbsubstreamsrpc.BlockUndoSignal identifies the last block that is still valid; any data saved for blocks after it must be discarded.
  • cursor *Cursor is the cursor to use after the undo; save it regularly as a checkpoint in case the process is interrupted.
Optional Handlers (Available in NewSinkerFullHandlers)
handleSessionInit (Optional)

Called when a new session is initialized with the Substreams server. This is useful for logging session information or performing setup tasks.

  • ctx context.Context is the sink.Sinker's own context.Context.
  • req *pbsubstreamsrpc.Request is the request that was sent to the Substreams API.
  • session *pbsubstreamsrpc.SessionInit contains the session initialization data including trace ID, resolved start block, and linear handoff information.
handleProgress (Optional)

Called periodically to report processing progress. Useful for monitoring and displaying progress information.

  • ctx context.Context is the sink.Sinker's own context.Context.
  • progress *pbsubstreamsrpc.ModulesProgress contains progress information for each module being processed, including processed block ranges and execution statistics.
handleInitialSnapshotData (Optional)

Called when initial snapshot data is available for store modules. Only called in development mode when debug snapshots are enabled.

  • ctx context.Context is the sink.Sinker's own context.Context.
  • snapshotData *pbsubstreamsrpc.InitialSnapshotData contains the initial state data for store modules, useful for debugging.
handleInitialSnapshotComplete (Optional)

Called when all initial snapshot data has been delivered. Signals that the snapshot phase is complete.

  • ctx context.Context is the sink.Sinker's own context.Context.
  • complete *pbsubstreamsrpc.InitialSnapshotComplete indicates completion of the initial snapshot delivery.
handleError (Optional)

Called when errors are received from the Substreams server. Useful for custom error logging or handling.

  • ctx context.Context is the sink.Sinker's own context.Context.
  • err *pbsubstreamsrpc.Error contains error information from the Substreams API.

Alternative: Interface-Based Handlers

Instead of using function-based handlers, you can implement the handler interfaces directly. This approach provides better type safety and organization for complex sinks:

// Implement the required interface
type MySink struct {
	// your sink fields
}

func (s *MySink) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *sink.Cursor) error {
	// your implementation
	return nil
}

func (s *MySink) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) error {
	// your implementation
	return nil
}

// Optional interfaces you can implement
func (s *MySink) HandleProgress(ctx context.Context, progress *pbsubstreamsrpc.ModulesProgress) {
	// your implementation
}

func (s *MySink) HandleSessionInit(ctx context.Context, req *pbsubstreamsrpc.Request, session *pbsubstreamsrpc.SessionInit) error {
	// your implementation
	return nil
}

func (s *MySink) HandleInitialSnapshotData(ctx context.Context, snapshotData *pbsubstreamsrpc.InitialSnapshotData) error {
	// your implementation
	return nil
}

func (s *MySink) HandleInitialSnapshotComplete(ctx context.Context, complete *pbsubstreamsrpc.InitialSnapshotComplete) error {
	// your implementation
	return nil
}

func (s *MySink) HandleError(ctx context.Context, err error) {
	// your implementation
}

func (s *MySink) HandleBlockRangeCompletion(ctx context.Context, cursor *sink.Cursor) error {
	// Called when the sinker finishes processing the requested block range.
	// Only called when streaming has a defined end block (not in live mode).
	return nil
}

// Pass your sink directly to the sinker
mySink := &MySink{}
if err := sinker.Run(ctx, cursor, mySink); err != nil {
	// handle the error
}

The sink library automatically detects which interfaces your handler implements and calls the appropriate methods.

Handlers Flow

The basic pattern for using the Sinker is as follows:

  1. Setup Phase: Create your data layer responsible for decoding Substreams data and saving it to desired storage
  2. Handler Implementation: Implement the required handlers (either using functions or interfaces)
  3. Sinker Creation: Create a Sinker object using sink.NewFromConfig (or sink.NewFromViper) with your configuration
  4. Handler Registration: Pass your handlers to the sinker using sinker.Run(ctx, cursor, handlers)
Execution Flow

When the sinker is running, handlers are invoked roughly in the following order; progress, block data, undo and error messages can interleave as the stream advances:

  1. HandleSessionInit (if implemented) - Called once when the session starts
  2. HandleInitialSnapshotData (if implemented) - Called for each initial snapshot in development mode
  3. HandleInitialSnapshotComplete (if implemented) - Called when all snapshots are delivered
  4. HandleProgress (if implemented) - Called periodically during processing
  5. HandleBlockScopedData (required) - Called for each block's data
  6. HandleBlockUndoSignal (required) - Called when blockchain reorganizations occur
  7. HandleError (if implemented) - Called when errors are received from the server
  8. HandleBlockRangeCompletion (if implemented) - Called when the specified block range is fully processed
Data Processing

The HandleBlockScopedData handler is called for each block scoped data message received from the Substreams API. It contains all the data output for the given Substreams module. In your handler, you are responsible for:

  • Decoding and processing the data
  • Saving the data to your storage system
  • Persisting the cursor for checkpoint recovery
  • Returning an error if there's a problem (which will trigger retries or termination based on configuration)
Handling Blockchain Reorganizations

The HandleBlockUndoSignal handler is called when a blockchain reorganization (fork) occurs. The message contains:

  • LastValidBlock: Points to the last block that should be assumed to be part of the canonical chain
  • LastValidCursor: The cursor that should be used as the current active position

Important: You must treat every piece of data from blocks where BlockScopedData.Clock.Number > LastValidBlock.Number as invalid and remove it from your storage. For example, if your entities include block numbers, you should delete all entities where blockNumber > LastValidBlock.Number.
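A minimal sketch of this cleanup rule, assuming a hypothetical in-memory store whose entities record the block number that produced them. A real sink would typically run the equivalent delete against its database (for example, a SQL delete filtered on block_number > lastValidBlock) in the same transaction that saves the new cursor.

```go
package main

import "fmt"

// entity is a hypothetical stored record tagged with the block that produced it.
type entity struct {
	ID          string
	BlockNumber uint64
}

// discardAfter keeps only entities produced at or below lastValidBlock,
// dropping everything from blocks that were undone. It filters in place,
// reusing the input slice's backing array.
func discardAfter(entities []entity, lastValidBlock uint64) []entity {
	kept := entities[:0]
	for _, e := range entities {
		if e.BlockNumber <= lastValidBlock {
			kept = append(kept, e)
		}
	}
	return kept
}

func main() {
	store := []entity{{"a", 100}, {"b", 101}, {"c", 102}, {"d", 103}}
	// An undo signal whose LastValidBlock.Number is 101 invalidates blocks 102 and 103.
	store = discardAfter(store, 101)
	fmt.Println(store) // [{a 100} {b 101}]
}
```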

Error Handling

The sink library distinguishes retryable errors from fatal ones:

  • Retryable Errors: Wrap errors in derr.NewRetryableError(err) to indicate they should be retried
  • Fatal Errors: Return unwrapped errors to indicate fatal conditions that should stop processing
  • Backoff Strategy: The library uses exponential backoff for retryable errors
  • Max Retries: Can be configured for production deployments (0=no retries, -1=infinite, >0=specific count)

Practical Example

See the examples in the 'examples' folder.

From Viper

All our sinks use Viper/Cobra and a set of public StreamingFast libraries to deal with flags, logging, environment variables, etc. If you use the same structure, you can benefit from sink.AddFlagsToSet and sink.ConfigFromViper, which together perform most of the boilerplate needed to bootstrap a sinker from flags.

Configuration Options

The SinkerConfig struct provides extensive configuration options:

config := &sink.SinkerConfig{
    // Required: Substreams package and output module
    Pkg:          pkg,                    // *pbsubstreams.Package
    OutputModule: outputModule,           // *pbsubstreams.Module

    // Connection settings
    ClientConfig: clientConfig,           // *client.SubstreamsClientConfig

    // Block range
    StartBlock:   0,                      // int64
    StopBlock:    0,                      // uint64 (0 means no stop block: stream indefinitely, following live blocks)

    // Processing mode
    Mode:         sink.SubstreamsModeDevelopment, // or SubstreamsModeProduction

    // Error handling
    MaxRetries: -1,                       // int - maximum retries (0=no retries, -1=infinite, >0=specific count)

    // Performance tuning
    FinalBlocksOnly: false,               // bool - disable undo handling for faster processing
    UndoBufferSize:  12,                  // int - number of blocks to buffer for undo handling

    // Development features
    DevOutputModules:    []string{},      // Debug specific modules
    DevOutputSnapshots:  []string{},      // Get initial snapshots for stores

    // Monitoring
    LivenessChecker: livenessChecker,     // sink.LivenessChecker interface (optional)

    // Logging
    Logger: logger,                       // *zap.Logger
    Tracer: tracer,                       // logging.Tracer
}

Launching

The sinker can be launched by calling the Run method on the Sinker object. The Run method will block until the sinker is stopped or encounters an error.

ctx := context.Background()
cursor, err := sink.NewCursor("") // Start from beginning, or load from checkpoint
if err != nil {
    return fmt.Errorf("creating cursor: %w", err)
}

// Run the sinker with your handlers
err = sinker.Run(ctx, cursor, handlers)
if err != nil {
    return fmt.Errorf("sinker failed: %w", err)
}

The Sinker embeds *shutter.Shutter, which can be used to handle all shutdown logic (e.g., flushing any remaining data to storage, stopping the sink in case of database disconnection, etc.).

Graceful Shutdown

You can implement graceful shutdown using context cancellation:

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Handle shutdown signals
go func() {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
	fmt.Println("Shutting down gracefully...")
	cancel()
}()

// errors.Is also matches a wrapped context.Canceled
err := sinker.Run(ctx, cursor, handlers)
if err != nil && !errors.Is(err, context.Canceled) {
	log.Fatalf("Sinker error: %v", err)
}

Monitoring and Statistics

The sinker provides built-in statistics that can be accessed after completion:

if err := sinker.Run(ctx, cursor, handlers); err != nil {
	log.Printf("sinker stopped: %v", err)
}

// Print statistics
fmt.Printf("Total Processed Bytes: %d\n", sink.ProcessedBytes.Get())
fmt.Printf("Total Processed Blocks: %d\n", sink.ProcessedBlocks.Get())
fmt.Printf("Total Received Bytes: %d\n", sink.MessageSizeBytes.Get())

Example Implementations

The following sinks are included with the substreams binary:

  • example - A simple example
  • webhook - Webhook sink for HTTP endpoints
  • noop - Not exactly a sink: used to force the Substreams server to prepare its cache, with minimal data egress.
  • protojson - Sink to JSONL files (using ProtoJSON encoding)

The following repositories are production examples of how the sink library can be used:

Documentation

Index

Constants

const (
	FlagEndpoint        = "endpoint"
	ShortFlagEndoint    = "e"
	FlagStartBlock      = "start-block"
	ShortFlagStartBlock = "s"
	FlagStopBlock       = "stop-block"
	ShortFlagStopBlock  = "t"
	FlagCursor          = "cursor"
	ShortFlagCursor     = "c"

	FlagFinalBlocksOnly = "final-blocks-only"
	FlagAPIKeyEnvvar    = "api-key-envvar"
	FlagAPITokenEnvvar  = "api-token-envvar"

	FlagNetwork              = "network"
	FlagParams               = "params"
	FlagInsecure             = "insecure"
	FlagPlaintext            = "plaintext"
	FlagForceProtocolVersion = "force-protocol-version"

	FlagUndoBufferSize     = "undo-buffer-size"
	FlagLiveBlockTimeDelta = "live-block-time-delta"
	FlagDevelopmentMode    = "development-mode"
	FlagMaxRetries         = "max-retries"

	FlagSkipPackageValidation        = "skip-package-validation"
	FlagPartialBlocks                = "partial-blocks"
	FlagExtraHeaders                 = "header"
	FlagNoopMode                     = "noop-mode"
	FlagProtoPath                    = "proto-path"
	FlagProtoDescriptorSet           = "proto-descriptor-set"
	FlagPrometheusAddr               = "prometheus-addr"
	FlagSkipCheckModuleBinariesExist = "skip-check-module-binaries-exist"
)
const IgnoreOutputModuleType string = ""

IgnoreOutputModuleType can be used instead of the expected output module type when you want to validate this yourself, for example if you accept multiple output type(s).

const InferOutputModuleFromPackage string = "@!##_InferOutputModuleFromSpkg_##!@"

InferOutputModuleFromPackage can be used instead of the actual module's output name and has the effect that output module is extracted directly from the pbsubstreams.Package via the `SinkModule` field.

Variables

var AvgBlockTimeDelta = dmetrics.NewAvgDurationCounter(30*time.Second, time.Second, "Average duration for BlockTimeDelta")
var AvgBlockWaitTime = dmetrics.NewAvgDurationCounter(30*time.Second, time.Second, "Average duration for BlockWaitTime")
var AvgLocalProcessingTime = dmetrics.NewAvgDurationCounter(30*time.Second, time.Second, "Average duration for LocalProcessingTime")
var BackprocessingCompletion = Metrics.NewGauge("substreams_sink_backprocessing_completion", "Determines if backprocessing is completed, which is if we receive a first data message")
var BlockTimeDelta = Metrics.NewGauge("substreams_sink_block_time_delta", "The difference between the last received block's BlockTime and the previous block's BlockTime -- can be skewed very high when processing older segments with a BlockFilter or skipped outputs")
var BlockWaitTime = Metrics.NewGauge("substreams_sink_block_wait_time", "The time that the sinks spends waiting for the next block from substreams -- should converge to the block production time of the chain")
var DataMessageCount = Metrics.NewCounter("substreams_sink_data_message", "The number of data message received")
var ErrBackOffExpired = errors.New("unable to complete work within backoff time limit")
var ErrHandlerNotImplemented = errors.New("handler not implemented")
var FlagIgnore = FlagExcludeDefault

Deprecated: use FlagExcludeDefault instead

var HeadBlockNumber = Metrics.NewHeadBlockNumber("substreams_sink")
var HeadBlockTimeDrift = Metrics.NewHeadTimeDrift("substreams_sink")
var LocalProcessingTime = Metrics.NewGauge("substreams_sink_local_processing_time", "The time that the sinks spends processing the received block")
var MessageSizeBytes = Metrics.NewCounter("substreams_sink_message_size_bytes", "The number of total bytes of message received from the Substreams backend")
var Metrics = dmetrics.NewSet()
var ProcessedBlocks = Metrics.NewGauge("substreams_sink_processed_blocks", "Total processed blocks on Substreams server")
var ProcessedBytes = Metrics.NewGauge("substreams_sink_processed_bytes", "Total processed bytes on Substreams server")
var ProgressMessageCount = Metrics.NewGauge("substreams_sink_progress_message", "The number of progress message received")
var ProgressMessageLastBlock = Metrics.NewGaugeVec("substreams_sink_progress_message_last_block", []string{"stage"}, "Latest progress reported processed range end block for each stage (not necessarily contiguous)")
var ProgressMessageLastContiguousBlock = Metrics.NewGaugeVec("substreams_sink_progress_message_last_contiguous_block", []string{"stage"}, "Latest progress reported processed end block for the first completed (contiguous) range")
var ProgressMessageRunningJobs = Metrics.NewGaugeVec("substreams_sink_progress_message_running_jobs", []string{"stage"}, "Latest reported number of active jobs for each stage")
var ServerEgressBytes = Metrics.NewCounter("substreams_sink_server_egress_bytes", "Total egress bytes from Substreams server (total uncompressed bytes received)")
var SubstreamsErrorCount = Metrics.NewCounter("substreams_sink_error", "The error count we encountered when interacting with Substreams for which we had to restart the connection loop")
var UndoMessageCount = Metrics.NewCounter("substreams_sink_undo_message", "The number of block undo message received")
var UnknownMessageCount = Metrics.NewCounter("substreams_sink_unknown_message", "The number of unknown message received")

Functions

func AddFlagsToSet

func AddFlagsToSet(flags *pflag.FlagSet, ignore ...FlagInclusionExclusion)

AddFlagsToSet can be used to import the standard flags needed for the sink to configure itself, and controls which flags to ignore (if the default is to include them) or include (if the default is to ignore them).

By using this method to define your flags and using `cli.ConfigureViper` (import "github.com/streamingfast/cli") in your main application command, `NewFromViper` can be used to easily create a `sink.Sinker` instance.

Default Inclusion

Added by default: FlagEndpoint, FlagStartBlock, FlagStopBlock, FlagNetwork, FlagParams, FlagInsecure, FlagPlaintext, FlagForceProtocolVersion, FlagUndoBufferSize, FlagLiveBlockTimeDelta, FlagDevelopmentMode, FlagNoopMode, FlagFinalBlocksOnly, FlagMaxRetries, FlagSkipPackageValidation, FlagExtraHeaders, FlagAPIKeyEnvvar, FlagAPITokenEnvvar, FlagProtoPath, FlagProtoDescriptorSet, FlagPrometheusAddr

To avoid adding a default included flag, use [FlagExcludeDefault(<flag>, ...)].

Possible Inclusion

Not added by default but can be explicitly included: FlagCursor, FlagSkipCheckModuleBinariesExist, FlagPartialBlocks.

You add those flags by using [FlagIncludeOptional(<flag>, ...)].
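To make the include/exclude rules concrete, here is a hypothetical sketch of how FlagInclusionExclusion values can drive the decision for a single flag. The interface matches the one documented for this package, but excludeDefault, includeOptional and shouldAdd are illustrative helpers, not the package's actual implementation.

```go
package main

import "fmt"

// FlagInclusionExclusion mirrors the interface documented for this package.
type FlagInclusionExclusion interface {
	IsExplicitlyIncluded(flag string) bool
	IsExplicitlyExcluded(flag string) bool
}

// flagSet is a hypothetical implementation backed by two sets.
type flagSet struct {
	included map[string]bool
	excluded map[string]bool
}

func (f flagSet) IsExplicitlyIncluded(flag string) bool { return f.included[flag] }
func (f flagSet) IsExplicitlyExcluded(flag string) bool { return f.excluded[flag] }

func excludeDefault(flags ...string) FlagInclusionExclusion {
	f := flagSet{excluded: map[string]bool{}}
	for _, name := range flags {
		f.excluded[name] = true
	}
	return f
}

func includeOptional(flags ...string) FlagInclusionExclusion {
	f := flagSet{included: map[string]bool{}}
	for _, name := range flags {
		f.included[name] = true
	}
	return f
}

// shouldAdd registers default flags unless some option excludes them, and
// optional flags only when some option includes them.
func shouldAdd(flag string, isDefault bool, opts ...FlagInclusionExclusion) bool {
	for _, o := range opts {
		if isDefault && o.IsExplicitlyExcluded(flag) {
			return false
		}
		if !isDefault && o.IsExplicitlyIncluded(flag) {
			return true
		}
	}
	return isDefault
}

func main() {
	opts := []FlagInclusionExclusion{excludeDefault("network"), includeOptional("cursor")}
	fmt.Println(shouldAdd("endpoint", true, opts...))        // true: default, not excluded
	fmt.Println(shouldAdd("network", true, opts...))         // false: default, excluded
	fmt.Println(shouldAdd("cursor", false, opts...))         // true: optional, included
	fmt.Println(shouldAdd("partial-blocks", false, opts...)) // false: optional, not included
}
```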

Reference Table

FlagEndpoint: `--endpoint` (-e) (defaults `""`)
FlagStartBlock: `--start-block` (-s) (defaults `""`)
FlagStopBlock: `--stop-block` (-t) (defaults `"0"`)
FlagCursor: `--cursor` (-c) (defaults `""`)
FlagNetwork: `--network` (defaults `""`)
FlagParams: `--params` (-p) (defaults `[]`)
FlagInsecure: `--insecure` (defaults `false`)
FlagPlaintext: `--plaintext` (defaults `false`)
FlagForceProtocolVersion: `--force-protocol-version` (defaults `0`)
FlagUndoBufferSize: `--undo-buffer-size` (defaults `0`)
FlagLiveBlockTimeDelta: `--live-block-time-delta` (defaults `0`)
FlagDevelopmentMode: `--development-mode` (defaults `false`)
FlagNoopMode: `--noop-mode` (defaults `false`)
FlagFinalBlocksOnly: `--final-blocks-only` (defaults `false`)
FlagMaxRetries: `--max-retries` (defaults `3`)
FlagSkipPackageValidation: `--skip-package-validation` (defaults `false`)
FlagPartialBlocks: `--partial-blocks` (defaults `false`)
FlagSkipCheckModuleBinariesExist: `--skip-check-module-binaries-exist` (defaults `true`)
FlagExtraHeaders: `--header` (-H) (defaults `[]`)
FlagAPIKeyEnvvar: `--api-key-envvar` (defaults `SUBSTREAMS_API_KEY`)
FlagAPITokenEnvvar: `--api-token-envvar` (defaults `SUBSTREAMS_API_TOKEN`)
FlagProtoPath: `--proto-path` (defaults `""`)
FlagProtoDescriptorSet: `--proto-descriptor-set` (defaults `""`)
FlagPrometheusAddr: `--prometheus-addr` (defaults `"localhost:9102"`)

func LoadSubstreamsAuthEnvFile

func LoadSubstreamsAuthEnvFile(manifestPath string)

LoadSubstreamsAuthEnvFile loads authentication environment variables from a .substreams.env file.

func ReadManifestAndModule

func ReadManifestAndModule(
	manifestPath string,
	network string,
	paramsStrings []string,
	outputModuleName string,
	expectedOutputModuleType string,
	skipPackageValidation bool,
	additionalOptions []manifest.Option,
	zlog *zap.Logger,
) (
	pkg *pbsubstreams.Package,
	module *pbsubstreams.Module,
	outputModuleHash manifest.ModuleHash,
	err error,
)

ReadManifestAndModule reads the manifest and returns the package, the output module and its hash.

If outputModuleName is set to InferOutputModuleFromPackage, the sink will try to infer the output module from the package's sink_module field, if present.

If expectedOutputModuleType is set to IgnoreOutputModuleType, the sink will not validate the output module type.

If skipPackageValidation is set to true, the sink will not validate the package; you will have to do it yourself.

func RegisterMetrics deprecated added in v1.17.0

func RegisterMetrics()

RegisterMetrics registers all sink-related metrics.

Deprecated: the metrics are now auto-registered on Sinker.Run call, remove this call.

func SubstreamsModeNames

func SubstreamsModeNames() []string

SubstreamsModeNames returns a list of possible string values of SubstreamsMode.

func WriteCursor

func WriteCursor(filename string, cursor *Cursor) error

WriteCursor writes cursor to file using temp file and rename. Returns nil if filename is empty or cursor is nil.

Types

type BackOffStringer

type BackOffStringer struct{ backoff.BackOff }

func (BackOffStringer) String

func (s BackOffStringer) String() string

type BytesRepresentation

type BytesRepresentation int
const (
	BytesAsBase64 BytesRepresentation = iota
	BytesAsHex
	BytesAsString
	BytesAsBase58
)

func BytesEncodingToRepresentation

func BytesEncodingToRepresentation(enc string) BytesRepresentation

BytesEncodingToRepresentation maps an encoding name to a BytesRepresentation.

func InferBytesRepresentation

func InferBytesRepresentation(network string, endpoint string) BytesRepresentation

InferBytesRepresentation infers the bytes representation from the network or the endpoint. It first checks the network ID and, if that yields no match, checks the endpoint. If neither is provided, it defaults to hex encoding. It returns the BytesRepresentation matching the inferred encoding.

type Cursor

type Cursor struct {
	*bstream.Cursor
}

func MustNewCursor

func MustNewCursor(cursor string) *Cursor

func NewBlankCursor

func NewBlankCursor() *Cursor

func NewCursor

func NewCursor(cursor string) (*Cursor, error)

func ReadCursor

func ReadCursor(filename string) (*Cursor, error)

ReadCursor returns nil if filename is empty or the file is not found.

func (*Cursor) Block

func (c *Cursor) Block() bstream.BlockRef

func (*Cursor) IsBlank

func (c *Cursor) IsBlank() bool

func (*Cursor) IsEqualTo

func (c *Cursor) IsEqualTo(other *Cursor) bool

func (*Cursor) MarshalLogObject

func (c *Cursor) MarshalLogObject(encoder zapcore.ObjectEncoder) error

func (*Cursor) String

func (c *Cursor) String() string

String returns a string representation suitable for a Firehose request, meaning a blank cursor returns "".

type CursorBasedLivenessChecker added in v1.16.4

type CursorBasedLivenessChecker struct {
	// contains filtered or unexported fields
}

func NewCursorBasedLivenessChecker added in v1.16.4

func NewCursorBasedLivenessChecker() *CursorBasedLivenessChecker

func (*CursorBasedLivenessChecker) CheckCursor added in v1.16.4

func (t *CursorBasedLivenessChecker) CheckCursor(cursor string)

func (*CursorBasedLivenessChecker) IsLive added in v1.16.4

type DeltaLivenessChecker

type DeltaLivenessChecker struct {
	// contains filtered or unexported fields
}

func NewDeltaLivenessChecker

func NewDeltaLivenessChecker(delta time.Duration) *DeltaLivenessChecker

func (*DeltaLivenessChecker) IsLive

func (t *DeltaLivenessChecker) IsLive(clock *pbsubstreams.Clock) bool

type FlagIgnored deprecated

type FlagIgnored = FlagInclusionExclusion

Deprecated: Use FlagInclusionExclusion instead

type FlagInclusionExclusion added in v1.17.0

type FlagInclusionExclusion interface {
	IsExplicitlyIncluded(flag string) bool
	IsExplicitlyExcluded(flag string) bool
}

func FlagExcludeDefault added in v1.17.0

func FlagExcludeDefault(in ...string) FlagInclusionExclusion

FlagExcludeDefault can be used to exclude one or more of the default flags from being added to a flag set when using AddFlagsToSet.

func FlagIncludeOptional added in v1.17.0

func FlagIncludeOptional(in ...string) FlagInclusionExclusion

FlagIncludeOptional can be used to include one or more optional flags to be added to a flag set when using AddFlagsToSet.

type LivenessChecker

type LivenessChecker interface {
	IsLive(block *pbsubstreams.Clock) bool
}

type Option added in v1.17.0

type Option func(s *Sinker)

func WithAgent deprecated added in v1.17.0

func WithAgent(agent string) Option

WithAgent configures the Sinker instance to use a custom agent string when connecting to the Substreams backend server. Otherwise, the default agent string is used.

Deprecated: Do not rely on New and Option, instead use NewFromConfig and configure the SinkerConfig directly.

func WithBlockDataBuffer deprecated added in v1.17.0

func WithBlockDataBuffer(bufferSize int) Option

WithBlockDataBuffer creates a buffer of block data which is used to handle undo fork steps.

Ensure that this buffer is large enough to capture all block reorganizations. If the buffer is too small, the sinker will not be able to handle the reorganization and will error if an undo is received for a block which has already been returned to the sink. If the buffer is too large, the sinker will take more time than necessary to write data to the sink.

If the sink is configured to handle irreversible blocks, the default buffer size is 12. If you pass 0, block data buffer will be disabled completely.

Deprecated: Do not rely on New and Option, instead use NewFromConfig and configure the SinkerConfig directly, the counterpart of this option is to set [SinkerConfig.UndoBufferSize] to a value > 0, which is effective only if [SinkerConfig.FinalBlocksOnly] is false, meaning that undo signals are expected.
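The trade-off described for WithBlockDataBuffer (and SinkerConfig.UndoBufferSize) can be illustrated with a small, hypothetical buffer that only tracks block numbers. It holds back the most recent size blocks, resolves undo signals that stay within the buffer in memory, and reports an error when an undo reaches a block that was already flushed. This is a sketch of the idea, not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// undoBuffer holds blocks back until `size` newer blocks have arrived.
type undoBuffer struct {
	size        int
	pending     []uint64 // buffered block numbers, oldest first
	lastFlushed uint64   // highest block number already handed to the sink
	flushed     bool     // whether any block has been flushed yet
}

var errUndoTooDeep = errors.New("undo targets a block already returned to the sink")

// push buffers a block and returns the blocks that are now deep enough to flush.
func (b *undoBuffer) push(block uint64) []uint64 {
	b.pending = append(b.pending, block)
	var out []uint64
	for len(b.pending) > b.size {
		out = append(out, b.pending[0])
		b.lastFlushed, b.flushed = b.pending[0], true
		b.pending = b.pending[1:]
	}
	return out
}

// undo drops every buffered block above lastValid, failing if a block above
// lastValid was already flushed (the "buffer too small" case).
func (b *undoBuffer) undo(lastValid uint64) error {
	if b.flushed && b.lastFlushed > lastValid {
		return errUndoTooDeep
	}
	kept := b.pending[:0]
	for _, n := range b.pending {
		if n <= lastValid {
			kept = append(kept, n)
		}
	}
	b.pending = kept
	return nil
}

func main() {
	buf := &undoBuffer{size: 2}
	for _, n := range []uint64{1, 2, 3, 4} {
		fmt.Println("push", n, "flushed", buf.push(n))
	}
	// Blocks 3 and 4 are still buffered, so a reorg back to block 2 is absorbed in memory.
	err := buf.undo(2)
	fmt.Println(err, buf.pending) // <nil> []
	// Block 2 already reached the sink, so an undo back to block 1 cannot be honored.
	fmt.Println(buf.undo(1)) // undo targets a block already returned to the sink
}
```

A larger size absorbs deeper reorgs but delays every write by that many blocks, which is the latency cost mentioned above.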

func WithBlockRange deprecated added in v1.17.0

func WithBlockRange(blockRange *bstream.Range) Option

WithBlockRange configures the Sinker instance to only stream the specified range. If no range is specified, the Sinker streams from the module's start block through live blocks, never ending.

Deprecated: Do not rely on New and Option, instead use NewFromConfig and configure the SinkerConfig directly.

func WithExtraHeaders deprecated added in v1.17.0

func WithExtraHeaders(headers []string) Option

WithExtraHeaders configures the Sinker instance to send extra headers to the Substreams backend server.

Deprecated: Do not rely on New and Option, instead use NewFromConfig and configure the SinkerConfig directly.

func WithFinalBlocksOnly deprecated added in v1.17.0

func WithFinalBlocksOnly() Option

WithFinalBlocksOnly configures the Sinker to only stream Substreams output that is considered final by the Substreams backend server.

This means that `WithBlockDataBuffer`, if used, is ignored and [BlockUndoSignalHandler] will never be called.

Deprecated: Do not rely on New and Option, instead use NewFromConfig and configure the SinkerConfig directly.

func WithInfiniteRetry deprecated added in v1.17.0

func WithInfiniteRetry() Option

WithInfiniteRetry removes the maximum retry limit of 15 (currently hard-coded), which spans approximately 5 minutes, so that retries continue indefinitely instead of eventually exiting the process.

Deprecated: Do not rely on New and Option, instead use NewFromConfig and configure the SinkerConfig directly.

func WithLivenessChecker deprecated added in v1.17.0

func WithLivenessChecker(livenessChecker LivenessChecker) Option

WithLivenessChecker configures a [LivenessChecker] on the Sinker instance.

By configuring a liveness checker, the isLive argument received by [BlockScopedDataHandler] is non-nil and indicates whether the block being processed is considered live.

Deprecated: Do not rely on New and options, instead use NewFromConfig and configure the SinkerConfig directly.

func WithRetryBackOff deprecated added in v1.17.0

func WithRetryBackOff(backOff backoff.BackOff) Option

WithRetryBackOff configures the backoff strategy the Sinker uses when retrying after a retryable error.

Deprecated: Do not rely on New and Option, instead use NewFromConfig and configure the SinkerConfig directly.

type Sinker

type Sinker struct {
	*shutter.Shutter
	*SinkerConfig
	// contains filtered or unexported fields
}

func New deprecated

func New(
	mode SubstreamsMode,
	noopMode bool,
	pkg *pbsubstreams.Package,
	outputModule *pbsubstreams.Module,
	hash manifest.ModuleHash,
	clientConfig *client.SubstreamsClientConfig,
	logger *zap.Logger,
	tracer logging.Tracer,
	opts ...Option,
) (*Sinker, error)

New creates a new Sinker instance with the provided parameters.

Deprecated: use NewFromConfig instead which takes a SinkerConfig struct, most options can simply be applied to the SinkerConfig struct directly.

func NewFromConfig added in v1.17.0

func NewFromConfig(
	config *SinkerConfig,
) (*Sinker, error)

NewFromConfig creates a new Sinker instance from the provided SinkerConfig. This function replaces the previous Option pattern with a more structured configuration approach: all configuration is contained within the SinkerConfig struct, making it easier to manage and test.

func NewFromViper

func NewFromViper(
	cmd *cobra.Command,
	expectedOutputModuleType string,
	manifestPath string,
	outputModuleName string,
	userAgent string,
	zlog *zap.Logger,
	tracer logging.Tracer,
) (*Sinker, error)

NewFromViper constructs a new Sinker instance from a fixed set of "known" flags. This function creates a SinkerConfig from the Viper configuration and then uses it to create a new Sinker instance.

If your sink supports extracting the output module's name directly from the Substreams package, pass `sink.InferOutputModuleFromPackage` as the `outputModuleName` argument instead of an actual module name.

The `expectedOutputModuleType` should be the fully qualified name of the expected Protobuf message type.

The `manifestPath` can be left empty, in which case this method looks for a `substreams.yaml` file in the current directory. If `manifestPath` is non-empty and points to a directory, it looks for a `substreams.yaml` file in that directory.
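The lookup rules for `manifestPath` can be restated as a small function. This is a minimal sketch; `resolveManifestPath` is a hypothetical helper, and its last branch (returning any non-directory path unchanged, including paths `os.Stat` cannot see) is an assumption, since the library may treat `.spkg` files or remote references differently.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

const defaultManifestName = "substreams.yaml"

// resolveManifestPath is a hypothetical restatement of the documented lookup rules:
//   - empty path: use substreams.yaml in the current directory;
//   - path to a directory: use substreams.yaml inside that directory;
//   - anything else (a file, or something os.Stat cannot see): return it unchanged.
func resolveManifestPath(manifestPath string) string {
	if manifestPath == "" {
		return defaultManifestName
	}
	if info, err := os.Stat(manifestPath); err == nil && info.IsDir() {
		return filepath.Join(manifestPath, defaultManifestName)
	}
	return manifestPath
}

func main() {
	fmt.Println(resolveManifestPath("")) // substreams.yaml
	fmt.Println(resolveManifestPath(os.TempDir()))
	fmt.Println(resolveManifestPath("my-package.spkg"))
}
```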

func (*Sinker) ApiToken

func (s *Sinker) ApiToken() string

ApiToken returns the API token currently configured on this sinker instance, or "" if no API token was configured.

func (*Sinker) BlockRange added in v1.17.0

func (s *Sinker) BlockRange() *bstream.Range

BlockRange returns a bstream.Range representing the start and stop blocks configured in the SinkerConfig. If StopBlock is 0, it returns an open-ended range starting from StartBlock.

func (*Sinker) BytesRepresentation

func (s *Sinker) BytesRepresentation() BytesRepresentation

func (*Sinker) ClientConfig

func (s *Sinker) ClientConfig() *client.SubstreamsClientConfig

ClientConfig returns the `SubstreamsClientConfig` used by this sinker instance.

func (*Sinker) EndpointConfig

func (s *Sinker) EndpointConfig() (endpoint string, plaintext bool, insecure bool)

EndpointConfig returns the endpoint configuration used by this sinker instance, extracted from its client configuration.

func (*Sinker) OutputModuleHash

func (s *Sinker) OutputModuleHash() string

OutputModuleHash returns the output module's hash. Consumers can use it to warn when the module changed between restarts of the process.

func (*Sinker) OutputModuleName

func (s *Sinker) OutputModuleName() string

func (*Sinker) OutputModuleTypePrefixed

func (s *Sinker) OutputModuleTypePrefixed() (prefixed string)

OutputModuleTypePrefixed returns the output module's type, always prefixed with "proto:".

func (*Sinker) OutputModuleTypeUnprefixed

func (s *Sinker) OutputModuleTypeUnprefixed() (unprefixed string)

OutputModuleTypeUnprefixed returns the output module's type, **never** prefixed with "proto:".
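The two accessors differ only in how they treat the "proto:" prefix. A minimal sketch of that normalization, using hypothetical helpers `typePrefixed` and `typeUnprefixed` and a hypothetical type name `my.pkg.v1.Events`; the point is that prefixing is idempotent and unprefixing is a no-op on bare names.

```go
package main

import (
	"fmt"
	"strings"
)

const protoPrefix = "proto:"

// typePrefixed returns t with exactly one "proto:" prefix.
func typePrefixed(t string) string {
	if strings.HasPrefix(t, protoPrefix) {
		return t
	}
	return protoPrefix + t
}

// typeUnprefixed returns t without its "proto:" prefix, if any.
func typeUnprefixed(t string) string {
	return strings.TrimPrefix(t, protoPrefix)
}

func main() {
	fmt.Println(typePrefixed("my.pkg.v1.Events"))         // proto:my.pkg.v1.Events
	fmt.Println(typeUnprefixed("proto:my.pkg.v1.Events")) // my.pkg.v1.Events
}
```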

func (*Sinker) Package

func (s *Sinker) Package() *pbsubstreams.Package

func (*Sinker) PrintStats

func (s *Sinker) PrintStats()

func (*Sinker) Request

func (s *Sinker) Request() *pbsubstreamsrpcv3.Request

func (*Sinker) Run

func (s *Sinker) Run(ctx context.Context, cursor *Cursor, handler SinkerHandler)

func (*Sinker) StartBlock

func (s *Sinker) StartBlock() int64

StartBlock is always defined. It defaults to the module's initial block if not set by the user, or to 0 if the module has no initial block, which means starting from the chain's first streamable block.

func (*Sinker) StopBlock

func (s *Sinker) StopBlock() uint64

StopBlock is optional; 0 means run until the chain's head, and should be treated as an infinite, open-ended stream of blocks.

The stop block is considered exclusive, meaning if you set StopBlock to 100, the last block processed will be 99.
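The inclusive-start, exclusive-stop semantics (with 0 meaning open-ended) can be captured in a few lines. This sketch uses a hypothetical `blockRange` type rather than `bstream.Range`, and uses unsigned block numbers for both ends for simplicity, whereas the real `StartBlock` is an `int64`.

```go
package main

import "fmt"

// blockRange is a hypothetical stand-in for the configured range:
// start is inclusive, stop is exclusive, and stop == 0 means open-ended.
type blockRange struct {
	start uint64
	stop  uint64
}

func (r blockRange) openEnded() bool { return r.stop == 0 }

// contains reports whether block num falls inside the range.
func (r blockRange) contains(num uint64) bool {
	if num < r.start {
		return false
	}
	return r.openEnded() || num < r.stop
}

// lastBlock returns the last block processed for a bounded range;
// ok is false for open-ended or empty ranges.
func (r blockRange) lastBlock() (num uint64, ok bool) {
	if r.openEnded() || r.stop <= r.start {
		return 0, false
	}
	return r.stop - 1, true
}

func main() {
	bounded := blockRange{start: 10, stop: 100}
	last, _ := bounded.lastBlock()
	fmt.Println(bounded.contains(99), bounded.contains(100), last) // true false 99

	open := blockRange{start: 10}
	fmt.Println(open.contains(1_000_000)) // true
}
```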

type SinkerCompletionHandler

type SinkerCompletionHandler interface {
	// HandleBlockRangeCompletion is called when the sinker is done processing the requested range, only when
	// the stream has correctly reached its end block. If the sinker is configured to stream live, this callback
	// will never be called.
	//
	// If the sinker terminates with an error, this callback will not be called.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
	HandleBlockRangeCompletion(ctx context.Context, cursor *Cursor) error
}

SinkerCompletionHandler defines an extra interface that can be implemented on top of `SinkerHandler`; its callback is invoked when the sinker is done processing the requested range. This is useful to implement a checkpointing mechanism: once the range has been fully and correctly processed, you can do something meaningful with it.
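"Implemented on top of" usually means an optional interface in Go: the caller holds the base handler and checks with a type assertion whether it also provides the extra method. A minimal sketch, assuming the Sinker follows this common idiom; `Cursor`, `completionHandler`, `checkpointingSink` and `onRangeCompleted` are hypothetical local stand-ins, not the library's types.

```go
package main

import (
	"context"
	"fmt"
)

// Cursor is a hypothetical stand-in for the library's *Cursor type.
type Cursor struct{ Block uint64 }

// completionHandler mirrors the shape of SinkerCompletionHandler.
type completionHandler interface {
	HandleBlockRangeCompletion(ctx context.Context, cursor *Cursor) error
}

// checkpointingSink is a hypothetical sink that records a final checkpoint
// once the requested range has been fully processed.
type checkpointingSink struct {
	finalCheckpoint *Cursor
}

func (s *checkpointingSink) HandleBlockRangeCompletion(ctx context.Context, cursor *Cursor) error {
	s.finalCheckpoint = cursor // a real sink would persist this durably
	return nil
}

// onRangeCompleted checks for the optional interface with a type assertion
// and only invokes the callback when the handler implements it.
func onRangeCompleted(ctx context.Context, handler any, cursor *Cursor) error {
	if ch, ok := handler.(completionHandler); ok {
		return ch.HandleBlockRangeCompletion(ctx, cursor)
	}
	return nil
}

func main() {
	sink := &checkpointingSink{}
	if err := onRangeCompleted(context.Background(), sink, &Cursor{Block: 99}); err != nil {
		panic(err)
	}
	fmt.Println(sink.finalCheckpoint.Block) // 99
}
```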

type SinkerConfig

type SinkerConfig struct {
	// Substreams package configuration
	Pkg                              *pbsubstreams.Package
	OutputModule                     *pbsubstreams.Module
	OutputModuleHash                 manifest.ModuleHash
	SupportIndexOutputProductionMode bool

	// Client configuration
	ClientConfig *client.SubstreamsClientConfig

	// Operational configuration
	Mode                 SubstreamsMode
	NoopMode             bool
	LimitProcessedBlocks uint64

	// Block processing configuration
	// StartBlock is always defined, defaults to module initial block if not set by the user, 0 if module initial block is not set
	// which means start from chain's first streamable block.
	StartBlock int64
	// StopBlock is optional, 0 means run until the chain's head and should be treated as infinite/open-ended
	// stream of blocks.
	//
	// The stop block is considered exclusive, meaning if you set StopBlock to 100, the last block processed
	// will be 99.
	StopBlock       uint64
	UndoBufferSize  int
	FinalBlocksOnly bool
	PartialBlocks   bool

	// Dev-mode extras
	DevOutputSnapshots []string
	DevOutputModules   []string // if this is empty, the request will contain the output module in here

	// Retry and reliability configuration
	MaxRetries int // 0 = no retries, -1 = infinite retries, >0 = specific number of retries
	BackOff    backoff.BackOff

	// Liveness configuration
	LiveBlockTimeDelta time.Duration
	LivenessChecker    LivenessChecker

	// Additional configuration
	ExtraHeaders []string

	// Logging and tracing
	Logger *zap.Logger
	Tracer logging.Tracer

	// Expose metrics
	PrometheusAddr string // if non-empty, the metrics server listens on this address

	// Legacy fields for backward compatibility
	Params                []string
	Network               string
	SkipPackageValidation bool
}

SinkerConfig contains all configuration needed to create and run a Sinker.
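The `MaxRetries` comment encodes three cases (0, -1 and a positive count), and `BackOff` controls the wait between attempts. The sketch below restates those semantics with the standard library only: `runWithRetries` and `exponentialDelay` are hypothetical helpers, and a plain delay function stands in for the `backoff.BackOff` value the library actually takes.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

// runWithRetries mirrors the documented MaxRetries semantics:
// 0 = no retries, -1 = retry forever, n > 0 = at most n retries.
// delay, when non-nil, returns how long to wait before retry number
// `attempt` (1-based), playing the role of the configured back-off.
func runWithRetries(maxRetries int, delay func(attempt int) time.Duration, op func() error) error {
	for attempt := 0; ; attempt++ {
		err := op()
		if err == nil {
			return nil
		}
		if maxRetries >= 0 && attempt >= maxRetries {
			return fmt.Errorf("giving up after %d retries: %w", attempt, err)
		}
		if delay != nil {
			time.Sleep(delay(attempt + 1))
		}
	}
}

// exponentialDelay doubles the wait on each retry, capped at max.
func exponentialDelay(base, max time.Duration) func(int) time.Duration {
	return func(attempt int) time.Duration {
		d := base
		for i := 1; i < attempt && d < max; i++ {
			d *= 2
		}
		if d > max {
			d = max
		}
		return d
	}
}

func main() {
	calls := 0
	err := runWithRetries(5, exponentialDelay(10*time.Millisecond, 100*time.Millisecond), func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```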

func ConfigFromViper

func ConfigFromViper(
	cmd *cobra.Command,
	expectedOutputModuleType string,
	manifestPath string,
	outputModuleName string,
	userAgent string,
	zlog *zap.Logger,
	tracer logging.Tracer,
) (*SinkerConfig, error)

ConfigFromViper creates a SinkerConfig from the provided Viper configuration. This function extracts all necessary configuration from the command flags and creates a complete SinkerConfig that can be used to create a Sinker instance. The endpoint is read from the FlagEndpoint flag, and the block range is computed from the FlagStartBlock and FlagStopBlock flags.

To validate the output module type dynamically yourself, pass IgnoreOutputModuleType or the empty string as the `expectedOutputModuleType` argument.

func (*SinkerConfig) BlockRange added in v1.17.0

func (c *SinkerConfig) BlockRange() *bstream.Range

BlockRange returns a bstream.Range representing the start and stop blocks configured in the SinkerConfig. If StopBlock is 0, it returns an open-ended range starting from StartBlock.

func (*SinkerConfig) CloneWithNewModule added in v1.17.10

func (config *SinkerConfig) CloneWithNewModule(
	expectedOutputModuleType string,
	manifestPath string,
	outputModuleName string,
) (*SinkerConfig, error)

CloneWithNewModule creates a new SinkerConfig based on the current one but replacing the package, output module and output module hash with the ones read from the manifest at `manifestPath`, using `outputModuleName` as the output module to extract from the package.

The rest of the configuration remains the same and you can modify it further if needed on the returned SinkerConfig.

func (*SinkerConfig) ExtractDefaultParams

func (c *SinkerConfig) ExtractDefaultParams() []string

ExtractDefaultParams extracts default parameter values from the package modules that are not already specified in the existing parameters. This is useful for GUI applications that want to show default values from the package.
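The merge rule (keep only defaults for modules that have no explicit value yet) can be sketched with maps. This assumes parameters are `"module=value"` strings, which is not stated above; `extractDefaultParams` and the module names are hypothetical and do not reflect the method's actual implementation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// extractDefaultParams is a hypothetical sketch: given each module's declared
// default and the params already supplied as "module=value" strings (an assumed
// format), it returns "module=value" entries only for modules not yet specified.
func extractDefaultParams(defaults map[string]string, existing []string) []string {
	specified := make(map[string]bool, len(existing))
	for _, p := range existing {
		module, _, _ := strings.Cut(p, "=") // split on the first "=" only
		specified[module] = true
	}

	var out []string
	for module, value := range defaults {
		if !specified[module] {
			out = append(out, module+"="+value)
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

func main() {
	defaults := map[string]string{"map_events": "min_value=10", "store_totals": "all"}
	fmt.Println(extractDefaultParams(defaults, []string{"map_events=min_value=50"})) // [store_totals=all]
}
```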

type SinkerErrorHandler

type SinkerErrorHandler interface {

	// HandleError defines the callback that will handle Substreams errors.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `err` is the error received from the Substreams API.
	//
	// The [HandleError] is optional and can be nil.
	HandleError(ctx context.Context, err error)
}

type SinkerHandler

type SinkerHandler interface {
	// HandleBlockScopedData defines the callback that will handle Substreams `BlockScopedData` messages.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `data` contains the block scoped data that was received from the Substreams API, refer to its definition for proper usage.
	// - `isLive` will be non-nil if a [LivenessChecker] has been configured on the [Sinker] instance that calls the handler.
	// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
	//
	// The [HandleBlockScopedData] must be non-nil, the [Sinker] enforces this.
	//
	// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
	// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
	// the [Sinker] that it should retry from the last valid cursor. It's your responsibility to ensure no data was persisted prior to
	// the error.
	HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error

	// HandleBlockUndoSignal defines the callback that will handle Substreams `BlockUndoSignal` messages.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `undoSignal` contains the last block that is still valid; any data saved after this block should be discarded.
	// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
	//
	// The [HandleBlockUndoSignal] can be nil if the sinker is configured to stream final blocks only, otherwise it must be set,
	// the [Sinker] enforces this.
	//
	// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
	// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
	// the [Sinker] that it should retry from the last valid cursor. It's your responsibility to ensure no data was persisted prior to
	// the error.
	HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error
}
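The two required callbacks pair naturally: block data is written as it arrives, and an undo signal discards everything after the last block that is still valid. A minimal in-memory sketch, where `record`, `memoryStore`, `apply` and `undo` are hypothetical and a plain block number stands in for the protobuf `BlockUndoSignal`; a real sink would also persist the cursor alongside the data.

```go
package main

import "fmt"

// record is a hypothetical row produced from one block's output.
type record struct {
	block uint64
	value string
}

// memoryStore is a hypothetical sink target that keeps records in block order.
type memoryStore struct {
	records []record
}

// apply stores the output of a newly received block (the BlockScopedData path).
func (s *memoryStore) apply(block uint64, value string) {
	s.records = append(s.records, record{block: block, value: value})
}

// undo discards everything written after lastValidBlock (the BlockUndoSignal
// path), since those blocks are no longer part of the canonical chain.
func (s *memoryStore) undo(lastValidBlock uint64) {
	kept := s.records[:0]
	for _, r := range s.records {
		if r.block <= lastValidBlock {
			kept = append(kept, r)
		}
	}
	s.records = kept
}

func main() {
	s := &memoryStore{}
	for b := uint64(100); b <= 103; b++ {
		s.apply(b, fmt.Sprintf("data@%d", b))
	}
	s.undo(101) // blocks 102 and 103 were reorganized away
	fmt.Println(len(s.records), s.records[len(s.records)-1].block) // 2 101
}
```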

func NewSinkerFullHandlers

func NewSinkerFullHandlers(
	handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error,
	handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error,
	handleSessionInit func(ctx context.Context, req *pbsubstreamsrpc.Request, session *pbsubstreamsrpc.SessionInit) error,
	handleProgress func(ctx context.Context, progress *pbsubstreamsrpc.ModulesProgress),
	handleInitialSnapshotData func(ctx context.Context, debug *pbsubstreamsrpc.InitialSnapshotData) error,
	handleInitialSnapshotComplete func(ctx context.Context, complete *pbsubstreamsrpc.InitialSnapshotComplete) error,
	handleError func(ctx context.Context, error *pbsubstreamsrpc.Error),
) SinkerHandler

NewSinkerFullHandlers creates a SinkerHandler that also implements the extra interfaces for handling session initialization, progress, initial snapshot (debug) data and errors.

func NewSinkerFullHandlersWithPartial added in v1.17.8

func NewSinkerFullHandlersWithPartial(
	handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error,
	handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error,
	handleSessionInit func(ctx context.Context, req *pbsubstreamsrpc.Request, session *pbsubstreamsrpc.SessionInit) error,
	handleProgress func(ctx context.Context, progress *pbsubstreamsrpc.ModulesProgress),
	handleInitialSnapshotData func(ctx context.Context, debug *pbsubstreamsrpc.InitialSnapshotData) error,
	handleInitialSnapshotComplete func(ctx context.Context, complete *pbsubstreamsrpc.InitialSnapshotComplete) error,
	handleError func(ctx context.Context, error *pbsubstreamsrpc.Error),
) SinkerHandler

NewSinkerFullHandlersWithPartial creates a SinkerHandler that also implements the extra interfaces for handling session initialization, progress, initial snapshot (debug) data and errors.

func NewSinkerHandlers

func NewSinkerHandlers(
	handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error,
	handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error,
) SinkerHandler

NewSinkerHandlers creates a new SinkerHandler with only the required handlers: one for block scoped data and one for block undo signals.
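Constructors that take plain functions and return an interface typically wrap the functions in a small adapter struct. The sketch below shows that general pattern together with the documented nil rules (data handler always required, undo handler required unless streaming final blocks only). All names are hypothetical local stand-ins, and the cursor argument is omitted for brevity; this is not the library's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// blockData and undoSignal are hypothetical stand-ins for the protobuf messages.
type blockData struct{ Block uint64 }
type undoSignal struct{ LastValidBlock uint64 }

// handler mirrors the two required methods of SinkerHandler (cursor omitted).
type handler interface {
	HandleBlockScopedData(ctx context.Context, data *blockData, isLive *bool) error
	HandleBlockUndoSignal(ctx context.Context, undo *undoSignal) error
}

// funcHandlers adapts plain functions to the interface.
type funcHandlers struct {
	onData func(ctx context.Context, data *blockData, isLive *bool) error
	onUndo func(ctx context.Context, undo *undoSignal) error
}

func (h *funcHandlers) HandleBlockScopedData(ctx context.Context, data *blockData, isLive *bool) error {
	return h.onData(ctx, data, isLive)
}

func (h *funcHandlers) HandleBlockUndoSignal(ctx context.Context, undo *undoSignal) error {
	return h.onUndo(ctx, undo)
}

var _ handler = (*funcHandlers)(nil) // compile-time interface check

// validate restates the documented nil rules for the two callbacks.
func validate(h *funcHandlers, finalBlocksOnly bool) error {
	if h.onData == nil {
		return errors.New("block scoped data handler must be set")
	}
	if h.onUndo == nil && !finalBlocksOnly {
		return errors.New("block undo signal handler must be set unless streaming final blocks only")
	}
	return nil
}

// printBlock is a hypothetical data callback used by the demo.
func printBlock(ctx context.Context, data *blockData, isLive *bool) error {
	fmt.Println("received block", data.Block)
	return nil
}

func main() {
	h := &funcHandlers{onData: printBlock}
	fmt.Println(validate(h, true) == nil)  // true: undo handler optional for final-only streams
	fmt.Println(validate(h, false) == nil) // false: undo handler required otherwise
	if err := h.HandleBlockScopedData(context.Background(), &blockData{Block: 42}, nil); err != nil {
		panic(err)
	}
}
```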

type SinkerProgressHandler

type SinkerProgressHandler interface {
	// HandleProgress defines the callback that will handle Substreams `ModulesProgress` messages.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `progress` contains the progress information that was received from the Substreams API.
	//
	// The [HandleProgress] is optional and can be nil.
	HandleProgress(ctx context.Context, progress *pbsubstreamsrpc.ModulesProgress)
}

SinkerProgressHandler defines an extra interface that handles the Progress message.

type SinkerSessionInitHandler

type SinkerSessionInitHandler interface {
	// HandleSessionInit defines the callback that will handle Substreams `SessionInit` messages.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `req` is the request that was sent to the Substreams API.
	// - `sessionInit` contains the session initialization data that was received from the Substreams API.
	//
	// The [HandleSessionInit] is optional and can be nil.
	//
	// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
	// error and the [Sinker] will shut down.
	HandleSessionInit(ctx context.Context, req *pbsubstreamsrpcv3.Request, sessionInit *pbsubstreamsrpc.SessionInit) error
}

type SinkerSnapshotHandler

type SinkerSnapshotHandler interface {
	// HandleInitialSnapshotData defines the callback that will handle Substreams `InitialSnapshotData` messages.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `snapshotData` contains the initial snapshot data that was received from the Substreams API.
	//
	// The [HandleInitialSnapshotData] is optional and can be nil.
	//
	// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
	// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
	// the [Sinker] that it should retry from last valid cursor.
	HandleInitialSnapshotData(ctx context.Context, snapshotData *pbsubstreamsrpc.InitialSnapshotData) error

	// HandleInitialSnapshotComplete defines the callback that will handle Substreams `InitialSnapshotComplete` messages.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `snapshotComplete` contains the initial snapshot completion information that was received from the Substreams API.
	//
	// The [HandleInitialSnapshotComplete] is optional and can be nil.
	//
	// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
	// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
	// the [Sinker] that it should retry from last valid cursor.
	HandleInitialSnapshotComplete(ctx context.Context, snapshotComplete *pbsubstreamsrpc.InitialSnapshotComplete) error
}
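The handler interfaces share one error contract: a plain error is fatal, while an error wrapped with `derr.NewRetryableError` asks the Sinker to retry from the last valid cursor. The sketch below shows the mechanics of such marking with the standard `errors` package. `retryableError`, `newRetryable`, `isRetryable` and `handleData` are hypothetical stand-ins, not the `derr` package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// retryableError is a hypothetical stand-in for the wrapper produced by
// derr.NewRetryableError: it marks an error as safe to retry.
type retryableError struct{ err error }

func (e *retryableError) Error() string { return "retryable: " + e.err.Error() }
func (e *retryableError) Unwrap() error { return e.err }

func newRetryable(err error) error { return &retryableError{err: err} }

// isRetryable reports whether err, or anything it wraps, was marked retryable.
func isRetryable(err error) bool {
	var re *retryableError
	return errors.As(err, &re)
}

var (
	errDBTimeout  = errors.New("database timeout")
	errBadPayload = errors.New("cannot decode payload")
)

// handleData is a hypothetical handler body: transient failures are wrapped
// as retryable (nothing must have been persisted yet), anything else is fatal.
func handleData(fail error) error {
	switch {
	case fail == nil:
		return nil
	case errors.Is(fail, errDBTimeout):
		return newRetryable(fmt.Errorf("writing block: %w", fail))
	default:
		return fmt.Errorf("fatal: %w", fail)
	}
}

func main() {
	fmt.Println(isRetryable(handleData(errDBTimeout)))  // true
	fmt.Println(isRetryable(handleData(errBadPayload))) // false
}
```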

type Stats

type Stats struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func (*Stats) Close

func (s *Stats) Close()

func (*Stats) LogNow

func (s *Stats) LogNow()

func (*Stats) RecordBlock

func (s *Stats) RecordBlock(block bstream.BlockRef)

func (*Stats) SetIndexOutputProductionMode added in v1.16.6

func (s *Stats) SetIndexOutputProductionMode()

func (*Stats) SetLiveness

func (s *Stats) SetLiveness(isLive *bool)

func (*Stats) SetNoop added in v1.17.0

func (s *Stats) SetNoop()

func (*Stats) Start

func (s *Stats) Start(each time.Duration)

type SubstreamsMode

type SubstreamsMode uint

ENUM(Development, Production)

const (
	// SubstreamsModeDevelopment is a SubstreamsMode of type Development.
	SubstreamsModeDevelopment SubstreamsMode = iota
	// SubstreamsModeProduction is a SubstreamsMode of type Production.
	SubstreamsModeProduction
)

func ParseSubstreamsMode

func ParseSubstreamsMode(name string) (SubstreamsMode, error)

ParseSubstreamsMode attempts to convert a string to a SubstreamsMode

func (SubstreamsMode) MarshalText

func (x SubstreamsMode) MarshalText() ([]byte, error)

MarshalText implements the text marshaller method

func (SubstreamsMode) String

func (x SubstreamsMode) String() string

String implements the Stringer interface.

func (*SubstreamsMode) UnmarshalText

func (x *SubstreamsMode) UnmarshalText(text []byte) error

UnmarshalText implements the text unmarshaller method
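An `iota`-based enum with `String`, `Parse`, `MarshalText` and `UnmarshalText` round-trips cleanly through text encodings such as `encoding/json`. The sketch below re-creates that shape with a hypothetical `mode` type; it is not the generated SubstreamsMode code, and its exact error messages and case handling are assumptions.

```go
package main

import "fmt"

// mode is a hypothetical re-creation of an iota-based enum like SubstreamsMode.
type mode uint

const (
	modeDevelopment mode = iota // 0
	modeProduction              // 1
)

var modeNames = map[mode]string{
	modeDevelopment: "Development",
	modeProduction:  "Production",
}

func (m mode) String() string {
	if name, ok := modeNames[m]; ok {
		return name
	}
	return fmt.Sprintf("mode(%d)", uint(m))
}

// parseMode converts a name back to its value, rejecting unknown names.
func parseMode(name string) (mode, error) {
	for m, n := range modeNames {
		if n == name {
			return m, nil
		}
	}
	return 0, fmt.Errorf("%q is not a valid mode", name)
}

// MarshalText and UnmarshalText let the enum round-trip through text encodings.
func (m mode) MarshalText() ([]byte, error) { return []byte(m.String()), nil }

func (m *mode) UnmarshalText(text []byte) error {
	parsed, err := parseMode(string(text))
	if err != nil {
		return err
	}
	*m = parsed
	return nil
}

func main() {
	m, err := parseMode("Production")
	fmt.Println(m, uint(m), err) // Production 1 <nil>
}
```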

Directories

examples/simple-sink (command)
