backedpipe

package
v2.28.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2025 License: AGPL-3.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Default buffer capacity used by the writer - 64MB
	DefaultBufferSize = 64 * 1024 * 1024
)

Variables

View Source
var (
	ErrPipeClosed             = xerrors.New("pipe is closed")
	ErrPipeAlreadyConnected   = xerrors.New("pipe is already connected")
	ErrReconnectionInProgress = xerrors.New("reconnection already in progress")
	ErrReconnectFailed        = xerrors.New("reconnect failed")
	ErrInvalidSequenceNumber  = xerrors.New("remote sequence number exceeds local sequence")
	ErrReconnectWriterFailed  = xerrors.New("reconnect writer failed")
)
View Source
var (
	ErrWriterClosed          = xerrors.New("cannot reconnect closed writer")
	ErrNilWriter             = xerrors.New("new writer cannot be nil")
	ErrFutureSequence        = xerrors.New("cannot replay from future sequence")
	ErrReplayDataUnavailable = xerrors.New("failed to read replay data")
	ErrReplayFailed          = xerrors.New("replay failed")
	ErrPartialReplay         = xerrors.New("partial replay")
)

Functions

This section is empty.

Types

type BackedPipe

type BackedPipe struct {
	// contains filtered or unexported fields
}

BackedPipe provides a reliable bidirectional byte stream over unreliable network connections. It orchestrates a BackedReader and BackedWriter to provide transparent reconnection and data replay capabilities.

func NewBackedPipe

func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe

NewBackedPipe creates a new BackedPipe with default options and the specified reconnector. The pipe starts disconnected and must be connected using Connect().

func (*BackedPipe) Close

func (bp *BackedPipe) Close() error

Close closes the pipe and all underlying connections.

func (*BackedPipe) Connect

func (bp *BackedPipe) Connect() error

Connect establishes the initial connection using the reconnect function.

func (*BackedPipe) Connected

func (bp *BackedPipe) Connected() bool

Connected returns whether the pipe is currently connected.

func (*BackedPipe) ForceReconnect

func (bp *BackedPipe) ForceReconnect() error

ForceReconnect forces a reconnection attempt immediately. This can be used to force a reconnection if a new connection is established. It prevents duplicate reconnections when called concurrently.

func (*BackedPipe) Read

func (bp *BackedPipe) Read(p []byte) (int, error)

Read implements io.Reader by delegating to the BackedReader.

func (*BackedPipe) Write

func (bp *BackedPipe) Write(p []byte) (int, error)

Write implements io.Writer by delegating to the BackedWriter.

type BackedReader

type BackedReader struct {
	// contains filtered or unexported fields
}

BackedReader wraps an unreliable io.Reader and makes it resilient to disconnections. It tracks sequence numbers for all bytes read and can handle reconnection, blocking reads when disconnected instead of erroring.

func NewBackedReader

func NewBackedReader(errorEventChan chan<- ErrorEvent) *BackedReader

NewBackedReader creates a new BackedReader with generation-aware error reporting. The reader is initially disconnected and must be connected using Reconnect before reads will succeed. The errorEventChan will receive ErrorEvent structs containing error details, component info, and connection generation.

func (*BackedReader) Close

func (br *BackedReader) Close() error

Close the reader and wake up any blocked reads. After closing, all Read calls will return io.EOF.

func (*BackedReader) Connected

func (br *BackedReader) Connected() bool

Connected returns whether the reader is currently connected.

func (*BackedReader) Read

func (br *BackedReader) Read(p []byte) (int, error)

Read implements io.Reader. It blocks when disconnected until either: 1. A reconnection is established 2. The reader is closed

When connected, it reads from the underlying reader and updates sequence numbers. Connection failures are automatically detected and reported to the higher layer via callback.

func (*BackedReader) Reconnect

func (br *BackedReader) Reconnect(seqNum chan<- uint64, newR <-chan io.Reader)

Reconnect coordinates reconnection using channels for better synchronization. The seqNum channel is used to send the current sequence number to the caller. The newR channel is used to receive the new reader from the caller. This allows for better coordination during the reconnection process.

func (*BackedReader) SequenceNum

func (br *BackedReader) SequenceNum() uint64

SequenceNum returns the current sequence number (total bytes read).

func (*BackedReader) SetGeneration

func (br *BackedReader) SetGeneration(generation uint64)

SetGeneration sets the current connection generation for error reporting.

type BackedWriter

type BackedWriter struct {
	// contains filtered or unexported fields
}

BackedWriter wraps an unreliable io.Writer and makes it resilient to disconnections. It maintains a ring buffer of recent writes for replay during reconnection.

func NewBackedWriter

func NewBackedWriter(capacity int, errorEventChan chan<- ErrorEvent) *BackedWriter

NewBackedWriter creates a new BackedWriter with generation-aware error reporting. The writer is initially disconnected and will block writes until connected. The errorEventChan will receive ErrorEvent structs containing error details, component info, and connection generation. Capacity must be > 0.

func (*BackedWriter) Close

func (bw *BackedWriter) Close() error

Close closes the writer and prevents further writes. After closing, all Write calls will return os.ErrClosed. This code keeps the Close() signature consistent with io.Closer, but it never actually returns an error.

IMPORTANT: You must close the current underlying writer, if any, before calling this method. Otherwise, if a Write operation is currently blocked in the underlying writer's Write method, this method will deadlock waiting for the mutex that Write holds.

func (*BackedWriter) Connected

func (bw *BackedWriter) Connected() bool

Connected returns whether the writer is currently connected.

func (*BackedWriter) Reconnect

func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) error

Reconnect replaces the current writer with a new one and replays data from the specified sequence number. If the requested sequence number is no longer in the buffer, returns an error indicating data loss.

IMPORTANT: You must close the current writer, if any, before calling this method. Otherwise, if a Write operation is currently blocked in the underlying writer's Write method, this method will deadlock waiting for the mutex that Write holds.

func (*BackedWriter) SequenceNum

func (bw *BackedWriter) SequenceNum() uint64

SequenceNum returns the current sequence number (total bytes written).

func (*BackedWriter) SetGeneration

func (bw *BackedWriter) SetGeneration(generation uint64)

SetGeneration sets the current connection generation for error reporting.

func (*BackedWriter) Write

func (bw *BackedWriter) Write(p []byte) (int, error)

Write implements io.Writer. When connected, it writes to both the ring buffer (to preserve data in case we need to replay it) and the underlying writer. If the underlying write fails, the writer is marked as disconnected and the write blocks until reconnection occurs.

type ErrorEvent

type ErrorEvent struct {
	Err        error
	Component  string // "reader" or "writer"
	Generation uint64 // connection generation when error occurred
}

ErrorEvent represents an error from a reader or writer with connection generation info.

type Reconnector

type Reconnector interface {
	Reconnect(ctx context.Context, readerSeqNum uint64) (conn io.ReadWriteCloser, remoteReaderSeqNum uint64, err error)
}

Reconnector is an interface for establishing connections when the BackedPipe needs to reconnect. Implementations should: 1. Establish a new connection to the remote side 2. Exchange sequence numbers with the remote side 3. Return the new connection and the remote's reader sequence number

The readerSeqNum parameter is the local reader's current sequence number (total bytes successfully read from the remote). This must be sent to the remote so it can replay its data to us starting from this number.

The returned remoteReaderSeqNum should be the remote side's reader sequence number (how many bytes of our outbound data it has successfully read). This informs our writer where to resume (i.e., which bytes to replay to the remote).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL