slim

package
v1.0.0-alpha.39 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2026 License: MIT Imports: 12 Imported by: 0

README

SLIM Bridge Input Component

Bidirectional bridge between AGNTCY's SLIM (Secure Low-Latency Interactive Messaging) network and SemStreams' internal NATS infrastructure, enabling cross-organizational agent communication.

Overview

The SLIM bridge enables SemStreams agents to participate in the Internet of Agents ecosystem by connecting to SLIM groups and translating messages between SLIM's encrypted messaging protocol and SemStreams' internal NATS message format. SLIM provides end-to-end encrypted group messaging using the MLS (Messaging Layer Security) protocol with quantum-safe security options.

Key Capabilities:

  • Cross-Organizational Messaging: Secure agent communication across organizational boundaries
  • MLS Encryption: End-to-end encryption with forward secrecy through key ratcheting
  • DID-Based Identity: Decentralized identifiers for cryptographic agent verification
  • Protocol Translation: Seamless mapping between SLIM messages and SemStreams message types
  • Session Management: Automatic MLS session lifecycle management and key ratcheting

Architecture

flowchart TD
    subgraph AGNTCY["AGNTCY Network"]
        SLIM[SLIM Groups<br/>MLS Encrypted]
        DIR[Agent Directory]
    end

    subgraph Bridge["SLIM Bridge Component"]
        SM[Session Manager<br/>MLS Lifecycle]
        MM[Message Mapper<br/>Protocol Translation]
        CLIENT[SLIM Client<br/>gRPC Connection]
    end

    subgraph SemStreams["SemStreams Infrastructure"]
        NATS[NATS JetStream]
        DISPATCH[Agent Dispatch]
        LOOP[Agentic Loop]
    end

    SLIM <-->|gRPC/MLS| CLIENT
    CLIENT --> SM
    SM --> MM
    MM -->|user.message.slim.*| NATS
    MM -->|agent.task.slim.*| NATS
    NATS --> DISPATCH
    DISPATCH --> LOOP
    LOOP -->|agent.complete.*| NATS
    NATS --> MM
    MM --> CLIENT
    CLIENT -->|Responses| SLIM

    DIR -.->|Discovery| SLIM

    style Bridge fill:#e1f5ff
    style AGNTCY fill:#fff4e1
    style SemStreams fill:#f0f0f0

Configuration

Basic Configuration
{
  "slim_endpoint": "wss://slim.agntcy.dev",
  "group_ids": ["did:agntcy:group:tenant-123"],
  "key_ratchet_interval": "1h",
  "reconnect_interval": "5s",
  "max_reconnect_attempts": 10,
  "identity_provider": "local"
}
Configuration Fields
Field Type Default Description
slim_endpoint string - SLIM service WebSocket or gRPC endpoint URL
group_ids []string [] SLIM group DIDs to join on startup
key_ratchet_interval duration 1h How often to ratchet MLS keys for forward secrecy
reconnect_interval duration 5s Delay between reconnection attempts
max_reconnect_attempts int 10 Maximum reconnection attempts before failure
message_buffer_size int 1000 Internal message buffer size for async processing
identity_provider string local Identity provider for DID resolution
Port Configuration

The component defines input and output ports for NATS connectivity:

Input Ports:

  • slim_messages - Receives control messages on slim.message.> (optional)

Output Ports:

  • user_messages - Publishes to user.message.slim.> on USER_MESSAGES stream (required)
  • task_delegations - Publishes to agent.task.slim.> on AGENT_TASKS stream (optional)
Example Full Configuration
{
  "ports": {
    "inputs": [
      {
        "name": "slim_messages",
        "subject": "slim.message.>",
        "type": "nats",
        "required": false,
        "description": "Messages from SLIM groups"
      }
    ],
    "outputs": [
      {
        "name": "user_messages",
        "subject": "user.message.slim.>",
        "type": "jetstream",
        "stream_name": "USER_MESSAGES",
        "required": true,
        "description": "User messages to agent dispatch"
      },
      {
        "name": "task_delegations",
        "subject": "agent.task.slim.>",
        "type": "jetstream",
        "stream_name": "AGENT_TASKS",
        "required": false,
        "description": "Task delegations from external agents"
      }
    ]
  },
  "slim_endpoint": "wss://slim.agntcy.dev",
  "group_ids": [
    "did:agntcy:group:org-platform",
    "did:agntcy:group:cross-team-collab"
  ],
  "key_ratchet_interval": "30m",
  "reconnect_interval": "10s",
  "max_reconnect_attempts": 5,
  "message_buffer_size": 500,
  "identity_provider": "local"
}

Session Management

The SLIM bridge manages MLS group sessions with automatic lifecycle handling and periodic key ratcheting for forward secrecy.

Session States
stateDiagram-v2
    [*] --> Joining: Connect to Group
    Joining --> Active: MLS Handshake Complete
    Active --> Rekeying: Ratchet Interval
    Rekeying --> Active: Keys Updated
    Active --> Leaving: Stop Signal
    Leaving --> [*]: Disconnected
    Active --> Error: Connection Lost
    Error --> Joining: Reconnect
    Error --> [*]: Max Retries
State Description
joining Establishing MLS session with group
active Session ready for sending/receiving messages
rekeying Performing MLS key ratcheting
leaving Gracefully terminating session
error Session error, attempting recovery
MLS Key Ratcheting

The session manager automatically ratchets MLS keys at the configured interval to maintain forward secrecy. This ensures that compromise of current keys does not expose past messages.

Key Ratchet Process:

  1. Session transitions to rekeying state
  2. SLIM client generates new epoch keys
  3. New keys distributed to group members via MLS protocol
  4. Session returns to active state
  5. Old keys are discarded

Configuration: Set key_ratchet_interval to balance security (shorter interval) vs. performance (longer interval). Default is 1h.

Message Mapping

The message mapper translates between SLIM message formats and SemStreams message types.

Inbound Message Flow
flowchart LR
    SLIM[SLIM Message<br/>MLS Encrypted] -->|Decrypt| MSG[Decrypted Content]
    MSG -->|Parse Type| TYPE{Message Type}
    TYPE -->|user| USER[UserMessage]
    TYPE -->|task| TASK[TaskMessage]
    TYPE -->|response| RESP[Ignore/Log]
    USER -->|Publish| NATS1[user.message.slim.group-id]
    TASK -->|Publish| NATS2[agent.task.slim.group-id]

    style SLIM fill:#fff4e1
    style NATS1 fill:#e1f5ff
    style NATS2 fill:#e1f5ff
Message Type Detection

SLIM messages contain a type field that determines routing:

{
  "type": "user",
  "content": "Hello, agent!",
  "attachments": [],
  "metadata": {}
}
SLIM Type SemStreams Type NATS Subject Pattern Description
user or plain text agentic.UserMessage user.message.slim.{group_id} User chat messages
task agentic.TaskMessage agent.task.slim.{group_id} Task delegations from external agents
response - - Outbound only, logged if received
task_result - - Outbound only, logged if received
Outbound Message Flow
flowchart LR
    NATS[agent.complete.*] -->|Subscribe| RESP[Agent Response]
    RESP -->|Map| SLIM_RESP[SLIM Response]
    SLIM_RESP -->|Encrypt| MLS[MLS Encrypted]
    MLS -->|Send| GROUP[SLIM Group]

    style NATS fill:#e1f5ff
    style GROUP fill:#fff4e1
UserMessage Mapping

SLIM → SemStreams:

{
  "type": "user",
  "content": "What's the status?",
  "attachments": [
    {
      "name": "report.pdf",
      "mime_type": "application/pdf",
      "data": "base64...",
      "size": 12345
    }
  ],
  "reply_to": "msg-123",
  "thread_id": "thread-456"
}

Maps to:

agentic.UserMessage{
    MessageID:   "generated-uuid",
    Content:     "What's the status?",
    ChannelType: "slim",
    ChannelID:   "did:agntcy:group:tenant-123",
    UserID:      "did:agntcy:agent:sender",
    Timestamp:   time.Now(),
    Attachments: []agentic.Attachment{...},
    Metadata: map[string]string{
        "slim_group_id": "did:agntcy:group:tenant-123",
        "slim_sender_did": "did:agntcy:agent:sender",
        "slim_reply_to": "msg-123",
        "slim_thread_id": "thread-456",
    },
}
TaskMessage Mapping

SLIM → SemStreams:

{
  "type": "task",
  "task_id": "task-789",
  "prompt": "Analyze this dataset",
  "role": "analyst",
  "model": "claude-3-5-sonnet-20241022",
  "requesting_agent_did": "did:agntcy:agent:requester",
  "target_capabilities": ["data-analysis", "visualization"],
  "priority": "high",
  "deadline": "2026-02-13T12:00:00Z"
}

Maps to:

agentic.TaskMessage{
    TaskID:      "task-789",
    Prompt:      "Analyze this dataset",
    Role:        "analyst",
    Model:       "claude-3-5-sonnet-20241022",
    ChannelType: "slim",
    ChannelID:   "did:agntcy:group:tenant-123",
    UserID:      "did:agntcy:agent:requester",
}
Response Mapping

SemStreams → SLIM:

agentic.UserResponse{
    Type:      "message",
    InReplyTo: "msg-123",
    Content:   "Status report: all systems operational",
}

Maps to SLIM response message:

{
  "type": "response",
  "in_reply_to": "msg-123",
  "status": "success",
  "content": "Status report: all systems operational"
}

NATS Topology

The SLIM bridge integrates with SemStreams' NATS subject hierarchy.

Subject Patterns
Inbound (SLIM → NATS):
  user.message.slim.<group_id>     - User messages from SLIM groups
  agent.task.slim.<group_id>       - Task delegations from external agents

Outbound (NATS → SLIM):
  agent.complete.*                 - Agent completion events (monitored for responses)
Stream Configuration

The component requires these JetStream streams to be configured:

USER_MESSAGES Stream:

Name: USER_MESSAGES
Subjects: user.message.>
Storage: File
Retention: Interest

AGENT_TASKS Stream:

Name: AGENT_TASKS
Subjects: agent.task.>
Storage: File
Retention: Interest
Group ID Sanitization

SLIM group IDs (DIDs) contain characters invalid in NATS subjects (: and .). The bridge sanitizes them:

did:agntcy:group:tenant-123  →  did-agntcy-group-tenant-123
org.platform.group.dev       →  org-platform-group-dev

This ensures valid NATS subject names while preserving uniqueness.

Usage

Registration

Register the component with the component registry:

import (
    "github.com/c360studio/semstreams/input/slim"
    "github.com/c360studio/semstreams/component"
)

func main() {
    registry := component.NewRegistry()

    if err := slim.Register(registry); err != nil {
        log.Fatal(err)
    }
}
Flow Configuration

Include the SLIM bridge in your flow configuration:

components:
  - name: slim-bridge
    type: input
    protocol: slim
    config:
      slim_endpoint: "wss://slim.agntcy.dev"
      group_ids:
        - "did:agntcy:group:production"
      key_ratchet_interval: "1h"
Programmatic Usage
import (
    "context"
    "encoding/json"
    "time"

    "github.com/c360studio/semstreams/input/slim"
    "github.com/c360studio/semstreams/component"
    "github.com/c360studio/semstreams/natsclient"
)

func main() {
    // Create configuration
    cfg := slim.DefaultConfig()
    cfg.SLIMEndpoint = "wss://slim.agntcy.dev"
    cfg.GroupIDs = []string{"did:agntcy:group:my-group"}

    rawConfig, _ := json.Marshal(cfg)

    // Create dependencies
    natsClient := natsclient.NewClient(/* ... */)
    deps := component.Dependencies{
        NATSClient: natsClient,
    }

    // Create component
    comp, err := slim.NewComponent(rawConfig, deps)
    if err != nil {
        log.Fatal(err)
    }

    // Initialize and start
    lifecycle := comp.(component.LifecycleComponent)
    if err := lifecycle.Initialize(); err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    if err := lifecycle.Start(ctx); err != nil {
        log.Fatal(err)
    }

    // Run for some time
    time.Sleep(5 * time.Minute)

    // Graceful shutdown
    if err := lifecycle.Stop(10 * time.Second); err != nil {
        log.Warn("Error during shutdown", "error", err)
    }
}
Sending Responses

Send responses back to SLIM groups:

import (
    "context"
    "github.com/c360studio/semstreams/agentic"
)

// Get the component instance
slimComp := comp.(*slim.Component)

// Send a user response
response := &agentic.UserResponse{
    Type:      "message",
    InReplyTo: "original-message-id",
    Content:   "Task completed successfully",
}

err := slimComp.SendResponse(ctx, "did:agntcy:group:my-group", response)
if err != nil {
    log.Error("Failed to send response", "error", err)
}

// Send a task result
result := &slim.TaskResult{
    TaskID:      "task-123",
    Result:      "Analysis complete: 42 anomalies detected",
    CompletedAt: time.Now(),
}

err = slimComp.SendTaskResult(ctx, "did:agntcy:group:my-group", result)
if err != nil {
    log.Error("Failed to send task result", "error", err)
}

Testing

Unit Tests

Run unit tests for the package:

# Run all tests
go test ./input/slim/...

# Run with verbose output
go test -v ./input/slim/...

# Run with race detection
go test -race ./input/slim/...

# Run specific test
go test -run TestComponentInitialize ./input/slim/
Test Coverage
# Generate coverage report
go test -coverprofile=coverage.out ./input/slim/...
go tool cover -html=coverage.out
Integration Testing

The SLIM bridge includes mock implementations for testing without SLIM infrastructure:

import (
    "testing"
    "github.com/c360studio/semstreams/input/slim"
)

func TestSLIMIntegration(t *testing.T) {
    // Use mock SLIM client
    mockClient := slim.NewMockSLIMClient()

    // Configure mock responses
    mockClient.SetMockMessage(&slim.Message{
        GroupID:   "test-group",
        SenderDID: "did:test:sender",
        Content:   []byte(`{"type":"user","content":"test"}`),
    })

    // Create session manager with mock
    manager := slim.NewSessionManager(config, mockClient, logger)

    // Test session lifecycle
    err := manager.Start(ctx)
    if err != nil {
        t.Fatal(err)
    }

    // Verify session state
    session := manager.GetSession("test-group")
    if session.State != slim.SessionStateActive {
        t.Errorf("expected active state, got %s", session.State)
    }
}
Task-Based Testing

Use the project's task runner for standardized testing:

# Run unit tests
task test

# Run integration tests (requires testcontainers)
task test:integration

# Run with race detector
task test:race

# Run linters
task lint

# Run all checks
task check

Implementation Status

Current Status: Infrastructure complete, awaiting AGNTCY SLIM SDK integration.

This package provides the complete infrastructure for SLIM integration. The actual SLIM protocol implementation requires the AGNTCY SLIM SDK, which provides:

  • MLS protocol implementation (Message Layer Security)
  • Group management and membership
  • Key exchange and ratcheting
  • Message encryption/decryption
  • DID-based authentication

The Client interface defines the required operations and can be implemented when the SDK becomes available. Until then, the component uses a stub implementation for development and testing.

Security Considerations

  • End-to-End Encryption: All SLIM messages are encrypted using MLS before leaving the bridge
  • Forward Secrecy: Periodic key ratcheting ensures compromised keys don't expose historical messages
  • DID Authentication: Agent DIDs provide cryptographic identity verification
  • Quantum-Safe Options: SLIM supports quantum-resistant cryptographic algorithms
  • No Plaintext Exposure: Message content never leaves the bridge in plaintext form
  • Group Isolation: Messages are isolated by SLIM group membership

Troubleshooting

Common Issues

Connection Failures:

Error: Failed to connect to SLIM endpoint

Check:

  • slim_endpoint URL is correct and reachable
  • Network connectivity to AGNTCY infrastructure
  • Identity provider configuration is valid

Session Errors:

Error: Session state is error (state: error)

Check:

  • Group ID is valid and accessible
  • Agent has permission to join the group
  • MLS key ratcheting is not failing

Message Delivery Failures:

Error: Failed to publish user message

Check:

  • NATS connection is healthy
  • JetStream streams are configured correctly
  • Subject permissions allow publishing
Debug Logging

Enable debug logging to see detailed SLIM bridge activity:

import "log/slog"

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelDebug,
}))

deps := component.Dependencies{
    Logger: logger,
}
Health Monitoring

Check component health status:

comp := comp.(*slim.Component)
health := comp.Health()

fmt.Printf("Healthy: %v\n", health.Healthy)
fmt.Printf("Status: %s\n", health.Status)
fmt.Printf("Errors: %d\n", health.ErrorCount)
fmt.Printf("Uptime: %v\n", health.Uptime)

Check session details:

sessions := comp.GetSessions()
for _, session := range sessions {
    fmt.Printf("Group: %s\n", session.GroupID)
    fmt.Printf("State: %s\n", session.State)
    fmt.Printf("Members: %d\n", session.MemberCount)
    fmt.Printf("Last Active: %v\n", session.LastActive)
}

See Also

  • input/a2a - A2A protocol adapter for agent-to-agent communication
  • output/directory-bridge - Registers agents with AGNTCY directories
  • agentic/identity - DID and verifiable credential management
  • processor/oasf-generator - Generates OASF capability descriptions

Documentation

Overview

Package slim provides an input component that bridges SLIM (Secure Lightweight Instant Messaging) groups to SemStreams using MLS (Messaging Layer Security).

Overview

The SLIM bridge enables cross-organizational agent communication by connecting to SLIM groups and translating messages to/from the SemStreams message format. SLIM provides end-to-end encrypted group messaging using the MLS protocol.

Architecture

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  SLIM Groups    │────▶│  SLIM Bridge     │────▶│  NATS/JetStream │
│  (MLS encrypted)│     │  (SemStreams)    │     │  (Agent Dispatch)│
└─────────────────┘     └──────────────────┘     └─────────────────┘
                               │
                               ▼
                        ┌──────────────────┐
                        │  Session Manager │
                        │  (MLS lifecycle) │
                        └──────────────────┘

Components

The package consists of several key components:

  • Component: The main LifecycleComponent that bridges SLIM to NATS
  • SessionManager: Manages MLS group sessions and key ratcheting
  • MessageMapper: Translates between SLIM and SemStreams message formats
  • SLIMClient: Interface for SLIM protocol operations (stub for SDK)

Configuration

Example configuration:

{
  "slim_endpoint": "wss://slim.agntcy.dev",
  "group_ids": ["did:agntcy:group:tenant-123"],
  "key_ratchet_interval": "1h",
  "reconnect_interval": "5s",
  "max_reconnect_attempts": 10,
  "identity_provider": "local"
}

Message Flow

Inbound (SLIM → NATS):

  • User messages → user.message.slim.{group_id}
  • Task delegations → agent.task.slim.{group_id}

Outbound (NATS → SLIM):

  • Agent responses → SLIM group via SendResponse()
  • Task results → SLIM group via SendTaskResult()

Session Lifecycle

  1. Component starts and connects to SLIM service
  2. Joins configured groups, establishing MLS sessions
  3. Receives encrypted messages, decrypts, and publishes to NATS
  4. Periodically ratchets MLS keys for forward secrecy
  5. On shutdown, gracefully leaves all groups

MLS Key Ratcheting

The session manager periodically ratchets MLS keys to maintain forward secrecy. This ensures that compromise of current keys does not expose past messages. The ratchet interval is configurable (default: 1 hour).

Security Considerations

  • All SLIM messages are end-to-end encrypted using MLS
  • Agent DIDs are used for authentication
  • Key material is managed by the SLIM SDK
  • No plaintext message content leaves the bridge

Usage

Register the component with the component registry:

import slim "github.com/c360studio/semstreams/input/slim"

func init() {
    slim.Register(registry)
}

Implementation Status

This package provides the infrastructure for SLIM integration. The actual SLIM protocol implementation requires the AGNTCY SLIM SDK, which provides:

  • MLS protocol implementation
  • Group management
  • Key exchange and ratcheting
  • Message encryption/decryption

The SLIMClient interface defines the required operations and can be implemented when the SDK becomes available.

See Also

  • input/a2a: A2A protocol adapter for agent-to-agent communication
  • output/directory-bridge: Registers agents with AGNTCY directories
  • agentic/identity: DID and verifiable credential management
  • docs/concepts/22-slim-messaging.md: SLIM integration guide

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new SLIM bridge component.

func Register

func Register(registry RegistryInterface) error

Register registers the SLIM bridge input component with the given registry.

Types

type Attachment

type Attachment struct {
	// Name is the filename.
	Name string `json:"name"`

	// MimeType is the content type.
	MimeType string `json:"mime_type"`

	// Data is the base64-encoded content.
	Data string `json:"data,omitempty"`

	// URL is an alternative to inline data.
	URL string `json:"url,omitempty"`

	// Size is the file size in bytes.
	Size int64 `json:"size,omitempty"`
}

Attachment represents an attachment in a SLIM message.

type Client

type Client interface {
	// Connect establishes connection to the SLIM service.
	Connect(ctx context.Context) error

	// Disconnect closes the connection to the SLIM service.
	Disconnect(ctx context.Context) error

	// JoinGroup joins a SLIM group.
	JoinGroup(ctx context.Context, groupID string) error

	// LeaveGroup leaves a SLIM group.
	LeaveGroup(ctx context.Context, groupID string) error

	// SendMessage sends a message to a group.
	SendMessage(ctx context.Context, groupID string, message []byte) error

	// ReceiveMessages returns a channel for receiving messages.
	ReceiveMessages() <-chan *Message

	// RatchetKeys performs key ratcheting for a group.
	RatchetKeys(ctx context.Context, groupID string) error

	// GetGroupMembers returns the members of a group.
	GetGroupMembers(ctx context.Context, groupID string) ([]string, error)
}

Client defines the interface for SLIM protocol operations. This is a stub interface - implementation requires the SLIM SDK.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the SLIM bridge input component. It receives messages from SLIM groups and publishes them to NATS for agent processing.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics.

func (*Component) GetSessions

func (c *Component) GetSessions() []*GroupSession

GetSessions returns all active SLIM sessions.

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions.

func (*Component) SendResponse

func (c *Component) SendResponse(ctx context.Context, groupID string, response *agentic.UserResponse) error

SendResponse sends a response back to a SLIM group.

func (*Component) SendTaskResult

func (c *Component) SendTaskResult(ctx context.Context, groupID string, result *TaskResult) error

SendTaskResult sends a task result back to a SLIM group.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins receiving messages from SLIM groups.

func (*Component) Stop

func (c *Component) Stop(_ time.Duration) error

Stop gracefully stops the component.

type Config

type Config struct {
	// Ports defines the input/output port configuration.
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`

	// SLIMEndpoint is the SLIM service endpoint URL.
	SLIMEndpoint string `json:"slim_endpoint" schema:"type:string,description:SLIM service endpoint URL,category:basic"`

	// GroupIDs specifies which SLIM groups to join.
	// If empty, the bridge will dynamically join groups based on tenant configuration.
	GroupIDs []string `json:"group_ids" schema:"type:array,description:SLIM group IDs to join,category:basic"`

	// KeyRatchetInterval is how often to ratchet MLS keys.
	KeyRatchetInterval string `json:"key_ratchet_interval" schema:"type:string,description:MLS key ratchet interval,category:advanced,default:1h"`

	// ReconnectInterval is the delay between reconnection attempts.
	ReconnectInterval string `json:"reconnect_interval" schema:"type:string,description:Reconnection interval,category:advanced,default:5s"`

	// MaxReconnectAttempts is the maximum number of reconnection attempts.
	MaxReconnectAttempts int `json:"max_reconnect_attempts" schema:"type:int,description:Maximum reconnection attempts,category:advanced,default:10"`

	// MessageBufferSize is the size of the message buffer for async processing.
	MessageBufferSize int `json:"message_buffer_size" schema:"type:int,description:Message buffer size,category:advanced,default:1000"`

	// IdentityProvider specifies which identity provider to use for DID resolution.
	IdentityProvider string `json:"identity_provider" schema:"type:string,description:Identity provider type,category:basic,default:local"`

	// ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
	ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`

	// DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
	DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}

Config defines the configuration for the SLIM bridge component.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration.

func (*Config) GetKeyRatchetInterval

func (c *Config) GetKeyRatchetInterval() time.Duration

GetKeyRatchetInterval returns the key ratchet interval.

func (*Config) GetReconnectInterval

func (c *Config) GetReconnectInterval() time.Duration

GetReconnectInterval returns the reconnect interval.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type GroupSession

type GroupSession struct {
	// GroupID is the DID-based group identifier.
	GroupID string `json:"group_id"`

	// State is the current session state.
	State SessionState `json:"state"`

	// JoinedAt is when the session was established.
	JoinedAt time.Time `json:"joined_at"`

	// LastActive is when the last message was sent/received.
	LastActive time.Time `json:"last_active"`

	// LastKeyRatchet is when keys were last ratcheted.
	LastKeyRatchet time.Time `json:"last_key_ratchet"`

	// MemberCount is the number of members in the group.
	MemberCount int `json:"member_count"`

	// ErrorMessage contains error details if state is error.
	ErrorMessage string `json:"error_message,omitempty"`
}

GroupSession represents an active SLIM group session.

type Message

type Message struct {
	// GroupID is the group the message was received from.
	GroupID string `json:"group_id"`

	// SenderDID is the DID of the message sender.
	SenderDID string `json:"sender_did"`

	// Content is the decrypted message content.
	Content []byte `json:"content"`

	// Timestamp is when the message was sent.
	Timestamp time.Time `json:"timestamp"`

	// MessageID is a unique identifier for the message.
	MessageID string `json:"message_id"`
}

Message represents a message received from SLIM.

type MessageMapper

type MessageMapper struct {
	// contains filtered or unexported fields
}

MessageMapper translates between SLIM messages and SemStreams messages.

func NewMessageMapper

func NewMessageMapper() *MessageMapper

NewMessageMapper creates a new message mapper.

func (*MessageMapper) CreateTaskMessage

func (m *MessageMapper) CreateTaskMessage(prompt string, requesterDID string, capabilities []string) ([]byte, error)

CreateTaskMessage creates a new SLIM task message for delegation.

func (*MessageMapper) FromTaskResult

func (m *MessageMapper) FromTaskResult(result *TaskResult) ([]byte, error)

FromTaskResult converts a task result to a SLIM response message.

func (*MessageMapper) FromUserResponse

func (m *MessageMapper) FromUserResponse(response *agentic.UserResponse) ([]byte, error)

FromUserResponse converts a SemStreams response to a SLIM response message.

func (*MessageMapper) ParseMessageType

func (m *MessageMapper) ParseMessageType(content []byte) (string, error)

ParseMessageType determines the type of a SLIM message.

func (*MessageMapper) ToTaskMessage

func (m *MessageMapper) ToTaskMessage(slimMsg *Message) (*agentic.TaskMessage, error)

ToTaskMessage converts a SLIM task message to a SemStreams TaskMessage.

func (*MessageMapper) ToUserMessage

func (m *MessageMapper) ToUserMessage(slimMsg *Message) (*agentic.UserMessage, error)

ToUserMessage converts a SLIM message to a SemStreams UserMessage.

type MockSLIMClient

type MockSLIMClient struct {

	// Sent messages
	SentMessages []SentMessage

	// Configurable errors
	ConnectErr     error
	DisconnectErr  error
	JoinGroupErr   error
	LeaveGroupErr  error
	SendMessageErr error
	RatchetKeysErr error
	GetMembersErr  error

	// Configurable members
	GroupMembers map[string][]string
	// contains filtered or unexported fields
}

MockSLIMClient is a mock implementation of SLIMClient for testing.

func NewMockSLIMClient

func NewMockSLIMClient() *MockSLIMClient

NewMockSLIMClient creates a new mock SLIM client.

func (*MockSLIMClient) CallCounts

func (m *MockSLIMClient) CallCounts() map[string]int

CallCounts returns the number of times each method was called.

func (*MockSLIMClient) Close

func (m *MockSLIMClient) Close()

Close closes the mock client's channels.

func (*MockSLIMClient) Connect

func (m *MockSLIMClient) Connect(_ context.Context) error

Connect implements SLIMClient.Connect.

func (*MockSLIMClient) Disconnect

func (m *MockSLIMClient) Disconnect(_ context.Context) error

Disconnect implements SLIMClient.Disconnect.

func (*MockSLIMClient) GetGroupMembers

func (m *MockSLIMClient) GetGroupMembers(_ context.Context, groupID string) ([]string, error)

GetGroupMembers implements SLIMClient.GetGroupMembers.

func (*MockSLIMClient) GetJoinedGroups

func (m *MockSLIMClient) GetJoinedGroups() []string

GetJoinedGroups returns all joined groups.

func (*MockSLIMClient) GetSentMessages

func (m *MockSLIMClient) GetSentMessages() []SentMessage

GetSentMessages returns all messages sent through the client.

func (*MockSLIMClient) IsConnected

func (m *MockSLIMClient) IsConnected() bool

IsConnected returns whether the client is connected.

func (*MockSLIMClient) IsInGroup

func (m *MockSLIMClient) IsInGroup(groupID string) bool

IsInGroup returns whether the client has joined the specified group.

func (*MockSLIMClient) JoinGroup

func (m *MockSLIMClient) JoinGroup(_ context.Context, groupID string) error

JoinGroup implements SLIMClient.JoinGroup.

func (*MockSLIMClient) LeaveGroup

func (m *MockSLIMClient) LeaveGroup(_ context.Context, groupID string) error

LeaveGroup implements SLIMClient.LeaveGroup.

func (*MockSLIMClient) RatchetKeys

func (m *MockSLIMClient) RatchetKeys(_ context.Context, _ string) error

RatchetKeys implements SLIMClient.RatchetKeys.

func (*MockSLIMClient) ReceiveMessages

func (m *MockSLIMClient) ReceiveMessages() <-chan *Message

ReceiveMessages implements SLIMClient.ReceiveMessages.

func (*MockSLIMClient) Reset

func (m *MockSLIMClient) Reset()

Reset resets the mock client state.

func (*MockSLIMClient) SendMessage

func (m *MockSLIMClient) SendMessage(_ context.Context, groupID string, message []byte) error

SendMessage implements SLIMClient.SendMessage.

func (*MockSLIMClient) SimulateMessage

func (m *MockSLIMClient) SimulateMessage(msg *Message)

SimulateMessage simulates receiving a message from SLIM.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface defines the minimal interface needed for registration.

type ResponseMessage

type ResponseMessage struct {
	// Type identifies this as a response message.
	Type string `json:"type"`

	// InReplyTo is the original message/task ID.
	InReplyTo string `json:"in_reply_to"`

	// Status indicates success/failure.
	Status string `json:"status"`

	// Content is the response content.
	Content string `json:"content"`

	// Error contains error details if status is error.
	Error string `json:"error,omitempty"`

	// Metadata contains additional response metadata.
	Metadata map[string]any `json:"metadata,omitempty"`
}

ResponseMessage represents a response in SLIM format.

type SentMessage

type SentMessage struct {
	GroupID string
	Content []byte
	SentAt  time.Time
}

SentMessage represents a message that was sent through the mock client.

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager manages SLIM MLS group sessions. This is a stub implementation - full MLS integration requires the SLIM SDK.

func NewSessionManager

func NewSessionManager(config Config, client Client, logger *slog.Logger) *SessionManager

NewSessionManager creates a new SLIM session manager.

func (*SessionManager) ActiveSessionCount

func (sm *SessionManager) ActiveSessionCount() int

ActiveSessionCount returns the number of active sessions.

func (*SessionManager) GetSession

func (sm *SessionManager) GetSession(groupID string) *GroupSession

GetSession returns a copy of the session for a group. Returns nil if the group is not found.

func (*SessionManager) JoinGroup

func (sm *SessionManager) JoinGroup(ctx context.Context, groupID string) error

JoinGroup joins a SLIM group and creates a session.

func (*SessionManager) LeaveGroup

func (sm *SessionManager) LeaveGroup(ctx context.Context, groupID string) error

LeaveGroup leaves a SLIM group and removes the session.

func (*SessionManager) ListSessions

func (sm *SessionManager) ListSessions() []*GroupSession

ListSessions returns copies of all active sessions.

func (*SessionManager) SendMessage

func (sm *SessionManager) SendMessage(ctx context.Context, groupID string, content []byte) error

SendMessage sends a message to a SLIM group.

func (*SessionManager) Start

func (sm *SessionManager) Start(ctx context.Context) error

Start begins the session manager and key ratchet loop.

func (*SessionManager) Stop

func (sm *SessionManager) Stop(ctx context.Context) error

Stop gracefully stops the session manager.

func (*SessionManager) UpdateActivity

func (sm *SessionManager) UpdateActivity(groupID string)

UpdateActivity updates the last activity time for a session.

type SessionState

type SessionState string

SessionState represents the state of a SLIM group session.

const (
	// SessionStateJoining indicates the session is being established.
	SessionStateJoining SessionState = "joining"

	// SessionStateActive indicates the session is active and ready for messages.
	SessionStateActive SessionState = "active"

	// SessionStateRekeying indicates the session is ratcheting keys.
	SessionStateRekeying SessionState = "rekeying"

	// SessionStateLeaving indicates the session is being terminated.
	SessionStateLeaving SessionState = "leaving"

	// SessionStateError indicates the session encountered an error.
	SessionStateError SessionState = "error"
)

type TaskDelegation

type TaskDelegation struct {
	// Type identifies this as a task message.
	Type string `json:"type"`

	// TaskID is the unique task identifier.
	TaskID string `json:"task_id"`

	// Prompt is the task description.
	Prompt string `json:"prompt"`

	// Role specifies the agent role for execution.
	Role string `json:"role,omitempty"`

	// Model specifies the LLM model to use.
	Model string `json:"model,omitempty"`

	// RequestingAgentDID is the DID of the requesting agent.
	RequestingAgentDID string `json:"requesting_agent_did"`

	// TargetCapabilities are the required capabilities.
	TargetCapabilities []string `json:"target_capabilities,omitempty"`

	// Priority is the task priority.
	Priority string `json:"priority,omitempty"`

	// Deadline is the task deadline.
	Deadline *time.Time `json:"deadline,omitempty"`

	// Context contains additional task context.
	Context map[string]any `json:"context,omitempty"`
}

TaskDelegation represents a task delegation in SLIM format.

type TaskResult

type TaskResult struct {
	// TaskID is the task identifier.
	TaskID string `json:"task_id"`

	// Result is the task output content.
	Result string `json:"result"`

	// Error contains error message if task failed.
	Error string `json:"error,omitempty"`

	// CompletedAt is when the task finished.
	CompletedAt time.Time `json:"completed_at"`
}

TaskResult represents the result of a completed task. This is used for outbound SLIM messages when tasks complete.

type UserMessage

type UserMessage struct {
	// Type identifies the message type.
	Type string `json:"type"`

	// Content is the message content.
	Content string `json:"content"`

	// Attachments are optional file attachments.
	Attachments []Attachment `json:"attachments,omitempty"`

	// Metadata contains additional message metadata.
	Metadata map[string]any `json:"metadata,omitempty"`

	// ReplyTo is the message ID this is replying to.
	ReplyTo string `json:"reply_to,omitempty"`

	// ThreadID groups related messages.
	ThreadID string `json:"thread_id,omitempty"`
}

UserMessage represents a user message in SLIM format.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL