Documentation
¶
Index ¶
- Constants
- Variables
- func AddFlagsToSet(flags *pflag.FlagSet, ignore ...FlagIgnored)
- func LoadSubstreamsAuthEnvFile(manifestPath string)
- func ReadManifestAndModule(manifestPath string, network string, paramsStrings []string, ...) (pkg *pbsubstreams.Package, module *pbsubstreams.Module, ...)
- func SubstreamsModeNames() []string
- func WriteCursor(filename string, cursor *Cursor) error
- type BackOffStringer
- type BytesRepresentation
- type Cursor
- type DeltaLivenessChecker
- type FlagIgnored
- type LivenessChecker
- type Sinker
- func (s *Sinker) ApiToken() string
- func (s *Sinker) BytesRepresentation() BytesRepresentation
- func (s *Sinker) ClientConfig() *client.SubstreamsClientConfig
- func (s *Sinker) EndpointConfig() (endpoint string, plaintext bool, insecure bool)
- func (s *Sinker) OutputModuleHash() string
- func (s *Sinker) OutputModuleName() string
- func (s *Sinker) OutputModuleTypePrefixed() (prefixed string)
- func (s *Sinker) OutputModuleTypeUnprefixed() (unprefixed string)
- func (s *Sinker) Package() *pbsubstreams.Package
- func (s *Sinker) PrintStats()
- func (s *Sinker) Request() *pbsubstreamsrpc.Request
- func (s *Sinker) Run(ctx context.Context, cursor *Cursor, handler SinkerHandler)
- func (s *Sinker) StartBlock() int64
- func (s *Sinker) StopBlock() uint64
- type SinkerCompletionHandler
- type SinkerConfig
- type SinkerErrorHandler
- type SinkerHandler
- type SinkerProgressHandler
- type SinkerSessionInitHandler
- type SinkerSnapshotHandler
- type Stats
- type SubstreamsMode
Constants ¶
const ( FlagEndpoint = "endpoint" ShortFlagEndoint = "e" FlagStartBlock = "start-block" ShortFlagStartBlock = "s" FlagStopBlock = "stop-block" ShortFlagStopBlock = "t" FlagCursor = "cursor" ShortFlagCursor = "c" FlagFinalBlocksOnly = "final-blocks-only" FlagAPIKeyEnvvar = "api-key-envvar" FlagAPITokenEnvvar = "api-token-envvar" FlagNetwork = "network" FlagParams = "params" FlagInsecure = "insecure" FlagPlaintext = "plaintext" FlagUndoBufferSize = "undo-buffer-size" FlagLiveBlockTimeDelta = "live-block-time-delta" FlagDevelopmentMode = "development-mode" FlagInfiniteRetry = "infinite-retry" FlagSkipPackageValidation = "skip-package-validation" FlagExtraHeaders = "header" FlagNoopMode = "noop-mode" FlagProtoPath = "proto-path" FlagProtoDescriptorSet = "proto-descriptor-set" FlagPrometheusAddr = "prometheus-addr" )
const IgnoreOutputModuleType string = ""
IgnoreOutputModuleType can be used instead of the expected output module type when you want to validate this yourself, for example if you accept multiple output type(s).
const InferOutputModuleFromPackage string = "@!##_InferOutputModuleFromSpkg_##!@"
InferOutputModuleFromPackage can be used instead of the actual module's output name and has the effect that output module is extracted directly from the pbsubstreams.Package via the `SinkModule` field.
Variables ¶
var AvgBlockTimeDelta = dmetrics.NewAvgDurationCounter(30*time.Second, time.Second, "Average duration for BlockTimeDelta")
var AvgBlockWaitTime = dmetrics.NewAvgDurationCounter(30*time.Second, time.Second, "Average duration for BlockWaitTime")
var AvgLocalProcessingTime = dmetrics.NewAvgDurationCounter(30*time.Second, time.Second, "Average duration for LocalProcessingTime")
var BackprocessingCompletion = Metrics.NewGauge("substreams_sink_backprocessing_completion", "Determines if backprocessing is completed, which is if we receive a first data message")
var BlockTimeDelta = Metrics.NewGauge("substreams_sink_block_time_delta", "The difference between the last received block's BlockTime and the previous block's BlockTime -- can be skewed very high when processing older segments with a BlockFilter or skipped outputs")
var BlockWaitTime = Metrics.NewGauge("substreams_sink_block_wait_time", "The time that the sinks spends waiting for the next block from substreams -- should converge to the block production time of the chain")
var DataMessageCount = Metrics.NewCounter("substreams_sink_data_message", "The number of data message received")
var ErrBackOffExpired = errors.New("unable to complete work within backoff time limit")
var ErrHandlerNotImplemented = errors.New("handler not implemented")
var HeadBlockNumber = Metrics.NewHeadBlockNumber("substreams_sink")
var HeadBlockTimeDrift = Metrics.NewHeadTimeDrift("substreams_sink")
var LocalProcessingTime = Metrics.NewGauge("substreams_sink_local_processing_time", "The time that the sinks spends processing the received block")
var MessageSizeBytes = Metrics.NewCounter("substreams_sink_message_size_bytes", "The number of total bytes of message received from the Substreams backend")
var Metrics = dmetrics.NewSet()
var ProcessedBlocks = Metrics.NewGauge("substreams_sink_processed_blocks", "Total processed blocks on Substreams server")
var ProcessedBytes = Metrics.NewGauge("substreams_sink_processed_bytes", "Total processed bytes on Substreams server")
var ProgressMessageCount = Metrics.NewGauge("substreams_sink_progress_message", "The number of progress message received")
var ProgressMessageLastBlock = Metrics.NewGaugeVec("substreams_sink_progress_message_last_block", []string{"stage"}, "Latest progress reported processed range end block for each stage (not necessarily contiguous)")
var ProgressMessageLastContiguousBlock = Metrics.NewGaugeVec("substreams_sink_progress_message_last_contiguous_block", []string{"stage"}, "Latest progress reported processed end block for the first completed (contiguous) range")
var ProgressMessageRunningJobs = Metrics.NewGaugeVec("substreams_sink_progress_message_running_jobs", []string{"stage"}, "Latest reported number of active jobs for each stage")
var ServerEgressBytes = Metrics.NewCounter("substreams_sink_server_egress_bytes", "Total egress bytes from Substreams server (total uncompressed bytes received)")
var SubstreamsErrorCount = Metrics.NewCounter("substreams_sink_error", "The error count we encountered when interacting with Substreams for which we had to restart the connection loop")
var UndoMessageCount = Metrics.NewCounter("substreams_sink_undo_message", "The number of block undo message received")
var UnknownMessageCount = Metrics.NewCounter("substreams_sink_unknown_message", "The number of unknown message received")
Functions ¶
func AddFlagsToSet ¶
func AddFlagsToSet(flags *pflag.FlagSet, ignore ...FlagIgnored)
AddFlagsToSet can be used to import standard flags needed for sink to configure itself. By using this method to define your flag and using `cli.ConfigureViper` (import "github.com/streamingfast/cli") in your main application command, `NewFromViper` is usable to easily create a `sink.Sinker` instance.
Defines
Flag `--endpoint` (-e) (defaults `""`) Flag `--start-block` (-s) (defaults `""`) Flag `--stop-block` (-t) (defaults `""`) Flag `--cursor` (-c) (defaults `""`) Flag `--network` (defaults `""`) Flag `--params` (-p) (defaults `[]`) Flag `--insecure` (defaults `false`) Flag `--plaintext` (defaults `false`) Flag `--undo-buffer-size` (defaults `12`) Flag `--live-block-time-delta` (defaults `300*time.Second`) Flag `--development-mode` (defaults `false`) Flag `--noop-mode` (defaults `false`) Flag `--final-blocks-only` (defaults `false`) Flag `--infinite-retry` (defaults `false`) Flag `--skip-package-validation` (defaults `false`) Flag `--header` (-H) (defaults `[]`) Flag `--api-key-envvar` (default `SUBSTREAMS_API_KEY`) Flag `--api-token-envvar` (default `SUBSTREAMS_API_TOKEN`) Flag `--proto-path` (defaults `""`) Flag `--proto-descriptor-set` (defaults `""`) Flag `--prometheus-addr` (defaults `""`)
The `ignore` field can be used to multiple times to avoid adding the specified `flags` to the the set. This can be used for example to avoid adding `--final-blocks-only` when the sink is always final only.
AddFlagsToSet(flags, sink.FlagIgnore(sink.FlagFinalBlocksOnly))
func LoadSubstreamsAuthEnvFile ¶
func LoadSubstreamsAuthEnvFile(manifestPath string)
LoadSubstreamsAuthEnvFile loads authentication environment variables from .substreams.env file
func ReadManifestAndModule ¶
func ReadManifestAndModule( manifestPath string, network string, paramsStrings []string, outputModuleName string, expectedOutputModuleType string, skipPackageValidation bool, additionalOptions []manifest.Option, zlog *zap.Logger, ) ( pkg *pbsubstreams.Package, module *pbsubstreams.Module, outputModuleHash manifest.ModuleHash, err error, )
ReadManifestAndModule reads the manifest and returns the package, the output module and its hash.
If outputModuleName is set to InferOutputModuleFromPackage, the sink will try to infer the output module from the package's sink_module field, if present.
If expectedOutputModuleType is set to IgnoreOutputModuleType, the sink will not validate the output module type.
If skipPackageValidation is set to true, the sink will not validate the package, you will have to do it yourself.
func SubstreamsModeNames ¶
func SubstreamsModeNames() []string
SubstreamsModeNames returns a list of possible string values of SubstreamsMode.
func WriteCursor ¶
WriteCursor writes cursor to file using temp file and rename. Returns nil if filename is empty or cursor is nil.
Types ¶
type BackOffStringer ¶
func (BackOffStringer) String ¶
func (s BackOffStringer) String() string
type BytesRepresentation ¶
type BytesRepresentation int
const ( BytesAsBase64 BytesRepresentation = iota BytesAsHex BytesAsString BytesAsBase58 )
func BytesEncodingToRepresentation ¶
func BytesEncodingToRepresentation(enc string) BytesRepresentation
Helper to map string to BytesRepresentation
func InferBytesRepresentation ¶
func InferBytesRepresentation(network string, endpoint string) BytesRepresentation
InferBytesRepresentation infers the bytes representation based on the network or endpoint. It first checks the network ID, and if not found, it checks the endpoint. If neither is provided, it defaults to Hex encoding. It returns a dynamic.BytesRepresentation based on the encoding.
type Cursor ¶
func MustNewCursor ¶
func NewBlankCursor ¶
func NewBlankCursor() *Cursor
func ReadCursor ¶
ReadCursor will return nil if filename is empty or not found
func (*Cursor) MarshalLogObject ¶
func (c *Cursor) MarshalLogObject(encoder zapcore.ObjectEncoder) error
type DeltaLivenessChecker ¶
type DeltaLivenessChecker struct {
// contains filtered or unexported fields
}
func NewDeltaLivenessChecker ¶
func NewDeltaLivenessChecker(delta time.Duration) *DeltaLivenessChecker
func (*DeltaLivenessChecker) IsLive ¶
func (t *DeltaLivenessChecker) IsLive(clock *pbsubstreams.Clock) bool
type FlagIgnored ¶
func FlagIgnore ¶
func FlagIgnore(in ...string) FlagIgnored
type LivenessChecker ¶
type LivenessChecker interface {
IsLive(block *pbsubstreams.Clock) bool
}
type Sinker ¶
type Sinker struct {
*shutter.Shutter
*SinkerConfig
// contains filtered or unexported fields
}
func New ¶
func New( config *SinkerConfig, ) *Sinker
New creates a new Sinker instance from the provided SinkerConfig. This function replaces the previous Options pattern with a more structured configuration approach. All configuration is now contained within the SinkerConfig struct, making it easier to manage and test.
func NewFromViper ¶
func NewFromViper( cmd *cobra.Command, expectedOutputModuleType string, manifestPath string, outputModuleName string, userAgent string, zlog *zap.Logger, tracer logging.Tracer, ) (*Sinker, error)
NewFromViper constructs a new Sinker instance from a fixed set of "known" flags. This function creates a SinkerConfig from the Viper configuration and then uses it to create a new Sinker instance.
If you want to extract the sink output module's name directly from the Substreams package, if supported by your sink, instead of an actual name for paramater `outputModuleNameArg`, use `sink.InferOutputModuleFromPackage`.
The `expectedOutputModuleType` should be the fully qualified expected Protobuf package.
The `manifestPath` can be left empty in which case we this method is going to look in the current directory for a `substreams.yaml` file. If the `manifestPath` is non-empty and points to a directory, we will look for a `substreams.yaml` file in that directory.
func (*Sinker) ApiToken ¶
ApiToken returns the currently defined ApiToken sets on this sinker instance, "" is no api token was configured
func (*Sinker) BytesRepresentation ¶
func (s *Sinker) BytesRepresentation() BytesRepresentation
func (*Sinker) ClientConfig ¶
func (s *Sinker) ClientConfig() *client.SubstreamsClientConfig
ClientConfig returns the the `SubstreamsClientConfig`used by this sinker instance.
func (*Sinker) EndpointConfig ¶
EndpointConfig returns the endpoint configuration used by this sinker instance, this is an extraction of the endpoint configuration from the client configuration.
func (*Sinker) OutputModuleHash ¶
OutputModuleHash returns the module output hash, can be used by consumer to warn if the module changed between restart of the process.
func (*Sinker) OutputModuleName ¶
func (*Sinker) OutputModuleTypePrefixed ¶
OutputModuleTypePrefixed returns the prefixed output module's type so the type will always be prefixed with "proto:".
func (*Sinker) OutputModuleTypeUnprefixed ¶
OutputModuleTypeUnprefixed returns the unprefixed output module's type so the type will **never** be prefixed with "proto:".
func (*Sinker) Package ¶
func (s *Sinker) Package() *pbsubstreams.Package
func (*Sinker) PrintStats ¶
func (s *Sinker) PrintStats()
func (*Sinker) Request ¶
func (s *Sinker) Request() *pbsubstreamsrpc.Request
func (*Sinker) Run ¶
func (s *Sinker) Run(ctx context.Context, cursor *Cursor, handler SinkerHandler)
func (*Sinker) StartBlock ¶
type SinkerCompletionHandler ¶
type SinkerCompletionHandler interface {
// HandleBlockRangeCompletion is called when the sinker is done processing the requested range, only when
// the stream has correctly reached its end block. If the sinker is configured to stream live, this callback
// will never be called.
//
// If the sinker terminates with an error, this callback will not be called.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
HandleBlockRangeCompletion(ctx context.Context, cursor *Cursor) error
}
SinkerCompletionHandler defines an extra interface that can be implemented on top of `SinkerHandler` where the callback will be invoked when the sinker is done processing the requested range. This is useful to implement a checkpointing mechanism where when the range has correctly fully processed, you can do something meaningful.
type SinkerConfig ¶
type SinkerConfig struct {
// Substreams package configuration
Pkg *pbsubstreams.Package
OutputModule *pbsubstreams.Module
OutputModuleHash manifest.ModuleHash
// Client configuration
ClientConfig *client.SubstreamsClientConfig
// Operational configuration
Mode SubstreamsMode
NoopMode bool
LimitProcessedBlocks uint64
// Block processing configuration
StartBlock int64
StopBlock uint64
UndoBufferSize int
FinalBlocksOnly bool
// Dev-mode extras
DevOutputSnapshots []string
DevOutputModules []string // if this is empty, the request will contain the output module in here
// Retry and reliability configuration
InfiniteRetry bool
BackOff backoff.BackOff
// Liveness configuration
LiveBlockTimeDelta time.Duration
LivenessChecker LivenessChecker
// Additional configuration
ExtraHeaders []string
// Logging and tracing
Logger *zap.Logger
Tracer logging.Tracer
// Expose metrics
PrometheusAddr string // if non-null, will listen to this address
// Legacy fields for backward compatibility
Params []string
Network string
SkipPackageValidation bool
}
SinkerConfig contains all configuration needed to create and run a Sinker.
func ConfigFromViper ¶
func ConfigFromViper( cmd *cobra.Command, expectedOutputModuleType string, manifestPath string, outputModuleName string, userAgent string, zlog *zap.Logger, tracer logging.Tracer, ) (*SinkerConfig, error)
ConfigFromViper creates a SinkerConfig from the provided Viper configuration. This function extracts all necessary configuration from the command flags and creates a complete SinkerConfig that can be used to create a Sinker instance. The endpoint is read from the FlagEndpoint flag, and the block range is computed from the FlagStartBlock and FlagStopBlock flags.
func (*SinkerConfig) ExtractDefaultParams ¶
func (c *SinkerConfig) ExtractDefaultParams() []string
ExtractDefaultParams extracts default parameter values from the package modules that are not already specified in the existing parameters. This is useful for GUI applications that want to show default values from the package.
type SinkerErrorHandler ¶
type SinkerErrorHandler interface {
// HandleError defines the callback that will handle Substreams errors.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `error` is simply the error received from the Substreams API.
//
// The [HandleError] is optional and can be nil.
HandleError(ctx context.Context, err error)
}
type SinkerHandler ¶
type SinkerHandler interface {
// HandleBlockScopedData defines the callback that will handle Substreams `BlockScopedData` messages.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `data` contains the block scoped data that was received from the Substreams API, refer to it's definition for proper usage.
// - `isLive` will be non-nil if a [LivenessChecker] has been configured on the [Sinker] instance that call the handler.
// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
//
// The [HandleBlockScopedData] must be non-nil, the [Sinker] enforces this.
//
// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
// the [Sinker] that it should retry from last valid cursor. It's your responsibility to ensure no data was persisted prior the
// the error.
HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error
// HandleBlockUndoSignal defines the callback that will handle Substreams `BlockUndoSignal` messages.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `undoSignal` contains the last valid block that is still valid, any data saved after this last saved block should be discarded.
// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
//
// The [HandleBlockUndoSignal] can be nil if the sinker is configured to stream final blocks only, otherwise it must be set,
// the [Sinker] enforces this.
//
// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
// the [Sinker] that it should retry from last valid cursor. It's your responsibility to ensure no data was persisted prior the
// the error.
HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error
}
func NewSinkerFullHandlers ¶
func NewSinkerFullHandlers( handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error, handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error, handleSessionInit func(ctx context.Context, req *pbsubstreamsrpc.Request, session *pbsubstreamsrpc.SessionInit) error, handleProgress func(ctx context.Context, progress *pbsubstreamsrpc.ModulesProgress), handleInitialSnapshotData func(ctx context.Context, debug *pbsubstreamsrpc.InitialSnapshotData) error, handleInitialSnapshotComplete func(ctx context.Context, complete *pbsubstreamsrpc.InitialSnapshotComplete) error, handleError func(ctx context.Context, error *pbsubstreamsrpc.Error), ) SinkerHandler
NewSinkerFullHandlers creates a SinkerHandler with extra interfaces for handling session initialization, progress and debug data
func NewSinkerHandlers ¶
func NewSinkerHandlers( handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error, handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error, ) SinkerHandler
NewSinkerHandlers creates a new SinkerHandler with only the required handlers and optional handlers
type SinkerProgressHandler ¶
type SinkerProgressHandler interface {
// HandleProgress defines the callback that will handle Substreams `ModulesProgress` messages.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `progress` contains the progress information that was received from the Substreams API.
//
// The [HandleProgress] is optional and can be nil.
HandleProgress(ctx context.Context, progress *pbsubstreamsrpc.ModulesProgress)
}
SinkerProgressHandler defines an extra interface that handles the Progress message.
type SinkerSessionInitHandler ¶
type SinkerSessionInitHandler interface {
// HandleSessionInit defines the callback that will handle Substreams `SessionInit` messages.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `req` is the request that was sent to the Substreams API.
// - `sessionInit` contains the session initialization data that was received from the Substreams API.
//
// The [HandleSessionInit] is optional and can be nil.
//
// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
// error and the [Sinker] will shutdown
HandleSessionInit(ctx context.Context, req *pbsubstreamsrpc.Request, sessionInit *pbsubstreamsrpc.SessionInit) error
}
type SinkerSnapshotHandler ¶
type SinkerSnapshotHandler interface {
// HandleInitialSnapshotData defines the callback that will handle Substreams `InitialSnapshotData` messages.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `snapshotData` contains the initial snapshot data that was received from the Substreams API.
//
// The [HandleInitialSnapshotData] is optional and can be nil.
//
// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
// the [Sinker] that it should retry from last valid cursor.
HandleInitialSnapshotData(ctx context.Context, snapshotData *pbsubstreamsrpc.InitialSnapshotData) error
// HandleInitialSnapshotComplete defines the callback that will handle Substreams `InitialSnapshotComplete` messages.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `snapshotComplete` contains the initial snapshot completion information that was received from the Substreams API.
//
// The [HandleInitialSnapshotComplete] is optional and can be nil.
//
// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
// the [Sinker] that it should retry from last valid cursor.
HandleInitialSnapshotComplete(ctx context.Context, snapshotComplete *pbsubstreamsrpc.InitialSnapshotComplete) error
}
type SubstreamsMode ¶
type SubstreamsMode uint
ENUM(
Development Production
)
const ( // SubstreamsModeDevelopment is a SubstreamsMode of type Development. SubstreamsModeDevelopment SubstreamsMode = iota // SubstreamsModeProduction is a SubstreamsMode of type Production. SubstreamsModeProduction )
func ParseSubstreamsMode ¶
func ParseSubstreamsMode(name string) (SubstreamsMode, error)
ParseSubstreamsMode attempts to convert a string to a SubstreamsMode
func (SubstreamsMode) MarshalText ¶
func (x SubstreamsMode) MarshalText() ([]byte, error)
MarshalText implements the text marshaller method
func (SubstreamsMode) String ¶
func (x SubstreamsMode) String() string
String implements the Stringer interface.
func (*SubstreamsMode) UnmarshalText ¶
func (x *SubstreamsMode) UnmarshalText(text []byte) error
UnmarshalText implements the text unmarshaller method