pubsub

package
v0.2.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 | License: MIT | Imports: 15 | Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelPubSub

type ChannelPubSub struct {
	// contains filtered or unexported fields
}

ChannelPubSub implements the PubSub interface using Go channels. This provides a no-dependency pub/sub solution for SQLite-based deployments. Recent events are handled by the storage layer via LookupOutpoints.

func NewChannelPubSub

func NewChannelPubSub() *ChannelPubSub

NewChannelPubSub creates a new channel-based pub/sub implementation

func (*ChannelPubSub) Close

func (cp *ChannelPubSub) Close() error

Close closes the pub/sub system

func (*ChannelPubSub) Publish

func (cp *ChannelPubSub) Publish(ctx context.Context, topic string, data string) error

Publish sends data to all subscribers of a topic

func (*ChannelPubSub) Start

func (cp *ChannelPubSub) Start(ctx context.Context) error

Start initializes the pub/sub system

func (*ChannelPubSub) Stop

func (cp *ChannelPubSub) Stop() error

Stop stops the pub/sub system

func (*ChannelPubSub) Subscribe

func (cp *ChannelPubSub) Subscribe(ctx context.Context, topics []string) (<-chan Event, error)

Subscribe creates a subscription to the given topics

func (*ChannelPubSub) Unsubscribe

func (cp *ChannelPubSub) Unsubscribe(topics []string) error

Unsubscribe removes subscriptions (for compatibility)

type Event

type Event struct {
	Topic  string         `json:"topic"`
	Member string         `json:"member"` // Generic member data (usually outpoint string)
	Score  float64        `json:"score"`
	Source string         `json:"source"`         // "redis", "sse:peerURL", etc.
	Data   map[string]any `json:"data,omitempty"` // Additional event data
}

Event represents a unified event that can come from Redis or SSE sources

type EventData

type EventData struct {
	Outpoint string  `json:"outpoint"`
	Score    float64 `json:"score"`
}

EventData represents a buffered event for reconnection (backward compatibility)

type PeerBroadcaster

type PeerBroadcaster struct {
	// contains filtered or unexported fields
}

PeerBroadcaster handles broadcasting submitted transactions to peer overlays

func NewPeerBroadcaster

func NewPeerBroadcaster(peerTopics map[string][]string) *PeerBroadcaster

NewPeerBroadcaster creates a new peer broadcaster with the given peer-to-topics mapping

func (*PeerBroadcaster) BroadcastTransaction

func (p *PeerBroadcaster) BroadcastTransaction(ctx context.Context, taggedBEEF overlay.TaggedBEEF) error

BroadcastTransaction broadcasts a TaggedBEEF to all peers configured for its topics

type PubSub

type PubSub interface {
	// Publishing functionality
	Publish(ctx context.Context, topic string, data string) error

	// Subscribing functionality
	Subscribe(ctx context.Context, topics []string) (<-chan Event, error)
	Unsubscribe(topics []string) error

	// Connection management
	Start(ctx context.Context) error
	Stop() error
	Close() error
}

PubSub interface for unified publishing and subscribing

type RedisPubSub

type RedisPubSub struct {
	// contains filtered or unexported fields
}

RedisPubSub handles both publishing and subscribing to Redis

func NewRedisPubSub

func NewRedisPubSub(redisURL string) (*RedisPubSub, error)

NewRedisPubSub creates a new Redis pub/sub handler

func (*RedisPubSub) Close

func (r *RedisPubSub) Close() error

Close closes the Redis connection

func (*RedisPubSub) Publish

func (r *RedisPubSub) Publish(ctx context.Context, topic string, data string) error

Publish publishes an event to Redis

func (*RedisPubSub) Start

func (r *RedisPubSub) Start(ctx context.Context) error

Start starts the Redis pub/sub system

func (*RedisPubSub) Stop

func (r *RedisPubSub) Stop() error

Stop stops the Redis pub/sub system

func (*RedisPubSub) Subscribe

func (r *RedisPubSub) Subscribe(ctx context.Context, topics []string) (<-chan Event, error)

Subscribe subscribes to multiple topics and returns a channel of events

func (*RedisPubSub) Unsubscribe

func (r *RedisPubSub) Unsubscribe(topics []string) error

Unsubscribe unsubscribes from topics

type SSEConnection

type SSEConnection struct {
	// contains filtered or unexported fields
}

SSEConnection represents a single SSE connection to a peer

type SSEEvent

type SSEEvent struct {
	Topic    string
	Outpoint transaction.Outpoint
	PeerURL  string
}

SSEEvent represents an event received from SSE

type SSEManager

type SSEManager struct {
	// contains filtered or unexported fields
}

SSEManager provides SSE client management on top of any PubSub implementation

func NewSSEManager

func NewSSEManager(pubsub PubSub) *SSEManager

NewSSEManager creates a new SSE manager wrapping the given PubSub implementation

func (*SSEManager) AddSSEClient

func (s *SSEManager) AddSSEClient(topic string, client interface{}) error

AddSSEClient registers an SSE client for a specific topic

func (*SSEManager) Close

func (s *SSEManager) Close() error

Close closes the SSE manager and underlying PubSub

func (*SSEManager) Publish

func (s *SSEManager) Publish(ctx context.Context, topic string, data string) error

Publish exposes the Publish method of the underlying PubSub

func (*SSEManager) RemoveSSEClient

func (s *SSEManager) RemoveSSEClient(topic string, client interface{}) error

RemoveSSEClient unregisters an SSE client from a topic

func (*SSEManager) Start

func (s *SSEManager) Start(ctx context.Context) error

Start begins listening for events and broadcasting to SSE clients

func (*SSEManager) Stop

func (s *SSEManager) Stop() error

Stop stops the SSE manager and underlying PubSub

type SSESync

type SSESync struct {
	// contains filtered or unexported fields
}

SSESync manages centralized SSE connections to peers and processes transactions

func NewSSESync

func NewSSESync(engine engine.OverlayEngineProvider, storage engine.Storage) *SSESync

NewSSESync creates a new centralized SSE sync manager

func (*SSESync) Start

func (s *SSESync) Start(ctx context.Context, peerTopics map[string][]string) error

Start begins SSE sync with configured peers

func (*SSESync) Stop

func (s *SSESync) Stop() error

Stop stops all SSE connections

type Storage

type Storage interface {
	DoesAppliedTransactionExist(ctx context.Context, tx *overlay.AppliedTransaction) (bool, error)
}

Storage interface for checking transaction existence

type SubscriberConfig

type SubscriberConfig struct {
	// Redis configuration
	RedisURL string

	// SSE peer configuration
	SSEPeers map[string][]string // peer URL -> topics

	// Engine for transaction processing
	Engine interface {
		Submit(ctx context.Context, taggedBEEF any, mode any, onSteakReady any) (any, error)
		GetStorage() Storage
	}
}

SubscriberConfig configures different types of subscribers

type TransactionFetcher

type TransactionFetcher interface {
	FetchBEEF(ctx context.Context, peerURL, topic string, outpoint transaction.Outpoint) ([]byte, error)
}

TransactionFetcher interface for fetching BEEF from peers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL