protocol

package
v1.0.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package protocol defines the newline-delimited JSON wire format used over IPC.

Index

Constants

View Source
// Message type identifiers, one per wire message kind.
// NOTE(review): every message struct in this package has a Type field
// serialized as "type"; these strings are presumably the values carried
// there and switched on by Decode — confirm against the implementation.
const (
	MsgTypeHello            = "hello"
	MsgTypeHelloAck         = "hello_ack"
	MsgTypeEvent            = "event"
	MsgTypeBye              = "bye"
	MsgTypePreShutdownCheck = "pre_shutdown_check"
	MsgTypePreShutdownAck   = "pre_shutdown_ack"
	MsgTypeStatusQuery      = "status_query"
	MsgTypeStatusResponse   = "status_response"
	MsgTypeShutdown         = "shutdown"
	MsgTypeSourceStatus     = "source_status"
)
View Source
// Source connection state names.
// NOTE(review): presumably the allowed values of SourceStatus.State — confirm.
const (
	SourceStateConnecting   = "connecting"
	SourceStateConnected    = "connected"
	SourceStateDisconnected = "disconnected"
	SourceStateReconnecting = "reconnecting"
)
View Source
// MaxFrameBytes is the largest size allowed for a single frame:
// 1 << 20 bytes (1 MiB).
const MaxFrameBytes = 1 << 20 // reject larger frames to bound reader buffer growth
View Source
// WriteTimeout is a 5-second bound on writes.
// NOTE(review): presumably the timeout callers pass to EncodeWithDeadline — confirm.
const WriteTimeout = 5 * time.Second // bound writes against wedged peer kernel buffer

Variables

View Source
// ErrFrameTooLarge is a sentinel error value for frames larger than
// MaxFrameBytes; compare against it with errors.Is.
var ErrFrameTooLarge = errors.New("protocol: frame exceeds MaxFrameBytes")

ErrFrameTooLarge is returned by ReadFrame when a single frame exceeds MaxFrameBytes.

Functions

func Decode

func Decode(line []byte) (interface{}, error)

func Encode

func Encode(w io.Writer, msg interface{}) error

func EncodeWithDeadline

func EncodeWithDeadline(conn net.Conn, msg interface{}, timeout time.Duration) error

func ReadFrame

func ReadFrame(br *bufio.Reader) ([]byte, error)

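ReadFrame reads one newline-delimited message from br. Frames are capped at MaxFrameBytes, which bounds reader buffer growth and blunts slowloris-style senders; a frame that exceeds the cap yields ErrFrameTooLarge.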

Types

type Bye

// Bye is a message with no payload beyond its type discriminator.
type Bye struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypeBye — confirm.
	Type string `json:"type"`
}

type ConsumerInfo

// ConsumerInfo is the per-consumer record listed in StatusResponse.Consumers.
// All four fields are always emitted in JSON (no omitempty).
type ConsumerInfo struct {
	// PID and EventKey identify the consumer.
	// NOTE(review): presumably the values the consumer sent in its Hello — confirm.
	PID      int    `json:"pid"`
	EventKey string `json:"event_key"`
	// Received and Dropped are counters.
	// NOTE(review): presumably events delivered to / dropped for this consumer — confirm.
	Received int64  `json:"received"`
	Dropped  int64  `json:"dropped"`
}

type Event

// Event is the wire message that carries one event and its raw JSON payload.
//
// EventID, SourceTime, and Seq are tagged omitempty, so they are left out of
// the encoded JSON when empty or zero. Payload has no omitempty and is always
// emitted.
type Event struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypeEvent — confirm.
	Type       string          `json:"type"`
	// EventType names the kind of event; serialized as "event_type".
	EventType  string          `json:"event_type"`
	EventID    string          `json:"event_id,omitempty"`
	SourceTime string          `json:"source_time,omitempty"` // ms-precision unix timestamp, stringified
	// Seq is an unsigned sequence number; a zero value is omitted from JSON.
	Seq        uint64          `json:"seq,omitempty"`
	// Payload is kept as raw, undecoded JSON.
	Payload    json.RawMessage `json:"payload"`
}

Event is the event message. Seq increases monotonically per connection; a gap in Seq means events were lost because the bus dropped its oldest queued events under backpressure.

func NewEvent

func NewEvent(eventType, eventID, sourceTime string, seq uint64, payload json.RawMessage) *Event

type Hello

// Hello is the message built by NewHello from a PID, an event key, a list of
// event types, and a version string.
// NOTE(review): presumably the first message a consumer sends after
// connecting, answered with HelloAck — confirm.
type Hello struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypeHello — confirm.
	Type       string   `json:"type"`
	PID        int      `json:"pid"`
	EventKey   string   `json:"event_key"`
	// EventTypes has no omitempty: a nil slice is encoded as JSON null.
	EventTypes []string `json:"event_types"`
	Version    string   `json:"version"`
}

func NewHello

func NewHello(pid int, eventKey string, eventTypes []string, version string) *Hello

type HelloAck

// HelloAck is the message built by NewHelloAck from a bus version string and
// a first-for-key flag.
// NOTE(review): presumably the reply to Hello — confirm.
type HelloAck struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypeHelloAck — confirm.
	Type        string `json:"type"`
	BusVersion  string `json:"bus_version"`
	// NOTE(review): FirstForKey presumably reports that this consumer is the
	// first one connected for its EventKey — confirm.
	FirstForKey bool   `json:"first_for_key"`
}

func NewHelloAck

func NewHelloAck(busVersion string, firstForKey bool) *HelloAck

type PreShutdownAck

// PreShutdownAck is the message built by NewPreShutdownAck from a single
// last-for-key flag.
// NOTE(review): presumably the reply to PreShutdownCheck — confirm.
type PreShutdownAck struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypePreShutdownAck — confirm.
	Type       string `json:"type"`
	// NOTE(review): LastForKey presumably reports that the sender of the check
	// is the last consumer for its EventKey — confirm.
	LastForKey bool   `json:"last_for_key"`
}

func NewPreShutdownAck

func NewPreShutdownAck(lastForKey bool) *PreShutdownAck

type PreShutdownCheck

// PreShutdownCheck is the message built by NewPreShutdownCheck for a given
// event key.
type PreShutdownCheck struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypePreShutdownCheck — confirm.
	Type     string `json:"type"`
	// EventKey is the key the check applies to; serialized as "event_key".
	EventKey string `json:"event_key"`
}

PreShutdownCheck atomically reserves the cleanup lock for EventKey.

func NewPreShutdownCheck

func NewPreShutdownCheck(eventKey string) *PreShutdownCheck

type Shutdown

// Shutdown is a message with no payload beyond its type discriminator.
type Shutdown struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypeShutdown — confirm.
	Type string `json:"type"`
}

func NewShutdown

func NewShutdown() *Shutdown

type SourceStatus

// SourceStatus is the message built by NewSourceStatus from a source name, a
// state, and an optional detail string.
type SourceStatus struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypeSourceStatus — confirm.
	Type   string `json:"type"`
	Source string `json:"source"`
	// NOTE(review): State is presumably one of the SourceState* constants — confirm.
	State  string `json:"state"`
	// Detail is omitted from the encoded JSON when empty.
	Detail string `json:"detail,omitempty"`
}

SourceStatus is delivered on a best-effort basis: the hub drops it when the consumer's send channel is full.

func NewSourceStatus

func NewSourceStatus(source, state, detail string) *SourceStatus

type StatusQuery

// StatusQuery is a message with no payload beyond its type discriminator.
// NOTE(review): presumably answered with a StatusResponse — confirm.
type StatusQuery struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypeStatusQuery — confirm.
	Type string `json:"type"`
}

func NewStatusQuery

func NewStatusQuery() *StatusQuery

type StatusResponse

// StatusResponse is the message built by NewStatusResponse from a PID, an
// uptime, an active-connection count, and a list of ConsumerInfo records.
type StatusResponse struct {
	// Type is serialized as "type".
	// NOTE(review): presumably always MsgTypeStatusResponse — confirm.
	Type        string         `json:"type"`
	PID         int            `json:"pid"`
	// NOTE(review): UptimeSec is presumably uptime in whole seconds — confirm.
	UptimeSec   int            `json:"uptime_sec"`
	ActiveConns int            `json:"active_conns"`
	// Consumers has no omitempty: a nil slice is encoded as JSON null.
	Consumers   []ConsumerInfo `json:"consumers"`
}

func NewStatusResponse

func NewStatusResponse(pid int, uptimeSec int, activeConns int, consumers []ConsumerInfo) *StatusResponse

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL