Overview ¶
Package slim provides an input component that bridges SLIM (Secure Lightweight Instant Messaging) groups to SemStreams using MLS (Messaging Layer Security).
The SLIM bridge enables cross-organizational agent communication by connecting to SLIM groups and translating messages to/from the SemStreams message format. SLIM provides end-to-end encrypted group messaging using the MLS protocol.
Architecture ¶
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  SLIM Groups    │────▶│   SLIM Bridge    │────▶│  NATS/JetStream │
│  (MLS encrypted)│     │   (SemStreams)   │     │ (Agent Dispatch)│
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                 │
                                 ▼
                        ┌──────────────────┐
                        │ Session Manager  │
                        │ (MLS lifecycle)  │
                        └──────────────────┘
Components ¶
The package consists of several key components:
- Component: The main LifecycleComponent that bridges SLIM to NATS
- SessionManager: Manages MLS group sessions and key ratcheting
- MessageMapper: Translates between SLIM and SemStreams message formats
- Client: Interface for SLIM protocol operations (a stub until the SDK is available; some doc comments refer to it as SLIMClient)
Configuration ¶
Example configuration:
{
    "slim_endpoint": "wss://slim.agntcy.dev",
    "group_ids": ["did:agntcy:group:tenant-123"],
    "key_ratchet_interval": "1h",
    "reconnect_interval": "5s",
    "max_reconnect_attempts": 10,
    "identity_provider": "local"
}
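The interval fields are strings such as "1h" and "5s", so a consumer has to convert them before use. The following is a minimal sketch of that step, assuming the strings follow Go's time.ParseDuration syntax. The bridgeConfig type and parseBridgeConfig function are hypothetical local stand-ins, not the package's Config type or its accessors.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// bridgeConfig mirrors only the fields shown in the example configuration.
// It is a local stand-in, not the package's Config type.
type bridgeConfig struct {
	SLIMEndpoint         string   `json:"slim_endpoint"`
	GroupIDs             []string `json:"group_ids"`
	KeyRatchetInterval   string   `json:"key_ratchet_interval"`
	ReconnectInterval    string   `json:"reconnect_interval"`
	MaxReconnectAttempts int      `json:"max_reconnect_attempts"`
	IdentityProvider     string   `json:"identity_provider"`
}

// parseBridgeConfig decodes the JSON and converts the two interval strings
// into time.Duration values, failing if either string is malformed.
func parseBridgeConfig(raw []byte) (bridgeConfig, time.Duration, time.Duration, error) {
	var cfg bridgeConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, 0, 0, fmt.Errorf("decode config: %w", err)
	}
	ratchet, err := time.ParseDuration(cfg.KeyRatchetInterval)
	if err != nil {
		return cfg, 0, 0, fmt.Errorf("key_ratchet_interval: %w", err)
	}
	reconnect, err := time.ParseDuration(cfg.ReconnectInterval)
	if err != nil {
		return cfg, 0, 0, fmt.Errorf("reconnect_interval: %w", err)
	}
	return cfg, ratchet, reconnect, nil
}

// exampleConfig repeats the example configuration from the documentation.
const exampleConfig = `{
    "slim_endpoint": "wss://slim.agntcy.dev",
    "group_ids": ["did:agntcy:group:tenant-123"],
    "key_ratchet_interval": "1h",
    "reconnect_interval": "5s",
    "max_reconnect_attempts": 10,
    "identity_provider": "local"
}`

func main() {
	cfg, ratchet, reconnect, err := parseBridgeConfig([]byte(exampleConfig))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.SLIMEndpoint, cfg.GroupIDs, ratchet, reconnect)
}
```

Validating the durations at load time surfaces a typo such as "1hr" when the component is configured rather than when the first ratchet or reconnect is scheduled.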
Message Flow ¶
Inbound (SLIM → NATS):
- User messages → user.message.slim.{group_id}
- Task delegations → agent.task.slim.{group_id}
Outbound (NATS → SLIM):
- Agent responses → SLIM group via SendResponse()
- Task results → SLIM group via SendTaskResult()
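The inbound subject patterns above can be expressed as a small helper. This is an illustrative sketch only: the inboundSubject function and the kind constants are hypothetical names, and the real bridge may sanitize or escape group IDs before building the subject (a group ID containing "." would otherwise add extra subject tokens).

```go
package main

import "fmt"

// Hypothetical message kinds used only by this sketch.
const (
	kindUser = "user"
	kindTask = "task"
)

// inboundSubject builds the NATS subject for an inbound SLIM message,
// following the two patterns listed in the Message Flow section.
func inboundSubject(kind, groupID string) (string, error) {
	if groupID == "" {
		return "", fmt.Errorf("empty group id")
	}
	switch kind {
	case kindUser:
		return "user.message.slim." + groupID, nil
	case kindTask:
		return "agent.task.slim." + groupID, nil
	default:
		return "", fmt.Errorf("unknown message kind %q", kind)
	}
}

func main() {
	for _, kind := range []string{kindUser, kindTask} {
		subject, err := inboundSubject(kind, "tenant-123")
		if err != nil {
			panic(err)
		}
		fmt.Println(subject)
	}
}
```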
Session Lifecycle ¶
- Component starts and connects to SLIM service
- Joins configured groups, establishing MLS sessions
- Receives encrypted messages, decrypts, and publishes to NATS
- Periodically ratchets MLS keys for forward secrecy
- On shutdown, gracefully leaves all groups
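The lifecycle steps above can be sketched as a single loop. This is not the component's actual implementation: message, groupClient, fakeClient, and runBridge are hypothetical local stand-ins loosely modeled on the package's Message type and Client interface, and the periodic key ratchet step is left out to keep the sketch short.

```go
package main

import (
	"context"
	"fmt"
)

// message and groupClient are local stand-ins for this sketch.
type message struct {
	GroupID string
	Content []byte
}

type groupClient interface {
	Connect(ctx context.Context) error
	JoinGroup(ctx context.Context, groupID string) error
	LeaveGroup(ctx context.Context, groupID string) error
	ReceiveMessages() <-chan *message
}

// runBridge connects, joins the groups, forwards messages until the context
// is cancelled or the channel closes, then leaves every group it joined.
func runBridge(ctx context.Context, c groupClient, groups []string, publish func(*message)) error {
	if err := c.Connect(ctx); err != nil {
		return fmt.Errorf("connect: %w", err)
	}
	var joined []string
	defer func() {
		// Use a fresh context so cleanup still runs after cancellation.
		for _, g := range joined {
			_ = c.LeaveGroup(context.Background(), g)
		}
	}()
	for _, g := range groups {
		if err := c.JoinGroup(ctx, g); err != nil {
			return fmt.Errorf("join %s: %w", g, err)
		}
		joined = append(joined, g)
	}
	in := c.ReceiveMessages()
	for {
		select {
		case <-ctx.Done():
			return nil
		case msg, ok := <-in:
			if !ok {
				return nil
			}
			publish(msg)
		}
	}
}

// fakeClient delivers a fixed set of messages and records LeaveGroup calls.
type fakeClient struct {
	ch   chan *message
	left []string
}

func newFakeClient(msgs ...*message) *fakeClient {
	ch := make(chan *message, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	return &fakeClient{ch: ch}
}

func (f *fakeClient) Connect(context.Context) error           { return nil }
func (f *fakeClient) JoinGroup(context.Context, string) error { return nil }
func (f *fakeClient) LeaveGroup(_ context.Context, g string) error {
	f.left = append(f.left, g)
	return nil
}
func (f *fakeClient) ReceiveMessages() <-chan *message { return f.ch }

// demo runs the lifecycle once against the fake client.
func demo() (forwarded int, left []string, err error) {
	c := newFakeClient(&message{GroupID: "g1", Content: []byte("hello")})
	err = runBridge(context.Background(), c, []string{"g1", "g2"}, func(*message) { forwarded++ })
	return forwarded, c.left, err
}

func main() {
	forwarded, left, err := demo()
	fmt.Println(forwarded, left, err)
}
```

Leaving groups in a deferred function means the cleanup also runs when a later join fails, so a partially started bridge does not keep stale memberships.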
MLS Key Ratcheting ¶
The session manager periodically ratchets MLS keys to maintain forward secrecy. This ensures that compromise of current keys does not expose past messages. The ratchet interval is configurable (default: 1 hour).
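One way to decide which groups need a ratchet on each tick is to compare the time of the last ratchet against the configured interval. The sketch below assumes that approach; the session type and dueForRatchet function are hypothetical, with LastKeyRatchet borrowed from the GroupSession field of the same name, and the session manager may schedule ratchets differently.

```go
package main

import (
	"fmt"
	"time"
)

// session is a local stand-in holding only the fields this sketch needs.
type session struct {
	GroupID        string
	LastKeyRatchet time.Time
}

// dueForRatchet returns the IDs of groups whose last key ratchet happened
// at least interval before now.
func dueForRatchet(sessions []session, now time.Time, interval time.Duration) []string {
	var due []string
	for _, s := range sessions {
		if now.Sub(s.LastKeyRatchet) >= interval {
			due = append(due, s.GroupID)
		}
	}
	return due
}

func main() {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	sessions := []session{
		{GroupID: "fresh", LastKeyRatchet: now.Add(-10 * time.Minute)},
		{GroupID: "stale", LastKeyRatchet: now.Add(-2 * time.Hour)},
	}
	// With the default one-hour interval only "stale" is due.
	fmt.Println(dueForRatchet(sessions, now, time.Hour))
}
```

Keeping the decision in a pure function makes it testable without timers; a ticker loop would call it and then invoke RatchetKeys for each returned group.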
Security Considerations ¶
- All SLIM messages are end-to-end encrypted using MLS
- Agent DIDs are used for authentication
- Key material is managed by the SLIM SDK
- No plaintext message content leaves the bridge toward SLIM; decrypted content is published only to NATS
Usage ¶
Register the component with the component registry:
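import slim "github.com/c360studio/semstreams/input/slim"

func init() {
    // registry is the application's component registry; it must satisfy slim.RegistryInterface.
    if err := slim.Register(registry); err != nil {
        panic(err)
    }
}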
Implementation Status ¶
This package provides the infrastructure for SLIM integration. The actual SLIM protocol implementation requires the AGNTCY SLIM SDK, which provides:
- MLS protocol implementation
- Group management
- Key exchange and ratcheting
- Message encryption/decryption
The Client interface defines the required operations and can be implemented once the SDK becomes available.
See Also ¶
- input/a2a: A2A protocol adapter for agent-to-agent communication
- output/directory-bridge: Registers agents with AGNTCY directories
- agentic/identity: DID and verifiable credential management
- docs/concepts/22-slim-messaging.md: SLIM integration guide
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- type Attachment
- type Client
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) GetSessions() []*GroupSession
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) SendResponse(ctx context.Context, groupID string, response *agentic.UserResponse) error
- func (c *Component) SendTaskResult(ctx context.Context, groupID string, result *TaskResult) error
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(_ time.Duration) error
- type Config
- type GroupSession
- type Message
- type MessageMapper
- func (m *MessageMapper) CreateTaskMessage(prompt string, requesterDID string, capabilities []string) ([]byte, error)
- func (m *MessageMapper) FromTaskResult(result *TaskResult) ([]byte, error)
- func (m *MessageMapper) FromUserResponse(response *agentic.UserResponse) ([]byte, error)
- func (m *MessageMapper) ParseMessageType(content []byte) (string, error)
- func (m *MessageMapper) ToTaskMessage(slimMsg *Message) (*agentic.TaskMessage, error)
- func (m *MessageMapper) ToUserMessage(slimMsg *Message) (*agentic.UserMessage, error)
- type MockSLIMClient
- func (m *MockSLIMClient) CallCounts() map[string]int
- func (m *MockSLIMClient) Close()
- func (m *MockSLIMClient) Connect(_ context.Context) error
- func (m *MockSLIMClient) Disconnect(_ context.Context) error
- func (m *MockSLIMClient) GetGroupMembers(_ context.Context, groupID string) ([]string, error)
- func (m *MockSLIMClient) GetJoinedGroups() []string
- func (m *MockSLIMClient) GetSentMessages() []SentMessage
- func (m *MockSLIMClient) IsConnected() bool
- func (m *MockSLIMClient) IsInGroup(groupID string) bool
- func (m *MockSLIMClient) JoinGroup(_ context.Context, groupID string) error
- func (m *MockSLIMClient) LeaveGroup(_ context.Context, groupID string) error
- func (m *MockSLIMClient) RatchetKeys(_ context.Context, _ string) error
- func (m *MockSLIMClient) ReceiveMessages() <-chan *Message
- func (m *MockSLIMClient) Reset()
- func (m *MockSLIMClient) SendMessage(_ context.Context, groupID string, message []byte) error
- func (m *MockSLIMClient) SimulateMessage(msg *Message)
- type RegistryInterface
- type ResponseMessage
- type SentMessage
- type SessionManager
- func (sm *SessionManager) ActiveSessionCount() int
- func (sm *SessionManager) GetSession(groupID string) *GroupSession
- func (sm *SessionManager) JoinGroup(ctx context.Context, groupID string) error
- func (sm *SessionManager) LeaveGroup(ctx context.Context, groupID string) error
- func (sm *SessionManager) ListSessions() []*GroupSession
- func (sm *SessionManager) SendMessage(ctx context.Context, groupID string, content []byte) error
- func (sm *SessionManager) Start(ctx context.Context) error
- func (sm *SessionManager) Stop(ctx context.Context) error
- func (sm *SessionManager) UpdateActivity(groupID string)
- type SessionState
- type TaskDelegation
- type TaskResult
- type UserMessage
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new SLIM bridge component.
func Register ¶
func Register(registry RegistryInterface) error
Register registers the SLIM bridge input component with the given registry.
Types ¶
type Attachment ¶
type Attachment struct {
    // Name is the filename.
    Name string `json:"name"`
    // MimeType is the content type.
    MimeType string `json:"mime_type"`
    // Data is the base64-encoded content.
    Data string `json:"data,omitempty"`
    // URL is an alternative to inline data.
    URL string `json:"url,omitempty"`
    // Size is the file size in bytes.
    Size int64 `json:"size,omitempty"`
}
Attachment represents an attachment in a SLIM message.
type Client ¶
type Client interface {
    // Connect establishes connection to the SLIM service.
    Connect(ctx context.Context) error
    // Disconnect closes the connection to the SLIM service.
    Disconnect(ctx context.Context) error
    // JoinGroup joins a SLIM group.
    JoinGroup(ctx context.Context, groupID string) error
    // LeaveGroup leaves a SLIM group.
    LeaveGroup(ctx context.Context, groupID string) error
    // SendMessage sends a message to a group.
    SendMessage(ctx context.Context, groupID string, message []byte) error
    // ReceiveMessages returns a channel for receiving messages.
    ReceiveMessages() <-chan *Message
    // RatchetKeys performs key ratcheting for a group.
    RatchetKeys(ctx context.Context, groupID string) error
    // GetGroupMembers returns the members of a group.
    GetGroupMembers(ctx context.Context, groupID string) ([]string, error)
}
Client defines the interface for SLIM protocol operations. This is a stub interface - implementation requires the SLIM SDK.
type Component ¶
type Component struct {
    // contains filtered or unexported fields
}
Component implements the SLIM bridge input component. It receives messages from SLIM groups and publishes them to NATS for agent processing.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics.
func (*Component) GetSessions ¶
func (c *Component) GetSessions() []*GroupSession
GetSessions returns all active SLIM sessions.
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status.
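func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component.
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns configured input port definitions.
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns configured output port definitions.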
func (*Component) SendResponse ¶
func (c *Component) SendResponse(ctx context.Context, groupID string, response *agentic.UserResponse) error
SendResponse sends a response back to a SLIM group.
func (*Component) SendTaskResult ¶
func (c *Component) SendTaskResult(ctx context.Context, groupID string, result *TaskResult) error
SendTaskResult sends a task result back to a SLIM group.
type Config ¶
type Config struct {
    // Ports defines the input/output port configuration.
    Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
    // SLIMEndpoint is the SLIM service endpoint URL.
    SLIMEndpoint string `json:"slim_endpoint" schema:"type:string,description:SLIM service endpoint URL,category:basic"`
    // GroupIDs specifies which SLIM groups to join.
    // If empty, the bridge will dynamically join groups based on tenant configuration.
    GroupIDs []string `json:"group_ids" schema:"type:array,description:SLIM group IDs to join,category:basic"`
    // KeyRatchetInterval is how often to ratchet MLS keys.
    KeyRatchetInterval string `json:"key_ratchet_interval" schema:"type:string,description:MLS key ratchet interval,category:advanced,default:1h"`
    // ReconnectInterval is the delay between reconnection attempts.
    ReconnectInterval string `json:"reconnect_interval" schema:"type:string,description:Reconnection interval,category:advanced,default:5s"`
    // MaxReconnectAttempts is the maximum number of reconnection attempts.
    MaxReconnectAttempts int `json:"max_reconnect_attempts" schema:"type:int,description:Maximum reconnection attempts,category:advanced,default:10"`
    // MessageBufferSize is the size of the message buffer for async processing.
    MessageBufferSize int `json:"message_buffer_size" schema:"type:int,description:Message buffer size,category:advanced,default:1000"`
    // IdentityProvider specifies which identity provider to use for DID resolution.
    IdentityProvider string `json:"identity_provider" schema:"type:string,description:Identity provider type,category:basic,default:local"`
    // ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
    ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`
    // DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
    DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}
Config defines the configuration for the SLIM bridge component.
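ReconnectInterval and MaxReconnectAttempts suggest a bounded retry loop with a fixed delay between attempts. The sketch below shows one such loop under that assumption. connectWithRetry and demoRetry are hypothetical names, and how the component actually treats these settings (for example, whether a zero limit means unlimited retries) is not stated in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectWithRetry calls connect up to maxAttempts times, waiting interval
// between failed attempts. It stops early if ctx is cancelled.
func connectWithRetry(ctx context.Context, connect func(context.Context) error, interval time.Duration, maxAttempts int) error {
	if maxAttempts < 1 {
		return errors.New("maxAttempts must be at least 1")
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = connect(ctx); lastErr == nil {
			return nil
		}
		if attempt == maxAttempts {
			break // no point in waiting after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

// demoRetry simulates a connection that fails the first `failures` times.
func demoRetry(failures, maxAttempts int) (calls int, err error) {
	connect := func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
	err = connectWithRetry(context.Background(), connect, time.Millisecond, maxAttempts)
	return calls, err
}

func main() {
	calls, err := demoRetry(2, 10)
	fmt.Println(calls, err)
}
```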
func (*Config) GetKeyRatchetInterval ¶
GetKeyRatchetInterval returns the key ratchet interval.
func (*Config) GetReconnectInterval ¶
GetReconnectInterval returns the reconnect interval.
type GroupSession ¶
type GroupSession struct {
    // GroupID is the DID-based group identifier.
    GroupID string `json:"group_id"`
    // State is the current session state.
    State SessionState `json:"state"`
    // JoinedAt is when the session was established.
    JoinedAt time.Time `json:"joined_at"`
    // LastActive is when the last message was sent/received.
    LastActive time.Time `json:"last_active"`
    // LastKeyRatchet is when keys were last ratcheted.
    LastKeyRatchet time.Time `json:"last_key_ratchet"`
    // MemberCount is the number of members in the group.
    MemberCount int `json:"member_count"`
    // ErrorMessage contains error details if state is error.
    ErrorMessage string `json:"error_message,omitempty"`
}
GroupSession represents an active SLIM group session.
type Message ¶
type Message struct {
    // GroupID is the group the message was received from.
    GroupID string `json:"group_id"`
    // SenderDID is the DID of the message sender.
    SenderDID string `json:"sender_did"`
    // Content is the decrypted message content.
    Content []byte `json:"content"`
    // Timestamp is when the message was sent.
    Timestamp time.Time `json:"timestamp"`
    // MessageID is a unique identifier for the message.
    MessageID string `json:"message_id"`
}
Message represents a message received from SLIM.
type MessageMapper ¶
type MessageMapper struct {
    // contains filtered or unexported fields
}
MessageMapper translates between SLIM messages and SemStreams messages.
func NewMessageMapper ¶
func NewMessageMapper() *MessageMapper
NewMessageMapper creates a new message mapper.
func (*MessageMapper) CreateTaskMessage ¶
func (m *MessageMapper) CreateTaskMessage(prompt string, requesterDID string, capabilities []string) ([]byte, error)
CreateTaskMessage creates a new SLIM task message for delegation.
func (*MessageMapper) FromTaskResult ¶
func (m *MessageMapper) FromTaskResult(result *TaskResult) ([]byte, error)
FromTaskResult converts a task result to a SLIM response message.
func (*MessageMapper) FromUserResponse ¶
func (m *MessageMapper) FromUserResponse(response *agentic.UserResponse) ([]byte, error)
FromUserResponse converts a SemStreams response to a SLIM response message.
func (*MessageMapper) ParseMessageType ¶
func (m *MessageMapper) ParseMessageType(content []byte) (string, error)
ParseMessageType determines the type of a SLIM message.
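The UserMessage, TaskDelegation, and ResponseMessage types all carry a "type" JSON field, so one plausible way to classify a payload is to decode only that field first. The sketch below illustrates that idea. The envelope type and peekType function are hypothetical, the type values used in main are made up for the example, and this is not necessarily how ParseMessageType is implemented.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope decodes only the discriminator field and ignores everything else.
type envelope struct {
	Type string `json:"type"`
}

// peekType returns the value of the top-level "type" field, or an error if
// the payload is not valid JSON or the field is missing.
func peekType(content []byte) (string, error) {
	var env envelope
	if err := json.Unmarshal(content, &env); err != nil {
		return "", fmt.Errorf("decode envelope: %w", err)
	}
	if env.Type == "" {
		return "", errors.New("message has no type field")
	}
	return env.Type, nil
}

func main() {
	// The type values below are illustrative, not the package's constants.
	payloads := []string{
		`{"type":"user","content":"hello"}`,
		`{"type":"task","task_id":"t-1","prompt":"summarize"}`,
		`{"content":"no type here"}`,
	}
	for _, p := range payloads {
		kind, err := peekType([]byte(p))
		fmt.Println(kind, err)
	}
}
```

Decoding into a one-field struct avoids committing to a concrete message type until the discriminator is known, after which the full payload can be unmarshaled into the matching struct.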
func (*MessageMapper) ToTaskMessage ¶
func (m *MessageMapper) ToTaskMessage(slimMsg *Message) (*agentic.TaskMessage, error)
ToTaskMessage converts a SLIM task message to a SemStreams TaskMessage.
func (*MessageMapper) ToUserMessage ¶
func (m *MessageMapper) ToUserMessage(slimMsg *Message) (*agentic.UserMessage, error)
ToUserMessage converts a SLIM message to a SemStreams UserMessage.
type MockSLIMClient ¶
type MockSLIMClient struct {
    // Sent messages
    SentMessages []SentMessage
    // Configurable errors
    ConnectErr     error
    DisconnectErr  error
    JoinGroupErr   error
    LeaveGroupErr  error
    SendMessageErr error
    RatchetKeysErr error
    GetMembersErr  error
    // Configurable members
    GroupMembers map[string][]string
    // contains filtered or unexported fields
}
MockSLIMClient is a mock implementation of SLIMClient for testing.
func NewMockSLIMClient ¶
func NewMockSLIMClient() *MockSLIMClient
NewMockSLIMClient creates a new mock SLIM client.
func (*MockSLIMClient) CallCounts ¶
func (m *MockSLIMClient) CallCounts() map[string]int
CallCounts returns the number of times each method was called.
func (*MockSLIMClient) Close ¶
func (m *MockSLIMClient) Close()
Close closes the mock client's channels.
func (*MockSLIMClient) Connect ¶
func (m *MockSLIMClient) Connect(_ context.Context) error
Connect implements SLIMClient.Connect.
func (*MockSLIMClient) Disconnect ¶
func (m *MockSLIMClient) Disconnect(_ context.Context) error
Disconnect implements SLIMClient.Disconnect.
func (*MockSLIMClient) GetGroupMembers ¶
func (m *MockSLIMClient) GetGroupMembers(_ context.Context, groupID string) ([]string, error)
GetGroupMembers implements SLIMClient.GetGroupMembers.
func (*MockSLIMClient) GetJoinedGroups ¶
func (m *MockSLIMClient) GetJoinedGroups() []string
GetJoinedGroups returns all joined groups.
func (*MockSLIMClient) GetSentMessages ¶
func (m *MockSLIMClient) GetSentMessages() []SentMessage
GetSentMessages returns all messages sent through the client.
func (*MockSLIMClient) IsConnected ¶
func (m *MockSLIMClient) IsConnected() bool
IsConnected returns whether the client is connected.
func (*MockSLIMClient) IsInGroup ¶
func (m *MockSLIMClient) IsInGroup(groupID string) bool
IsInGroup returns whether the client has joined the specified group.
func (*MockSLIMClient) JoinGroup ¶
func (m *MockSLIMClient) JoinGroup(_ context.Context, groupID string) error
JoinGroup implements SLIMClient.JoinGroup.
func (*MockSLIMClient) LeaveGroup ¶
func (m *MockSLIMClient) LeaveGroup(_ context.Context, groupID string) error
LeaveGroup implements SLIMClient.LeaveGroup.
func (*MockSLIMClient) RatchetKeys ¶
func (m *MockSLIMClient) RatchetKeys(_ context.Context, _ string) error
RatchetKeys implements SLIMClient.RatchetKeys.
func (*MockSLIMClient) ReceiveMessages ¶
func (m *MockSLIMClient) ReceiveMessages() <-chan *Message
ReceiveMessages implements SLIMClient.ReceiveMessages.
func (*MockSLIMClient) SendMessage ¶
func (m *MockSLIMClient) SendMessage(_ context.Context, groupID string, message []byte) error
SendMessage implements SLIMClient.SendMessage.
func (*MockSLIMClient) SimulateMessage ¶
func (m *MockSLIMClient) SimulateMessage(msg *Message)
SimulateMessage simulates receiving a message from SLIM.
type RegistryInterface ¶
type RegistryInterface interface {
    RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration.
type ResponseMessage ¶
type ResponseMessage struct {
    // Type identifies this as a response message.
    Type string `json:"type"`
    // InReplyTo is the original message/task ID.
    InReplyTo string `json:"in_reply_to"`
    // Status indicates success/failure.
    Status string `json:"status"`
    // Content is the response content.
    Content string `json:"content"`
    // Error contains error details if status is error.
    Error string `json:"error,omitempty"`
    // Metadata contains additional response metadata.
    Metadata map[string]any `json:"metadata,omitempty"`
}
ResponseMessage represents a response in SLIM format.
type SentMessage ¶
SentMessage represents a message that was sent through the mock client.
type SessionManager ¶
type SessionManager struct {
    // contains filtered or unexported fields
}
SessionManager manages SLIM MLS group sessions. This is a stub implementation - full MLS integration requires the SLIM SDK.
func NewSessionManager ¶
func NewSessionManager(config Config, client Client, logger *slog.Logger) *SessionManager
NewSessionManager creates a new SLIM session manager.
func (*SessionManager) ActiveSessionCount ¶
func (sm *SessionManager) ActiveSessionCount() int
ActiveSessionCount returns the number of active sessions.
func (*SessionManager) GetSession ¶
func (sm *SessionManager) GetSession(groupID string) *GroupSession
GetSession returns a copy of the session for a group. Returns nil if the group is not found.
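Returning a copy means callers cannot mutate the manager's internal state through the returned pointer. The following sketch shows one common way to get that behavior with a read lock and a struct copy. sessionStore and groupSession are hypothetical local types, not the package's SessionManager or GroupSession.

```go
package main

import (
	"fmt"
	"sync"
)

// groupSession is a local stand-in with value-only fields.
type groupSession struct {
	GroupID     string
	MemberCount int
}

// sessionStore guards its map with a read/write mutex.
type sessionStore struct {
	mu       sync.RWMutex
	sessions map[string]*groupSession
}

// get returns a copy of the stored session, or nil if the group is unknown.
func (s *sessionStore) get(groupID string) *groupSession {
	s.mu.RLock()
	defer s.mu.RUnlock()
	sess, ok := s.sessions[groupID]
	if !ok {
		return nil
	}
	cp := *sess // shallow copy; enough here because every field is a value
	return &cp
}

func main() {
	store := &sessionStore{sessions: map[string]*groupSession{
		"g1": {GroupID: "g1", MemberCount: 3},
	}}
	snapshot := store.get("g1")
	snapshot.MemberCount = 99 // changes only the caller's copy
	fmt.Println(store.get("g1").MemberCount, snapshot.MemberCount)
}
```

A shallow copy stops being sufficient once the struct holds slices, maps, or pointers; those would need to be cloned explicitly.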
func (*SessionManager) JoinGroup ¶
func (sm *SessionManager) JoinGroup(ctx context.Context, groupID string) error
JoinGroup joins a SLIM group and creates a session.
func (*SessionManager) LeaveGroup ¶
func (sm *SessionManager) LeaveGroup(ctx context.Context, groupID string) error
LeaveGroup leaves a SLIM group and removes the session.
func (*SessionManager) ListSessions ¶
func (sm *SessionManager) ListSessions() []*GroupSession
ListSessions returns copies of all active sessions.
func (*SessionManager) SendMessage ¶
func (sm *SessionManager) SendMessage(ctx context.Context, groupID string, content []byte) error
SendMessage sends a message to a SLIM group.
func (*SessionManager) Start ¶
func (sm *SessionManager) Start(ctx context.Context) error
Start begins the session manager and key ratchet loop.
func (*SessionManager) Stop ¶
func (sm *SessionManager) Stop(ctx context.Context) error
Stop gracefully stops the session manager.
func (*SessionManager) UpdateActivity ¶
func (sm *SessionManager) UpdateActivity(groupID string)
UpdateActivity updates the last activity time for a session.
type SessionState ¶
type SessionState string
SessionState represents the state of a SLIM group session.
const (
    // SessionStateJoining indicates the session is being established.
    SessionStateJoining SessionState = "joining"
    // SessionStateActive indicates the session is active and ready for messages.
    SessionStateActive SessionState = "active"
    // SessionStateRekeying indicates the session is ratcheting keys.
    SessionStateRekeying SessionState = "rekeying"
    // SessionStateLeaving indicates the session is being terminated.
    SessionStateLeaving SessionState = "leaving"
    // SessionStateError indicates the session encountered an error.
    SessionStateError SessionState = "error"
)
type TaskDelegation ¶
type TaskDelegation struct {
    // Type identifies this as a task message.
    Type string `json:"type"`
    // TaskID is the unique task identifier.
    TaskID string `json:"task_id"`
    // Prompt is the task description.
    Prompt string `json:"prompt"`
    // Role specifies the agent role for execution.
    Role string `json:"role,omitempty"`
    // Model specifies the LLM model to use.
    Model string `json:"model,omitempty"`
    // RequestingAgentDID is the DID of the requesting agent.
    RequestingAgentDID string `json:"requesting_agent_did"`
    // TargetCapabilities are the required capabilities.
    TargetCapabilities []string `json:"target_capabilities,omitempty"`
    // Priority is the task priority.
    Priority string `json:"priority,omitempty"`
    // Deadline is the task deadline.
    Deadline *time.Time `json:"deadline,omitempty"`
    // Context contains additional task context.
    Context map[string]any `json:"context,omitempty"`
}
TaskDelegation represents a task delegation in SLIM format.
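To make the wire format concrete, the sketch below marshals a trimmed-down copy of this struct and shows how the omitempty tags drop unset optional fields. The taskDelegation type and encodeTask function are hypothetical, and the "task" value for the type field is an assumption, since the documentation does not list the discriminator values.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskDelegation copies a subset of the TaskDelegation fields and JSON tags.
type taskDelegation struct {
	Type               string   `json:"type"`
	TaskID             string   `json:"task_id"`
	Prompt             string   `json:"prompt"`
	Role               string   `json:"role,omitempty"`
	RequestingAgentDID string   `json:"requesting_agent_did"`
	TargetCapabilities []string `json:"target_capabilities,omitempty"`
}

// encodeTask builds a delegation payload and returns it as a JSON string.
func encodeTask(taskID, prompt, requesterDID string, capabilities []string) (string, error) {
	td := taskDelegation{
		Type:               "task", // assumed discriminator value
		TaskID:             taskID,
		Prompt:             prompt,
		RequestingAgentDID: requesterDID,
		TargetCapabilities: capabilities,
	}
	b, err := json.Marshal(td)
	if err != nil {
		return "", fmt.Errorf("encode task: %w", err)
	}
	return string(b), nil
}

func main() {
	// Role and TargetCapabilities are unset here, so they are omitted.
	minimal, err := encodeTask("t-1", "Summarize", "did:example:requester", nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(minimal)

	withCaps, err := encodeTask("t-2", "Translate", "did:example:requester", []string{"translate"})
	if err != nil {
		panic(err)
	}
	fmt.Println(withCaps)
}
```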
type TaskResult ¶
type TaskResult struct {
    // TaskID is the task identifier.
    TaskID string `json:"task_id"`
    // Result is the task output content.
    Result string `json:"result"`
    // Error contains error message if task failed.
    Error string `json:"error,omitempty"`
    // CompletedAt is when the task finished.
    CompletedAt time.Time `json:"completed_at"`
}
TaskResult represents the result of a completed task. This is used for outbound SLIM messages when tasks complete.
type UserMessage ¶
type UserMessage struct {
    // Type identifies the message type.
    Type string `json:"type"`
    // Content is the message content.
    Content string `json:"content"`
    // Attachments are optional file attachments.
    Attachments []Attachment `json:"attachments,omitempty"`
    // Metadata contains additional message metadata.
    Metadata map[string]any `json:"metadata,omitempty"`
    // ReplyTo is the message ID this is replying to.
    ReplyTo string `json:"reply_to,omitempty"`
    // ThreadID groups related messages.
    ThreadID string `json:"thread_id,omitempty"`
}
UserMessage represents a user message in SLIM format.