queue

package
v0.0.0-...-fefb669 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package queue defines the queue interfaces and message types for workflow.

Index

Constants

This section is empty.

Variables

View Source
// DBMessageStatus groups the status strings recorded for a message in the DB.
var DBMessageStatus = struct {
	Created, Processed string
}{
	Processed: "Processed",
	Created:   "Created",
}

DBMessageStatus message status in db

View Source
// DefaultDBDelayQueueOption is the default DBQueueOption value for the DB queue.
var DefaultDBDelayQueueOption = DBQueueOption{
	// Number of messages fetched per poll.
	PollingLimit: 10,
	// Interval between polls.
	PollingDuration: 500 * time.Millisecond,
}

DefaultDBDelayQueueOption is the default set of DB queue options.

View Source
// MessageClass groups the message class names (Execution, State, StepGroup).
var MessageClass = struct {
	Execution, State, StepGroup string
}{
	StepGroup: "StepGroup",
	State:     "State",
	Execution: "Execution",
}

MessageClass Message class

View Source
// MessageQueueErrorCode groups the error code strings used by the message queue.
var MessageQueueErrorCode = struct {
	DBError, DataError, MessageNotFound string
}{
	MessageNotFound: "MessageNotFound",
	DataError:       "DataError",
	DBError:         "DBError",
}

MessageQueueErrorCode errorcode

Functions

This section is empty.

Types

type DBMessageQueue

type DBMessageQueue struct {
	// contains filtered or unexported fields
}

DBMessageQueue is the DB delay message queue.

func NewDBMessageQueue

func NewDBMessageQueue(client *rdb.DBClient, option DBQueueOption) *DBMessageQueue

NewDBMessageQueue creates a new DBMessageQueue.

func (*DBMessageQueue) CleanExecutionMessage

func (dbmq *DBMessageQueue) CleanExecutionMessage(execution_id int) error

CleanExecutionMessage cleans up the messages of an execution.

func (*DBMessageQueue) Close

func (mq *DBMessageQueue) Close() error

Close stop async task message queue

func (*DBMessageQueue) DispatchInnerMessage

func (mq *DBMessageQueue) DispatchInnerMessage(msgID int) error

DispatchInnerMessage dispatch message to forward queue

func (*DBMessageQueue) GetUnreadMessageIDsOnce

func (mq *DBMessageQueue) GetUnreadMessageIDsOnce() ([]int, error)

GetUnreadMessageIDsOnce gets the list of all unprocessed messages.

func (*DBMessageQueue) ReceiveInnerMessage

func (dbmq *DBMessageQueue) ReceiveInnerMessage() (<-chan InnerMessage, error)

ReceiveInnerMessage return chan with type InnerMessage

func (*DBMessageQueue) RunPollingInnerMessage

func (mq *DBMessageQueue) RunPollingInnerMessage()

func (*DBMessageQueue) SendInnerMessage

func (mq *DBMessageQueue) SendInnerMessage(message InnerMessageBody, sendTime *time.Time) error

SendInnerMessage sends an InnerMessage and saves the message in the DB.

func (*DBMessageQueue) SetForwardQueue

func (mq *DBMessageQueue) SetForwardQueue(forwardQueue InnerMessageQueue)

func (*DBMessageQueue) SetLogger

func (dbmq *DBMessageQueue) SetLogger(logger *slog.Logger)

func (*DBMessageQueue) SetOption

func (dbmq *DBMessageQueue) SetOption(opt DBQueueOption)

SetOption sets the queue options.

func (*DBMessageQueue) StartPolling

func (mq *DBMessageQueue) StartPolling()

func (*DBMessageQueue) SyncSchema

func (dbmq *DBMessageQueue) SyncSchema() error

type DBQueueMessage

type DBQueueMessage struct {
	// contains filtered or unexported fields
}

func (*DBQueueMessage) Ack

func (t *DBQueueMessage) Ack() error

func (*DBQueueMessage) Body

func (t *DBQueueMessage) Body() InnerMessageBody

func (*DBQueueMessage) ID

func (t *DBQueueMessage) ID() string

func (*DBQueueMessage) Nack

func (t *DBQueueMessage) Nack() error

type DBQueueOption

// DBQueueOption holds the polling parameters of the DB queue.
type DBQueueOption struct {
	// Polling interval for normal messages.
	PollingDuration time.Duration
	// Number of normal messages fetched per poll.
	PollingLimit int
}

DBQueueOption holds the parameters of the DB queue.

func NewDBQueueOption

func NewDBQueueOption() DBQueueOption

func ParseDBQueueOption

func ParseDBQueueOption(data map[string]string) (DBQueueOption, error)

ParseDBQueueOption parse

type InnerMessage

// InnerMessage is the queue interface for internal messages.
type InnerMessage interface {
	// ID returns a string identifier for the message.
	ID() string
	// Body returns the message content as an InnerMessageBody.
	Body() InnerMessageBody
	// Ack and Nack presumably acknowledge / negatively acknowledge the
	// message; the exact effect depends on the implementation — TODO confirm.
	Ack() error
	Nack() error
}

InnerMessage is the queue interface for internal messages.

type InnerMessageBody

// InnerMessageBody is the message type used inside the workflow.
type InnerMessageBody struct {
	ExecutionID int    // execution id of the message
	StepID      int    // state id of the message
	Class       string // message class, e.g. Execution or State
	Type        string // message type
	Data        string // data carried by the message
}

InnerMessageBody is the message type used inside the workflow.

func UnmarshalInnerMessageBody

func UnmarshalInnerMessageBody(data []byte) (*InnerMessageBody, error)

func (*InnerMessageBody) Marshal

func (imb *InnerMessageBody) Marshal() ([]byte, error)

type InnerMessageQueue

// InnerMessageQueue is the MQ used for workflow-internal execution.
type InnerMessageQueue interface {
	// Send a message.
	SendInnerMessage(InnerMessageBody, *time.Time) error
	// Receive messages.
	ReceiveInnerMessage() (<-chan InnerMessage, error)
	// Synchronize the data model.
	SyncSchema() error
	// Clean up the messages related to an Execution.
	CleanExecutionMessage(int) error
	// Close closes the MessageQueue.
	Close() error
}

InnerMessageQueue is the MQ used for workflow-internal execution.

func NewInnerMessageQueueFromConfig

func NewInnerMessageQueueFromConfig(conf config.AccessPoint) (InnerMessageQueue, error)

NewInnerMessageQueueFromConfig create new inner message queue

type InnerQueueGroup

type InnerQueueGroup struct {
	// contains filtered or unexported fields
}

InnerQueueGroup inner queue group

func NewInnerQueueGroup

func NewInnerQueueGroup(masterqueue InnerMessageQueue, delayqueue InnerMessageQueue) (*InnerQueueGroup, error)

func NewInnerQueueGroupFromConfig

func NewInnerQueueGroupFromConfig(normal config.AccessPoint, delay config.AccessPoint) (*InnerQueueGroup, error)

func (*InnerQueueGroup) CleanExecutionMessage

func (q *InnerQueueGroup) CleanExecutionMessage(execution_id int) error

CleanExecutionMessage cleans up the messages of an execution.

func (*InnerQueueGroup) Close

func (q *InnerQueueGroup) Close() error

Close close group queue

func (*InnerQueueGroup) ReceiveInnerMessage

func (q *InnerQueueGroup) ReceiveInnerMessage() (<-chan InnerMessage, error)

ReceiveInnerMessage receive innermessage from group queue

func (*InnerQueueGroup) SendInnerMessage

func (q *InnerQueueGroup) SendInnerMessage(msg InnerMessageBody, sendTime *time.Time) error

SendInnerMessage Select suitable queue send message

func (*InnerQueueGroup) SyncSchema

func (q *InnerQueueGroup) SyncSchema() error

SyncSchema

type MQInnerMessage

type MQInnerMessage struct {
	// contains filtered or unexported fields
}

func (*MQInnerMessage) Ack

func (im *MQInnerMessage) Ack() error

Ack return nil

func (*MQInnerMessage) Body

func (im *MQInnerMessage) Body() InnerMessageBody

Body return body

func (*MQInnerMessage) ID

func (im *MQInnerMessage) ID() string

ID returns the message ID.

func (*MQInnerMessage) Nack

func (im *MQInnerMessage) Nack() error

Nack return nil

type MQInnerMessageQueue

type MQInnerMessageQueue struct {
	// contains filtered or unexported fields
}

MQInnerMessageQueue mq based inner message queue

func NewMQInnerMessageQueue

func NewMQInnerMessageQueue(queue mq.MessageQueue) *MQInnerMessageQueue

func (*MQInnerMessageQueue) CleanExecutionMessage

func (imq *MQInnerMessageQueue) CleanExecutionMessage(executionID int) error

func (*MQInnerMessageQueue) Close

func (imq *MQInnerMessageQueue) Close() error

func (*MQInnerMessageQueue) ReceiveInnerMessage

func (imq *MQInnerMessageQueue) ReceiveInnerMessage() (<-chan InnerMessage, error)

ReceiveInnerMessage receive inner message

func (*MQInnerMessageQueue) SendInnerMessage

func (imq *MQInnerMessageQueue) SendInnerMessage(msgbody InnerMessageBody, sendtime *time.Time) error

func (*MQInnerMessageQueue) SyncSchema

func (imq *MQInnerMessageQueue) SyncSchema() error

type MockInnerQueue

// MockInnerQueue is a mock inner queue; it carries no fields.
//
// NOTE(review): the documented SendInnerMessage takes an InnerMessage and
// ReceiveInnerMessage returns <-chan InnerMessageBody, the opposite of the
// InnerMessageQueue interface, so this type does not satisfy that interface
// as documented — confirm whether this is intended.
type MockInnerQueue struct{}

MockInnerQueue mock innerqueue

func (MockInnerQueue) CleanExecutionMessage

func (mock MockInnerQueue) CleanExecutionMessage(int) error

CleanExecutionMessage cleans up the messages of an execution; here it is an empty method.

func (MockInnerQueue) Close

func (mock MockInnerQueue) Close() error

Close Close

func (MockInnerQueue) GetName

func (mock MockInnerQueue) GetName() string

func (MockInnerQueue) ReceiveInnerMessage

func (mock MockInnerQueue) ReceiveInnerMessage() (<-chan InnerMessageBody, error)

ReceiveInnerMessage ReceiveInnerMessage

func (MockInnerQueue) SendInnerMessage

func (mock MockInnerQueue) SendInnerMessage(msg InnerMessage, sendtime *time.Time) error

SendInnerMessage SendInnerMessage

func (MockInnerQueue) SyncSchema

func (mock MockInnerQueue) SyncSchema() error

type SimpleInnerQueue

type SimpleInnerQueue struct {
	// contains filtered or unexported fields
}

SimpleInnerQueue test inner queue

func NewSimpleInnerQueue

func NewSimpleInnerQueue() *SimpleInnerQueue

NewSimpleInnerQueue create a new simple inner queue

func (*SimpleInnerQueue) CleanExecutionMessage

func (n *SimpleInnerQueue) CleanExecutionMessage(int) error

func (*SimpleInnerQueue) Close

func (t *SimpleInnerQueue) Close() error

Close Close

func (*SimpleInnerQueue) ReceiveInnerMessage

func (n *SimpleInnerQueue) ReceiveInnerMessage() (<-chan InnerMessage, error)

func (*SimpleInnerQueue) SendInnerMessage

func (t *SimpleInnerQueue) SendInnerMessage(msg InnerMessageBody, sendTime *time.Time) error

func (*SimpleInnerQueue) SyncSchema

func (n *SimpleInnerQueue) SyncSchema() error

type TaskMessage

// TaskMessage is the queue interface for asynchronous tasks.
type TaskMessage interface {
	// ID returns a string identifier for the message.
	ID() string
	// Body returns the message content as a TaskMessageBody.
	Body() TaskMessageBody
	// Ack and Nack presumably acknowledge / negatively acknowledge the
	// message; the exact effect depends on the implementation — TODO confirm.
	Ack() error
	Nack() error
}

TaskMessage is the queue interface for asynchronous tasks.

type TaskMessageBody

// TaskMessageBody is the message type for asynchronous tasks.
type TaskMessageBody struct {
	Type     string // Task resource type: the prefix part of the Resource field, e.g. qrn/arn
	Resource string // Resource description, e.g. "qrn:qcs:.xxxxxxxx:xxx"
	Input    string // Input used when the task executes
	Token    string // Token used when the task executes
	TaskData string // Data attached to the task execution, supplied when the Execution was created
}

TaskMessageBody is the message type for asynchronous tasks.

type TaskMessageQueue

// TaskMessageQueue is the message queue interface for task messages.
type TaskMessageQueue interface {
	GetName() string
	// Synchronize the data model.
	SyncSchema() error
	// Send a message.
	SendTaskMessage(TaskMessageBody) error
	// Receive messages.
	ReceiveTaskMessage() (<-chan TaskMessage, error)
	// Close closes the MessageQueue.
	Close() error
}

TaskMessageQueue 工作流内部执行的MQ

type TestInnerMessage

type TestInnerMessage struct {
	// contains filtered or unexported fields
}

func (*TestInnerMessage) Ack

func (t *TestInnerMessage) Ack() error

func (*TestInnerMessage) Body

func (*TestInnerMessage) ID

func (t *TestInnerMessage) ID() string

func (*TestInnerMessage) Nack

func (t *TestInnerMessage) Nack() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL