eventstream

package module
v0.2.1-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2026 License: AGPL-3.0 Imports: 12 Imported by: 0

README

eventstream

ci codecov

Pub/sub event delivery plugin for the Pilot Protocol daemon. Binds service port 1002 and ships events between Pilot peers over the daemon's reliable stream transport.

Install

import "github.com/pilot-protocol/eventstream"

Usage

// Register as a plugin on the daemon runtime:
rt.Register(eventstream.NewService())

// Or use the client directly:
c, err := eventstream.Dial(ctx, peerAddr)
if err != nil {
    return err
}
defer c.Close()
if err := c.Subscribe("ticker.btcusd"); err != nil {
    return err
}
for {
    ev, err := c.Read()
    if err != nil {
        return err
    }
    handle(ev)
}

From pilotctl:

pilotctl subscribe <peer-address> ticker.btcusd
pilotctl publish   <peer-address> ticker.btcusd --data '...'

Layout

File What it does
eventstream.go Wire format: Event{Topic, Payload}, length-prefixed framing. WriteEvent / ReadEvent.
client.go Subscriber side: Client.Dial, .Subscribe(topic), .Read, .Close.
server.go Publisher side: Server accepts inbound stream connections and broadcasts.
service.go *Servicecoreapi.Service adapter, binds port 1002. Build tag !no_eventstream.
service_disabled.go Stub when -tags no_eventstream is set.
examples/main.go Minimal publisher + subscriber example.

Build tags

Tag Effect
no_eventstream Compiles a no-op stub service.

License

AGPL-3.0-or-later. See LICENSE.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WriteEvent

func WriteEvent(w io.Writer, e *Event) error

WriteEvent writes an event to a writer.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client connects to a remote event stream broker on port 1002.

func Subscribe

func Subscribe(d *driver.Driver, addr protocol.Addr, topic string) (*Client, error)

Subscribe connects to the event stream and subscribes to a topic. Use "*" to subscribe to all events.

func (*Client) Close

func (c *Client) Close() error

Close closes the connection.

func (*Client) Publish

func (c *Client) Publish(topic string, payload []byte) error

Publish sends an event to the broker for distribution.

func (*Client) Recv

func (c *Client) Recv() (*Event, error)

Recv waits for the next event from the broker.

type Event

type Event struct {
	Topic   string
	Payload []byte
}

Event is a typed message published to the event stream. Wire format: [2-byte topic length][topic][4-byte payload length][payload]

func ReadEvent

func ReadEvent(r io.Reader) (*Event, error)

ReadEvent reads an event from a reader.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is a pub/sub event broker on port 1002. Clients connect, subscribe to topics, and publish events. The first event from a client is treated as a subscription: - Topic "*" subscribes to all events - Any other topic subscribes to that specific topic Subsequent events are published to all matching subscribers.

func NewServer

func NewServer(d *driver.Driver) *Server

NewServer creates an event stream server.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe() error

ListenAndServe binds port 1002 and starts the broker.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the L11 plugin adapter. cmd/daemon/main.go (L12) and cmd/pilotctl _daemon-run construct it via NewService and register via daemon.RegisterPlugin.

func NewService

func NewService() *Service

NewService returns a Service ready for daemon.RegisterPlugin.

func (*Service) Name

func (s *Service) Name() string

func (*Service) Order

func (s *Service) Order() int

Order: 120 — after the trust subsystem (50) and other application services that may want to publish before the broker is up.

func (*Service) Start

func (s *Service) Start(ctx context.Context, deps coreapi.Deps) error

func (*Service) Stop

func (s *Service) Stop(ctx context.Context) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL