dndm

package module
v0.0.0-...-c7b5b61 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2025 License: MIT Imports: 14 Imported by: 3

README

dndm

Decentralized Named Data Messaging - A communication library for efficient, typed message passing between servers, IoT devices, robots, and embedded systems.

Overview

DNDM is inspired by LibP2P, Named Data Networks, Pub/Sub architectures, and ROS. The main purpose is to provide a framework for seamless decentralized, distributed, and modular architectures, primarily for robotic applications.

Key Features
  • Typed Message Passing: Type-safe communication using protobuf specifications
  • Zero-Copy: Efficient in-process communication via channels
  • Automatic Linking: Intent-Interest pattern automatically connects publishers and subscribers
  • Multiple Transports: Direct (in-process), Remote (TCP/UDP/Serial), Mesh (full-mesh network)
  • Peer Discovery: Automatic peer discovery and mesh network formation
  • Route-Based: Typed routes combine message type with path for type-safe routing

Architecture

DNDM consists of Intents and Interests that are communicated and managed via Endpoints. Each Intent and Interest is marked with a Route that uniquely identifies the data by combining the stream name and message type. A Linker connects an Intent with an Interest by routing messages efficiently.

graph TB
    Router[Router]
    Router --> IRouter[Intent Routers]
    Router --> INTRouter[Interest Routers]
    Router --> Linker[Linker]
    Router --> DirectEP[Direct Endpoint]
    Router --> RemoteEP[Remote Endpoint]
    Router --> MeshEP[Mesh Endpoint]
    
    DirectEP --> Linker
    RemoteEP --> Linker
    MeshEP --> Container[Container]
    Container --> RemoteEP2[Remote Endpoints]
    
    RemoteEP --> Network[Network Layer]
    Network --> Codec[Codec]
    
    Network --> TCP[TCP/UDP]
    Network --> Serial[Serial]
    Network --> NATS[NATS - Planned]
Core Concepts
Intent

An Intent declares availability to publish data on a specific route. When an Interest matches an Intent, they are automatically linked, and the Intent receives a notification to start publishing.

Interest

An Interest declares desire to receive data on a specific route. It's similar to a Subscription in Pub/Sub systems but with stronger typing.

Route

A Route combines a message type with a path. Format: TypeName@path (e.g., SensorData@sensors.temperature).

  • Plain Route: Human-readable format Foo@example.path
  • Hashed Route: Opaque format prefix#hash for security (Object-Capability model)
Endpoint Types

Direct Endpoint: In-process communication using Go channels (zero-copy)

Remote Endpoint: Cross-process/system communication via network connections (TCP/UDP/Serial)

Mesh Endpoint: Distributed full-mesh network with automatic peer discovery

Data Flow
sequenceDiagram
    participant Pub as Publisher
    participant Router as Router
    participant EP as Endpoint
    participant Intent as Intent
    participant Linker as Linker
    participant Interest as Interest
    participant Sub as Subscriber
    
    Pub->>Router: Publish(route)
    Router->>EP: Create Intent
    EP->>Linker: Register Intent
    
    Sub->>Router: Subscribe(route)
    Router->>EP: Create Interest
    EP->>Linker: Register Interest
    
    Linker->>Intent: Link(Interest)
    Linker->>Interest: Link(Intent)
    Intent->>Pub: Notify(route)
    
    Pub->>Intent: Send(message)
    Intent->>Interest: Route message
    Interest->>Sub: Deliver message

Usage Examples

Basic Publish-Subscribe
// Publisher
intent, err := router.Publish("sensors.temperature", &TemperatureData{})
if err != nil {
    log.Fatal(err)
}
defer intent.Close()

// Wait for interest
select {
case route := <-intent.Interest():
    // Send data
    intent.Send(ctx, &TemperatureData{Value: 25.5})
}

// Subscriber
interest, err := router.Subscribe("sensors.temperature", &TemperatureData{})
if err != nil {
    log.Fatal(err)
}
defer interest.Close()

// Receive data
for msg := range interest.C() {
    data := msg.(*TemperatureData)
    process(data)
}
Multi-Device Setup
// Create router with multiple endpoints
router, err := dndm.New(
    dndm.WithContext(ctx),
    dndm.WithQueueSize(10),
    dndm.WithEndpoint(direct.New(10)),           // In-process
    dndm.WithEndpoint(remote.New(...)),         // Network
    dndm.WithEndpoint(mesh.New(...)),           // Mesh network
)

Use Cases

Complex Robot System

This diagram shows a complete robot system with multiple processing modules, sensors, actuators, and decision-making components.

graph TB
    subgraph "Embedded Chip"
        Sensor[Sensors<br/>Produces: SensorData]
        Actuator[Actuators<br/>Consumes: MotorControl]
    end
    
    subgraph "SBC #1 - Camera"
        Camera[Camera Module<br/>Produces: CameraImage]
    end
    
    subgraph "SBC #1 - Image Processing"
        ImageProc[Image Processing<br/>Consumes: CameraImage<br/>Produces: ImageFeatures, DepthMap]
    end
    
    subgraph "SBC #2 - Visual Odometry"
        VIO[Visual Odometry<br/>Consumes: ImageFeatures, DepthMap<br/>Produces: VisualOdometry]
    end
    
    subgraph "SBC #3 - Navigation"
        Nav[Navigation Module<br/>Consumes: VisualOdometry, SensorData, MapData<br/>Produces: Location, Direction]
    end
    
    subgraph "SBC #4 - Map Building"
        MapBuild[Map Building<br/>Consumes: MapUpdate<br/>Produces: MapData]
    end
    
    subgraph "SBC #5 - Decision"
        Decision[Decision Module<br/>Consumes: Location, Direction, UserInput, Goals<br/>Produces: MotorControl]
    end
    
    subgraph "Cloud/User"
        User[User Commands<br/>via NATS<br/>Produces: UserInput, Goals]
    end
    
    Sensor -->|SensorData| Nav
    Camera -->|CameraImage| ImageProc
    ImageProc -->|ImageFeatures, DepthMap| VIO
    VIO -->|VisualOdometry| Nav
    MapBuild -->|MapData| Nav
    Nav -->|Location, Direction| Decision
    User -->|UserInput, Goals| Decision
    Decision -->|MotorControl| Actuator
    
    style Sensor fill:#e1f5ff
    style Actuator fill:#ffe1f5
    style Camera fill:#e1f5ff
    style ImageProc fill:#fff5e1
    style VIO fill:#fff5e1
    style Nav fill:#fff5e1
    style MapBuild fill:#fff5e1
    style Decision fill:#e1ffe1
    style User fill:#f0f0f0

Message Routes:

  • SensorData@sensors.data
  • CameraImage@cameras.front
  • ImageFeatures@image.features
  • DepthMap@image.depth
  • VisualOdometry@odometry.visual
  • MapData@map.global
  • Location@navigation.location
  • Direction@navigation.direction
  • MotorControl@actuators.motors
  • UserInput@user.commands
  • Goals@mission.goals
Simple Sensor Fusion (Embedded + Raspberry Pi)

A simpler use case with a sensor chip connected to a Raspberry Pi via serial port.

graph LR
    subgraph "Embedded Sensor Chip"
        SensorChip[Sensor Chip<br/>Produces: SensorData<br/>Route: SensorData@sensors.raw]
    end
    
    subgraph "Raspberry Pi"
        SerialPort[Serial Port<br/>Transport: serial:///dev/ttyUSB0]
        Fusion[Sensor Fusion<br/>Consumes: SensorData<br/>Produces: SensorFusion<br/>Route: SensorFusion@sensors.fused]
    end
    
    SensorChip -->|Serial| SerialPort
    SerialPort -->|SensorData| Fusion
    Fusion -->|SensorFusion| Output[Output<br/>Position, Orientation,<br/>Velocity Vector,<br/>Rotation Vector]
    
    style SensorChip fill:#e1f5ff
    style Fusion fill:#fff5e1
    style Output fill:#e1ffe1

Message Routes:

  • Input: SensorData@sensors.raw (from sensor chip)
  • Output: SensorFusion@sensors.fused (contains Position, Orientation, Velocity Vector, Rotation Vector)

Configuration:

// Sensor chip peer
sensorPeer := "serial:///dev/ttyUSB0/sensors.chip?baud=115200"

// Raspberry Pi peer
rpiPeer := "tcp://192.168.1.100:8080/rpi.fusion"
Stereo Camera Processing

Multiple Raspberry Pi devices processing left and right camera feeds in parallel, then combining results.

graph TB
    subgraph "Raspberry Pi #1 - Camera Hub"
        LeftCam[Left Camera<br/>Produces: LeftImage<br/>Route: CameraImage@cameras.left]
        RightCam[Right Camera<br/>Produces: RightImage<br/>Route: CameraImage@cameras.right]
        Hub[USB Hub]
    end
    
    subgraph "Raspberry Pi #2"
        LeftProc[Left Processor<br/>Consumes: LeftImage<br/>Produces: LeftFeatures<br/>Route: ImageFeatures@left.features]
    end
    
    subgraph "Raspberry Pi #3"
        RightProc[Right Processor<br/>Consumes: RightImage<br/>Produces: RightFeatures<br/>Route: ImageFeatures@right.features]
    end
    
    subgraph "Raspberry Pi #4 - Stereo Fusion"
        Stereo[Stereo Fusion<br/>Consumes: LeftFeatures, RightFeatures<br/>Produces: StereoData<br/>Route: StereoData@stereo.result]
    end
    
    LeftCam -->|LeftImage| Hub
    RightCam -->|RightImage| Hub
    Hub -->|USB/TCP| LeftProc
    Hub -->|USB/TCP| RightProc
    LeftProc -->|TCP/Mesh| Stereo
    RightProc -->|TCP/Mesh| Stereo
    
    style LeftCam fill:#e1f5ff
    style RightCam fill:#e1f5ff
    style LeftProc fill:#fff5e1
    style RightProc fill:#fff5e1
    style Stereo fill:#e1ffe1

Message Routes:

  • CameraImage@cameras.left - Left camera feed
  • CameraImage@cameras.right - Right camera feed
  • ImageFeatures@left.features - Processed left features
  • ImageFeatures@right.features - Processed right features
  • StereoData@stereo.result - Final stereo processing result

Network Topology:

  • RPI #1, #2, #3, #4 connected via mesh network or TCP
  • All devices discover each other automatically
  • Messages route based on route prefixes and peer paths

Transport Protocols

Direct (In-Process)
  • Zero-copy message passing via Go channels
  • No network overhead
  • Perfect for multi-module applications
TCP/UDP
  • Standard network protocol
  • Peer format: tcp://host:port/path or udp://host:port/path
  • Reliable delivery (TCP) or low-latency (UDP)
Serial
  • Serial port communication
  • Peer format: serial:///dev/ttyUSB0/path?baud=115200
  • Parameters: baud, parity, stop bits, data bits
NATS (Planned)
  • Pub/Sub message broker
  • Peer format: nats://nats-server:4222/path
  • Supports JetStream for persistence
  • Automatic message routing

Routes

A Route combines a message type with a path: TypeName@path

Examples:

  • SensorData@sensors.temperature
  • CameraImage@cameras.front
  • MotorControl@actuators.motors
  • Location@navigation.position

Route Rules:

  • Path must not contain @ or # characters
  • TypeName is the protobuf message name
  • Path is hierarchical (dot-separated)
Peer Paths and Routing

Each peer has a path that acts as a namespace prefix:

  • Peer A: tcp://192.168.1.1:8080/robot.sensors
  • Peer B: tcp://192.168.1.2:8080/robot.processors

Peer A publishes SensorData@robot.sensors.temperature → Routed to Peer B if B subscribes to routes matching robot.sensors.*

Getting Started

Installation
go get github.com/itohio/dndm
Basic Example
package main

import (
    "context"
    "log"
    
    "github.com/itohio/dndm"
    "github.com/itohio/dndm/endpoint/direct"
    "google.golang.org/protobuf/proto"
)

func main() {
    ctx := context.Background()
    
    // Create router with direct endpoint
    router, err := dndm.New(
        dndm.WithContext(ctx),
        dndm.WithQueueSize(10),
        dndm.WithEndpoint(direct.New(10)),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer router.Close()
    
    // Publisher
    var msg proto.Message
    intent, err := router.Publish("example.data", msg)
    if err != nil {
        log.Fatal(err)
    }
    
    // Subscriber
    interest, err := router.Subscribe("example.data", msg)
    if err != nil {
        log.Fatal(err)
    }
    
    // Use intent/interest...
}
Network Example
import (
    "github.com/itohio/dndm/endpoint/mesh"
    "github.com/itohio/dndm/network"
    "github.com/itohio/dndm/network/net"
)

// Create network node
peer, _ := dndm.PeerFromString("tcp://localhost:8080/robot")
node, _ := net.New(slog.Default(), peer)
factory, _ := network.New(node)

// Create mesh endpoint
meshEP, _ := mesh.New(
    peer,
    10,                    // buffer size
    5,                     // num dialers
    time.Second*10,        // timeout
    time.Second*3,         // ping duration
    factory,
    nil,                   // peers
)

// Create router with mesh endpoint
router, _ := dndm.New(
    dndm.WithContext(ctx),
    dndm.WithEndpoint(meshEP),
)

Documentation

Package Specifications

Status

This is the initial version focusing on:

  • ✅ In-process communication (Direct endpoint)
  • ✅ Cross-process communication (Remote endpoint via TCP/UDP/Serial)
  • ✅ Full-mesh network (Mesh endpoint)
  • ✅ Automatic peer discovery
  • ✅ Route-based typed message passing
  • ✅ Protobuf message support
  • 🔄 NATS transport (planned)

Contributing

Contributions are welcome! Please see the design documents and specifications before implementing new features.

License

[License information]

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Base

type Base struct {
	// contains filtered or unexported fields
}

Base provides basic context management functionalities for components that require initialization with a context, cancellation, and cleanup operations.

func NewBase

func NewBase() Base

NewBase initializes a new Base instance without a specific context.

func NewBaseWithCtx

func NewBaseWithCtx(ctx context.Context) Base

NewBaseWithCtx initializes a new Base instance with the provided context.

func (*Base) AddOnClose

func (t *Base) AddOnClose(f func())

AddOnClose registers a function to be called upon the context's cancellation.

func (*Base) Close

func (t *Base) Close() error

Close cleans up resources and cancels the context without a specific cause.

func (*Base) CloseCause

func (t *Base) CloseCause(err error) error

CloseCause cleans up resources and cancels the context with a specified error cause.

func (*Base) Ctx

func (t *Base) Ctx() context.Context

Ctx returns the current context associated with the Base instance.

func (*Base) Init

func (t *Base) Init(ctx context.Context) error

Init sets up the Base instance with a cancellation context.

type BaseEndpoint

type BaseEndpoint struct {
	Base

	Log           *slog.Logger
	OnAddIntent   IntentCallback
	OnAddInterest InterestCallback
	Size          int
	// contains filtered or unexported fields
}

BaseEndpoint is a concrete implementation of the Endpoint interface that provides methods for endpoint initialization, managing lifecycle, and handling intents and interests.

func NewEndpointBase

func NewEndpointBase(name string, size int) BaseEndpoint

NewEndpointBase creates a new BaseEndpoint with a specified name and size.

func (*BaseEndpoint) Init

func (t *BaseEndpoint) Init(ctx context.Context, logger *slog.Logger, addIntent IntentCallback, addInterest InterestCallback) error

Init initializes the BaseEndpoint with necessary callbacks and logging capabilities.

func (*BaseEndpoint) Name

func (t *BaseEndpoint) Name() string

Name returns the name of the endpoint.

func (*BaseEndpoint) SetName

func (t *BaseEndpoint) SetName(name string)

SetName sets or updates the name of the endpoint.

type CauseCloser

type CauseCloser interface {
	CloseCause(e error) error
}

CauseCloser interface for objects that accepts closure reason

type Container

type Container struct {
	BaseEndpoint
	// contains filtered or unexported fields
}

Container implements an aggregate Endpoint that stores and manages multiple endpoints, and it coordinates the linking of intents and interests across these endpoints.

Actions:

  • Add/Remove endpoints look for existing intents and interests and registers them to respective routers.
  • Add calls initialize on new endpoint
  • Publish/Subscribe look for existing endpoints and registers intents/interests respectively.

func NewContainer

func NewContainer(name string, size int) *Container

NewContainer creates a new Container with a given name and size.

func (*Container) Add

func (t *Container) Add(ep Endpoint) error

func (*Container) Close

func (t *Container) Close() error

func (*Container) Endpoint

func (t *Container) Endpoint(compare func(Endpoint) bool) []Endpoint

func (*Container) Init

func (t *Container) Init(ctx context.Context, logger *slog.Logger, addIntent IntentCallback, addInterest InterestCallback) error

Init is used by the Router to initialize this endpoint.

func (*Container) Intent

func (t *Container) Intent(compare func(Intent) bool) []Intent

func (*Container) Interest

func (t *Container) Interest(compare func(Interest) bool) []Interest

func (*Container) OnClose

func (t *Container) OnClose(f func()) Endpoint

func (*Container) Publish

func (t *Container) Publish(route Route, opt ...PubOpt) (Intent, error)

Publish will advertise an intent to publish named and typed data.

func (*Container) Remove

func (t *Container) Remove(ep Endpoint) error

func (*Container) Subscribe

func (t *Container) Subscribe(route Route, opt ...SubOpt) (Interest, error)

Subscribe will advertise an interest in named and typed data.

type Endpoint

type Endpoint interface {
	io.Closer
	OnClose(func()) Endpoint
	Name() string
	// Publish will advertise an intent to publish named and typed data.
	Publish(route Route, opt ...PubOpt) (Intent, error)
	// Subscribe will advertise an interest in named and typed data.
	Subscribe(route Route, opt ...SubOpt) (Interest, error)
	// Init is used by the Router to initialize this endpoint.
	Init(ctx context.Context, logger *slog.Logger, addIntent IntentCallback, addInterest InterestCallback) error
}

Endpoint describes a component that can register, manage, and link intents and interests based on data routes. It provides methods for initialization, publishing intents, subscribing interests, and managing its lifecycle.

type FanInInterest

type FanInInterest struct {
	// contains filtered or unexported fields
}

FanInInterest aggregates multiple interests, routing incoming messages to a single channel.

func NewFanInInterest

func NewFanInInterest(ctx context.Context, route Route, size int, interests ...Interest) (*FanInInterest, error)

NewFanInInterest creates a new FanInInterest with specified context, route, size, and initial interests.

func (*FanInInterest) AddInterest

func (i *FanInInterest) AddInterest(interest Interest) error

AddInterest registers an interest and sets up the routing.

func (*FanInInterest) C

func (i *FanInInterest) C() <-chan proto.Message

func (*FanInInterest) Close

func (i *FanInInterest) Close() error

func (*FanInInterest) Ctx

func (i *FanInInterest) Ctx() context.Context

func (*FanInInterest) OnClose

func (i *FanInInterest) OnClose(f func()) Interest

func (*FanInInterest) RemoveInterest

func (i *FanInInterest) RemoveInterest(interest Interest)

func (*FanInInterest) Route

func (i *FanInInterest) Route() Route

type FanOutIntent

type FanOutIntent struct {
	// contains filtered or unexported fields
}

FanOutIntent represents an Intent that manages multiple underlying intents. It distributes incoming messages across all registered intents and synchronizes their lifecycle.

func NewFanOutIntent

func NewFanOutIntent(ctx context.Context, route Route, size int, intents ...Intent) (*FanOutIntent, error)

NewFanOutIntent creates a FanOutIntent with given context, route, size, and optional initial intents.

func (*FanOutIntent) AddIntent

func (i *FanOutIntent) AddIntent(intent Intent) error

AddIntent adds a new intent of the same type, creates a notify runner and links wrappers to it.

func (*FanOutIntent) Close

func (i *FanOutIntent) Close() error

func (*FanOutIntent) Ctx

func (i *FanOutIntent) Ctx() context.Context

func (*FanOutIntent) Interest

func (i *FanOutIntent) Interest() <-chan Route

func (*FanOutIntent) Notify

func (i *FanOutIntent) Notify()

func (*FanOutIntent) OnClose

func (i *FanOutIntent) OnClose(f func()) Intent

func (*FanOutIntent) RemoveIntent

func (i *FanOutIntent) RemoveIntent(intent Intent)

func (*FanOutIntent) Route

func (i *FanOutIntent) Route() Route

func (*FanOutIntent) Send

func (i *FanOutIntent) Send(ctx context.Context, msg proto.Message) error

type HashedRoute

type HashedRoute struct {
	// contains filtered or unexported fields
}

func NewHashedRoute

func NewHashedRoute(prefix, path string, msg proto.Message) (HashedRoute, error)

NewHashedRoute creates a new Hashed Route instance given a path and a proto.Message. The route's identifier is formed by concatenating the provided prefix with the hashed route, separated by an "#" symbol.

Route is represented by the hash, while the prefix is used for routing Remote Interests and Intents.

func (HashedRoute) Equal

func (r HashedRoute) Equal(route Route) bool

func (HashedRoute) ID

func (r HashedRoute) ID() string

func (HashedRoute) Path

func (r HashedRoute) Path() string

func (HashedRoute) String

func (r HashedRoute) String() string

func (HashedRoute) Type

func (r HashedRoute) Type() reflect.Type

type Intent

type Intent interface {
	io.Closer
	OnClose(func()) Intent
	Route() Route
	// Interest returns a channel that contains Routes that are interested in the data indicated by the intent.
	// Users should start sending the data once an event is received on this channel.
	Interest() <-chan Route
	// Send will send a message to any recepient that indicated an interest.
	Send(context.Context, proto.Message) error
}

Intent is an interface that defines methods to manage data provision requests. It encapsulates behaviors to close the intent, listen for interest on a route, send messages, and execute closure callbacks.

type IntentCallback

type IntentCallback func(intent Intent, ep Endpoint) error

IntentCallback is a function type used for callbacks upon adding an Intent.

type IntentInternal

type IntentInternal interface {
	Intent
	Link(chan<- proto.Message)
	Notify()
	Ctx() context.Context
}

IntentInternal extends Intent with functionalities for linking and notifications.

type IntentRouter

type IntentRouter struct {
	*FanOutIntent
	// contains filtered or unexported fields
}

IntentRouter manages a collection of intents, routing messages and notifications among them.

func NewIntentRouter

func NewIntentRouter(ctx context.Context, route Route, size int, intents ...Intent) (*IntentRouter, error)

NewIntentRouter creates a new IntentRouter with a given context, route, size, and optionally pre-registered intents.

func (*IntentRouter) Notify

func (i *IntentRouter) Notify()

func (*IntentRouter) Wrap

func (i *IntentRouter) Wrap() *intentWrapper

Wrap returns a wrapped intent. Messages sent to this wrapped intent will be sent to all the registered intents.

type IntentWrapperFunc

type IntentWrapperFunc func(IntentInternal) (IntentInternal, error)

IntentWrapperFunc is a type of function intended to wrap or modify an IntentInternal object. It accepts an IntentInternal as input and returns a possibly modified IntentInternal and an error. The primary use case for this function is to provide a mechanism to alter or augment the behavior of an Intent object at runtime, such as adding logging, validation, or other cross-cutting concerns.

Parameters:

intent - The IntentInternal to wrap or modify.

Returns:

IntentInternal - The wrapped or modified IntentInternal.
error - An error if something goes wrong during the wrapping/modification process.

type Interest

type Interest interface {
	io.Closer
	OnClose(func()) Interest
	Route() Route
	// C returns a channel that contains messages. Users should typecast to specific message type that
	// was registered with the interest.
	C() <-chan proto.Message
}

Interest defines the behavior for components interested in receiving named data. It allows closing of the interest, setting closure callbacks, and accessing routes and message channels.

User should consume C of the interest until it is closed or no longer needed. Messages will be delivered only when a corresponding Intent is discovered.

type InterestCallback

type InterestCallback func(interest Interest, ep Endpoint) error

InterestCallback is a function type used for callbacks upon adding an Interest.

type InterestInternal

type InterestInternal interface {
	Interest
	Ctx() context.Context
	MsgC() chan<- proto.Message
}

InterestInternal extends Interest with additional internal management capabilities.

type InterestRouter

type InterestRouter struct {
	*FanInInterest
	// contains filtered or unexported fields
}

InterestRouter manages a collection of interests, directing incoming messages to multiple subscribers.

func NewInterestRouter

func NewInterestRouter(ctx context.Context, route Route, size int, interests ...Interest) (*InterestRouter, error)

NewInterestRouter initializes a new InterestRouter with a context, route, size, and initial interests.

func (*InterestRouter) Wrap

func (i *InterestRouter) Wrap() *interestWrapper

Wrap returns a wrapped interest that collects messages from all registered interests.

type InterestWrapperFunc

type InterestWrapperFunc func(InterestInternal) (InterestInternal, error)

InterestWrapperFunc is a type of function designed to wrap or modify an InterestInternal object. Similar to IntentWrapperFunc, it takes an InterestInternal as input and returns a potentially modified InterestInternal and an error. This function type facilitates dynamic alterations to the behavior of Interest objects, enabling enhancements such as security checks, data enrichment, or custom event handling to be injected transparently.

Parameters:

interest - The InterestInternal to wrap or modify.

Returns:

InterestInternal - The wrapped or modified InterestInternal.
error - An error if there is a failure in the wrapping/modification process.
type Link struct {
	Base
	// contains filtered or unexported fields
}

Link represents a managed connection between an intent and an interest. It handles synchronization between these components, ensuring that messages from the intent are directed to the interest's channel and notifying each other of changes in state or context.

func NewLink(ctx context.Context, intent IntentInternal, interest InterestInternal) *Link

NewLink creates a new Link instance initialized with the provided context, intent, and interest. It sets up an onClose behavior to sever the link cleanly when the Link object is closed.

func (l *Link) Link()

Link starts the process of linking the intent with the interest. It configures the intent to send messages to the channel of the interest and ensures that notifications are sent to the intent to signal the establishment of the link. This method ensures that the link operation is performed only once. The operation runs concurrently and listens for context cancellation signals from the Link itself or either the intent or interest to properly manage resource cleanup.

func (*Link) Notify

func (l *Link) Notify()

Notify sends a notification to the intent, typically used to indicate state changes or updates in the interest that need to be communicated to the intent.

type Linker

type Linker struct {
	Base
	// contains filtered or unexported fields
}

Link represents a dynamic connection between an Intent and an Interest. It manages the lifecycle and interactions between linked entities, ensuring that actions on one entity are reflected on the other. For example, closing an Intent should also close the linked Interest.

func NewLinker

func NewLinker(ctx context.Context, log *slog.Logger, size int, addIntent func(intent Intent) error, addInterest func(interest Interest) error, beforeLink func(Intent, Interest) error) *Linker

NewLinker creates a new Linker with provided context, logger, size, and callback functions. It initializes the Linker with empty maps for intents and interests and sets up a beforeLink function if not provided.

func (*Linker) AddIntent

func (t *Linker) AddIntent(route Route) (Intent, error)

AddIntent registers a new intent by its route. If a matching intent is found, it attempts to link it with a corresponding interest if available.

func (*Linker) AddIntentWithWrapper

func (t *Linker) AddIntentWithWrapper(route Route, wrapper IntentWrapperFunc) (Intent, error)

AddIntentWithWrapper acts like AddIntent but allows the intent to be modified or wrapped by a provided function before being added to the Linker.

func (*Linker) AddInterest

func (t *Linker) AddInterest(route Route) (Interest, error)

AddInterest registers a new interest by its route. If a matching interest is found, it attempts to link it with a corresponding intent if available.

func (*Linker) AddInterestWithWrapper

func (t *Linker) AddInterestWithWrapper(route Route, wrapper InterestWrapperFunc) (Interest, error)

AddInterestWithWrapper acts like AddInterest but allows the interest to be modified or wrapped by a provided function before being added to the Linker.

func (*Linker) Close

func (t *Linker) Close() error

Close shuts down the Linker and cleans up all resources associated with it. It iterates through all intents and interests, closes them, and finally clears the collections.

func (*Linker) Intent

func (t *Linker) Intent(route Route) (Intent, bool)

Intent returns an intent identified by a route if found.

func (*Linker) Interest

func (t *Linker) Interest(route Route) (Interest, bool)

Interest retrieves an interest by its route if it exists within the Linker.

func (*Linker) RemoveIntent

func (t *Linker) RemoveIntent(route Route) error

RemoveIntent removes an intent by its route and cleans up any associated links.

func (*Linker) RemoveInterest

func (t *Linker) RemoveInterest(route Route) error

RemoveInterest removes an interest by its route and cleans up any associated links.

type LocalIntent

type LocalIntent struct {
	Base
	// contains filtered or unexported fields
}

LocalIntent represents a simple intent that is local to the process. LocalIntent can be linked with LocalInterest or RemoteInterest.

func NewIntent

func NewIntent(ctx context.Context, route Route, size int) *LocalIntent

NewIntent initializes a new LocalIntent with specified context, route, and buffer size for the notifications channel.

func (*LocalIntent) Interest

func (i *LocalIntent) Interest() <-chan Route
func (i *LocalIntent) Link(c chan<- proto.Message)

func (*LocalIntent) LinkedC

func (i *LocalIntent) LinkedC() chan<- proto.Message

LinkedC is used for internal debugging and race condition hunting

func (*LocalIntent) Notify

func (i *LocalIntent) Notify()

func (*LocalIntent) OnClose

func (t *LocalIntent) OnClose(f func()) Intent

func (*LocalIntent) Route

func (i *LocalIntent) Route() Route

func (*LocalIntent) Send

func (i *LocalIntent) Send(ctx context.Context, msg proto.Message) error

type LocalInterest

type LocalInterest struct {
	Base
	// contains filtered or unexported fields
}

LocalInterest manages a local interest for receiving data based on a specific route.

func NewInterest

func NewInterest(ctx context.Context, route Route, size int) *LocalInterest

NewInterest creates a new LocalInterest with a specified context, route, and buffer size.

func (*LocalInterest) C

func (i *LocalInterest) C() <-chan proto.Message

func (*LocalInterest) MsgC

func (i *LocalInterest) MsgC() chan<- proto.Message

func (*LocalInterest) OnClose

func (t *LocalInterest) OnClose(f func()) Interest

func (*LocalInterest) Route

func (i *LocalInterest) Route() Route

type MockEndpoint

type MockEndpoint struct {
	mock.Mock
}

MockEndpoint is an autogenerated mock type for the Endpoint type

func NewMockEndpoint

func NewMockEndpoint(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockEndpoint

NewMockEndpoint creates a new instance of MockEndpoint. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockEndpoint) Close

func (_m *MockEndpoint) Close() error

Close provides a mock function with given fields:

func (*MockEndpoint) EXPECT

func (_m *MockEndpoint) EXPECT() *MockEndpoint_Expecter

func (*MockEndpoint) Init

func (_m *MockEndpoint) Init(ctx context.Context, logger *slog.Logger, addIntent IntentCallback, addInterest InterestCallback) error

Init provides a mock function with given fields: ctx, logger, addIntent, addInterest

func (*MockEndpoint) Name

func (_m *MockEndpoint) Name() string

Name provides a mock function with given fields:

func (*MockEndpoint) OnClose

func (_m *MockEndpoint) OnClose(_a0 func()) Endpoint

OnClose provides a mock function with given fields: _a0

func (*MockEndpoint) Publish

func (_m *MockEndpoint) Publish(route Route, opt ...PubOpt) (Intent, error)

Publish provides a mock function with given fields: route, opt

func (*MockEndpoint) Subscribe

func (_m *MockEndpoint) Subscribe(route Route, opt ...SubOpt) (Interest, error)

Subscribe provides a mock function with given fields: route, opt

type MockEndpoint_Close_Call

type MockEndpoint_Close_Call struct {
	*mock.Call
}

MockEndpoint_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'

func (*MockEndpoint_Close_Call) Return

func (*MockEndpoint_Close_Call) Run

func (_c *MockEndpoint_Close_Call) Run(run func()) *MockEndpoint_Close_Call

func (*MockEndpoint_Close_Call) RunAndReturn

func (_c *MockEndpoint_Close_Call) RunAndReturn(run func() error) *MockEndpoint_Close_Call

type MockEndpoint_Expecter

type MockEndpoint_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockEndpoint_Expecter) Close

Close is a helper method to define mock.On call

func (*MockEndpoint_Expecter) Init

func (_e *MockEndpoint_Expecter) Init(ctx interface{}, logger interface{}, addIntent interface{}, addInterest interface{}) *MockEndpoint_Init_Call

Init is a helper method to define mock.On call

  • ctx context.Context
  • logger *slog.Logger
  • addIntent IntentCallback
  • addInterest InterestCallback

func (*MockEndpoint_Expecter) Name

Name is a helper method to define mock.On call

func (*MockEndpoint_Expecter) OnClose

func (_e *MockEndpoint_Expecter) OnClose(_a0 interface{}) *MockEndpoint_OnClose_Call

OnClose is a helper method to define mock.On call

  • _a0 func()

func (*MockEndpoint_Expecter) Publish

func (_e *MockEndpoint_Expecter) Publish(route interface{}, opt ...interface{}) *MockEndpoint_Publish_Call

Publish is a helper method to define mock.On call

  • route Route
  • opt ...PubOpt

func (*MockEndpoint_Expecter) Subscribe

func (_e *MockEndpoint_Expecter) Subscribe(route interface{}, opt ...interface{}) *MockEndpoint_Subscribe_Call

Subscribe is a helper method to define mock.On call

  • route Route
  • opt ...SubOpt

type MockEndpoint_Init_Call

type MockEndpoint_Init_Call struct {
	*mock.Call
}

MockEndpoint_Init_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Init'

func (*MockEndpoint_Init_Call) Return

func (*MockEndpoint_Init_Call) Run

func (_c *MockEndpoint_Init_Call) Run(run func(ctx context.Context, logger *slog.Logger, addIntent IntentCallback, addInterest InterestCallback)) *MockEndpoint_Init_Call

func (*MockEndpoint_Init_Call) RunAndReturn

type MockEndpoint_Name_Call

type MockEndpoint_Name_Call struct {
	*mock.Call
}

MockEndpoint_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name'

func (*MockEndpoint_Name_Call) Return

func (*MockEndpoint_Name_Call) Run

func (_c *MockEndpoint_Name_Call) Run(run func()) *MockEndpoint_Name_Call

func (*MockEndpoint_Name_Call) RunAndReturn

func (_c *MockEndpoint_Name_Call) RunAndReturn(run func() string) *MockEndpoint_Name_Call

type MockEndpoint_OnClose_Call

type MockEndpoint_OnClose_Call struct {
	*mock.Call
}

MockEndpoint_OnClose_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClose'

func (*MockEndpoint_OnClose_Call) Return

func (*MockEndpoint_OnClose_Call) Run

func (_c *MockEndpoint_OnClose_Call) Run(run func(_a0 func())) *MockEndpoint_OnClose_Call

func (*MockEndpoint_OnClose_Call) RunAndReturn

func (_c *MockEndpoint_OnClose_Call) RunAndReturn(run func(func()) Endpoint) *MockEndpoint_OnClose_Call

type MockEndpoint_Publish_Call

type MockEndpoint_Publish_Call struct {
	*mock.Call
}

MockEndpoint_Publish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Publish'

func (*MockEndpoint_Publish_Call) Return

func (*MockEndpoint_Publish_Call) Run

func (_c *MockEndpoint_Publish_Call) Run(run func(route Route, opt ...PubOpt)) *MockEndpoint_Publish_Call

func (*MockEndpoint_Publish_Call) RunAndReturn

func (_c *MockEndpoint_Publish_Call) RunAndReturn(run func(Route, ...PubOpt) (Intent, error)) *MockEndpoint_Publish_Call

type MockEndpoint_Subscribe_Call

type MockEndpoint_Subscribe_Call struct {
	*mock.Call
}

MockEndpoint_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe'

func (*MockEndpoint_Subscribe_Call) Return

func (*MockEndpoint_Subscribe_Call) Run

func (_c *MockEndpoint_Subscribe_Call) Run(run func(route Route, opt ...SubOpt)) *MockEndpoint_Subscribe_Call

func (*MockEndpoint_Subscribe_Call) RunAndReturn

type MockIntent

type MockIntent struct {
	mock.Mock
}

MockIntent is an autogenerated mock type for the Intent type

func NewMockIntent

func NewMockIntent(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockIntent

NewMockIntent creates a new instance of MockIntent. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockIntent) Close

func (_m *MockIntent) Close() error

Close provides a mock function with given fields:

func (*MockIntent) EXPECT

func (_m *MockIntent) EXPECT() *MockIntent_Expecter

func (*MockIntent) Interest

func (_m *MockIntent) Interest() <-chan Route

Interest provides a mock function with given fields:

func (*MockIntent) OnClose

func (_m *MockIntent) OnClose(_a0 func()) Intent

OnClose provides a mock function with given fields: _a0

func (*MockIntent) Route

func (_m *MockIntent) Route() Route

Route provides a mock function with given fields:

func (*MockIntent) Send

Send provides a mock function with given fields: _a0, _a1

type MockIntentInternal

type MockIntentInternal struct {
	mock.Mock
}

MockIntentInternal is an autogenerated mock type for the IntentInternal type

func NewMockIntentInternal

func NewMockIntentInternal(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockIntentInternal

NewMockIntentInternal creates a new instance of MockIntentInternal. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockIntentInternal) Close

func (_m *MockIntentInternal) Close() error

Close provides a mock function with given fields:

func (*MockIntentInternal) Ctx

Ctx provides a mock function with given fields:

func (*MockIntentInternal) EXPECT

func (*MockIntentInternal) Interest

func (_m *MockIntentInternal) Interest() <-chan Route

Interest provides a mock function with given fields:

func (_m *MockIntentInternal) Link(_a0 chan<- protoreflect.ProtoMessage)

Link provides a mock function with given fields: _a0

func (*MockIntentInternal) Notify

func (_m *MockIntentInternal) Notify()

Notify provides a mock function with given fields:

func (*MockIntentInternal) OnClose

func (_m *MockIntentInternal) OnClose(_a0 func()) Intent

OnClose provides a mock function with given fields: _a0

func (*MockIntentInternal) Route

func (_m *MockIntentInternal) Route() Route

Route provides a mock function with given fields:

func (*MockIntentInternal) Send

Send provides a mock function with given fields: _a0, _a1

type MockIntentInternal_Close_Call

type MockIntentInternal_Close_Call struct {
	*mock.Call
}

MockIntentInternal_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'

func (*MockIntentInternal_Close_Call) Return

func (*MockIntentInternal_Close_Call) Run

func (*MockIntentInternal_Close_Call) RunAndReturn

type MockIntentInternal_Ctx_Call

type MockIntentInternal_Ctx_Call struct {
	*mock.Call
}

MockIntentInternal_Ctx_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ctx'

func (*MockIntentInternal_Ctx_Call) Return

func (*MockIntentInternal_Ctx_Call) Run

func (*MockIntentInternal_Ctx_Call) RunAndReturn

type MockIntentInternal_Expecter

type MockIntentInternal_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockIntentInternal_Expecter) Close

Close is a helper method to define mock.On call

func (*MockIntentInternal_Expecter) Ctx

Ctx is a helper method to define mock.On call

func (*MockIntentInternal_Expecter) Interest

Interest is a helper method to define mock.On call

func (_e *MockIntentInternal_Expecter) Link(_a0 interface{}) *MockIntentInternal_Link_Call

Link is a helper method to define mock.On call

  • _a0 chan<- protoreflect.ProtoMessage

func (*MockIntentInternal_Expecter) Notify

Notify is a helper method to define mock.On call

func (*MockIntentInternal_Expecter) OnClose

func (_e *MockIntentInternal_Expecter) OnClose(_a0 interface{}) *MockIntentInternal_OnClose_Call

OnClose is a helper method to define mock.On call

  • _a0 func()

func (*MockIntentInternal_Expecter) Route

Route is a helper method to define mock.On call

func (*MockIntentInternal_Expecter) Send

func (_e *MockIntentInternal_Expecter) Send(_a0 interface{}, _a1 interface{}) *MockIntentInternal_Send_Call

Send is a helper method to define mock.On call

  • _a0 context.Context
  • _a1 protoreflect.ProtoMessage

type MockIntentInternal_Interest_Call

type MockIntentInternal_Interest_Call struct {
	*mock.Call
}

MockIntentInternal_Interest_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Interest'

func (*MockIntentInternal_Interest_Call) Return

func (*MockIntentInternal_Interest_Call) Run

func (*MockIntentInternal_Interest_Call) RunAndReturn

func (_c *MockIntentInternal_Interest_Call) RunAndReturn(run func() <-chan Route) *MockIntentInternal_Interest_Call
type MockIntentInternal_Link_Call struct {
	*mock.Call
}

MockIntentInternal_Link_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Link'

type MockIntentInternal_Notify_Call

type MockIntentInternal_Notify_Call struct {
	*mock.Call
}

MockIntentInternal_Notify_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Notify'

func (*MockIntentInternal_Notify_Call) Return

func (*MockIntentInternal_Notify_Call) Run

func (*MockIntentInternal_Notify_Call) RunAndReturn

func (_c *MockIntentInternal_Notify_Call) RunAndReturn(run func()) *MockIntentInternal_Notify_Call

type MockIntentInternal_OnClose_Call

type MockIntentInternal_OnClose_Call struct {
	*mock.Call
}

MockIntentInternal_OnClose_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClose'

func (*MockIntentInternal_OnClose_Call) Return

func (*MockIntentInternal_OnClose_Call) Run

func (*MockIntentInternal_OnClose_Call) RunAndReturn

func (_c *MockIntentInternal_OnClose_Call) RunAndReturn(run func(func()) Intent) *MockIntentInternal_OnClose_Call

type MockIntentInternal_Route_Call

type MockIntentInternal_Route_Call struct {
	*mock.Call
}

MockIntentInternal_Route_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Route'

func (*MockIntentInternal_Route_Call) Return

func (*MockIntentInternal_Route_Call) Run

func (*MockIntentInternal_Route_Call) RunAndReturn

type MockIntentInternal_Send_Call

type MockIntentInternal_Send_Call struct {
	*mock.Call
}

MockIntentInternal_Send_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Send'

func (*MockIntentInternal_Send_Call) Return

func (*MockIntentInternal_Send_Call) Run

func (*MockIntentInternal_Send_Call) RunAndReturn

type MockIntent_Close_Call

type MockIntent_Close_Call struct {
	*mock.Call
}

MockIntent_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'

func (*MockIntent_Close_Call) Return

func (*MockIntent_Close_Call) Run

func (_c *MockIntent_Close_Call) Run(run func()) *MockIntent_Close_Call

func (*MockIntent_Close_Call) RunAndReturn

func (_c *MockIntent_Close_Call) RunAndReturn(run func() error) *MockIntent_Close_Call

type MockIntent_Expecter

type MockIntent_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockIntent_Expecter) Close

Close is a helper method to define mock.On call

func (*MockIntent_Expecter) Interest

Interest is a helper method to define mock.On call

func (*MockIntent_Expecter) OnClose

func (_e *MockIntent_Expecter) OnClose(_a0 interface{}) *MockIntent_OnClose_Call

OnClose is a helper method to define mock.On call

  • _a0 func()

func (*MockIntent_Expecter) Route

Route is a helper method to define mock.On call

func (*MockIntent_Expecter) Send

func (_e *MockIntent_Expecter) Send(_a0 interface{}, _a1 interface{}) *MockIntent_Send_Call

Send is a helper method to define mock.On call

  • _a0 context.Context
  • _a1 protoreflect.ProtoMessage

type MockIntent_Interest_Call

type MockIntent_Interest_Call struct {
	*mock.Call
}

MockIntent_Interest_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Interest'

func (*MockIntent_Interest_Call) Return

func (*MockIntent_Interest_Call) Run

func (*MockIntent_Interest_Call) RunAndReturn

func (_c *MockIntent_Interest_Call) RunAndReturn(run func() <-chan Route) *MockIntent_Interest_Call

type MockIntent_OnClose_Call

type MockIntent_OnClose_Call struct {
	*mock.Call
}

MockIntent_OnClose_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClose'

func (*MockIntent_OnClose_Call) Return

func (*MockIntent_OnClose_Call) Run

func (_c *MockIntent_OnClose_Call) Run(run func(_a0 func())) *MockIntent_OnClose_Call

func (*MockIntent_OnClose_Call) RunAndReturn

func (_c *MockIntent_OnClose_Call) RunAndReturn(run func(func()) Intent) *MockIntent_OnClose_Call

type MockIntent_Route_Call

type MockIntent_Route_Call struct {
	*mock.Call
}

MockIntent_Route_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Route'

func (*MockIntent_Route_Call) Return

func (*MockIntent_Route_Call) Run

func (_c *MockIntent_Route_Call) Run(run func()) *MockIntent_Route_Call

func (*MockIntent_Route_Call) RunAndReturn

func (_c *MockIntent_Route_Call) RunAndReturn(run func() Route) *MockIntent_Route_Call

type MockIntent_Send_Call

type MockIntent_Send_Call struct {
	*mock.Call
}

MockIntent_Send_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Send'

func (*MockIntent_Send_Call) Return

func (*MockIntent_Send_Call) Run

func (*MockIntent_Send_Call) RunAndReturn

type MockInterest

type MockInterest struct {
	mock.Mock
}

MockInterest is an autogenerated mock type for the Interest type

func NewMockInterest

func NewMockInterest(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockInterest

NewMockInterest creates a new instance of MockInterest. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockInterest) C

func (_m *MockInterest) C() <-chan protoreflect.ProtoMessage

C provides a mock function with given fields:

func (*MockInterest) Close

func (_m *MockInterest) Close() error

Close provides a mock function with given fields:

func (*MockInterest) EXPECT

func (_m *MockInterest) EXPECT() *MockInterest_Expecter

func (*MockInterest) OnClose

func (_m *MockInterest) OnClose(_a0 func()) Interest

OnClose provides a mock function with given fields: _a0

func (*MockInterest) Route

func (_m *MockInterest) Route() Route

Route provides a mock function with given fields:

type MockInterestInternal

type MockInterestInternal struct {
	mock.Mock
}

MockInterestInternal is an autogenerated mock type for the InterestInternal type

func NewMockInterestInternal

func NewMockInterestInternal(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockInterestInternal

NewMockInterestInternal creates a new instance of MockInterestInternal. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockInterestInternal) C

C provides a mock function with given fields:

func (*MockInterestInternal) Close

func (_m *MockInterestInternal) Close() error

Close provides a mock function with given fields:

func (*MockInterestInternal) Ctx

Ctx provides a mock function with given fields:

func (*MockInterestInternal) EXPECT

func (*MockInterestInternal) MsgC

MsgC provides a mock function with given fields:

func (*MockInterestInternal) OnClose

func (_m *MockInterestInternal) OnClose(_a0 func()) Interest

OnClose provides a mock function with given fields: _a0

func (*MockInterestInternal) Route

func (_m *MockInterestInternal) Route() Route

Route provides a mock function with given fields:

type MockInterestInternal_C_Call

type MockInterestInternal_C_Call struct {
	*mock.Call
}

MockInterestInternal_C_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'C'

func (*MockInterestInternal_C_Call) Return

func (*MockInterestInternal_C_Call) Run

func (*MockInterestInternal_C_Call) RunAndReturn

type MockInterestInternal_Close_Call

type MockInterestInternal_Close_Call struct {
	*mock.Call
}

MockInterestInternal_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'

func (*MockInterestInternal_Close_Call) Return

func (*MockInterestInternal_Close_Call) Run

func (*MockInterestInternal_Close_Call) RunAndReturn

type MockInterestInternal_Ctx_Call

type MockInterestInternal_Ctx_Call struct {
	*mock.Call
}

MockInterestInternal_Ctx_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ctx'

func (*MockInterestInternal_Ctx_Call) Return

func (*MockInterestInternal_Ctx_Call) Run

func (*MockInterestInternal_Ctx_Call) RunAndReturn

type MockInterestInternal_Expecter

type MockInterestInternal_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockInterestInternal_Expecter) C

C is a helper method to define mock.On call

func (*MockInterestInternal_Expecter) Close

Close is a helper method to define mock.On call

func (*MockInterestInternal_Expecter) Ctx

Ctx is a helper method to define mock.On call

func (*MockInterestInternal_Expecter) MsgC

MsgC is a helper method to define mock.On call

func (*MockInterestInternal_Expecter) OnClose

OnClose is a helper method to define mock.On call

  • _a0 func()

func (*MockInterestInternal_Expecter) Route

Route is a helper method to define mock.On call

type MockInterestInternal_MsgC_Call

type MockInterestInternal_MsgC_Call struct {
	*mock.Call
}

MockInterestInternal_MsgC_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MsgC'

func (*MockInterestInternal_MsgC_Call) Return

func (*MockInterestInternal_MsgC_Call) Run

func (*MockInterestInternal_MsgC_Call) RunAndReturn

type MockInterestInternal_OnClose_Call

type MockInterestInternal_OnClose_Call struct {
	*mock.Call
}

MockInterestInternal_OnClose_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClose'

func (*MockInterestInternal_OnClose_Call) Return

func (*MockInterestInternal_OnClose_Call) Run

func (*MockInterestInternal_OnClose_Call) RunAndReturn

type MockInterestInternal_Route_Call

type MockInterestInternal_Route_Call struct {
	*mock.Call
}

MockInterestInternal_Route_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Route'

func (*MockInterestInternal_Route_Call) Return

func (*MockInterestInternal_Route_Call) Run

func (*MockInterestInternal_Route_Call) RunAndReturn

type MockInterest_C_Call

type MockInterest_C_Call struct {
	*mock.Call
}

MockInterest_C_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'C'

func (*MockInterest_C_Call) Return

func (*MockInterest_C_Call) Run

func (_c *MockInterest_C_Call) Run(run func()) *MockInterest_C_Call

func (*MockInterest_C_Call) RunAndReturn

func (_c *MockInterest_C_Call) RunAndReturn(run func() <-chan protoreflect.ProtoMessage) *MockInterest_C_Call

type MockInterest_Close_Call

type MockInterest_Close_Call struct {
	*mock.Call
}

MockInterest_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'

func (*MockInterest_Close_Call) Return

func (*MockInterest_Close_Call) Run

func (_c *MockInterest_Close_Call) Run(run func()) *MockInterest_Close_Call

func (*MockInterest_Close_Call) RunAndReturn

func (_c *MockInterest_Close_Call) RunAndReturn(run func() error) *MockInterest_Close_Call

type MockInterest_Expecter

type MockInterest_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockInterest_Expecter) C

C is a helper method to define mock.On call

func (*MockInterest_Expecter) Close

Close is a helper method to define mock.On call

func (*MockInterest_Expecter) OnClose

func (_e *MockInterest_Expecter) OnClose(_a0 interface{}) *MockInterest_OnClose_Call

OnClose is a helper method to define mock.On call

  • _a0 func()

func (*MockInterest_Expecter) Route

Route is a helper method to define mock.On call

type MockInterest_OnClose_Call

type MockInterest_OnClose_Call struct {
	*mock.Call
}

MockInterest_OnClose_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnClose'

func (*MockInterest_OnClose_Call) Return

func (*MockInterest_OnClose_Call) Run

func (_c *MockInterest_OnClose_Call) Run(run func(_a0 func())) *MockInterest_OnClose_Call

func (*MockInterest_OnClose_Call) RunAndReturn

func (_c *MockInterest_OnClose_Call) RunAndReturn(run func(func()) Interest) *MockInterest_OnClose_Call

type MockInterest_Route_Call

type MockInterest_Route_Call struct {
	*mock.Call
}

MockInterest_Route_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Route'

func (*MockInterest_Route_Call) Return

func (*MockInterest_Route_Call) Run

func (_c *MockInterest_Route_Call) Run(run func()) *MockInterest_Route_Call

func (*MockInterest_Route_Call) RunAndReturn

func (_c *MockInterest_Route_Call) RunAndReturn(run func() Route) *MockInterest_Route_Call

type Option

type Option func(*Options) error

func WithContext

func WithContext(ctx context.Context) Option

WithContext option configures context.

func WithEndpoint

func WithEndpoint(t Endpoint) Option

WithEndpoint adds an endpoint to the collection.

func WithEndpoints

func WithEndpoints(t ...Endpoint) Option

WithEndpoints sets the collection to specified endpoints.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger option configures Logger.

func WithQueueSize

func WithQueueSize(size int) Option

WithQueueSize configures the size of send/receive/notification chan sizes.

type Options

type Options struct {
	// contains filtered or unexported fields
}

func (*Options) Config

func (o *Options) Config(opts ...Option) error

type Peer

type Peer interface {
	String() string
	Values() url.Values
	Address() string
	Path() string
	Scheme() string
	// Equal compares this Peer to another Peer interface to determine if they represent
	// the same peer.
	Equal(v Peer) bool
	// HasPrefix compares this Peer path to Route Path.
	HasPrefix(r Route) bool
}

Peer defines an interface for network peer entities, encapsulating methods that provide details about network connection points such as URL components and query parameters.

Peer is identified URI such as [schema]://[address]/path[?params&...].

type PeerImpl

type PeerImpl struct {
	// contains filtered or unexported fields
}

func NewPeer

func NewPeer(scheme, address, path string, args url.Values) (PeerImpl, error)

NewPeer constructs a new PeerImpl object given its components: scheme, address, path, and arguments (query parameters). It initializes the PeerImpl with these components.

func PeerFromString

func PeerFromString(peer string) (PeerImpl, error)

PeerFromString parses a string containing a URL into a Peer object. It extracts the scheme, host, path, and query parameters from the string.

func (PeerImpl) Address

func (p PeerImpl) Address() string

func (PeerImpl) Equal

func (p PeerImpl) Equal(v Peer) bool

func (PeerImpl) HasPrefix

func (p PeerImpl) HasPrefix(r Route) bool

func (PeerImpl) Path

func (p PeerImpl) Path() string

func (PeerImpl) Scheme

func (p PeerImpl) Scheme() string

func (PeerImpl) String

func (r PeerImpl) String() string

func (PeerImpl) Values

func (p PeerImpl) Values() url.Values

type PlainRoute

type PlainRoute struct {
	// contains filtered or unexported fields
}

func EmptyRoute

func EmptyRoute() PlainRoute

EmptyRoute creates an empty route that is useful in tests and also in remote endpoint.

func NewRoute

func NewRoute(path string, msg proto.Message) (PlainRoute, error)

NewRoute creates a new Plain text Route instance given a path and a proto.Message. The route's identifier is formed by concatenating the provided path with the name of the message's type, separated by an "@" symbol.

Path must not contain `@` nor `#` symbols.

func (PlainRoute) Equal

func (r PlainRoute) Equal(route Route) bool

func (PlainRoute) ID

func (r PlainRoute) ID() string

func (PlainRoute) Path

func (r PlainRoute) Path() string

func (PlainRoute) String

func (r PlainRoute) String() string

func (PlainRoute) Type

func (r PlainRoute) Type() reflect.Type

type PubOpt

type PubOpt func(*PubOptStruct) error

type PubOptStruct

type PubOptStruct struct {
}

Publishing options

type RemoteEndpoint

type RemoteEndpoint interface {
	// Local returns the name of the local peer
	Local() Peer
	// Remote returns the name of the remote peer
	Remote() Peer
}

RemoteEndpoint extends Endpoint with methods to retrieve local and remote peer information.

type RemoteIntent

type RemoteIntent interface {
	Intent
	Peer() Peer
}

RemoteIntent extends Intent with a method to retrieve associated peer information.

type RemoteInterest

type RemoteInterest interface {
	Interest
	Peer() Peer
}

RemoteInterest extends Interest with the ability to retrieve the peer involved in the interest.

type RemoteRoute

type RemoteRoute interface {
	// ID returns the unique identifier of the route, which combines the path
	// and the name of the protobuf message type.
	ID() string
	// Path returns the path component of the route.
	Path() string

	// Equal checks if two Route instances represent the same route.
	// It returns true if both routes have the same route identifier.
	Equal(Route) bool
	// String returns the route as a string, which is the unique identifier
	// combining the path with the protobuf message type name.
	String() string
}

RemoteRoute describes a named and typed data route. It encapsulates a routing mechanism by combining the path of the route with the protobuf message type, facilitating the identification and handling of different data types across a distributed system. Remote Route is a subset of Route that does not implement Type() method.

type Route

type Route interface {
	RemoteRoute
	// Type returns the reflect.Type of the proto.Message associated with the route,
	// allowing type introspection and dynamic handling of message types.
	//
	// Type() returns nil for routes received from remote endpoints.
	Type() reflect.Type
}

Route describes a named and typed data route. It encapsulates a routing mechanism by combining the path of the route with the protobuf message type, facilitating the identification and handling of different data types across a distributed system.

  • Routes can be plain and hashed. Plain routes contain the path and type description.
  • Remote routes may not contain the type so that Type method returns nil or the interface is not implemented at all.
  • Hashed routes are (some-hash) of the path and type. This way it is possible to hide types of the messages. Hashing in this case allows for Object-capability security model where only those who know exact path and exact type can send and decode received message.

func RouteFromString

func RouteFromString(route string) (Route, error)

RouteFromString creates a Route from a string representation, assuming the string is a valid route identifier.

  • Plain Route `Type@Path`
  • Hashed Ruote `prefix#[Base64 Hash]`

type Router

type Router struct {
	Base
	// contains filtered or unexported fields
}

func New

func New(opts ...Option) (*Router, error)

func (*Router) Publish

func (d *Router) Publish(path string, msg proto.Message, opt ...PubOpt) (Intent, error)

Publish delivers data to an interested party. It may advertise the availability of the data if no interest is found.

func (*Router) Subscribe

func (d *Router) Subscribe(path string, msg proto.Message, opt ...SubOpt) (Interest, error)

Subscribe advertises an interest in a specific message type on particular path.

type SubOpt

type SubOpt func(*SubOptStruct) error

type SubOptStruct

type SubOptStruct struct {
}

Subscription options

Directories

Path Synopsis
endpoint
direct
Package direct provides a concrete implementation of the dndm.Endpoint interface, facilitating direct communications by managing links between intents and interests.
Package direct provides a concrete implementation of the dndm.Endpoint interface, facilitating direct communications by managing links between intents and interests.
bench command
bus_direct command
bus_module command
bus_request command
direct command
embedded/host command
node command
net
types
p2p
x
bus

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL