persistence

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFound      = errors.New("not found")
	ErrAlreadyExists = errors.New("already exists")
	ErrStoreClosed   = errors.New("store is closed")
	ErrInvalidInput  = errors.New("invalid input")
)

常见错误

Functions

This section is empty.

Types

type AsyncTask

type AsyncTask struct {
	// ID 是任务的唯一标识符
	ID string `json:"id"`

	// 会话ID 是此任务所属的会话
	SessionID string `json:"session_id,omitempty"`

	// AgentID 是执行此任务的代理
	AgentID string `json:"agent_id"`

	// 类型是任务类型
	Type string `json:"type"`

	// 状态是当前任务状态
	Status TaskStatus `json:"status"`

	// 输入包含任务输入数据
	Input map[string]any `json:"input,omitempty"`

	// 结果包含任务结果(完成后)
	Result any `json:"result,omitempty"`

	// 错误包含错误消息( 当失败时)
	Error string `json:"error,omitempty"`

	// 进展是任务进展(0-100)
	Progress float64 `json:"progress"`

	// 优先是任务优先(较高=更重要)
	Priority int `json:"priority"`

	// CreatedAt 是任务创建时
	CreatedAt time.Time `json:"created_at"`

	// 更新到上次更新任务时
	UpdatedAt time.Time `json:"updated_at"`

	// 开始是任务开始执行时
	StartedAt *time.Time `json:"started_at,omitempty"`

	// 任务完成时
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// 超时是任务超时的期限
	Timeout time.Duration `json:"timeout,omitempty"`

	// 重试请求是重试尝试的次数
	RetryCount int `json:"retry_count"`

	// MaxRetries 是允许的最大重试次数
	MaxRetries int `json:"max_retries"`

	// 元数据包含额外的任务元数据
	Metadata map[string]string `json:"metadata,omitempty"`

	// 父任务ID 是父任务ID( 用于子任务)
	ParentTaskID string `json:"parent_task_id,omitempty"`

	// 儿童任务ID是儿童任务ID
	ChildTaskIDs []string `json:"child_task_ids,omitempty"`
}

AsyncTask 代表一个持久的同步任务

func (*AsyncTask) Duration

func (t *AsyncTask) Duration() time.Duration

持续时间返回任务持续时间( 如果仍在运行, 则返回任务持续时间)

func (*AsyncTask) IsRecoverable

func (t *AsyncTask) IsRecoverable() bool

如果任务在重启后恢复, 可恢复返回为真

func (*AsyncTask) IsTerminal

func (t *AsyncTask) IsTerminal() bool

如果任务处于终端状态, Is Terminal 返回为真

func (*AsyncTask) IsTimedOut

func (t *AsyncTask) IsTimedOut() bool

IsTimedOut 如果任务超过超时返回真

func (*AsyncTask) MarshalJSON

func (t *AsyncTask) MarshalJSON() ([]byte, error)

JSON警长执行JSON。 元目录

func (*AsyncTask) ShouldRetry

func (t *AsyncTask) ShouldRetry() bool

如果任务要重试, 重试是否返回为真

func (*AsyncTask) UnmarshalJSON

func (t *AsyncTask) UnmarshalJSON(data []byte) error

UnmarshalJSON 执行json。 解马沙勒

type CleanupConfig

type CleanupConfig struct {
	// 启用后确定是否启用自动清理
	Enabled bool `json:"enabled" yaml:"enabled"`

	// 间断是清理运行的频率( 默认:1 h)
	Interval time.Duration `json:"interval" yaml:"interval"`

	// 保留信件是保留已确认信件的时间( 默认为 1h)
	MessageRetention time.Duration `json:"message_retention" yaml:"message_retention"`

	// 任务保留是保存已完成任务的时间( 默认: 24h)
	TaskRetention time.Duration `json:"task_retention" yaml:"task_retention"`
}

清理Config 为已完成的任务和旧消息定义清理行为

func DefaultCleanupConfig

func DefaultCleanupConfig() CleanupConfig

默认CleanupConfig 返回默认清理配置

type FileMessageStore

type FileMessageStore struct {
	// contains filtered or unexported fields
}

FileMessageStore是基于文件执行的MessageStore. 适合单节点生产部署.

func NewFileMessageStore

func NewFileMessageStore(config StoreConfig) (*FileMessageStore, error)

