Versions in this module Expand all Collapse all v1 v1.0.1 Mar 12, 2026 v1.0.0 Mar 11, 2026 Changes in this version + var ErrCompressionFailed = errors.New("compression failed") + var ErrCompressionNotSupported = errors.New("compression algorithm not supported") + var ErrConnectionClosing = errors.New("connection is closing") + var ErrCryptoAlgorithmNotSupported = errors.New("crypto algorithm not supported") + var ErrCryptoDecryptFailed = errors.New("crypto decrypt failed") + var ErrCryptoEncryptFailed = errors.New("crypto encrypt failed") + var ErrCryptoHandshakeFailed = errors.New("crypto handshake failed") + var ErrCryptoInvalidIVSize = errors.New("invalid crypto iv/nonce size") + var ErrCryptoInvalidKeySize = errors.New("invalid crypto key size") + var ErrCryptoKDFFailed = errors.New("crypto kdf failed") + var ErrCryptoKeyExchangeFailed = errors.New("crypto key exchange failed") + var ErrCryptoNotInitialized = errors.New("crypto not initialized") + var ErrDecompressionFailed = errors.New("decompression failed") + var ErrInvalidData = errors.New("invalid data") + var ErrPackFailed = errors.New("pack failed") + var ErrUnpackFailed = errors.New("unpack failed") + func NegotiateCompression(local, remote []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE + func NegotiateCryptoAlgorithm(local, remote []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) protocol.ATBUS_CRYPTO_ALGORITHM_TYPE + func NegotiateKDF(local, remote []protocol.ATBUS_CRYPTO_KDF_TYPE) protocol.ATBUS_CRYPTO_KDF_TYPE + func NegotiateKeyExchange(local, remote protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE + type CompressionSession struct + Algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE + func NewCompressionSession() *CompressionSession + func (cs *CompressionSession) Compress(data []byte) ([]byte, error) + func (cs *CompressionSession) Decompress(data []byte, originalSize int) ([]byte, error) + func (cs *CompressionSession) GetAlgorithm() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE + func (cs *CompressionSession) SetAlgorithm(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error + type Connection struct + func CreateConnection(owner *Node, addr string) *Connection + func (c *Connection) AddStatFault() uint64 + func (c *Connection) CheckFlag(flag types.ConnectionFlag) bool + func (c *Connection) ClearStatFault() + func (c *Connection) Connect() types.ErrorType + func (c *Connection) Disconnect() types.ErrorType + func (c *Connection) GetAddress() types.ChannelAddress + func (c *Connection) GetBinding() types.Endpoint + func (c *Connection) GetConnectionContext() types.ConnectionContext + func (c *Connection) GetIoStreamConnection() *io_stream.IoStreamConnection + func (c *Connection) GetStatistic() types.ConnectionStatistic + func (c *Connection) GetStatus() types.ConnectionState + func (c *Connection) IsConnected() bool + func (c *Connection) IsRunning() bool + func (c *Connection) Listen() types.ErrorType + func (c *Connection) Proc() types.ErrorType + func (c *Connection) Push(data []byte) types.ErrorType + func (c *Connection) RemoveOwnerChecker() + func (c *Connection) Reset() + func (c *Connection) SetIoStreamConnection(conn *io_stream.IoStreamConnection) + func (c *Connection) SetTemporary() + type ConnectionContext struct + func NewConnectionContext(keyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) *ConnectionContext + func (cc *ConnectionContext) CreateHandshakeData() (*CryptoHandshakeData, error) + func (cc *ConnectionContext) GetCompressSelectAlgorithm() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE + func (cc *ConnectionContext) GetCompression() *CompressionSession + func (cc *ConnectionContext) GetCryptoKeyExchangeAlgorithm() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE + func (cc *ConnectionContext) GetCryptoSelectAlgorithm() protocol.ATBUS_CRYPTO_ALGORITHM_TYPE + func (cc *ConnectionContext) GetCryptoSelectKdfType() protocol.ATBUS_CRYPTO_KDF_TYPE + func (cc *ConnectionContext) GetHandshakeStartTime() time.Time + func (cc *ConnectionContext) GetNextSequence() uint64 + func (cc *ConnectionContext) GetReadCrypto() *CryptoSession + func (cc *ConnectionContext) GetSupportedCompressionAlgorithms() []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE + func (cc *ConnectionContext) GetSupportedCryptoAlgorithms() []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE + func (cc *ConnectionContext) GetSupportedKeyExchange() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE + func (cc *ConnectionContext) GetWriteCrypto() *CryptoSession + func (cc *ConnectionContext) HandshakeGenerateSelfKey(peerSequenceId uint64) error_code.ErrorType + func (cc *ConnectionContext) HandshakeReadPeerKey(peerPubKey *protocol.CryptoHandshakeData, ...) error_code.ErrorType + func (cc *ConnectionContext) HandshakeWriteSelfPublicKey(selfPubKey *protocol.CryptoHandshakeData, ...) error_code.ErrorType + func (cc *ConnectionContext) IsClosing() bool + func (cc *ConnectionContext) IsCompressionAlgorithmSupported(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool + func (cc *ConnectionContext) IsHandshakeDone() bool + func (cc *ConnectionContext) NegotiateCompressionWithPeer(peerAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error + func (cc *ConnectionContext) PackMessage(m *types.Message, protocolVersion int32, maxBodySize int) (*buffer.StaticBufferBlock, error_code.ErrorType) + func (cc *ConnectionContext) ProcessHandshakeData(peerData *CryptoHandshakeData) error + func (cc *ConnectionContext) SetClosing(closing bool) + func (cc *ConnectionContext) SetSupportedCompressionAlgorithms(algorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) + func (cc *ConnectionContext) SetSupportedCryptoAlgorithms(algorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) + func (cc *ConnectionContext) SetSupportedKeyExchange(keyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) + func (cc *ConnectionContext) SetupCryptoWithKey(algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, key []byte, iv []byte) error_code.ErrorType + func (cc *ConnectionContext) UnpackMessage(m *types.Message, input []byte, maxBodySize int) error_code.ErrorType + func (cc *ConnectionContext) UpdateCompressionAlgorithm(algorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error_code.ErrorType + type CryptoHandshakeData struct + Algorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE + IVSize uint32 + KDFTypes []protocol.ATBUS_CRYPTO_KDF_TYPE + KeyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE + PublicKey []byte + Sequence uint64 + TagSize uint32 + type CryptoSession struct + Algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE + IV []byte + IVSize uint32 + KDFType protocol.ATBUS_CRYPTO_KDF_TYPE + Key []byte + KeyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE + TagSize uint32 + func NewCryptoSession() *CryptoSession + func (cs *CryptoSession) ComputeSharedSecret(peerPublicKeyBytes []byte) ([]byte, error) + func (cs *CryptoSession) Decrypt(ciphertext []byte) ([]byte, error) + func (cs *CryptoSession) DecryptWithIV(ciphertext []byte, iv []byte) ([]byte, error) + func (cs *CryptoSession) DecryptWithIVAndAAD(ciphertext []byte, iv []byte, aad []byte) ([]byte, error) + func (cs *CryptoSession) DeriveKey(sharedSecret []byte, algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, ...) error + func (cs *CryptoSession) Encrypt(plaintext []byte) ([]byte, error) + func (cs *CryptoSession) EncryptWithIV(plaintext []byte, iv []byte) ([]byte, error) + func (cs *CryptoSession) EncryptWithIVAndAAD(plaintext []byte, iv []byte, aad []byte) ([]byte, error) + func (cs *CryptoSession) GenerateKeyPair(keyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) error + func (cs *CryptoSession) GetPublicKey() []byte + func (cs *CryptoSession) IsInitialized() bool + func (cs *CryptoSession) SetKey(key, iv []byte, algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) error + type Endpoint struct + func CreateEndpoint(owner *Node, id types.BusIdType, pid int64, hostname string) *Endpoint + func (e *Endpoint) AddConnection(conn types.Connection, forceData bool) bool + func (e *Endpoint) AddListenAddress(addr string) + func (e *Endpoint) AddPingTimer() + func (e *Endpoint) AddStatisticFault() uint64 + func (e *Endpoint) ClearListenAddress() + func (e *Endpoint) ClearPingTimer() + func (e *Endpoint) ClearStatisticFault() + func (e *Endpoint) GetCtrlConnection(peer types.Endpoint) types.Connection + func (e *Endpoint) GetDataConnection(peer types.Endpoint, enableFallbackCtrl bool) types.Connection + func (e *Endpoint) GetDataConnectionCount(enableFallbackCtrl bool) int + func (e *Endpoint) GetFlag(f types.EndpointFlag) bool + func (e *Endpoint) GetFlags() uint32 + func (e *Endpoint) GetHashCode() string + func (e *Endpoint) GetHostname() string + func (e *Endpoint) GetId() types.BusIdType + func (e *Endpoint) GetListenAddress() []types.ChannelAddress + func (e *Endpoint) GetOwner() types.Node + func (e *Endpoint) GetPid() int32 + func (e *Endpoint) GetStatisticCreatedTime() time.Time + func (e *Endpoint) GetStatisticLastPong() time.Time + func (e *Endpoint) GetStatisticPingDelay() time.Duration + func (e *Endpoint) GetStatisticPullSize() uint64 + func (e *Endpoint) GetStatisticPullTimes() uint64 + func (e *Endpoint) GetStatisticPushFailedSize() uint64 + func (e *Endpoint) GetStatisticPushFailedTimes() uint64 + func (e *Endpoint) GetStatisticPushStartSize() uint64 + func (e *Endpoint) GetStatisticPushStartTimes() uint64 + func (e *Endpoint) GetStatisticPushSuccessSize() uint64 + func (e *Endpoint) GetStatisticPushSuccessTimes() uint64 + func (e *Endpoint) GetStatisticUnfinishedPing() uint64 + func (e *Endpoint) IsAvailable() bool + func (e *Endpoint) IsSchemeSupported(scheme string) bool + func (e *Endpoint) RemoveConnection(conn types.Connection) bool + func (e *Endpoint) Reset() + func (e *Endpoint) SetFlag(f types.EndpointFlag, v bool) + func (e *Endpoint) SetStatisticPingDelay(pd time.Duration, pongTimepoint time.Time) + func (e *Endpoint) SetStatisticUnfinishedPing(p uint64) + func (e *Endpoint) UpdateHashCode(code string) + func (e *Endpoint) UpdateSupportSchemes(schemes []string) + type Node struct + func (n *Node) AddConnectionGcList(conn types.Connection) + func (n *Node) AddEndpoint(ep types.Endpoint) error_code.ErrorType + func (n *Node) AddEndpointGcList(ep types.Endpoint) + func (n *Node) AddStatisticDispatchTimes() + func (n *Node) AllocateMessageSequence() uint64 + func (n *Node) CheckAccessHash(accessKey *protocol.AccessData, plainText string, conn types.Connection) bool + func (n *Node) CheckFlag(f types.NodeFlag) bool + func (n *Node) Connect(address string) error_code.ErrorType + func (n *Node) ConnectWithEndpoint(address string, ep types.Endpoint) error_code.ErrorType + func (n *Node) CreateEndpoint(tid types.BusIdType, hostName string, pid int) types.Endpoint + func (n *Node) DisableDebugMessageVerbose() + func (n *Node) Disconnect(id types.BusIdType) error_code.ErrorType + func (n *Node) DispatchAllSelfMessages() int32 + func (n *Node) EnableDebugMessageVerbose() + func (n *Node) FatalShutdown(ep types.Endpoint, conn types.Connection, code error_code.ErrorType, err error) error_code.ErrorType + func (n *Node) GetAccessCode() string + func (n *Node) GetConfigure() *types.NodeConfigure + func (n *Node) GetContext() context.Context + func (n *Node) GetCryptoKeyExchangeType() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE + func (n *Node) GetEndpoint(tid types.BusIdType) types.Endpoint + func (n *Node) GetEventHandleOnAddEndpoint() types.NodeOnEndpointEventFunc + func (n *Node) GetEventHandleOnAvailable() types.NodeOnNodeUpFunc + func (n *Node) GetEventHandleOnCloseConnection() types.NodeOnCloseConnectionFunc + func (n *Node) GetEventHandleOnCustomCommandRequest() types.NodeOnCustomCommandRequestFunc + func (n *Node) GetEventHandleOnCustomCommandResponse() types.NodeOnCustomCommandResponseFunc + func (n *Node) GetEventHandleOnForwardRequest() types.NodeOnForwardRequestFunc + func (n *Node) GetEventHandleOnForwardResponse() types.NodeOnForwardResponseFunc + func (n *Node) GetEventHandleOnInvalidConnection() types.NodeOnInvalidConnectionFunc + func (n *Node) GetEventHandleOnNewConnection() types.NodeOnNewConnectionFunc + func (n *Node) GetEventHandleOnPingEndpoint() types.NodeOnPingPongEndpointFunc + func (n *Node) GetEventHandleOnPongEndpoint() types.NodeOnPingPongEndpointFunc + func (n *Node) GetEventHandleOnRegister() types.NodeOnRegisterFunc + func (n *Node) GetEventHandleOnRemoveEndpoint() types.NodeOnEndpointEventFunc + func (n *Node) GetEventHandleOnShutdown() types.NodeOnNodeDownFunc + func (n *Node) GetEventHandleOnTopologyUpdateUpstream() types.NodeOnTopologyUpdateUpstreamFunc + func (n *Node) GetHostname() string + func (n *Node) GetId() types.BusIdType + func (n *Node) GetImmediateEndpointSet() types.EndpointCollectionType + func (n *Node) GetIoStreamChannel() types.IoStreamChannel + func (n *Node) GetIoStreamConfigure() *types.IoStreamConfigure + func (n *Node) GetListenList() []types.ChannelAddress + func (n *Node) GetLogger() *utils_log.Logger + func (n *Node) GetPeerChannel(tid types.BusIdType, ...) (error_code.ErrorType, types.Endpoint, types.Connection, types.TopologyPeer) + func (n *Node) GetPid() int + func (n *Node) GetProtocolMinimalVersion() int32 + func (n *Node) GetProtocolVersion() int32 + func (n *Node) GetSelfEndpoint() types.Endpoint + func (n *Node) GetSelfEndpointInstance() types.Endpoint + func (n *Node) GetState() types.NodeState + func (n *Node) GetTimerTick() time.Time + func (n *Node) GetTopologyRegistry() types.TopologyRegistry + func (n *Node) GetTopologyRelation(tid types.BusIdType) (types.TopologyRelationType, types.TopologyPeer) + func (n *Node) GetUpstreamEndpoint() types.Endpoint + func (n *Node) GetUpstreamEndpointInstance() types.Endpoint + func (n *Node) Init(id types.BusIdType, conf *types.NodeConfigure) error_code.ErrorType + func (n *Node) IsDebugMessageVerboseEnabled() bool + func (n *Node) IsEndpointAvailable(tid types.BusIdType) bool + func (n *Node) Listen(address string) error_code.ErrorType + func (n *Node) LogDebug(ep types.Endpoint, conn types.Connection, m *types.Message, msg string, ...) + func (n *Node) LogError(ep types.Endpoint, conn types.Connection, status int, ...) + func (n *Node) LogInfo(ep types.Endpoint, conn types.Connection, msg string, args ...any) + func (n *Node) OnActived() + func (n *Node) OnCustomCommandRequest(ep types.Endpoint, conn types.Connection, from types.BusIdType, argv [][]byte) (error_code.ErrorType, [][]byte) + func (n *Node) OnCustomCommandResponse(ep types.Endpoint, conn types.Connection, from types.BusIdType, argv [][]byte, ...) error_code.ErrorType + func (n *Node) OnDisconnect(ep types.Endpoint, conn types.Connection) error_code.ErrorType + func (n *Node) OnNewConnection(conn types.Connection) error_code.ErrorType + func (n *Node) OnPing(ep types.Endpoint, message *types.Message, body *protocol.PingData) error_code.ErrorType + func (n *Node) OnPong(ep types.Endpoint, message *types.Message, body *protocol.PingData) error_code.ErrorType + func (n *Node) OnReceiveData(ep types.Endpoint, conn types.Connection, message *types.Message, data []byte) + func (n *Node) OnReceiveForwardResponse(ep types.Endpoint, conn types.Connection, message *types.Message) + func (n *Node) OnRegister(ep types.Endpoint, conn types.Connection, code error_code.ErrorType) + func (n *Node) OnShutdown(code error_code.ErrorType) error_code.ErrorType + func (n *Node) OnUpstreamRegisterDone() + func (n *Node) Poll() error_code.ErrorType + func (n *Node) Proc(now time.Time) error_code.ErrorType + func (n *Node) ReloadCompression(compressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE, ...) error_code.ErrorType + func (n *Node) ReloadCrypto(cryptoKeyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE, ...) error_code.ErrorType + func (n *Node) RemoveEndpoint(ep types.Endpoint) error_code.ErrorType + func (n *Node) Reset() error_code.ErrorType + func (n *Node) SendCtrlMessage(tid types.BusIdType, message *types.Message, ...) (error_code.ErrorType, types.Endpoint, types.Connection) + func (n *Node) SendCustomCommand(tid types.BusIdType, args [][]byte) error_code.ErrorType + func (n *Node) SendCustomCommandWithOptions(tid types.BusIdType, args [][]byte, options *types.NodeSendDataOptions) error_code.ErrorType + func (n *Node) SendData(tid types.BusIdType, t int32, data []byte) error_code.ErrorType + func (n *Node) SendDataMessage(tid types.BusIdType, message *types.Message, ...) (error_code.ErrorType, types.Endpoint, types.Connection) + func (n *Node) SendDataWithOptions(tid types.BusIdType, t int32, data []byte, options *types.NodeSendDataOptions) error_code.ErrorType + func (n *Node) SetEventHandleOnAddEndpoint(handle types.NodeOnEndpointEventFunc) + func (n *Node) SetEventHandleOnAvailable(handle types.NodeOnNodeUpFunc) + func (n *Node) SetEventHandleOnCloseConnection(handle types.NodeOnCloseConnectionFunc) + func (n *Node) SetEventHandleOnCustomCommandRequest(handle types.NodeOnCustomCommandRequestFunc) + func (n *Node) SetEventHandleOnCustomCommandResponse(handle types.NodeOnCustomCommandResponseFunc) + func (n *Node) SetEventHandleOnForwardRequest(handle types.NodeOnForwardRequestFunc) + func (n *Node) SetEventHandleOnForwardResponse(handle types.NodeOnForwardResponseFunc) + func (n *Node) SetEventHandleOnInvalidConnection(handle types.NodeOnInvalidConnectionFunc) + func (n *Node) SetEventHandleOnNewConnection(handle types.NodeOnNewConnectionFunc) + func (n *Node) SetEventHandleOnPingEndpoint(handle types.NodeOnPingPongEndpointFunc) + func (n *Node) SetEventHandleOnPongEndpoint(handle types.NodeOnPingPongEndpointFunc) + func (n *Node) SetEventHandleOnRegister(handle types.NodeOnRegisterFunc) + func (n *Node) SetEventHandleOnRemoveEndpoint(handle types.NodeOnEndpointEventFunc) + func (n *Node) SetEventHandleOnShutdown(handle types.NodeOnNodeDownFunc) + func (n *Node) SetEventHandleOnTopologyUpdateUpstream(handle types.NodeOnTopologyUpdateUpstreamFunc) + func (n *Node) SetHostname(hostname string, force bool) bool + func (n *Node) SetLogger(logger *utils_log.Logger) + func (n *Node) SetTopologyUpstream(tid types.BusIdType) + func (n *Node) Shutdown(reason error_code.ErrorType) error_code.ErrorType + func (n *Node) Start() error_code.ErrorType + func (n *Node) StartWithConfigure(conf *types.StartConfigure) error_code.ErrorType + type TopologyPeer struct + func CreateTopologyPeer(busId types.BusIdType) *TopologyPeer + func (p *TopologyPeer) ContainsDownstream(busId types.BusIdType) bool + func (p *TopologyPeer) ForeachDownstream(fn func(peer types.TopologyPeer) bool) bool + func (p *TopologyPeer) GetBusId() types.BusIdType + func (p *TopologyPeer) GetTopologyData() *types.TopologyData + func (p *TopologyPeer) GetUpstream() types.TopologyPeer + type TopologyRegistry struct + func CreateTopologyRegistry() *TopologyRegistry + func (r *TopologyRegistry) CheckPolicy(rule *types.TopologyPolicyRule, fromData *types.TopologyData, ...) bool + func (r *TopologyRegistry) ForeachPeer(fn func(peer types.TopologyPeer) bool) bool + func (r *TopologyRegistry) GetPeer(busId types.BusIdType) types.TopologyPeer + func (r *TopologyRegistry) GetPeerInstance(busId types.BusIdType) *TopologyPeer + func (r *TopologyRegistry) GetRelation(from types.BusIdType, to types.BusIdType) (types.TopologyRelationType, types.TopologyPeer) + func (r *TopologyRegistry) RemovePeer(targetBusId types.BusIdType) + func (r *TopologyRegistry) UpdatePeer(targetBusId types.BusIdType, upstreamBusId types.BusIdType, ...) bool