sttypes

package
v1.10.3-0...-638a6e7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2026 License: LGPL-3.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ProtoIDCommonPrefix is the common prefix for stream protocol
	ProtoIDCommonPrefix = "harmony"

	// ProtoIDFormat is the format of stream protocol ID
	ProtoIDFormat = "%s/%s/%s/%d/%s"
)

Variables

This section is empty.

Functions

func GenReqID

func GenReqID() uint64

GenReqID generates a random ReqID

func GetErrorCategory

func GetErrorCategory(errorType StreamErrorType) string

GetErrorCategory returns a human-readable category for the error

func GetErrorSeverity

func GetErrorSeverity(errorType StreamErrorType) string

GetErrorSeverity returns the severity level of an error type

func GetRetryDelay

func GetRetryDelay(errorType StreamErrorType) int

GetRetryDelay returns the recommended retry delay for an error type

func GetWriteErrorSeverity

func GetWriteErrorSeverity(err error) string

GetWriteErrorSeverity returns the appropriate log severity for write errors

func IsCriticalError

func IsCriticalError(errorType StreamErrorType) bool

IsCriticalError determines if an error is critical and should trigger immediate stream removal

func IsMessageError

func IsMessageError(err error) bool

IsMessageError checks if the error is a MessageError

func IsRecoverableError

func IsRecoverableError(errorType StreamErrorType) bool

IsRecoverableError determines if an error type is potentially recoverable

func IsRemoteDeadError

func IsRemoteDeadError(errorType StreamErrorType) bool

IsRemoteDeadError determines if an error indicates the remote peer is dead/unreachable

func IsStreamWriteError

func IsStreamWriteError(err error) bool

IsStreamWriteError checks if the error is a StreamWriteError

func RecordCriticalError

func RecordCriticalError(errorType StreamErrorType)

RecordCriticalError records a critical error metric

func RecordRecoverableError

func RecordRecoverableError(errorType StreamErrorType)

RecordRecoverableError records a recoverable error metric

func RecordStreamClosedByRecoverableErrors

func RecordStreamClosedByRecoverableErrors()

RecordStreamClosedByRecoverableErrors records when a stream is closed due to too many recoverable errors

func ShouldCloseStream

func ShouldCloseStream(err error) bool

ShouldCloseStream determines if an error should cause the stream to be closed

func ShouldRetryConnection

func ShouldRetryConnection(errorType StreamErrorType) bool

ShouldRetryConnection determines if a connection should be retried based on error type

Types

type BaseStream

type BaseStream struct {
	// contains filtered or unexported fields
}

BaseStream is the wrapper around the libp2p Stream.

func NewBaseStream

func NewBaseStream(st libp2p_network.Stream, trusted bool) *BaseStream

NewBaseStream creates BaseStream as the wrapper of libp2p Stream

func NewBaseStreamWithConfig

func NewBaseStreamWithConfig(st libp2p_network.Stream, trusted bool, config *StreamTimeoutConfig) *BaseStream

NewBaseStreamWithConfig creates BaseStream with custom timeout configuration

func (*BaseStream) AddFailedTimes

func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration)

func (*BaseStream) Close

func (st *BaseStream) Close() error

Close resets the stream and closes the connection for both sides.

func (*BaseStream) CloseOnExit

func (st *BaseStream) CloseOnExit() error

CloseOnExit resets the stream during the shutdown of the node

func (*BaseStream) Failures

func (st *BaseStream) Failures() int32

func (*BaseStream) GetProgressTracker

func (st *BaseStream) GetProgressTracker() *ProgressTracker

GetProgressTracker returns the progress tracker for this stream

func (*BaseStream) GetTimeoutConfig

func (st *BaseStream) GetTimeoutConfig() *StreamTimeoutConfig

GetTimeoutConfig returns the timeout configuration for this stream

func (*BaseStream) ID

func (st *BaseStream) ID() StreamID

ID returns the StreamID of the stream.

func (*BaseStream) IsHealthy

func (st *BaseStream) IsHealthy() bool

func (*BaseStream) IsTrusted

func (st *BaseStream) IsTrusted() bool

func (*BaseStream) ProtoID

func (st *BaseStream) ProtoID() ProtoID

ProtoID returns the remote protocol ID of the stream.

func (*BaseStream) ProtoSpec

func (st *BaseStream) ProtoSpec() (ProtoSpec, error)

ProtoSpec gets the parsed protocol specifier of the stream.

func (*BaseStream) ReadBytes

func (st *BaseStream) ReadBytes() (content []byte, err error)

ReadBytes reads bytes from the stream with blocking behavior. It will wait indefinitely for data unless:
- The stream is explicitly closed
- A network error occurs
- The message size exceeds maxMsgBytes

func (*BaseStream) ReadBytesWithProgress

func (st *BaseStream) ReadBytesWithProgress(progressTracker *ProgressTracker) (content []byte, err error)

ReadBytesWithProgress reads bytes from the stream with progress-based timeout. Progress tracking only starts after reading size prefix and stops when read completes/errors.

func (*BaseStream) ResetFailedTimes

func (st *BaseStream) ResetFailedTimes()

func (*BaseStream) SetProgressTracker

func (st *BaseStream) SetProgressTracker(tracker *ProgressTracker)

SetProgressTracker sets a custom progress tracker for this stream

func (*BaseStream) SetTimeoutConfig

func (st *BaseStream) SetTimeoutConfig(config *StreamTimeoutConfig)

SetTimeoutConfig sets a custom timeout configuration for this stream

func (*BaseStream) WriteBytes

func (st *BaseStream) WriteBytes(b []byte) (err error)

WriteBytes writes the bytes to the stream. The first 4 bytes are used as the size bytes, and the rest is the content.

type MessageError

type MessageError struct {
	Err error
}

MessageError represents an error related to message validation or parsing that should NOT cause the stream to be closed

func (*MessageError) Error

func (e *MessageError) Error() string

func (*MessageError) Unwrap

func (e *MessageError) Unwrap() error

type ProgressTracker

type ProgressTracker struct {
	// contains filtered or unexported fields
}

ProgressTracker monitors data transfer progress during content reading operations. It implements a simple, focused approach that only tracks reading progress and provides timeout functionality when no progress is made.

func NewProgressTracker

func NewProgressTracker(timeoutDuration time.Duration, resetThreshold int64) *ProgressTracker

NewProgressTracker creates a new progress tracker with the given configuration

func (*ProgressTracker) GetHealthSummary

func (pt *ProgressTracker) GetHealthSummary() map[string]interface{}

GetHealthSummary returns a simple health summary focused on reading progress

func (*ProgressTracker) GetProgressRate

func (pt *ProgressTracker) GetProgressRate() float64

GetProgressRate calculates the current progress rate in bytes per second. This is a simple calculation based on total bytes read and time since creation.

func (*ProgressTracker) GetStats

func (pt *ProgressTracker) GetStats() (totalBytes int64, lastProgress time.Time)

GetStats returns current progress statistics

func (*ProgressTracker) IsHealthy

func (pt *ProgressTracker) IsHealthy() bool

IsHealthy checks if stream is healthy (only applies when tracking)

func (*ProgressTracker) IsTracking

func (pt *ProgressTracker) IsTracking() bool

IsTracking returns whether progress tracking is currently active

func (*ProgressTracker) ResetTimeout

func (pt *ProgressTracker) ResetTimeout()

ResetTimeout resets the progress timeout - called when progress is detected

func (*ProgressTracker) ShouldTimeout

func (pt *ProgressTracker) ShouldTimeout() bool

ShouldTimeout checks if timeout should occur (only when tracking)

func (*ProgressTracker) StartTracking

func (pt *ProgressTracker) StartTracking()

StartTracking begins progress tracking - call when starting to read content

func (*ProgressTracker) StopTracking

func (pt *ProgressTracker) StopTracking()

StopTracking ends progress tracking - call when read completes or errors

func (*ProgressTracker) UpdateProgress

func (pt *ProgressTracker) UpdateProgress(newSize int)

UpdateProgress updates the progress tracker with new data received during content reading

type ProtoID

type ProtoID libp2p_proto.ID

ProtoID is the protocol ID for streaming, a type defined on the libp2p stream protocol ID. The stream protocol ID is composed of the following components (ex: harmony/sync/partner/0/1.0.0/1):
1. Service - Currently, only sync service is supported.
2. NetworkType - mainnet, testnet, stn, etc.
3. ShardID - shard ID of the current protocol.
4. Version - Stream protocol version for backward compatibility.
5. IsBeaconValidator - whether stream is from a beacon chain node or shard chain node.

type ProtoSpec

type ProtoSpec struct {
	Service     string
	NetworkType nodeconfig.NetworkType
	ShardID     nodeconfig.ShardID
	Version     *version.Version
}

ProtoSpec is the un-serialized stream proto ID specification. TODO: move this to a service-wise module, since different protocols might have different protoID information.

func ProtoIDToProtoSpec

func ProtoIDToProtoSpec(id ProtoID) (ProtoSpec, error)

ProtoIDToProtoSpec converts a ProtoID to ProtoSpec

func (ProtoSpec) ToProtoID

func (spec ProtoSpec) ToProtoID() ProtoID

ToProtoID converts a ProtoSpec to ProtoID.

type Protocol

type Protocol interface {
	p2ptypes.LifeCycle

	Version() *version.Version
	ProtoID() ProtoID
	ServiceID() string
	IsBeaconValidator() bool
	Match(id protocol.ID) bool
	HandleStream(st libp2p_network.Stream, trusted bool)
}

Protocol is the interface of protocol to be registered to libp2p.

type Request

type Request interface {
	ReqID() uint64
	SetReqID(rid uint64)
	String() string
	IsSupportedByProto(ProtoSpec) bool
	Encode() ([]byte, error)
}

Request is the interface of a stream request used for common stream utils.

type Response

type Response interface {
	ReqID() uint64
	String() string
}

Response is the interface of a stream response used for common stream utils

type SafeMap

type SafeMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

SafeMap is a thread-safe map with its own lock for reading and writing.

func NewSafeMap

func NewSafeMap[K comparable, V any]() *SafeMap[K, V]

NewSafeMap initializes and returns a new SafeMap.

func NewSafeMapWithInitialValues

func NewSafeMapWithInitialValues[K comparable, V any](initialValues map[K]V) *SafeMap[K, V]

NewSafeMapWithInitialValues creates a new SafeMap with optional initial values.

func (*SafeMap[K, V]) Clear

func (m *SafeMap[K, V]) Clear()

Clear removes all key-value pairs from the map.

func (*SafeMap[K, V]) Delete

func (m *SafeMap[K, V]) Delete(key K)

Delete removes a key-value pair from the map.

func (*SafeMap[K, V]) Exists

func (m *SafeMap[K, V]) Exists(key K) bool

Exists checks if a key exists in the map.

func (*SafeMap[K, V]) Get

func (m *SafeMap[K, V]) Get(key K) (V, bool)

Get retrieves the value for a given key. It returns the value and a boolean indicating if the key exists.

func (*SafeMap[K, V]) Iterate

func (m *SafeMap[K, V]) Iterate(f func(key K, value V))

func (*SafeMap[K, V]) Keys

func (m *SafeMap[K, V]) Keys() []K

Keys returns a slice of all keys in the map.

func (*SafeMap[K, V]) Length

func (m *SafeMap[K, V]) Length() int

Length returns the number of key-value pairs in the map.

func (*SafeMap[K, V]) Set

func (m *SafeMap[K, V]) Set(key K, value V)

Set inserts or updates a key-value pair in the map.

type Stream

type Stream interface {
	ID() StreamID
	ProtoID() ProtoID
	ProtoSpec() (ProtoSpec, error)
	IsTrusted() bool
	WriteBytes([]byte) error
	ReadBytes() ([]byte, error)
	Close(reason string, criticalErr bool) error
	CloseOnExit() error
	Failures() int32
	AddFailedTimes(faultRecoveryThreshold time.Duration)
	ResetFailedTimes()
	GetProgressTracker() *ProgressTracker
}

Stream is the interface for streams implemented in each service. The stream interface is used for stream management as well as rate limiters

type StreamErrorType

type StreamErrorType int

StreamErrorType represents different types of stream errors

const (
	ErrorTypeNoError StreamErrorType = iota
	ErrorTypeRemoteDisconnect
	ErrorTypeLocalNetwork
	ErrorTypeTimeout
	ErrorTypeProgressTimeout
	ErrorTypeReadDeadline
	ErrorTypeWriteDeadline
	ErrorTypeResourceExhaustion
	ErrorTypeProtocol
	ErrorTypeConnectionReset
	ErrorTypeBrokenPipe
	ErrorTypeUnknown
)

func ClassifyStreamError

func ClassifyStreamError(err error) (StreamErrorType, string)

ClassifyStreamError classifies stream errors to help with better error handling. It uses type assertions first for reliability, then falls back to string matching.

func (StreamErrorType) String

func (e StreamErrorType) String() string

String returns the string representation of StreamErrorType

type StreamID

type StreamID string

StreamID is the unique identifier for the stream. It has the value of libp2p_network_peer.ID

type StreamTimeoutConfig

type StreamTimeoutConfig struct {
	// ProgressTimeout is the maximum time without progress before timeout
	ProgressTimeout time.Duration
	// ProgressThreshold is the minimum bytes to consider as progress
	ProgressThreshold int64
	// HealthCheckInterval is how often to check stream health
	HealthCheckInterval time.Duration
	// ChunkReadTimeout is the timeout for individual chunk reads
	ChunkReadTimeout time.Duration
	// ChunkSize is the size of chunks to read at once (should align with ProgressThreshold)
	ChunkSize int64
}

StreamTimeoutConfig holds configuration for progress-based timeouts

func DefaultStreamTimeoutConfig

func DefaultStreamTimeoutConfig() *StreamTimeoutConfig

DefaultStreamTimeoutConfig returns the default timeout configuration

func NewStreamTimeoutConfig

func NewStreamTimeoutConfig(
	progressTimeout time.Duration,
	progressThreshold int64,
	healthCheckInterval time.Duration,
	chunkReadTimeout time.Duration,
	chunkSize int64,
) *StreamTimeoutConfig

NewStreamTimeoutConfig creates a new timeout configuration with custom values

func (*StreamTimeoutConfig) Validate

func (c *StreamTimeoutConfig) Validate() error

Validate checks if the configuration is valid and provides warnings for misalignments

type StreamWriteError

type StreamWriteError struct {
	Err error
}

StreamWriteError represents an error that occurred during actual stream writing and should cause the stream to be closed

func (*StreamWriteError) Error

func (e *StreamWriteError) Error() string

func (*StreamWriteError) Unwrap

func (e *StreamWriteError) Unwrap() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL