pubsub

package
v0.3.4
Published: Feb 27, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

README

Pub/Sub Service

A publish/subscribe message bus for inter-service communication. Supports hierarchical topics and pluggable backends (memory, Redis, Kafka).

Overview

The pubsub service lets services communicate asynchronously via topics. It integrates with Framingo's service lifecycle: a subscribed service that implements common.MessageHandler or common.RawMessageHandler receives messages automatically.

Key characteristics:

  • Hierarchical topics: subscribing to "app" receives messages from "app", "app/module", "app/module/component", etc.
  • Self-delivery skip: a publisher does not receive its own messages
  • Pluggable drivers: memory (in-process), Redis (cross-instance), Kafka (cross-instance)
  • Typed and raw messages: supports both common.Message (typed) and raw (kind, payload) dispatch

Structure

pubsub/
├── model.go          # Manager interface
├── manager.go        # Implementation
├── option.go         # Functional options
├── manager_test.go   # Tests
└── driver/
    ├── model.go      # Driver interface
    ├── util.go       # Shared types (subscriber, topicMatches)
    ├── memory.go     # In-process driver (trie-based)
    ├── redis.go      # Redis driver (cross-instance via Redis Pub/Sub)
    ├── kafka.go      # Kafka driver (cross-instance via Kafka consumer groups)
    ├── memory_test.go
    ├── redis_test.go
    └── kafka_test.go

Usage

Creating the Bus

import (
    "github.com/xhanio/framingo/pkg/services/pubsub"
    "github.com/xhanio/framingo/pkg/services/pubsub/driver"
)

// In-memory driver (single-instance)
bus := pubsub.New(
    driver.NewMemory(logger),
    pubsub.WithLogger(logger),
)

Subscribing

Subscribe a named service to a topic. The service must implement common.MessageHandler and/or common.RawMessageHandler to receive messages.

bus.Subscribe(myService, "/events")

Hierarchical matching: subscribing to "/events" also receives messages published to "/events/user", "/events/user/created", etc.
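
The matching rule above can be sketched as a prefix check on "/" boundaries. This is a minimal illustration, not the package's own topicMatches: the helper name matchesTopic and the treatment of "/" as a match-all root are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesTopic reports whether a subscription to sub should receive a message
// published to topic. Hypothetical helper for illustration only; the real
// driver may normalize topics differently.
func matchesTopic(sub, topic string) bool {
	sub = strings.Trim(sub, "/")
	topic = strings.Trim(topic, "/")
	if sub == "" {
		return true // assumption: the root topic "/" matches every topic
	}
	// Match the topic itself or any descendant, but only on a segment
	// boundary, so "/events" does not match "/eventsource".
	return topic == sub || strings.HasPrefix(topic, sub+"/")
}

func main() {
	fmt.Println(matchesTopic("/events", "/events/user/created")) // true
	fmt.Println(matchesTopic("/events", "/eventsource"))         // false
	fmt.Println(matchesTopic("/events/user", "/events"))         // false
}
```

Checking on segment boundaries rather than raw string prefixes keeps sibling topics with a shared prefix from leaking into each other.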

Publishing

// Publish with explicit topic and kind
bus.Publish(sender, "/events/user", "user.created", payload)

// SendMessage: topic is derived from sender.Name(), kind from message.Kind()
bus.SendMessage(ctx, sender, myMessage)

// SendRawMessage: topic is derived from sender.Name()
bus.SendRawMessage(ctx, sender, "custom.kind", rawPayload)

Unsubscribing

bus.Unsubscribe(myService, "/events")

Message Handlers

Services receive messages by implementing these interfaces from pkg/types/common:

// For typed messages (payload implements common.Message)
type MessageHandler interface {
    HandleMessage(ctx context.Context, e Message) error
}

// For any payload
type RawMessageHandler interface {
    HandleRawMessage(ctx context.Context, kind string, payload any) error
}

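If the payload implements common.Message, subscribers are notified through both handlers (HandleMessage and HandleRawMessage), depending on which interfaces they implement. Otherwise, only HandleRawMessage is called.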
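
The dispatch rule, together with the self-delivery skip, can be sketched as below. The interfaces are local stand-ins for those in pkg/types/common, and the dispatch helper, the comparison by Name(), the handler call order, and the ignored errors are all assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the interfaces in pkg/types/common (assumed shapes).
type Named interface{ Name() string }
type Message interface{ Kind() string }
type MessageHandler interface {
	HandleMessage(ctx context.Context, e Message) error
}
type RawMessageHandler interface {
	HandleRawMessage(ctx context.Context, kind string, payload any) error
}

// dispatch is a hypothetical helper: it skips the sender, calls HandleMessage
// only for payloads that implement Message, and always calls HandleRawMessage.
func dispatch(ctx context.Context, from Named, subs []Named, kind string, payload any) {
	for _, s := range subs {
		if s.Name() == from.Name() {
			continue // self-delivery skip (assumed to compare by name)
		}
		if m, ok := payload.(Message); ok {
			if h, ok := s.(MessageHandler); ok {
				_ = h.HandleMessage(ctx, m) // errors ignored in this sketch
			}
		}
		if h, ok := s.(RawMessageHandler); ok {
			_ = h.HandleRawMessage(ctx, kind, payload)
		}
	}
}

type userCreated struct{ ID int }

func (userCreated) Kind() string { return "user.created" }

// recorder implements both handler interfaces and logs what it receives.
type recorder struct {
	name string
	log  []string
}

func (r *recorder) Name() string { return r.name }
func (r *recorder) HandleMessage(_ context.Context, e Message) error {
	r.log = append(r.log, "typed:"+e.Kind())
	return nil
}
func (r *recorder) HandleRawMessage(_ context.Context, kind string, _ any) error {
	r.log = append(r.log, "raw:"+kind)
	return nil
}

// runDemo publishes one typed and one untyped payload from service "a".
func runDemo() (senderLog, otherLog []string) {
	a, b := &recorder{name: "a"}, &recorder{name: "b"}
	subs := []Named{a, b}
	ctx := context.Background()
	dispatch(ctx, a, subs, "user.created", userCreated{ID: 1})
	dispatch(ctx, a, subs, "custom.kind", "plain string")
	return a.log, b.log
}

func main() {
	senderLog, otherLog := runDemo()
	fmt.Println(senderLog) // []
	fmt.Println(otherLog)  // [typed:user.created raw:user.created raw:custom.kind]
}
```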

Drivers

Memory

In-process driver using a trie for topic matching. No external dependencies.

d := driver.NewMemory(logger)
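
To show why a trie suits hierarchical topics, here is a minimal sketch of trie-based subscriber lookup. It is not the driver's actual implementation; the node type and its subscribe and match methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// node is one topic segment in a hypothetical subscription trie.
type node struct {
	children map[string]*node
	subs     map[string]struct{} // names of services subscribed at this level
}

func newNode() *node {
	return &node{children: map[string]*node{}, subs: map[string]struct{}{}}
}

// split breaks "/events/user" into ["events", "user"]; "/" yields no segments.
func split(topic string) []string {
	return strings.FieldsFunc(topic, func(r rune) bool { return r == '/' })
}

// subscribe registers name at the node for topic, creating nodes as needed.
func (n *node) subscribe(topic, name string) {
	cur := n
	for _, seg := range split(topic) {
		next, ok := cur.children[seg]
		if !ok {
			next = newNode()
			cur.children[seg] = next
		}
		cur = next
	}
	cur.subs[name] = struct{}{}
}

// match walks the published topic from the root and collects subscribers at
// every ancestor level, so one walk finds all hierarchical matches.
func (n *node) match(topic string) []string {
	seen := map[string]struct{}{}
	collect := func(x *node) {
		for s := range x.subs {
			seen[s] = struct{}{}
		}
	}
	cur := n
	collect(cur)
	for _, seg := range split(topic) {
		next, ok := cur.children[seg]
		if !ok {
			break
		}
		cur = next
		collect(cur)
	}
	out := make([]string, 0, len(seen))
	for s := range seen {
		out = append(out, s)
	}
	sort.Strings(out) // sorted only to make the output deterministic
	return out
}

func main() {
	root := newNode()
	root.subscribe("/", "audit")
	root.subscribe("/events", "mailer")
	root.subscribe("/events/user", "crm")
	root.subscribe("/metrics", "stats")
	fmt.Println(root.match("/events/user/created")) // [audit crm mailer]
	fmt.Println(root.match("/events"))              // [audit mailer]
}
```

Lookup cost depends on the depth of the published topic rather than on the total number of subscriptions, which is the usual reason to prefer a trie over scanning every subscription.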

Redis

Cross-instance driver using Redis Pub/Sub pattern subscriptions. Messages are delivered locally and published to Redis for other instances.

import "github.com/redis/go-redis/v9"

client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
d, err := driver.NewRedis(client, logger)

Kafka

Cross-instance driver using Kafka. Each instance gets a unique consumer group for broadcast semantics (every instance receives all messages).

d, err := driver.NewKafka(
    []string{"localhost:9092"}, // brokers
    "my-app",                   // group ID prefix
    logger,
)
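
Broadcast semantics follow from how Kafka consumer groups work: members of one group share a topic's messages, while separate groups each receive all of them. One way an instance could derive a unique group ID from the prefix is sketched below. How the driver actually builds its group ID is not documented here, so the uniqueGroupID helper and the random hex suffix are assumptions.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// uniqueGroupID appends a random suffix to the prefix so that every instance
// joins its own consumer group. Hypothetical helper, not the driver's code.
func uniqueGroupID(prefix string) (string, error) {
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return prefix + "-" + hex.EncodeToString(buf), nil
}

func main() {
	id, err := uniqueGroupID("my-app")
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // "my-app-" followed by 16 random hex characters
}
```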

Manager Interface

type Manager interface {
    common.Service
    common.Daemon
    common.Initializable
    common.Debuggable
    common.MessageSender
    common.RawMessageSender

    Publish(from common.Named, topic string, kind string, payload any)
    Subscribe(svc common.Named, topic string)
    Unsubscribe(svc common.Named, topic string)
}

Options

Option                   Description
WithLogger(log.Logger)   Set the logger
WithName(string)         Override the auto-derived name
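
These options follow Go's functional options pattern (type Option func(*manager)). The sketch below shows the pattern in isolation. The manager fields, the default name, the newManager constructor, and the use of a plain function as the logger are assumptions, not the package's internals.

```go
package main

import "fmt"

// manager is a stand-in for the package's unexported manager type; the
// fields are assumed for illustration.
type manager struct {
	name   string
	logger func(msg string)
}

// Option mutates a manager during construction.
type Option func(*manager)

// WithName overrides the default name.
func WithName(name string) Option {
	return func(m *manager) { m.name = name }
}

// WithLogger sets the logger; a func stands in for log.Logger here.
func WithLogger(l func(msg string)) Option {
	return func(m *manager) { m.logger = l }
}

// newManager applies defaults first, then each option in order.
func newManager(opts ...Option) *manager {
	m := &manager{name: "pubsub", logger: func(string) {}}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	m := newManager(WithName("events-bus"))
	fmt.Println(m.name) // events-bus
}
```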

Topic Hierarchy Example

In the example server component, each service is subscribed at three levels, where {component} and {service} stand for the component and service names:

bus.Subscribe(svc, "/")
bus.Subscribe(svc, "/components/{component}")
bus.Subscribe(svc, "/components/{component}/services/{service}")

This allows publishing at different scopes:

  • / — broadcast to all services across all components
  • /components/myapp — target all services in a specific component
  • /components/myapp/services/db — target a specific service
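
As a small illustration of the three scopes, the helper below builds the topic strings for one service. The name scopeTopics is hypothetical; only the topic layout comes from the example above.

```go
package main

import "fmt"

// scopeTopics returns the three topics a service would subscribe to:
// the global scope, its component, and the service itself.
// Hypothetical helper; not part of the pubsub package.
func scopeTopics(component, service string) []string {
	return []string{
		"/",
		"/components/" + component,
		"/components/" + component + "/services/" + service,
	}
}

func main() {
	for _, t := range scopeTopics("myapp", "db") {
		fmt.Println(t)
	}
	// Output:
	// /
	// /components/myapp
	// /components/myapp/services/db
}
```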

Documentation

Types

type Manager

type Manager interface {
	common.Service
	common.Daemon
	common.Initializable
	common.Debuggable
	common.MessageSender
	common.RawMessageSender

	// Publish sends a message to all subscribers of the given topic.
	// The publisher (from) will NOT receive its own message.
	// If payload implements common.Message, MessageHandler subscribers are notified.
	// RawMessageHandler subscribers are always notified.
	Publish(from common.Named, topic string, kind string, payload any)

	// Subscribe registers a service to receive events on the given topic.
	// Topics are hierarchical: subscribing to "app" receives events from
	// "app", "app/module", "app/module/component", etc.
	Subscribe(svc common.Named, topic string)

	// Unsubscribe removes a service's subscription from the given topic.
	Unsubscribe(svc common.Named, topic string)
}

func New

func New(b driver.Driver, opts ...Option) Manager

type Option

type Option func(*manager)

func WithLogger

func WithLogger(logger log.Logger) Option

func WithName

func WithName(name string) Option
