stream

package
v0.0.52 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2025 License: AGPL-3.0 Imports: 22 Imported by: 0

README

Overview

The stream package provides an implementation of an asynchronous, reliable, and encrypted communication protocol on top of the Katzenpost network.

This library is provided for establishing reliable bidirectional communication channel between a pair of clients using a shared secret, and a key-value scratchpad service for exchanging messages. From the shared secret, sequences of message storage addresses (32 bytes) and symmetric message encryption keys are derived. Each client runs protocol state machines that fetch, transmit and acknowledge frames of data, and re-transmit unacknowledged frames in order to provide a reliable delivery of data via a lossy storage service. Storage addresses are mapped to nodes published in Katzenpost's Directory Authority system, which runs a service called "Map" that provides a simple lossy storage service where content is limited to a configurable buffer size and automatically expire.

Usage

Stream implements the io.Writer, io.Reader, and net.Conn interfaces. Streams need a Transport:

Put(addr []byte, payload []byte) error
Get(addr []byte) ([]byte, error)
PayloadSize() int

At present, Stream uses https://github.com/katzenpost/katzenpost/tree/add_reliable_streams/map/client as the Transport. Map uses a cryptographic capability system that uses blinded ed25519 to derive message storage addresses that are valid ed25519 public keys and are used to verify the payload signed by the corresponding private key.

func DuplexFromSeed(c *Client, initiator bool, secret []byte) RWClient

A Transport may be initialized from a shared secret, as currently used by Stream, using the map.DuplexFromSeed method which returns a RWClient. RWClient implements Transport and encapsulates a pair of capabilities, a read capability to access messages written by another peer and a write capability to write messages to that peer. Only one peer must be the "initiator", which corresponds to the Listener role in a Stream. Instead of deriving both capabilities from the same shared secret, a key exchange may be performed (for example, using PANDA or REUNION with the shared secret) which will enable clients to exchange read-only capabilities so that multiple readers may share the same read-only capability.

Creating a Stream

func Dial(c Transport, network, addr string) (*Stream, error)

Dial creates a new Stream for initiating communication. It takes a transport, network identifier, and address, and returns a Stream initialized with the provided parameters.

func Listen(c Transport, network string, addr *StreamAddr) (*Stream, error)

Listen creates a new Stream for listening. It takes a transport, network identifier, and a StreamAddr, and returns a Stream initialized with the provided parameters.

// StreamAddr implements net.Addr
type StreamAddr struct {
	network, address string
}

StreamAddr implements net.Addr interface and encapsulates a network and address string. By convesntion, address is base64 encoded and is the shared secret used to initialize a Stream. Stream implements net.Conn, and both tream.LocalAddr() and Stream.RemoteAddr() returns a StreamAddr containing the shared secret.

func ListenDuplex(s *client.Session, network, addr string) (*Stream, error)

ListenDuplex creates a new Stream as the Listener, and uses a map.Client initialized from the shared secret for the Transport.

func DialDuplex(s *client.Session, network, addr string) (*Stream, error)

DialDuplex creates a new Stream as the Dialer, and uses a map.Client initialized from the shared secret for the Transport.

Reading and Writing

Read

func (s *Stream) Read(p []byte) (n int, err error)

Read reads data from the stream into the provided byte slice. It blocks until data is available or the stream is closed.

Write

func (s *Stream) Write(p []byte) (n int, err error)

Write writes data to the stream. It blocks until the data is written or the stream is closed.

Sync

func (s *Stream) Sync() error

Sync blocks until the WriteBuf is flushed.

Closing a Stream

Close

func (s *Stream) Close() error

Close terminates the stream with a final frame and blocks future writes. It does not drain WriteBuf; use Sync() to flush WriteBuf first.

Halting the workers

Halt

func (s *Stream) Halt()

Stream inherits Halt from https://github.com/katzenpost/katzenpost/core/worker. Calling Halt causes the reader and writer routines to terminate.

Saving Stream state

func (s *Stream) Save() ([]byte, error)

Save() returns the CBOR serialization of a Stream struct

Restoring Stream state

func LoadStream(s *client.Session, state []byte) (*Stream, error)

LoadStream initializes a Stream from state saved by Stream.Save()

Restarting a Stream

func (s *Stream) Start()

Start initializes and starts the reader and writer workers

Example

// session is provided by https://github.com/katzenpost/katzenpost/client
s := NewStream(session)
r, _ := DialDuplex(session, "", s.RemoteAddr().String())

msg := []byte("Hello World")
s.Write(msg)
io.ReadAtLeast(r, make([]byte, len(msg)), len(msg))
r.Write([]byte("Goodbye World"))
io.ReadAtLeast(s, make([]byte, len(msg)), len(msg))

s.Sync()
s.Close()
r.Sync()
r.Close()
s.Halt()
r.Halt()

unit tests

Unit tests are run using go test:

go test -v ./...

GitHub CI tests

End-to-End tests are run using dockerized instances of the Katzenpost mixnet.

To start a locally running testnet, navigate to the docker directory of this repository and follow the README.rst to familiarize yourself with starting and stopping a local mixnet using the make commands.

Once you have a mixnet running, e.g.:

git clone https://github.com/katzenpost/katzenpost -b add_reliable_streams && cd katzenpost/docker && make start wait

You can then run the end-to-end tests like so:

  cd ../katzenpost/stream && make dockerdockertest

License

AGPLv3

Donations

Your donations are welcomed and can be made through Open Collective here.

Supported By

NGI NLnet Foundation NGI Assure

This project has received funding from:

  • NGI Assure Fund, a fund established by NLnet with financial support from the European Commission's Next Generation Internet programme, under the aegis of DG Communications Networks, Content and Technology under grant agreement No 957073.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrStreamClosed    = errors.New("Stream Closed")
	ErrFrameDecrypt    = errors.New("Failed to decrypt")
	ErrGeometryChanged = errors.New("Stream Payload Geometry Change")
	ErrInvalidAddr     = errors.New("Invalid StreamAddr")
	ErrHalted          = errors.New("Halted")
)

Functions

func H

func H(i []byte) (res common.MessageID)

func PayloadSize

func PayloadSize(c Transport) int

Types

type BufferedStream

type BufferedStream struct {
	worker.Worker
	sync.Mutex
	Stream *Stream
	Buffer *bytes.Buffer
}

BufferedStream holds a Stream and Buffer of bytes that are used with cbor.Decoder

func (*BufferedStream) CBORDecode

func (b *BufferedStream) CBORDecode(instance interface{}) error

CBORDecode deserializes CBOR from Stream into the instance passed

func (*BufferedStream) CBORDecodeAsync

func (b *BufferedStream) CBORDecodeAsync(instance interface{}) chan interface{}

CBORDecodeAsync is the routine that reads from the Stream until an instance is deserialized or the stream is closed. It returns the deserialiezd instance or error via a channel

func (*BufferedStream) Close

func (s *BufferedStream) Close() error

Close calls Stream.Close

func (*BufferedStream) Read

func (s *BufferedStream) Read(p []byte) (n int, err error)

Read calls Stream.Read

func (*BufferedStream) Start

func (b *BufferedStream) Start()

Start starts BufferedStreams Stream

func (*BufferedStream) Write

func (s *BufferedStream) Write(p []byte) (n int, err error)

Write calls Stream.Write

type Frame

type Frame struct {
	Type FrameType
	// Ack is the sequence number of last consequtive frame seen by peer
	Id      uint64
	Ack     uint64
	Payload []byte // transported data
}

Frame is the container for Stream payloads and contains Stream metadata that indicates whether the Frame is the first, last, or an intermediary block. This

func (*Frame) String added in v0.0.47

func (f *Frame) String() string

String returns a description of the frame and payload

type FrameType

type FrameType uint8

FrameType indicates the state of Stream at the current Frame

const (
	// StreamStart indicates that this is the first Frame in a Stream
	StreamStart FrameType = iota
	// StreamData indicates that this is a data carrying Frame in a Stream
	StreamData
	// StreamEnd indicates that this is the last Frame in a Stream
	StreamEnd
)

type FrameWithPriority added in v0.0.47

type FrameWithPriority struct {
	Frame         *Frame // payload of message
	FramePriority uint64 // the time in nanoseconds of when to retransmit an unacknowledged message
}

FrameWithPriority implmeents client.Item and holds the retransmit deadline and Frame for use with a TimerQueue

func (*FrameWithPriority) Priority added in v0.0.47

func (s *FrameWithPriority) Priority() uint64

Priority implements client.Item interface; used by TimerQueue for retransmissions

type ReTx

type ReTx struct {
	sync.Mutex

	Wack map[uint64]*FrameWithPriority
	// contains filtered or unexported fields
}

ReTx implmements client.nqueue and re-transmits unacknowledged frames

func (*ReTx) Ack added in v0.0.47

func (r *ReTx) Ack(frameId uint64) bool

Ack removes any unAcknowledged Frames <= frameId from Wack

func (*ReTx) Push

func (r *ReTx) Push(i client.Item) error

Push is called by the TimerQueue (Stream.TQ) with a client.Item when its deadline expires.

type Stream

