Documentation
¶
Index ¶
- Constants
- func GenReqID() uint64
- func GetErrorCategory(errorType StreamErrorType) string
- func GetErrorSeverity(errorType StreamErrorType) string
- func GetRetryDelay(errorType StreamErrorType) int
- func GetWriteErrorSeverity(err error) string
- func IsCriticalError(errorType StreamErrorType) bool
- func IsMessageError(err error) bool
- func IsRecoverableError(errorType StreamErrorType) bool
- func IsRemoteDeadError(errorType StreamErrorType) bool
- func IsStreamWriteError(err error) bool
- func RecordCriticalError(errorType StreamErrorType)
- func RecordRecoverableError(errorType StreamErrorType)
- func RecordStreamClosedByRecoverableErrors()
- func ShouldCloseStream(err error) bool
- func ShouldRetryConnection(errorType StreamErrorType) bool
- type BaseStream
- func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration)
- func (st *BaseStream) Close() error
- func (st *BaseStream) CloseOnExit() error
- func (st *BaseStream) Failures() int32
- func (st *BaseStream) GetProgressTracker() *ProgressTracker
- func (st *BaseStream) GetTimeoutConfig() *StreamTimeoutConfig
- func (st *BaseStream) ID() StreamID
- func (st *BaseStream) IsHealthy() bool
- func (st *BaseStream) IsTrusted() bool
- func (st *BaseStream) ProtoID() ProtoID
- func (st *BaseStream) ProtoSpec() (ProtoSpec, error)
- func (st *BaseStream) ReadBytes() (content []byte, err error)
- func (st *BaseStream) ReadBytesWithProgress(progressTracker *ProgressTracker) (content []byte, err error)
- func (st *BaseStream) ResetFailedTimes()
- func (st *BaseStream) SetProgressTracker(tracker *ProgressTracker)
- func (st *BaseStream) SetTimeoutConfig(config *StreamTimeoutConfig)
- func (st *BaseStream) WriteBytes(b []byte) (err error)
- type MessageError
- type ProgressTracker
- func (pt *ProgressTracker) GetHealthSummary() map[string]interface{}
- func (pt *ProgressTracker) GetProgressRate() float64
- func (pt *ProgressTracker) GetStats() (totalBytes int64, lastProgress time.Time)
- func (pt *ProgressTracker) IsHealthy() bool
- func (pt *ProgressTracker) IsTracking() bool
- func (pt *ProgressTracker) ResetTimeout()
- func (pt *ProgressTracker) ShouldTimeout() bool
- func (pt *ProgressTracker) StartTracking()
- func (pt *ProgressTracker) StopTracking()
- func (pt *ProgressTracker) UpdateProgress(newSize int)
- type ProtoID
- type ProtoSpec
- type Protocol
- type Request
- type Response
- type SafeMap
- func (m *SafeMap[K, V]) Clear()
- func (m *SafeMap[K, V]) Delete(key K)
- func (m *SafeMap[K, V]) Exists(key K) bool
- func (m *SafeMap[K, V]) Get(key K) (V, bool)
- func (m *SafeMap[K, V]) Iterate(f func(key K, value V))
- func (m *SafeMap[K, V]) Keys() []K
- func (m *SafeMap[K, V]) Length() int
- func (m *SafeMap[K, V]) Set(key K, value V)
- type Stream
- type StreamErrorType
- type StreamID
- type StreamTimeoutConfig
- type StreamWriteError
Constants ¶
const (
// ProtoIDCommonPrefix is the common prefix for stream protocol
ProtoIDCommonPrefix = "harmony"
// ProtoIDFormat is the format of stream protocol ID
ProtoIDFormat = "%s/%s/%s/%d/%s"
)
Variables ¶
This section is empty.
Functions ¶
func GetErrorCategory ¶
func GetErrorCategory(errorType StreamErrorType) string
GetErrorCategory returns a human-readable category for the error
func GetErrorSeverity ¶
func GetErrorSeverity(errorType StreamErrorType) string
GetErrorSeverity returns the severity level of an error type
func GetRetryDelay ¶
func GetRetryDelay(errorType StreamErrorType) int
GetRetryDelay returns the recommended retry delay for an error type
func GetWriteErrorSeverity ¶
GetWriteErrorSeverity returns the appropriate log severity for write errors
func IsCriticalError ¶
func IsCriticalError(errorType StreamErrorType) bool
IsCriticalError determines if an error is critical and should trigger immediate stream removal
func IsMessageError ¶
IsMessageError checks if the error is a MessageError
func IsRecoverableError ¶
func IsRecoverableError(errorType StreamErrorType) bool
IsRecoverableError determines if an error type is potentially recoverable
func IsRemoteDeadError ¶
func IsRemoteDeadError(errorType StreamErrorType) bool
IsRemoteDeadError determines if an error indicates the remote peer is dead/unreachable
func IsStreamWriteError ¶
IsStreamWriteError checks if the error is a StreamWriteError
func RecordCriticalError ¶
func RecordCriticalError(errorType StreamErrorType)
RecordCriticalError records a critical error metric
func RecordRecoverableError ¶
func RecordRecoverableError(errorType StreamErrorType)
RecordRecoverableError records a recoverable error metric
func RecordStreamClosedByRecoverableErrors ¶
func RecordStreamClosedByRecoverableErrors()
RecordStreamClosedByRecoverableErrors records when a stream is closed due to too many recoverable errors
func ShouldCloseStream ¶
ShouldCloseStream determines if an error should cause the stream to be closed
func ShouldRetryConnection ¶
func ShouldRetryConnection(errorType StreamErrorType) bool
ShouldRetryConnection determines if a connection should be retried based on error type
Types ¶
type BaseStream ¶
type BaseStream struct {
// contains filtered or unexported fields
}
BaseStream is the wrapper around libp2p Stream.
func NewBaseStream ¶
func NewBaseStream(st libp2p_network.Stream, trusted bool) *BaseStream
NewBaseStream creates BaseStream as the wrapper of libp2p Stream
func NewBaseStreamWithConfig ¶
func NewBaseStreamWithConfig(st libp2p_network.Stream, trusted bool, config *StreamTimeoutConfig) *BaseStream
NewBaseStreamWithConfig creates BaseStream with custom timeout configuration
func (*BaseStream) AddFailedTimes ¶
func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration)
func (*BaseStream) Close ¶
func (st *BaseStream) Close() error
Close resets the stream and closes the connection for both sides.
func (*BaseStream) CloseOnExit ¶
func (st *BaseStream) CloseOnExit() error
CloseOnExit resets the stream during the shutdown of the node
func (*BaseStream) Failures ¶
func (st *BaseStream) Failures() int32
func (*BaseStream) GetProgressTracker ¶
func (st *BaseStream) GetProgressTracker() *ProgressTracker
GetProgressTracker returns the progress tracker for this stream
func (*BaseStream) GetTimeoutConfig ¶
func (st *BaseStream) GetTimeoutConfig() *StreamTimeoutConfig
GetTimeoutConfig returns the timeout configuration for this stream
func (*BaseStream) IsHealthy ¶
func (st *BaseStream) IsHealthy() bool
func (*BaseStream) IsTrusted ¶
func (st *BaseStream) IsTrusted() bool
func (*BaseStream) ProtoID ¶
func (st *BaseStream) ProtoID() ProtoID
ProtoID returns the remote protocol ID of the stream
func (*BaseStream) ProtoSpec ¶
func (st *BaseStream) ProtoSpec() (ProtoSpec, error)
ProtoSpec gets the parsed protocol specifier of the stream
func (*BaseStream) ReadBytes ¶
func (st *BaseStream) ReadBytes() (content []byte, err error)
ReadBytes reads bytes from the stream with blocking behavior. It will wait indefinitely for data unless:
- The stream is explicitly closed
- A network error occurs
- The message size exceeds maxMsgBytes
func (*BaseStream) ReadBytesWithProgress ¶
func (st *BaseStream) ReadBytesWithProgress(progressTracker *ProgressTracker) (content []byte, err error)
ReadBytesWithProgress reads bytes from the stream with progress-based timeout. Progress tracking only starts after reading size prefix and stops when read completes/errors.
func (*BaseStream) ResetFailedTimes ¶
func (st *BaseStream) ResetFailedTimes()
func (*BaseStream) SetProgressTracker ¶
func (st *BaseStream) SetProgressTracker(tracker *ProgressTracker)
SetProgressTracker sets a custom progress tracker for this stream
func (*BaseStream) SetTimeoutConfig ¶
func (st *BaseStream) SetTimeoutConfig(config *StreamTimeoutConfig)
SetTimeoutConfig sets a custom timeout configuration for this stream
func (*BaseStream) WriteBytes ¶
func (st *BaseStream) WriteBytes(b []byte) (err error)
WriteBytes writes the bytes to the stream. The first 4 bytes are used as the size bytes, and the rest is the content.
type MessageError ¶
type MessageError struct {
Err error
}
MessageError represents an error related to message validation or parsing that should NOT cause the stream to be closed
func (*MessageError) Error ¶
func (e *MessageError) Error() string
func (*MessageError) Unwrap ¶
func (e *MessageError) Unwrap() error
type ProgressTracker ¶
type ProgressTracker struct {
// contains filtered or unexported fields
}
ProgressTracker monitors data transfer progress during content reading operations. It implements a simple, focused approach that only tracks reading progress and provides timeout functionality when no progress is made.
func NewProgressTracker ¶
func NewProgressTracker(timeoutDuration time.Duration, resetThreshold int64) *ProgressTracker
NewProgressTracker creates a new progress tracker with the given configuration
func (*ProgressTracker) GetHealthSummary ¶
func (pt *ProgressTracker) GetHealthSummary() map[string]interface{}
GetHealthSummary returns a simple health summary focused on reading progress
func (*ProgressTracker) GetProgressRate ¶
func (pt *ProgressTracker) GetProgressRate() float64
GetProgressRate calculates the current progress rate in bytes per second. This is a simple calculation based on total bytes read and time since creation.
func (*ProgressTracker) GetStats ¶
func (pt *ProgressTracker) GetStats() (totalBytes int64, lastProgress time.Time)
GetStats returns current progress statistics
func (*ProgressTracker) IsHealthy ¶
func (pt *ProgressTracker) IsHealthy() bool
IsHealthy checks if stream is healthy (only applies when tracking)
func (*ProgressTracker) IsTracking ¶
func (pt *ProgressTracker) IsTracking() bool
IsTracking returns whether progress tracking is currently active
func (*ProgressTracker) ResetTimeout ¶
func (pt *ProgressTracker) ResetTimeout()
ResetTimeout resets the progress timeout - called when progress is detected
func (*ProgressTracker) ShouldTimeout ¶
func (pt *ProgressTracker) ShouldTimeout() bool
ShouldTimeout checks if timeout should occur (only when tracking)
func (*ProgressTracker) StartTracking ¶
func (pt *ProgressTracker) StartTracking()
StartTracking begins progress tracking - call when starting to read content
func (*ProgressTracker) StopTracking ¶
func (pt *ProgressTracker) StopTracking()
StopTracking ends progress tracking - call when read completes or errors
func (*ProgressTracker) UpdateProgress ¶
func (pt *ProgressTracker) UpdateProgress(newSize int)
UpdateProgress updates the progress tracker with new data received during content reading
type ProtoID ¶
type ProtoID libp2p_proto.ID
ProtoID is the protocol id for streaming, an alias of libp2p stream protocol ID. The stream protocol ID is composed of the following components:
ex: harmony/sync/partner/0/1.0.0/1
1. Service - Currently, only sync service is supported.
2. NetworkType - mainnet, testnet, stn, etc.
3. ShardID - shard ID of the current protocol.
4. Version - Stream protocol version for backward compatibility.
5. IsBeaconValidator - whether stream is from a beacon chain node or shard chain node
type ProtoSpec ¶
type ProtoSpec struct {
Service string
NetworkType nodeconfig.NetworkType
ShardID nodeconfig.ShardID
Version *version.Version
}
ProtoSpec is the un-serialized stream proto id specification. TODO: move this to a service-wise module since different protocols might have different protoID information
func ProtoIDToProtoSpec ¶
ProtoIDToProtoSpec converts a ProtoID to ProtoSpec
type Protocol ¶
type Protocol interface {
p2ptypes.LifeCycle
Version() *version.Version
ProtoID() ProtoID
ServiceID() string
IsBeaconValidator() bool
Match(id protocol.ID) bool
HandleStream(st libp2p_network.Stream, trusted bool)
}
Protocol is the interface of protocol to be registered to libp2p.
type Request ¶
type Request interface {
ReqID() uint64
SetReqID(rid uint64)
String() string
IsSupportedByProto(ProtoSpec) bool
Encode() ([]byte, error)
}
Request is the interface of a stream request used for common stream utils.
type SafeMap ¶
type SafeMap[K comparable, V any] struct {
// contains filtered or unexported fields
}
SafeMap is a thread-safe map with its own lock for reading and writing.
func NewSafeMap ¶
func NewSafeMap[K comparable, V any]() *SafeMap[K, V]
NewSafeMap initializes and returns a new SafeMap.
func NewSafeMapWithInitialValues ¶
func NewSafeMapWithInitialValues[K comparable, V any](initialValues map[K]V) *SafeMap[K, V]
NewSafeMapWithInitialValues creates a new SafeMap with optional initial values.
func (*SafeMap[K, V]) Clear ¶
func (m *SafeMap[K, V]) Clear()
Clear removes all key-value pairs from the map.
func (*SafeMap[K, V]) Delete ¶
func (m *SafeMap[K, V]) Delete(key K)
Delete removes a key-value pair from the map.
func (*SafeMap[K, V]) Get ¶
Get retrieves the value for a given key. It returns the value and a boolean indicating if the key exists.
func (*SafeMap[K, V]) Keys ¶
func (m *SafeMap[K, V]) Keys() []K
Keys returns a slice of all keys in the map.
type Stream ¶
type Stream interface {
ID() StreamID
ProtoID() ProtoID
ProtoSpec() (ProtoSpec, error)
IsTrusted() bool
WriteBytes([]byte) error
ReadBytes() ([]byte, error)
Close(reason string, criticalErr bool) error
CloseOnExit() error
Failures() int32
AddFailedTimes(faultRecoveryThreshold time.Duration)
ResetFailedTimes()
GetProgressTracker() *ProgressTracker
}
Stream is the interface for streams implemented in each service. The stream interface is used for stream management as well as rate limiters
type StreamErrorType ¶
type StreamErrorType int
StreamErrorType represents different types of stream errors
const (
ErrorTypeNoError StreamErrorType = iota
ErrorTypeRemoteDisconnect
ErrorTypeLocalNetwork
ErrorTypeTimeout
ErrorTypeProgressTimeout
ErrorTypeReadDeadline
ErrorTypeWriteDeadline
ErrorTypeResourceExhaustion
ErrorTypeProtocol
ErrorTypeConnectionReset
ErrorTypeBrokenPipe
ErrorTypeUnknown
)
func ClassifyStreamError ¶
func ClassifyStreamError(err error) (StreamErrorType, string)
ClassifyStreamError classifies stream errors to help with better error handling. It uses type assertions first for reliability, then falls back to string matching.
func (StreamErrorType) String ¶
func (e StreamErrorType) String() string
String returns the string representation of StreamErrorType
type StreamID ¶
type StreamID string
StreamID is the unique identifier for the stream. It has the value of libp2p_network_peer.ID
type StreamTimeoutConfig ¶
type StreamTimeoutConfig struct {
// ProgressTimeout is the maximum time without progress before timeout
ProgressTimeout time.Duration
// ProgressThreshold is the minimum bytes to consider as progress
ProgressThreshold int64
// HealthCheckInterval is how often to check stream health
HealthCheckInterval time.Duration
// ChunkReadTimeout is the timeout for individual chunk reads
ChunkReadTimeout time.Duration
// ChunkSize is the size of chunks to read at once (should align with ProgressThreshold)
ChunkSize int64
}
StreamTimeoutConfig holds configuration for progress-based timeouts
func DefaultStreamTimeoutConfig ¶
func DefaultStreamTimeoutConfig() *StreamTimeoutConfig
DefaultStreamTimeoutConfig returns the default timeout configuration
func NewStreamTimeoutConfig ¶
func NewStreamTimeoutConfig( progressTimeout time.Duration, progressThreshold int64, healthCheckInterval time.Duration, chunkReadTimeout time.Duration, chunkSize int64, ) *StreamTimeoutConfig
NewStreamTimeoutConfig creates a new timeout configuration with custom values
func (*StreamTimeoutConfig) Validate ¶
func (c *StreamTimeoutConfig) Validate() error
Validate checks if the configuration is valid and provides warnings for misalignments
type StreamWriteError ¶
type StreamWriteError struct {
Err error
}
StreamWriteError represents an error that occurred during actual stream writing and should cause the stream to be closed
func (*StreamWriteError) Error ¶
func (e *StreamWriteError) Error() string
func (*StreamWriteError) Unwrap ¶
func (e *StreamWriteError) Unwrap() error