NewFileMessageStore 创建一个新的基于文件的信息存储

func (*FileMessageStore) AckMessage

func (s *FileMessageStore) AckMessage(ctx context.Context, msgID string) error

AckMessage 是一个被承认的信息

func (*FileMessageStore) Cleanup

func (s *FileMessageStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

清理删除旧消息

func (*FileMessageStore) Close

func (s *FileMessageStore) Close() error

关闭商店

func (*FileMessageStore) DeleteMessage

func (s *FileMessageStore) DeleteMessage(ctx context.Context, msgID string) error

删除信件从存储处删除

func (*FileMessageStore) GetMessage

func (s *FileMessageStore) GetMessage(ctx context.Context, msgID string) (*Message, error)

通过 ID 获取信件

func (*FileMessageStore) GetMessages

func (s *FileMessageStore) GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)

GetMessages 获取带有 pagination 主题的信息

func (*FileMessageStore) GetPendingMessages

func (s *FileMessageStore) GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)

GetPendingMessages 检索需要发送的信件

func (*FileMessageStore) GetUnackedMessages

func (s *FileMessageStore) GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)

获取未保存的邮件获取未确认的比指定时间长的信件

func (*FileMessageStore) IncrementRetry

func (s *FileMessageStore) IncrementRetry(ctx context.Context, msgID string) error

递增

func (*FileMessageStore) Ping

func (s *FileMessageStore) Ping(ctx context.Context) error

平平检查,如果商店是健康的

func (*FileMessageStore) SaveMessage

func (s *FileMessageStore) SaveMessage(ctx context.Context, msg *Message) error

保存信件坚持一个消息

func (*FileMessageStore) SaveMessages

func (s *FileMessageStore) SaveMessages(ctx context.Context, msgs []*Message) error

保存消息在解剖上持续了多个消息

func (*FileMessageStore) Stats

Stats 返回关于消息库的统计数据

type FileTaskStore

type FileTaskStore struct {
	// contains filtered or unexported fields
}

FileTaskStore是一个基于文件的执行"TaskStore". 适合单节点生产部署.

func NewFileTaskStore

func NewFileTaskStore(config StoreConfig) (*FileTaskStore, error)

新建文件任务存储器

func (*FileTaskStore) Cleanup

func (s *FileTaskStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

清除完成/ 失败的任务超过指定期限

func (*FileTaskStore) Close

func (s *FileTaskStore) Close() error

关闭商店

func (*FileTaskStore) DeleteTask

func (s *FileTaskStore) DeleteTask(ctx context.Context, taskID string) error

删除任务从商店中删除任务

func (*FileTaskStore) GetRecoverableTasks

func (s *FileTaskStore) GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)

获取可回收的任务检索重启后需要回收的任务

func (*FileTaskStore) GetTask

func (s *FileTaskStore) GetTask(ctx context.Context, taskID string) (*AsyncTask, error)

通过 ID 获取任务

func (*FileTaskStore) ListTasks

func (s *FileTaskStore) ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)

ListTasks 检索匹配过滤标准的任务

func (*FileTaskStore) Ping

func (s *FileTaskStore) Ping(ctx context.Context) error

平平检查,如果商店是健康的

func (*FileTaskStore) SaveTask

func (s *FileTaskStore) SaveTask(ctx context.Context, task *AsyncTask) error

保存任务持续执行商店的任务

func (*FileTaskStore) Stats

func (s *FileTaskStore) Stats(ctx context.Context) (*TaskStoreStats, error)

Stats 返回关于任务存储的统计

func (*FileTaskStore) UpdateProgress

func (s *FileTaskStore) UpdateProgress(ctx context.Context, taskID string, progress float64) error

更新进度更新任务进度

func (*FileTaskStore) UpdateStatus

func (s *FileTaskStore) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, errMsg string) error

更新状态更新任务状态

type MemoryMessageStore

type MemoryMessageStore struct {
	// contains filtered or unexported fields
}

记忆MessageStore是MessageStore的内在执行. 适合开发和测试。 数据在重新启动时丢失 。

func NewMemoryMessageStore

func NewMemoryMessageStore(config StoreConfig) *MemoryMessageStore

新记忆MessageStore 创建了新的记忆信息存储器

func (*MemoryMessageStore) AckMessage

func (s *MemoryMessageStore) AckMessage(ctx context.Context, msgID string) error

AckMessage 是一个被承认的信息

func (*MemoryMessageStore) Cleanup

func (s *MemoryMessageStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

清理删除旧消息

func (*MemoryMessageStore) Close

func (s *MemoryMessageStore) Close() error

关闭商店

func (*MemoryMessageStore) DeleteMessage

func (s *MemoryMessageStore) DeleteMessage(ctx context.Context, msgID string) error

删除信件从存储处删除

func (*MemoryMessageStore) GetMessage

func (s *MemoryMessageStore) GetMessage(ctx context.Context, msgID string) (*Message, error)

通过 ID 获取信件

func (*MemoryMessageStore) GetMessages

func (s *MemoryMessageStore) GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)

GetMessages 获取带有 pagination 主题的信息

func (*MemoryMessageStore) GetPendingMessages

func (s *MemoryMessageStore) GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)

GetPendingMessages 检索需要发送的信件

func (*MemoryMessageStore) GetUnackedMessages

func (s *MemoryMessageStore) GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)

获取未保存的邮件获取未确认的比指定时间长的信件

func (*MemoryMessageStore) IncrementRetry

func (s *MemoryMessageStore) IncrementRetry(ctx context.Context, msgID string) error

递增

func (*MemoryMessageStore) Ping

func (s *MemoryMessageStore) Ping(ctx context.Context) error

平平检查,如果商店是健康的

func (*MemoryMessageStore) SaveMessage

func (s *MemoryMessageStore) SaveMessage(ctx context.Context, msg *Message) error

保存信件坚持一个消息

func (*MemoryMessageStore) SaveMessages

func (s *MemoryMessageStore) SaveMessages(ctx context.Context, msgs []*Message) error

保存消息在解剖上持续了多个消息

func (*MemoryMessageStore) Stats

Stats 返回关于消息库的统计数据

type MemoryTaskStore

type MemoryTaskStore struct {
	// contains filtered or unexported fields
}

MemoryTaskStore是TaskStore的一个内在执行. 适合开发和测试。 数据在重新启动时丢失 。

func NewMemoryTaskStore

func NewMemoryTaskStore(config StoreConfig) *MemoryTaskStore

新建记忆任务存储器

func (*MemoryTaskStore) Cleanup

func (s *MemoryTaskStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

清除完成/ 失败的任务超过指定期限

func (*MemoryTaskStore) Close

func (s *MemoryTaskStore) Close() error

关闭商店

func (*MemoryTaskStore) DeleteTask

func (s *MemoryTaskStore) DeleteTask(ctx context.Context, taskID string) error

删除任务从商店中删除任务

func (*MemoryTaskStore) GetRecoverableTasks

func (s *MemoryTaskStore) GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)

获取可回收的任务检索重启后需要回收的任务

func (*MemoryTaskStore) GetTask

func (s *MemoryTaskStore) GetTask(ctx context.Context, taskID string) (*AsyncTask, error)

通过 ID 获取任务

func (*MemoryTaskStore) ListTasks

func (s *MemoryTaskStore) ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)

ListTasks 检索匹配过滤标准的任务

func (*MemoryTaskStore) Ping

func (s *MemoryTaskStore) Ping(ctx context.Context) error

平平检查,如果商店是健康的

func (*MemoryTaskStore) SaveTask

func (s *MemoryTaskStore) SaveTask(ctx context.Context, task *AsyncTask) error

保存任务持续执行商店的任务

func (*MemoryTaskStore) Stats

Stats 返回关于任务存储的统计

func (*MemoryTaskStore) UpdateProgress

func (s *MemoryTaskStore) UpdateProgress(ctx context.Context, taskID string, progress float64) error

更新进度更新任务进度

func (*MemoryTaskStore) UpdateStatus

func (s *MemoryTaskStore) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, errMsg string) error

更新状态更新任务状态

type Message

type Message struct {
	// ID 是信件的唯一标识符
	ID string `json:"id"`

	// 题目是信息主题/频道
	Topic string `json:"topic"`

	// FromID 是发送代理 ID
	FromID string `json:"from_id"`

	// ToID 是接收代理ID( 空来播放)
	ToID string `json:"to_id,omitempty"`

	// 类型是信件类型(提议、回应、表决等)
	Type string `json:"type"`

	// 内容是信件内容
	Content string `json:"content"`

	// 有效载荷包含额外的结构化数据
	Payload map[string]any `json:"payload,omitempty"`

	// 元数据包含信件元数据
	Metadata map[string]string `json:"metadata,omitempty"`

	// 创建到信件创建时
	CreatedAt time.Time `json:"created_at"`

	// AckedAt是消息被承认的时候(即使没有被承认也没有)
	AckedAt *time.Time `json:"acked_at,omitempty"`

	// 重试( Rettry Count) 是送货尝试的次数
	RetryCount int `json:"retry_count"`

	// Last RetryAt 是上次尝试重试时
	LastRetryAt *time.Time `json:"last_retry_at,omitempty"`

	// 过期是信件过期时( 可选)
	ExpiresAt *time.Time `json:"expires_at,omitempty"`
}

信件代表系统中的持久信息

func (*Message) IsAcked

func (m *Message) IsAcked() bool

如果信件已被确认, 将会被检查

func (*Message) IsExpired

func (m *Message) IsExpired() bool

如果信件已过期, 检查已过期

func (*Message) MarshalJSON

func (m *Message) MarshalJSON() ([]byte, error)

JSON警长执行JSON。 元目录

func (*Message) NextRetryTime

func (m *Message) NextRetryTime(config RetryConfig) time.Time

下次重试时计算

func (*Message) ShouldRetry

func (m *Message) ShouldRetry(config RetryConfig) bool

是否应根据重试配置重试信件

func (*Message) UnmarshalJSON

func (m *Message) UnmarshalJSON(data []byte) error

UnmarshalJSON 执行json。 解马沙勒

type MessageFilter

type MessageFilter struct {
	// 按主题划分的专题过滤器
	Topic string `json:"topic,omitempty"`

	// 发送者从ID中过滤
	FromID string `json:"from_id,omitempty"`

	// 收件人的 ToID 过滤器
	ToID string `json:"to_id,omitempty"`

	// 按信件类型输入过滤器
	Type string `json:"type,omitempty"`

	// 通过承认状态进行状态过滤
	Status MessageStatus `json:"status,omitempty"`

	// 在此时间之后创建过滤信件
	CreatedAfter *time.Time `json:"created_after,omitempty"`

	// 在此之前创建过滤信件
	CreatedBefore *time.Time `json:"created_before,omitempty"`

	// 限定要返回的信件的最大数量
	Limit int `json:"limit,omitempty"`

	// 偏移为要跳过的信件数量
	Offset int `json:"offset,omitempty"`
}

MessageFilter 定义过滤信件的标准

type MessageStatus

type MessageStatus string

信件状态代表信件状态

const (
	// 信件状态显示信件正在等待处理
	MessageStatusPending MessageStatus = "pending"

	// 信件状态显示信件已被确认
	MessageStatusAcked MessageStatus = "acked"

	// 信件状态已过期 。
	MessageStatusExpired MessageStatus = "expired"

	// 信件状态失败, 表示信件在最大重试后失败
	MessageStatusFailed MessageStatus = "failed"
)

type MessageStore

type MessageStore interface {
	Store

	// 保存Message 坚持给商店的单个消息
	SaveMessage(ctx context.Context, msg *Message) error

	// 保存消息在解剖上持续了多个消息
	SaveMessages(ctx context.Context, msgs []*Message) error

	// 通过 ID 获取信件
	GetMessage(ctx context.Context, msgID string) (*Message, error)

	// GetMessages 获取带有 pagination 主题的信息
	// 返回信件、 下个光标和错误
	GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)

	// AckMessage 标记已确认/处理的信息
	AckMessage(ctx context.Context, msgID string) error

	// 获取未保存的邮件获取未确认的比指定时间长的信件
	// 这些留言是重试的候选人
	GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)

	// GetPendingMessages 检索需要发送的信件
	// 这包括需要重试的新信件和信件
	GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)

	// 递增
	IncrementRetry(ctx context.Context, msgID string) error

	// 删除信件从存储处删除
	DeleteMessage(ctx context.Context, msgID string) error

	// 清理删除旧消息
	Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

	// Stats 返回关于消息库的统计数据
	Stats(ctx context.Context) (*MessageStoreStats, error)
}

MessageStore定义了信件持久性的界面. 它提供可靠的信息传送,并附有确认和重新测试支持。

func MustNewMessageStore

func MustNewMessageStore(config StoreConfig) MessageStore

Must NewMessageStore 创建了新信件存储器或对错误的恐慌 。

警告:此功能只应在应用程序初始化时使用 (例如,主要()或init())。 在请求处理器或业务逻辑中使用恐慌 强烈劝阻。 对于运行时商店创建,取而代之的是使用"NewMessageStore".

示例用法:

func 主 () {
    存储 := 持久性. Must NewMessageStore(配置) // OK - 初始化
    // ...
}

func NewMessageStore

func NewMessageStore(config StoreConfig) (MessageStore, error)

新MessageStore 创建基于配置的新信件系统

func NewMessageStoreOrExit

func NewMessageStoreOrExit(config StoreConfig) MessageStore

NewMessageStore OrExit 创建了新的信件存储器,或在错误时退出程序. 这是用于CLI应用的MustNewMessageStore的更安全的替代品.

type MessageStoreStats

type MessageStoreStats struct {
	// TotalMessages 为商店中信件的总数
	TotalMessages int64 `json:"total_messages"`

	// 未决信件是未确认信件的数量
	PendingMessages int64 `json:"pending_messages"`

	// AckedMessages 是确认消息的数量
	AckedMessages int64 `json:"acked_messages"`

	// 过期信件是过期信件的数量
	ExpiredMessages int64 `json:"expired_messages"`

	// 主题计数为每个主题的信息数
	TopicCounts map[string]int64 `json:"topic_counts"`

	// 最老的PendingAge是最老的待发消息的年龄
	OldestPendingAge time.Duration `json:"oldest_pending_age"`
}

信件Stats 包含关于信件存储的统计数据

type RedisMessageStore

type RedisMessageStore struct {
	// contains filtered or unexported fields
}

RedisMessageStore是一个基于Redis的MessageStore执行. 适合分布式生产部署. 使用 Redis Streams 在消费组的支持下存储消息.

func NewRedisMessageStore

func NewRedisMessageStore(config StoreConfig) (*RedisMessageStore, error)

NewRedisMessageStore 创建一个新的基于 Redis 的信息存储

func (*RedisMessageStore) AckMessage

func (s *RedisMessageStore) AckMessage(ctx context.Context, msgID string) error

AckMessage 是一个被承认的信息

func (*RedisMessageStore) Cleanup

func (s *RedisMessageStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

清理删除旧消息

func (*RedisMessageStore) Close

func (s *RedisMessageStore) Close() error

关闭商店

func (*RedisMessageStore) DeleteMessage

func (s *RedisMessageStore) DeleteMessage(ctx context.Context, msgID string) error

删除信件从存储处删除

func (*RedisMessageStore) GetMessage

func (s *RedisMessageStore) GetMessage(ctx context.Context, msgID string) (*Message, error)

通过 ID 获取信件

func (*RedisMessageStore) GetMessages

func (s *RedisMessageStore) GetMessages(ctx context.Context, topic string, cursor string, limit int) ([]*Message, string, error)

GetMessages 获取带有 pagination 主题的信息

func (*RedisMessageStore) GetPendingMessages

func (s *RedisMessageStore) GetPendingMessages(ctx context.Context, topic string, limit int) ([]*Message, error)

GetPendingMessages 检索需要发送的信件

func (*RedisMessageStore) GetUnackedMessages

func (s *RedisMessageStore) GetUnackedMessages(ctx context.Context, topic string, olderThan time.Duration) ([]*Message, error)

获取未保存的邮件获取未确认的比指定时间长的信件

func (*RedisMessageStore) IncrementRetry

func (s *RedisMessageStore) IncrementRetry(ctx context.Context, msgID string) error

递增

func (*RedisMessageStore) Ping

func (s *RedisMessageStore) Ping(ctx context.Context) error

平平检查,如果商店是健康的

func (*RedisMessageStore) SaveMessage

func (s *RedisMessageStore) SaveMessage(ctx context.Context, msg *Message) error

保存信件坚持一个消息

func (*RedisMessageStore) SaveMessages

func (s *RedisMessageStore) SaveMessages(ctx context.Context, msgs []*Message) error

保存消息在解剖上持续了多个消息

func (*RedisMessageStore) Stats

Stats 返回关于消息库的统计数据

type RedisStoreConfig

type RedisStoreConfig struct {
	// 主机是 Redis 服务器主机
	Host string `json:"host" yaml:"host"`

	// 端口是 Redis 服务器端口
	Port int `json:"port" yaml:"port"`

	// 密码是 Redis 密码( 可选)
	Password string `json:"password" yaml:"password"`

	// DB 为 Redis 数据库编号
	DB int `json:"db" yaml:"db"`

	// PoolSize 是连接池大小
	PoolSize int `json:"pool_size" yaml:"pool_size"`

	// 密钥前缀是所有 Redis 密钥的前缀
	KeyPrefix string `json:"key_prefix" yaml:"key_prefix"`

	// TLSEnabled 是否启用 TLS 连接
	TLSEnabled bool `json:"tls_enabled" yaml:"tls_enabled"`
}

RedisStore Config 包含 Redis 特定配置

type RedisTaskStore

type RedisTaskStore struct {
	// contains filtered or unexported fields
}

RedisTaskStore是一个基于Redis的"TaskStore"执行. 适合分布式生产部署. 使用 Redis Hash 来进行任务存储, 并排序集进行索引 。

func NewRedisTaskStore

func NewRedisTaskStore(config StoreConfig) (*RedisTaskStore, error)

新建基于 Redis 的任务库

func (*RedisTaskStore) Cleanup

func (s *RedisTaskStore) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

清除完成/ 失败的任务超过指定期限

func (*RedisTaskStore) Close

func (s *RedisTaskStore) Close() error

关闭商店

func (*RedisTaskStore) DeleteTask

func (s *RedisTaskStore) DeleteTask(ctx context.Context, taskID string) error

删除任务从商店中删除任务

func (*RedisTaskStore) GetRecoverableTasks

func (s *RedisTaskStore) GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)

获取可回收的任务检索重启后需要回收的任务

func (*RedisTaskStore) GetTask

func (s *RedisTaskStore) GetTask(ctx context.Context, taskID string) (*AsyncTask, error)

通过 ID 获取任务

func (*RedisTaskStore) ListTasks

func (s *RedisTaskStore) ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)

ListTasks 检索匹配过滤标准的任务

func (*RedisTaskStore) Ping

func (s *RedisTaskStore) Ping(ctx context.Context) error

平平检查,如果商店是健康的

func (*RedisTaskStore) SaveTask

func (s *RedisTaskStore) SaveTask(ctx context.Context, task *AsyncTask) error

保存任务持续执行商店的任务

func (*RedisTaskStore) Stats

Stats 返回关于任务存储的统计

func (*RedisTaskStore) UpdateProgress

func (s *RedisTaskStore) UpdateProgress(ctx context.Context, taskID string, progress float64) error

更新进度更新任务进度

func (*RedisTaskStore) UpdateStatus

func (s *RedisTaskStore) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, errMsg string) error

更新状态更新任务状态

type RetryConfig

type RetryConfig struct {
	// Max Retries 是重试尝试的最大次数( 默认 3)
	MaxRetries int `json:"max_retries" yaml:"max_retries"`

	// 初始备份是初始备份期限( 默认:1s)
	InitialBackoff time.Duration `json:"initial_backoff" yaml:"initial_backoff"`

	// MaxBackoff 是最大后退持续时间( 默认: 30s)
	MaxBackoff time.Duration `json:"max_backoff" yaml:"max_backoff"`

	// 后置倍数是指数后置的乘数( 默认: 2. 0)
	BackoffMultiplier float64 `json:"backoff_multiplier" yaml:"backoff_multiplier"`
}

RetryConfig 定义消息发送的再试行为

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

默认重试Config 返回默认重试配置 保守策略:最大3个回推,以指数后置 1s/2s/4s

func (RetryConfig) CalculateBackoff

func (c RetryConfig) CalculateBackoff(attempt int) time.Duration

计算 Backoff 计算给定重试的后退持续时间

type Serializable

type Serializable interface {
	// 元帅JSON 返回对象的 JSON 编码
	json.Marshaler
	// UnmarshalJSON 解析 JSON 编码数据
	json.Unmarshaler
}

可序列化是可被序列化为JSON的对象的接口

type Store

type Store interface {
	// 关闭存储并释放资源
	Close() error

	// 平平检查,如果商店是健康的
	Ping(ctx context.Context) error
}

存储是所有持久存储的基础界面

type StoreConfig

type StoreConfig struct {
	// 类型是存储后端类型
	Type StoreType `json:"type" yaml:"type"`

	// BaseDir 是基于文件存储的基础目录
	BaseDir string `json:"base_dir" yaml:"base_dir"`

	// Redis 配置( 仅在类型为 “ redis” 时使用)
	Redis RedisStoreConfig `json:"redis" yaml:"redis"`

	// 重试配置
	Retry RetryConfig `json:"retry" yaml:"retry"`

	// 清理配置
	Cleanup CleanupConfig `json:"cleanup" yaml:"cleanup"`
}

StoreConfig 是所有存储执行的基础配置

func DefaultStoreConfig

func DefaultStoreConfig() StoreConfig

默认StoreConfig 返回默认存储配置

type StoreType

type StoreType string

StoreType 代表存储后端的类型

const (
	StoreTypeMemory StoreType = "memory"
	StoreTypeFile   StoreType = "file"
	StoreTypeRedis  StoreType = "redis"
)

type TaskEvent

type TaskEvent struct {
	// TaskID 是此事件所属的任务
	TaskID string `json:"task_id"`

	// 类型是事件类型
	Type TaskEventType `json:"type"`

	// 旧状态是上一个状态( 对于状态变化事件)
	OldStatus TaskStatus `json:"old_status,omitempty"`

	// 新状态是新状态(用于改变状态事件)
	NewStatus TaskStatus `json:"new_status,omitempty"`

	// 进步是进步价值(对于进步活动)
	Progress float64 `json:"progress,omitempty"`

	// 信件是可选事件消息
	Message string `json:"message,omitempty"`

	// 时间戳是事件发生时
	Timestamp time.Time `json:"timestamp"`
}

TaskEvent 代表任务生命周期中的事件

type TaskEventType

type TaskEventType string

TaskEventType 代表任务事件的类型

const (
	// 任务EventCreated 表示创建了任务
	TaskEventCreated TaskEventType = "created"

	// 任务启动( T) 表示任务已经开始执行
	TaskEventStarted TaskEventType = "started"

	// 任务进度显示任务进度已更新
	TaskEventProgress TaskEventType = "progress"

	// 任务Event 已完成 。
	TaskEventCompleted TaskEventType = "completed"

	// 任务Event 失败表示任务失败
	TaskEventFailed TaskEventType = "failed"

	// 任务已取消
	TaskEventCancelled TaskEventType = "cancelled"

	// 任务EventRetry 显示任务正在重试
	TaskEventRetry TaskEventType = "retry"

	// 任务Event 已恢复 。
	TaskEventRecovered TaskEventType = "recovered"
)

type TaskFilter

type TaskFilter struct {
	// 按会话排列的会话ID过滤器
	SessionID string `json:"session_id,omitempty"`

	// 通过代理代理ID过滤器
	AgentID string `json:"agent_id,omitempty"`

	// 按任务类型分类的过滤器
	Type string `json:"type,omitempty"`

	// 按状态进行状态过滤( 可以是多个)
	Status []TaskStatus `json:"status,omitempty"`

	// 父任务过滤器
	ParentTaskID string `json:"parent_task_id,omitempty"`

	// 在此次过滤后创建任务
	CreatedAfter *time.Time `json:"created_after,omitempty"`

	// 在此之前创建过滤器任务
	CreatedBefore *time.Time `json:"created_before,omitempty"`

	// 限制是返回的最大任务数
	Limit int `json:"limit,omitempty"`

	// 偏移为要跳过的任务数
	Offset int `json:"offset,omitempty"`

	// 顺序By 指定排序顺序
	OrderBy string `json:"order_by,omitempty"`

	// 命令代斯克指定了降序
	OrderDesc bool `json:"order_desc,omitempty"`
}

任务 过滤器定义过滤任务的标准

type TaskStatus

type TaskStatus string

任务状态 :

const (
	// 任务状态显示任务正在等待执行
	TaskStatusPending TaskStatus = "pending"

	// 任务状态运行显示任务正在执行中
	TaskStatusRunning TaskStatus = "running"

	// 任务状态完成后显示任务成功完成
	TaskStatusCompleted TaskStatus = "completed"

	// 任务状态失败 。
	TaskStatusFailed TaskStatus = "failed"

	// 任务状态已取消 。
	TaskStatusCancelled TaskStatus = "cancelled"

	// 任务状态超时显示任务超时
	TaskStatusTimeout TaskStatus = "timeout"
)

func (TaskStatus) IsRecoverable

func (s TaskStatus) IsRecoverable() bool

如果任务在重启后恢复, 可恢复返回为真

func (TaskStatus) IsTerminal

func (s TaskStatus) IsTerminal() bool

IsTerminal 如果状态是终端状态, 返回为真

type TaskStore

type TaskStore interface {
	Store

	// 保存任务持续到存储( 创建或更新) 。
	SaveTask(ctx context.Context, task *AsyncTask) error

	// 通过 ID 获取任务
	GetTask(ctx context.Context, taskID string) (*AsyncTask, error)

	// ListTasks 检索匹配过滤标准的任务
	ListTasks(ctx context.Context, filter TaskFilter) ([]*AsyncTask, error)

	// 更新状态更新任务状态
	UpdateStatus(ctx context.Context, taskID string, status TaskStatus, result any, errMsg string) error

	// 更新进度更新任务进度
	UpdateProgress(ctx context.Context, taskID string, progress float64) error

	// 删除任务从商店中删除任务
	DeleteTask(ctx context.Context, taskID string) error

	// 获取可回收的任务检索重启后需要回收的任务
	// 这包括待决或运行状态中的任务
	GetRecoverableTasks(ctx context.Context) ([]*AsyncTask, error)

	// 清除完成/ 失败的任务超过指定期限
	Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

	// Stats 返回关于任务存储的统计
	Stats(ctx context.Context) (*TaskStoreStats, error)
}

TaskStore定义了同步任务持久性的接口. 它在服务重启后为任务状态管理提供恢复支持.

func MustNewTaskStore

func MustNewTaskStore(config StoreConfig) TaskStore

MustNewTaskStore 创建了新的 TaskStore 或对错误的恐慌.

警告:此功能只应在应用程序初始化时使用 (例如,主要()或init())。 在请求处理器或业务逻辑中使用恐慌 强烈劝阻。 对于运行时间商店的创建,使用"NewTaskStore"代替.

示例用法:

func 主 () {
    存储 := 持久性. MustNewTaskStore(配置) // OK - 初始化
    // ...
}

func NewTaskStore

func NewTaskStore(config StoreConfig) (TaskStore, error)

NewTaskStore 创建基于配置的新任务库

func NewTaskStoreOrExit

func NewTaskStoreOrExit(config StoreConfig) TaskStore

NewTaskStoreOrExit 创建了新的 TaskStore 程序,或者在出错时退出程序. 这是用于CLI应用的MustNewTaskStore的更安全的替代品.

type TaskStoreStats

type TaskStoreStats struct {
	// 任务总数是存储中的任务总数
	TotalTasks int64 `json:"total_tasks"`

	// 待决 任务是待决任务的数量
	PendingTasks int64 `json:"pending_tasks"`

	// 运行 任务是运行中的任务数
	RunningTasks int64 `json:"running_tasks"`

	// 已完成 任务是已完成任务的数量
	CompletedTasks int64 `json:"completed_tasks"`

	// 失败的任务是失败的任务数
	FailedTasks int64 `json:"failed_tasks"`

	// 已取消 任务是已取消的任务的数量
	CancelledTasks int64 `json:"cancelled_tasks"`

	// 状态计数为每个状态的任务数
	StatusCounts map[TaskStatus]int64 `json:"status_counts"`

	// AgentCounts 是每个代理的任务数
	AgentCounts map[string]int64 `json:"agent_counts"`

	// 平均完成任务时间
	AverageCompletionTime time.Duration `json:"average_completion_time"`

	// 最老的Ping-Age是最老的待决任务年龄
	OldestPendingAge time.Duration `json:"oldest_pending_age"`
}

TaskStats 包含关于任务存储的统计数据

Directories

Path Synopsis
Package mongodb provides adapter types that bridge the concrete MongoDB store implementations to the agent-layer interfaces defined in agent/interfaces.go.
Package mongodb provides adapter types that bridge the concrete MongoDB store implementations to the agent-layer interfaces defined in agent/interfaces.go.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL