libatbus_impl

package
v1.0.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 | License: MIT | Imports: 36 | Imported by: 0

Documentation

Overview

Package libatbus_impl provides internal implementation details for libatbus. This file implements the connection context with encryption/decryption algorithm negotiation, compression algorithm negotiation, encryption/decryption flow, and pack/unpack flow.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCryptoNotInitialized        = errors.New("crypto not initialized")
	ErrCryptoAlgorithmNotSupported = errors.New("crypto algorithm not supported")
	ErrCryptoInvalidKeySize        = errors.New("invalid crypto key size")
	ErrCryptoInvalidIVSize         = errors.New("invalid crypto iv/nonce size")
	ErrCryptoEncryptFailed         = errors.New("crypto encrypt failed")
	ErrCryptoDecryptFailed         = errors.New("crypto decrypt failed")
	ErrCryptoHandshakeFailed       = errors.New("crypto handshake failed")
	ErrCryptoKeyExchangeFailed     = errors.New("crypto key exchange failed")
	ErrCryptoKDFFailed             = errors.New("crypto kdf failed")
	ErrCompressionNotSupported     = errors.New("compression algorithm not supported")
	ErrCompressionFailed           = errors.New("compression failed")
	ErrDecompressionFailed         = errors.New("decompression failed")
	ErrPackFailed                  = errors.New("pack failed")
	ErrUnpackFailed                = errors.New("unpack failed")
	ErrInvalidData                 = errors.New("invalid data")
	ErrConnectionClosing           = errors.New("connection is closing")
)

Error definitions for connection context.

Functions

func NegotiateCompression

NegotiateCompression negotiates the compression algorithm based on supported algorithms.

func NegotiateCryptoAlgorithm

func NegotiateCryptoAlgorithm(local, remote []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) protocol.ATBUS_CRYPTO_ALGORITHM_TYPE

NegotiateCryptoAlgorithm negotiates the crypto algorithm based on supported algorithms.

func NegotiateKDF

func NegotiateKDF(local, remote []protocol.ATBUS_CRYPTO_KDF_TYPE) protocol.ATBUS_CRYPTO_KDF_TYPE

NegotiateKDF negotiates the KDF type based on supported types.

func NegotiateKeyExchange

NegotiateKeyExchange negotiates the key exchange algorithm.

Types

type CompressionSession

type CompressionSession struct {
	Algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
	// contains filtered or unexported fields
}

CompressionSession handles compression and decompression.

func NewCompressionSession

func NewCompressionSession() *CompressionSession

NewCompressionSession creates a new compression session.

func (*CompressionSession) Compress

func (cs *CompressionSession) Compress(data []byte) ([]byte, error)

Compress compresses the data using the configured algorithm.

func (*CompressionSession) Decompress

func (cs *CompressionSession) Decompress(data []byte, originalSize int) ([]byte, error)

Decompress decompresses the data using the configured algorithm.

func (*CompressionSession) GetAlgorithm

GetAlgorithm returns the current compression algorithm.

func (*CompressionSession) SetAlgorithm

SetAlgorithm sets the compression algorithm.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func CreateConnection

func CreateConnection(owner *Node, addr string) *Connection

func (*Connection) AddStatFault

func (c *Connection) AddStatFault() uint64

func (*Connection) CheckFlag

func (c *Connection) CheckFlag(flag types.ConnectionFlag) bool

func (*Connection) ClearStatFault

func (c *Connection) ClearStatFault()

func (*Connection) Connect

func (c *Connection) Connect() types.ErrorType

func (*Connection) Disconnect

func (c *Connection) Disconnect() types.ErrorType

func (*Connection) GetAddress

func (c *Connection) GetAddress() types.ChannelAddress

func (*Connection) GetBinding

func (c *Connection) GetBinding() types.Endpoint

func (*Connection) GetConnectionContext

func (c *Connection) GetConnectionContext() types.ConnectionContext

func (*Connection) GetIoStreamConnection

func (c *Connection) GetIoStreamConnection() *io_stream.IoStreamConnection

GetIoStreamConnection returns the underlying IoStreamConnection.

func (*Connection) GetStatistic

func (c *Connection) GetStatistic() types.ConnectionStatistic

func (*Connection) GetStatus

func (c *Connection) GetStatus() types.ConnectionState

func (*Connection) IsConnected

func (c *Connection) IsConnected() bool

func (*Connection) IsRunning

func (c *Connection) IsRunning() bool

func (*Connection) Listen

func (c *Connection) Listen() types.ErrorType

func (*Connection) Proc

func (c *Connection) Proc() types.ErrorType

func (*Connection) Push

func (c *Connection) Push(data []byte) types.ErrorType

func (*Connection) RemoveOwnerChecker

func (c *Connection) RemoveOwnerChecker()

func (*Connection) Reset

func (c *Connection) Reset()

func (*Connection) SetIoStreamConnection

func (c *Connection) SetIoStreamConnection(conn *io_stream.IoStreamConnection)

SetIoStreamConnection sets the underlying IoStreamConnection.

func (*Connection) SetTemporary

func (c *Connection) SetTemporary()

type ConnectionContext

type ConnectionContext struct {
	// contains filtered or unexported fields
}

ConnectionContext manages the connection state including crypto and compression.

func NewConnectionContext

func NewConnectionContext(keyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) *ConnectionContext

NewConnectionContext creates a new connection context with default settings.

func (*ConnectionContext) CreateHandshakeData

func (cc *ConnectionContext) CreateHandshakeData() (*CryptoHandshakeData, error)

CreateHandshakeData creates the handshake data for initiating a handshake.

func (*ConnectionContext) GetCompressSelectAlgorithm

func (cc *ConnectionContext) GetCompressSelectAlgorithm() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE

GetCompressSelectAlgorithm returns the compression algorithm selected during handshake.

func (*ConnectionContext) GetCompression

func (cc *ConnectionContext) GetCompression() *CompressionSession

GetCompression returns the compression session.

func (*ConnectionContext) GetCryptoKeyExchangeAlgorithm

func (cc *ConnectionContext) GetCryptoKeyExchangeAlgorithm() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE

GetCryptoKeyExchangeAlgorithm returns the key exchange algorithm used for crypto handshake.

func (*ConnectionContext) GetCryptoSelectAlgorithm

func (cc *ConnectionContext) GetCryptoSelectAlgorithm() protocol.ATBUS_CRYPTO_ALGORITHM_TYPE

GetCryptoSelectAlgorithm returns the crypto algorithm selected during handshake.

func (*ConnectionContext) GetCryptoSelectKdfType

func (cc *ConnectionContext) GetCryptoSelectKdfType() protocol.ATBUS_CRYPTO_KDF_TYPE

GetCryptoSelectKdfType returns the KDF type selected during handshake.

func (*ConnectionContext) GetHandshakeStartTime

func (cc *ConnectionContext) GetHandshakeStartTime() time.Time

GetHandshakeStartTime returns the time when the handshake was started.

func (*ConnectionContext) GetNextSequence

func (cc *ConnectionContext) GetNextSequence() uint64

GetNextSequence returns the next sequence number.

func (*ConnectionContext) GetReadCrypto

func (cc *ConnectionContext) GetReadCrypto() *CryptoSession

GetReadCrypto returns the read crypto session.

func (*ConnectionContext) GetSupportedCompressionAlgorithms

func (cc *ConnectionContext) GetSupportedCompressionAlgorithms() []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE

GetSupportedCompressionAlgorithms returns the supported compression algorithms.

func (*ConnectionContext) GetSupportedCryptoAlgorithms

func (cc *ConnectionContext) GetSupportedCryptoAlgorithms() []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE

GetSupportedCryptoAlgorithms returns the supported crypto algorithms.

func (*ConnectionContext) GetSupportedKeyExchange

func (cc *ConnectionContext) GetSupportedKeyExchange() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE

GetSupportedKeyExchange returns the supported key exchange type.

func (*ConnectionContext) GetWriteCrypto

func (cc *ConnectionContext) GetWriteCrypto() *CryptoSession

GetWriteCrypto returns the write crypto session.

func (*ConnectionContext) HandshakeGenerateSelfKey

func (cc *ConnectionContext) HandshakeGenerateSelfKey(peerSequenceId uint64) error_code.ErrorType

HandshakeGenerateSelfKey generates the local ECDH key pair for handshake. In client mode, peerSequenceId should be 0 to generate a new sequence. In server mode, peerSequenceId should be the peer's handshake sequence.

func (*ConnectionContext) HandshakeReadPeerKey

func (cc *ConnectionContext) HandshakeReadPeerKey(peerPubKey *protocol.CryptoHandshakeData,
	supportedCryptoAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE,
) error_code.ErrorType

HandshakeReadPeerKey reads the peer's public key and computes the shared secret.

func (*ConnectionContext) HandshakeWriteSelfPublicKey

func (cc *ConnectionContext) HandshakeWriteSelfPublicKey(
	selfPubKey *protocol.CryptoHandshakeData,
	supportedCryptoAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE,
) error_code.ErrorType

HandshakeWriteSelfPublicKey writes the local public key to the handshake data structure.

func (*ConnectionContext) IsClosing

func (cc *ConnectionContext) IsClosing() bool

IsClosing returns true if the connection is closing.

func (*ConnectionContext) IsCompressionAlgorithmSupported

func (cc *ConnectionContext) IsCompressionAlgorithmSupported(
	algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE,
) bool

IsCompressionAlgorithmSupported reports whether the specified compression algorithm is supported.

func (*ConnectionContext) IsHandshakeDone

func (cc *ConnectionContext) IsHandshakeDone() bool

IsHandshakeDone returns true if the handshake is completed.

func (*ConnectionContext) NegotiateCompressionWithPeer

func (cc *ConnectionContext) NegotiateCompressionWithPeer(peerAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) error

NegotiateCompressionWithPeer negotiates compression with peer's supported algorithms.

func (*ConnectionContext) PackMessage

func (cc *ConnectionContext) PackMessage(m *types.Message, protocolVersion int32, maxBodySize int) (*buffer.StaticBufferBlock, error_code.ErrorType)

PackMessage packs a Message into a StaticBufferBlock for transmission. This matches the C++ connection_context::pack_message signature. Message frame format: vint(header_length) + header + body

func (*ConnectionContext) ProcessHandshakeData

func (cc *ConnectionContext) ProcessHandshakeData(peerData *CryptoHandshakeData) error

ProcessHandshakeData processes the received handshake data and completes the key exchange.

func (*ConnectionContext) SetClosing

func (cc *ConnectionContext) SetClosing(closing bool)

SetClosing sets the closing state.

func (*ConnectionContext) SetSupportedCompressionAlgorithms

func (cc *ConnectionContext) SetSupportedCompressionAlgorithms(algorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE)

SetSupportedCompressionAlgorithms sets the supported compression algorithms.

func (*ConnectionContext) SetSupportedCryptoAlgorithms

func (cc *ConnectionContext) SetSupportedCryptoAlgorithms(algorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE)

SetSupportedCryptoAlgorithms sets the supported crypto algorithms.

func (*ConnectionContext) SetSupportedKeyExchange

func (cc *ConnectionContext) SetSupportedKeyExchange(keyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE)

SetSupportedKeyExchange sets the supported key exchange type.

func (*ConnectionContext) SetupCryptoWithKey

func (cc *ConnectionContext) SetupCryptoWithKey(
	algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, key []byte, iv []byte,
) error_code.ErrorType

SetupCryptoWithKey directly sets the encryption key and IV, skipping key exchange. This is primarily used for testing purposes.

func (*ConnectionContext) UnpackMessage

func (cc *ConnectionContext) UnpackMessage(m *types.Message, input []byte, maxBodySize int) error_code.ErrorType

UnpackMessage unpacks binary data into a Message. This matches the C++ connection_context::unpack_message signature. Message frame format: vint(header_length) + header + body

func (*ConnectionContext) UpdateCompressionAlgorithm

func (cc *ConnectionContext) UpdateCompressionAlgorithm(
	algorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE,
) error_code.ErrorType

UpdateCompressionAlgorithm updates the list of supported compression algorithms.

type CryptoHandshakeData

type CryptoHandshakeData struct {
	Sequence    uint64
	KeyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
	KDFTypes    []protocol.ATBUS_CRYPTO_KDF_TYPE
	Algorithms  []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
	PublicKey   []byte
	IVSize      uint32
	TagSize     uint32
}

CryptoHandshakeData holds the data for crypto handshake.

type CryptoSession

type CryptoSession struct {

	// Negotiated algorithm and parameters
	Algorithm   protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
	KeyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
	KDFType     protocol.ATBUS_CRYPTO_KDF_TYPE
	Key         []byte
	IV          []byte
	TagSize     uint32
	IVSize      uint32
	// contains filtered or unexported fields
}

CryptoSession holds the crypto session state.

func NewCryptoSession

func NewCryptoSession() *CryptoSession

NewCryptoSession creates a new crypto session.

func (*CryptoSession) ComputeSharedSecret

func (cs *CryptoSession) ComputeSharedSecret(peerPublicKeyBytes []byte) ([]byte, error)

ComputeSharedSecret computes the shared secret using the peer's public key.

func (*CryptoSession) Decrypt

func (cs *CryptoSession) Decrypt(ciphertext []byte) ([]byte, error)

Decrypt decrypts the ciphertext data.

func (*CryptoSession) DecryptWithIV

func (cs *CryptoSession) DecryptWithIV(ciphertext []byte, iv []byte) ([]byte, error)

DecryptWithIV decrypts data for non-AEAD algorithms that require an IV (e.g., CBC). The IV must be provided by the caller and is not expected to be prepended to the ciphertext.

func (*CryptoSession) DecryptWithIVAndAAD

func (cs *CryptoSession) DecryptWithIVAndAAD(ciphertext []byte, iv []byte, aad []byte) ([]byte, error)

DecryptWithIVAndAAD decrypts data using AEAD with a caller-provided IV/nonce and AAD. This is used for cross-language compatibility where IV/AAD are carried in message headers.

func (*CryptoSession) DeriveKey

func (cs *CryptoSession) DeriveKey(sharedSecret []byte, algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, kdfType protocol.ATBUS_CRYPTO_KDF_TYPE) error

DeriveKey derives the encryption key and IV from the shared secret using HKDF.

func (*CryptoSession) Encrypt

func (cs *CryptoSession) Encrypt(plaintext []byte) ([]byte, error)

Encrypt encrypts the plaintext data.

func (*CryptoSession) EncryptWithIV

func (cs *CryptoSession) EncryptWithIV(plaintext []byte, iv []byte) ([]byte, error)

EncryptWithIV encrypts data for non-AEAD algorithms that require an IV (e.g., CBC). The IV must be provided by the caller and will not be prepended to the ciphertext.

func (*CryptoSession) EncryptWithIVAndAAD

func (cs *CryptoSession) EncryptWithIVAndAAD(plaintext []byte, iv []byte, aad []byte) ([]byte, error)

EncryptWithIVAndAAD encrypts data using AEAD with a caller-provided IV/nonce and AAD. This is used for cross-language compatibility where IV/AAD are carried in message headers.

func (*CryptoSession) GenerateKeyPair

func (cs *CryptoSession) GenerateKeyPair(keyExchange protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE) error

GenerateKeyPair generates a new ECDH key pair for the given key exchange type.

func (*CryptoSession) GetPublicKey

func (cs *CryptoSession) GetPublicKey() []byte

GetPublicKey returns the public key bytes.

func (*CryptoSession) IsInitialized

func (cs *CryptoSession) IsInitialized() bool

IsInitialized returns true if the crypto session is initialized.

func (*CryptoSession) SetKey

func (cs *CryptoSession) SetKey(key, iv []byte, algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) error

SetKey directly sets the encryption key and IV.

type Endpoint

type Endpoint struct {
	// contains filtered or unexported fields
}

func CreateEndpoint

func CreateEndpoint(owner *Node, id types.BusIdType, pid int64, hostname string) *Endpoint

func (*Endpoint) AddConnection

func (e *Endpoint) AddConnection(conn types.Connection, forceData bool) bool

func (*Endpoint) AddListenAddress

func (e *Endpoint) AddListenAddress(addr string)

func (*Endpoint) AddPingTimer

func (e *Endpoint) AddPingTimer()

func (*Endpoint) AddStatisticFault

func (e *Endpoint) AddStatisticFault() uint64

func (*Endpoint) ClearListenAddress

func (e *Endpoint) ClearListenAddress()

func (*Endpoint) ClearPingTimer

func (e *Endpoint) ClearPingTimer()

func (*Endpoint) ClearStatisticFault

func (e *Endpoint) ClearStatisticFault()

func (*Endpoint) GetCtrlConnection

func (e *Endpoint) GetCtrlConnection(peer types.Endpoint) types.Connection

func (*Endpoint) GetDataConnection

func (e *Endpoint) GetDataConnection(peer types.Endpoint, enableFallbackCtrl bool) types.Connection

func (*Endpoint) GetDataConnectionCount

func (e *Endpoint) GetDataConnectionCount(enableFallbackCtrl bool) int

func (*Endpoint) GetFlag

func (e *Endpoint) GetFlag(f types.EndpointFlag) bool

func (*Endpoint) GetFlags

func (e *Endpoint) GetFlags() uint32

func (*Endpoint) GetHashCode

func (e *Endpoint) GetHashCode() string

func (*Endpoint) GetHostname

func (e *Endpoint) GetHostname() string

func (*Endpoint) GetId

func (e *Endpoint) GetId() types.BusIdType

func (*Endpoint) GetListenAddress

func (e *Endpoint) GetListenAddress() []types.ChannelAddress

func (*Endpoint) GetOwner

func (e *Endpoint) GetOwner() types.Node

func (*Endpoint) GetPid

func (e *Endpoint) GetPid() int32

func (*Endpoint) GetStatisticCreatedTime

func (e *Endpoint) GetStatisticCreatedTime() time.Time

func (*Endpoint) GetStatisticLastPong

func (e *Endpoint) GetStatisticLastPong() time.Time

func (*Endpoint) GetStatisticPingDelay

func (e *Endpoint) GetStatisticPingDelay() time.Duration

func (*Endpoint) GetStatisticPullSize

func (e *Endpoint) GetStatisticPullSize() uint64

func (*Endpoint) GetStatisticPullTimes

func (e *Endpoint) GetStatisticPullTimes() uint64

func (*Endpoint) GetStatisticPushFailedSize

func (e *Endpoint) GetStatisticPushFailedSize() uint64

func (*Endpoint) GetStatisticPushFailedTimes

func (e *Endpoint) GetStatisticPushFailedTimes() uint64

func (*Endpoint) GetStatisticPushStartSize

func (e *Endpoint) GetStatisticPushStartSize() uint64

func (*Endpoint) GetStatisticPushStartTimes

func (e *Endpoint) GetStatisticPushStartTimes() uint64

func (*Endpoint) GetStatisticPushSuccessSize

func (e *Endpoint) GetStatisticPushSuccessSize() uint64

func (*Endpoint) GetStatisticPushSuccessTimes

func (e *Endpoint) GetStatisticPushSuccessTimes() uint64

func (*Endpoint) GetStatisticUnfinishedPing

func (e *Endpoint) GetStatisticUnfinishedPing() uint64

func (*Endpoint) IsAvailable

func (e *Endpoint) IsAvailable() bool

func (*Endpoint) IsSchemeSupported

func (e *Endpoint) IsSchemeSupported(scheme string) bool

func (*Endpoint) RemoveConnection

func (e *Endpoint) RemoveConnection(conn types.Connection) bool

func (*Endpoint) Reset

func (e *Endpoint) Reset()

func (*Endpoint) SetFlag

func (e *Endpoint) SetFlag(f types.EndpointFlag, v bool)

func (*Endpoint) SetStatisticPingDelay

func (e *Endpoint) SetStatisticPingDelay(pd time.Duration, pongTimepoint time.Time)

func (*Endpoint) SetStatisticUnfinishedPing

func (e *Endpoint) SetStatisticUnfinishedPing(p uint64)

func (*Endpoint) UpdateHashCode

func (e *Endpoint) UpdateHashCode(code string)

func (*Endpoint) UpdateSupportSchemes

func (e *Endpoint) UpdateSupportSchemes(schemes []string)

type Node

type Node struct {
	// contains filtered or unexported fields
}

func (*Node) AddConnectionGcList

func (n *Node) AddConnectionGcList(conn types.Connection)

func (*Node) AddEndpoint

func (n *Node) AddEndpoint(ep types.Endpoint) error_code.ErrorType

func (*Node) AddEndpointGcList

func (n *Node) AddEndpointGcList(ep types.Endpoint)

func (*Node) AddStatisticDispatchTimes

func (n *Node) AddStatisticDispatchTimes()

func (*Node) AllocateMessageSequence

func (n *Node) AllocateMessageSequence() uint64

func (*Node) CheckAccessHash

func (n *Node) CheckAccessHash(accessKey *protocol.AccessData, plainText string, conn types.Connection) bool

func (*Node) CheckFlag

func (n *Node) CheckFlag(f types.NodeFlag) bool

func (*Node) Connect

func (n *Node) Connect(address string) error_code.ErrorType

func (*Node) ConnectWithEndpoint

func (n *Node) ConnectWithEndpoint(address string, ep types.Endpoint) error_code.ErrorType

func (*Node) CreateEndpoint

func (n *Node) CreateEndpoint(tid types.BusIdType, hostName string, pid int) types.Endpoint

func (*Node) DisableDebugMessageVerbose

func (n *Node) DisableDebugMessageVerbose()

func (*Node) Disconnect

func (n *Node) Disconnect(id types.BusIdType) error_code.ErrorType

func (*Node) DispatchAllSelfMessages

func (n *Node) DispatchAllSelfMessages() int32

func (*Node) EnableDebugMessageVerbose

func (n *Node) EnableDebugMessageVerbose()

func (*Node) FatalShutdown

func (n *Node) FatalShutdown(ep types.Endpoint, conn types.Connection, code error_code.ErrorType, err error) error_code.ErrorType

func (*Node) GetAccessCode

func (n *Node) GetAccessCode() string

func (*Node) GetConfigure

func (n *Node) GetConfigure() *types.NodeConfigure

func (*Node) GetContext

func (n *Node) GetContext() context.Context

func (*Node) GetCryptoKeyExchangeType

func (n *Node) GetCryptoKeyExchangeType() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE

func (*Node) GetEndpoint

func (n *Node) GetEndpoint(tid types.BusIdType) types.Endpoint

func (*Node) GetEventHandleOnAddEndpoint

func (n *Node) GetEventHandleOnAddEndpoint() types.NodeOnEndpointEventFunc

func (*Node) GetEventHandleOnAvailable

func (n *Node) GetEventHandleOnAvailable() types.NodeOnNodeUpFunc

func (*Node) GetEventHandleOnCloseConnection

func (n *Node) GetEventHandleOnCloseConnection() types.NodeOnCloseConnectionFunc

func (*Node) GetEventHandleOnCustomCommandRequest

func (n *Node) GetEventHandleOnCustomCommandRequest() types.NodeOnCustomCommandRequestFunc

func (*Node) GetEventHandleOnCustomCommandResponse

func (n *Node) GetEventHandleOnCustomCommandResponse() types.NodeOnCustomCommandResponseFunc

func (*Node) GetEventHandleOnForwardRequest

func (n *Node) GetEventHandleOnForwardRequest() types.NodeOnForwardRequestFunc

func (*Node) GetEventHandleOnForwardResponse

func (n *Node) GetEventHandleOnForwardResponse() types.NodeOnForwardResponseFunc

func (*Node) GetEventHandleOnInvalidConnection

func (n *Node) GetEventHandleOnInvalidConnection() types.NodeOnInvalidConnectionFunc

func (*Node) GetEventHandleOnNewConnection

func (n *Node) GetEventHandleOnNewConnection() types.NodeOnNewConnectionFunc

func (*Node) GetEventHandleOnPingEndpoint

func (n *Node) GetEventHandleOnPingEndpoint() types.NodeOnPingPongEndpointFunc

func (*Node) GetEventHandleOnPongEndpoint

func (n *Node) GetEventHandleOnPongEndpoint() types.NodeOnPingPongEndpointFunc

func (*Node) GetEventHandleOnRegister

func (n *Node) GetEventHandleOnRegister() types.NodeOnRegisterFunc

func (*Node) GetEventHandleOnRemoveEndpoint

func (n *Node) GetEventHandleOnRemoveEndpoint() types.NodeOnEndpointEventFunc

func (*Node) GetEventHandleOnShutdown

func (n *Node) GetEventHandleOnShutdown() types.NodeOnNodeDownFunc

func (*Node) GetEventHandleOnTopologyUpdateUpstream

func (n *Node) GetEventHandleOnTopologyUpdateUpstream() types.NodeOnTopologyUpdateUpstreamFunc

func (*Node) GetHostname

func (n *Node) GetHostname() string

func (*Node) GetId

func (n *Node) GetId() types.BusIdType

func (*Node) GetImmediateEndpointSet

func (n *Node) GetImmediateEndpointSet() types.EndpointCollectionType

func (*Node) GetIoStreamChannel

func (n *Node) GetIoStreamChannel() types.IoStreamChannel

func (*Node) GetIoStreamConfigure

func (n *Node) GetIoStreamConfigure() *types.IoStreamConfigure

func (*Node) GetListenList

func (n *Node) GetListenList() []types.ChannelAddress

func (*Node) GetLogger

func (n *Node) GetLogger() *utils_log.Logger

func (*Node) GetPid

func (n *Node) GetPid() int

func (*Node) GetProtocolMinimalVersion

func (n *Node) GetProtocolMinimalVersion() int32

func (*Node) GetProtocolVersion

func (n *Node) GetProtocolVersion() int32

func (*Node) GetSelfEndpoint

func (n *Node) GetSelfEndpoint() types.Endpoint

func (*Node) GetSelfEndpointInstance

func (n *Node) GetSelfEndpointInstance() types.Endpoint

func (*Node) GetState

func (n *Node) GetState() types.NodeState

func (*Node) GetTimerTick

func (n *Node) GetTimerTick() time.Time

func (*Node) GetTopologyRegistry

func (n *Node) GetTopologyRegistry() types.TopologyRegistry

func (*Node) GetTopologyRelation

func (n *Node) GetTopologyRelation(tid types.BusIdType) (types.TopologyRelationType, types.TopologyPeer)

func (*Node) GetUpstreamEndpoint

func (n *Node) GetUpstreamEndpoint() types.Endpoint

func (*Node) GetUpstreamEndpointInstance

func (n *Node) GetUpstreamEndpointInstance() types.Endpoint

func (*Node) Init

func (*Node) IsDebugMessageVerboseEnabled

func (n *Node) IsDebugMessageVerboseEnabled() bool

func (*Node) IsEndpointAvailable

func (n *Node) IsEndpointAvailable(tid types.BusIdType) bool

func (*Node) Listen

func (n *Node) Listen(address string) error_code.ErrorType

func (*Node) LogDebug

func (n *Node) LogDebug(ep types.Endpoint, conn types.Connection, m *types.Message, msg string, args ...any)

func (*Node) LogError

func (n *Node) LogError(ep types.Endpoint, conn types.Connection, status int, errcode error_code.ErrorType, msg string, args ...any)

func (*Node) LogInfo

func (n *Node) LogInfo(ep types.Endpoint, conn types.Connection, msg string, args ...any)

func (*Node) OnActived

func (n *Node) OnActived()

func (*Node) OnCustomCommandRequest

func (n *Node) OnCustomCommandRequest(ep types.Endpoint, conn types.Connection, from types.BusIdType, argv [][]byte) (error_code.ErrorType, [][]byte)

func (*Node) OnCustomCommandResponse

func (n *Node) OnCustomCommandResponse(ep types.Endpoint, conn types.Connection, from types.BusIdType, argv [][]byte, sequence uint64) error_code.ErrorType

func (*Node) OnDisconnect

func (n *Node) OnDisconnect(ep types.Endpoint, conn types.Connection) error_code.ErrorType

func (*Node) OnNewConnection

func (n *Node) OnNewConnection(conn types.Connection) error_code.ErrorType

func (*Node) OnPing

func (n *Node) OnPing(ep types.Endpoint, message *types.Message, body *protocol.PingData) error_code.ErrorType

func (*Node) OnPong

func (n *Node) OnPong(ep types.Endpoint, message *types.Message, body *protocol.PingData) error_code.ErrorType

func (*Node) OnReceiveData

func (n *Node) OnReceiveData(ep types.Endpoint, conn types.Connection, message *types.Message, data []byte)

func (*Node) OnReceiveForwardResponse

func (n *Node) OnReceiveForwardResponse(ep types.Endpoint, conn types.Connection, message *types.Message)

func (*Node) OnRegister

func (n *Node) OnRegister(ep types.Endpoint, conn types.Connection, code error_code.ErrorType)

func (*Node) OnShutdown

func (n *Node) OnShutdown(code error_code.ErrorType) error_code.ErrorType

func (*Node) OnUpstreamRegisterDone

func (n *Node) OnUpstreamRegisterDone()

func (*Node) Poll

func (n *Node) Poll() error_code.ErrorType

func (*Node) Proc

func (n *Node) Proc(now time.Time) error_code.ErrorType

func (*Node) ReloadCompression

func (n *Node) ReloadCompression(compressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE,
	compressionLevel protocol.ATBUS_COMPRESSION_LEVEL,
) error_code.ErrorType

func (*Node) ReloadCrypto

func (n *Node) ReloadCrypto(cryptoKeyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE,
	cryptoKeyRefreshInterval time.Duration,
	cryptoAllowAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE,
) error_code.ErrorType

func (*Node) RemoveEndpoint

func (n *Node) RemoveEndpoint(ep types.Endpoint) error_code.ErrorType

func (*Node) Reset

func (n *Node) Reset() error_code.ErrorType

func (*Node) SendCtrlMessage

func (n *Node) SendCtrlMessage(tid types.BusIdType, message *types.Message, options *types.NodeSendDataOptions) (error_code.ErrorType, types.Endpoint, types.Connection)

func (*Node) SendCustomCommand

func (n *Node) SendCustomCommand(tid types.BusIdType, args [][]byte) error_code.ErrorType

func (*Node) SendCustomCommandWithOptions

func (n *Node) SendCustomCommandWithOptions(tid types.BusIdType, args [][]byte, options *types.NodeSendDataOptions) error_code.ErrorType

func (*Node) SendData

func (n *Node) SendData(tid types.BusIdType, t int32, data []byte) error_code.ErrorType

func (*Node) SendDataMessage

func (n *Node) SendDataMessage(tid types.BusIdType, message *types.Message, options *types.NodeSendDataOptions) (error_code.ErrorType, types.Endpoint, types.Connection)

func (*Node) SendDataWithOptions

func (n *Node) SendDataWithOptions(tid types.BusIdType, t int32, data []byte, options *types.NodeSendDataOptions) error_code.ErrorType

func (*Node) SetEventHandleOnAddEndpoint

func (n *Node) SetEventHandleOnAddEndpoint(handle types.NodeOnEndpointEventFunc)

func (*Node) SetEventHandleOnAvailable

func (n *Node) SetEventHandleOnAvailable(handle types.NodeOnNodeUpFunc)

func (*Node) SetEventHandleOnCloseConnection

func (n *Node) SetEventHandleOnCloseConnection(handle types.NodeOnCloseConnectionFunc)

func (*Node) SetEventHandleOnCustomCommandRequest

func (n *Node) SetEventHandleOnCustomCommandRequest(handle types.NodeOnCustomCommandRequestFunc)

func (*Node) SetEventHandleOnCustomCommandResponse

func (n *Node) SetEventHandleOnCustomCommandResponse(handle types.NodeOnCustomCommandResponseFunc)

func (*Node) SetEventHandleOnForwardRequest

func (n *Node) SetEventHandleOnForwardRequest(handle types.NodeOnForwardRequestFunc)

func (*Node) SetEventHandleOnForwardResponse

func (n *Node) SetEventHandleOnForwardResponse(handle types.NodeOnForwardResponseFunc)

func (*Node) SetEventHandleOnInvalidConnection

func (n *Node) SetEventHandleOnInvalidConnection(handle types.NodeOnInvalidConnectionFunc)

func (*Node) SetEventHandleOnNewConnection

func (n *Node) SetEventHandleOnNewConnection(handle types.NodeOnNewConnectionFunc)

func (*Node) SetEventHandleOnPingEndpoint

func (n *Node) SetEventHandleOnPingEndpoint(handle types.NodeOnPingPongEndpointFunc)

func (*Node) SetEventHandleOnPongEndpoint

func (n *Node) SetEventHandleOnPongEndpoint(handle types.NodeOnPingPongEndpointFunc)

func (*Node) SetEventHandleOnRegister

func (n *Node) SetEventHandleOnRegister(handle types.NodeOnRegisterFunc)

func (*Node) SetEventHandleOnRemoveEndpoint

func (n *Node) SetEventHandleOnRemoveEndpoint(handle types.NodeOnEndpointEventFunc)

func (*Node) SetEventHandleOnShutdown

func (n *Node) SetEventHandleOnShutdown(handle types.NodeOnNodeDownFunc)

func (*Node) SetEventHandleOnTopologyUpdateUpstream

func (n *Node) SetEventHandleOnTopologyUpdateUpstream(handle types.NodeOnTopologyUpdateUpstreamFunc)

func (*Node) SetHostname

func (n *Node) SetHostname(hostname string, force bool) bool

func (*Node) SetLogger

func (n *Node) SetLogger(logger *utils_log.Logger)

func (*Node) SetTopologyUpstream

func (n *Node) SetTopologyUpstream(tid types.BusIdType)

func (*Node) Shutdown

func (n *Node) Shutdown(reason error_code.ErrorType) error_code.ErrorType

func (*Node) Start

func (n *Node) Start() error_code.ErrorType

func (*Node) StartWithConfigure

func (n *Node) StartWithConfigure(conf *types.StartConfigure) error_code.ErrorType

type TopologyPeer

type TopologyPeer struct {
	// contains filtered or unexported fields
}

func CreateTopologyPeer

func CreateTopologyPeer(busId types.BusIdType) *TopologyPeer

func (*TopologyPeer) ContainsDownstream

func (p *TopologyPeer) ContainsDownstream(busId types.BusIdType) bool

func (*TopologyPeer) ForeachDownstream

func (p *TopologyPeer) ForeachDownstream(fn func(peer types.TopologyPeer) bool) bool

func (*TopologyPeer) GetBusId

func (p *TopologyPeer) GetBusId() types.BusIdType

func (*TopologyPeer) GetTopologyData

func (p *TopologyPeer) GetTopologyData() *types.TopologyData

func (*TopologyPeer) GetUpstream

func (p *TopologyPeer) GetUpstream() types.TopologyPeer

type TopologyRegistry

type TopologyRegistry struct {
	// contains filtered or unexported fields
}

func CreateTopologyRegistry

func CreateTopologyRegistry() *TopologyRegistry

func (*TopologyRegistry) CheckPolicy

func (r *TopologyRegistry) CheckPolicy(rule *types.TopologyPolicyRule, fromData *types.TopologyData, toData *types.TopologyData) bool

func (*TopologyRegistry) ForeachPeer

func (r *TopologyRegistry) ForeachPeer(fn func(peer types.TopologyPeer) bool) bool

func (*TopologyRegistry) GetPeer

func (*TopologyRegistry) GetPeerInstance

func (r *TopologyRegistry) GetPeerInstance(busId types.BusIdType) *TopologyPeer

func (*TopologyRegistry) GetRelation

func (*TopologyRegistry) RemovePeer

func (r *TopologyRegistry) RemovePeer(targetBusId types.BusIdType)

func (*TopologyRegistry) UpdatePeer

func (r *TopologyRegistry) UpdatePeer(targetBusId types.BusIdType, upstreamBusId types.BusIdType, data *types.TopologyData) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL