message

package
v0.0.3
Published: Dec 1, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Dispatcher overview:

  • Single producer (the underlying Messager reading from a stream)
  • Multiple logical consumers via WaitFor()

Pipeline: continuous read -> bucket by pb.MessageType (per-type queue) -> consumer fetches

Features:

  • Per-type queue limit (10). When full, oldest entry is evicted (overflow counter increments)
  • TTL pruning: on enqueue we drop expired head entries; on dequeue we also skip newly expired items
  • Minimum effective TTL enforced at 1s (even if ttl_ms smaller or zero)
  • WaitFor waits for the first non‑expired message among requested types or returns on context/close/error
  • Metrics: droppedExpired, droppedOverflow (stored but not yet exported via API)
  • Close() unblocks all waiting goroutines
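The queueing rules listed above can be sketched as a small self-contained model. This is an illustration under stated assumptions, not the package's implementation: `typeQueue`, `entry`, `effectiveTTL`, and `demo` are hypothetical names, string payloads stand in for *pb.UnifiedMessage, and only the limit of 10, the 1s minimum TTL, and the two counters come from the overview.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical queued item; the real package queues *pb.UnifiedMessage.
type entry struct {
	payload  string
	expireAt time.Time
}

const (
	queueLimit = 10          // per-type limit stated in the overview
	minTTL     = time.Second // minimum effective TTL stated in the overview
)

// typeQueue is an illustrative FIFO for one message type.
type typeQueue struct {
	items           []entry
	droppedExpired  int
	droppedOverflow int
}

// effectiveTTL clamps a requested TTL (including zero) to the 1s minimum.
func effectiveTTL(ttl time.Duration) time.Duration {
	if ttl < minTTL {
		return minTTL
	}
	return ttl
}

// pruneHead drops expired entries from the front of the queue.
func (q *typeQueue) pruneHead(now time.Time) {
	for len(q.items) > 0 && !q.items[0].expireAt.After(now) {
		q.items = q.items[1:]
		q.droppedExpired++
	}
}

// enqueue prunes expired head entries, evicts the oldest entry when full, then appends.
func (q *typeQueue) enqueue(payload string, ttl time.Duration, now time.Time) {
	q.pruneHead(now)
	if len(q.items) >= queueLimit {
		q.items = q.items[1:]
		q.droppedOverflow++
	}
	q.items = append(q.items, entry{payload: payload, expireAt: now.Add(effectiveTTL(ttl))})
}

// dequeue skips entries that expired after being enqueued and returns the first live one.
func (q *typeQueue) dequeue(now time.Time) (string, bool) {
	q.pruneHead(now)
	if len(q.items) == 0 {
		return "", false
	}
	e := q.items[0]
	q.items = q.items[1:]
	return e.payload, true
}

// demo enqueues 12 messages with TTL 0, dequeues once immediately,
// then tries again two seconds later.
func demo() (first string, overflow, expired int) {
	now := time.Now()
	q := &typeQueue{}
	for i := 0; i < 12; i++ {
		q.enqueue(fmt.Sprintf("m%d", i), 0, now)
	}
	first, _ = q.dequeue(now)
	q.dequeue(now.Add(2 * time.Second))
	return first, q.droppedOverflow, q.droppedExpired
}

func main() {
	fmt.Println(demo())
}
```

In this model the two oldest of the twelve messages are evicted by overflow, and the nine that remain after the first dequeue are counted as expired once their clamped 1s TTL has passed.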

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(m *Messager) *Dispatcher

func (*Dispatcher) Close

func (d *Dispatcher) Close()

func (*Dispatcher) IsClosed

func (d *Dispatcher) IsClosed() bool

func (*Dispatcher) Start

func (d *Dispatcher) Start()

func (*Dispatcher) WaitFor

func (d *Dispatcher) WaitFor(ctx context.Context, types ...pb.MessageType) (*pb.UnifiedMessage, error)
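The blocking behaviour described for WaitFor (return a queued message of a requested type, or return when the context ends or the dispatcher is closed) might be modelled as below. This is a minimal sketch, not the package's code: `miniDispatcher`, `msgType`, `message`, `errClosed`, and `demo` are hypothetical, TTL handling is left out, and the sketch checks the requested types in argument order, which the real Dispatcher may not do.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// msgType and message are hypothetical stand-ins for pb.MessageType and *pb.UnifiedMessage.
type msgType int

type message struct {
	typ  msgType
	body string
}

var errClosed = errors.New("dispatcher closed") // hypothetical error value

// miniDispatcher is an illustrative model with one queue per message type.
type miniDispatcher struct {
	mu     sync.Mutex
	queues map[msgType][]message
	wake   chan struct{} // closed and replaced on every state change
	closed bool
}

func newMiniDispatcher() *miniDispatcher {
	return &miniDispatcher{queues: map[msgType][]message{}, wake: make(chan struct{})}
}

// broadcast wakes every current waiter; callers must hold d.mu.
func (d *miniDispatcher) broadcast() {
	close(d.wake)
	d.wake = make(chan struct{})
}

// push plays the role of the single producer.
func (d *miniDispatcher) push(m message) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.queues[m.typ] = append(d.queues[m.typ], m)
	d.broadcast()
}

// close marks the dispatcher closed and unblocks all waiters.
func (d *miniDispatcher) close() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if !d.closed {
		d.closed = true
		d.broadcast()
	}
}

// waitFor returns a queued message of one of the given types,
// or an error once the dispatcher is closed or ctx is done.
func (d *miniDispatcher) waitFor(ctx context.Context, types ...msgType) (message, error) {
	for {
		d.mu.Lock()
		for _, t := range types {
			if q := d.queues[t]; len(q) > 0 {
				d.queues[t] = q[1:]
				d.mu.Unlock()
				return q[0], nil
			}
		}
		if d.closed {
			d.mu.Unlock()
			return message{}, errClosed
		}
		wake := d.wake // capture under the lock so no wake-up is missed
		d.mu.Unlock()
		select {
		case <-wake:
		case <-ctx.Done():
			return message{}, ctx.Err()
		}
	}
}

// demo waits for either of two types while a producer pushes one message,
// then shows that waiting after close returns errClosed.
func demo() (body string, closedErr bool) {
	d := newMiniDispatcher()
	go d.push(message{typ: 2, body: "auth-ok"})
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	m, err := d.waitFor(ctx, 1, 2)
	if err != nil {
		return "", false
	}
	d.close()
	_, err = d.waitFor(context.Background(), 1)
	return m.body, errors.Is(err, errClosed)
}

func main() {
	fmt.Println(demo())
}
```

Callers of the real WaitFor would typically pass a context with a timeout or cancellation, as `demo` does, so that a consumer does not block indefinitely when no matching message arrives.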

type Messager

type Messager struct {
	// contains filtered or unexported fields
}

func NewMessager

func NewMessager(conn io.ReadWriteCloser, cfg *config.Config, peerID peer.ID) *Messager

NewMessager constructs a Messager (a handshake instance) over conn for the given peer (exported API wrapper).

func (*Messager) PeerID

func (h *Messager) PeerID() peer.ID

PeerID returns the remote peer id (exported API wrapper).

func (*Messager) ReceiveOne

func (h *Messager) ReceiveOne(timeout time.Duration) (*pb.UnifiedMessage, error)

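ReceiveOne reads a single protobuf message. It is intended only for the one-time handshake on a data stream, so that no continuous background read consumes the raw bytes that follow. When timeout > 0 and the underlying connection supports deadlines, a read deadline is set.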
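The optional read deadline described for ReceiveOne can be illustrated with a type assertion on the connection. The following is a sketch under assumptions, not the package's code: `readDeadliner`, `readOnce`, and `demo` are hypothetical names, message framing and protobuf decoding are omitted, and clearing the deadline afterwards is a choice made for this example.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// readDeadliner is the optional capability being probed; net.Conn provides it.
type readDeadliner interface {
	SetReadDeadline(t time.Time) error
}

// readOnce performs one Read on conn. When timeout > 0 and conn supports
// read deadlines, the read is bounded by that timeout.
func readOnce(conn io.Reader, buf []byte, timeout time.Duration) (int, error) {
	if d, ok := conn.(readDeadliner); ok && timeout > 0 {
		if err := d.SetReadDeadline(time.Now().Add(timeout)); err != nil {
			return 0, err
		}
		// Assumption for this sketch: clear the deadline afterwards so that
		// later raw reads on the same stream are not affected.
		defer d.SetReadDeadline(time.Time{})
	}
	return conn.Read(buf)
}

// demo reads one chunk from an in-memory pipe, then shows a second read
// timing out because nothing more is written.
func demo() (got string, timedOut bool) {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	go client.Write([]byte("hello"))

	buf := make([]byte, 16)
	n, err := readOnce(server, buf, time.Second)
	if err != nil {
		return "", false
	}
	_, err = readOnce(server, buf, 50*time.Millisecond)
	ne, isNetErr := err.(net.Error)
	return string(buf[:n]), isNetErr && ne.Timeout()
}

func main() {
	fmt.Println(demo())
}
```

Because only a single read is issued and no background reader is started, any bytes that follow the handshake message remain in the stream for whoever reads it next.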

func (*Messager) SendAuthRequest

func (h *Messager) SendAuthRequest(password string) error

SendAuthRequest sends an authentication request with password.

func (*Messager) SendAuthResponse

func (h *Messager) SendAuthResponse(ok bool, msg string) error

SendAuthResponse replies to an authentication request.

func (*Messager) SendClientShutdownNotification

func (h *Messager) SendClientShutdownNotification() error

SendClientShutdownNotification sends a client shutdown notice.

func (*Messager) SendHeartbeat

func (h *Messager) SendHeartbeat() error

SendHeartbeat sends a heartbeat control message.

func (*Messager) SendHeartbeatAck

func (h *Messager) SendHeartbeatAck() error

func (*Messager) SendPasswordRequired

func (h *Messager) SendPasswordRequired(require bool) error

SendPasswordRequired indicates whether password is required.

func (*Messager) SendServerShutdownNotification

func (h *Messager) SendServerShutdownNotification() error

SendServerShutdownNotification sends a server shutdown notice.

func (*Messager) SendServiceRequest

func (h *Messager) SendServiceRequest(name, protocol, pwd string) error

SendServiceRequest requests a specific service/protocol (optionally with password).

func (*Messager) SendServiceResponse

func (h *Messager) SendServiceResponse(ok bool, code pb.ErrorCode, msg string, data map[string]string) error

SendServiceResponse replies to a service request.

func (*Messager) SendWithTTL

func (h *Messager) SendWithTTL(msg *pb.UnifiedMessage, ttl time.Duration) error

SendWithTTL sends msg with the given TTL. The TTL is applied in the dispatcher (see Overview).
