Documentation
¶
Overview ¶
package queue define the queue interface and message type for workflow
Index ¶
- Variables
- type DBMessageQueue
- func (dbmq *DBMessageQueue) CleanExecutionMessage(execution_id int) error
- func (mq *DBMessageQueue) Close() error
- func (mq *DBMessageQueue) DispatchInnerMessage(msgID int) error
- func (mq *DBMessageQueue) GetUnreadMessageIDsOnce() ([]int, error)
- func (dbmq *DBMessageQueue) ReceiveInnerMessage() (<-chan InnerMessage, error)
- func (mq *DBMessageQueue) RunPollingInnerMessage()
- func (mq *DBMessageQueue) SendInnerMessage(message InnerMessageBody, sendTime *time.Time) error
- func (mq *DBMessageQueue) SetForwardQueue(forwardQueue InnerMessageQueue)
- func (dbmq *DBMessageQueue) SetLogger(logger *slog.Logger)
- func (dbmq *DBMessageQueue) SetOption(opt DBQueueOption)
- func (mq *DBMessageQueue) StartPolling()
- func (dbmq *DBMessageQueue) SyncSchema() error
- type DBQueueMessage
- type DBQueueOption
- type InnerMessage
- type InnerMessageBody
- type InnerMessageQueue
- type InnerQueueGroup
- func (q *InnerQueueGroup) CleanExecutionMessage(execution_id int) error
- func (q *InnerQueueGroup) Close() error
- func (q *InnerQueueGroup) ReceiveInnerMessage() (<-chan InnerMessage, error)
- func (q *InnerQueueGroup) SendInnerMessage(msg InnerMessageBody, sendTime *time.Time) error
- func (q *InnerQueueGroup) SyncSchema() error
- type MQInnerMessage
- type MQInnerMessageQueue
- func (imq *MQInnerMessageQueue) CleanExecutionMessage(executionID int) error
- func (imq *MQInnerMessageQueue) Close() error
- func (imq *MQInnerMessageQueue) ReceiveInnerMessage() (<-chan InnerMessage, error)
- func (imq *MQInnerMessageQueue) SendInnerMessage(msgbody InnerMessageBody, sendtime *time.Time) error
- func (imq *MQInnerMessageQueue) SyncSchema() error
- type MockInnerQueue
- func (mock MockInnerQueue) CleanExecutionMessage(int) error
- func (mock MockInnerQueue) Close() error
- func (mock MockInnerQueue) GetName() string
- func (mock MockInnerQueue) ReceiveInnerMessage() (<-chan InnerMessageBody, error)
- func (mock MockInnerQueue) SendInnerMessage(msg InnerMessage, sendtime *time.Time) error
- func (mock MockInnerQueue) SyncSchema() error
- type SimpleInnerQueue
- func (n *SimpleInnerQueue) CleanExecutionMessage(int) error
- func (t *SimpleInnerQueue) Close() error
- func (n *SimpleInnerQueue) ReceiveInnerMessage() (<-chan InnerMessage, error)
- func (t *SimpleInnerQueue) SendInnerMessage(msg InnerMessageBody, sendTime *time.Time) error
- func (n *SimpleInnerQueue) SyncSchema() error
- type TaskMessage
- type TaskMessageBody
- type TaskMessageQueue
- type TestInnerMessage
Constants ¶
This section is empty.
Variables ¶
var DBMessageStatus = struct { Created string Processed string }{ Created: "Created", Processed: "Processed", }
DBMessageStatus message status in db
var DefaultDBDelayQueueOption = DBQueueOption{ PollingDuration: 500 * time.Millisecond, PollingLimit: 10, }
默认DB队列参数
var MessageClass = struct { Execution string State string StepGroup string }{ Execution: "Execution", State: "State", StepGroup: "StepGroup", }
MessageClass Message class
var MessageQueueErrorCode = struct { DBError string DataError string MessageNotFound string }{ DBError: "DBError", DataError: "DataError", MessageNotFound: "MessageNotFound", }
MessageQueueErrorCode errorcode
Functions ¶
This section is empty.
Types ¶
type DBMessageQueue ¶
type DBMessageQueue struct {
// contains filtered or unexported fields
}
DBDelayMessageQueue DB delay message queue
func NewDBMessageQueue ¶
func NewDBMessageQueue(client *rdb.DBClient, option DBQueueOption) *DBMessageQueue
NewDBMessageQueue Create New Inner DBMessage
func (*DBMessageQueue) CleanExecutionMessage ¶
func (dbmq *DBMessageQueue) CleanExecutionMessage(execution_id int) error
CleanExecutionMessage 清理ExecutionMessage
func (*DBMessageQueue) Close ¶
func (mq *DBMessageQueue) Close() error
Close stop async task message queue
func (*DBMessageQueue) DispatchInnerMessage ¶
func (mq *DBMessageQueue) DispatchInnerMessage(msgID int) error
DispatchInnerMessage dispatch message to forward queue
func (*DBMessageQueue) GetUnreadMessageIDsOnce ¶
func (mq *DBMessageQueue) GetUnreadMessageIDsOnce() ([]int, error)
GetUnreadMessageIDsOnce 获得所有未处理的消息列表
func (*DBMessageQueue) ReceiveInnerMessage ¶
func (dbmq *DBMessageQueue) ReceiveInnerMessage() (<-chan InnerMessage, error)
ReceiveInnerMessage return chan with type InnerMessage
func (*DBMessageQueue) RunPollingInnerMessage ¶
func (mq *DBMessageQueue) RunPollingInnerMessage()
func (*DBMessageQueue) SendInnerMessage ¶
func (mq *DBMessageQueue) SendInnerMessage(message InnerMessageBody, sendTime *time.Time) error
SendInnerMessage Send InnerMessage ,Save message in DB
func (*DBMessageQueue) SetForwardQueue ¶
func (mq *DBMessageQueue) SetForwardQueue(forwardQueue InnerMessageQueue)
func (*DBMessageQueue) SetLogger ¶
func (dbmq *DBMessageQueue) SetLogger(logger *slog.Logger)
func (*DBMessageQueue) SetOption ¶
func (dbmq *DBMessageQueue) SetOption(opt DBQueueOption)
SetOption 设置option选项
func (*DBMessageQueue) StartPolling ¶
func (mq *DBMessageQueue) StartPolling()
func (*DBMessageQueue) SyncSchema ¶
func (dbmq *DBMessageQueue) SyncSchema() error
type DBQueueMessage ¶
type DBQueueMessage struct {
// contains filtered or unexported fields
}
func (*DBQueueMessage) Ack ¶
func (t *DBQueueMessage) Ack() error
func (*DBQueueMessage) Body ¶
func (t *DBQueueMessage) Body() InnerMessageBody
func (*DBQueueMessage) ID ¶
func (t *DBQueueMessage) ID() string
func (*DBQueueMessage) Nack ¶
func (t *DBQueueMessage) Nack() error
type DBQueueOption ¶
type DBQueueOption struct { // 正常消息的巡检时长 PollingDuration time.Duration // 正常消息每次捞出的消息数量 PollingLimit int }
DBQueueOption db 队列的默认参数
func NewDBQueueOption ¶
func NewDBQueueOption() DBQueueOption
func ParseDBQueueOption ¶
func ParseDBQueueOption(data map[string]string) (DBQueueOption, error)
ParseDBQueueOption parse
type InnerMessage ¶
type InnerMessage interface { ID() string Body() InnerMessageBody Ack() error Nack() error }
InnerMessage 内部消息的队列接口
type InnerMessageBody ¶
type InnerMessageBody struct { ExecutionID int //message execution id StepID int // message state id Class string // class Execution、State Type string // message type Data string // data in message }
InnerMessage 工作流内部的消息类型
func UnmarshalInnerMessageBody ¶
func UnmarshalInnerMessageBody(data []byte) (*InnerMessageBody, error)
func (*InnerMessageBody) Marshal ¶
func (imb *InnerMessageBody) Marshal() ([]byte, error)
type InnerMessageQueue ¶
type InnerMessageQueue interface { // 发送消息 SendInnerMessage(InnerMessageBody, *time.Time) error // 接收消息 ReceiveInnerMessage() (<-chan InnerMessage, error) // 同步数据模型 SyncSchema() error // 清理Execution相关的Message CleanExecutionMessage(int) error // Close 关闭MessageQueue Close() error }
InnerMessageQueue 工作流内部执行的MQ
func NewInnerMessageQueueFromConfig ¶
func NewInnerMessageQueueFromConfig(conf config.AccessPoint) (InnerMessageQueue, error)
NewInnerMessageQueueFromConfig create new inner message queue
type InnerQueueGroup ¶
type InnerQueueGroup struct {
// contains filtered or unexported fields
}
InnerQueueGroup inner queue group
func NewInnerQueueGroup ¶
func NewInnerQueueGroup(masterqueue InnerMessageQueue, delayqueue InnerMessageQueue) (*InnerQueueGroup, error)
func NewInnerQueueGroupFromConfig ¶
func NewInnerQueueGroupFromConfig(normal config.AccessPoint, delay config.AccessPoint) (*InnerQueueGroup, error)
func (*InnerQueueGroup) CleanExecutionMessage ¶
func (q *InnerQueueGroup) CleanExecutionMessage(execution_id int) error
CleanExecution 清理一个execution的 message
func (*InnerQueueGroup) ReceiveInnerMessage ¶
func (q *InnerQueueGroup) ReceiveInnerMessage() (<-chan InnerMessage, error)
ReceiveInnerMessage receive innermessage from group queue
func (*InnerQueueGroup) SendInnerMessage ¶
func (q *InnerQueueGroup) SendInnerMessage(msg InnerMessageBody, sendTime *time.Time) error
SendInnerMessage Select suitable queue send message
type MQInnerMessage ¶
type MQInnerMessage struct {
// contains filtered or unexported fields
}
type MQInnerMessageQueue ¶
type MQInnerMessageQueue struct {
// contains filtered or unexported fields
}
MQInnerMessageQueue mq based inner message queue
func NewMQInnerMessageQueue ¶
func NewMQInnerMessageQueue(queue mq.MessageQueue) *MQInnerMessageQueue
func (*MQInnerMessageQueue) CleanExecutionMessage ¶
func (imq *MQInnerMessageQueue) CleanExecutionMessage(executionID int) error
func (*MQInnerMessageQueue) Close ¶
func (imq *MQInnerMessageQueue) Close() error
func (*MQInnerMessageQueue) ReceiveInnerMessage ¶
func (imq *MQInnerMessageQueue) ReceiveInnerMessage() (<-chan InnerMessage, error)
ReceiveInnerMessage receive inner message
func (*MQInnerMessageQueue) SendInnerMessage ¶
func (imq *MQInnerMessageQueue) SendInnerMessage(msgbody InnerMessageBody, sendtime *time.Time) error
func (*MQInnerMessageQueue) SyncSchema ¶
func (imq *MQInnerMessageQueue) SyncSchema() error
type MockInnerQueue ¶
type MockInnerQueue struct{}
MockInnerQueue mock innerqueue
func (MockInnerQueue) CleanExecutionMessage ¶
func (mock MockInnerQueue) CleanExecutionMessage(int) error
CleanExecution 清理一个execution的 message 在这里, 这个是一个空方法
func (MockInnerQueue) GetName ¶
func (mock MockInnerQueue) GetName() string
func (MockInnerQueue) ReceiveInnerMessage ¶
func (mock MockInnerQueue) ReceiveInnerMessage() (<-chan InnerMessageBody, error)
ReceiveInnerMessage ReceiveInnerMessage
func (MockInnerQueue) SendInnerMessage ¶
func (mock MockInnerQueue) SendInnerMessage(msg InnerMessage, sendtime *time.Time) error
SendInnerMessage SendInnerMessage
func (MockInnerQueue) SyncSchema ¶
func (mock MockInnerQueue) SyncSchema() error
type SimpleInnerQueue ¶
type SimpleInnerQueue struct {
// contains filtered or unexported fields
}
SimpleInnerQueue test inner queue
func NewSimpleInnerQueue ¶
func NewSimpleInnerQueue() *SimpleInnerQueue
NewSimpleInnerQueue create a new simple inner queue
func (*SimpleInnerQueue) CleanExecutionMessage ¶
func (n *SimpleInnerQueue) CleanExecutionMessage(int) error
func (*SimpleInnerQueue) ReceiveInnerMessage ¶
func (n *SimpleInnerQueue) ReceiveInnerMessage() (<-chan InnerMessage, error)
func (*SimpleInnerQueue) SendInnerMessage ¶
func (t *SimpleInnerQueue) SendInnerMessage(msg InnerMessageBody, sendTime *time.Time) error
func (*SimpleInnerQueue) SyncSchema ¶
func (n *SimpleInnerQueue) SyncSchema() error
type TaskMessage ¶
type TaskMessage interface { ID() string Body() TaskMessageBody Ack() error Nack() error }
TaskMessage 异步任务的队列接口
type TaskMessageBody ¶
type TaskMessageBody struct { Type string //Task Resource 类型,Resource 字段的前缀部分, qrn/arn 等等 Resource string //Resource 的资源描述 "qrn:qcs:.xxxxxxxx:xxx" Input string // Task 执行时的input Token string // Task 执行时的token TaskData string // Task 执行时附带的数据, Execution创建时附带的 }
AsyncTaskMessage 异步任务的消息类型
type TaskMessageQueue ¶
type TaskMessageQueue interface { GetName() string // 同步数据模型 SyncSchema() error // 发送消息 SendTaskMessage(TaskMessageBody) error // 接收消息 ReceiveTaskMessage() (<-chan TaskMessage, error) // Close 关闭MessageQueue Close() error }
TaskMessageQueue 工作流内部执行的MQ
type TestInnerMessage ¶
type TestInnerMessage struct {
// contains filtered or unexported fields
}
func (*TestInnerMessage) Ack ¶
func (t *TestInnerMessage) Ack() error
func (*TestInnerMessage) Body ¶
func (t *TestInnerMessage) Body() InnerMessageBody
func (*TestInnerMessage) ID ¶
func (t *TestInnerMessage) ID() string
func (*TestInnerMessage) Nack ¶
func (t *TestInnerMessage) Nack() error