trickle

package
v0.8.10
Published: Mar 10, 2026 License: MIT Imports: 15 Imported by: 0

README

Trickle Protocol

Trickle is a segmented publish-subscribe protocol that streams data in real time, mainly over HTTP.

Breaking this down:

  1. Channels: Data streams are called channels. Channels are content agnostic: the data can be video, audio, JSON, logs, a custom format, etc.

  2. Segmented: Data for each channel is sent as discrete segments. For example, a video may use a segment for each GOP, an API may use a segment for each JSON message, and a logging application may split segments on time intervals. The boundaries of each segment are application defined, but segments should preferably be standalone, so that a request for a single segment returns content that is usable on its own.

  3. Publish-subscribe: Publishers push segments, and subscribers pull them.

  4. Streaming: Content can be sent by publishers and received by subscribers in real-time as it is generated - data is trickled out.

  5. HTTP: Trickle is tied to HTTP with GET / POST semantics, status codes, path based routing, metadata in headers, etc. However, other implementations are possible, such as a local, in-memory trickle client.

Protocol Specification

TODO: more details

Publishers POST to /channel-name/seq

Subscribers GET to /channel-name/seq

The channel-name is any valid HTTP path part.

The seq is an integer sequence number indicating the segment, and must increase sequentially without gaps.
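The path layout above can be sketched with a small helper. This is only an illustration using the standard library: the `segmentURL` name, the base URL, and the channel name are placeholders and not part of the trickle package.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// segmentURL builds the /channel-name/seq URL for one segment.
// It is a hypothetical helper, not an API of the trickle package.
func segmentURL(base, channel string, seq int) string {
	return strings.TrimRight(base, "/") + "/" + url.PathEscape(channel) + "/" + strconv.Itoa(seq)
}

func main() {
	// Sequence numbers increase by exactly one, with no gaps.
	for seq := 0; seq < 3; seq++ {
		fmt.Println(segmentURL("http://localhost:8080", "my-channel", seq))
	}
}
```

Publishers would POST to each of these URLs in turn, and subscribers would GET the same URLs.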

As data is published to channel-name/seq, the server will relay the data to all subscribers for channel-name/seq.

Clients are responsible for maintaining their own position in the sequence.

Servers may opt to keep the last N segments for subscribers to catch up on.

Servers will 404 if a channel-name or a seq does not exist.
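To make the POST, GET, and 404 behavior concrete, here is a toy in-memory handler built only on `net/http`. It is a sketch under heavy simplifications and is not the package's `Server`: it stores whole segments only, never expires them, and has no notion of pre-connects or streaming. The `toyServer` and `do` names are made up for this example.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// toyServer keeps fully published segments in memory, keyed by "channel/seq".
type toyServer struct {
	mu       sync.Mutex
	segments map[string][]byte
}

func newToyServer() *toyServer {
	return &toyServer{segments: map[string][]byte{}}
}

func (s *toyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	key := strings.Trim(r.URL.Path, "/")
	if strings.Count(key, "/") != 1 {
		http.NotFound(w, r) // expect exactly /channel-name/seq
		return
	}
	switch r.Method {
	case http.MethodPost:
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		s.mu.Lock()
		s.segments[key] = body
		s.mu.Unlock()
	case http.MethodGet:
		s.mu.Lock()
		body, ok := s.segments[key]
		s.mu.Unlock()
		if !ok {
			http.NotFound(w, r) // unknown channel or unknown seq
			return
		}
		w.Write(body)
	default:
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}

// do sends one in-process request to the handler and records the response.
func do(h http.Handler, method, path, body string) *httptest.ResponseRecorder {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, path, strings.NewReader(body)))
	return rec
}

func main() {
	s := newToyServer()
	do(s, http.MethodPost, "/demo/0", "hello")
	fmt.Println(do(s, http.MethodGet, "/demo/0", "").Body.String())
	fmt.Println(do(s, http.MethodGet, "/demo/1", "").Code)
}
```

The second GET asks for a seq that was never published, so the handler answers with a 404.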

Clients may pre-connect to the next segment in order to set up the resource and minimize connection set-up time.

Publishers should only actively send data to one seq at a time, although they may still pre-connect to seq + 1.

Publishers do not have to push content immediately after pre-connecting; however, the server should have some reasonable timeout to avoid excessive idle connections.

If a subscriber retrieves a segment mid-publish, the server should return all the content it has up until that point, and trickle down the rest as it receives it.
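One way to get this "catch up, then follow" behavior is an append-only buffer that readers can consume while it is still being written. The sketch below uses a `sync.Cond` for that. It is an assumption about how such a buffer could look, not a description of the package's internals, and the `segment` and `segmentReader` types are invented for the example.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// segment is an append-only buffer that can be read during publishing.
type segment struct {
	mu     sync.Mutex
	cond   *sync.Cond
	data   []byte
	closed bool
}

func newSegment() *segment {
	s := &segment{}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Write appends data and wakes readers that are waiting for more.
func (s *segment) Write(p []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data = append(s.data, p...)
	s.cond.Broadcast()
	return len(p), nil
}

// Close marks the segment as complete.
func (s *segment) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	s.cond.Broadcast()
	return nil
}

// segmentReader tracks a single subscriber's read position.
type segmentReader struct {
	seg *segment
	pos int
}

// Read returns buffered data immediately, blocks while the reader is caught
// up with an open segment, and returns io.EOF once a closed segment is drained.
func (r *segmentReader) Read(p []byte) (int, error) {
	s := r.seg
	s.mu.Lock()
	defer s.mu.Unlock()
	for r.pos == len(s.data) && !s.closed {
		s.cond.Wait()
	}
	if r.pos == len(s.data) {
		return 0, io.EOF
	}
	n := copy(p, s.data[r.pos:])
	r.pos += n
	return n, nil
}

func main() {
	seg := newSegment()
	seg.Write([]byte("first half, "))

	// A subscriber joins mid-publish and reads until the segment closes.
	done := make(chan string)
	go func() {
		b, _ := io.ReadAll(&segmentReader{seg: seg})
		done <- string(b)
	}()

	seg.Write([]byte("second half"))
	seg.Close()
	fmt.Println(<-done) // prints "first half, second half"
}
```

Each subscriber gets its own `segmentReader`, so late joiners start from the beginning of the segment without affecting readers that are already at the live edge.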

If a timeout is hit without sending (or receiving) content, the publisher (or subscriber) can reconnect to the same seq. (TODO: also indicate timeout via signaling)

Servers may offer some grace with leading sequence numbers to avoid data races, e.g. allowing a GET for seq + 1 if a publisher hasn't yet pre-connected to that number.

Publishers are responsible for segmenting content (if necessary) and subscribers are responsible for re-assembling content (if necessary).
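As an illustration of that split in responsibilities, the sketch below segments newline-delimited JSON so that each message becomes one standalone segment, and then reassembles the original byte stream. The boundary choice and the `segmentLines` and `reassemble` helpers are assumptions for this example; a real application picks boundaries that suit its own content.

```go
package main

import (
	"bytes"
	"fmt"
)

// segmentLines splits data after every newline, so each non-empty line
// (including its trailing newline) becomes one segment.
func segmentLines(data []byte) [][]byte {
	var segs [][]byte
	for _, line := range bytes.SplitAfter(data, []byte("\n")) {
		if len(line) > 0 {
			segs = append(segs, line)
		}
	}
	return segs
}

// reassemble concatenates segments back into a single byte stream.
func reassemble(segs [][]byte) []byte {
	return bytes.Join(segs, nil)
}

func main() {
	in := []byte("{\"level\":\"info\"}\n{\"level\":\"warn\"}\n")
	segs := segmentLines(in)
	fmt.Println(len(segs))                         // prints 2
	fmt.Println(bytes.Equal(reassemble(segs), in)) // prints true
}
```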

Subscribers can start a subscription with a seq of -1 to retrieve the most recent publish. With pre-connects, the subscriber may be waiting for the next publish. For video, this allows clients to, for example, start streaming at the live edge of the next GOP.

Subscribers can retrieve the current seq from the Lp-Trickle-Seq metadata (HTTP header). This is useful when -1 was used to initiate the subscription; the subscribing client can then pre-connect to Lp-Trickle-Seq + 1.
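A subscriber that started at -1 could work out which seq to pre-connect to roughly as follows. The `nextSeq` helper is hypothetical and only uses the standard library; the package's own `GetSeq` function presumably covers the header lookup, but its exact behavior is not documented here.

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
)

const seqHeader = "Lp-Trickle-Seq"

// nextSeq reads the current sequence number from the response headers and
// returns the sequence number to pre-connect to next.
func nextSeq(h http.Header) (int, error) {
	cur, err := strconv.Atoi(h.Get(seqHeader))
	if err != nil {
		return 0, fmt.Errorf("missing or invalid %s header: %w", seqHeader, err)
	}
	return cur + 1, nil
}

func main() {
	// Stand-in for the headers of a response to GET /channel-name/-1.
	h := http.Header{}
	h.Set(seqHeader, "41")

	next, err := nextSeq(h)
	if err != nil {
		panic(err)
	}
	fmt.Println(next) // prints 42
}
```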

Subscribers can initiate a subscribe with a seq of -N to get the Nth-from-last segment. (TODO)

The server should send subscribers Lp-Trickle-Size metadata to indicate the size of the content up until now. This allows clients to know where the live edge is; e.g. video implementations can decode and discard frames up until the edge to achieve immediate playback without waiting for the next segment. (TODO)

The server currently has a special changefeed channel named _changes which will send subscribers updates on streams that are added and removed. The changefeed is disabled by default.

Documentation

Constants

const CHANGEFEED = "_changes"

Variables

var EOS = errors.New("End of stream")
var FirstByteTimeout = errors.New("pending read timeout")
var StreamNotFoundErr = errors.New("stream not found")

Functions

func GetLatest added in v0.8.2

func GetLatest(resp *http.Response) int

func GetSeq

func GetSeq(resp *http.Response) int

func IsEOS

func IsEOS(resp *http.Response) bool

Types

type Changefeed

type Changefeed struct {
	Added   []string `json:"added,omitempty"`
	Removed []string `json:"removed,omitempty"`
}
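A changefeed update can be decoded with `encoding/json`. The sketch below mirrors the struct locally so that it runs without the package, and it assumes that each segment on the `_changes` channel carries a single JSON object of this shape; the `decodeChange` helper and the sample payload are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Changefeed mirrors the struct documented above.
type Changefeed struct {
	Added   []string `json:"added,omitempty"`
	Removed []string `json:"removed,omitempty"`
}

// decodeChange parses one changefeed payload.
func decodeChange(payload []byte) (Changefeed, error) {
	var cf Changefeed
	err := json.Unmarshal(payload, &cf)
	return cf, err
}

func main() {
	cf, err := decodeChange([]byte(`{"added":["camera-1","camera-2"]}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(cf.Added, len(cf.Removed))
}
```

Because both fields use `omitempty`, an update that only adds channels has no `removed` key, and the corresponding slice stays nil after decoding.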

type HTTPError added in v0.8.1

type HTTPError struct {
	Code int
	Body string
}

HTTPError gets returned with a >=400 status code (non-400)

func (*HTTPError) Error added in v0.8.1

func (e *HTTPError) Error() string
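Since `Error` has a pointer receiver, callers would typically detect this error with `errors.As` and a `*HTTPError` target. The sketch below mirrors the type locally so that it is runnable; the message format in `Error` and the `statusOf` helper are assumptions, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// HTTPError mirrors the struct documented above.
type HTTPError struct {
	Code int
	Body string
}

// Error uses an assumed message format for this example.
func (e *HTTPError) Error() string {
	return fmt.Sprintf("status code %d: %s", e.Code, e.Body)
}

// statusOf extracts the HTTP status code from an error chain, if present.
func statusOf(err error) (int, bool) {
	var httpErr *HTTPError
	if errors.As(err, &httpErr) {
		return httpErr.Code, true
	}
	return 0, false
}

func main() {
	// Wrapping with %w keeps the HTTPError reachable through errors.As.
	err := fmt.Errorf("publish failed: %w", &HTTPError{Code: 503, Body: "busy"})
	code, ok := statusOf(err)
	fmt.Println(code, ok) // prints "503 true"
}
```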

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

type SegmentSubscriber

type SegmentSubscriber struct {
	// contains filtered or unexported fields
}

type SequenceNonexistent added in v0.8.2

type SequenceNonexistent struct {
	Latest int
	Seq    int
}

func (*SequenceNonexistent) Error added in v0.8.2

func (e *SequenceNonexistent) Error() string

type SequenceStart added in v0.8.7

type SequenceStart int
const (
	Current SequenceStart = -2
	Next                  = -1
)

type Server

type Server struct {
	// contains filtered or unexported fields
}

func ConfigureServer

func ConfigureServer(config TrickleServerConfig) *Server

func (*Server) Start added in v0.8.2

func (sm *Server) Start() func()

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

type TrickleData

type TrickleData struct {
	Reader   io.Reader
	Metadata map[string]string
}

type TrickleLocalPublisher

type TrickleLocalPublisher struct {
	// contains filtered or unexported fields
}

func NewLocalPublisher

func NewLocalPublisher(sm *Server, channelName string, mimeType string) *TrickleLocalPublisher

func (*TrickleLocalPublisher) Close

func (c *TrickleLocalPublisher) Close() error

func (*TrickleLocalPublisher) CreateChannel

func (c *TrickleLocalPublisher) CreateChannel()

func (*TrickleLocalPublisher) Write

func (c *TrickleLocalPublisher) Write(data io.Reader) error

type TrickleLocalSubscriber

type TrickleLocalSubscriber struct {
	// contains filtered or unexported fields
}

func NewLocalSubscriber

func NewLocalSubscriber(sm *Server, channelName string) *TrickleLocalSubscriber

func (*TrickleLocalSubscriber) Read

func (*TrickleLocalSubscriber) SetSeq added in v0.8.9

func (c *TrickleLocalSubscriber) SetSeq(seq int)

type TricklePublisher

type TricklePublisher struct {
	// contains filtered or unexported fields
}

TricklePublisher represents a trickle streaming client

func NewTricklePublisher

func NewTricklePublisher(url string) (*TricklePublisher, error)

NewTricklePublisher creates a new trickle stream client

func (*TricklePublisher) Close

func (c *TricklePublisher) Close() error

func (*TricklePublisher) Next added in v0.8.2

func (c *TricklePublisher) Next() (*pendingPost, error)

func (*TricklePublisher) Write

func (c *TricklePublisher) Write(data io.Reader) error

Write sends data to the current segment, sets up the next segment concurrently, and blocks until completion.

type TrickleServerConfig

type TrickleServerConfig struct {
	// Base HTTP path for the server
	BasePath string

	// HTTP mux to use
	Mux *http.ServeMux

	// Whether to enable the changefeed (default false)
	Changefeed bool

	// Whether to auto-create channels on first publish (default false)
	Autocreate bool

	// Amount of time a channel has no new segments before being swept (default 5 minutes)
	IdleTimeout time.Duration

	// How often to sweep for idle channels (default 1 minute)
	SweepInterval time.Duration
}

type TrickleSubscriber

type TrickleSubscriber struct {
	// contains filtered or unexported fields
}

TrickleSubscriber represents a trickle streaming reader that always fetches from index -1

func NewTrickleSubscriber

func NewTrickleSubscriber(config TrickleSubscriberConfig) (*TrickleSubscriber, error)

NewTrickleSubscriber creates a new trickle stream reader for GET requests

func (*TrickleSubscriber) Read

func (c *TrickleSubscriber) Read() (*http.Response, error)

Read retrieves data from the current segment and sets up the next segment concurrently. It returns the reader for the current segment's data.

func (*TrickleSubscriber) SetSeq added in v0.8.2

func (c *TrickleSubscriber) SetSeq(seq int)

type TrickleSubscriberConfig added in v0.8.7

type TrickleSubscriberConfig struct {

	// Trickle URL to subscribe to (required).
	URL string

	// Pass in a context for custom cancellation of
	// the entire subscription.
	//
	// Setting a context here is unusual but aligns
	// better with how contexts are used internally
	// (eg, they do not strictly map to a single Read request)
	Ctx context.Context

	// Set the index of the first sequence to read.
	// Pointer to distinguish unset from a valid zero field.
	Start *SequenceStart
}

TrickleSubscriberConfig holds all NewTrickleSubscriber inputs. Pass this by value; any nil or zero will fall back to defaults.
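Because `Start` is a pointer, callers need an addressable `SequenceStart` value to point at. The sketch below mirrors the declarations locally (omitting `Ctx` for brevity) and uses a hypothetical `startAt` helper. Note that, as the const block is rendered above, `Next` has no explicit type, so it is an untyped constant: `p := Next` would give `p` the type `int`, and `&p` could not be assigned to `Start`. Passing the constant through a `SequenceStart` parameter sidesteps that.

```go
package main

import "fmt"

// Local mirrors of the documented declarations, for illustration only.
type SequenceStart int

const (
	Current SequenceStart = -2
	Next                  = -1 // untyped constant, as in the listing above
)

type TrickleSubscriberConfig struct {
	URL   string
	Start *SequenceStart
}

// startAt returns a pointer to a copy of s. It is a hypothetical helper,
// not part of the trickle package.
func startAt(s SequenceStart) *SequenceStart {
	return &s
}

func main() {
	cfg := TrickleSubscriberConfig{
		URL:   "http://localhost:8080/my-channel", // placeholder URL
		Start: startAt(Next),
	}
	fmt.Println(*cfg.Start) // prints -1

	// Leaving Start nil means "unset", which is distinct from a zero start.
	fmt.Println(TrickleSubscriberConfig{}.Start == nil) // prints true
}
```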
