bus

package
v1.0.21
Published: Apr 28, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package bus implements the per-AppID event-bus daemon; lifecycle is driven by consumer presence (idle timeout) and explicit shutdown.

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetupBusLogger

func SetupBusLogger(eventsDir string) (*log.Logger, error)

SetupBusLogger returns a logger that writes to eventsDir/bus.log. Rotation is size-based and happens once, at startup; the log is not rotated again while the daemon runs.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is the central event bus daemon.

func NewBus

func NewBus(appID, appSecret, domain string, tr transport.IPC, logger *log.Logger) *Bus

func (*Bus) Run

func (b *Bus) Run(ctx context.Context) error

Run binds the IPC socket, starts event sources, and blocks in the accept loop until shutdown.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents a single consume client connection in the Bus.

func NewConn

func NewConn(conn net.Conn, reader *bufio.Reader, eventKey string, eventTypes []string, pid int) *Conn

NewConn creates a Conn; pass a reader with pre-buffered bytes (handoff from Bus.handleConn) or nil for a fresh one.

func (*Conn) Close

func (c *Conn) Close()

Close is idempotent.

func (*Conn) DroppedCount

func (c *Conn) DroppedCount() int64

func (*Conn) EventKey

func (c *Conn) EventKey() string

func (*Conn) EventTypes

func (c *Conn) EventTypes() []string

func (*Conn) IncrementDropped

func (c *Conn) IncrementDropped()

func (*Conn) IncrementReceived

func (c *Conn) IncrementReceived()

func (*Conn) NextSeq

func (c *Conn) NextSeq() uint64

NextSeq returns the next monotonic seq for this conn (first call returns 1).
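
One simple way to get a counter whose first value is 1 is an atomic add on a zero-initialised field. The sketch below shows that idea only; seqCounter and next are hypothetical names, and whether Conn uses an atomic or a mutex internally is not stated in the documentation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// seqCounter is a hypothetical stand-in for a conn's sequence state.
type seqCounter struct {
	n uint64
}

// next returns 1 on the first call and one more on each later call.
// atomic.AddUint64 returns the new value, so a zero start yields 1.
func (s *seqCounter) next() uint64 {
	return atomic.AddUint64(&s.n, 1)
}

func main() {
	var a, b seqCounter
	fmt.Println(a.next(), a.next(), a.next()) // 1 2 3
	fmt.Println(b.next())                     // 1: each counter is independent
}
```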

func (*Conn) PID

func (c *Conn) PID() int

func (*Conn) PushDropOldest

func (c *Conn) PushDropOldest(msg interface{}) (enqueued, dropped bool)

PushDropOldest enqueues msg. If the channel is full, it evicts the single oldest message and retries, all atomically under sendMu. It returns (enqueued, dropped). In the rare case that a concurrent drain frees space before the eviction, nothing needs to be dropped and the call still succeeds with dropped=false.
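
A minimal sketch of drop-oldest enqueueing on a buffered channel follows. It mirrors the contract described above but is not the package's code: the queue type and its field names are assumptions, with mu playing the role of sendMu.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical stand-in for a conn's send buffer.
type queue struct {
	mu sync.Mutex // plays the role of sendMu
	ch chan interface{}
}

// pushDropOldest enqueues msg, evicting at most one buffered message if full.
func (q *queue) pushDropOldest(msg interface{}) (enqueued, dropped bool) {
	q.mu.Lock()
	defer q.mu.Unlock()

	select {
	case q.ch <- msg:
		return true, false // fast path: there was room
	default:
	}
	// The channel looked full. Evict one message; if a consumer drained the
	// channel in the meantime there is nothing to evict and dropped stays false.
	select {
	case <-q.ch:
		dropped = true
	default:
	}
	// With capacity >= 1 and every producer holding mu, this retry finds room.
	select {
	case q.ch <- msg:
		enqueued = true
	default:
	}
	return enqueued, dropped
}

func main() {
	q := &queue{ch: make(chan interface{}, 2)}
	for i := 1; i <= 3; i++ {
		enq, drop := q.pushDropOldest(i)
		fmt.Println(i, enq, drop) // third push reports dropped=true
	}
	fmt.Println(<-q.ch, <-q.ch) // 2 3: message 1 was evicted
}
```

Holding the mutex across both the eviction and the retry is what keeps another producer from taking the freed slot.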

func (*Conn) ReaderLoop

func (c *Conn) ReaderLoop()

ReaderLoop reads control messages (Bye, PreShutdownCheck) until EOF.

func (*Conn) Received

func (c *Conn) Received() int64

func (*Conn) SendCh

func (c *Conn) SendCh() chan interface{}

func (*Conn) SenderLoop

func (c *Conn) SenderLoop()

SenderLoop exits when the conn is closed, not when sendCh is closed, so Hub.Publish can send on sendCh without risking a panic.
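
The pattern behind this can be sketched with a separate closed channel that signals shutdown while the data channel stays open. The sender type, its fields, and the out callback are hypothetical; the sketch only shows why a late send cannot panic.

```go
package main

import (
	"fmt"
	"sync"
)

// sender is a hypothetical stand-in for the sending half of a conn.
type sender struct {
	sendCh    chan interface{}
	closed    chan struct{} // closed exactly once to signal shutdown
	closeOnce sync.Once
	out       func(interface{}) // stands in for the socket write
}

// Close is idempotent: only the first call closes the signal channel.
func (s *sender) Close() { s.closeOnce.Do(func() { close(s.closed) }) }

// loop forwards messages until closed fires. sendCh itself is never closed.
func (s *sender) loop() {
	for {
		select {
		case <-s.closed:
			return
		case msg := <-s.sendCh:
			s.out(msg)
		}
	}
}

func main() {
	got := make(chan interface{}, 1)
	s := &sender{
		sendCh: make(chan interface{}, 4),
		closed: make(chan struct{}),
		out:    func(m interface{}) { got <- m },
	}
	done := make(chan struct{})
	go func() { s.loop(); close(done) }()

	s.sendCh <- "hello"
	fmt.Println("delivered:", <-got)

	s.Close()
	s.Close() // second call is a no-op
	<-done

	// A late publisher can still attempt a send: sendCh was never closed.
	select {
	case s.sendCh <- "late":
		fmt.Println("late send buffered, no panic")
	default:
		fmt.Println("late send dropped, no panic")
	}
}
```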

func (*Conn) SetCheckLastForKey

func (c *Conn) SetCheckLastForKey(fn func(string) bool)

SetCheckLastForKey sets the last-subscriber check for an event key. A true return from fn means "you are the last subscriber, run cleanup".

func (*Conn) SetLogger

func (c *Conn) SetLogger(l *log.Logger)

SetLogger attaches a logger (nil tolerated).

func (*Conn) SetOnClose

func (c *Conn) SetOnClose(fn func(*Conn))

func (*Conn) Start

func (c *Conn) Start()

Start launches the sender and reader goroutines; call exactly once.

func (*Conn) TrySend

func (c *Conn) TrySend(msg interface{}) bool

TrySend enqueues non-evictively under sendMu so it respects PushDropOldest's atomicity contract.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

func NewHub

func NewHub() *Hub

func (*Hub) AcquireCleanupLock

func (h *Hub) AcquireCleanupLock(eventKey string) bool

AcquireCleanupLock reserves cleanup rights if and only if exactly one subscriber exists for eventKey and no lock is held. A count of zero is rejected, because holding the lock with no subscribers would block future Register calls. If it returns true, the caller MUST call ReleaseCleanupLock.

func (*Hub) BroadcastSourceStatus

func (h *Hub) BroadcastSourceStatus(source, state, detail string)

BroadcastSourceStatus fans out a source-level status change to every subscriber. Delivery is best-effort: if a subscriber's channel is full, the status is dropped silently, since status isn't worth applying back-pressure for. The send goes through Subscriber.TrySend so that it shares PushDropOldest's sendMu. Without this, a status broadcast could slip into the tiny window between another goroutine's drop and its retry push and break the atomicity contract.
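
A best-effort fan-out through a non-evicting, mutex-guarded send might look like the sketch below. The sub and statusMsg types are hypothetical, and the delivered count is returned only to make the behaviour observable; BroadcastSourceStatus itself returns nothing.

```go
package main

import (
	"fmt"
	"sync"
)

// statusMsg is a hypothetical payload for a source status change.
type statusMsg struct {
	Source, State, Detail string
}

// sub is a hypothetical subscriber; mu plays the role of sendMu.
type sub struct {
	mu sync.Mutex
	ch chan interface{}
}

// trySend enqueues without evicting. Taking mu means it can never interleave
// with a drop-and-retry sequence that holds the same lock.
func (s *sub) trySend(msg interface{}) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	select {
	case s.ch <- msg:
		return true
	default:
		return false // full: the status is dropped silently
	}
}

// broadcast offers the status to every subscriber and never blocks.
func broadcast(subs []*sub, source, state, detail string) (delivered int) {
	for _, s := range subs {
		msg := statusMsg{Source: source, State: state, Detail: detail}
		if s.trySend(msg) {
			delivered++
		}
	}
	return delivered
}

func main() {
	full := &sub{ch: make(chan interface{}, 1)}
	full.ch <- "pending event" // this subscriber's buffer is already full
	free := &sub{ch: make(chan interface{}, 1)}

	n := broadcast([]*sub{full, free}, "example-source", "reconnecting", "demo")
	fmt.Println("delivered to", n, "of 2") // delivered to 1 of 2
}
```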

func (*Hub) ConnCount

func (h *Hub) ConnCount() int

ConnCount returns the current number of registered subscribers.

func (*Hub) Consumers

func (h *Hub) Consumers() []protocol.ConsumerInfo

Consumers returns info about all connected consumers.

func (*Hub) EventKeyCount

func (h *Hub) EventKeyCount(eventKey string) int

EventKeyCount returns the number of subscribers registered for eventKey.

func (*Hub) Publish

func (h *Hub) Publish(raw *event.RawEvent)

Publish fans out a RawEvent to all matching subscribers (non-blocking).

A fresh *protocol.Event is allocated per subscriber so each consumer sees its own monotonically-increasing Seq (assigned via Conn.NextSeq) — sharing a single msg struct across subscribers would alias Seq and defeat the gap-detection at the consume side. The extra allocation per fan-out is cheap compared to the socket write that follows.
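
To make the aliasing argument concrete, the sketch below allocates one event per subscriber and shows a consumer-side gap check. Everything in it is illustrative: the event and subscriber types, the inbox slice standing in for a channel, and the gaps helper are assumptions, and the code is single-goroutine for brevity.

```go
package main

import "fmt"

// event is a hypothetical, trimmed-down stand-in for a protocol event.
type event struct {
	Seq     uint64
	Payload string
}

// subscriber is a hypothetical consumer with its own sequence counter.
type subscriber struct {
	seq   uint64
	inbox []*event // stands in for the send channel
}

func (s *subscriber) nextSeq() uint64 { s.seq++; return s.seq }

// publish allocates a fresh event per subscriber so each Seq is independent.
// Sharing one *event would make every subscriber see the last Seq written.
func publish(subs []*subscriber, payload string) {
	for _, s := range subs {
		s.inbox = append(s.inbox, &event{Seq: s.nextSeq(), Payload: payload})
	}
}

// gaps counts missing sequence numbers, assuming the stream starts at 1.
func gaps(evs []*event) uint64 {
	var missing, last uint64
	for _, e := range evs {
		if e.Seq > last+1 {
			missing += e.Seq - last - 1
		}
		last = e.Seq
	}
	return missing
}

func main() {
	a, b := &subscriber{}, &subscriber{}
	publish([]*subscriber{a}, "e1")    // only a is subscribed so far
	publish([]*subscriber{a, b}, "e2") // b joins
	publish([]*subscriber{a, b}, "e3")

	fmt.Println(a.inbox[2].Seq, b.inbox[1].Seq) // 3 2: same event, different Seq

	// Simulate b losing its first event: the consumer sees Seq 2 first.
	fmt.Println(gaps(a.inbox), gaps(b.inbox[1:])) // 0 1
}
```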

func (*Hub) RegisterAndIsFirst

func (h *Hub) RegisterAndIsFirst(s Subscriber) bool

RegisterAndIsFirst adds s to the hub and reports whether it's the first subscriber for its EventKey. If a cleanup is in progress for s.EventKey() (another conn holds the cleanup lock), it waits until the cleanup releases the lock before registering. This closes the time-of-check/time-of-use (TOCTOU) race between PreShutdownCheck and Hello. The wait releases h.mu before blocking on the channel, so concurrent operations on other keys aren't stalled.

func (*Hub) ReleaseCleanupLock

func (h *Hub) ReleaseCleanupLock(eventKey string)

ReleaseCleanupLock is idempotent; OnClose calls it unconditionally.
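
The interplay between acquiring the cleanup lock, a registration that waits for it, and an idempotent release can be sketched as below. This is a simplified model under stated assumptions, not the Hub's source: the hub here tracks only per-key counts, keys are passed as strings instead of Subscriber values, and the per-key channel that signals release is a guess at one workable design.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical, count-only model of the Hub.
type hub struct {
	mu       sync.Mutex
	counts   map[string]int
	cleaning map[string]chan struct{} // closed when cleanup for the key ends
}

func newHub() *hub {
	return &hub{counts: map[string]int{}, cleaning: map[string]chan struct{}{}}
}

// acquireCleanupLock succeeds only with exactly one subscriber and no holder.
func (h *hub) acquireCleanupLock(key string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, held := h.cleaning[key]; held || h.counts[key] != 1 {
		return false
	}
	h.cleaning[key] = make(chan struct{})
	return true
}

// releaseCleanupLock is idempotent: releasing an unheld key does nothing.
func (h *hub) releaseCleanupLock(key string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if ch, held := h.cleaning[key]; held {
		delete(h.cleaning, key)
		close(ch) // wakes every waiting registration
	}
}

// registerAndIsFirst waits out any cleanup for key, then registers.
func (h *hub) registerAndIsFirst(key string) bool {
	for {
		h.mu.Lock()
		ch, held := h.cleaning[key]
		if !held {
			h.counts[key]++
			first := h.counts[key] == 1
			h.mu.Unlock()
			return first
		}
		h.mu.Unlock() // do not stall other keys while waiting
		<-ch
	}
}

// unregisterAndIsLast treats an unregister for an empty key as a no-op.
func (h *hub) unregisterAndIsLast(key string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.counts[key] == 0 {
		return false
	}
	h.counts[key]--
	return h.counts[key] == 0
}

func main() {
	h := newHub()
	fmt.Println("first:", h.registerAndIsFirst("k"))    // true
	fmt.Println("acquired:", h.acquireCleanupLock("k")) // true
	fmt.Println("again:", h.acquireCleanupLock("k"))    // false: already held

	joined := make(chan bool)
	go func() { joined <- h.registerAndIsFirst("k") }() // waits for release

	h.unregisterAndIsLast("k")
	h.releaseCleanupLock("k")
	h.releaseCleanupLock("k")                   // idempotent
	fmt.Println("newcomer is first:", <-joined) // true
}
```

Because the newcomer cannot register until the lock is released, it observes the key as empty and correctly reports itself as the first subscriber.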

func (*Hub) SetLogger

func (h *Hub) SetLogger(l *log.Logger)

SetLogger attaches a logger (nil tolerated).

func (*Hub) UnregisterAndIsLast

func (h *Hub) UnregisterAndIsLast(s Subscriber) bool

UnregisterAndIsLast removes s and reports whether it was last for its EventKey; stale unregisters are no-ops.

type Subscriber

type Subscriber interface {
	EventKey() string
	EventTypes() []string
	SendCh() chan interface{}
	PID() int
	IncrementReceived()
	Received() int64
	// PushDropOldest enqueues atomically with drop-oldest backpressure.
	PushDropOldest(msg interface{}) (enqueued, dropped bool)
	// TrySend is non-evictive but shares PushDropOldest's mutex.
	TrySend(msg interface{}) bool
	DroppedCount() int64
	IncrementDropped()
	// NextSeq returns a monotonic per-subscriber seq; tests may return 0.
	NextSeq() uint64
}

Subscriber is the interface a connection must satisfy for Hub registration.
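
For tests, a small stub can satisfy this interface without a real connection. The sketch below copies the interface from the listing so that it compiles on its own; stubSub is a hypothetical type, it is not safe for concurrent use, and its NextSeq returns 0 as the interface comment permits for tests.

```go
package main

import "fmt"

// Subscriber is copied from the listing above so this file stands alone.
type Subscriber interface {
	EventKey() string
	EventTypes() []string
	SendCh() chan interface{}
	PID() int
	IncrementReceived()
	Received() int64
	PushDropOldest(msg interface{}) (enqueued, dropped bool)
	TrySend(msg interface{}) bool
	DroppedCount() int64
	IncrementDropped()
	NextSeq() uint64
}

// stubSub is a hypothetical single-goroutine test double.
type stubSub struct {
	key               string
	types             []string
	ch                chan interface{}
	received, dropped int64
}

// Compile-time check that the stub satisfies the interface.
var _ Subscriber = (*stubSub)(nil)

func (s *stubSub) EventKey() string         { return s.key }
func (s *stubSub) EventTypes() []string     { return s.types }
func (s *stubSub) SendCh() chan interface{} { return s.ch }
func (s *stubSub) PID() int                 { return 0 }
func (s *stubSub) IncrementReceived()       { s.received++ }
func (s *stubSub) Received() int64          { return s.received }
func (s *stubSub) DroppedCount() int64      { return s.dropped }
func (s *stubSub) IncrementDropped()        { s.dropped++ }
func (s *stubSub) NextSeq() uint64          { return 0 }

// TrySend enqueues only if there is room; it never evicts.
func (s *stubSub) TrySend(msg interface{}) bool {
	select {
	case s.ch <- msg:
		return true
	default:
		return false
	}
}

// PushDropOldest evicts one buffered message when the channel is full.
func (s *stubSub) PushDropOldest(msg interface{}) (enqueued, dropped bool) {
	if s.TrySend(msg) {
		return true, false
	}
	select {
	case <-s.ch:
		dropped = true
	default:
	}
	return s.TrySend(msg), dropped
}

func main() {
	var s Subscriber = &stubSub{key: "demo", ch: make(chan interface{}, 1)}
	fmt.Println(s.TrySend("a"))        // true: buffer had room
	fmt.Println(s.TrySend("b"))        // false: buffer full, nothing evicted
	fmt.Println(s.PushDropOldest("c")) // true true: "a" evicted, "c" queued
	fmt.Println(<-s.SendCh())          // c
}
```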