type Stream struct {
	worker.Worker

	// address of the Stream
	Addr *StreamAddr

	// Initiator is true if Stream is created by NewStream or Listen methods
	Initiator bool

	// Mode indicates what type of Stream, e.g. EndToEnd or Finite
	Mode StreamMode

	// frame encryption secrets
	WriteKey *[keySize]byte // secretbox key to encrypt with
	ReadKey  *[keySize]byte // secretbox key to decrypt with

	// read/write secrets initialized from handshake
	WriteIDBase common.MessageID
	ReadIDBase  common.MessageID

	// XXX: bytes.Buffer is not restored by cbor.Marshal
	WriteBuf []byte

	ReadBuf []byte

	// our frame pointers
	ReadIdx  uint64
	WriteIdx uint64

	// idx of last ack
	AckIdx uint64

	// last ack received from peer
	PeerAckIdx uint64

	// TQ is used to schedule retransmission events of unacknowledged messages
	TQ *client.TimerQueue

	// R holds state needed to reschedule unacknowledged messages expiring in TQ
	R *ReTx

	// PayloadSize is the stream frame payload length, and must not change once
	// a stream has been initialized.
	PayloadSize int

	// WindowSize is the number of messages ahead of peer's
	// ackknowledgement that the writeworker will periodically retransmit
	WindowSize uint64

	// MaxWriteBufSize is the number of bytes to buffer before blocking calls to Write()
	MaxWriteBufSize int

	// RState indicates Reader State
	RState StreamState

	// RState indicates Writer State
	WState StreamState

	// timeout value used internally before failing a blocking call
	Timeout time.Duration
	// contains filtered or unexported fields
}

func Dial

func Dial(c Transport, network, addr string) (*Stream, error)

Dial returns a Stream initialized with secret address

func DialDuplex

func DialDuplex(s *client.Session, network, addr string) (*Stream, error)

DialDuplex returns a stream using capability backed map storage (Duplex)

func Listen

func Listen(c Transport, network, addr string) (*Stream, error)

Listen should be net.Listener

func ListenDuplex

func ListenDuplex(s *client.Session, network, addr string) (*Stream, error)

ListenDuplex returns a Stream using capability map storage (Duplex) as initiator

func LoadStream

func LoadStream(state []byte) (*Stream, error)

LoadStream initializes a Stream from state saved by Save()

func NewDuplex

func NewDuplex(s *client.Session) (*Stream, error)

NewDuplex returns a Stream using capability map storage (Duplex) a Listener

func NewMulticastStream

func NewMulticastStream(s *client.Session) *Stream

NewMulticastStream generates a new address and starts the read/write workers with Multicast mode

func NewStream

func NewStream(s *client.Session) *Stream

NewStream generates a new address and starts the read/write workers with End to End mode func NewStream(c Transport, identity sign.PrivateKey, sign.PublicKey) *Stream {

func (*Stream) Close

func (s *Stream) Close() error

Close terminates the Stream with a final Frame and blocks future Writes it does *not* drain writeBuf, call Sync() to flush writeBuf first.

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() *StreamAddr

LocalAddr implements net.Addr LocalAddr()

func (*Stream) MarshalCBOR added in v0.0.47

func (s *Stream) MarshalCBOR() ([]byte, error)

func (*Stream) Read

func (s *Stream) Read(p []byte) (n int, err error)

Read impl io.Reader

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() *StreamAddr

LocalAddr implements net.Conn RemoteAddr()

func (*Stream) Save

func (s *Stream) Save() ([]byte, error)

Save serializes the current state of the Stream

func (*Stream) SetTransport added in v0.0.47

func (s *Stream) SetTransport(t Transport)

func (*Stream) Start

func (s *Stream) Start()

Start starts the reader and writer workers

func (*Stream) StartWithTransport added in v0.0.47

func (s *Stream) StartWithTransport(trans Transport)

StartWithTransport starts the reader and writer workers

func (*Stream) String added in v0.0.47

func (s *Stream) String() string

String returns a description of the stream

func (*Stream) Sync

func (s *Stream) Sync() error

Sync() blocks until Stream.WriteBuf is flushed

func (*Stream) UnmarshalCBOR added in v0.0.47

func (s *Stream) UnmarshalCBOR(data []byte) error

func (*Stream) Write

func (s *Stream) Write(p []byte) (n int, err error)

Write impl io.Writer

type StreamAddr

type StreamAddr struct {
	Snetwork, Saddress string
}

StreamAddr implements net.Addr

func (*StreamAddr) Network

func (s *StreamAddr) Network() string

Network implements net.Addr

func (*StreamAddr) String

func (s *StreamAddr) String() string

String implements net.Addr String()

type StreamMode

type StreamMode uint8

StreamMode is the type of stream.

EndToEnd streams require the reader to acknowledge frames of data read
  before a sender will continue transmitting.
Multicast streams are suitable for multiple readers and do not
  ACK frames nor block the writer by waiting for ACKs.
The default StreamMode is EndToEnd.
const (
	Multicast StreamMode = iota // no Acknowledgements
	EndToEnd                    // requires interactive Acknowledge
)

type StreamState

type StreamState uint8

StreamState are the states that the reader and writer routines can be in

const (
	StreamOpen StreamState = iota
	StreamClosing
	StreamClosed
)

type Transport

type Transport mClient.ReadWriteClient

Transport describes the interface to Get or Put Frames

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL