Documentation
¶
Overview ¶
Dispatcher overview: - Single producer (the underlying Messager reading from a stream) - Multiple logical consumers via WaitFor() Pipeline: continuous read -> bucket by pb.MessageType (per-type queue) -> consumer fetches Features:
- Per-type queue limit (10). When full, oldest entry is evicted (overflow counter increments)
- TTL pruning: on enqueue we drop expired head entries; on dequeue we also skip newly expired items
- Minimum effective TTL enforced at 1s (even if ttl_ms smaller or zero)
- WaitFor waits for the first non‑expired message among requested types or returns on context/close/error
- Metrics: droppedExpired, droppedOverflow (stored but not yet exported via API)
- Close() unblocks all waiting goroutines
Index ¶
- type Dispatcher
- type Messager
- func (h *Messager) PeerID() peer.ID
- func (h *Messager) ReceiveOne(timeout time.Duration) (*pb.UnifiedMessage, error)
- func (h *Messager) SendAuthRequest(password string) error
- func (h *Messager) SendAuthResponse(ok bool, msg string) error
- func (h *Messager) SendClientShutdownNotification() error
- func (h *Messager) SendHeartbeat() error
- func (h *Messager) SendHeartbeatAck() error
- func (h *Messager) SendPasswordRequired(require bool) error
- func (h *Messager) SendServerShutdownNotification() error
- func (h *Messager) SendServiceRequest(name, protocol, pwd string) error
- func (h *Messager) SendServiceResponse(ok bool, code pb.ErrorCode, msg string, data map[string]string) error
- func (h *Messager) SendWithTTL(msg *pb.UnifiedMessage, ttl time.Duration) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
func NewDispatcher ¶
func NewDispatcher(m *Messager) *Dispatcher
func (*Dispatcher) Close ¶
func (d *Dispatcher) Close()
func (*Dispatcher) IsClosed ¶
func (d *Dispatcher) IsClosed() bool
func (*Dispatcher) Start ¶
func (d *Dispatcher) Start()
func (*Dispatcher) WaitFor ¶
func (d *Dispatcher) WaitFor(ctx context.Context, types ...pb.MessageType) (*pb.UnifiedMessage, error)
type Messager ¶
type Messager struct {
// contains filtered or unexported fields
}
func NewMessager ¶
NewMessager constructs a handshake instance (exported API wrapper).
func (*Messager) ReceiveOne ¶
ReceiveOne 读取单个 protobuf 消息(仅用于数据流一次性握手,避免后台持续读取破坏后续原始字节)。 timeout>0 时若底层支持 deadline 则设置读超时。
func (*Messager) SendAuthRequest ¶
SendAuthRequest sends an authentication request with password.
func (*Messager) SendAuthResponse ¶
SendAuthResponse replies to an authentication request.
func (*Messager) SendClientShutdownNotification ¶
SendClientShutdownNotification sends a client shutdown notice.
func (*Messager) SendHeartbeat ¶
SendHeartbeat sends a heartbeat control message.
func (*Messager) SendHeartbeatAck ¶
func (*Messager) SendPasswordRequired ¶
SendPasswordRequired indicates whether password is required.
func (*Messager) SendServerShutdownNotification ¶
SendServerShutdownNotification sends a server shutdown notice.
func (*Messager) SendServiceRequest ¶
SendServiceRequest requests a specific service/protocol (optionally with password).
func (*Messager) SendServiceResponse ¶
func (h *Messager) SendServiceResponse(ok bool, code pb.ErrorCode, msg string, data map[string]string) error
SendServiceResponse replies to a service request.
func (*Messager) SendWithTTL ¶
TTL in dispatcher