relay

package
v0.35.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 22, 2026 License: MIT Imports: 12 Imported by: 0

README

Bot-to-Bot Relay (internal/relay)

The relay package enables secure, cross-platform communication between AI agents. It allows one HotPlex agent to send messages to another agent, even if they are on different chat platforms (e.g., Slack to DingTalk).

Overview

Relay uses bindings to map agent names to target URLs. Circuit breakers provide fault tolerance, and the routing configuration is persisted to disk.

Core Components

  • RelayManager: Orchestrates message delivery and routing lookups.
  • BindingStore: Manages persistent storage of relay bindings.
  • RelaySender: Handles authenticated HTTP delivery of relay messages.
  • RelayCircuitBreaker: Provides fault isolation and prevents cascading failures.

Interaction Pattern

  1. Lookup: RelayManager finds the target URL associated with an agent name.
  2. Circuit Check: Confirms that the circuit breaker for the target agent is not open.
  3. Delivery: RelaySender POSTs a RelayMessage to the target.
  4. Tracking: Each relay is assigned a unique TaskID for async tracking.

Usage

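// Build a manager; token is the auth token passed to NewRelaySender,
// and ctx is a context.Context supplied by the caller.
manager := relay.NewRelayManager(relay.NewRelaySender(token))

// Add a binding
binding := &relay.RelayBinding{
    Platform: "slack",
    ChatID:   "C12345",
    Bots: map[string]string{
        "security-bot": "http://other-hotplex:8080/relay",
    },
}
if err := manager.AddBinding(binding); err != nil {
    log.Fatalf("add binding: %v", err)
}

// Send a relay
resp, err := manager.Send(ctx, "security-bot", "Analyze this session: s_abc123")
if err != nil {
    log.Fatalf("relay send: %v", err)
}
log.Printf("task %s: %s", resp.TaskID, resp.Status)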

CLI Interface

Manage relay bindings via hotplexd relay:

  • hotplexd relay add_binding: Map a local chat ID to remote bot URLs.
  • hotplexd relay list_bindings: View all active relay routes.
  • hotplexd relay test_relay: Send a test message to a remote agent.

Documentation

Overview

Package relay provides bot-to-bot communication across HotPlex instances.

Constants

const (
	TaskStatusWorking   = "working"
	TaskStatusCompleted = "completed"
	TaskStatusFailed    = "failed"
	TaskStatusCanceled  = "canceled"
)

Task status constants.
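
A caller polling a task by TaskID may want to know when to stop. The sketch below assumes that "working" is the only non-terminal status; the documentation does not state this, and isTerminal is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Local copies of the relay task status constants.
const (
	TaskStatusWorking   = "working"
	TaskStatusCompleted = "completed"
	TaskStatusFailed    = "failed"
	TaskStatusCanceled  = "canceled"
)

// isTerminal is a hypothetical helper that reports whether a task has stopped.
// Unknown statuses are treated as non-terminal.
func isTerminal(status string) bool {
	switch status {
	case TaskStatusCompleted, TaskStatusFailed, TaskStatusCanceled:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []string{TaskStatusWorking, TaskStatusCompleted} {
		fmt.Printf("%s terminal=%v\n", s, isTerminal(s))
	}
}
```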

Variables

This section is empty.

Functions

This section is empty.

Types

type BindingStore

type BindingStore struct {
	// contains filtered or unexported fields
}

BindingStore persists RelayBindings to disk using atomic writes.

func NewBindingStore

func NewBindingStore(dataDir string) *BindingStore

NewBindingStore loads or creates a BindingStore at the default path.

func (*BindingStore) Add

func (bs *BindingStore) Add(binding *RelayBinding) error

Add creates or updates a binding, keyed by ChatID.

func (*BindingStore) Delete

func (bs *BindingStore) Delete(chatID string) error

Delete removes a binding by ChatID.

func (*BindingStore) List

func (bs *BindingStore) List() []*RelayBinding

List returns a snapshot of all bindings.

type RelayBinding

type RelayBinding struct {
	Platform string            `json:"platform"`
	ChatID   string            `json:"chat_id"`
	Bots     map[string]string `json:"bots"`
}

RelayBinding binds a platform+chatID to a set of bot instances.
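
To illustrate the JSON shape implied by the struct tags, the sketch below encodes one binding that routes a chat to two bots. The audit-bot entry and its URL are made up for the example, and encodeBinding is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RelayBinding mirrors the struct documented in this package.
type RelayBinding struct {
	Platform string            `json:"platform"`
	ChatID   string            `json:"chat_id"`
	Bots     map[string]string `json:"bots"`
}

// encodeBinding returns the compact JSON form of a binding.
// encoding/json sorts map keys, so the bots appear in alphabetical order.
func encodeBinding(b RelayBinding) (string, error) {
	out, err := json.Marshal(b)
	return string(out), err
}

func main() {
	s, err := encodeBinding(RelayBinding{
		Platform: "slack",
		ChatID:   "C12345",
		Bots: map[string]string{
			"security-bot": "http://other-hotplex:8080/relay",
			"audit-bot":    "http://audit-hotplex:8080/relay", // hypothetical
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// Prints:
	// {"platform":"slack","chat_id":"C12345","bots":{"audit-bot":"http://audit-hotplex:8080/relay","security-bot":"http://other-hotplex:8080/relay"}}
}
```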

type RelayCircuitBreaker

type RelayCircuitBreaker struct {
	// contains filtered or unexported fields
}

RelayCircuitBreaker manages per-destination circuit breakers for relay requests. Each unique target (identified by name, typically the bot or instance name) gets its own circuit breaker that prevents cascading failures.

func NewRelayCircuitBreaker

func NewRelayCircuitBreaker() *RelayCircuitBreaker

NewRelayCircuitBreaker creates a new RelayCircuitBreaker with sensible defaults:

  • MaxRequests: 1 (half-open allows 1 probe request)
  • Interval: 0 (no periodic reset; only trip-based reset)
  • Timeout: 30s (how long the breaker stays open before transitioning to half-open)
  • ReadyToTrip: opens after 5 consecutive failures

func (*RelayCircuitBreaker) Call

func (rcb *RelayCircuitBreaker) Call(ctx context.Context, name string, fn func() (any, error)) (any, error)

Call executes fn through the circuit breaker identified by name. It returns gobreaker.ErrOpenState if the circuit is open. State transitions (closed → open → half-open → closed) are managed by gobreaker.

func (*RelayCircuitBreaker) Get

Get returns the circuit breaker for the given name, creating one if it doesn't exist. Thread-safe: uses double-checked locking.

type RelayManager

type RelayManager struct {
	// contains filtered or unexported fields
}

RelayManager manages relay bindings and orchestrates message delivery.

func NewRelayManager

func NewRelayManager(sender *RelaySender) *RelayManager

NewRelayManager creates a new RelayManager backed by a BindingStore.

func (*RelayManager) AddBinding

func (rm *RelayManager) AddBinding(binding *RelayBinding) error

AddBinding registers a new relay binding and persists it to disk.

func (*RelayManager) ListBindings

func (rm *RelayManager) ListBindings() []*RelayBinding

ListBindings returns a snapshot of all registered bindings.

func (*RelayManager) RemoveBinding

func (rm *RelayManager) RemoveBinding(platform, chatID string) error

RemoveBinding removes a relay binding by platform and chatID.

func (*RelayManager) Send

func (rm *RelayManager) Send(ctx context.Context, toAgent, content string) (*RelayResponse, error)

Send delivers a relay message to a target agent identified by toAgent. It uses the circuit breaker for fault tolerance and the sender for HTTP delivery.

type RelayMessage

type RelayMessage struct {
	TaskID     string    `json:"task_id,omitempty"`
	From       string    `json:"from,omitempty"`
	To         string    `json:"to,omitempty"`
	Content    string    `json:"content,omitempty"`
	SessionKey string    `json:"session_key,omitempty"`
	Metadata   string    `json:"metadata,omitempty"`
	Status     string    `json:"status,omitempty"`
	Response   string    `json:"response,omitempty"`
	Error      string    `json:"error,omitempty"`
	CreatedAt  time.Time `json:"created_at,omitempty"`
}

RelayMessage extends bridgewire.WireMessage with relay-specific fields. All fields are omitempty to maintain backward compatibility with existing WireMessage serialization.

type RelayResponse

type RelayResponse struct {
	TaskID    string        `json:"task_id"`
	Status    string        `json:"status"`
	Content   string        `json:"content,omitempty"`
	Timestamp time.Time     `json:"timestamp"`
	Error     string        `json:"error,omitempty"`
	Duration  time.Duration `json:"duration"`
}

RelayResponse is the result of a relay send operation.

type RelaySender

type RelaySender struct {
	// contains filtered or unexported fields
}

RelaySender sends relay messages to remote HotPlex instances.

func NewRelaySender

func NewRelaySender(token string) *RelaySender

NewRelaySender creates a new RelaySender with the given auth token.

func (*RelaySender) Send

func (s *RelaySender) Send(ctx context.Context, msg *RelayMessage, targetURL string) error

Send delivers a RelayMessage to a target HotPlex instance via HTTP POST. It retries up to 3 times with exponential backoff (1s → 2s → 4s).
