consume

package
v1.0.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Package consume drives the consume-side half of the events pipeline.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckRemoteConnections

func CheckRemoteConnections(ctx context.Context, client APIClient) (int, error)

CheckRemoteConnections returns the count of active WebSocket connections for this app.

func CompileJQ

func CompileJQ(expr string) (*gojq.Code, error)

CompileJQ compiles once for hot-path reuse; exported so callers can preflight before side effects.

func EnsureBus

func EnsureBus(ctx context.Context, tr transport.IPC, appID, profileName, domain string, apiClient APIClient, errOut io.Writer) (net.Conn, error)

EnsureBus dials the bus daemon for appID, forking a new one if none is running. apiClient nil skips remote-connection probe. Local-bus hits skip remote check (see `event status`).

func Run

func Run(ctx context.Context, tr transport.IPC, appID, profileName, domain string, opts Options) error

Run ensures bus is up, performs hello handshake, runs PreConsume for first subscriber, enters the consume loop, and runs cleanup on exit if we were the last subscriber.

Types

type APIClient

type APIClient = event.APIClient

type DirSink

type DirSink struct {
	Dir string
	// contains filtered or unexported fields
}

DirSink writes one JSON file per event; nanos+pid+seq filename avoids cross-process collisions.

func (*DirSink) Write

func (s *DirSink) Write(data json.RawMessage) error

type Options

type Options struct {
	EventKey        string
	Params          map[string]string
	JQExpr          string
	Quiet           bool
	OutputDir       string
	Runtime         event.APIClient
	Out             io.Writer // nil falls back to os.Stdout
	ErrOut          io.Writer
	RemoteAPIClient APIClient // nil disables remote-connection preflight

	MaxEvents int           // 0 = unlimited
	Timeout   time.Duration // 0 = no timeout
	IsTTY     bool
}

type Sink

type Sink interface {
	Write(data json.RawMessage) error
}

type WriterSink

type WriterSink struct {
	W      io.Writer
	Pretty bool
	ErrOut io.Writer
	// contains filtered or unexported fields
}

WriterSink writes one JSON event per line; mu serialises concurrent worker writes.

func (*WriterSink) Write

func (s *WriterSink) Write(data json.RawMessage) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL