Documentation
¶
Index ¶
- Constants
- Variables
- func CalcHash(localPid tlnet.Pid, remotePid tlnet.Pid, generation uint32) uint64
- func CopyIVFrom(vec *[8]uint32, w [32]byte)
- func CopyIVTo(w *[32]byte, vec [8]uint32)
- func FuzzDyukov(fuzz []byte, withBumpGenerationsAndRestarts bool) int
- func TestBackpressureTCP() error
- func TestBackpressureUDP() error
- type AcksToSend
- type Addr
- type ConnResendRequest
- type Connection
- func (c *Connection) GetFlag(flag int) bool
- func (c *Connection) KeyID() [4]byte
- func (c *Connection) ListenAddr() net.Addr
- func (c *Connection) LocalAddr() net.Addr
- func (c *Connection) RemoteAddr() net.Addr
- func (c *Connection) SendMessage(message *[]byte) error
- func (c *Connection) SendUnreliableMessage(message *[]byte) error
- func (c *Connection) SetFlag(flag int, value bool)
- type ConnectionHeader
- type ConnectionID
- type ConnectionMessage
- type ConnectionStatus
- type CryptoKeysUdp
- type FuzzTransportContext
- type IncomingChunk
- type IncomingConnection
- type IncomingMessage
- type MessageHandler
- type OutgoingChunk
- type OutgoingConnection
- func (o *OutgoingConnection) AckChunk(t *Transport, seqNum uint32) error
- func (o *OutgoingConnection) AckPrefix(t *Transport, prefixSeqNum uint32) error
- func (o *OutgoingConnection) GetChunksToSend(t *Transport, chunks [][]byte) (_ [][]byte, firstSeqNum uint32, singleMessage bool)
- func (o *OutgoingConnection) OnResendTimeout()
- func (o *OutgoingConnection) ResetFlowControl()
- func (o *OutgoingConnection) SetFlowControl(seqNo uint32)
- type OutgoingMessage
- type RandomMessage
- type TestDatagram
- type TimersPriorityQueue
- type Transport
- func (t *Transport) Close() (err error)
- func (t *Transport) ConnectTo(addr netip.AddrPort, incomingMessageHandle MessageHandler, ...) (*Connection, error)
- func (t *Transport) DumpUdpTargets()
- func (t *Transport) GetStats(res *TransportStats)
- func (t *Transport) GoReadHandleEncHdr(conn *Connection, enc tlnetUdpPacket.EncHeader) (startedAckTimer bool)
- func (t *Transport) Run() error
- func (t *Transport) Shutdown()
- func (t *Transport) StartTime() uint32
- type TransportStats
Constants ¶
const CryptoFlagHandshake uint32 = 1 << 7
const CryptoFlagModeAes uint32 = 1 << 0
just copypasted constants from engines udp code
const CryptoKeyIdMask uint32 = 0xfff
const CryptoKeyIdShift uint32 = 8
const DefaultMaxWindowSize = 1000
const EncryptedFlagsMask uint32 = (1 << 7) - 1
TODO move to better place
const MaxAckSet = 50
const MaxFuzzChunkSize = 1 << 5
const MaxFuzzMessageSize = (1 << 8) - 1
const MaxFuzzTransportMemory = 2 * MaxFuzzMessageSize
const MaxResendTimeoutMillis = int64(60_000)
const MinCryptoKeyLen = 32
const MinResendRequestTimeoutMillis = int64(10)
const MinResendTimeoutMillis = int64(30)
const ObsoleteGenerationPayloadSize = 4 + 12 + 4
const ObsoleteHashPayloadSize = 4 + 8 + 12
const RegenerateTimeoutMillis = 5 * 60 * 1000
const ResendTimeoutCoef = float64(1.5)
const TransportVersion = 2
Variables ¶
var ErrNoPayload = errors.New("udp datagram payload size less than 4 bytes")
var ErrOnlyListenTransport = errors.New("udp transport has local address 0.0.0.0, cannot connect as active side")
var ErrTransportClosed = errors.New("udp transport closed")
var ErrTransportShutdown = errors.New("udp transport shutdown")
var MaxChunkSize = 1000 // assuming minimal MTU for UDP is 508 bytes
TODO replace with OutgoingConnectionConfig
Functions ¶
func CopyIVFrom ¶
func FuzzDyukov ¶
func TestBackpressureTCP ¶
func TestBackpressureTCP() error
func TestBackpressureUDP ¶
func TestBackpressureUDP() error
We open udp socket but do not read from it. Then we write lots of packets there. We expect Write to hange when the outgoing UDP buffer is full. But in fact we successfully write all packets. Then we read packets and notice that most packets were not written. This is in contrast with low-level behavior in Linux. So there must be a bug in golang.
Types ¶
type AcksToSend ¶
type AcksToSend struct {
// contains filtered or unexported fields
}
func (*AcksToSend) AddAckRange ¶
func (a *AcksToSend) AddAckRange(ackFrom, ackTo uint32)
func (*AcksToSend) BuildAck ¶
func (a *AcksToSend) BuildAck(enc *tlnetUdpPacket.EncHeader)
func (*AcksToSend) BuildNegativeAck ¶
func (a *AcksToSend) BuildNegativeAck(req *tlnetUdpPacket.ResendRequest)
func (*AcksToSend) HaveHoles ¶
func (a *AcksToSend) HaveHoles() bool
type ConnResendRequest ¶
type ConnResendRequest struct {
// contains filtered or unexported fields
}
type Connection ¶
type Connection struct {
// These fields are public, because higher protocol should set then in acceptHandler and can use them in closeHandler
MessageHandle MessageHandler
StreamLikeIncoming bool
UserData any
// contains filtered or unexported fields
}
Connection - Every Connection is created 1) either in Transport::ConnectTo() method and has fields MessageHandle, StreamLikeIncoming and UserData set to method parameters, 2) or in Transport::goRead() goroutine after datagram receive from the other endpoint. In second case fields MessageHandle, StreamLikeIncoming and UserData must be set by Transport::acceptHandler, obtained in NewTransport() method.
func (*Connection) GetFlag ¶
func (c *Connection) GetFlag(flag int) bool
func (*Connection) KeyID ¶
func (c *Connection) KeyID() [4]byte
func (*Connection) ListenAddr ¶
func (c *Connection) ListenAddr() net.Addr
TODO - can we track this correctly?
func (*Connection) LocalAddr ¶
func (c *Connection) LocalAddr() net.Addr
func (*Connection) RemoteAddr ¶
func (c *Connection) RemoteAddr() net.Addr
func (*Connection) SendMessage ¶
func (c *Connection) SendMessage(message *[]byte) error
func (*Connection) SendUnreliableMessage ¶
func (c *Connection) SendUnreliableMessage(message *[]byte) error
func (*Connection) SetFlag ¶
func (c *Connection) SetFlag(flag int, value bool)
type ConnectionHeader ¶
type ConnectionHeader struct {
// contains filtered or unexported fields
}
type ConnectionID ¶
type ConnectionID struct {
IP uint32 // Actual remote address. Motivation - prevent collisions of hash between clients
Port uint16 // Actual remote port. Motivation - prevent collisions of hash between clients
Hash int64 // common connection identifier, defined in VK UDP Protocol. TODO - each side must be able to select its own hash
}
type ConnectionMessage ¶
type ConnectionMessage struct {
// contains filtered or unexported fields
}
TODO normal naming !!!!!!!!!!!!!
type ConnectionStatus ¶
type ConnectionStatus uint8
const ( ConnectionStatusWaitingForRemotePid ConnectionStatus = iota ConnectionStatusWaitingForHash ConnectionStatusEstablished ConnectionSentObsoleteHash ConnectionSentObsoleteGeneration )
type CryptoKeysUdp ¶
func DeriveCryptoKeysUdp ¶
type FuzzTransportContext ¶
type FuzzTransportContext struct {
// contains filtered or unexported fields
}
type IncomingChunk ¶
type IncomingChunk struct {
// contains filtered or unexported fields
}
IncomingChunk seqNo is index in IncomingConnection windowData
type IncomingConnection ¶
type IncomingConnection struct {
// contains filtered or unexported fields
}
func (*IncomingConnection) OnAcquiredMemory ¶
func (c *IncomingConnection) OnAcquiredMemory()
OnAcquiredMemory called from Transport::checkMemoryWaiters when there is enough free memory
func (*IncomingConnection) ReceiveDatagram ¶
func (c *IncomingConnection) ReceiveDatagram(enc *tlnetUdpPacket.EncHeader, resendReq *tlnetUdpPacket.ResendRequest, packet []byte) (bool, error)
ReceiveDatagram parses payload in packet bytes, updates IncomingConnection state and calls MessageHandler if possible. Also updates enc.PacketNum or enc.PacketFrom + enc.PacketCount, if any chunks weren't actually received at first time: if chunks were outside the window, 1) either because they were in already received prefix, 2) or we couldn't afford enough memory for them, they are removed from header info.
We need this behaviour to avoid situation, when we hadn't actually received chunk, but goWrite() received enc header with this chunk, and sent ack to the peer, leading to protocol deadlock.
type IncomingMessage ¶
type IncomingMessage struct {
// contains filtered or unexported fields
}
type MessageHandler ¶
MessageHandler is called when full message is received by transport. message - pointer to slice with received message canSave - whether handler owns message pointer and can do with it whatever it wants (usually return it to pool after use). canSave == false means handler can only read message and must copy it in case he wants to use it after return.
type OutgoingChunk ¶
type OutgoingChunk struct {
// contains filtered or unexported fields
}
type OutgoingConnection ¶
type OutgoingConnection struct {
// contains filtered or unexported fields
}
func (*OutgoingConnection) AckChunk ¶
func (o *OutgoingConnection) AckChunk(t *Transport, seqNum uint32) error
func (*OutgoingConnection) AckPrefix ¶
func (o *OutgoingConnection) AckPrefix(t *Transport, prefixSeqNum uint32) error
TODO: clarify semantic
func (*OutgoingConnection) GetChunksToSend ¶
func (o *OutgoingConnection) GetChunksToSend(t *Transport, chunks [][]byte) (_ [][]byte, firstSeqNum uint32, singleMessage bool)
GetChunksToSend TODO draw data stream with acked/un-acked, timeout-ed, resended, ... chunks to explain algorithm how to choose chunks to send
func (*OutgoingConnection) OnResendTimeout ¶
func (o *OutgoingConnection) OnResendTimeout()
func (*OutgoingConnection) ResetFlowControl ¶
func (o *OutgoingConnection) ResetFlowControl()
func (*OutgoingConnection) SetFlowControl ¶
func (o *OutgoingConnection) SetFlowControl(seqNo uint32)
type OutgoingMessage ¶
type OutgoingMessage struct {
// contains filtered or unexported fields
}
type RandomMessage ¶
type RandomMessage struct {
// contains filtered or unexported fields
}
type TestDatagram ¶
type TestDatagram struct {
// contains filtered or unexported fields
}
type TimersPriorityQueue ¶
type TimersPriorityQueue struct {
// contains filtered or unexported fields
}
func (*TimersPriorityQueue) Add ¶
func (q *TimersPriorityQueue) Add(conn *Connection)
func (*TimersPriorityQueue) Empty ¶
func (q *TimersPriorityQueue) Empty() bool
func (*TimersPriorityQueue) ExtractMin ¶
func (q *TimersPriorityQueue) ExtractMin() *Connection
func (*TimersPriorityQueue) Len ¶
func (q *TimersPriorityQueue) Len() int
func (*TimersPriorityQueue) Min ¶
func (q *TimersPriorityQueue) Min() *Connection
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
func NewTransport ¶
func NewTransport( incomingMessagesMemoryLimit int64, cryptoKeys []string, socket *net.UDPConn, socketAddr net.Addr, startTime uint32, acceptHandler func(*Connection), closeHandler func(*Connection), messageAllocator func(size int) (message *[]byte), messageDeallocator func(message *[]byte), resendTimeout time.Duration, debugUdpRPC int, printKeysGenerationInfo bool, debugUDPLatency bool, socketWriteLatencyMetric func(millis float64), socketReadLatencyMetric func(millis float64), writeScheduleLatencyMetric func(millis float64), readScheduleLatencyMetric func(millis float64), ) (*Transport, error)
func (*Transport) ConnectTo ¶
func (t *Transport) ConnectTo(addr netip.AddrPort, incomingMessageHandle MessageHandler, streamLikeIncoming bool, userData any) (*Connection, error)
func (*Transport) DumpUdpTargets ¶
func (t *Transport) DumpUdpTargets()
func (*Transport) GetStats ¶
func (t *Transport) GetStats(res *TransportStats)
func (*Transport) GoReadHandleEncHdr ¶
func (t *Transport) GoReadHandleEncHdr(conn *Connection, enc tlnetUdpPacket.EncHeader) (startedAckTimer bool)
GoReadHandleEncHdr must be called under t.writeMu lock
type TransportStats ¶
type TransportStats struct {
// these metrics are valuable in per second calculation
NewIncomingMessages atomic.Int64
MessageHandlerCalled atomic.Int64 // only for reliable messages
MessageReleased atomic.Int64
DatagramRead atomic.Int64
DatagramWritten atomic.Int64
DatagramSizeRead atomic.Int64
DatagramSizeWritten atomic.Int64
UnreliableMessagesReceived atomic.Int64
ObsoletePidReceived atomic.Int64
ObsoleteHashReceived atomic.Int64
ObsoleteGenerationReceived atomic.Int64
ResendRequestReceived atomic.Int64
UnreliableMessagesSent atomic.Int64
ObsoletePidSent atomic.Int64
ObsoleteHashSent atomic.Int64
ObsoleteGenerationSent atomic.Int64
ResendRequestSent atomic.Int64
AckTimerBurned atomic.Int64
ResendTimerBurned atomic.Int64
ResendRequestTimerBurned atomic.Int64
RegenerateTimerBurned atomic.Int64
HoleSeqNumsSent atomic.Int64
RequestedSeqNumsOutOfWindow atomic.Int64
RequestedSeqNumsActuallyAcked atomic.Int64
RequestedSeqNumsNotAcked atomic.Int64
// the absolute values of these metrics are decent for us
IncomingMessagesInflight atomic.Int64 // messages, that we started to receive, but didn't pass to handler
OutgoingMessagesInflight atomic.Int64 // not acked outgoing messages
ConnectionsMapSize atomic.Int64
MemoryWaitersSize atomic.Int64
AcquiredMemory atomic.Int64
}