Documentation
¶
Overview ¶
Package persistence 提供代理状态持久化存储抽象与实现。
Index ¶
- Variables
- type AsyncTask
- func (t *AsyncTask) Duration() time.Duration
- func (t *AsyncTask) IsRecoverable() bool
- func (t *AsyncTask) IsTerminal() bool
- func (t *AsyncTask) IsTimedOut() bool
- func (t *AsyncTask) MarshalJSON() ([]byte, error)
- func (t *AsyncTask) ShouldRetry() bool
- func (t *AsyncTask) UnmarshalJSON(data []byte) error
- type CleanupConfig
- type FileMessageStore
- func (s *FileMessageStore) AckMessage(ctx context.Context, msgID string) error
- func (s *FileMessageStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
- func (s *FileMessageStore) Close() error
- func (s *FileMessageStore) DeleteMessage(ctx context.Context, msgID string) error
- func (s *FileMessageStore) GetMessage(ctx context.Context, msgID string) (*Message, error)
- func (s *FileMessageStore) GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)
- func (s *FileMessageStore) GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)
- func (s *FileMessageStore) GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)
- func (s *FileMessageStore) IncrementRetry(ctx context.Context, msgID string) error
- func (s *FileMessageStore) Ping(ctx context.Context) error
- func (s *FileMessageStore) SaveMessage(ctx context.Context, msg *Message) error
- func (s *FileMessageStore) SaveMessages(ctx context.Context, msgs []*Message) error
- func (s *FileMessageStore) Stats(ctx context.Context) (*MessageStoreStats, error)
- type FileTaskStore
- func (s *FileTaskStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
- func (s *FileTaskStore) Close() error
- func (s *FileTaskStore) DeleteTask(ctx context.Context, taskID string) error
- func (s *FileTaskStore) GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)
- func (s *FileTaskStore) GetTask(ctx context.Context, taskID string) (*AsyncTask, error)
- func (s *FileTaskStore) ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)
- func (s *FileTaskStore) Ping(ctx context.Context) error
- func (s *FileTaskStore) SaveTask(ctx context.Context, task *AsyncTask) error
- func (s *FileTaskStore) Stats(ctx context.Context) (*TaskStoreStats, error)
- func (s *FileTaskStore) UpdateProgress(ctx context.Context, taskID string, progress float64) error
- func (s *FileTaskStore) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, ...) error
- type MemoryMessageStore
- func (s *MemoryMessageStore) AckMessage(ctx context.Context, msgID string) error
- func (s *MemoryMessageStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
- func (s *MemoryMessageStore) Close() error
- func (s *MemoryMessageStore) DeleteMessage(ctx context.Context, msgID string) error
- func (s *MemoryMessageStore) GetMessage(ctx context.Context, msgID string) (*Message, error)
- func (s *MemoryMessageStore) GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)
- func (s *MemoryMessageStore) GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)
- func (s *MemoryMessageStore) GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)
- func (s *MemoryMessageStore) IncrementRetry(ctx context.Context, msgID string) error
- func (s *MemoryMessageStore) Ping(ctx context.Context) error
- func (s *MemoryMessageStore) SaveMessage(ctx context.Context, msg *Message) error
- func (s *MemoryMessageStore) SaveMessages(ctx context.Context, msgs []*Message) error
- func (s *MemoryMessageStore) Stats(ctx context.Context) (*MessageStoreStats, error)
- type MemoryTaskStore
- func (s *MemoryTaskStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
- func (s *MemoryTaskStore) Close() error
- func (s *MemoryTaskStore) DeleteTask(ctx context.Context, taskID string) error
- func (s *MemoryTaskStore) GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)
- func (s *MemoryTaskStore) GetTask(ctx context.Context, taskID string) (*AsyncTask, error)
- func (s *MemoryTaskStore) ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)
- func (s *MemoryTaskStore) Ping(ctx context.Context) error
- func (s *MemoryTaskStore) SaveTask(ctx context.Context, task *AsyncTask) error
- func (s *MemoryTaskStore) Stats(ctx context.Context) (*TaskStoreStats, error)
- func (s *MemoryTaskStore) UpdateProgress(ctx context.Context, taskID string, progress float64) error
- func (s *MemoryTaskStore) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, ...) error
- type Message
- type MessageFilter
- type MessageStatus
- type MessageStore
- type MessageStoreStats
- type RedisMessageStore
- func (s *RedisMessageStore) AckMessage(ctx context.Context, msgID string) error
- func (s *RedisMessageStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
- func (s *RedisMessageStore) Close() error
- func (s *RedisMessageStore) DeleteMessage(ctx context.Context, msgID string) error
- func (s *RedisMessageStore) GetMessage(ctx context.Context, msgID string) (*Message, error)
- func (s *RedisMessageStore) GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)
- func (s *RedisMessageStore) GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)
- func (s *RedisMessageStore) GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)
- func (s *RedisMessageStore) IncrementRetry(ctx context.Context, msgID string) error
- func (s *RedisMessageStore) Ping(ctx context.Context) error
- func (s *RedisMessageStore) SaveMessage(ctx context.Context, msg *Message) error
- func (s *RedisMessageStore) SaveMessages(ctx context.Context, msgs []*Message) error
- func (s *RedisMessageStore) Stats(ctx context.Context) (*MessageStoreStats, error)
- type RedisStoreConfig
- type RedisTaskStore
- func (s *RedisTaskStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
- func (s *RedisTaskStore) Close() error
- func (s *RedisTaskStore) DeleteTask(ctx context.Context, taskID string) error
- func (s *RedisTaskStore) GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)
- func (s *RedisTaskStore) GetTask(ctx context.Context, taskID string) (*AsyncTask, error)
- func (s *RedisTaskStore) ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)
- func (s *RedisTaskStore) Ping(ctx context.Context) error
- func (s *RedisTaskStore) SaveTask(ctx context.Context, task *AsyncTask) error
- func (s *RedisTaskStore) Stats(ctx context.Context) (*TaskStoreStats, error)
- func (s *RedisTaskStore) UpdateProgress(ctx context.Context, taskID string, progress float64) error
- func (s *RedisTaskStore) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, ...) error
- type RetryConfig
- type Serializable
- type Store
- type StoreConfig
- type StoreType
- type TaskEvent
- type TaskEventType
- type TaskFilter
- type TaskStatus
- type TaskStore
- type TaskStoreStats
Constants ¶
This section is empty.
Variables ¶
var ( ErrNotFound = errors.New("not found") ErrAlreadyExists = errors.New("already exists") ErrStoreClosed = errors.New("store is closed") ErrInvalidInput = errors.New("invalid input") )
常见错误
Functions ¶
This section is empty.
Types ¶
type AsyncTask ¶
type AsyncTask struct {
// ID 是任务的唯一标识符
ID string `json:"id"`
// 会话ID 是此任务所属的会话
SessionID string `json:"session_id,omitempty"`
// AgentID 是执行此任务的代理
AgentID string `json:"agent_id"`
// 类型是任务类型
Type string `json:"type"`
// 状态是当前任务状态
Status TaskStatus `json:"status"`
// 输入包含任务输入数据
Input map[string]any `json:"input,omitempty"`
// 结果包含任务结果(完成后)
Result any `json:"result,omitempty"`
// 错误包含错误消息( 当失败时)
Error string `json:"error,omitempty"`
// 进展是任务进展(0-100)
Progress float64 `json:"progress"`
// 优先是任务优先(较高=更重要)
Priority int `json:"priority"`
// CreatedAt 是任务创建时
CreatedAt time.Time `json:"created_at"`
// 更新到上次更新任务时
UpdatedAt time.Time `json:"updated_at"`
// 开始是任务开始执行时
StartedAt *time.Time `json:"started_at,omitempty"`
// 任务完成时
CompletedAt *time.Time `json:"completed_at,omitempty"`
// 超时是任务超时的期限
Timeout time.Duration `json:"timeout,omitempty"`
// 重试请求是重试尝试的次数
RetryCount int `json:"retry_count"`
// MaxRetries 是允许的最大重试次数
MaxRetries int `json:"max_retries"`
// 元数据包含额外的任务元数据
Metadata map[string]string `json:"metadata,omitempty"`
// 父任务ID 是父任务ID( 用于子任务)
ParentTaskID string `json:"parent_task_id,omitempty"`
// 儿童任务ID是儿童任务ID
ChildTaskIDs []string `json:"child_task_ids,omitempty"`
}
AsyncTask 代表一个持久的同步任务
func (*AsyncTask) UnmarshalJSON ¶
UnmarshalJSON 执行json。 解马沙勒
type CleanupConfig ¶
type CleanupConfig struct {
// 启用后确定是否启用自动清理
Enabled bool `json:"enabled" yaml:"enabled"`
// 间断是清理运行的频率( 默认:1 h)
Interval time.Duration `json:"interval" yaml:"interval"`
// 保留信件是保留已确认信件的时间( 默认为 1h)
MessageRetention time.Duration `json:"message_retention" yaml:"message_retention"`
// 任务保留是保存已完成任务的时间( 默认: 24h)
TaskRetention time.Duration `json:"task_retention" yaml:"task_retention"`
}
清理Config 为已完成的任务和旧消息定义清理行为
type FileMessageStore ¶
type FileMessageStore struct {
// contains filtered or unexported fields
}
FileMessageStore是基于文件执行的MessageStore. 适合单节点生产部署.
func NewFileMessageStore ¶
func NewFileMessageStore(config StoreConfig) (*FileMessageStore, error)
NewFileMessageStore 创建一个新的基于文件的信息存储
func (*FileMessageStore) AckMessage ¶
func (s *FileMessageStore) AckMessage(ctx context.Context, msgID string) error
AckMessage 是一个被承认的信息
func (*FileMessageStore) DeleteMessage ¶
func (s *FileMessageStore) DeleteMessage(ctx context.Context, msgID string) error
删除信件从存储处删除
func (*FileMessageStore) GetMessage ¶
通过 ID 获取信件
func (*FileMessageStore) GetMessages ¶
func (s *FileMessageStore) GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)
GetMessages 获取带有 pagination 主题的信息
func (*FileMessageStore) GetPendingMessages ¶
func (s *FileMessageStore) GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)
GetPendingMessages 检索需要发送的信件
func (*FileMessageStore) GetUnackedMessages ¶
func (s *FileMessageStore) GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)
获取未保存的邮件获取未确认的比指定时间长的信件
func (*FileMessageStore) IncrementRetry ¶
func (s *FileMessageStore) IncrementRetry(ctx context.Context, msgID string) error
递增
func (*FileMessageStore) Ping ¶
func (s *FileMessageStore) Ping(ctx context.Context) error
平平检查,如果商店是健康的
func (*FileMessageStore) SaveMessage ¶
func (s *FileMessageStore) SaveMessage(ctx context.Context, msg *Message) error
保存信件坚持一个消息
func (*FileMessageStore) SaveMessages ¶
func (s *FileMessageStore) SaveMessages(ctx context.Context, msgs []*Message) error
保存消息在解剖上持续了多个消息
func (*FileMessageStore) Stats ¶
func (s *FileMessageStore) Stats(ctx context.Context) (*MessageStoreStats, error)
Stats 返回关于消息库的统计数据
type FileTaskStore ¶
type FileTaskStore struct {
// contains filtered or unexported fields
}
FileTaskStore是一个基于文件的执行"TaskStore". 适合单节点生产部署.
func (*FileTaskStore) DeleteTask ¶
func (s *FileTaskStore) DeleteTask(ctx context.Context, taskID string) error
删除任务从商店中删除任务
func (*FileTaskStore) GetRecoverableTasks ¶
func (s *FileTaskStore) GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)
获取可回收的任务检索重启后需要回收的任务
func (*FileTaskStore) ListTasks ¶
func (s *FileTaskStore) ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)
ListTasks 检索匹配过滤标准的任务
func (*FileTaskStore) SaveTask ¶
func (s *FileTaskStore) SaveTask(ctx context.Context, task *AsyncTask) error
保存任务持续执行商店的任务
func (*FileTaskStore) Stats ¶
func (s *FileTaskStore) Stats(ctx context.Context) (*TaskStoreStats, error)
Stats 返回关于任务存储的统计
func (*FileTaskStore) UpdateProgress ¶
更新进度更新任务进度
func (*FileTaskStore) UpdateStatus ¶
func (s *FileTaskStore) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, errMsg string) error
更新状态更新任务状态
type MemoryMessageStore ¶
type MemoryMessageStore struct {
// contains filtered or unexported fields
}
记忆MessageStore是MessageStore的内在执行. 适合开发和测试。 数据在重新启动时丢失 。
func NewMemoryMessageStore ¶
func NewMemoryMessageStore(config StoreConfig) *MemoryMessageStore
新记忆MessageStore 创建了新的记忆信息存储器
func (*MemoryMessageStore) AckMessage ¶
func (s *MemoryMessageStore) AckMessage(ctx context.Context, msgID string) error
AckMessage 是一个被承认的信息
func (*MemoryMessageStore) DeleteMessage ¶
func (s *MemoryMessageStore) DeleteMessage(ctx context.Context, msgID string) error
删除信件从存储处删除
func (*MemoryMessageStore) GetMessage ¶
通过 ID 获取信件
func (*MemoryMessageStore) GetMessages ¶
func (s *MemoryMessageStore) GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)
GetMessages 获取带有 pagination 主题的信息
func (*MemoryMessageStore) GetPendingMessages ¶
func (s *MemoryMessageStore) GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)
GetPendingMessages 检索需要发送的信件
func (*MemoryMessageStore) GetUnackedMessages ¶
func (s *MemoryMessageStore) GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)
获取未保存的邮件获取未确认的比指定时间长的信件
func (*MemoryMessageStore) IncrementRetry ¶
func (s *MemoryMessageStore) IncrementRetry(ctx context.Context, msgID string) error
递增
func (*MemoryMessageStore) Ping ¶
func (s *MemoryMessageStore) Ping(ctx context.Context) error
平平检查,如果商店是健康的
func (*MemoryMessageStore) SaveMessage ¶
func (s *MemoryMessageStore) SaveMessage(ctx context.Context, msg *Message) error
保存信件坚持一个消息
func (*MemoryMessageStore) SaveMessages ¶
func (s *MemoryMessageStore) SaveMessages(ctx context.Context, msgs []*Message) error
保存消息在解剖上持续了多个消息
func (*MemoryMessageStore) Stats ¶
func (s *MemoryMessageStore) Stats(ctx context.Context) (*MessageStoreStats, error)
Stats 返回关于消息库的统计数据
type MemoryTaskStore ¶
type MemoryTaskStore struct {
// contains filtered or unexported fields
}
MemoryTaskStore是TaskStore的一个内在执行. 适合开发和测试。 数据在重新启动时丢失 。
func (*MemoryTaskStore) DeleteTask ¶
func (s *MemoryTaskStore) DeleteTask(ctx context.Context, taskID string) error
删除任务从商店中删除任务
func (*MemoryTaskStore) GetRecoverableTasks ¶
func (s *MemoryTaskStore) GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)
获取可回收的任务检索重启后需要回收的任务
func (*MemoryTaskStore) ListTasks ¶
func (s *MemoryTaskStore) ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)
ListTasks 检索匹配过滤标准的任务
func (*MemoryTaskStore) Ping ¶
func (s *MemoryTaskStore) Ping(ctx context.Context) error
平平检查,如果商店是健康的
func (*MemoryTaskStore) SaveTask ¶
func (s *MemoryTaskStore) SaveTask(ctx context.Context, task *AsyncTask) error
保存任务持续执行商店的任务
func (*MemoryTaskStore) Stats ¶
func (s *MemoryTaskStore) Stats(ctx context.Context) (*TaskStoreStats, error)
Stats 返回关于任务存储的统计
func (*MemoryTaskStore) UpdateProgress ¶
func (s *MemoryTaskStore) UpdateProgress(ctx context.Context, taskID string, progress float64) error
更新进度更新任务进度
func (*MemoryTaskStore) UpdateStatus ¶
func (s *MemoryTaskStore) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, errMsg string) error
更新状态更新任务状态
type Message ¶
type Message struct {
// ID 是信件的唯一标识符
ID string `json:"id"`
// 题目是信息主题/频道
Topic string `json:"topic"`
// FromID 是发送代理 ID
FromID string `json:"from_id"`
// ToID 是接收代理ID( 空来播放)
ToID string `json:"to_id,omitempty"`
// 类型是信件类型(提议、回应、表决等)
Type string `json:"type"`
// 内容是信件内容
Content string `json:"content"`
// 有效载荷包含额外的结构化数据
Payload map[string]any `json:"payload,omitempty"`
// 元数据包含信件元数据
Metadata map[string]string `json:"metadata,omitempty"`
// 创建到信件创建时
CreatedAt time.Time `json:"created_at"`
// AckedAt是消息被承认的时候(即使没有被承认也没有)
AckedAt *time.Time `json:"acked_at,omitempty"`
// 重试( Rettry Count) 是送货尝试的次数
RetryCount int `json:"retry_count"`
// Last RetryAt 是上次尝试重试时
LastRetryAt *time.Time `json:"last_retry_at,omitempty"`
// 过期是信件过期时( 可选)
ExpiresAt *time.Time `json:"expires_at,omitempty"`
}
信件代表系统中的持久信息
func (*Message) NextRetryTime ¶
func (m *Message) NextRetryTime(config RetryConfig) time.Time
下次重试时计算
func (*Message) UnmarshalJSON ¶
UnmarshalJSON 执行json。 解马沙勒
type MessageFilter ¶
type MessageFilter struct {
// 按主题划分的专题过滤器
Topic string `json:"topic,omitempty"`
// 发送者从ID中过滤
FromID string `json:"from_id,omitempty"`
// 收件人的 ToID 过滤器
ToID string `json:"to_id,omitempty"`
// 按信件类型输入过滤器
Type string `json:"type,omitempty"`
// 通过承认状态进行状态过滤
Status MessageStatus `json:"status,omitempty"`
// 在此时间之后创建过滤信件
CreatedAfter *time.Time `json:"created_after,omitempty"`
// 在此之前创建过滤信件
CreatedBefore *time.Time `json:"created_before,omitempty"`
// 限定要返回的信件的最大数量
Limit int `json:"limit,omitempty"`
// 偏移为要跳过的信件数量
Offset int `json:"offset,omitempty"`
}
MessageFilter 定义过滤信件的标准
type MessageStatus ¶
type MessageStatus string
信件状态代表信件状态
const ( // 信件状态显示信件正在等待处理 MessageStatusPending MessageStatus = "pending" // 信件状态显示信件已被确认 MessageStatusAcked MessageStatus = "acked" // 信件状态已过期 。 MessageStatusExpired MessageStatus = "expired" // 信件状态失败, 表示信件在最大重试后失败 MessageStatusFailed MessageStatus = "failed" )
type MessageStore ¶
type MessageStore interface {
Store
// 保存Message 坚持给商店的单个消息
SaveMessage(ctx context.Context, msg *Message) error
// 保存消息在解剖上持续了多个消息
SaveMessages(ctx context.Context, msgs []*Message) error
// 通过 ID 获取信件
GetMessage(ctx context.Context, msgID string) (*Message, error)
// GetMessages 获取带有 pagination 主题的信息
// 返回信件、 下个光标和错误
GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)
// AckMessage 标记已确认/处理的信息
AckMessage(ctx context.Context, msgID string) error
// 获取未保存的邮件获取未确认的比指定时间长的信件
// 这些留言是重试的候选人
GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)
// GetPendingMessages 检索需要发送的信件
// 这包括需要重试的新信件和信件
GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)
// 递增
IncrementRetry(ctx context.Context, msgID string) error
// 删除信件从存储处删除
DeleteMessage(ctx context.Context, msgID string) error
// 清理删除旧消息
Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
// Stats 返回关于消息库的统计数据
Stats(ctx context.Context) (*MessageStoreStats, error)
}
MessageStore定义了信件持久性的界面. 它提供可靠的信息传送,并附有确认和重新测试支持。
func MustNewMessageStore ¶
func MustNewMessageStore(config StoreConfig) MessageStore
Must NewMessageStore 创建了新信件存储器或对错误的恐慌 。
警告:此功能只应在应用程序初始化时使用 (例如,主要()或init())。 在请求处理器或业务逻辑中使用恐慌 强烈劝阻。 对于运行时商店创建,取而代之的是使用"NewMessageStore".
示例用法:
func 主 () {
存储 := 持久性. Must NewMessageStore(配置) // OK - 初始化
// ...
}
func NewMessageStore ¶
func NewMessageStore(config StoreConfig) (MessageStore, error)
新MessageStore 创建基于配置的新信件系统
func NewMessageStoreOrExit ¶
func NewMessageStoreOrExit(config StoreConfig) MessageStore
NewMessageStore OrExit 创建了新的信件存储器,或在错误时退出程序. 这是用于CLI应用的MustNewMessageStore的更安全的替代品.
type MessageStoreStats ¶
type MessageStoreStats struct {
// TotalMessages 为商店中信件的总数
TotalMessages int64 `json:"total_messages"`
// 未决信件是未确认信件的数量
PendingMessages int64 `json:"pending_messages"`
// AckedMessages 是确认消息的数量
AckedMessages int64 `json:"acked_messages"`
// 过期信件是过期信件的数量
ExpiredMessages int64 `json:"expired_messages"`
// 主题计数为每个主题的信息数
TopicCounts map[string]int64 `json:"topic_counts"`
// 最老的PendingAge是最老的待发消息的年龄
OldestPendingAge time.Duration `json:"oldest_pending_age"`
}
信件Stats 包含关于信件存储的统计数据
type RedisMessageStore ¶
type RedisMessageStore struct {
// contains filtered or unexported fields
}
RedisMessageStore是一个基于Redis的MessageStore执行. 适合分布式生产部署. 使用 Redis Streams 在消费组的支持下存储消息.
func NewRedisMessageStore ¶
func NewRedisMessageStore(config StoreConfig) (*RedisMessageStore, error)
NewRedisMessageStore 创建一个新的基于 Redis 的信息存储
func (*RedisMessageStore) AckMessage ¶
func (s *RedisMessageStore) AckMessage(ctx context.Context, msgID string) error
AckMessage 是一个被承认的信息
func (*RedisMessageStore) DeleteMessage ¶
func (s *RedisMessageStore) DeleteMessage(ctx context.Context, msgID string) error
删除信件从存储处删除
func (*RedisMessageStore) GetMessage ¶
通过 ID 获取信件
func (*RedisMessageStore) GetMessages ¶
func (s *RedisMessageStore) GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)
GetMessages 获取带有 pagination 主题的信息
func (*RedisMessageStore) GetPendingMessages ¶
func (s *RedisMessageStore) GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)
GetPendingMessages 检索需要发送的信件
func (*RedisMessageStore) GetUnackedMessages ¶
func (s *RedisMessageStore) GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)
获取未保存的邮件获取未确认的比指定时间长的信件
func (*RedisMessageStore) IncrementRetry ¶
func (s *RedisMessageStore) IncrementRetry(ctx context.Context, msgID string) error
递增
func (*RedisMessageStore) Ping ¶
func (s *RedisMessageStore) Ping(ctx context.Context) error
平平检查,如果商店是健康的
func (*RedisMessageStore) SaveMessage ¶
func (s *RedisMessageStore) SaveMessage(ctx context.Context, msg *Message) error
保存信件坚持一个消息
func (*RedisMessageStore) SaveMessages ¶
func (s *RedisMessageStore) SaveMessages(ctx context.Context, msgs []*Message) error
保存消息在解剖上持续了多个消息
func (*RedisMessageStore) Stats ¶
func (s *RedisMessageStore) Stats(ctx context.Context) (*MessageStoreStats, error)
Stats 返回关于消息库的统计数据
type RedisStoreConfig ¶
type RedisStoreConfig struct {
// 主机是 Redis 服务器主机
Host string `json:"host" yaml:"host"`
// 端口是 Redis 服务器端口
Port int `json:"port" yaml:"port"`
// 密码是 Redis 密码( 可选)
Password string `json:"password" yaml:"password"`
// DB 为 Redis 数据库编号
DB int `json:"db" yaml:"db"`
// PoolSize 是连接池大小
PoolSize int `json:"pool_size" yaml:"pool_size"`
// 密钥前缀是所有 Redis 密钥的前缀
KeyPrefix string `json:"key_prefix" yaml:"key_prefix"`
}
RedisStore Config 包含 Redis 特定配置
type RedisTaskStore ¶
type RedisTaskStore struct {
// contains filtered or unexported fields
}
RedisTaskStore是一个基于Redis的"TaskStore"执行. 适合分布式生产部署. 使用 Redis Hash 来进行任务存储, 并排序集进行索引 。
func NewRedisTaskStore ¶
func NewRedisTaskStore(config StoreConfig) (*RedisTaskStore, error)
新建基于 Redis 的任务库
func (*RedisTaskStore) DeleteTask ¶
func (s *RedisTaskStore) DeleteTask(ctx context.Context, taskID string) error
删除任务从商店中删除任务
func (*RedisTaskStore) GetRecoverableTasks ¶
func (s *RedisTaskStore) GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)
获取可回收的任务检索重启后需要回收的任务
func (*RedisTaskStore) ListTasks ¶
func (s *RedisTaskStore) ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)
ListTasks 检索匹配过滤标准的任务
func (*RedisTaskStore) Ping ¶
func (s *RedisTaskStore) Ping(ctx context.Context) error
平平检查,如果商店是健康的
func (*RedisTaskStore) SaveTask ¶
func (s *RedisTaskStore) SaveTask(ctx context.Context, task *AsyncTask) error
保存任务持续执行商店的任务
func (*RedisTaskStore) Stats ¶
func (s *RedisTaskStore) Stats(ctx context.Context) (*TaskStoreStats, error)
Stats 返回关于任务存储的统计
func (*RedisTaskStore) UpdateProgress ¶
更新进度更新任务进度
func (*RedisTaskStore) UpdateStatus ¶
func (s *RedisTaskStore) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, errMsg string) error
更新状态更新任务状态
type RetryConfig ¶
type RetryConfig struct {
// Max Retries 是重试尝试的最大次数( 默认 3)
MaxRetries int `json:"max_retries" yaml:"max_retries"`
// 初始备份是初始备份期限( 默认:1s)
InitialBackoff time.Duration `json:"initial_backoff" yaml:"initial_backoff"`
// MaxBackoff 是最大后退持续时间( 默认: 30s)
MaxBackoff time.Duration `json:"max_backoff" yaml:"max_backoff"`
// 后置倍数是指数后置的乘数( 默认: 2. 0)
BackoffMultiplier float64 `json:"backoff_multiplier" yaml:"backoff_multiplier"`
}
RetryConfig 定义消息发送的再试行为
func DefaultRetryConfig ¶
func DefaultRetryConfig() RetryConfig
默认重试Config 返回默认重试配置 保守策略:最大3个回推,以指数后置 1s/2s/4s
func (RetryConfig) CalculateBackoff ¶
func (c RetryConfig) CalculateBackoff(attempt int) time.Duration
计算 Backoff 计算给定重试的后退持续时间
type Serializable ¶
type Serializable interface {
// 元帅JSON 返回对象的 JSON 编码
json.Marshaler
// UnmarshalJSON 解析 JSON 编码数据
json.Unmarshaler
}
可序列化是可被序列化为JSON的对象的接口
type Store ¶
type Store interface {
// 关闭存储并释放资源
Close() error
// 平平检查,如果商店是健康的
Ping(ctx context.Context) error
}
存储是所有持久存储的基础界面
type StoreConfig ¶
type StoreConfig struct {
// 类型是存储后端类型
Type StoreType `json:"type" yaml:"type"`
// BaseDir 是基于文件存储的基础目录
BaseDir string `json:"base_dir" yaml:"base_dir"`
// Redis 配置( 仅在类型为 “ redis” 时使用)
Redis RedisStoreConfig `json:"redis" yaml:"redis"`
// 重试配置
Retry RetryConfig `json:"retry" yaml:"retry"`
// 清理配置
Cleanup CleanupConfig `json:"cleanup" yaml:"cleanup"`
}
StoreConfig 是所有存储执行的基础配置
type TaskEvent ¶
type TaskEvent struct {
// TaskID 是此事件所属的任务
TaskID string `json:"task_id"`
// 类型是事件类型
Type TaskEventType `json:"type"`
// 旧状态是上一个状态( 对于状态变化事件)
OldStatus TaskStatus `json:"old_status,omitempty"`
// 新状态是新状态(用于改变状态事件)
NewStatus TaskStatus `json:"new_status,omitempty"`
// 进步是进步价值(对于进步活动)
Progress float64 `json:"progress,omitempty"`
// 信件是可选事件消息
Message string `json:"message,omitempty"`
// 时间戳是事件发生时
Timestamp time.Time `json:"timestamp"`
}
TaskEvent 代表任务生命周期中的事件
type TaskEventType ¶
type TaskEventType string
TaskEventType 代表任务事件的类型
const ( // 任务EventCreated 表示创建了任务 TaskEventCreated TaskEventType = "created" // 任务启动( T) 表示任务已经开始执行 TaskEventStarted TaskEventType = "started" // 任务进度显示任务进度已更新 TaskEventProgress TaskEventType = "progress" // 任务Event 已完成 。 TaskEventCompleted TaskEventType = "completed" // 任务Event 失败表示任务失败 TaskEventFailed TaskEventType = "failed" // 任务已取消 TaskEventCancelled TaskEventType = "cancelled" // 任务EventRetry 显示任务正在重试 TaskEventRetry TaskEventType = "retry" // 任务Event 已恢复 。 TaskEventRecovered TaskEventType = "recovered" )
type TaskFilter ¶
type TaskFilter struct {
// 按会话排列的会话ID过滤器
SessionID string `json:"session_id,omitempty"`
// 通过代理代理ID过滤器
AgentID string `json:"agent_id,omitempty"`
// 按任务类型分类的过滤器
Type string `json:"type,omitempty"`
// 按状态进行状态过滤( 可以是多个)
Status []TaskStatus `json:"status,omitempty"`
// 父任务过滤器
ParentTaskID string `json:"parent_task_id,omitempty"`
// 在此次过滤后创建任务
CreatedAfter *time.Time `json:"created_after,omitempty"`
// 在此之前创建过滤器任务
CreatedBefore *time.Time `json:"created_before,omitempty"`
// 限制是返回的最大任务数
Limit int `json:"limit,omitempty"`
// 偏移为要跳过的任务数
Offset int `json:"offset,omitempty"`
// 顺序By 指定排序顺序
OrderBy string `json:"order_by,omitempty"`
// 命令代斯克指定了降序
OrderDesc bool `json:"order_desc,omitempty"`
}
任务 过滤器定义过滤任务的标准
type TaskStatus ¶
type TaskStatus string
任务状态 :
const ( // 任务状态显示任务正在等待执行 TaskStatusPending TaskStatus = "pending" // 任务状态运行显示任务正在执行中 TaskStatusRunning TaskStatus = "running" // 任务状态完成后显示任务成功完成 TaskStatusCompleted TaskStatus = "completed" // 任务状态失败 。 TaskStatusFailed TaskStatus = "failed" // 任务状态已取消 。 TaskStatusCancelled TaskStatus = "cancelled" // 任务状态超时显示任务超时 TaskStatusTimeout TaskStatus = "timeout" )
type TaskStore ¶
type TaskStore interface {
Store
// 保存任务持续到存储( 创建或更新) 。
SaveTask(ctx context.Context, task *AsyncTask) error
// 通过 ID 获取任务
GetTask(ctx context.Context, taskID string) (*AsyncTask, error)
// ListTasks 检索匹配过滤标准的任务
ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)
// 更新状态更新任务状态
UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, errMsg string) error
// 更新进度更新任务进度
UpdateProgress(ctx context.Context, taskID string, progress float64) error
// 删除任务从商店中删除任务
DeleteTask(ctx context.Context, taskID string) error
// 获取可回收的任务检索重启后需要回收的任务
// 这包括待决或运行状态中的任务
GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)
// 清除完成/ 失败的任务超过指定期限
Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
// Stats 返回关于任务存储的统计
Stats(ctx context.Context) (*TaskStoreStats, error)
}
TaskStore定义了同步任务持久性的接口. 它在服务重启后为任务状态管理提供恢复支持.
func MustNewTaskStore ¶
func MustNewTaskStore(config StoreConfig) TaskStore
MustNewTaskStore 创建了新的 TaskStore 或对错误的恐慌.
警告:此功能只应在应用程序初始化时使用 (例如,主要()或init())。 在请求处理器或业务逻辑中使用恐慌 强烈劝阻。 对于运行时间商店的创建,使用"NewTaskStore"代替.
示例用法:
func 主 () {
存储 := 持久性. MustNewTaskStore(配置) // OK - 初始化
// ...
}
func NewTaskStore ¶
func NewTaskStore(config StoreConfig) (TaskStore, error)
NewTaskStore 创建基于配置的新任务库
func NewTaskStoreOrExit ¶
func NewTaskStoreOrExit(config StoreConfig) TaskStore
NewTaskStoreOrExit 创建了新的 TaskStore 程序,或者在出错时退出程序. 这是用于CLI应用的MustNewTaskStore的更安全的替代品.
type TaskStoreStats ¶
type TaskStoreStats struct {
// 任务总数是存储中的任务总数
TotalTasks int64 `json:"total_tasks"`
// 待决 任务是待决任务的数量
PendingTasks int64 `json:"pending_tasks"`
// 运行 任务是运行中的任务数
RunningTasks int64 `json:"running_tasks"`
// 已完成 任务是已完成任务的数量
CompletedTasks int64 `json:"completed_tasks"`
// 失败的任务是失败的任务数
FailedTasks int64 `json:"failed_tasks"`
// 已取消 任务是已取消的任务的数量
CancelledTasks int64 `json:"cancelled_tasks"`
// 状态计数为每个状态的任务数
StatusCounts map[TaskStatus]int64 `json:"status_counts"`
// AgentCounts 是每个代理的任务数
AgentCounts map[string]int64 `json:"agent_counts"`
// 平均完成任务时间
AverageCompletionTime time.Duration `json:"average_completion_time"`
// 最老的Ping-Age是最老的待决任务年龄
OldestPendingAge time.Duration `json:"oldest_pending_age"`
}
TaskStats 包含关于任务存储的统计数据