Documentation ¶
Index ¶
- Constants
- Variables
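- type BackedPipe
- func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe
- func (bp *BackedPipe) Close() error
- func (bp *BackedPipe) Connect() error
- func (bp *BackedPipe) Connected() bool
- func (bp *BackedPipe) ForceReconnect() error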
- type BackedReader
- func NewBackedReader(errorEventChan chan<- ErrorEvent) *BackedReader
- func (br *BackedReader) Close() error
- func (br *BackedReader) Connected() bool
- func (br *BackedReader) Read(p []byte) (int, error)
- func (br *BackedReader) Reconnect(seqNum chan<- uint64, newR <-chan io.Reader)
- func (br *BackedReader) SequenceNum() uint64
- func (br *BackedReader) SetGeneration(generation uint64)
- type BackedWriter
- func NewBackedWriter(capacity int, errorEventChan chan<- ErrorEvent) *BackedWriter
- func (bw *BackedWriter) Close() error
- func (bw *BackedWriter) Connected() bool
- func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) error
- func (bw *BackedWriter) SequenceNum() uint64
- func (bw *BackedWriter) SetGeneration(generation uint64)
- func (bw *BackedWriter) Write(p []byte) (int, error)
- type ErrorEvent
- type Reconnector
Constants ¶
const (
	// Default buffer capacity used by the writer: 64 MiB
	DefaultBufferSize = 64 * 1024 * 1024
)
Variables ¶
var (
	ErrPipeClosed             = xerrors.New("pipe is closed")
	ErrPipeAlreadyConnected   = xerrors.New("pipe is already connected")
	ErrReconnectionInProgress = xerrors.New("reconnection already in progress")
	ErrReconnectFailed        = xerrors.New("reconnect failed")
	ErrInvalidSequenceNumber  = xerrors.New("remote sequence number exceeds local sequence")
	ErrReconnectWriterFailed  = xerrors.New("reconnect writer failed")
)
var (
	ErrWriterClosed   = xerrors.New("cannot reconnect closed writer")
	ErrNilWriter      = xerrors.New("new writer cannot be nil")
	ErrFutureSequence = xerrors.New("cannot replay from future sequence")
	ErrReplayFailed   = xerrors.New("replay failed")
	ErrPartialReplay  = xerrors.New("partial replay")
)
Functions ¶
This section is empty.
Types ¶
type BackedPipe ¶
type BackedPipe struct {
// contains filtered or unexported fields
}
BackedPipe provides a reliable bidirectional byte stream over unreliable network connections. It orchestrates a BackedReader and BackedWriter to provide transparent reconnection and data replay capabilities.
func NewBackedPipe ¶
func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe
NewBackedPipe creates a new BackedPipe with default options and the specified reconnector. The pipe starts disconnected and must be connected using Connect().
func (*BackedPipe) Close ¶
func (bp *BackedPipe) Close() error
Close closes the pipe and all underlying connections.
func (*BackedPipe) Connect ¶
func (bp *BackedPipe) Connect() error
Connect establishes the initial connection using the Reconnector passed to NewBackedPipe.
func (*BackedPipe) Connected ¶
func (bp *BackedPipe) Connected() bool
Connected returns whether the pipe is currently connected.
func (*BackedPipe) ForceReconnect ¶
func (bp *BackedPipe) ForceReconnect() error
ForceReconnect triggers a reconnection attempt immediately. Use it when the caller knows that a new connection should replace the current one. Concurrent calls do not start duplicate reconnections.
type BackedReader ¶
type BackedReader struct {
// contains filtered or unexported fields
}
BackedReader wraps an unreliable io.Reader and makes it resilient to disconnections. It tracks sequence numbers for all bytes read and can handle reconnection, blocking reads when disconnected instead of erroring.
func NewBackedReader ¶
func NewBackedReader(errorEventChan chan<- ErrorEvent) *BackedReader
NewBackedReader creates a new BackedReader with generation-aware error reporting. The reader is initially disconnected and must be connected using Reconnect before reads will succeed. The errorEventChan will receive ErrorEvent structs containing error details, component info, and connection generation.
func (*BackedReader) Close ¶
func (br *BackedReader) Close() error
Close closes the reader and wakes up any blocked reads. After closing, all Read calls return io.EOF.
func (*BackedReader) Connected ¶
func (br *BackedReader) Connected() bool
Connected returns whether the reader is currently connected.
func (*BackedReader) Read ¶
func (br *BackedReader) Read(p []byte) (int, error)
Read implements io.Reader. It blocks when disconnected until either:
1. A reconnection is established
2. The reader is closed
When connected, it reads from the underlying reader and updates the sequence number. Connection failures are detected automatically and reported to the higher layer as an ErrorEvent on the error event channel passed to NewBackedReader.
func (*BackedReader) Reconnect ¶
func (br *BackedReader) Reconnect(seqNum chan<- uint64, newR <-chan io.Reader)
Reconnect coordinates a reconnection with the caller over two channels. The reader sends its current sequence number on seqNum, and the caller delivers the new reader on newR.
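A caller might drive this two-channel handshake roughly as follows. This is a sketch under assumptions: `reconnectable`, `stubReader`, `swapReader`, and the `dial` callback are hypothetical, and the stub assumes the sequence number is sent before the new reader is received.

```go
package main

import (
	"io"
	"strings"
)

// reconnectable matches the documented Reconnect signature.
type reconnectable interface {
	Reconnect(seqNum chan<- uint64, newR <-chan io.Reader)
}

// stubReader is a hypothetical stand-in that mimics the handshake.
type stubReader struct {
	seq uint64
	r   io.Reader
}

func (s *stubReader) Reconnect(seqNum chan<- uint64, newR <-chan io.Reader) {
	seqNum <- s.seq // report how many bytes have been read so far
	s.r = <-newR    // adopt the reader supplied by the caller
}

// swapReader runs Reconnect in a goroutine, reads the sequence number,
// obtains a replacement reader from dial, and hands it back.
func swapReader(br reconnectable, dial func(resumeFrom uint64) io.Reader) uint64 {
	seqNum := make(chan uint64, 1)
	newR := make(chan io.Reader, 1)
	done := make(chan struct{})
	go func() {
		defer close(done)
		br.Reconnect(seqNum, newR)
	}()
	seq := <-seqNum
	newR <- dial(seq)
	<-done
	return seq
}

// demoSwap exercises the handshake against the stub.
func demoSwap() (uint64, string) {
	s := &stubReader{seq: 42}
	seq := swapReader(s, func(uint64) io.Reader {
		return strings.NewReader("resumed")
	})
	b, _ := io.ReadAll(s.r)
	return seq, string(b)
}
```

Buffered channels of size one keep either side from blocking on the other if the exchange is abandoned midway.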
func (*BackedReader) SequenceNum ¶
func (br *BackedReader) SequenceNum() uint64
SequenceNum returns the current sequence number (total bytes read).
func (*BackedReader) SetGeneration ¶
func (br *BackedReader) SetGeneration(generation uint64)
SetGeneration sets the current connection generation for error reporting.
type BackedWriter ¶
type BackedWriter struct {
// contains filtered or unexported fields
}
BackedWriter wraps an unreliable io.Writer and makes it resilient to disconnections. It maintains a ring buffer of recent writes for replay during reconnection.
func NewBackedWriter ¶
func NewBackedWriter(capacity int, errorEventChan chan<- ErrorEvent) *BackedWriter
NewBackedWriter creates a new BackedWriter with generation-aware error reporting. The writer is initially disconnected and blocks writes until connected. The errorEventChan will receive ErrorEvent structs containing error details, component info, and connection generation. The capacity must be greater than 0.
func (*BackedWriter) Close ¶
func (bw *BackedWriter) Close() error
Close closes the writer and prevents further writes. After closing, all Write calls return os.ErrClosed. The signature matches io.Closer, but Close never returns a non-nil error.
IMPORTANT: You must close the current underlying writer, if any, before calling this method. Otherwise, if a Write operation is currently blocked in the underlying writer's Write method, this method will deadlock waiting for the mutex that Write holds.
func (*BackedWriter) Connected ¶
func (bw *BackedWriter) Connected() bool
Connected returns whether the writer is currently connected.
func (*BackedWriter) Reconnect ¶
func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) error
Reconnect replaces the current writer with a new one and replays data from the specified sequence number. If the requested sequence number is no longer in the buffer, returns an error indicating data loss.
IMPORTANT: You must close the current writer, if any, before calling this method. Otherwise, if a Write operation is currently blocked in the underlying writer's Write method, this method will deadlock waiting for the mutex that Write holds.
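The replay step in Reconnect can be pictured with a small ring buffer keyed by absolute sequence number. This is an illustrative sketch only: `replayBuffer`, `errDataLost`, and `errFuture` are hypothetical names, and the package's actual buffer and error values may differ.

```go
package main

import (
	"errors"
	"io"
	"strings"
)

// Hypothetical errors for this sketch; they are not the package's error values.
var (
	errDataLost = errors.New("requested sequence is no longer buffered")
	errFuture   = errors.New("requested sequence is ahead of the writer")
)

// replayBuffer keeps the most recent len(buf) bytes, indexed by sequence number.
type replayBuffer struct {
	buf []byte // ring storage
	seq uint64 // total bytes ever written
}

func newReplayBuffer(capacity int) *replayBuffer {
	return &replayBuffer{buf: make([]byte, capacity)}
}

// write stores p, overwriting the oldest bytes once the ring is full.
func (rb *replayBuffer) write(p []byte) {
	for _, b := range p {
		rb.buf[rb.seq%uint64(len(rb.buf))] = b
		rb.seq++
	}
}

// replayFrom writes every buffered byte with sequence number >= from to w.
func (rb *replayBuffer) replayFrom(from uint64, w io.Writer) error {
	if from > rb.seq {
		return errFuture
	}
	if rb.seq-from > uint64(len(rb.buf)) {
		return errDataLost // the bytes were already overwritten
	}
	out := make([]byte, 0, rb.seq-from)
	for s := from; s < rb.seq; s++ {
		out = append(out, rb.buf[s%uint64(len(rb.buf))])
	}
	_, err := w.Write(out)
	return err
}

// demoReplay writes 10 bytes into an 8-byte ring and replays from two offsets.
func demoReplay() (string, error) {
	rb := newReplayBuffer(8)
	rb.write([]byte("abcdefghij")) // "ab" falls out of the ring
	var sb strings.Builder
	if err := rb.replayFrom(6, &sb); err != nil {
		return "", err
	}
	return sb.String(), rb.replayFrom(1, io.Discard)
}
```

In the demo, replaying from sequence 6 yields "ghij", while replaying from sequence 1 fails because that byte has been overwritten, which corresponds to the data-loss case described above.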
func (*BackedWriter) SequenceNum ¶
func (bw *BackedWriter) SequenceNum() uint64
SequenceNum returns the current sequence number (total bytes written).
func (*BackedWriter) SetGeneration ¶
func (bw *BackedWriter) SetGeneration(generation uint64)
SetGeneration sets the current connection generation for error reporting.
func (*BackedWriter) Write ¶
func (bw *BackedWriter) Write(p []byte) (int, error)
Write implements io.Writer. When connected, it writes to both the ring buffer (to preserve data in case we need to replay it) and the underlying writer. If the underlying write fails, the writer is marked as disconnected and the write blocks until reconnection occurs.
type ErrorEvent ¶
type ErrorEvent struct {
	Err        error
	Component  string // "reader" or "writer"
	Generation uint64 // connection generation when error occurred
}
ErrorEvent represents an error from a reader or writer with connection generation info.
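One plausible use of the Generation field is to discard errors reported by a connection that has already been replaced. The sketch below assumes that usage; `filterCurrent` and `demoFilter` are hypothetical helpers, and the struct is redeclared locally only to keep the example self-contained.

```go
package main

import "errors"

// ErrorEvent is a local copy of the documented struct.
type ErrorEvent struct {
	Err        error
	Component  string // "reader" or "writer"
	Generation uint64 // connection generation when error occurred
}

// filterCurrent drains events and keeps only those from the current generation.
func filterCurrent(events <-chan ErrorEvent, current uint64) []ErrorEvent {
	var out []ErrorEvent
	for ev := range events {
		if ev.Generation != current {
			continue // stale: reported by an earlier connection
		}
		out = append(out, ev)
	}
	return out
}

// demoFilter queues events from generations 1 and 2 and filters for generation 2.
func demoFilter() []ErrorEvent {
	ch := make(chan ErrorEvent, 3)
	ch <- ErrorEvent{Err: errors.New("broken pipe"), Component: "writer", Generation: 1}
	ch <- ErrorEvent{Err: errors.New("connection reset"), Component: "reader", Generation: 2}
	ch <- ErrorEvent{Err: errors.New("unexpected EOF"), Component: "reader", Generation: 1}
	close(ch)
	return filterCurrent(ch, 2)
}
```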
type Reconnector ¶
type Reconnector interface {
	Reconnect(ctx context.Context, readerSeqNum uint64) (conn io.ReadWriteCloser, remoteReaderSeqNum uint64, err error)
}
Reconnector is an interface for establishing connections when the BackedPipe needs to reconnect. Implementations should:
1. Establish a new connection to the remote side
2. Exchange sequence numbers with the remote side
3. Return the new connection and the remote's reader sequence number
The readerSeqNum parameter is the local reader's current sequence number (total bytes successfully read from the remote). This must be sent to the remote so it can replay its data to us starting from this number.
The returned remoteReaderSeqNum should be the remote side's reader sequence number (how many bytes of our outbound data it has successfully read). This informs our writer where to resume (i.e., which bytes to replay to the remote).
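The three steps expected of a Reconnector could look like the following sketch. The wire format (one 8-byte big-endian sequence number in each direction), the `dialFunc` hook, and the `seqExchangeReconnector` type are all assumptions made for illustration; the source does not specify how sequence numbers are exchanged.

```go
package main

import (
	"context"
	"encoding/binary"
	"io"
	"net"
)

// dialFunc is a hypothetical hook that opens a connection to the remote side.
type dialFunc func(ctx context.Context) (io.ReadWriteCloser, error)

// seqExchangeReconnector has a Reconnect method with the documented signature.
type seqExchangeReconnector struct {
	dial dialFunc
}

func (r *seqExchangeReconnector) Reconnect(ctx context.Context, readerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
	// Step 1: establish a new connection.
	conn, err := r.dial(ctx)
	if err != nil {
		return nil, 0, err
	}
	// Step 2: exchange sequence numbers. Sending in a goroutine avoids a
	// deadlock on transports where Write blocks until the peer reads.
	sendErr := make(chan error, 1)
	go func() {
		var out [8]byte
		binary.BigEndian.PutUint64(out[:], readerSeqNum)
		_, err := conn.Write(out[:])
		sendErr <- err
	}()
	var in [8]byte
	if _, err := io.ReadFull(conn, in[:]); err != nil {
		conn.Close()
		return nil, 0, err
	}
	if err := <-sendErr; err != nil {
		conn.Close()
		return nil, 0, err
	}
	// Step 3: return the connection and the remote's reader sequence number.
	return conn, binary.BigEndian.Uint64(in[:]), nil
}

// demoExchange connects two reconnectors through an in-memory pipe.
func demoExchange() (uint64, uint64, error) {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()
	local := &seqExchangeReconnector{dial: func(context.Context) (io.ReadWriteCloser, error) { return a, nil }}
	remote := &seqExchangeReconnector{dial: func(context.Context) (io.ReadWriteCloser, error) { return b, nil }}

	type result struct {
		seq uint64
		err error
	}
	remoteDone := make(chan result, 1)
	go func() {
		_, seq, err := remote.Reconnect(context.Background(), 700)
		remoteDone <- result{seq, err}
	}()
	_, seenByLocal, err := local.Reconnect(context.Background(), 100)
	if err != nil {
		return 0, 0, err
	}
	res := <-remoteDone
	return seenByLocal, res.seq, res.err
}
```

In the demo each side learns the other's reader sequence number: the local side receives 700 and the remote side receives 100. A production implementation would likely also apply the context deadline to the exchange itself, which this sketch omits.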