libatbus_channel_iostream

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package libatbus_channel_iostream provides IO stream channel implementation for libatbus. It handles TCP/Unix socket/Named pipe connections with proper frame encoding/decoding.

Index

Constants

View Source
const (
	// DefaultWriteQueueSize is the default size of the write queue
	DefaultWriteQueueSize = 1024
	// DefaultReadBufferSize is the default read buffer size
	DefaultReadBufferSize = 64 * 1024
)
View Source
const (
	// HashSize is the size of the hash field in bytes (murmur3_x86_32)
	HashSize = 4
	// MaxVintSize is the maximum size of a varint encoded value (for uint64)
	MaxVintSize = 10
	// MaxFrameHeaderSize is the maximum possible frame header size
	MaxFrameHeaderSize = HashSize + MaxVintSize
)
View Source
const (
	// IoStreamCallbackEventTypeAccepted is triggered when a new connection is accepted.
	IoStreamCallbackEventTypeAccepted = types.IoStreamCallbackEventType_Accepted
	// IoStreamCallbackEventTypeConnected is triggered when an outbound connection is established.
	IoStreamCallbackEventTypeConnected = types.IoStreamCallbackEventType_Connected
	// IoStreamCallbackEventTypeDisconnected is triggered when a connection is closed.
	IoStreamCallbackEventTypeDisconnected = types.IoStreamCallbackEventType_Disconnected
	// IoStreamCallbackEventTypeReceived is triggered when data is received.
	IoStreamCallbackEventTypeReceived = types.IoStreamCallbackEventType_Received
	// IoStreamCallbackEventTypeWritten is triggered when data has been written.
	IoStreamCallbackEventTypeWritten = types.IoStreamCallbackEventType_Written
	// IoStreamCallbackEventTypeMax is the maximum event type value.
	IoStreamCallbackEventTypeMax = types.IoStreamCallbackEventType_Max
)
View Source
const (
	// IoStreamConnectionFlagListen indicates this is a listening connection.
	IoStreamConnectionFlagListen = types.IoStreamConnectionFlag_Listen
	// IoStreamConnectionFlagConnect indicates this is an outbound connection.
	IoStreamConnectionFlagConnect = types.IoStreamConnectionFlag_Connect
	// IoStreamConnectionFlagAccept indicates this connection was accepted from a listener.
	IoStreamConnectionFlagAccept = types.IoStreamConnectionFlag_Accept
	// IoStreamConnectionFlagWriting indicates the connection is currently writing.
	IoStreamConnectionFlagWriting = types.IoStreamConnectionFlag_Writing
	// IoStreamConnectionFlagClosing indicates the connection is being closed.
	IoStreamConnectionFlagClosing = types.IoStreamConnectionFlag_Closing
)
View Source
const (
	// IoStreamConnectionStatusCreated indicates the connection has been created but not connected.
	IoStreamConnectionStatusCreated = types.IoStreamConnectionStatus_Created
	// IoStreamConnectionStatusConnected indicates the connection is established.
	IoStreamConnectionStatusConnected = types.IoStreamConnectionStatus_Connected
	// IoStreamConnectionStatusDisconnecting indicates the connection is being disconnected.
	IoStreamConnectionStatusDisconnecting = types.IoStreamConnectionStatus_Disconnecting
	// IoStreamConnectionStatusDisconnected indicates the connection has been closed.
	IoStreamConnectionStatusDisconnected = types.IoStreamConnectionStatus_Disconnected
)
View Source
const (
	// IoStreamChannelFlagIsLoopOwner indicates the channel owns the event loop.
	IoStreamChannelFlagIsLoopOwner = types.IoStreamChannelFlag_IsLoopOwner
	// IoStreamChannelFlagClosing indicates the channel is being closed.
	IoStreamChannelFlagClosing = types.IoStreamChannelFlag_Closing
	// IoStreamChannelFlagInCallback indicates we're currently in a callback.
	IoStreamChannelFlagInCallback = types.IoStreamChannelFlag_InCallback
)

Variables

View Source
var (
	// ErrBufferTooSmall indicates the buffer is too small to hold the frame
	ErrBufferTooSmall = errors.New("buffer too small for frame")
	// ErrInvalidFrameHash indicates the frame hash verification failed
	ErrInvalidFrameHash = errors.New("invalid frame hash")
	// ErrInvalidFrameLength indicates the frame length is invalid
	ErrInvalidFrameLength = errors.New("invalid frame length")
	// ErrIncompleteFrame indicates the frame is incomplete (need more data)
	ErrIncompleteFrame = errors.New("incomplete frame data")
)
View Source
var SetDefaultIoStreamConfigure = types.SetDefaultIoStreamConfigure

SetDefaultIoStreamConfigure is an alias for types.SetDefaultIoStreamConfigure.

Functions

func CalculateHash

func CalculateHash(payload []byte) uint32

CalculateHash calculates the murmur3_x86_32 hash of the payload with seed 0. This matches the C++ implementation: atfw::util::hash::murmur_hash3_x86_32(buf, len, 0)

func PackFrame

func PackFrame(payload []byte, frame []byte) int

PackFrame packs payload into a frame with hash and length prefix. Returns the number of bytes written, or 0 if buffer is too small.

The frame format is:

  • 4 bytes: murmur3_x86_32 hash of payload (little-endian)
  • varint: payload length (libatbus custom vint format)
  • payload: the actual data

func PackFrameSize

func PackFrameSize(payloadSize uint64) int

PackFrameSize returns the total size needed to pack a frame with the given payload size.

func TryUnpackFrameHeader

func TryUnpackFrameHeader(data []byte) (payloadLen uint64, headerSize int, needMoreData bool)

TryUnpackFrameHeader tries to parse just the frame header (hash + length) without validating hash. This is useful for pre-allocating buffer space for large messages. Returns:

  • payloadLen: the payload length if header is complete
  • headerSize: the size of the header (hash + varint) if complete, 0 otherwise
  • needMoreData: true if more data is needed to parse the header

Types

type FrameReader

type FrameReader struct {
	// contains filtered or unexported fields
}

FrameReader provides a streaming frame reader that accumulates data and extracts complete frames.

func NewFrameReader

func NewFrameReader(initialCapacity int) *FrameReader

NewFrameReader creates a new FrameReader with the specified initial buffer capacity.

func (*FrameReader) Available

func (r *FrameReader) Available() int

Available returns the number of bytes available for reading.

func (*FrameReader) ReadFrame

func (r *FrameReader) ReadFrame() UnpackFrameResult

ReadFrame attempts to read a complete frame from the buffer. Returns the unpacked result. If no complete frame is available, Error will be ErrIncompleteFrame.

func (*FrameReader) Reset

func (r *FrameReader) Reset()

Reset clears the reader's buffer.

func (*FrameReader) Write

func (r *FrameReader) Write(data []byte) int

Write appends data to the reader's buffer, growing it if necessary. Returns the number of bytes written.

type IoStreamCallbackEventHandleSet

type IoStreamCallbackEventHandleSet = types.IoStreamCallbackEventHandleSet

IoStreamCallbackEventHandleSet holds callback functions for IO stream events.

type IoStreamCallbackEventType

type IoStreamCallbackEventType = types.IoStreamCallbackEventType

IoStreamCallbackEventType defines the type of IO stream callback event.

type IoStreamCallbackFunc

type IoStreamCallbackFunc = types.IoStreamCallbackFunc

IoStreamCallbackFunc is the callback function type for IO stream events.

type IoStreamChannel

type IoStreamChannel struct {
	// contains filtered or unexported fields
}

IoStreamChannel manages IO stream connections.

func NewIoStreamChannel

func NewIoStreamChannel(ctx context.Context, conf *IoStreamConfigure) *IoStreamChannel

NewIoStreamChannel creates a new IO stream channel.

func (*IoStreamChannel) Close

Close closes the channel and all connections.

func (*IoStreamChannel) Connect

Connect connects to the specified address. Returns the connection and error code.

func (*IoStreamChannel) Disconnect

Disconnect disconnects the specified connection.

func (*IoStreamChannel) GetConfigure

func (c *IoStreamChannel) GetConfigure() *IoStreamConfigure

GetConfigure returns the channel configuration.

func (*IoStreamChannel) GetConnections

func (c *IoStreamChannel) GetConnections() []*IoStreamConnection

GetConnection returns all current connections.

func (*IoStreamChannel) GetContext

func (c *IoStreamChannel) GetContext() context.Context

GetContext returns the channel context.

func (*IoStreamChannel) GetEventHandleSet

func (c *IoStreamChannel) GetEventHandleSet() *IoStreamCallbackEventHandleSet

GetEventHandleSet returns the event handle set.

func (*IoStreamChannel) GetFlag

GetFlag returns whether a flag is set.

func (*IoStreamChannel) GetPrivateData

func (c *IoStreamChannel) GetPrivateData() interface{}

GetPrivateData returns user-defined private data.

func (*IoStreamChannel) GetStatisticActiveRequestCount

func (c *IoStreamChannel) GetStatisticActiveRequestCount() uint64

GetStatisticActiveRequestCount returns the active request count.

func (*IoStreamChannel) GetStatisticCheckBlockSizeFailedCount

func (c *IoStreamChannel) GetStatisticCheckBlockSizeFailedCount() uint64

GetStatisticCheckBlockSizeFailedCount returns the block size check failure count.

func (*IoStreamChannel) GetStatisticCheckHashFailedCount

func (c *IoStreamChannel) GetStatisticCheckHashFailedCount() uint64

GetStatisticCheckHashFailedCount returns the hash check failure count.

func (*IoStreamChannel) GetStatisticReadNetEgainCount

func (c *IoStreamChannel) GetStatisticReadNetEgainCount() uint64

GetStatisticReadNetEgainCount returns the EAGAIN count.

func (*IoStreamChannel) Listen

func (c *IoStreamChannel) Listen(addr string) error_code.ErrorType

Listen starts listening on the specified address. Supported address formats:

  • ipv4://host:port - IPv4/IPv6 TCP
  • ipv6://host:port - IPv4/IPv6 TCP
  • atcp://host:port - IPv4/IPv6 TCP
  • dns://host:port - DNS resolved TCP
  • unix://path - Unix domain socket
  • pipe://path - Named pipe (same as unix)

func (*IoStreamChannel) Send

Send sends data to the specified connection. The data will be automatically framed with hash and length prefix.

func (*IoStreamChannel) SetFlag

func (c *IoStreamChannel) SetFlag(f IoStreamConnectionFlag, v bool)

SetFlag sets or clears a channel flag.

func (*IoStreamChannel) SetPrivateData

func (c *IoStreamChannel) SetPrivateData(data interface{})

SetPrivateData sets user-defined private data.

type IoStreamChannelFlag

type IoStreamChannelFlag = types.IoStreamChannelFlag

IoStreamChannelFlag defines channel flags.

type IoStreamConfigure

type IoStreamConfigure = types.IoStreamConfigure

IoStreamConfigure is an alias for types.IoStreamConfigure.

type IoStreamConnection

type IoStreamConnection struct {
	// contains filtered or unexported fields
}

IoStreamConnection represents a single IO stream connection.

func (*IoStreamConnection) GetAddress

func (c *IoStreamConnection) GetAddress() types.ChannelAddress

GetAddress returns the connection address.

func (*IoStreamConnection) GetChannel

func (c *IoStreamConnection) GetChannel() types.IoStreamChannel

GetChannel returns the parent channel.

func (*IoStreamConnection) GetEventHandleSet

func (c *IoStreamConnection) GetEventHandleSet() *IoStreamCallbackEventHandleSet

GetEventHandleSet returns the event handle set.

func (*IoStreamConnection) GetFlag

GetFlag returns whether a flag is set.

func (*IoStreamConnection) GetNetConn

func (c *IoStreamConnection) GetNetConn() net.Conn

GetNetConn returns the underlying net.Conn.

func (*IoStreamConnection) GetPrivateData

func (c *IoStreamConnection) GetPrivateData() interface{}

GetPrivateData returns user-defined private data.

func (*IoStreamConnection) GetProactivelyDisconnectCallback

func (c *IoStreamConnection) GetProactivelyDisconnectCallback() IoStreamCallbackFunc

GetProactivelyDisconnectCallback returns the proactive disconnect callback.

func (*IoStreamConnection) GetReadBufferManager

func (c *IoStreamConnection) GetReadBufferManager() *buffer.BufferManager

GetReadBufferManager returns the read buffer manager.

func (*IoStreamConnection) GetStatus

GetStatus returns the current connection status.

func (*IoStreamConnection) SetFlag

SetFlag sets or clears a connection flag.

func (*IoStreamConnection) SetPrivateData

func (c *IoStreamConnection) SetPrivateData(data interface{})

SetPrivateData sets user-defined private data.

func (*IoStreamConnection) SetStatus

func (c *IoStreamConnection) SetStatus(status IoStreamConnectionStatus)

SetStatus sets the connection status.

type IoStreamConnectionFlag

type IoStreamConnectionFlag = types.IoStreamConnectionFlag

IoStreamConnectionFlag defines connection flags.

type IoStreamConnectionStatus

type IoStreamConnectionStatus = types.IoStreamConnectionStatus

IoStreamConnectionStatus defines connection status.

type UnpackFrameResult

type UnpackFrameResult struct {
	// Payload is the extracted payload data
	Payload []byte
	// Consumed is the total number of bytes consumed from the input buffer
	Consumed int
	// Error indicates any error that occurred during unpacking
	Error error
	// ErrorCode is the libatbus error code
	ErrorCode error_code.ErrorType
}

UnpackFrameResult contains the result of unpacking a frame.

func UnpackFrame

func UnpackFrame(data []byte) UnpackFrameResult

UnpackFrame attempts to unpack a frame from the input buffer. Returns the unpacked result including payload, consumed bytes, and any error.

If the buffer doesn't contain a complete frame, Error will be ErrIncompleteFrame. If the hash verification fails, Error will be ErrInvalidFrameHash.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL