udp

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: MPL-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const CryptoFlagHandshake uint32 = 1 << 7
View Source
const CryptoFlagModeAes uint32 = 1 << 0

These constants are copied verbatim from the engine's UDP code.

View Source
const CryptoKeyIdMask uint32 = 0xfff
View Source
const CryptoKeyIdShift uint32 = 8
View Source
const DefaultMaxWindowSize = 1000
View Source
const EncryptedFlagsMask uint32 = (1 << 7) - 1

TODO move to better place

View Source
const MaxAckSet = 50
View Source
const MaxFuzzChunkSize = 1 << 5
View Source
const MaxFuzzMessageSize = (1 << 8) - 1
View Source
const MaxFuzzTransportMemory = 2 * MaxFuzzMessageSize
View Source
const MaxResendTimeoutMillis = int64(60_000)
View Source
const MinCryptoKeyLen = 32
View Source
const MinResendRequestTimeoutMillis = int64(10)
View Source
const MinResendTimeoutMillis = int64(30)
View Source
const ObsoleteGenerationPayloadSize = 4 + 12 + 4
View Source
const ObsoleteHashPayloadSize = 4 + 8 + 12
View Source
const RegenerateTimeoutMillis = 5 * 60 * 1000
View Source
const ResendTimeoutCoef = float64(1.5)
View Source
const TransportVersion = 2

Variables

View Source
var ErrNoPayload = errors.New("udp datagram payload size less than 4 bytes")
View Source
var ErrOnlyListenTransport = errors.New("udp transport has local address 0.0.0.0, cannot connect as active side")
View Source
var ErrTransportClosed = errors.New("udp transport closed")
View Source
var ErrTransportShutdown = errors.New("udp transport shutdown")
View Source
var MaxChunkSize = 1000 // assuming minimal MTU for UDP is 508 bytes

TODO replace with OutgoingConnectionConfig

Functions

func CalcHash

func CalcHash(localPid tlnet.Pid, remotePid tlnet.Pid, generation uint32) uint64

func CopyIVFrom

func CopyIVFrom(vec *[8]uint32, w [32]byte)

func CopyIVTo

func CopyIVTo(w *[32]byte, vec [8]uint32)

func FuzzDyukov

func FuzzDyukov(fuzz []byte, withBumpGenerationsAndRestarts bool) int

for https://github.com/dvyukov/go-fuzz

func TestBackpressureTCP

func TestBackpressureTCP() error

func TestBackpressureUDP

func TestBackpressureUDP() error

We open a UDP socket but do not read from it. Then we write lots of packets to it. We expect Write to hang when the outgoing UDP buffer is full, but in fact all packets are written successfully. Then we read the packets and notice that most of them were not actually written. This is in contrast with the low-level behavior in Linux, so there must be a bug in golang.

Types

type AcksToSend

type AcksToSend struct {
	// contains filtered or unexported fields
}

func (*AcksToSend) AddAckRange

func (a *AcksToSend) AddAckRange(ackFrom, ackTo uint32)

func (*AcksToSend) BuildAck

func (a *AcksToSend) BuildAck(enc *tlnetUdpPacket.EncHeader)

func (*AcksToSend) BuildNegativeAck

func (a *AcksToSend) BuildNegativeAck(req *tlnetUdpPacket.ResendRequest)

func (*AcksToSend) HaveHoles

func (a *AcksToSend) HaveHoles() bool

type Addr

type Addr struct {
	Ip   uint32
	Port uint16
}

Addr is a UDP address that satisfies the net.Addr interface.

func (Addr) Network

func (n Addr) Network() string

func (Addr) String

func (n Addr) String() string

type ConnResendRequest

type ConnResendRequest struct {
	// contains filtered or unexported fields
}

type Connection

type Connection struct {

	// These fields are public because the higher-level protocol should set them in acceptHandler and can use them in closeHandler
	MessageHandle      MessageHandler
	StreamLikeIncoming bool
	UserData           any
	// contains filtered or unexported fields
}

Every Connection is created either 1) in the Transport::ConnectTo() method, with the fields MessageHandle, StreamLikeIncoming and UserData set from the method parameters, or 2) in the Transport::goRead() goroutine after a datagram is received from the other endpoint. In the second case the fields MessageHandle, StreamLikeIncoming and UserData must be set by Transport::acceptHandler, which is passed to NewTransport().

func (*Connection) GetFlag

func (c *Connection) GetFlag(flag int) bool

func (*Connection) KeyID

func (c *Connection) KeyID() [4]byte

func (*Connection) ListenAddr

func (c *Connection) ListenAddr() net.Addr

TODO - can we track this correctly?

func (*Connection) LocalAddr

func (c *Connection) LocalAddr() net.Addr

func (*Connection) RemoteAddr

func (c *Connection) RemoteAddr() net.Addr

func (*Connection) SendMessage

func (c *Connection) SendMessage(message *[]byte) error

func (*Connection) SendUnreliableMessage

func (c *Connection) SendUnreliableMessage(message *[]byte) error

func (*Connection) SetFlag

func (c *Connection) SetFlag(flag int, value bool)

type ConnectionHeader

type ConnectionHeader struct {
	// contains filtered or unexported fields
}

type ConnectionID

type ConnectionID struct {
	IP   uint32 // Actual remote address. Motivation - prevent collisions of hash between clients
	Port uint16 // Actual remote port. Motivation - prevent collisions of hash between clients
	Hash int64  // common connection identifier, defined in VK UDP Protocol. TODO - each side must be able to select its own hash
}

type ConnectionMessage

type ConnectionMessage struct {
	// contains filtered or unexported fields
}

TODO normal naming !!!!!!!!!!!!!

type ConnectionStatus

type ConnectionStatus uint8
const (
	ConnectionStatusWaitingForRemotePid ConnectionStatus = iota
	ConnectionStatusWaitingForHash
	ConnectionStatusEstablished
	ConnectionSentObsoleteHash
	ConnectionSentObsoleteGeneration
)

type CryptoKeysUdp

type CryptoKeysUdp struct {
	ReadKey  [32]byte
	WriteKey [32]byte
}

func DeriveCryptoKeysUdp

func DeriveCryptoKeysUdp(key string, localPid *tlnet.Pid, remotePid *tlnet.Pid, generation uint32) *CryptoKeysUdp

type FuzzTransportContext

type FuzzTransportContext struct {
	// contains filtered or unexported fields
}

type IncomingChunk

type IncomingChunk struct {
	// contains filtered or unexported fields
}

IncomingChunk's seqNo is its index in the IncomingConnection's windowData.

type IncomingConnection

type IncomingConnection struct {
	// contains filtered or unexported fields
}

func (*IncomingConnection) OnAcquiredMemory

func (c *IncomingConnection) OnAcquiredMemory()

OnAcquiredMemory is called from Transport::checkMemoryWaiters when there is enough free memory.

func (*IncomingConnection) ReceiveDatagram

func (c *IncomingConnection) ReceiveDatagram(enc *tlnetUdpPacket.EncHeader, resendReq *tlnetUdpPacket.ResendRequest, packet []byte) (bool, error)

ReceiveDatagram parses the payload in the packet bytes, updates the IncomingConnection state and calls MessageHandler if possible. It also updates enc.PacketNum, or enc.PacketFrom + enc.PacketCount, if any chunks were not actually received for the first time: chunks that were outside the window, either 1) because they were in the already received prefix, or 2) because we could not afford enough memory for them, are removed from the header info.

We need this behaviour to avoid the situation where we have not actually received a chunk, but goWrite() receives an enc header that includes this chunk and sends an ack to the peer, leading to a protocol deadlock.

type IncomingMessage

type IncomingMessage struct {
	// contains filtered or unexported fields
}

type MessageHandler

type MessageHandler func(message *[]byte, canSave bool)

MessageHandler is called when a full message is received by the transport. message is a pointer to the slice holding the received message. canSave reports whether the handler owns the message pointer and can do whatever it wants with it (usually return it to the pool after use). canSave == false means the handler can only read the message and must copy it if it wants to use it after returning.

type OutgoingChunk

type OutgoingChunk struct {
	// contains filtered or unexported fields
}

type OutgoingConnection

type OutgoingConnection struct {
	// contains filtered or unexported fields
}

func (*OutgoingConnection) AckChunk

func (o *OutgoingConnection) AckChunk(t *Transport, seqNum uint32) error

func (*OutgoingConnection) AckPrefix

func (o *OutgoingConnection) AckPrefix(t *Transport, prefixSeqNum uint32) error

TODO: clarify semantic

func (*OutgoingConnection) GetChunksToSend

func (o *OutgoingConnection) GetChunksToSend(t *Transport, chunks [][]byte) (_ [][]byte, firstSeqNum uint32, singleMessage bool)

GetChunksToSend TODO: draw the data stream with acked/un-acked, timed-out, resent, ... chunks to explain the algorithm for choosing which chunks to send

func (*OutgoingConnection) OnResendTimeout

func (o *OutgoingConnection) OnResendTimeout()

func (*OutgoingConnection) ResetFlowControl

func (o *OutgoingConnection) ResetFlowControl()

func (*OutgoingConnection) SetFlowControl

func (o *OutgoingConnection) SetFlowControl(seqNo uint32)

type OutgoingMessage

type OutgoingMessage struct {
	// contains filtered or unexported fields
}

type RandomMessage

type RandomMessage struct {
	// contains filtered or unexported fields
}

type TestDatagram

type TestDatagram struct {
	// contains filtered or unexported fields
}

type TimersPriorityQueue

type TimersPriorityQueue struct {
	// contains filtered or unexported fields
}

func (*TimersPriorityQueue) Add

func (q *TimersPriorityQueue) Add(conn *Connection)

func (*TimersPriorityQueue) Empty

func (q *TimersPriorityQueue) Empty() bool

func (*TimersPriorityQueue) ExtractMin

func (q *TimersPriorityQueue) ExtractMin() *Connection

func (*TimersPriorityQueue) Len

func (q *TimersPriorityQueue) Len() int

func (*TimersPriorityQueue) Min

func (q *TimersPriorityQueue) Min() *Connection

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

func NewTransport

func NewTransport(
	incomingMessagesMemoryLimit int64,
	cryptoKeys []string,
	socket *net.UDPConn,
	socketAddr net.Addr,
	startTime uint32,
	acceptHandler func(*Connection),
	closeHandler func(*Connection),
	messageAllocator func(size int) (message *[]byte),
	messageDeallocator func(message *[]byte),
	resendTimeout time.Duration,
	debugUdpRPC int,
	printKeysGenerationInfo bool,
	debugUDPLatency bool,
	socketWriteLatencyMetric func(millis float64),
	socketReadLatencyMetric func(millis float64),
	writeScheduleLatencyMetric func(millis float64),
	readScheduleLatencyMetric func(millis float64),
) (*Transport, error)

func (*Transport) Close

func (t *Transport) Close() (err error)

func (*Transport) ConnectTo

func (t *Transport) ConnectTo(addr netip.AddrPort, incomingMessageHandle MessageHandler, streamLikeIncoming bool, userData any) (*Connection, error)

func (*Transport) DumpUdpTargets

func (t *Transport) DumpUdpTargets()

func (*Transport) GetStats

func (t *Transport) GetStats(res *TransportStats)

func (*Transport) GoReadHandleEncHdr

func (t *Transport) GoReadHandleEncHdr(conn *Connection, enc tlnetUdpPacket.EncHeader) (startedAckTimer bool)

GoReadHandleEncHdr must be called under t.writeMu lock

func (*Transport) Run

func (t *Transport) Run() error

func (*Transport) Shutdown

func (t *Transport) Shutdown()

func (*Transport) StartTime

func (t *Transport) StartTime() uint32

type TransportStats

type TransportStats struct {
	// these metrics are meaningful as per-second rates
	NewIncomingMessages           atomic.Int64
	MessageHandlerCalled          atomic.Int64 // only for reliable messages
	MessageReleased               atomic.Int64
	DatagramRead                  atomic.Int64
	DatagramWritten               atomic.Int64
	DatagramSizeRead              atomic.Int64
	DatagramSizeWritten           atomic.Int64
	UnreliableMessagesReceived    atomic.Int64
	ObsoletePidReceived           atomic.Int64
	ObsoleteHashReceived          atomic.Int64
	ObsoleteGenerationReceived    atomic.Int64
	ResendRequestReceived         atomic.Int64
	UnreliableMessagesSent        atomic.Int64
	ObsoletePidSent               atomic.Int64
	ObsoleteHashSent              atomic.Int64
	ObsoleteGenerationSent        atomic.Int64
	ResendRequestSent             atomic.Int64
	AckTimerBurned                atomic.Int64
	ResendTimerBurned             atomic.Int64
	ResendRequestTimerBurned      atomic.Int64
	RegenerateTimerBurned         atomic.Int64
	HoleSeqNumsSent               atomic.Int64
	RequestedSeqNumsOutOfWindow   atomic.Int64
	RequestedSeqNumsActuallyAcked atomic.Int64
	RequestedSeqNumsNotAcked      atomic.Int64

	// these metrics are meaningful as absolute values
	IncomingMessagesInflight atomic.Int64 // messages, that we started to receive, but didn't pass to handler
	OutgoingMessagesInflight atomic.Int64 // not acked outgoing messages
	ConnectionsMapSize       atomic.Int64
	MemoryWaitersSize        atomic.Int64
	AcquiredMemory           atomic.Int64
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL