rpc

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: MPL-2.0 Imports: 51 Imported by: 0

README

Документация пакета RPC

Этот пакет реализует VK RPC для TCP/Unix и UDP транспорта.

Свойства, которые есть сейчас

И которые нельзя терять при любых рефакторах пакета.

  1. Строгость - всё, что отклоняется от протокола, должно приводить к ошибке.
  2. Скорость - обрабатываем 1 млн RPS и более на среднем ноутбуке (2024 г.) и не хотим замедления.
  3. Отсутствие аллокаций при длительной работе соединения, аллокации только при установлении.
  4. Минимальные внешние и внутренние зависимости - из vkgo нельзя использовать ничего вне папки пакета.
  5. Гарантия лимита памяти rpc.Server и backpressure на клиентов - при любых внешних условиях не будет выделено больше памяти и горутин-работников, чем настроено. Из этого свойства часто следуют гарантии лимита памяти и для сервисов, построенных на базе rpc.Server.
  6. Тестируемость - максимальное использование библиотек детерминистского тестирования.
Свойства, которые хотелось бы реализовать
  1. RPC level Congestion Control для rpc.Client - если клиент видит, что запрос вероятно не будет обработан сервером за таймаут, запрос должен генерировать ошибку локально.
  2. Лучшее инструментирование rpc.Server - писать события и различную информацию (например версию протокола) в Statshouse.

Как использовать rpc.Server

Инициализация

Обработчик и контекст обработчика

Цепочка обработчиков

Поддержка Long poll

Как использовать rpc.Client

Инициализация

Поддержка Long poll

Documentation

Index

Constants

View Source
const (
	DefaultClientConnReadBufSize  = maxGoAllocSizeClass
	DefaultClientConnWriteBufSize = maxGoAllocSizeClass
)
View Source
const (

	// In February 2026 rpc proxy still does not support proper version negotiation, will not work with version 2.
	// As soon as rpc proxy fixed, you can remove this const and always use LatestProtocolVersion as default
	DefaultProtocolVersion = 1
	LatestProtocolVersion  = 2
)
View Source
const (
	FlagP2PHijack = uint32(0x40000000) // #define RPC_CRYPTO_P2P_HIJACK         0x40000000

	PacketTypeRPCPing = constants.RpcPing
	PacketTypeRPCPong = constants.RpcPong

	DefaultPacketTimeout = 10 * time.Second

	DefaultConnTimeoutAccuracy = 100 * time.Millisecond
)
View Source
const (
	DefaultMaxWorkers             = 1024
	DefaultMaxConns               = 131072 // note, this number of connections will require 10+ GB of memory
	DefaultRequestMemoryLimit     = 256 * 1024 * 1024
	DefaultResponseMemoryLimit    = 2048*1024*1024 - 1
	DefaultServerConnReadBufSize  = maxGoAllocSizeClass
	DefaultServerConnWriteBufSize = maxGoAllocSizeClass
	DefaultServerRequestBufSize   = 4096 // we expect shorter requests on average
	DefaultServerResponseBufSize  = maxGoAllocSizeClass
	DefaultResponseMemEstimate    = 1024 * 1024 // we likely over-account unknown response length before the handler has finished

)
View Source
const MinCryptoKeyLen = 32

Variables

View Source
var (
	ErrClientClosed                 = errors.New("rpc: Client closed")
	ErrClientConnClosedSideEffect   = errors.New("rpc: client connection closed after request sent")
	ErrClientConnClosedNoSideEffect = errors.New("rpc: client connection closed (or connect failed) before request sent")

	ErrClientDropRequest = errors.New("rpc hook: drop request")
)
View Source
var (
	ErrNoHandler = &Error{Code: tlerrorcodes.NoHandler, Description: "rpc: no handler"} // Never wrap this error

	ErrLongpollNoEmptyResponse = &Error{Code: tlerrorcodes.Timeout, Description: "empty longpoll response not implemented by server"}
)

Functions

func ControlSetTCPReuse

func ControlSetTCPReuse(addr bool, port bool) func(_ string, _ string, c syscall.RawConn) error

addr: the service can listen on the port again after a restart, despite sockets lingering in the TIME_WAIT state. port: 2 or more instances of the service can listen on the same port.

func EncryptionToString

func EncryptionToString(schema int) string

func ErrorTag

func ErrorTag(err error) string

func GetExecutionContext

func GetExecutionContext(ctx context.Context) string

Execution context is a copy and may be used anywhere

func GetTracingContext

func GetTracingContext(ctx context.Context) tltracing.TraceContext

Tracing context is a copy and may be used anywhere

func HashSlice

func HashSlice(key []byte) uint64

func HashString

func HashString(key string) uint64

func IsLongpollResponse

func IsLongpollResponse(err error) bool

func KeyIDFromCryptoKey

func KeyIDFromCryptoKey(cryptoKey string) (keyID [4]byte)

The first 4 bytes of cryptoKey are the key identifier. This is not a problem because arbitrarily long keys are allowed.

func Listen

func Listen(network, address string, disableTCPReuseAddr bool) (net.Listener, error)

func ListenUDP

func ListenUDP(network, address string) (*net.UDPConn, error)

func ListenUDPWildcard

func ListenUDPWildcard(host string, port uint16) ([]*net.UDPConn, error)

TODO - support UDP listening on 0.0.0.0 in UDP Transport instead

func NoopLogf

func NoopLogf(string, ...any)

NoopLogf is a do-nothing log function

func ParseTrustedSubnetGroupsFromString

func ParseTrustedSubnetGroupsFromString(groups string) (trustedSubnetGroups [][]*net.IPNet, errs []error)

func ParseTrustedSubnets

func ParseTrustedSubnets(groups [][]string) (trustedSubnetGroups [][]*net.IPNet, errs []error)

Function returns all groups that parsed successfully and all errors

func TraceIDToByteSlice

func TraceIDToByteSlice(traceID TraceID, data []byte)

func TraceIDToString

func TraceIDToString(traceID TraceID) string

func TrustedSubnetGroupsString

func TrustedSubnetGroupsString(groups [][]string) string

func UpdateExtraTimeout

func UpdateExtraTimeout(extra *RequestExtra, timeout time.Duration)

We have several hierarchical timeouts, we update from high to lower priority. So if high priority timeout is set (even to 0 (infinite)), subsequent calls do nothing.

func WithExecutionContext

func WithExecutionContext(ctx context.Context, ec string) context.Context

func WithTracingContext

func WithTracingContext(ctx context.Context, tc tltracing.TraceContext) context.Context

Types

type CallbackContext

type CallbackContext struct {
	// contains filtered or unexported fields
}

func (CallbackContext) QueryID

func (cc CallbackContext) QueryID() int64

type Client

type Client interface {
	GetRequest() *Request
	PutResponse(cctx *Response)
	ResetReconnectDelay(address NetAddr)
	Do(ctx context.Context, network string, address string, req *Request) (*Response, error)
	DoCallback(ctx context.Context, network string, address string, req *Request, cb ClientCallback, userData any) (CallbackContext, error)
	Logf(format string, args ...any)
	Close() error

	Multi(n int) *Multi
	DoMulti(
		ctx context.Context,
		addresses []NetAddr,
		prepareRequest func(addr NetAddr, req *Request) error,
		processResponse func(addr NetAddr, resp *Response, err error) error,
	) error
}

func NewClient

func NewClient(options ...ClientOptionsFunc) Client

type ClientCallback

type ClientCallback func(client Client, resp *Response, err error)

Experimental API, can change any moment. For high-performance clients, like ingress proxy

type ClientHook

type ClientHook interface {
	BeforeSend(ctx context.Context, addr NetAddr, req *Request) (context.Context, error)
	AfterSend(resp *Response, err error)
}

type ClientImpl

type ClientImpl struct {
	// contains filtered or unexported fields
}

func UnwrapClient

func UnwrapClient(c Client) (*ClientImpl, bool)

func (*ClientImpl) CancelDoCallback

func (c *ClientImpl) CancelDoCallback(cc CallbackContext) (cancelled bool)

Callback will never be called twice, but you should be ready for the callback even after you call Cancel, because the callback could already be in the process of being called. The best idea is to remember the queryID (which is unique per ClientImpl) of the call in your per-call data structure and compare it in the callback. If cancelled == true, the call was cancelled before the callback was scheduled for calling or called. If cancelled == false, the call result was already being delivered or has been delivered.

func (*ClientImpl) Close

func (c *ClientImpl) Close() error

ClientImpl is used by other components of your app. So, you must first CloseWait() all other components, then Close() the client. ClientImpl will have 0 outstanding requests if everything is done correctly, so it can simply close all sockets.

func (*ClientImpl) Do

func (c *ClientImpl) Do(ctx context.Context, network string, address string, req *Request) (*Response, error)

func (*ClientImpl) DoCallback

func (c *ClientImpl) DoCallback(ctx context.Context, network string, address string, req *Request, cb ClientCallback, userData any) (CallbackContext, error)

Either error is returned immediately, or ClientCallback will be called in the future. We add explicit userData, because for many users it will avoid allocation of lambda during capture of userData in cb

func (*ClientImpl) DoMulti

func (c *ClientImpl) DoMulti(
	ctx context.Context,
	addresses []NetAddr,
	prepareRequest func(addr NetAddr, req *Request) error,
	processResponse func(addr NetAddr, resp *Response, err error) error,
) error

DoMulti is a convenient way of doing multiple RPCs at once. If you need more control, consider using Multi directly.

func (*ClientImpl) GetRequest

func (c *ClientImpl) GetRequest() *Request

func (*ClientImpl) Logf

func (c *ClientImpl) Logf(format string, args ...any)

func (*ClientImpl) Multi

func (c *ClientImpl) Multi(n int) *Multi

Multi must be followed with a call to Multi.Close to release request state resources

func (*ClientImpl) PutResponse

func (c *ClientImpl) PutResponse(cctx *Response)

func (*ClientImpl) ResetReconnectDelay

func (c *ClientImpl) ResetReconnectDelay(address NetAddr)

ResetReconnectDelay resets the timer before the next reconnect attempt. If the connect goroutine is already waiting, ResetReconnectDelay forces it to wait again with minimal delay. TODO - remove?

type ClientOptions

type ClientOptions struct {
	Logf                LoggerFunc // defaults to log.Printf; set to NoopLogf to disable all logging
	TracingInject       TracingInjectFunc
	TrustedSubnetGroups [][]*net.IPNet
	ForceEncryption     bool
	CryptoKey           string
	ConnReadBufSize     int
	ConnWriteBufSize    int
	PacketTimeout       time.Duration
	ProtocolVersion     uint32        // if >0, will send modified nonce packet. Server must be upgraded to at least ignore higher bits of nonce.Schema
	DefaultTimeout      time.Duration // has no effect if <= 0

	MaxReconnectDelay       time.Duration
	DebugUdp                int
	PrintKeysGenerationInfo bool
	HookMaker               func() ClientHook
	// contains filtered or unexported fields
}

type ClientOptionsFunc

type ClientOptionsFunc func(*ClientOptions)

func ClientWithConnReadBufSize

func ClientWithConnReadBufSize(size int) ClientOptionsFunc

func ClientWithConnWriteBufSize

func ClientWithConnWriteBufSize(size int) ClientOptionsFunc

func ClientWithCryptoKey

func ClientWithCryptoKey(key string) ClientOptionsFunc

func ClientWithDebugBeforeSendHookLatencyHandler

func ClientWithDebugBeforeSendHookLatencyHandler(h LatencyHandler) ClientOptionsFunc

func ClientWithDebugProxyLatencyHandler

func ClientWithDebugProxyLatencyHandler(h LatencyHandler) ClientOptionsFunc

func ClientWithDebugReceiveLatencyHandler

func ClientWithDebugReceiveLatencyHandler(h LatencyHandler) ClientOptionsFunc

func ClientWithDebugSendLatencyHandler

func ClientWithDebugSendLatencyHandler(h LatencyHandler) ClientOptionsFunc

func ClientWithDebugUdp

func ClientWithDebugUdp(DebugUdp int) ClientOptionsFunc

func ClientWithExperimentalLocalUDPAddress

func ClientWithExperimentalLocalUDPAddress(localUDPAddress string) ClientOptionsFunc

Services must not communicate via UDP directly; they must use a TCP/Unix connection to the local RPC Proxy. This option is only for tests and implementing RPC proxies.

func ClientWithForceEncryption

func ClientWithForceEncryption(force bool) ClientOptionsFunc

func ClientWithHooks

func ClientWithHooks(hookMaker func() ClientHook) ClientOptionsFunc

func ClientWithLogf

func ClientWithLogf(f LoggerFunc) ClientOptionsFunc

func ClientWithMaxReconnectDelay

func ClientWithMaxReconnectDelay(delay time.Duration) ClientOptionsFunc

func ClientWithPacketTimeout

func ClientWithPacketTimeout(timeout time.Duration) ClientOptionsFunc

Changing this setting is not recommended and only added for use in testing environments. timeout <= 0 means no timeout. This can lead to connections stuck forever. timeout on the order of ping latency will lead to random disconnects and even inability to communicate at all.

func ClientWithProtocolVersion

func ClientWithProtocolVersion(protocolVersion uint32) ClientOptionsFunc

func ClientWithTracingInject

func ClientWithTracingInject(inject TracingInjectFunc) ClientOptionsFunc

func ClientWithTrustedSubnetGroups

func ClientWithTrustedSubnetGroups(groups [][]string) ClientOptionsFunc

type ClientUnwrapper

type ClientUnwrapper interface {
	Unwrap() Client
}

Client wrappers need to implement both Client and ClientUnwrapper interfaces to allow rpc code that depends on implementation details (ClusterClient) access ClientImpl

type ClusterClient

type ClusterClient struct {
	// contains filtered or unexported fields
}

func NewClusterClient

func NewClusterClient(client Client, opts ...ClusterClientOptionsFunc) *ClusterClient

func (*ClusterClient) DoAll

func (cc *ClusterClient) DoAll(
	ctx context.Context,
	write bool,
	prepareRequest func(addr NetAddr, req *Request) error,
	processResponse func(addr NetAddr, resp *Response, err error) error,
) error

func (*ClusterClient) DoAny

func (cc *ClusterClient) DoAny(ctx context.Context, write bool, req *Request) (*Response, error)

func (*ClusterClient) DoKey

func (cc *ClusterClient) DoKey(ctx context.Context, write bool, req *Request, key uint64) (*Response, error)

func (*ClusterClient) RPCClient

func (cc *ClusterClient) RPCClient() Client

func (*ClusterClient) SelectAll

func (cc *ClusterClient) SelectAll(write bool) []NetAddr

func (*ClusterClient) SelectAny

func (cc *ClusterClient) SelectAny(write bool) NetAddr

SelectAny returns empty address when no backend is found

func (*ClusterClient) SelectKey

func (cc *ClusterClient) SelectKey(write bool, key uint64) NetAddr

SelectKey returns empty address when no backend is found

func (*ClusterClient) UpdateCluster

func (cc *ClusterClient) UpdateCluster(shards []ClusterShard) error

type ClusterClientOptions

type ClusterClientOptions struct {
	BigCluster           bool
	ShardSelectKeyModulo bool
	OnDo                 func(addr NetAddr, req *Request)
}

type ClusterClientOptionsFunc

type ClusterClientOptionsFunc func(*ClusterClientOptions)

func ClusterClientWithBigCluster

func ClusterClientWithBigCluster(v bool) ClusterClientOptionsFunc

func ClusterClientWithOnDo

func ClusterClientWithOnDo(v func(addr NetAddr, req *Request)) ClusterClientOptionsFunc

func ClusterClientWithShardSelectKeyModulo

func ClusterClientWithShardSelectKeyModulo(v bool) ClusterClientOptionsFunc

type ClusterShard

type ClusterShard struct {
	Name       string
	ReadNodes  []NetAddr
	WriteNodes []NetAddr
}

type EncHeader

type EncHeader = tlnetUdpPacket.EncHeader // TODO - move to better place when UDP impl is ready

type ErrHandlerFunc

type ErrHandlerFunc func(err error)

type Error

type Error struct {
	Code        int32
	Description string
}

func NewDefaultError

func NewDefaultError(description string) *Error

func NewError

func NewError(code int32, description string) *Error

func (*Error) Error

func (err *Error) Error() string

func (*Error) IsApplicationLevelError

func (err *Error) IsApplicationLevelError() bool

type ForwardPacketResult

type ForwardPacketResult struct {
	ReadWriteError
	ServerWantsFin bool
	ClientWantsFin bool
	// contains filtered or unexported fields
}

func ForwardPacket

func ForwardPacket(dst, src *PacketConn, copyBodyBuf []byte, opt forwardPacketOptions) (res ForwardPacketResult)

func (ForwardPacketResult) String

func (p ForwardPacketResult) String() string

type ForwardPacketsResult

type ForwardPacketsResult struct {
	PacketHeaderCircularBuffer
	ReadWriteError
	ServerWantsFin bool
	ClientWantsFin bool
}

func ForwardPackets

func ForwardPackets(ctx context.Context, dst, src *PacketConn) ForwardPacketsResult

func (ForwardPacketsResult) String

func (res ForwardPacketsResult) String() string

type HandlerContext

type HandlerContext struct {
	Request []byte

	Response []byte

	RequestExtra  RequestExtra  // every proxy adds bits it needs to client extra, sends it to server, then clears all bits in response so client can interpret all bits
	ResponseExtra ResponseExtra // everything we set here will be sent if client requested it (bit of RequestExtra.flags set)

	// UserData allows caching common state between different requests.
	UserData any
	// contains filtered or unexported fields
}

HandlerContext must not be used outside the handler, except after FinishLongpoll call, when it must be sent immediately

func GetHandlerContext

func GetHandlerContext(ctx context.Context) *HandlerContext

rpc.HandlerContext must never be used outside of the handler

func (*HandlerContext) AccountResponseMem

func (hctx *HandlerContext) AccountResponseMem(respBodySizeEstimate int) error

func (*HandlerContext) ActorID

func (hctx *HandlerContext) ActorID() int64

func (*HandlerContext) BodyFormatTL2

func (hctx *HandlerContext) BodyFormatTL2() bool

func (*HandlerContext) ForwardAndFlush

func (hctx *HandlerContext) ForwardAndFlush(conn *PacketConn, tip uint32, timeout time.Duration) error

func (*HandlerContext) KeyID

func (hctx *HandlerContext) KeyID() [4]byte

func (*HandlerContext) ListenAddr

func (hctx *HandlerContext) ListenAddr() net.Addr

func (*HandlerContext) LocalAddr

func (hctx *HandlerContext) LocalAddr() net.Addr

func (*HandlerContext) LongpollStarted

func (hctx *HandlerContext) LongpollStarted() bool

Used by middleware (tlgen) to avoid serializing response returned by handler.

func (*HandlerContext) ParseInvokeReq

func (hctx *HandlerContext) ParseInvokeReq(opts *ServerOptions) (err error)

This method is temporarily public, do not use directly

func (*HandlerContext) PrepareResponse

func (hctx *HandlerContext) PrepareResponse(err error) (extraStart int)

We serialize extra after body into Body, then write in reversed order, so the full response is the concatenation of hctx.Response[extraStart:], then hctx.Response[:extraStart].

func (*HandlerContext) ProtocolTransport

func (hctx *HandlerContext) ProtocolTransport() string

func (*HandlerContext) ProtocolVersion

func (hctx *HandlerContext) ProtocolVersion() uint32

func (*HandlerContext) QueryID

func (hctx *HandlerContext) QueryID() int64

func (*HandlerContext) RemoteAddr

func (hctx *HandlerContext) RemoteAddr() net.Addr

func (*HandlerContext) RequestFunctionName

func (hctx *HandlerContext) RequestFunctionName() string

func (*HandlerContext) RequestTag

func (hctx *HandlerContext) RequestTag() uint32

func (*HandlerContext) RequestTime

func (hctx *HandlerContext) RequestTime() time.Time

func (*HandlerContext) ResetTo

func (hctx *HandlerContext) ResetTo(
	commonConn HandlerContextConnection, queryID int64)

for implementing servers, also for tests with server mock ups

func (*HandlerContext) SendLongpollResponse

func (hctx *HandlerContext) SendLongpollResponse(err error)

Be careful, it's responsibility of the caller to synchronize SendLongpollResponse and CancelLongpoll

func (*HandlerContext) SetRequestFunctionName

func (hctx *HandlerContext) SetRequestFunctionName(name string)

func (*HandlerContext) StartLongpoll

func (hctx *HandlerContext) StartLongpoll(canceller LongpollCanceller) (LongpollHandle, error)

StartLongpoll releases Request bytes (and UserData) for reuse, so it must be called only after Request processing is complete. If this method returns err != nil, the connection or server is in shutdown, the long poll was not started, the handle is empty, and you must not add it to your data structure. You can either return this error from your handler, or you can create an empty response and return a nil error; then the empty response will be sent.

After starting longpoll you are responsible for (whatever happens first) 1) forget about hctx if canceller method is called (so you must update your data structures strictly after call StartLongpoll finished) 2) if 1) did not yet happen, can at any moment in the future call FinishLongpoll(), and then SendLongpollResponse()

func (*HandlerContext) StartLongpollWithTimeoutDeprecated

func (hctx *HandlerContext) StartLongpollWithTimeoutDeprecated(
	canceller LongpollCanceller,
	timeout time.Duration,
) (LongpollHandle, error)

This method allows users to set custom timeouts for long polls from a handler. Although this method exists, client code must respect timeouts from rpc.RequestExtra and shouldn't use this method. The method is going to be deleted once Persic migrates to default RPC timeouts instead of the custom timeout in request's tl body.

func (*HandlerContext) TraceIDStr

func (hctx *HandlerContext) TraceIDStr() string

func (*HandlerContext) WithContext

func (hctx *HandlerContext) WithContext(ctx context.Context) context.Context

Sets also Execution Context and Tracing Context

func (*HandlerContext) WriteReponseAndFlush

func (hctx *HandlerContext) WriteReponseAndFlush(conn *PacketConn, err error) error

type HandlerContextConnection

type HandlerContextConnection interface {
	StartLongpoll(hctx *HandlerContext, canceller LongpollCanceller) (LongpollHandle, error)
	CancelLongpoll(queryID int64) (_ LongpollCanceller, deadline int64)
	FinishLongpoll(LongpollHandle) (*HandlerContext, error)
	DebugName() string
	SendResponse(hctx *HandlerContext, err error)
	SendEmptyResponse(lh LongpollHandle) // Prefer only one request instead of a batch here, because it's difficult to aggregate batches to different connections
	AccountResponseMem(hctx *HandlerContext, respBodySizeEstimate int) error
	ListenAddr() net.Addr
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	KeyID() [4]byte
	ProtocolVersion() uint32
	ProtocolTransportID() byte
	ConnectionID() uintptr
}

Commonality between UDP and TCP servers required for HandlerContext.

type HandlerFunc

type HandlerFunc func(ctx context.Context, hctx *HandlerContext) error

func ChainHandler

func ChainHandler(ff ...HandlerFunc) HandlerFunc

type HijackConnection

type HijackConnection struct {
	Magic []byte
	net.Conn
}

func (*HijackConnection) LooksLikeHTTP

func (h *HijackConnection) LooksLikeHTTP() bool

TODO - feel free to improve this function. Magic is [0..12] bytes, which is enough for most protocols.

func (*HijackConnection) Read

func (h *HijackConnection) Read(p []byte) (int, error)

type HijackListener

type HijackListener struct {
	ListenAddr net.Addr
	// contains filtered or unexported fields
}

func NewHijackListener

func NewHijackListener(listenAddr net.Addr) *HijackListener

func (*HijackListener) Accept

func (h *HijackListener) Accept() (net.Conn, error)

func (*HijackListener) AddConnection

func (h *HijackListener) AddConnection(conn net.Conn)

func (*HijackListener) Addr

func (h *HijackListener) Addr() net.Addr

func (*HijackListener) Close

func (h *HijackListener) Close() error

type InvokeReqExtra

type InvokeReqExtra struct {
	RequestExtra // embedded for historic reasons

	// Requests fail immediately when connection fails, so that switch to fallback is faster
	// Here, because generated code calls GetRequest() so caller has no access to request
	FailIfNoConnection bool

	// By setting this to 1 or 2, user selects preferred format to use, if generated code supports both
	PreferTLVersion int

	ResponseExtra ResponseExtra // after call, response extra is available here
}

type LatencyHandler

type LatencyHandler func(duration time.Duration)

type LoggerFunc

type LoggerFunc func(format string, args ...any)

type LongpollCanceller

type LongpollCanceller interface {
	CancelLongpoll(lh LongpollHandle)
	// should cancel longpoll and
	// either write empty (NOP) response, or return ErrLongpollNoEmptyResponse
	WriteEmptyResponse(lh LongpollHandle, resp *HandlerContext) error
}

Motivation - we want zero allocations, so we cannot use lambda

type LongpollHandle

type LongpollHandle struct {
	QueryID    int64
	CommonConn HandlerContextConnection
}

Fields in this struct are public, so the user could create his own unique handles for tests

func (*LongpollHandle) FinishLongpoll

func (lh *LongpollHandle) FinishLongpoll() (*HandlerContext, bool)

When you are ready to reply to longpoll, call this function to obtain hctx. If this function returns (nil, false), you must do nothing. Otherwise it returns (!nil, true) and you should write the response to hctx.Response and send it with hctx.SendResponse.

type Multi

type Multi struct {
	// contains filtered or unexported fields
}

func (*Multi) Client

func (m *Multi) Client() Client

func (*Multi) Close

func (m *Multi) Close()

func (*Multi) Start

func (m *Multi) Start(ctx context.Context, network string, address string, req *Request) error

func (*Multi) Wait

func (m *Multi) Wait(ctx context.Context, queryID int64) (*Response, error)

func (*Multi) WaitAny

func (m *Multi) WaitAny(ctx context.Context) (int64, *Response, error)

type NetAddr

type NetAddr struct {
	Network string
	Address string
}

func (NetAddr) String

func (na NetAddr) String() string

type NetPID

type NetPID = tlnet.Pid

type PacketConn

type PacketConn struct {
	// contains filtered or unexported fields
}

transport stream, encrypted using standard VK rpc scheme

func NewDefaultPacketConn

func NewDefaultPacketConn(c net.Conn) *PacketConn

func NewPacketConn

func NewPacketConn(c net.Conn, readBufSize int, writeBufSize int) *PacketConn

func (*PacketConn) Close

func (pc *PacketConn) Close() error

func (*PacketConn) Encrypted

func (pc *PacketConn) Encrypted() bool

func (*PacketConn) FlagCancelReq

func (pc *PacketConn) FlagCancelReq() bool

Negotiated during handshake, does not change after handshake

func (*PacketConn) Flush

func (pc *PacketConn) Flush() error

func (*PacketConn) FlushUnlocked

func (pc *PacketConn) FlushUnlocked() error

If all writing is performed from the same goroutine, you can call Unlocked version of Write and Flush

func (*PacketConn) HandshakeClient

func (pc *PacketConn) HandshakeClient(cryptoKey string, trustedSubnetGroups [][]*net.IPNet, forceEncryption bool, startTime uint32, flags uint32, packetTimeout time.Duration, protocolVersion uint32) error

func (*PacketConn) HandshakeServer

func (pc *PacketConn) HandshakeServer(cryptoKeys []string, trustedSubnetGroups [][]*net.IPNet, forceEncryption bool, startTime uint32, packetTimeout time.Duration) ([]byte, uint32, error)

func (*PacketConn) HijackConnection

func (pc *PacketConn) HijackConnection() (net.Conn, []byte)

you will most likely never need to call this

func (*PacketConn) KeyID

func (pc *PacketConn) KeyID() [4]byte

func (*PacketConn) LocalAddr

func (pc *PacketConn) LocalAddr() net.Addr

func (*PacketConn) ProtocolVersion

func (pc *PacketConn) ProtocolVersion() uint32

func (*PacketConn) ReadPacket

func (pc *PacketConn) ReadPacket(body []byte, timeout time.Duration) (tip uint32, _ []byte, err error)

ReadPacket will resize/reuse body to size of packet + up to 8 bytes overhead

func (*PacketConn) ReadPacketBodyUnlocked

func (pc *PacketConn) ReadPacketBodyUnlocked(header *packetHeader, body []byte) (_ []byte, err error)

func (*PacketConn) ReadPacketHeaderUnlocked

func (pc *PacketConn) ReadPacketHeaderUnlocked(header *packetHeader, timeout time.Duration) (magicHead []byte, isBuiltin bool, builtinKind string, err error)

func (*PacketConn) ReadPacketUnlocked

func (pc *PacketConn) ReadPacketUnlocked(body []byte, timeout time.Duration) (tip uint32, _ []byte, isBuiltin bool, builtinKind string, err error)

ReadPacketUnlocked is for high-efficiency users, which write with *Unlocked functions. If isBuiltin is true, the caller must call WritePacketBuiltinNoFlushUnlocked(timeout). TODO - builtinKind is only for temporary testing, remove later.

func (*PacketConn) RemoteAddr

func (pc *PacketConn) RemoteAddr() net.Addr

func (*PacketConn) SetReadTimeoutUnlocked

func (pc *PacketConn) SetReadTimeoutUnlocked(timeout time.Duration) error

func (*PacketConn) SetWriteTimeoutUnlocked

func (pc *PacketConn) SetWriteTimeoutUnlocked(timeout time.Duration) error

func (*PacketConn) ShutdownWrite

func (pc *PacketConn) ShutdownWrite() error

Motivation - you call ShutdownWrite, and your blocking ReadPacket* will stop after receiving FIN with compatible sockets. If you receive an error from this method, you should call Close().

func (*PacketConn) TCPSetsockoptInt

func (pc *PacketConn) TCPSetsockoptInt(level int, opt int, value int) error

We want to dynamically set traffic class in Barsic and other projects, hence optimization with pc.tcpconn_fd

func (*PacketConn) WritePacket

func (pc *PacketConn) WritePacket(packetType uint32, body []byte, timeout time.Duration) error

func (*PacketConn) WritePacket2

func (pc *PacketConn) WritePacket2(packetType uint32, body []byte, body2 []byte, timeout time.Duration) error

it is common to write body consisting of 2 parts

func (*PacketConn) WritePacketBodyUnlocked

func (pc *PacketConn) WritePacketBodyUnlocked(body []byte) error

func (*PacketConn) WritePacketBuiltin

func (pc *PacketConn) WritePacketBuiltin(timeout time.Duration) error

func (*PacketConn) WritePacketBuiltinNoFlushUnlocked

func (pc *PacketConn) WritePacketBuiltinNoFlushUnlocked(timeout time.Duration) error

func (*PacketConn) WritePacketHeaderUnlocked

func (pc *PacketConn) WritePacketHeaderUnlocked(packetType uint32, packetBodyLen int, timeout time.Duration) error

How to use: first call WritePacketHeaderUnlocked with the sum of all body chunk lengths you are going to write on the next step, then call WritePacketBodyUnlocked 0 or more times, then call WritePacketTrailerUnlocked.

func (*PacketConn) WritePacketNoFlush

func (pc *PacketConn) WritePacketNoFlush(packetType uint32, body []byte, timeout time.Duration) error

func (*PacketConn) WritePacketNoFlushUnlocked

func (pc *PacketConn) WritePacketNoFlushUnlocked(packetType uint32, body []byte, timeout time.Duration) error

If all writing is performed from the same goroutine, you can call Unlocked version of Write and Flush

func (*PacketConn) WritePacketTrailerUnlocked

func (pc *PacketConn) WritePacketTrailerUnlocked()

func (*PacketConn) WriteRawBytes

func (pc *PacketConn) WriteRawBytes(b []byte) (int, error)

you will most likely never need to call this

type PacketHeaderCircularBuffer

type PacketHeaderCircularBuffer struct {
	// contains filtered or unexported fields
}

func (*PacketHeaderCircularBuffer) String

func (b *PacketHeaderCircularBuffer) String() string

type ReadWriteError

type ReadWriteError struct {
	ReadErr  error
	WriteErr error
}

func (ReadWriteError) Error

func (r ReadWriteError) Error() error

type Request

type Request struct {
	Body               []byte
	ActorID            int64
	Extra              RequestExtra
	FailIfNoConnection bool
	BodyFormatTL2      bool // body is in TL2 format

	FunctionName string // Generated calls fill this during request serialization.
	ReadOnly     bool   // no side effects, can be retried by client
	// contains filtered or unexported fields
}

func (*Request) QueryID

func (req *Request) QueryID() int64

QueryID is always non-zero (guaranteed by ClientImpl.GetRequest).

type RequestExtra

type RequestExtra = tl.RpcInvokeReqExtra

type RequestHookFunc

type RequestHookFunc func(hctx *HandlerContext, ctx context.Context) context.Context

type Response

type Response struct {
	Body []byte // rest of body after parsing various extras

	Extra ResponseExtra
	// contains filtered or unexported fields
}

func (*Response) BodyFormatTL2

func (resp *Response) BodyFormatTL2() bool

func (*Response) QueryID

func (resp *Response) QueryID() int64

func (*Response) UserData

func (resp *Response) UserData() any

type ResponseExtra

type ResponseExtra = tl.RpcReqResultExtra

type ResponseHookFunc

type ResponseHookFunc func(hctx *HandlerContext, err error)

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(options ...ServerOptionsFunc) *Server

func (*Server) Close

func (s *Server) Close() error

func (*Server) CloseWait

func (s *Server) CloseWait(ctx context.Context) error

Waits for all connections to be closed. If all clients follow protocol, this happens quickly.

func (*Server) CollectStats

func (s *Server) CollectStats(localAddr net.Addr) map[string]string

func (*Server) ConnectionsCurrent

func (s *Server) ConnectionsCurrent() int64

func (*Server) ConnectionsTCPCurrent

func (s *Server) ConnectionsTCPCurrent() int64

func (*Server) ConnectionsTCPTotal

func (s *Server) ConnectionsTCPTotal() int64

func (*Server) ConnectionsTotal

func (s *Server) ConnectionsTotal() int64

func (*Server) ConnectionsUDPCurrent

func (s *Server) ConnectionsUDPCurrent() int64

func (*Server) ConnectionsUDPTotal

func (s *Server) ConnectionsUDPTotal() int64

func (*Server) GetCryptoKeys

func (s *Server) GetCryptoKeys() []string

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(network string, address string) error

func (*Server) LongPollsWaiting

func (s *Server) LongPollsWaiting() int64

func (*Server) RPS

func (s *Server) RPS() int64

func (*Server) RegisterHandlerFunc

func (s *Server) RegisterHandlerFunc(h HandlerFunc)

Some users want to delay registering the handler until after the server is created.

func (*Server) RegisterStatsHandlerFunc

func (s *Server) RegisterStatsHandlerFunc(h StatsHandlerFunc)

Some users want to delay registering the handler until after the server is created.

func (*Server) RegisterVerbosityHandlerFunc

func (s *Server) RegisterVerbosityHandlerFunc(h VerbosityHandlerFunc)

Some users want to delay registering the handler until after the server is created.

func (*Server) RequestsCurrent

func (s *Server) RequestsCurrent() int64

func (*Server) RequestsMemory

func (s *Server) RequestsMemory() (current int64, total int64)

func (*Server) RequestsTotal

func (s *Server) RequestsTotal() int64

func (*Server) ResponsesMemory

func (s *Server) ResponsesMemory() (current int64, total int64)

func (*Server) Serve

func (s *Server) Serve(ln net.Listener) error

func (*Server) ServeUDP

func (s *Server) ServeUDP(conn *net.UDPConn) error

func (*Server) SetEngineShutdownStatus

func (s *Server) SetEngineShutdownStatus(engineShutdown bool)

Server automatically responds to engine.Pid as a healthcheck status. There is a protocol between barsic, pinger and server: when barsic commands engine shutdown, the engine must respond to engine.Pid with a special error condition.

func (*Server) Shutdown

func (s *Server) Shutdown()

Server stops accepting new clients, sends USER LEVEL FINs, and continues to respond to pending requests. Can be called as many times as needed. Does not wait.

func (*Server) WorkersPoolSize

func (s *Server) WorkersPoolSize() (current int, total int)

type ServerOptions

type ServerOptions struct {
	Logf                   LoggerFunc // defaults to log.Printf; set to NoopLogf to disable all logging
	SyncHandler            HandlerFunc
	Handler                HandlerFunc
	StatsHandler           StatsHandlerFunc
	RecoverPanics          bool
	VerbosityHandler       VerbosityHandlerFunc
	Version                string
	TransportHijackHandler func(conn *PacketConn) // Experimental, server handles connection to this function if FlagP2PHijack client flag set
	SocketHijackHandler    func(conn *HijackConnection)
	AcceptErrHandler       ErrHandlerFunc
	ConnErrHandler         ErrHandlerFunc
	RequestHook            RequestHookFunc
	ResponseHook           ResponseHookFunc
	TrustedSubnetGroupsSt  string // for stats
	TrustedSubnetGroups    [][]*net.IPNet
	ForceEncryption        bool

	MaxConns                int           // defaults to DefaultMaxConns
	MaxWorkers              int           // defaults to DefaultMaxWorkers; <= value disables worker pool completely
	RequestMemoryLimit      int           // defaults to DefaultRequestMemoryLimit
	ResponseMemoryLimit     int           // defaults to DefaultResponseMemoryLimit
	ConnReadBufSize         int           // defaults to DefaultServerConnReadBufSize
	ConnWriteBufSize        int           // defaults to DefaultServerConnWriteBufSize
	RequestBufSize          int           // defaults to DefaultServerRequestBufSize
	ResponseBufSize         int           // defaults to DefaultServerResponseBufSize
	ResponseMemEstimate     int           // defaults to DefaultResponseMemEstimate; must be greater than ResponseBufSize
	DefaultResponseTimeout  time.Duration // defaults to no timeout (0)
	MinimumLongpollTimeout  time.Duration
	DisableContextTimeout   bool
	DisableTCPReuseAddr     bool
	DebugRPC                bool // prints all incoming and outgoing RPC activity (very slow, only for protocol debug)
	DebugUdpRPC             int  // 0 - nothing; 1 - prints key udp events; 2 - prints all udp activities (<0 equals to 0; >2 equals to 2)
	PrintKeysGenerationInfo bool
	DebugUDPLatency         bool
	// contains filtered or unexported fields
}

func (*ServerOptions) AddCryptoKey

func (opts *ServerOptions) AddCryptoKey(key string)

type ServerOptionsFunc

type ServerOptionsFunc func(*ServerOptions)

func ServerWithAcceptErrorHandler

func ServerWithAcceptErrorHandler(fn ErrHandlerFunc) ServerOptionsFunc

func ServerWithConnErrorHandler

func ServerWithConnErrorHandler(fn ErrHandlerFunc) ServerOptionsFunc

func ServerWithConnReadBufSize

func ServerWithConnReadBufSize(size int) ServerOptionsFunc

func ServerWithConnWriteBufSize

func ServerWithConnWriteBufSize(size int) ServerOptionsFunc

func ServerWithCryptoKeys

func ServerWithCryptoKeys(keys []string) ServerOptionsFunc

func ServerWithDebugRPC

func ServerWithDebugRPC(debugRpc bool) ServerOptionsFunc

func ServerWithDebugUDPLatency

func ServerWithDebugUDPLatency(debugUDPLatency bool) ServerOptionsFunc

func ServerWithDebugUdpRPC

func ServerWithDebugUdpRPC(debugUdpRpc int) ServerOptionsFunc

func ServerWithDefaultResponseTimeout

func ServerWithDefaultResponseTimeout(timeout time.Duration) ServerOptionsFunc

func ServerWithDisableContextTimeout

func ServerWithDisableContextTimeout(status bool) ServerOptionsFunc

func ServerWithDisableTCPReuseAddr

func ServerWithDisableTCPReuseAddr() ServerOptionsFunc

func ServerWithForceEncryption

func ServerWithForceEncryption(status bool) ServerOptionsFunc

func ServerWithHandler

func ServerWithHandler(handler HandlerFunc) ServerOptionsFunc

func ServerWithLogf

func ServerWithLogf(logf LoggerFunc) ServerOptionsFunc

func ServerWithMaxConns

func ServerWithMaxConns(maxConns int) ServerOptionsFunc

func ServerWithMaxWorkers

func ServerWithMaxWorkers(maxWorkers int) ServerOptionsFunc

func ServerWithMinimumLongpollTimeout

func ServerWithMinimumLongpollTimeout(timeout time.Duration) ServerOptionsFunc

func ServerWithPrintKeysGenerationInfo

func ServerWithPrintKeysGenerationInfo(printKeysGenerationInfo bool) ServerOptionsFunc

func ServerWithRecoverPanics

func ServerWithRecoverPanics(recoverPanics bool) ServerOptionsFunc

func ServerWithRequestBufSize

func ServerWithRequestBufSize(size int) ServerOptionsFunc

func ServerWithRequestHook

func ServerWithRequestHook(fn RequestHookFunc) ServerOptionsFunc

Called after the request was received and its extra parsed, but before calling SyncHandler.

func ServerWithRequestMemoryLimit

func ServerWithRequestMemoryLimit(limit int) ServerOptionsFunc

func ServerWithResponseBufSize

func ServerWithResponseBufSize(size int) ServerOptionsFunc

func ServerWithResponseHook

func ServerWithResponseHook(fn ResponseHookFunc) ServerOptionsFunc

Called after the response was generated by SyncHandler or Handler, but before sending. Also called when the client cancels or finishes a long poll, to balance the number of calls to RequestHookFunc and ResponseHookFunc.

func ServerWithResponseMemEstimate

func ServerWithResponseMemEstimate(size int) ServerOptionsFunc

func ServerWithResponseMemoryLimit

func ServerWithResponseMemoryLimit(limit int) ServerOptionsFunc

func ServerWithSocketHijackHandler

func ServerWithSocketHijackHandler(handler func(conn *HijackConnection)) ServerOptionsFunc

All connections not classified as PacketConn are passed here. You can then insert them into HijackListener. If you have more than one protocol in your app, you can examine HijackConnection.Magic in your handler to classify the connection.

func ServerWithStatsHandler

func ServerWithStatsHandler(handler StatsHandlerFunc) ServerOptionsFunc

func ServerWithSyncHandler

func ServerWithSyncHandler(syncHandler HandlerFunc) ServerOptionsFunc

syncHandler is called directly from the receive loop and must not wait for anything. If syncHandler returns ErrNoHandler, the normal handler will be called from a worker. Only syncHandler can hijack longpoll responses for later processing. You must not use Request or UserData after returning from the sync handler; they are reused by other calls.

func ServerWithTransportHijackHandler

func ServerWithTransportHijackHandler(handler func(conn *PacketConn)) ServerOptionsFunc

func ServerWithTrustedSubnetGroups

func ServerWithTrustedSubnetGroups(groups [][]string) ServerOptionsFunc

func ServerWithUDPReadScheduleLatencyMetric

func ServerWithUDPReadScheduleLatencyMetric(readScheduleLatencyMetric func(millis float64)) ServerOptionsFunc

func ServerWithUDPSocketReadLatencyMetric

func ServerWithUDPSocketReadLatencyMetric(socketReadLatencyMetric func(millis float64)) ServerOptionsFunc

func ServerWithUDPSocketWriteLatencyMetric

func ServerWithUDPSocketWriteLatencyMetric(socketWriteLatencyMetric func(millis float64)) ServerOptionsFunc

func ServerWithUDPWriteScheduleLatencyMetric

func ServerWithUDPWriteScheduleLatencyMetric(writeScheduleLatencyMetric func(millis float64)) ServerOptionsFunc

func ServerWithVerbosityHandler

func ServerWithVerbosityHandler(handler VerbosityHandlerFunc) ServerOptionsFunc

func ServerWithVersion

func ServerWithVersion(version string) ServerOptionsFunc

type StatsHandlerFunc

type StatsHandlerFunc func(map[string]string)

type TargetError

type TargetError struct {
	Address NetAddr
	Err     error
}

func (TargetError) Error

func (err TargetError) Error() string

func (TargetError) Unwrap

func (err TargetError) Unwrap() error

type TraceContext

type TraceContext = tltracing.TraceContext

type TraceID

type TraceID = tltracing.TraceID

func TraceIDFromByteSlice

func TraceIDFromByteSlice(data []byte) TraceID

func TraceIDFromString

func TraceIDFromString(str string) (TraceID, error)

type TraceStatus

type TraceStatus = int
var (
	TraceStatusDrop   TraceStatus = 0
	TraceStatusRecord TraceStatus = 1
	TraceStatusDefer  TraceStatus = 2
)

func GetTraceStatus

func GetTraceStatus(tc *TraceContext) TraceStatus

type TracingInjectFunc

type TracingInjectFunc func(ctx context.Context, tc *TraceContext)

type UdpServerConn

type UdpServerConn struct {
	// contains filtered or unexported fields
}

func (*UdpServerConn) AccountResponseMem

func (sc *UdpServerConn) AccountResponseMem(hctx *HandlerContext, respBodySizeEstimate int) (err error)

func (*UdpServerConn) CancelLongpoll

func (sc *UdpServerConn) CancelLongpoll(queryID int64) (LongpollCanceller, int64)

func (*UdpServerConn) ConnectionID

func (sc *UdpServerConn) ConnectionID() uintptr

func (*UdpServerConn) DebugName

func (sc *UdpServerConn) DebugName() string

func (*UdpServerConn) FinishLongpoll

func (sc *UdpServerConn) FinishLongpoll(lh LongpollHandle) (*HandlerContext, error)

func (*UdpServerConn) KeyID

func (sc *UdpServerConn) KeyID() [4]byte

func (*UdpServerConn) ListenAddr

func (sc *UdpServerConn) ListenAddr() net.Addr

func (*UdpServerConn) LocalAddr

func (sc *UdpServerConn) LocalAddr() net.Addr

func (*UdpServerConn) ProtocolTransportID

func (sc *UdpServerConn) ProtocolTransportID() byte

func (*UdpServerConn) ProtocolVersion

func (sc *UdpServerConn) ProtocolVersion() uint32

func (*UdpServerConn) RemoteAddr

func (sc *UdpServerConn) RemoteAddr() net.Addr

func (*UdpServerConn) SendEmptyResponse

func (sc *UdpServerConn) SendEmptyResponse(lh LongpollHandle)

func (*UdpServerConn) SendResponse

func (sc *UdpServerConn) SendResponse(hctx *HandlerContext, err error)

func (*UdpServerConn) StartLongpoll

func (sc *UdpServerConn) StartLongpoll(hctx *HandlerContext, canceller LongpollCanceller) (LongpollHandle, error)

type UnencHeader

type UnencHeader = tlnetUdpPacket.UnencHeader // TODO - move to better place when UDP impl is ready

type VerbosityHandlerFunc

type VerbosityHandlerFunc func(int) error

Directories

Path Synopsis
internal
gen/constants
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/factory
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/internal
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/internal/metainternal
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/meta
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/tl
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/tlengine
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/tlexactlyOnce
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/tlgo
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/tlnet
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/tlnetUdpPacket
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
gen/tltracing
Code generated by tl2gen; DO NOT EDIT.
Code generated by tl2gen; DO NOT EDIT.
udp

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL