streamclient

package
v0.14.0-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckSender

type AckSender interface {
	SendAck(ackPk wshrpc.CommandStreamAckData)
}

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(rpcClient StreamRpcInterface) *Broker

func (*Broker) AttachStreamWriter

func (b *Broker) AttachStreamWriter(meta *wshrpc.StreamMeta, writer StreamWriter) error

func (*Broker) Close

func (b *Broker) Close()

func (*Broker) CreateStreamReader

func (b *Broker) CreateStreamReader(readerRoute string, writerRoute string, rwnd int64) (*Reader, *wshrpc.StreamMeta)

func (*Broker) CreateStreamReaderWithSeq

func (b *Broker) CreateStreamReaderWithSeq(readerRoute string, writerRoute string, rwnd int64, startSeq int64) (*Reader, *wshrpc.StreamMeta)

func (*Broker) CreateStreamWriter

func (b *Broker) CreateStreamWriter(meta *wshrpc.StreamMeta) (*Writer, error)

func (*Broker) DetachStreamWriter

func (b *Broker) DetachStreamWriter(streamId string)

func (*Broker) RecvAck

func (b *Broker) RecvAck(ackPk wshrpc.CommandStreamAckData)

func (*Broker) RecvData

func (b *Broker) RecvData(dataPk wshrpc.CommandStreamData)

RecvData and RecvAck are designed to be non-blocking and must remain so to prevent deadlock. They only enqueue work items to be processed asynchronously by the work queue's goroutine. These methods are called from the main RPC runServer loop, so blocking here would stall all RPC processing.

func (*Broker) SendAck

func (b *Broker) SendAck(ackPk wshrpc.CommandStreamAckData)

func (*Broker) SendData

func (b *Broker) SendData(dataPk wshrpc.CommandStreamData)

type DataSender

type DataSender interface {
	SendData(dataPk wshrpc.CommandStreamData)
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(id string, readWindow int64, ackSender AckSender) *Reader

func NewReaderWithSeq

func NewReaderWithSeq(id string, readWindow int64, startSeq int64, ackSender AckSender) *Reader

func (*Reader) Close

func (r *Reader) Close() error

func (*Reader) Read

func (r *Reader) Read(p []byte) (int, error)

func (*Reader) RecvData

func (r *Reader) RecvData(dataPk wshrpc.CommandStreamData)

func (*Reader) UpdateNextSeq

func (r *Reader) UpdateNextSeq(newSeq int64)

type StreamRpcInterface

type StreamRpcInterface interface {
	StreamDataAckCommand(data wshrpc.CommandStreamAckData, opts *wshrpc.RpcOpts) error
	StreamDataCommand(data wshrpc.CommandStreamData, opts *wshrpc.RpcOpts) error
}

type StreamWriter

type StreamWriter interface {
	RecvAck(ackPk wshrpc.CommandStreamAckData)
}

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(id string, readWindow int64, dataSender DataSender) *Writer

func (*Writer) Close

func (w *Writer) Close() error

If Close() is called while a Write is blocked, the Write will return an error and buffered data may be discarded.

func (*Writer) CloseWithError

func (w *Writer) CloseWithError(err error) error

If CloseWithError() is called while a Write is blocked, the Write will return an error and buffered data may be discarded.

func (*Writer) GetAckState

func (w *Writer) GetAckState() (maxAckedSeq int64, finAcked bool, canceled bool)

func (*Writer) GetCanceledChan

func (w *Writer) GetCanceledChan() <-chan struct{}

func (*Writer) RecvAck

func (w *Writer) RecvAck(ackPk wshrpc.CommandStreamAckData)

func (*Writer) Write

func (w *Writer) Write(p []byte) (int, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL