storage

package
v0.24.0
Published: Mar 9, 2026 License: MIT Imports: 14 Imported by: 0

README

Storage Plugin

A production-grade message storage plugin for HotPlex ChatApps, supporting SQLite, PostgreSQL, and in-memory backends with streaming message buffering.


Architecture

graph TD
    subgraph ChatApps Layer
        Adapter[ChatAdapter]
        Plugin[MessageStorePlugin]
        Transformer[MessageTransformer]
    end

    subgraph Storage Layer
        Interface[ChatAppMessageStore]
        WriteOnly[WriteOnlyStore]
        ReadOnly[ReadOnlyStore]
        Session[SessionStore]
        Memory[MemoryStore]
        SQLite[SQLiteStore]
        PostgreSQL[PostgreSQLStore]
    end

    Adapter --> Plugin
    Plugin --> Transformer
    Transformer --> Interface
    Interface --> WriteOnly
    Interface --> ReadOnly
    Interface --> Session
    WriteOnly --> Memory
    WriteOnly --> SQLite
    WriteOnly --> PostgreSQL

Features

  • Multi-Backend Support: SQLite (L1), PostgreSQL (L2), In-Memory
  • Streaming Buffer: Memory-efficient buffering for LLM token streams
  • Fallback Strategy: Automatic direct storage when buffer overflows
  • Retry Mechanism: Exponential backoff for transient failures
  • ISP-Compliant Interfaces: ReadOnlyStore, WriteOnlyStore, SessionStore
  • Soft Delete: Messages marked as deleted, not physically removed
  • Session Metadata: Track last message, message count per session

New: Reliability Features

Fallback Storage

When the stream buffer is full and no expired buffers can be evicted, the system automatically falls back to direct storage:

// When buffer overflows, log warning and store directly
logger.Warn("stream buffer full, falling back to direct storage",
    "max_buffers", s.maxBuffers,
    "session_id", sessionID,
    "fallback", "direct_store")
return s.store.StoreBotResponse(ctx, &ChatAppMessage{...})

Retry Mechanism

Built-in retry with exponential backoff for storage failures:

// Configuration
RetryConfig{
    MaxAttempts:  3,
    InitialDelay: 100 * time.Millisecond,
    MaxDelay:     2 * time.Second,
    Multiplier:   2.0,
}

// Automatic retry on failure
withRetry(ctx, logger, "StoreUserMessage", func() error {
    return store.StoreUserMessage(ctx, msg)
})

Quick Start

1. Create Storage Backend
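import "github.com/hrygo/hotplex/plugins/storage"

// Pick one backend; the three snippets below are alternatives, not a sequence.
// Note: these constructor names follow the README. The API index for v0.24.0
// lists NewPostgreStorage and the *Factory types instead, so check them
// against the version you import.

// SQLite (recommended for edge deployments)
cfg := storage.SQLiteConfig{
    Path:      "~/.hotplex/messages.db",
    MaxSizeMB: 512,
}
store, err := storage.NewSQLiteStore(cfg)

// PostgreSQL (recommended for production)
cfg := storage.PostgresConfig{
    DSN:            "postgres://user:pass@localhost:5432/hotplex",
    MaxConnections: 10,
    Level:          1, // 1 = millions of rows, 2 = hundreds of millions
}
store, err := storage.NewPostgreSQLStore(cfg)

// In-Memory (for testing)
store := storage.NewMemoryStore()

2. Store Messages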
// Store user message
msg := &storage.ChatAppMessage{
    ChatSessionID: "slack:U123:U456:C789:TS123",
    ChatPlatform:  "slack",
    ChatUserID:    "U123",
    MessageType:   types.MessageTypeAnswer,
    Content:       "Hello, bot!",
    CreatedAt:     time.Now(),
}
err := store.StoreUserMessage(ctx, msg)

// Store bot response
botMsg := &storage.ChatAppMessage{
    ChatSessionID: "slack:U123:U456:C789:TS123",
    MessageType:   types.MessageTypeAnswer,
    Content:       "Hello! How can I help?",
    CreatedAt:     time.Now(),
}
err = store.StoreBotResponse(ctx, botMsg) // err was already declared above

3. Query Messages
query := &storage.MessageQuery{
    ChatSessionID: "slack:U123:U456:C789:TS123",
    Limit:         50,
    Ascending:     true,
}
messages, err := store.List(ctx, query)

Configuration

YAML Configuration
message_store:
  enabled: true
  type: sqlite          # sqlite | postgres | memory
  sqlite:
    path: ~/.hotplex/chatapp_messages.db
    max_size_mb: 512
  postgres:
    dsn: postgres://user:pass@localhost:5432/hotplex
    max_connections: 10
    level: 1
  strategy: default     # default | verbose | minimal
  streaming:
    enabled: true
    timeout: 5m
    max_buffers: 1000
    storage_policy: complete_only  # complete_only | all_chunks
  retry:
    max_attempts: 3
    initial_delay: 100ms
    max_delay: 2s
    multiplier: 2.0

Environment Variables

| Variable | Description | Default |
| --- | --- | --- |
| HOTPLEX_MESSAGE_STORE_TYPE | Storage backend type | memory |
| HOTPLEX_MESSAGE_STORE_SQLITE_PATH | SQLite database path | ~/.hotplex/messages.db |
| HOTPLEX_MESSAGE_STORE_POSTGRES_DSN | PostgreSQL connection string | - |

Interfaces

ISP-Compliant Design
// Read-only operations
type ReadOnlyStore interface {
    Get(ctx context.Context, messageID string) (*ChatAppMessage, error)
    List(ctx context.Context, query *MessageQuery) ([]*ChatAppMessage, error)
    Count(ctx context.Context, query *MessageQuery) (int64, error)
}

// Write-only operations (minimal interface for StreamMessageStore)
type WriteOnlyStore interface {
    StoreUserMessage(ctx context.Context, msg *ChatAppMessage) error
    StoreBotResponse(ctx context.Context, msg *ChatAppMessage) error
}

// Session metadata operations
type SessionStore interface {
    GetSessionMeta(ctx context.Context, chatSessionID string) (*SessionMeta, error)
    ListUserSessions(ctx context.Context, platform, userID string) ([]string, error)
    DeleteSession(ctx context.Context, chatSessionID string) error
}

// Combined interface
type ChatAppMessageStore interface {
    ReadOnlyStore
    WriteOnlyStore
    SessionStore
    Initialize(ctx context.Context) error
    Close() error
    Name() string
    Version() string
}

Streaming Support

The streaming buffer prevents database I/O thrashing by accumulating chunks in memory and persisting only the final merged content.

Buffer Overflow Handling
// When the buffer table is full (>= max_buffers):
// 1. Try to evict expired buffers
// 2. If no expired buffers, fall back to direct storage
// 3. Log warning for observability

func (s *StreamMessageStore) OnStreamChunk(ctx context.Context, sessionID, chunk string) error {
    buf, ok := s.buffers[sessionID]
    if !ok && len(s.buffers) >= s.maxBuffers {
        // Try to evict one expired buffer to make room
        evicted := false
        for id, old := range s.buffers {
            if old.IsExpired(s.timeout) {
                delete(s.buffers, id)
                evicted = true
                break
            }
        }
        // Fall back to direct storage if nothing could be evicted
        if !evicted {
            return s.store.StoreBotResponse(ctx, &ChatAppMessage{
                ChatSessionID: sessionID,
                Content:       chunk,
            })
        }
    }
    if !ok {
        // newStreamBuffer is a hypothetical constructor; locking is omitted for brevity
        buf = newStreamBuffer()
        s.buffers[sessionID] = buf
    }
    // Normal buffering
    buf.Append(chunk)
    return nil
}

Stream Completion
// When stream completes:
// 1. Merge all chunks into single message
// 2. Store merged content
// 3. Clean up buffer

func (s *StreamMessageStore) OnStreamComplete(ctx context.Context, sessionID string, msg *ChatAppMessage) error {
    // Merge buffered chunks, if any, into the final message
    if buf, ok := s.buffers[sessionID]; ok {
        msg.Content = buf.Merge()
    }

    if err := s.store.StoreBotResponse(ctx, msg); err != nil {
        return err // keep the buffer so the caller can retry
    }

    // Clean up buffer after successful storage
    delete(s.buffers, sessionID)
    return nil
}

Data Model

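type ChatAppMessage struct {
    ID                string
    ChatSessionID     string    // session ID
    ChatPlatform      string    // platform (slack, feishu, etc.)
    ChatUserID        string    // user ID
    ChatBotUserID     string    // bot user ID
    ChatChannelID     string    // channel ID
    ChatThreadID      string    // thread ID
    EngineSessionID   uuid.UUID // engine session ID
    EngineNamespace   string    // engine namespace
    ProviderSessionID string    // provider session ID
    ProviderType      string    // provider type
    MessageType       types.MessageType
    FromUserID        string
    FromUserName      string
    ToUserID          string
    Content           string
    Metadata          map[string]any
    CreatedAt         time.Time
    UpdatedAt         time.Time
    Deleted           bool       // soft-delete flag
    DeletedAt         *time.Time // set when the message is soft-deleted
}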

Testing

# Run all storage tests
go test -v ./plugins/storage/...

# Run with race detection
go test -race ./plugins/storage/...

# Run specific backend tests
go test -v ./plugins/storage/... -run SQLite
go test -v ./plugins/storage/... -run PostgreSQL

# Run ChatApp storage integration tests
go test -v ./chatapps/base/... -run E2E
go test -v ./chatapps/slack/... -run Storage

Status: Production ready | Maintainer: HotPlex Core Team | Version: v1.1 (2026-03-08)

Documentation

Constants

This section is empty.

Variables

var (
	ErrNotFound          = errors.New("message not found")
	ErrSessionNotFound   = errors.New("session not found")
	ErrInvalidMessage    = errors.New("invalid message")
	ErrStorageNotEnabled = errors.New("storage not enabled")
	ErrConnectionFailed  = errors.New("database connection failed")
	ErrQueryFailed       = errors.New("query execution failed")
	ErrStoreFailed       = errors.New("message store failed")
	ErrInvalidConfig     = errors.New("invalid storage configuration")
	ErrUnsupportedType   = errors.New("unsupported storage type")
	ErrSessionClosed     = errors.New("storage session closed")
	ErrTransactionFailed = errors.New("transaction failed")
)

Common storage errors

Functions

func BackupStorage

func BackupStorage(store ChatAppMessageStore, backupPath string) error

BackupStorage backs up the store.

func BuildSessionID

func BuildSessionID(platform, userID, channelID string) string

BuildSessionID builds a session ID.

func ExportToJSON

func ExportToJSON(store ChatAppMessageStore, outputPath string, query *MessageQuery) error

ExportToJSON exports messages to JSON.

func FormatTimestamp

func FormatTimestamp(t time.Time) string

FormatTimestamp formats a timestamp.

func GenerateProviderSessionID

func GenerateProviderSessionID() string

GenerateProviderSessionID generates a provider session ID.

func ImportFromJSON

func ImportFromJSON(store ChatAppMessageStore, inputPath string) (int, error)

ImportFromJSON imports messages from JSON.

func IsConfigError

func IsConfigError(err error) bool

IsConfigError reports whether err is a configuration error.

func IsConnectionError

func IsConnectionError(err error) bool

IsConnectionError reports whether err is a connection error.

func IsNotFound

func IsNotFound(err error) bool

IsNotFound reports whether err is a not-found error.

func MaskSensitiveData

func MaskSensitiveData(data string) string

MaskSensitiveData masks sensitive data.

func ParseMessageType

func ParseMessageType(s string) string

ParseMessageType parses a message type.

func SanitizeContent

func SanitizeContent(content string, maxLength int) string

SanitizeContent sanitizes message content.

func TruncateForLog

func TruncateForLog(content string, maxLen int) string

TruncateForLog truncates content for logging.

func ValidateMessage

func ValidateMessage(msg *ChatAppMessage) error

ValidateMessage validates the required fields of a message.

Types

type ChatAppMessage

type ChatAppMessage struct {
	ID                string
	ChatSessionID     string
	ChatPlatform      string
	ChatUserID        string
	ChatBotUserID     string
	ChatChannelID     string
	ChatThreadID      string
	EngineSessionID   uuid.UUID
	EngineNamespace   string
	ProviderSessionID string
	ProviderType      string
	MessageType       types.MessageType
	FromUserID        string
	FromUserName      string
	ToUserID          string
	Content           string
	Metadata          map[string]any
	CreatedAt         time.Time
	UpdatedAt         time.Time
	Deleted           bool
	DeletedAt         *time.Time
}

ChatAppMessage is the storage-layer message entity.

type ChatAppMessageStore

type ChatAppMessageStore interface {
	ReadOnlyStore
	WriteOnlyStore
	SessionStore
	Initialize(ctx context.Context) error
	Close() error
	Name() string
	Version() string
}

ChatAppMessageStore is the complete store interface.

type Check

type Check struct {
	Status  string        `json:"status"` // pass, fail, warn
	Message string        `json:"message"`
	Latency time.Duration `json:"latency"`
}

Check is the result of a single check.

type ConfigLoader

type ConfigLoader struct {
	// contains filtered or unexported fields
}

ConfigLoader loads configuration.

func NewConfigLoader

func NewConfigLoader(path string) *ConfigLoader

NewConfigLoader creates a config loader.

func (*ConfigLoader) LoadStorageConfig

func (l *ConfigLoader) LoadStorageConfig() (*StorageConfig, error)

LoadStorageConfig loads the storage configuration from a YAML file.

type DefaultStrategy

type DefaultStrategy struct{}

DefaultStrategy is the default strategy.

func NewDefaultStrategy

func NewDefaultStrategy() *DefaultStrategy

func (*DefaultStrategy) AfterStore

func (s *DefaultStrategy) AfterStore(ctx context.Context, msg *ChatAppMessage) error

func (*DefaultStrategy) BeforeStore

func (s *DefaultStrategy) BeforeStore(ctx context.Context, msg *ChatAppMessage) error

func (*DefaultStrategy) ShouldStore

func (s *DefaultStrategy) ShouldStore(msg *ChatAppMessage) bool

type HealthCheckResult

type HealthCheckResult struct {
	Status    string           `json:"status"` // healthy, degraded, unhealthy
	Latency   time.Duration    `json:"latency"`
	Timestamp time.Time        `json:"timestamp"`
	Checks    map[string]Check `json:"checks"`
}

HealthCheckResult is the result of a health check.
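
The struct comments list three per-check statuses (pass, fail, warn) and three overall statuses (healthy, degraded, unhealthy). One plausible way to derive the overall status is sketched below. The mapping (any fail gives unhealthy, otherwise any warn gives degraded) is an assumption made for illustration, and overallStatus is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Check is redeclared locally so the sketch is self-contained.
type Check struct {
	Status  string        `json:"status"` // pass, fail, warn
	Message string        `json:"message"`
	Latency time.Duration `json:"latency"`
}

// overallStatus folds individual checks into one status string.
// Assumed mapping: any "fail" is unhealthy, else any "warn" is degraded.
func overallStatus(checks map[string]Check) string {
	status := "healthy"
	for _, c := range checks {
		switch c.Status {
		case "fail":
			return "unhealthy"
		case "warn":
			status = "degraded"
		}
	}
	return status
}

func main() {
	checks := map[string]Check{
		"ping":  {Status: "pass", Latency: 2 * time.Millisecond},
		"write": {Status: "warn", Message: "slow insert", Latency: 150 * time.Millisecond},
	}
	fmt.Println(overallStatus(checks)) // prints "degraded"
}
```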

func DefaultHealthCheck

func DefaultHealthCheck(store ChatAppMessageStore) *HealthCheckResult

DefaultHealthCheck runs the default health check.

type HealthChecker

type HealthChecker interface {
	HealthCheck(ctx context.Context) (*HealthCheckResult, error)
}

HealthChecker is the health check interface.

type MemoryFactory

type MemoryFactory struct{}

func (*MemoryFactory) Create

func (f *MemoryFactory) Create(config PluginConfig) (ChatAppMessageStore, error)

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage is the in-memory storage implementation.

func (*MemoryStorage) Close

func (m *MemoryStorage) Close() error

func (*MemoryStorage) Count

func (m *MemoryStorage) Count(ctx context.Context, query *MessageQuery) (int64, error)

func (*MemoryStorage) DeleteSession

func (m *MemoryStorage) DeleteSession(ctx context.Context, chatSessionID string) error

func (*MemoryStorage) Get

func (m *MemoryStorage) Get(ctx context.Context, messageID string) (*ChatAppMessage, error)

func (*MemoryStorage) GetSessionMeta

func (m *MemoryStorage) GetSessionMeta(ctx context.Context, chatSessionID string) (*SessionMeta, error)

func (*MemoryStorage) GetStrategy

func (m *MemoryStorage) GetStrategy() StorageStrategy

func (*MemoryStorage) Initialize

func (m *MemoryStorage) Initialize(ctx context.Context) error

func (*MemoryStorage) List

func (m *MemoryStorage) List(ctx context.Context, query *MessageQuery) ([]*ChatAppMessage, error)

func (*MemoryStorage) ListUserSessions

func (m *MemoryStorage) ListUserSessions(ctx context.Context, platform, userID string) ([]string, error)

func (*MemoryStorage) Name

func (m *MemoryStorage) Name() string

func (*MemoryStorage) SetStrategy

func (m *MemoryStorage) SetStrategy(s StorageStrategy) error

func (*MemoryStorage) StoreBotResponse

func (m *MemoryStorage) StoreBotResponse(ctx context.Context, msg *ChatAppMessage) error

func (*MemoryStorage) StoreUserMessage

func (m *MemoryStorage) StoreUserMessage(ctx context.Context, msg *ChatAppMessage) error

func (*MemoryStorage) Version

func (m *MemoryStorage) Version() string

type MessageQuery

type MessageQuery struct {
	ChatSessionID     string
	ChatUserID        string // filter by user ID
	EngineSessionID   uuid.UUID
	ProviderType      string
	ProviderSessionID string
	StartTime         *time.Time
	EndTime           *time.Time
	MessageTypes      []types.MessageType
	Limit             int
	Offset            int
	Ascending         bool
	IncludeDeleted    bool
}

MessageQuery holds message query conditions.

type PluginConfig

type PluginConfig map[string]any

PluginConfig is the plugin configuration.

type PluginFactory

type PluginFactory interface {
	Create(config PluginConfig) (ChatAppMessageStore, error)
}

PluginFactory is the plugin factory interface.

type PluginRegistry

type PluginRegistry struct {
	// contains filtered or unexported fields
}

PluginRegistry is the plugin registry.

func GlobalRegistry

func GlobalRegistry() *PluginRegistry

func NewPluginRegistry

func NewPluginRegistry() *PluginRegistry

func (*PluginRegistry) Get

func (*PluginRegistry) List

func (r *PluginRegistry) List() []string

func (*PluginRegistry) Register

func (r *PluginRegistry) Register(name string, factory PluginFactory)
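
A factory registry of this shape is typically a mutex-guarded map from backend name to factory. The sketch below is a self-contained illustration with local stand-in types; in particular the Get signature shown here is an assumption, since the index above does not list it.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// store and factory are trimmed stand-ins for ChatAppMessageStore and PluginFactory.
type store interface{ Name() string }

type factory interface {
	Create(config map[string]any) (store, error)
}

// registry maps backend names to factories and is safe for concurrent use.
type registry struct {
	mu        sync.RWMutex
	factories map[string]factory
}

func newRegistry() *registry {
	return &registry{factories: make(map[string]factory)}
}

func (r *registry) Register(name string, f factory) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.factories[name] = f
}

// Get has an assumed signature; it reports whether name is registered.
func (r *registry) Get(name string) (factory, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	f, ok := r.factories[name]
	return f, ok
}

// List returns the registered names in sorted order.
func (r *registry) List() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.factories))
	for n := range r.factories {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

type memoryStore struct{}

func (memoryStore) Name() string { return "memory" }

type memoryFactory struct{}

func (memoryFactory) Create(map[string]any) (store, error) { return memoryStore{}, nil }

func main() {
	reg := newRegistry()
	reg.Register("memory", memoryFactory{})
	if f, ok := reg.Get("memory"); ok {
		s, _ := f.Create(nil)
		fmt.Println(reg.List(), s.Name()) // prints "[memory] memory"
	}
}
```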

type PostgreFactory

type PostgreFactory struct{}

PostgreFactory is the PostgreSQL factory.

func (*PostgreFactory) Create

type PostgreSQLConfig

type PostgreSQLConfig struct {
	Host         string
	Port         int
	User         string
	Password     string
	Database     string
	SSLMode      string
	MaxOpenConns int
	MaxIdleConns int
	MaxLifetime  time.Duration
}

PostgreSQLConfig is the PostgreSQL configuration.
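
The README configures PostgreSQL with a single DSN while this struct uses discrete fields. As a sketch of how the two relate, the code below assembles a postgres:// URL from such fields with net/url. pgConfig and buildDSN are hypothetical names; whether the package builds its connection string this way is not documented.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// pgConfig is a trimmed stand-in for the connection fields of PostgreSQLConfig.
type pgConfig struct {
	Host     string
	Port     int
	User     string
	Password string
	Database string
	SSLMode  string
}

// buildDSN assembles a URL-style connection string; net/url takes care of
// escaping special characters in the user name and password.
func buildDSN(c pgConfig) string {
	u := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(c.User, c.Password),
		Host:   net.JoinHostPort(c.Host, strconv.Itoa(c.Port)),
		Path:   "/" + c.Database,
	}
	if c.SSLMode != "" {
		u.RawQuery = url.Values{"sslmode": {c.SSLMode}}.Encode()
	}
	return u.String()
}

func main() {
	cfg := pgConfig{Host: "localhost", Port: 5432, User: "user", Password: "pass", Database: "hotplex", SSLMode: "disable"}
	fmt.Println(buildDSN(cfg)) // prints "postgres://user:pass@localhost:5432/hotplex?sslmode=disable"
}
```

The pool fields (MaxOpenConns, MaxIdleConns, MaxLifetime) presumably correspond to the database/sql setters SetMaxOpenConns, SetMaxIdleConns and SetConnMaxLifetime, though that mapping is also an assumption.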

type PostgreStorage

type PostgreStorage struct {
	// contains filtered or unexported fields
}

PostgreStorage is the PostgreSQL storage implementation (Level 2: partitioned for 100M+ rows).

func NewPostgreStorage

func NewPostgreStorage(pgConfig PostgreSQLConfig, pluginConfig PluginConfig) (*PostgreStorage, error)

NewPostgreStorage creates a PostgreSQL storage.

func (*PostgreStorage) Close

func (p *PostgreStorage) Close() error

Close closes the database connection.

func (*PostgreStorage) Count

func (p *PostgreStorage) Count(ctx context.Context, query *MessageQuery) (int64, error)

Count returns the number of matching messages.

func (*PostgreStorage) DeleteSession

func (p *PostgreStorage) DeleteSession(ctx context.Context, chatSessionID string) error

DeleteSession deletes a session.

func (*PostgreStorage) Get

func (p *PostgreStorage) Get(ctx context.Context, messageID string) (*ChatAppMessage, error)

Get returns a message by ID.

func (*PostgreStorage) GetSessionMeta

func (p *PostgreStorage) GetSessionMeta(ctx context.Context, chatSessionID string) (*SessionMeta, error)

GetSessionMeta returns the session metadata.

func (*PostgreStorage) GetStrategy

func (p *PostgreStorage) GetStrategy() StorageStrategy

GetStrategy returns the storage strategy.

func (*PostgreStorage) Initialize

func (p *PostgreStorage) Initialize(ctx context.Context) error

Initialize creates the database table schema.

func (*PostgreStorage) List

func (p *PostgreStorage) List(ctx context.Context, query *MessageQuery) ([]*ChatAppMessage, error)

List returns the messages matching the query.

func (*PostgreStorage) ListUserSessions

func (p *PostgreStorage) ListUserSessions(ctx context.Context, platform, userID string) ([]string, error)

ListUserSessions lists all sessions of a user.

func (*PostgreStorage) Name

func (p *PostgreStorage) Name() string

Name returns the storage name.

func (*PostgreStorage) SetStrategy

func (p *PostgreStorage) SetStrategy(s StorageStrategy) error

SetStrategy sets the storage strategy.

func (*PostgreStorage) StoreBotResponse

func (p *PostgreStorage) StoreBotResponse(ctx context.Context, msg *ChatAppMessage) error

StoreBotResponse stores a bot response.

func (*PostgreStorage) StoreUserMessage

func (p *PostgreStorage) StoreUserMessage(ctx context.Context, msg *ChatAppMessage) error

StoreUserMessage stores a user message.

func (*PostgreStorage) Version

func (p *PostgreStorage) Version() string

Version returns the version.

type ReadOnlyStore

type ReadOnlyStore interface {
	Get(ctx context.Context, messageID string) (*ChatAppMessage, error)
	List(ctx context.Context, query *MessageQuery) ([]*ChatAppMessage, error)
	Count(ctx context.Context, query *MessageQuery) (int64, error)
}

ReadOnlyStore is the read-only interface (ISP).

type SQLiteConfig

type SQLiteConfig struct {
	Path      string `json:"path"`
	MaxSizeMB int    `json:"max_size_mb"`
}

SQLiteConfig is the SQLite configuration.

type SQLiteFactory

type SQLiteFactory struct{}

func (*SQLiteFactory) Create

func (f *SQLiteFactory) Create(config PluginConfig) (ChatAppMessageStore, error)

type SQLiteStorage

type SQLiteStorage struct {
	// contains filtered or unexported fields
}

SQLiteStorage is the SQLite storage implementation.

func (*SQLiteStorage) Close

func (s *SQLiteStorage) Close() error

func (*SQLiteStorage) Count

func (s *SQLiteStorage) Count(ctx context.Context, query *MessageQuery) (int64, error)

func (*SQLiteStorage) DeleteSession

func (s *SQLiteStorage) DeleteSession(ctx context.Context, chatSessionID string) error

func (*SQLiteStorage) Get

func (s *SQLiteStorage) Get(ctx context.Context, messageID string) (*ChatAppMessage, error)

func (*SQLiteStorage) GetSessionMeta

func (s *SQLiteStorage) GetSessionMeta(ctx context.Context, chatSessionID string) (*SessionMeta, error)

func (*SQLiteStorage) GetStrategy

func (s *SQLiteStorage) GetStrategy() StorageStrategy

func (*SQLiteStorage) Initialize

func (s *SQLiteStorage) Initialize(ctx context.Context) error

func (*SQLiteStorage) List

func (s *SQLiteStorage) List(ctx context.Context, query *MessageQuery) ([]*ChatAppMessage, error)

func (*SQLiteStorage) ListUserSessions

func (s *SQLiteStorage) ListUserSessions(ctx context.Context, platform, userID string) ([]string, error)

func (*SQLiteStorage) Name

func (s *SQLiteStorage) Name() string

func (*SQLiteStorage) SetStrategy

func (s *SQLiteStorage) SetStrategy(st StorageStrategy) error

func (*SQLiteStorage) StoreBotResponse

func (s *SQLiteStorage) StoreBotResponse(ctx context.Context, msg *ChatAppMessage) error

func (*SQLiteStorage) StoreUserMessage

func (s *SQLiteStorage) StoreUserMessage(ctx context.Context, msg *ChatAppMessage) error

func (*SQLiteStorage) Version

func (s *SQLiteStorage) Version() string

type SessionMeta

type SessionMeta struct {
	ChatSessionID string
	ChatPlatform  string
	ChatUserID    string
	LastMessageID string
	LastMessageAt time.Time
	MessageCount  int64
	UpdatedAt     time.Time
}

SessionMeta holds session metadata.

type SessionStore

type SessionStore interface {
	GetSessionMeta(ctx context.Context, chatSessionID string) (*SessionMeta, error)
	ListUserSessions(ctx context.Context, platform, userID string) ([]string, error)
	DeleteSession(ctx context.Context, chatSessionID string) error
}

SessionStore is the session management interface.

type StorageConfig

type StorageConfig struct {
	Enabled    bool             `json:"enabled"`
	Type       string           `json:"type"`
	SQLite     SQLiteConfig     `json:"sqlite"`
	PostgreSQL PostgreSQLConfig `json:"postgres"`
	Strategy   string           `json:"strategy"`
	Streaming  StreamingConfig  `json:"streaming"`
}

StorageConfig is the storage configuration.

type StorageError

type StorageError struct {
	Code    string
	Message string
	Err     error
}

StorageError is the storage error type.

func NewStorageError

func NewStorageError(code, message string, err error) *StorageError

NewStorageError creates a new storage error.

func (*StorageError) Error

func (e *StorageError) Error() string

func (*StorageError) Unwrap

func (e *StorageError) Unwrap() error

type StorageMetrics

type StorageMetrics struct {
	TotalMessages    int64         `json:"total_messages"`
	TotalSessions    int64         `json:"total_sessions"`
	StorageSizeBytes int64         `json:"storage_size_bytes"`
	Uptime           time.Duration `json:"uptime"`
}

StorageMetrics holds storage metrics.

func GetMetrics

func GetMetrics(store ChatAppMessageStore) (*StorageMetrics, error)

GetMetrics returns storage metrics.

type StorageStrategy

type StorageStrategy interface {
	ShouldStore(msg *ChatAppMessage) bool
	BeforeStore(ctx context.Context, msg *ChatAppMessage) error
	AfterStore(ctx context.Context, msg *ChatAppMessage) error
}

StorageStrategy is the storage strategy interface (OCP).

type StreamingConfig

type StreamingConfig struct {
	Enabled       bool   `json:"enabled"`
	BufferSize    int    `json:"buffer_size"`
	TimeoutSec    int    `json:"timeout_seconds"`
	StoragePolicy string `json:"storage_policy"`
}

StreamingConfig is the streaming configuration.

type WriteOnlyStore

type WriteOnlyStore interface {
	StoreUserMessage(ctx context.Context, msg *ChatAppMessage) error
	StoreBotResponse(ctx context.Context, msg *ChatAppMessage) error
}

WriteOnlyStore is the write-only interface (ISP).
