Documentation
¶
Overview ¶
Package bus implements the per-AppID event-bus daemon; lifecycle is driven by consumer presence (idle timeout) and explicit shutdown.
Index ¶
- func SetupBusLogger(eventsDir string) (*log.Logger, error)
- type Bus
- type Conn
- func (c *Conn) Close()
- func (c *Conn) DroppedCount() int64
- func (c *Conn) EventKey() string
- func (c *Conn) EventTypes() []string
- func (c *Conn) IncrementDropped()
- func (c *Conn) IncrementReceived()
- func (c *Conn) NextSeq() uint64
- func (c *Conn) PID() int
- func (c *Conn) PushDropOldest(msg interface{}) (enqueued, dropped bool)
- func (c *Conn) ReaderLoop()
- func (c *Conn) Received() int64
- func (c *Conn) SendCh() chan interface{}
- func (c *Conn) SenderLoop()
- func (c *Conn) SetCheckLastForKey(fn func(string) bool)
- func (c *Conn) SetLogger(l *log.Logger)
- func (c *Conn) SetOnClose(fn func(*Conn))
- func (c *Conn) Start()
- func (c *Conn) TrySend(msg interface{}) bool
- type Hub
- func (h *Hub) AcquireCleanupLock(eventKey string) bool
- func (h *Hub) BroadcastSourceStatus(source, state, detail string)
- func (h *Hub) ConnCount() int
- func (h *Hub) Consumers() []protocol.ConsumerInfo
- func (h *Hub) EventKeyCount(eventKey string) int
- func (h *Hub) Publish(raw *event.RawEvent)
- func (h *Hub) RegisterAndIsFirst(s Subscriber) bool
- func (h *Hub) ReleaseCleanupLock(eventKey string)
- func (h *Hub) SetLogger(l *log.Logger)
- func (h *Hub) UnregisterAndIsLast(s Subscriber) bool
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus is the central event bus daemon.
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn represents a single consume client connection in the Bus.
func NewConn ¶
func NewConn(conn net.Conn, reader *bufio.Reader, eventKey string, eventTypes []string, pid int) *Conn
NewConn creates a Conn; pass a reader with pre-buffered bytes (handoff from Bus.handleConn) or nil for a fresh one.
func (*Conn) DroppedCount ¶
func (*Conn) EventTypes ¶
func (*Conn) IncrementDropped ¶
func (c *Conn) IncrementDropped()
func (*Conn) IncrementReceived ¶
func (c *Conn) IncrementReceived()
func (*Conn) PushDropOldest ¶
PushDropOldest enqueues msg; on full channel evicts one oldest and retries, atomically under sendMu. Returns (enqueued, dropped). A rare concurrent drain may make drop unnecessary — still succeeds with dropped=false.
func (*Conn) ReaderLoop ¶
func (c *Conn) ReaderLoop()
ReaderLoop reads control messages (Bye, PreShutdownCheck) until EOF.
func (*Conn) SenderLoop ¶
func (c *Conn) SenderLoop()
SenderLoop exits on closed (not sendCh close) so Hub.Publish can send without panic risk.
func (*Conn) SetCheckLastForKey ¶
SetCheckLastForKey: returning true means "you are the last subscriber, run cleanup".
func (*Conn) SetOnClose ¶
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
func (*Hub) AcquireCleanupLock ¶
AcquireCleanupLock reserves cleanup rights iff exactly one subscriber exists for eventKey and no lock is held. Count==0 is rejected (would block future Register calls). On true return, caller MUST Release.
func (*Hub) BroadcastSourceStatus ¶
BroadcastSourceStatus fans out a source-level status change to every subscriber. Best-effort: channel full → drop silently (status isn't worth applying back-pressure for). Routes through Subscriber.TrySend so the send shares PushDropOldest's sendMu — without this a status broadcast could slip into the tiny window between another goroutine's drop and its retry push and break the atomicity contract.
func (*Hub) Consumers ¶
func (h *Hub) Consumers() []protocol.ConsumerInfo
Consumers returns info about all connected consumers.
func (*Hub) EventKeyCount ¶
EventKeyCount returns the number of subscribers registered for eventKey.
func (*Hub) Publish ¶
Publish fans out a RawEvent to all matching subscribers (non-blocking).
A fresh *protocol.Event is allocated per subscriber so each consumer sees its own monotonically-increasing Seq (assigned via Conn.NextSeq) — sharing a single msg struct across subscribers would alias Seq and defeat the gap-detection at the consume side. The extra allocation per fan-out is cheap compared to the socket write that follows.
func (*Hub) RegisterAndIsFirst ¶
func (h *Hub) RegisterAndIsFirst(s Subscriber) bool
RegisterAndIsFirst adds s to the hub and reports whether it's the first subscriber for its EventKey. If a cleanup is in progress for s.EventKey() (another conn holds the cleanup lock), this waits until cleanup releases before registering — closing the PreShutdownCheck × Hello TOCTOU race. The wait releases h.mu before blocking on the channel, so concurrent operations on other keys aren't stalled.
func (*Hub) ReleaseCleanupLock ¶
ReleaseCleanupLock is idempotent; OnClose calls unconditionally.
func (*Hub) UnregisterAndIsLast ¶
func (h *Hub) UnregisterAndIsLast(s Subscriber) bool
UnregisterAndIsLast removes s and reports whether it was last for its EventKey; stale unregisters are no-ops.
type Subscriber ¶
type Subscriber interface {
EventKey() string
EventTypes() []string
SendCh() chan interface{}
PID() int
IncrementReceived()
Received() int64
// PushDropOldest enqueues atomically with drop-oldest backpressure.
PushDropOldest(msg interface{}) (enqueued, dropped bool)
// TrySend is non-evictive but shares PushDropOldest's mutex.
TrySend(msg interface{}) bool
DroppedCount() int64
IncrementDropped()
// NextSeq returns a monotonic per-subscriber seq; tests may return 0.
NextSeq() uint64
}
Subscriber is the interface a connection must satisfy for Hub registration.