Documentation ¶
Overview ¶
Package libatbus_impl provides internal implementation details for libatbus. This file implements the connection context with encryption/decryption algorithm negotiation, compression algorithm negotiation, encryption/decryption flow, and pack/unpack flow.
Index ¶
- Variables
- func NegotiateCompression(local, remote []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
- func NegotiateCryptoAlgorithm(local, remote []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
- func NegotiateKDF(local, remote []protocol.ATBUS_CRYPTO_KDF_TYPE) protocol.ATBUS_CRYPTO_KDF_TYPE
- func NegotiateKeyExchange(local, remote protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
- type CompressionSession
- func (cs *CompressionSession) Compress(data []byte) ([]byte, error)
- func (cs *CompressionSession) Decompress(data []byte, originalSize int) ([]byte, error)
- func (cs *CompressionSession) GetAlgorithm() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
- func (cs *CompressionSession) SetAlgorithm(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error
- type Connection
- func (c *Connection) AddStatFault() uint64
- func (c *Connection) CheckFlag(flag types.ConnectionFlag) bool
- func (c *Connection) ClearStatFault()
- func (c *Connection) Connect() types.ErrorType
- func (c *Connection) Disconnect() types.ErrorType
- func (c *Connection) GetAddress() types.ChannelAddress
- func (c *Connection) GetBinding() types.Endpoint
- func (c *Connection) GetConnectionContext() types.ConnectionContext
- func (c *Connection) GetIoStreamConnection() *io_stream.IoStreamConnection
- func (c *Connection) GetStatistic() types.ConnectionStatistic
- func (c *Connection) GetStatus() types.ConnectionState
- func (c *Connection) IsConnected() bool
- func (c *Connection) IsRunning() bool
- func (c *Connection) Listen() types.ErrorType
- func (c *Connection) Proc() types.ErrorType
- func (c *Connection) Push(data []byte) types.ErrorType
- func (c *Connection) RemoveOwnerChecker()
- func (c *Connection) Reset()
- func (c *Connection) SetIoStreamConnection(conn *io_stream.IoStreamConnection)
- func (c *Connection) SetTemporary()
- type ConnectionContext
- func (cc *ConnectionContext) CreateHandshakeData() (*CryptoHandshakeData, error)
- func (cc *ConnectionContext) GetCompressSelectAlgorithm() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
- func (cc *ConnectionContext) GetCompression() *CompressionSession
- func (cc *ConnectionContext) GetCryptoKeyExchangeAlgorithm() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
- func (cc *ConnectionContext) GetCryptoSelectAlgorithm() protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
- func (cc *ConnectionContext) GetCryptoSelectKdfType() protocol.ATBUS_CRYPTO_KDF_TYPE
- func (cc *ConnectionContext) GetHandshakeStartTime() time.Time
- func (cc *ConnectionContext) GetNextSequence() uint64
- func (cc *ConnectionContext) GetReadCrypto() *CryptoSession
- func (cc *ConnectionContext) GetSupportedCompressionAlgorithms() []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
- func (cc *ConnectionContext) GetSupportedCryptoAlgorithms() []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
- func (cc *ConnectionContext) GetSupportedKeyExchange() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
- func (cc *ConnectionContext) GetWriteCrypto() *CryptoSession
- func (cc *ConnectionContext) HandshakeGenerateSelfKey(peerSequenceId uint64) error_code.ErrorType
- func (cc *ConnectionContext) HandshakeReadPeerKey(peerPubKey *protocol.CryptoHandshakeData, ...) error_code.ErrorType
- func (cc *ConnectionContext) HandshakeWriteSelfPublicKey(selfPubKey *protocol.CryptoHandshakeData, ...) error_code.ErrorType
- func (cc *ConnectionContext) IsClosing() bool
- func (cc *ConnectionContext) IsCompressionAlgorithmSupported(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool
- func (cc *ConnectionContext) IsHandshakeDone() bool
- func (cc *ConnectionContext) NegotiateCompressionWithPeer(peerAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error
- func (cc *ConnectionContext) PackMessage(m *types.Message, protocolVersion int32, maxBodySize int) (*buffer.StaticBufferBlock, error_code.ErrorType)
- func (cc *ConnectionContext) ProcessHandshakeData(peerData *CryptoHandshakeData) error
- func (cc *ConnectionContext) SetClosing(closing bool)
- func (cc *ConnectionContext) SetSupportedCompressionAlgorithms(algorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE)
- func (cc *ConnectionContext) SetSupportedCryptoAlgorithms(algorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE)
- func (cc *ConnectionContext) SetSupportedKeyExchange(keyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE)
- func (cc *ConnectionContext) SetupCryptoWithKey(algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, key []byte, iv []byte) error_code.ErrorType
- func (cc *ConnectionContext) UnpackMessage(m *types.Message, input []byte, maxBodySize int) error_code.ErrorType
- func (cc *ConnectionContext) UpdateCompressionAlgorithm(algorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error_code.ErrorType
- type CryptoHandshakeData
- type CryptoSession
- func (cs *CryptoSession) ComputeSharedSecret(peerPublicKeyBytes []byte) ([]byte, error)
- func (cs *CryptoSession) Decrypt(ciphertext []byte) ([]byte, error)
- func (cs *CryptoSession) DecryptWithIV(ciphertext []byte, iv []byte) ([]byte, error)
- func (cs *CryptoSession) DecryptWithIVAndAAD(ciphertext []byte, iv []byte, aad []byte) ([]byte, error)
- func (cs *CryptoSession) DeriveKey(sharedSecret []byte, algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, ...) error
- func (cs *CryptoSession) Encrypt(plaintext []byte) ([]byte, error)
- func (cs *CryptoSession) EncryptWithIV(plaintext []byte, iv []byte) ([]byte, error)
- func (cs *CryptoSession) EncryptWithIVAndAAD(plaintext []byte, iv []byte, aad []byte) ([]byte, error)
- func (cs *CryptoSession) GenerateKeyPair(keyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) error
- func (cs *CryptoSession) GetPublicKey() []byte
- func (cs *CryptoSession) IsInitialized() bool
- func (cs *CryptoSession) SetKey(key, iv []byte, algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) error
- type Endpoint
- func (e *Endpoint) AddConnection(conn types.Connection, forceData bool) bool
- func (e *Endpoint) AddListenAddress(addr string)
- func (e *Endpoint) AddPingTimer()
- func (e *Endpoint) AddStatisticFault() uint64
- func (e *Endpoint) ClearListenAddress()
- func (e *Endpoint) ClearPingTimer()
- func (e *Endpoint) ClearStatisticFault()
- func (e *Endpoint) GetCtrlConnection(peer types.Endpoint) types.Connection
- func (e *Endpoint) GetDataConnection(peer types.Endpoint, enableFallbackCtrl bool) types.Connection
- func (e *Endpoint) GetDataConnectionCount(enableFallbackCtrl bool) int
- func (e *Endpoint) GetFlag(f types.EndpointFlag) bool
- func (e *Endpoint) GetFlags() uint32
- func (e *Endpoint) GetHashCode() string
- func (e *Endpoint) GetHostname() string
- func (e *Endpoint) GetId() types.BusIdType
- func (e *Endpoint) GetListenAddress() []types.ChannelAddress
- func (e *Endpoint) GetOwner() types.Node
- func (e *Endpoint) GetPid() int32
- func (e *Endpoint) GetStatisticCreatedTime() time.Time
- func (e *Endpoint) GetStatisticLastPong() time.Time
- func (e *Endpoint) GetStatisticPingDelay() time.Duration
- func (e *Endpoint) GetStatisticPullSize() uint64
- func (e *Endpoint) GetStatisticPullTimes() uint64
- func (e *Endpoint) GetStatisticPushFailedSize() uint64
- func (e *Endpoint) GetStatisticPushFailedTimes() uint64
- func (e *Endpoint) GetStatisticPushStartSize() uint64
- func (e *Endpoint) GetStatisticPushStartTimes() uint64
- func (e *Endpoint) GetStatisticPushSuccessSize() uint64
- func (e *Endpoint) GetStatisticPushSuccessTimes() uint64
- func (e *Endpoint) GetStatisticUnfinishedPing() uint64
- func (e *Endpoint) IsAvailable() bool
- func (e *Endpoint) IsSchemeSupported(scheme string) bool
- func (e *Endpoint) RemoveConnection(conn types.Connection) bool
- func (e *Endpoint) Reset()
- func (e *Endpoint) SetFlag(f types.EndpointFlag, v bool)
- func (e *Endpoint) SetStatisticPingDelay(pd time.Duration, pongTimepoint time.Time)
- func (e *Endpoint) SetStatisticUnfinishedPing(p uint64)
- func (e *Endpoint) UpdateHashCode(code string)
- func (e *Endpoint) UpdateSupportSchemes(schemes []string)
- type Node
- func (n *Node) AddConnectionGcList(conn types.Connection)
- func (n *Node) AddEndpoint(ep types.Endpoint) error_code.ErrorType
- func (n *Node) AddEndpointGcList(ep types.Endpoint)
- func (n *Node) AddStatisticDispatchTimes()
- func (n *Node) AllocateMessageSequence() uint64
- func (n *Node) CheckAccessHash(accessKey *protocol.AccessData, plainText string, conn types.Connection) bool
- func (n *Node) CheckFlag(f types.NodeFlag) bool
- func (n *Node) Connect(address string) error_code.ErrorType
- func (n *Node) ConnectWithEndpoint(address string, ep types.Endpoint) error_code.ErrorType
- func (n *Node) CreateEndpoint(tid types.BusIdType, hostName string, pid int) types.Endpoint
- func (n *Node) DisableDebugMessageVerbose()
- func (n *Node) Disconnect(id types.BusIdType) error_code.ErrorType
- func (n *Node) DispatchAllSelfMessages() int32
- func (n *Node) EnableDebugMessageVerbose()
- func (n *Node) FatalShutdown(ep types.Endpoint, conn types.Connection, code error_code.ErrorType, err error) error_code.ErrorType
- func (n *Node) GetAccessCode() string
- func (n *Node) GetConfigure() *types.NodeConfigure
- func (n *Node) GetContext() context.Context
- func (n *Node) GetCryptoKeyExchangeType() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
- func (n *Node) GetEndpoint(tid types.BusIdType) types.Endpoint
- func (n *Node) GetEventHandleOnAddEndpoint() types.NodeOnEndpointEventFunc
- func (n *Node) GetEventHandleOnAvailable() types.NodeOnNodeUpFunc
- func (n *Node) GetEventHandleOnCloseConnection() types.NodeOnCloseConnectionFunc
- func (n *Node) GetEventHandleOnCustomCommandRequest() types.NodeOnCustomCommandRequestFunc
- func (n *Node) GetEventHandleOnCustomCommandResponse() types.NodeOnCustomCommandResponseFunc
- func (n *Node) GetEventHandleOnForwardRequest() types.NodeOnForwardRequestFunc
- func (n *Node) GetEventHandleOnForwardResponse() types.NodeOnForwardResponseFunc
- func (n *Node) GetEventHandleOnInvalidConnection() types.NodeOnInvalidConnectionFunc
- func (n *Node) GetEventHandleOnNewConnection() types.NodeOnNewConnectionFunc
- func (n *Node) GetEventHandleOnPingEndpoint() types.NodeOnPingPongEndpointFunc
- func (n *Node) GetEventHandleOnPongEndpoint() types.NodeOnPingPongEndpointFunc
- func (n *Node) GetEventHandleOnRegister() types.NodeOnRegisterFunc
- func (n *Node) GetEventHandleOnRemoveEndpoint() types.NodeOnEndpointEventFunc
- func (n *Node) GetEventHandleOnShutdown() types.NodeOnNodeDownFunc
- func (n *Node) GetEventHandleOnTopologyUpdateUpstream() types.NodeOnTopologyUpdateUpstreamFunc
- func (n *Node) GetHostname() string
- func (n *Node) GetId() types.BusIdType
- func (n *Node) GetImmediateEndpointSet() types.EndpointCollectionType
- func (n *Node) GetIoStreamChannel() types.IoStreamChannel
- func (n *Node) GetIoStreamConfigure() *types.IoStreamConfigure
- func (n *Node) GetListenList() []types.ChannelAddress
- func (n *Node) GetLogger() *utils_log.Logger
- func (n *Node) GetPeerChannel(tid types.BusIdType, ...) (error_code.ErrorType, types.Endpoint, types.Connection, types.TopologyPeer)
- func (n *Node) GetPid() int
- func (n *Node) GetProtocolMinimalVersion() int32
- func (n *Node) GetProtocolVersion() int32
- func (n *Node) GetSelfEndpoint() types.Endpoint
- func (n *Node) GetSelfEndpointInstance() types.Endpoint
- func (n *Node) GetState() types.NodeState
- func (n *Node) GetTimerTick() time.Time
- func (n *Node) GetTopologyRegistry() types.TopologyRegistry
- func (n *Node) GetTopologyRelation(tid types.BusIdType) (types.TopologyRelationType, types.TopologyPeer)
- func (n *Node) GetUpstreamEndpoint() types.Endpoint
- func (n *Node) GetUpstreamEndpointInstance() types.Endpoint
- func (n *Node) Init(id types.BusIdType, conf *types.NodeConfigure) error_code.ErrorType
- func (n *Node) IsDebugMessageVerboseEnabled() bool
- func (n *Node) IsEndpointAvailable(tid types.BusIdType) bool
- func (n *Node) Listen(address string) error_code.ErrorType
- func (n *Node) LogDebug(ep types.Endpoint, conn types.Connection, m *types.Message, msg string, ...)
- func (n *Node) LogError(ep types.Endpoint, conn types.Connection, status int, ...)
- func (n *Node) LogInfo(ep types.Endpoint, conn types.Connection, msg string, args ...any)
- func (n *Node) OnActived()
- func (n *Node) OnCustomCommandRequest(ep types.Endpoint, conn types.Connection, from types.BusIdType, argv [][]byte) (error_code.ErrorType, [][]byte)
- func (n *Node) OnCustomCommandResponse(ep types.Endpoint, conn types.Connection, from types.BusIdType, argv [][]byte, ...) error_code.ErrorType
- func (n *Node) OnDisconnect(ep types.Endpoint, conn types.Connection) error_code.ErrorType
- func (n *Node) OnNewConnection(conn types.Connection) error_code.ErrorType
- func (n *Node) OnPing(ep types.Endpoint, message *types.Message, body *protocol.PingData) error_code.ErrorType
- func (n *Node) OnPong(ep types.Endpoint, message *types.Message, body *protocol.PingData) error_code.ErrorType
- func (n *Node) OnReceiveData(ep types.Endpoint, conn types.Connection, message *types.Message, data []byte)
- func (n *Node) OnReceiveForwardResponse(ep types.Endpoint, conn types.Connection, message *types.Message)
- func (n *Node) OnRegister(ep types.Endpoint, conn types.Connection, code error_code.ErrorType)
- func (n *Node) OnShutdown(code error_code.ErrorType) error_code.ErrorType
- func (n *Node) OnUpstreamRegisterDone()
- func (n *Node) Poll() error_code.ErrorType
- func (n *Node) Proc(now time.Time) error_code.ErrorType
- func (n *Node) ReloadCompression(compressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE, ...) error_code.ErrorType
- func (n *Node) ReloadCrypto(cryptoKeyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE, ...) error_code.ErrorType
- func (n *Node) RemoveEndpoint(ep types.Endpoint) error_code.ErrorType
- func (n *Node) Reset() error_code.ErrorType
- func (n *Node) SendCtrlMessage(tid types.BusIdType, message *types.Message, ...) (error_code.ErrorType, types.Endpoint, types.Connection)
- func (n *Node) SendCustomCommand(tid types.BusIdType, args [][]byte) error_code.ErrorType
- func (n *Node) SendCustomCommandWithOptions(tid types.BusIdType, args [][]byte, options *types.NodeSendDataOptions) error_code.ErrorType
- func (n *Node) SendData(tid types.BusIdType, t int32, data []byte) error_code.ErrorType
- func (n *Node) SendDataMessage(tid types.BusIdType, message *types.Message, ...) (error_code.ErrorType, types.Endpoint, types.Connection)
- func (n *Node) SendDataWithOptions(tid types.BusIdType, t int32, data []byte, options *types.NodeSendDataOptions) error_code.ErrorType
- func (n *Node) SetEventHandleOnAddEndpoint(handle types.NodeOnEndpointEventFunc)
- func (n *Node) SetEventHandleOnAvailable(handle types.NodeOnNodeUpFunc)
- func (n *Node) SetEventHandleOnCloseConnection(handle types.NodeOnCloseConnectionFunc)
- func (n *Node) SetEventHandleOnCustomCommandRequest(handle types.NodeOnCustomCommandRequestFunc)
- func (n *Node) SetEventHandleOnCustomCommandResponse(handle types.NodeOnCustomCommandResponseFunc)
- func (n *Node) SetEventHandleOnForwardRequest(handle types.NodeOnForwardRequestFunc)
- func (n *Node) SetEventHandleOnForwardResponse(handle types.NodeOnForwardResponseFunc)
- func (n *Node) SetEventHandleOnInvalidConnection(handle types.NodeOnInvalidConnectionFunc)
- func (n *Node) SetEventHandleOnNewConnection(handle types.NodeOnNewConnectionFunc)
- func (n *Node) SetEventHandleOnPingEndpoint(handle types.NodeOnPingPongEndpointFunc)
- func (n *Node) SetEventHandleOnPongEndpoint(handle types.NodeOnPingPongEndpointFunc)
- func (n *Node) SetEventHandleOnRegister(handle types.NodeOnRegisterFunc)
- func (n *Node) SetEventHandleOnRemoveEndpoint(handle types.NodeOnEndpointEventFunc)
- func (n *Node) SetEventHandleOnShutdown(handle types.NodeOnNodeDownFunc)
- func (n *Node) SetEventHandleOnTopologyUpdateUpstream(handle types.NodeOnTopologyUpdateUpstreamFunc)
- func (n *Node) SetHostname(hostname string, force bool) bool
- func (n *Node) SetLogger(logger *utils_log.Logger)
- func (n *Node) SetTopologyUpstream(tid types.BusIdType)
- func (n *Node) Shutdown(reason error_code.ErrorType) error_code.ErrorType
- func (n *Node) Start() error_code.ErrorType
- func (n *Node) StartWithConfigure(conf *types.StartConfigure) error_code.ErrorType
- type TopologyPeer
- func (p *TopologyPeer) ContainsDownstream(busId types.BusIdType) bool
- func (p *TopologyPeer) ForeachDownstream(fn func(peer types.TopologyPeer) bool) bool
- func (p *TopologyPeer) GetBusId() types.BusIdType
- func (p *TopologyPeer) GetTopologyData() *types.TopologyData
- func (p *TopologyPeer) GetUpstream() types.TopologyPeer
- type TopologyRegistry
- func (r *TopologyRegistry) CheckPolicy(rule *types.TopologyPolicyRule, fromData *types.TopologyData, ...) bool
- func (r *TopologyRegistry) ForeachPeer(fn func(peer types.TopologyPeer) bool) bool
- func (r *TopologyRegistry) GetPeer(busId types.BusIdType) types.TopologyPeer
- func (r *TopologyRegistry) GetPeerInstance(busId types.BusIdType) *TopologyPeer
- func (r *TopologyRegistry) GetRelation(from types.BusIdType, to types.BusIdType) (types.TopologyRelationType, types.TopologyPeer)
- func (r *TopologyRegistry) RemovePeer(targetBusId types.BusIdType)
- func (r *TopologyRegistry) UpdatePeer(targetBusId types.BusIdType, upstreamBusId types.BusIdType, ...) bool
Constants ¶
This section is empty.
Variables ¶
var (
	ErrCryptoNotInitialized        = errors.New("crypto not initialized")
	ErrCryptoAlgorithmNotSupported = errors.New("crypto algorithm not supported")
	ErrCryptoInvalidKeySize        = errors.New("invalid crypto key size")
	ErrCryptoInvalidIVSize         = errors.New("invalid crypto iv/nonce size")
	ErrCryptoEncryptFailed         = errors.New("crypto encrypt failed")
	ErrCryptoDecryptFailed         = errors.New("crypto decrypt failed")
	ErrCryptoHandshakeFailed       = errors.New("crypto handshake failed")
	ErrCryptoKeyExchangeFailed     = errors.New("crypto key exchange failed")
	ErrCryptoKDFFailed             = errors.New("crypto kdf failed")
	ErrCompressionNotSupported     = errors.New("compression algorithm not supported")
	ErrCompressionFailed           = errors.New("compression failed")
	ErrDecompressionFailed         = errors.New("decompression failed")
	ErrPackFailed                  = errors.New("pack failed")
	ErrUnpackFailed                = errors.New("unpack failed")
	ErrInvalidData                 = errors.New("invalid data")
	ErrConnectionClosing           = errors.New("connection is closing")
)
Error definitions for connection context.
Functions ¶
func NegotiateCompression ¶
func NegotiateCompression(local, remote []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
NegotiateCompression negotiates the compression algorithm based on supported algorithms.
func NegotiateCryptoAlgorithm ¶
func NegotiateCryptoAlgorithm(local, remote []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
NegotiateCryptoAlgorithm negotiates the crypto algorithm based on supported algorithms.
func NegotiateKDF ¶
func NegotiateKDF(local, remote []protocol.ATBUS_CRYPTO_KDF_TYPE) protocol.ATBUS_CRYPTO_KDF_TYPE
NegotiateKDF negotiates the KDF type based on supported types.
func NegotiateKeyExchange ¶
func NegotiateKeyExchange(local, remote protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
NegotiateKeyExchange negotiates the key exchange algorithm.
Types ¶
type CompressionSession ¶
type CompressionSession struct {
Algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
// contains filtered or unexported fields
}
CompressionSession handles compression and decompression.
func NewCompressionSession ¶
func NewCompressionSession() *CompressionSession
NewCompressionSession creates a new compression session.
func (*CompressionSession) Compress ¶
func (cs *CompressionSession) Compress(data []byte) ([]byte, error)
Compress compresses the data using the configured algorithm.
func (*CompressionSession) Decompress ¶
func (cs *CompressionSession) Decompress(data []byte, originalSize int) ([]byte, error)
Decompress decompresses the data using the configured algorithm.
func (*CompressionSession) GetAlgorithm ¶
func (cs *CompressionSession) GetAlgorithm() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
GetAlgorithm returns the current compression algorithm.
func (*CompressionSession) SetAlgorithm ¶
func (cs *CompressionSession) SetAlgorithm(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error
SetAlgorithm sets the compression algorithm.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
func CreateConnection ¶
func CreateConnection(owner *Node, addr string) *Connection
func (*Connection) AddStatFault ¶
func (c *Connection) AddStatFault() uint64
func (*Connection) CheckFlag ¶
func (c *Connection) CheckFlag(flag types.ConnectionFlag) bool
func (*Connection) ClearStatFault ¶
func (c *Connection) ClearStatFault()
func (*Connection) Connect ¶
func (c *Connection) Connect() types.ErrorType
func (*Connection) Disconnect ¶
func (c *Connection) Disconnect() types.ErrorType
func (*Connection) GetAddress ¶
func (c *Connection) GetAddress() types.ChannelAddress
func (*Connection) GetBinding ¶
func (c *Connection) GetBinding() types.Endpoint
func (*Connection) GetConnectionContext ¶
func (c *Connection) GetConnectionContext() types.ConnectionContext
func (*Connection) GetIoStreamConnection ¶
func (c *Connection) GetIoStreamConnection() *io_stream.IoStreamConnection
GetIoStreamConnection returns the underlying IoStreamConnection.
func (*Connection) GetStatistic ¶
func (c *Connection) GetStatistic() types.ConnectionStatistic
func (*Connection) GetStatus ¶
func (c *Connection) GetStatus() types.ConnectionState
func (*Connection) IsConnected ¶
func (c *Connection) IsConnected() bool
func (*Connection) IsRunning ¶
func (c *Connection) IsRunning() bool
func (*Connection) Listen ¶
func (c *Connection) Listen() types.ErrorType
func (*Connection) Proc ¶
func (c *Connection) Proc() types.ErrorType
func (*Connection) Push ¶
func (c *Connection) Push(data []byte) types.ErrorType
func (*Connection) RemoveOwnerChecker ¶
func (c *Connection) RemoveOwnerChecker()
func (*Connection) Reset ¶
func (c *Connection) Reset()
func (*Connection) SetIoStreamConnection ¶
func (c *Connection) SetIoStreamConnection(conn *io_stream.IoStreamConnection)
SetIoStreamConnection sets the underlying IoStreamConnection.
func (*Connection) SetTemporary ¶
func (c *Connection) SetTemporary()
type ConnectionContext ¶
type ConnectionContext struct {
// contains filtered or unexported fields
}
ConnectionContext manages the connection state including crypto and compression.
func NewConnectionContext ¶
func NewConnectionContext(keyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) *ConnectionContext
NewConnectionContext creates a new connection context with default settings.
func (*ConnectionContext) CreateHandshakeData ¶
func (cc *ConnectionContext) CreateHandshakeData() (*CryptoHandshakeData, error)
CreateHandshakeData creates the handshake data for initiating a handshake.
func (*ConnectionContext) GetCompressSelectAlgorithm ¶
func (cc *ConnectionContext) GetCompressSelectAlgorithm() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
GetCompressSelectAlgorithm returns the compression algorithm selected during handshake.
func (*ConnectionContext) GetCompression ¶
func (cc *ConnectionContext) GetCompression() *CompressionSession
GetCompression returns the compression session.
func (*ConnectionContext) GetCryptoKeyExchangeAlgorithm ¶
func (cc *ConnectionContext) GetCryptoKeyExchangeAlgorithm() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
GetCryptoKeyExchangeAlgorithm returns the key exchange algorithm used for crypto handshake.
func (*ConnectionContext) GetCryptoSelectAlgorithm ¶
func (cc *ConnectionContext) GetCryptoSelectAlgorithm() protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
GetCryptoSelectAlgorithm returns the crypto algorithm selected during handshake.
func (*ConnectionContext) GetCryptoSelectKdfType ¶
func (cc *ConnectionContext) GetCryptoSelectKdfType() protocol.ATBUS_CRYPTO_KDF_TYPE
GetCryptoSelectKdfType returns the KDF type selected during handshake.
func (*ConnectionContext) GetHandshakeStartTime ¶
func (cc *ConnectionContext) GetHandshakeStartTime() time.Time
GetHandshakeStartTime returns the time when the handshake was started.
func (*ConnectionContext) GetNextSequence ¶
func (cc *ConnectionContext) GetNextSequence() uint64
GetNextSequence returns the next sequence number.
func (*ConnectionContext) GetReadCrypto ¶
func (cc *ConnectionContext) GetReadCrypto() *CryptoSession
GetReadCrypto returns the read crypto session.
func (*ConnectionContext) GetSupportedCompressionAlgorithms ¶
func (cc *ConnectionContext) GetSupportedCompressionAlgorithms() []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
GetSupportedCompressionAlgorithms returns the supported compression algorithms.
func (*ConnectionContext) GetSupportedCryptoAlgorithms ¶
func (cc *ConnectionContext) GetSupportedCryptoAlgorithms() []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
GetSupportedCryptoAlgorithms returns the supported crypto algorithms.
func (*ConnectionContext) GetSupportedKeyExchange ¶
func (cc *ConnectionContext) GetSupportedKeyExchange() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
GetSupportedKeyExchange returns the supported key exchange type.
func (*ConnectionContext) GetWriteCrypto ¶
func (cc *ConnectionContext) GetWriteCrypto() *CryptoSession
GetWriteCrypto returns the write crypto session.
func (*ConnectionContext) HandshakeGenerateSelfKey ¶
func (cc *ConnectionContext) HandshakeGenerateSelfKey(peerSequenceId uint64) error_code.ErrorType
HandshakeGenerateSelfKey generates the local ECDH key pair for handshake. In client mode, peerSequenceId should be 0 to generate a new sequence. In server mode, peerSequenceId should be the peer's handshake sequence.
func (*ConnectionContext) HandshakeReadPeerKey ¶
func (cc *ConnectionContext) HandshakeReadPeerKey(peerPubKey *protocol.CryptoHandshakeData, supportedCryptoAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) error_code.ErrorType
HandshakeReadPeerKey reads the peer's public key and computes the shared secret.
func (*ConnectionContext) HandshakeWriteSelfPublicKey ¶
func (cc *ConnectionContext) HandshakeWriteSelfPublicKey(selfPubKey *protocol.CryptoHandshakeData, supportedCryptoAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) error_code.ErrorType
HandshakeWriteSelfPublicKey writes the local public key to the handshake data structure.
func (*ConnectionContext) IsClosing ¶
func (cc *ConnectionContext) IsClosing() bool
IsClosing returns true if the connection is closing.
func (*ConnectionContext) IsCompressionAlgorithmSupported ¶
func (cc *ConnectionContext) IsCompressionAlgorithmSupported(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool
IsCompressionAlgorithmSupported reports whether the specified compression algorithm is supported.
func (*ConnectionContext) IsHandshakeDone ¶
func (cc *ConnectionContext) IsHandshakeDone() bool
IsHandshakeDone returns true if the handshake is completed.
func (*ConnectionContext) NegotiateCompressionWithPeer ¶
func (cc *ConnectionContext) NegotiateCompressionWithPeer(peerAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error
NegotiateCompressionWithPeer negotiates compression with peer's supported algorithms.
func (*ConnectionContext) PackMessage ¶
func (cc *ConnectionContext) PackMessage(m *types.Message, protocolVersion int32, maxBodySize int) (*buffer.StaticBufferBlock, error_code.ErrorType)
PackMessage packs a Message into a StaticBufferBlock for transmission. This matches the C++ connection_context::pack_message signature. Message frame format: vint(header_length) + header + body
func (*ConnectionContext) ProcessHandshakeData ¶
func (cc *ConnectionContext) ProcessHandshakeData(peerData *CryptoHandshakeData) error
ProcessHandshakeData processes the received handshake data and completes the key exchange.
func (*ConnectionContext) SetClosing ¶
func (cc *ConnectionContext) SetClosing(closing bool)
SetClosing sets the closing state.
func (*ConnectionContext) SetSupportedCompressionAlgorithms ¶
func (cc *ConnectionContext) SetSupportedCompressionAlgorithms(algorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE)
SetSupportedCompressionAlgorithms sets the supported compression algorithms.
func (*ConnectionContext) SetSupportedCryptoAlgorithms ¶
func (cc *ConnectionContext) SetSupportedCryptoAlgorithms(algorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE)
SetSupportedCryptoAlgorithms sets the supported crypto algorithms.
func (*ConnectionContext) SetSupportedKeyExchange ¶
func (cc *ConnectionContext) SetSupportedKeyExchange(keyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE)
SetSupportedKeyExchange sets the supported key exchange type.
func (*ConnectionContext) SetupCryptoWithKey ¶
func (cc *ConnectionContext) SetupCryptoWithKey(algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, key []byte, iv []byte) error_code.ErrorType
SetupCryptoWithKey directly sets the encryption key and IV, skipping key exchange. This is primarily used for testing purposes.
func (*ConnectionContext) UnpackMessage ¶
func (cc *ConnectionContext) UnpackMessage(m *types.Message, input []byte, maxBodySize int) error_code.ErrorType
UnpackMessage unpacks binary data into a Message. This matches the C++ connection_context::unpack_message signature. Message frame format: vint(header_length) + header + body
func (*ConnectionContext) UpdateCompressionAlgorithm ¶
func (cc *ConnectionContext) UpdateCompressionAlgorithm(algorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error_code.ErrorType
UpdateCompressionAlgorithm updates the list of supported compression algorithms.
type CryptoHandshakeData ¶
type CryptoHandshakeData struct {
Sequence uint64
KeyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
KDFTypes []protocol.ATBUS_CRYPTO_KDF_TYPE
Algorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
PublicKey []byte
IVSize uint32
TagSize uint32
}
CryptoHandshakeData holds the data for crypto handshake.
type CryptoSession ¶
type CryptoSession struct {
// Negotiated algorithm and parameters
Algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
KeyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
KDFType protocol.ATBUS_CRYPTO_KDF_TYPE
Key []byte
IV []byte
TagSize uint32
IVSize uint32
// contains filtered or unexported fields
}
CryptoSession holds the crypto session state.
func NewCryptoSession ¶
func NewCryptoSession() *CryptoSession
NewCryptoSession creates a new crypto session.
func (*CryptoSession) ComputeSharedSecret ¶
func (cs *CryptoSession) ComputeSharedSecret(peerPublicKeyBytes []byte) ([]byte, error)
ComputeSharedSecret computes the shared secret using the peer's public key.
func (*CryptoSession) Decrypt ¶
func (cs *CryptoSession) Decrypt(ciphertext []byte) ([]byte, error)
Decrypt decrypts the ciphertext data.
func (*CryptoSession) DecryptWithIV ¶
func (cs *CryptoSession) DecryptWithIV(ciphertext []byte, iv []byte) ([]byte, error)
DecryptWithIV decrypts data for non-AEAD algorithms that require an IV (e.g., CBC). The IV must be provided by the caller and is not expected to be prepended to ciphertext.
func (*CryptoSession) DecryptWithIVAndAAD ¶
func (cs *CryptoSession) DecryptWithIVAndAAD(ciphertext []byte, iv []byte, aad []byte) ([]byte, error)
DecryptWithIVAndAAD decrypts data using AEAD with a caller-provided IV/nonce and AAD. This is used for cross-language compatibility where IV/AAD are carried in message headers.
func (*CryptoSession) DeriveKey ¶
func (cs *CryptoSession) DeriveKey(sharedSecret []byte, algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, kdfType protocol.ATBUS_CRYPTO_KDF_TYPE) error
DeriveKey derives the encryption key and IV from the shared secret using HKDF.
func (*CryptoSession) Encrypt ¶
func (cs *CryptoSession) Encrypt(plaintext []byte) ([]byte, error)
Encrypt encrypts the plaintext data.
func (*CryptoSession) EncryptWithIV ¶
func (cs *CryptoSession) EncryptWithIV(plaintext []byte, iv []byte) ([]byte, error)
EncryptWithIV encrypts data for non-AEAD algorithms that require an IV (e.g., CBC). The IV must be provided by the caller and will not be prepended to the ciphertext.
func (*CryptoSession) EncryptWithIVAndAAD ¶
func (cs *CryptoSession) EncryptWithIVAndAAD(plaintext []byte, iv []byte, aad []byte) ([]byte, error)
EncryptWithIVAndAAD encrypts data using AEAD with a caller-provided IV/nonce and AAD. This is used for cross-language compatibility where IV/AAD are carried in message headers.
func (*CryptoSession) GenerateKeyPair ¶
func (cs *CryptoSession) GenerateKeyPair(keyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) error
GenerateKeyPair generates a new ECDH key pair for the given key exchange type.
func (*CryptoSession) GetPublicKey ¶
func (cs *CryptoSession) GetPublicKey() []byte
GetPublicKey returns the public key bytes.
func (*CryptoSession) IsInitialized ¶
func (cs *CryptoSession) IsInitialized() bool
IsInitialized returns true if the crypto session is initialized.
func (*CryptoSession) SetKey ¶
func (cs *CryptoSession) SetKey(key, iv []byte, algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) error
SetKey directly sets the encryption key and IV.
type Endpoint ¶
type Endpoint struct {
// contains filtered or unexported fields
}
func CreateEndpoint ¶
func (*Endpoint) AddConnection ¶
func (e *Endpoint) AddConnection(conn types.Connection, forceData bool) bool
func (*Endpoint) AddListenAddress ¶
func (*Endpoint) AddPingTimer ¶
func (e *Endpoint) AddPingTimer()
func (*Endpoint) AddStatisticFault ¶
func (*Endpoint) ClearListenAddress ¶
func (e *Endpoint) ClearListenAddress()
func (*Endpoint) ClearPingTimer ¶
func (e *Endpoint) ClearPingTimer()
func (*Endpoint) ClearStatisticFault ¶
func (e *Endpoint) ClearStatisticFault()
func (*Endpoint) GetCtrlConnection ¶
func (e *Endpoint) GetCtrlConnection(peer types.Endpoint) types.Connection
func (*Endpoint) GetDataConnection ¶
func (*Endpoint) GetDataConnectionCount ¶
func (*Endpoint) GetHashCode ¶
func (*Endpoint) GetHostname ¶
func (*Endpoint) GetListenAddress ¶
func (e *Endpoint) GetListenAddress() []types.ChannelAddress
func (*Endpoint) GetStatisticCreatedTime ¶
func (*Endpoint) GetStatisticLastPong ¶
func (*Endpoint) GetStatisticPingDelay ¶
func (*Endpoint) GetStatisticPullSize ¶
func (*Endpoint) GetStatisticPullTimes ¶
func (*Endpoint) GetStatisticPushFailedSize ¶
func (*Endpoint) GetStatisticPushFailedTimes ¶
func (*Endpoint) GetStatisticPushStartSize ¶
func (*Endpoint) GetStatisticPushStartTimes ¶
func (*Endpoint) GetStatisticPushSuccessSize ¶
func (*Endpoint) GetStatisticPushSuccessTimes ¶
func (*Endpoint) GetStatisticUnfinishedPing ¶
func (*Endpoint) IsAvailable ¶
func (*Endpoint) IsSchemeSupported ¶
func (*Endpoint) RemoveConnection ¶
func (e *Endpoint) RemoveConnection(conn types.Connection) bool
func (*Endpoint) SetStatisticPingDelay ¶
func (*Endpoint) SetStatisticUnfinishedPing ¶
func (*Endpoint) UpdateHashCode ¶
func (*Endpoint) UpdateSupportSchemes ¶
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
func (*Node) AddConnectionGcList ¶
func (n *Node) AddConnectionGcList(conn types.Connection)
func (*Node) AddEndpoint ¶
func (n *Node) AddEndpoint(ep types.Endpoint) error_code.ErrorType
func (*Node) AddEndpointGcList ¶
func (*Node) AddStatisticDispatchTimes ¶
func (n *Node) AddStatisticDispatchTimes()
func (*Node) AllocateMessageSequence ¶
func (*Node) CheckAccessHash ¶
func (n *Node) CheckAccessHash(accessKey *protocol.AccessData, plainText string, conn types.Connection) bool
func (*Node) ConnectWithEndpoint ¶
func (*Node) CreateEndpoint ¶
func (*Node) DisableDebugMessageVerbose ¶
func (n *Node) DisableDebugMessageVerbose()
func (*Node) Disconnect ¶
func (n *Node) Disconnect(id types.BusIdType) error_code.ErrorType
func (*Node) DispatchAllSelfMessages ¶
func (*Node) EnableDebugMessageVerbose ¶
func (n *Node) EnableDebugMessageVerbose()
func (*Node) FatalShutdown ¶
func (n *Node) FatalShutdown(ep types.Endpoint, conn types.Connection, code error_code.ErrorType, err error) error_code.ErrorType
func (*Node) GetAccessCode ¶
func (*Node) GetConfigure ¶
func (n *Node) GetConfigure() *types.NodeConfigure
func (*Node) GetContext ¶
func (*Node) GetCryptoKeyExchangeType ¶
func (n *Node) GetCryptoKeyExchangeType() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
func (*Node) GetEventHandleOnAddEndpoint ¶
func (n *Node) GetEventHandleOnAddEndpoint() types.NodeOnEndpointEventFunc
func (*Node) GetEventHandleOnAvailable ¶
func (n *Node) GetEventHandleOnAvailable() types.NodeOnNodeUpFunc
func (*Node) GetEventHandleOnCloseConnection ¶
func (n *Node) GetEventHandleOnCloseConnection() types.NodeOnCloseConnectionFunc
func (*Node) GetEventHandleOnCustomCommandRequest ¶
func (n *Node) GetEventHandleOnCustomCommandRequest() types.NodeOnCustomCommandRequestFunc
func (*Node) GetEventHandleOnCustomCommandResponse ¶
func (n *Node) GetEventHandleOnCustomCommandResponse() types.NodeOnCustomCommandResponseFunc
func (*Node) GetEventHandleOnForwardRequest ¶
func (n *Node) GetEventHandleOnForwardRequest() types.NodeOnForwardRequestFunc
func (*Node) GetEventHandleOnForwardResponse ¶
func (n *Node) GetEventHandleOnForwardResponse() types.NodeOnForwardResponseFunc
func (*Node) GetEventHandleOnInvalidConnection ¶
func (n *Node) GetEventHandleOnInvalidConnection() types.NodeOnInvalidConnectionFunc
func (*Node) GetEventHandleOnNewConnection ¶
func (n *Node) GetEventHandleOnNewConnection() types.NodeOnNewConnectionFunc
func (*Node) GetEventHandleOnPingEndpoint ¶
func (n *Node) GetEventHandleOnPingEndpoint() types.NodeOnPingPongEndpointFunc
func (*Node) GetEventHandleOnPongEndpoint ¶
func (n *Node) GetEventHandleOnPongEndpoint() types.NodeOnPingPongEndpointFunc
func (*Node) GetEventHandleOnRegister ¶
func (n *Node) GetEventHandleOnRegister() types.NodeOnRegisterFunc
func (*Node) GetEventHandleOnRemoveEndpoint ¶
func (n *Node) GetEventHandleOnRemoveEndpoint() types.NodeOnEndpointEventFunc
func (*Node) GetEventHandleOnShutdown ¶
func (n *Node) GetEventHandleOnShutdown() types.NodeOnNodeDownFunc
func (*Node) GetEventHandleOnTopologyUpdateUpstream ¶
func (n *Node) GetEventHandleOnTopologyUpdateUpstream() types.NodeOnTopologyUpdateUpstreamFunc
func (*Node) GetHostname ¶
func (*Node) GetImmediateEndpointSet ¶
func (n *Node) GetImmediateEndpointSet() types.EndpointCollectionType
func (*Node) GetIoStreamChannel ¶
func (n *Node) GetIoStreamChannel() types.IoStreamChannel
func (*Node) GetIoStreamConfigure ¶
func (n *Node) GetIoStreamConfigure() *types.IoStreamConfigure
func (*Node) GetListenList ¶
func (n *Node) GetListenList() []types.ChannelAddress
func (*Node) GetPeerChannel ¶
func (n *Node) GetPeerChannel(tid types.BusIdType, fn func(from types.Endpoint, to types.Endpoint) types.Connection, options *types.NodeGetPeerOptions) (error_code.ErrorType, types.Endpoint, types.Connection, types.TopologyPeer)
func (*Node) GetProtocolMinimalVersion ¶
func (*Node) GetProtocolVersion ¶
func (*Node) GetSelfEndpoint ¶
func (*Node) GetSelfEndpointInstance ¶
func (*Node) GetTimerTick ¶
func (*Node) GetTopologyRegistry ¶
func (n *Node) GetTopologyRegistry() types.TopologyRegistry
func (*Node) GetTopologyRelation ¶
func (n *Node) GetTopologyRelation(tid types.BusIdType) (types.TopologyRelationType, types.TopologyPeer)
func (*Node) GetUpstreamEndpoint ¶
func (*Node) GetUpstreamEndpointInstance ¶
func (*Node) Init ¶
func (n *Node) Init(id types.BusIdType, conf *types.NodeConfigure) error_code.ErrorType
func (*Node) IsDebugMessageVerboseEnabled ¶
func (*Node) LogError ¶
func (n *Node) LogError(ep types.Endpoint, conn types.Connection, status int, errcode error_code.ErrorType, msg string, args ...any)
func (*Node) OnCustomCommandRequest ¶
func (*Node) OnCustomCommandResponse ¶
func (*Node) OnDisconnect ¶
func (n *Node) OnDisconnect(ep types.Endpoint, conn types.Connection) error_code.ErrorType
func (*Node) OnNewConnection ¶
func (n *Node) OnNewConnection(conn types.Connection) error_code.ErrorType
func (*Node) OnReceiveData ¶
func (*Node) OnReceiveForwardResponse ¶
func (*Node) OnRegister ¶
func (n *Node) OnRegister(ep types.Endpoint, conn types.Connection, code error_code.ErrorType)
func (*Node) OnShutdown ¶
func (n *Node) OnShutdown(code error_code.ErrorType) error_code.ErrorType
func (*Node) OnUpstreamRegisterDone ¶
func (n *Node) OnUpstreamRegisterDone()
func (*Node) Poll ¶
func (n *Node) Poll() error_code.ErrorType
func (*Node) ReloadCompression ¶
func (n *Node) ReloadCompression(compressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE, compressionLevel protocol.ATBUS_COMPRESSION_LEVEL) error_code.ErrorType
func (*Node) ReloadCrypto ¶
func (n *Node) ReloadCrypto(cryptoKeyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE, cryptoKeyRefreshInterval time.Duration, cryptoAllowAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) error_code.ErrorType
func (*Node) RemoveEndpoint ¶
func (n *Node) RemoveEndpoint(ep types.Endpoint) error_code.ErrorType
func (*Node) Reset ¶
func (n *Node) Reset() error_code.ErrorType
func (*Node) SendCtrlMessage ¶
func (n *Node) SendCtrlMessage(tid types.BusIdType, message *types.Message, options *types.NodeSendDataOptions) (error_code.ErrorType, types.Endpoint, types.Connection)
func (*Node) SendCustomCommand ¶
func (*Node) SendCustomCommandWithOptions ¶
func (n *Node) SendCustomCommandWithOptions(tid types.BusIdType, args [][]byte, options *types.NodeSendDataOptions) error_code.ErrorType
func (*Node) SendDataMessage ¶
func (n *Node) SendDataMessage(tid types.BusIdType, message *types.Message, options *types.NodeSendDataOptions) (error_code.ErrorType, types.Endpoint, types.Connection)
func (*Node) SendDataWithOptions ¶
func (n *Node) SendDataWithOptions(tid types.BusIdType, t int32, data []byte, options *types.NodeSendDataOptions) error_code.ErrorType
func (*Node) SetEventHandleOnAddEndpoint ¶
func (n *Node) SetEventHandleOnAddEndpoint(handle types.NodeOnEndpointEventFunc)
func (*Node) SetEventHandleOnAvailable ¶
func (n *Node) SetEventHandleOnAvailable(handle types.NodeOnNodeUpFunc)
func (*Node) SetEventHandleOnCloseConnection ¶
func (n *Node) SetEventHandleOnCloseConnection(handle types.NodeOnCloseConnectionFunc)
func (*Node) SetEventHandleOnCustomCommandRequest ¶
func (n *Node) SetEventHandleOnCustomCommandRequest(handle types.NodeOnCustomCommandRequestFunc)
func (*Node) SetEventHandleOnCustomCommandResponse ¶
func (n *Node) SetEventHandleOnCustomCommandResponse(handle types.NodeOnCustomCommandResponseFunc)
func (*Node) SetEventHandleOnForwardRequest ¶
func (n *Node) SetEventHandleOnForwardRequest(handle types.NodeOnForwardRequestFunc)
func (*Node) SetEventHandleOnForwardResponse ¶
func (n *Node) SetEventHandleOnForwardResponse(handle types.NodeOnForwardResponseFunc)
func (*Node) SetEventHandleOnInvalidConnection ¶
func (n *Node) SetEventHandleOnInvalidConnection(handle types.NodeOnInvalidConnectionFunc)
func (*Node) SetEventHandleOnNewConnection ¶
func (n *Node) SetEventHandleOnNewConnection(handle types.NodeOnNewConnectionFunc)
func (*Node) SetEventHandleOnPingEndpoint ¶
func (n *Node) SetEventHandleOnPingEndpoint(handle types.NodeOnPingPongEndpointFunc)
func (*Node) SetEventHandleOnPongEndpoint ¶
func (n *Node) SetEventHandleOnPongEndpoint(handle types.NodeOnPingPongEndpointFunc)
func (*Node) SetEventHandleOnRegister ¶
func (n *Node) SetEventHandleOnRegister(handle types.NodeOnRegisterFunc)
func (*Node) SetEventHandleOnRemoveEndpoint ¶
func (n *Node) SetEventHandleOnRemoveEndpoint(handle types.NodeOnEndpointEventFunc)
func (*Node) SetEventHandleOnShutdown ¶
func (n *Node) SetEventHandleOnShutdown(handle types.NodeOnNodeDownFunc)
func (*Node) SetEventHandleOnTopologyUpdateUpstream ¶
func (n *Node) SetEventHandleOnTopologyUpdateUpstream(handle types.NodeOnTopologyUpdateUpstreamFunc)
func (*Node) SetTopologyUpstream ¶
func (*Node) Shutdown ¶
func (n *Node) Shutdown(reason error_code.ErrorType) error_code.ErrorType
func (*Node) Start ¶
func (n *Node) Start() error_code.ErrorType
func (*Node) StartWithConfigure ¶
func (n *Node) StartWithConfigure(conf *types.StartConfigure) error_code.ErrorType
type TopologyPeer ¶
type TopologyPeer struct {
// contains filtered or unexported fields
}
func CreateTopologyPeer ¶
func CreateTopologyPeer(busId types.BusIdType) *TopologyPeer
func (*TopologyPeer) ContainsDownstream ¶
func (p *TopologyPeer) ContainsDownstream(busId types.BusIdType) bool
func (*TopologyPeer) ForeachDownstream ¶
func (p *TopologyPeer) ForeachDownstream(fn func(peer types.TopologyPeer) bool) bool
func (*TopologyPeer) GetBusId ¶
func (p *TopologyPeer) GetBusId() types.BusIdType
func (*TopologyPeer) GetTopologyData ¶
func (p *TopologyPeer) GetTopologyData() *types.TopologyData
func (*TopologyPeer) GetUpstream ¶
func (p *TopologyPeer) GetUpstream() types.TopologyPeer
type TopologyRegistry ¶
type TopologyRegistry struct {
// contains filtered or unexported fields
}
func CreateTopologyRegistry ¶
func CreateTopologyRegistry() *TopologyRegistry
func (*TopologyRegistry) CheckPolicy ¶
func (r *TopologyRegistry) CheckPolicy(rule *types.TopologyPolicyRule, fromData *types.TopologyData, toData *types.TopologyData) bool
func (*TopologyRegistry) ForeachPeer ¶
func (r *TopologyRegistry) ForeachPeer(fn func(peer types.TopologyPeer) bool) bool
func (*TopologyRegistry) GetPeer ¶
func (r *TopologyRegistry) GetPeer(busId types.BusIdType) types.TopologyPeer
func (*TopologyRegistry) GetPeerInstance ¶
func (r *TopologyRegistry) GetPeerInstance(busId types.BusIdType) *TopologyPeer
func (*TopologyRegistry) GetRelation ¶
func (r *TopologyRegistry) GetRelation(from types.BusIdType, to types.BusIdType) (types.TopologyRelationType, types.TopologyPeer)
func (*TopologyRegistry) RemovePeer ¶
func (r *TopologyRegistry) RemovePeer(targetBusId types.BusIdType)
func (*TopologyRegistry) UpdatePeer ¶
func (r *TopologyRegistry) UpdatePeer(targetBusId types.BusIdType, upstreamBusId types.BusIdType, data *types.TopologyData) bool