mq

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2024 License: MIT Imports: 25 Imported by: 2

README

Introduction MQ (Message Queue Broker)

A simple Pub/Sub system memory based task processing. It uses centralized server to manage consumers and publishers.

Examples:

Run server

go run server.go

Run consumer

go run consumer.go

Run publisher

go run publisher.go

tasks.go

package tasks

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/oarkflow/mq"
)

func Node1(ctx context.Context, task *mq.Task) mq.Result {
	return mq.Result{Payload: task.Payload, TaskID: task.ID}
}

func Node2(ctx context.Context, task *mq.Task) mq.Result {
	return mq.Result{Payload: task.Payload, TaskID: task.ID}
}

func Node3(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	json.Unmarshal(task.Payload, &user)
	age := int(user["age"].(float64))
	status := "FAIL"
	if age > 20 {
		status = "PASS"
	}
	user["status"] = status
	resultPayload, _ := json.Marshal(user)
	return mq.Result{Payload: resultPayload, Status: status}
}

func Node4(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	json.Unmarshal(task.Payload, &user)
	user["final"] = "D"
	resultPayload, _ := json.Marshal(user)
	return mq.Result{Payload: resultPayload}
}

func Node5(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	json.Unmarshal(task.Payload, &user)
	user["salary"] = "E"
	resultPayload, _ := json.Marshal(user)
	return mq.Result{Payload: resultPayload}
}

func Node6(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	json.Unmarshal(task.Payload, &user)
	resultPayload, _ := json.Marshal(map[string]any{"storage": user})
	return mq.Result{Payload: resultPayload}
}

func Callback(ctx context.Context, task mq.Result) mq.Result {
	fmt.Println("Received task", task.TaskID, "Payload", string(task.Payload), task.Error, task.Topic)
	return mq.Result{}
}

func NotifyResponse(ctx context.Context, result mq.Result) {
	log.Printf("DAG Final response: TaskID: %s, Payload: %s, Topic: %s", result.TaskID, result.Payload, result.Topic)
}

Start Server

server.go

package main

import (
	"context"
	
	"github.com/oarkflow/mq"
	"github.com/oarkflow/mq/examples/tasks"
)

func main() {
	b := mq.NewBroker(mq.WithCallback(tasks.Callback))
	b.NewQueue("queue1")
	b.NewQueue("queue2")
	b.Start(context.Background())
}

Start Consumer

consumer.go

package main

import (
	"context"

	"github.com/oarkflow/mq"

	"github.com/oarkflow/mq/examples/tasks"
)

func main() {
	consumer1 := mq.NewConsumer("consumer-1", "queue1", tasks.Node1)
	consumer2 := mq.NewConsumer("consumer-2", "queue2", tasks.Node2)
	// consumer := mq.NewConsumer("consumer-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"))
	go consumer1.Consume(context.Background())
	consumer2.Consume(context.Background())
}

Publish tasks

publisher.go

package main

import (
	"context"
	"fmt"
	
	"github.com/oarkflow/mq"
)

func main() {
	payload := []byte(`{"message":"Message Publisher \n Task"}`)
	task := mq.Task{
		Payload: payload,
	}
	publisher := mq.NewPublisher("publish-1")
	err := publisher.Publish(context.Background(), "queue1", task)
	if err != nil {
		panic(err)
	}
	fmt.Println("Async task published successfully")
	payload = []byte(`{"message":"Fire-and-Forget \n Task"}`)
	task = mq.Task{
		Payload: payload,
	}
	result, err := publisher.Request(context.Background(), "queue1", task)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Sync task published. Result: %v\n", string(result.Payload))
}

DAG (Directed Acyclic Graph)

In this package, you can use the DAG feature to create a directed acyclic graph of tasks. The DAG feature allows you to define a sequence of tasks that need to be executed in a specific order.

Example

dag.go

package main

import (
	"context"
	"encoding/json"
	"github.com/oarkflow/mq/consts"
	"github.com/oarkflow/mq/examples/tasks"
	"io"
	"net/http"

	"github.com/oarkflow/mq"
	"github.com/oarkflow/mq/dag"
)

var (
	d = dag.NewDAG(mq.WithSyncMode(false), mq.WithNotifyResponse(tasks.NotifyResponse))
	// d = dag.NewDAG(mq.WithSyncMode(true), mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"), mq.WithCAPath("./certs/ca.cert"))
)

func main() {
	d.AddNode("A", tasks.Node1, true)
	d.AddNode("B", tasks.Node2)
	d.AddNode("C", tasks.Node3)
	d.AddNode("D", tasks.Node4)
	d.AddNode("E", tasks.Node5)
	d.AddNode("F", tasks.Node6)
	d.AddEdge("A", "B", dag.LoopEdge)
	d.AddCondition("C", map[string]string{"PASS": "D", "FAIL": "E"})
	d.AddEdge("B", "C")
	d.AddEdge("D", "F")
	d.AddEdge("E", "F")
	http.HandleFunc("POST /publish", requestHandler("publish"))
	http.HandleFunc("POST /request", requestHandler("request"))
	err := d.Start(context.TODO(), ":8083")
	if err != nil {
		panic(err)
	}
}

func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
			return
		}
		var payload []byte
		if r.Body != nil {
			defer r.Body.Close()
			var err error
			payload, err = io.ReadAll(r.Body)
			if err != nil {
				http.Error(w, "Failed to read request body", http.StatusBadRequest)
				return
			}
		} else {
			http.Error(w, "Empty request body", http.StatusBadRequest)
			return
		}
		ctx := context.Background()
		if requestType == "request" {
			ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"})
		}
		// ctx = context.WithValue(ctx, "initial_node", "E")
		rs := d.ProcessTask(ctx, payload)
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(rs)
	}
}

TODOS

  • Backend for task persistence
  • Task scheduling

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetAwaitResponse

func GetAwaitResponse(ctx context.Context) (string, bool)

func GetConnection

func GetConnection(addr string, config TLSConfig) (net.Conn, error)

func GetConsumerID

func GetConsumerID(ctx context.Context) (string, bool)

func GetContentType

func GetContentType(ctx context.Context) (string, bool)

func GetHeader

func GetHeader(ctx context.Context, key string) (string, bool)

func GetHeaders

func GetHeaders(ctx context.Context) (storage.IMap[string, string], bool)

func GetPublisherID

func GetPublisherID(ctx context.Context) (string, bool)

func GetQueue

func GetQueue(ctx context.Context) (string, bool)

func GetTriggerNode

func GetTriggerNode(ctx context.Context) (string, bool)

func HeadersWithConsumerID

func HeadersWithConsumerID(ctx context.Context, id string) map[string]string

func HeadersWithConsumerIDAndQueue

func HeadersWithConsumerIDAndQueue(ctx context.Context, id, queue string) map[string]string

func IsClosed

func IsClosed(conn net.Conn) bool

func NewID

func NewID() string

func RecoverPanic

func RecoverPanic(labelGenerator func() string)

func RecoverTitle

func RecoverTitle() string

func SetHeaders

func SetHeaders(ctx context.Context, headers map[string]string) context.Context

func WithHeaders

func WithHeaders(ctx context.Context, headers map[string]string) map[string]string

func WrapError

func WrapError(err error, msg, op string) error

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(opts ...Option) *Broker

func (*Broker) AddConsumer

func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Conn) string

func (*Broker) Close added in v0.0.2

func (b *Broker) Close() error

func (*Broker) HandleCallback

func (b *Broker) HandleCallback(ctx context.Context, msg *codec.Message)

func (*Broker) MessageAck

func (b *Broker) MessageAck(ctx context.Context, msg *codec.Message)

func (*Broker) MessageDeny

func (b *Broker) MessageDeny(ctx context.Context, msg *codec.Message)

func (*Broker) MessageResponseHandler

func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message)

func (*Broker) NewQueue

func (b *Broker) NewQueue(name string) *Queue

func (*Broker) NotifyHandler

func (b *Broker) NotifyHandler() func(context.Context, Result) error

func (*Broker) OnClose

func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error

func (*Broker) OnConsumerPause

func (b *Broker) OnConsumerPause(ctx context.Context, _ *codec.Message)

func (*Broker) OnConsumerResume

func (b *Broker) OnConsumerResume(ctx context.Context, _ *codec.Message)

func (*Broker) OnConsumerStop

func (b *Broker) OnConsumerStop(ctx context.Context, _ *codec.Message)

func (*Broker) OnError

func (b *Broker) OnError(_ context.Context, conn net.Conn, err error)

func (*Broker) OnMessage

func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn)

func (*Broker) Options

func (b *Broker) Options() *Options

func (*Broker) PauseConsumer

func (b *Broker) PauseConsumer(ctx context.Context, consumerID string, queues ...string)

func (*Broker) Publish

func (b *Broker) Publish(ctx context.Context, task *Task, queue string) error

func (*Broker) PublishHandler

func (b *Broker) PublishHandler(ctx context.Context, conn net.Conn, msg *codec.Message)

func (*Broker) RemoveConsumer

func (b *Broker) RemoveConsumer(consumerID string, queues ...string)

func (*Broker) ResumeConsumer

func (b *Broker) ResumeConsumer(ctx context.Context, consumerID string, queues ...string)

func (*Broker) SetNotifyHandler

func (b *Broker) SetNotifyHandler(callback Callback)

func (*Broker) SetURL added in v0.0.2

func (b *Broker) SetURL(url string)

func (*Broker) Start

func (b *Broker) Start(ctx context.Context) error

func (*Broker) StopConsumer

func (b *Broker) StopConsumer(ctx context.Context, consumerID string, queues ...string)

func (*Broker) SubscribeHandler

func (b *Broker) SubscribeHandler(ctx context.Context, conn net.Conn, msg *codec.Message)

func (*Broker) SyncMode

func (b *Broker) SyncMode() bool

func (*Broker) TLSConfig

func (b *Broker) TLSConfig() TLSConfig

func (*Broker) URL

func (b *Broker) URL() string

type Callback

type Callback func(ctx context.Context, result Result) error

type CompletionCallback

type CompletionCallback func()

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Conn

func (c *Consumer) Conn() net.Conn

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context) error

func (*Consumer) ConsumeMessage

func (c *Consumer) ConsumeMessage(ctx context.Context, msg *codec.Message, conn net.Conn)

func (*Consumer) GetKey

func (c *Consumer) GetKey() string

func (*Consumer) GetType

func (c *Consumer) GetType() string

func (*Consumer) Metrics

func (c *Consumer) Metrics() Metrics

func (*Consumer) OnClose

func (c *Consumer) OnClose(_ context.Context, _ net.Conn) error

func (*Consumer) OnError

func (c *Consumer) OnError(_ context.Context, conn net.Conn, err error)

func (*Consumer) OnMessage

func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn) error

func (*Consumer) OnResponse

func (c *Consumer) OnResponse(ctx context.Context, result Result) error

func (*Consumer) Pause

func (c *Consumer) Pause(ctx context.Context) error

func (*Consumer) ProcessTask

func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result

func (*Consumer) Resume

func (c *Consumer) Resume(ctx context.Context) error

func (*Consumer) SetKey

func (c *Consumer) SetKey(key string)

func (*Consumer) Stop

func (c *Consumer) Stop(ctx context.Context) error

type CronSchedule

type CronSchedule struct {
	Minute     string
	Hour       string
	DayOfMonth string
	Month      string
	DayOfWeek  string
}

func (CronSchedule) String

func (c CronSchedule) String() string

type ExecutionHistory

type ExecutionHistory struct {
	Timestamp time.Time
	Result    Result
}

type Handler

type Handler func(context.Context, *Task) Result

type MemoryTaskStorage

type MemoryTaskStorage struct {
	// contains filtered or unexported fields
}

func NewMemoryTaskStorage

func NewMemoryTaskStorage(expiryTime time.Duration) *MemoryTaskStorage

func (*MemoryTaskStorage) CleanupExpiredTasks

func (m *MemoryTaskStorage) CleanupExpiredTasks() error

func (*MemoryTaskStorage) DeleteTask

func (m *MemoryTaskStorage) DeleteTask(taskID string) error

func (*MemoryTaskStorage) FetchNextTask

func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)

func (*MemoryTaskStorage) GetAllTasks

func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)

func (*MemoryTaskStorage) GetTask

func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)

func (*MemoryTaskStorage) SaveTask

func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error

type Metrics

type Metrics struct {
	TotalTasks      int64
	CompletedTasks  int64
	ErrorCount      int64
	TotalMemoryUsed int64
	TotalScheduled  int64
	ExecutionTime   int64
}

type Option

type Option func(*Options)

Option defines a function type for setting options.

func WithBrokerURL

func WithBrokerURL(url string) Option

WithBrokerURL -

func WithCAPath

func WithCAPath(caPath string) Option

WithCAPath - Option to enable/disable TLS

func WithCallback

func WithCallback(val ...func(context.Context, Result) Result) Option

WithCallback -

func WithCleanTaskOnComplete

func WithCleanTaskOnComplete() Option

WithCleanTaskOnComplete -

func WithConsumerOnClose

func WithConsumerOnClose(handler func(ctx context.Context, topic, consumerName string)) Option

func WithConsumerOnSubscribe

func WithConsumerOnSubscribe(handler func(ctx context.Context, topic, consumerName string)) Option

func WithInitialDelay

func WithInitialDelay(val time.Duration) Option

WithInitialDelay -

func WithJitterPercent

func WithJitterPercent(val float64) Option

WithJitterPercent -

func WithMaxBackoff

func WithMaxBackoff(val time.Duration) Option

WithMaxBackoff -

func WithMaxRetries

func WithMaxRetries(val int) Option

WithMaxRetries -

func WithNotifyResponse

func WithNotifyResponse(callback Callback) Option

func WithRespondPendingResult

func WithRespondPendingResult(mode bool) Option

WithRespondPendingResult -

func WithSyncMode

func WithSyncMode(mode bool) Option

WithSyncMode -

func WithTLS

func WithTLS(enableTLS bool, certPath, keyPath string) Option

WithTLS - Option to enable/disable TLS

func WithWorkerPool

func WithWorkerPool(queueSize, numOfWorkers int, maxMemoryLoad int64) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

func SetupOptions

func SetupOptions(opts ...Option) *Options

func (*Options) CleanTaskOnComplete

func (o *Options) CleanTaskOnComplete() bool

func (*Options) MaxMemoryLoad

func (o *Options) MaxMemoryLoad() int64

func (*Options) NumOfWorkers

func (o *Options) NumOfWorkers() int

func (*Options) QueueSize

func (o *Options) QueueSize() int

func (*Options) SetSyncMode

func (o *Options) SetSyncMode(sync bool)

func (*Options) Storage

func (o *Options) Storage() TaskStorage

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func NewPool

func NewPool(numOfWorkers int, opts ...PoolOption) *Pool

func (*Pool) AdjustWorkerCount

func (wp *Pool) AdjustWorkerCount(newWorkerCount int)

func (*Pool) Dispatch

func (wp *Pool) Dispatch(event func())

func (*Pool) EnqueueTask

func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error

func (*Pool) Metrics

func (wp *Pool) Metrics() Metrics

func (*Pool) Pause

func (wp *Pool) Pause()

func (*Pool) Resume

func (wp *Pool) Resume()

func (*Pool) Scheduler

func (wp *Pool) Scheduler() *Scheduler

func (*Pool) SetBatchSize

func (wp *Pool) SetBatchSize(size int)

func (*Pool) Start

func (wp *Pool) Start(numWorkers int)

func (*Pool) Stop

func (wp *Pool) Stop()

type PoolOption

type PoolOption func(*Pool)

func WithBatchSize

func WithBatchSize(batchSize int) PoolOption

func WithCompletionCallback

func WithCompletionCallback(callback func()) PoolOption

func WithHandler

func WithHandler(handler Handler) PoolOption

func WithMaxMemoryLoad

func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption

func WithPoolCallback

func WithPoolCallback(callback Callback) PoolOption

func WithTaskQueueSize

func WithTaskQueueSize(size int) PoolOption

func WithTaskStorage

func WithTaskStorage(storage TaskStorage) PoolOption

func WithTaskTimeout

func WithTaskTimeout(t time.Duration) PoolOption

type PriorityQueue

type PriorityQueue []*QueueTask

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type Processor

type Processor interface {
	ProcessTask(ctx context.Context, msg *Task) Result
	Consume(ctx context.Context) error
	Pause(ctx context.Context) error
	Resume(ctx context.Context) error
	Stop(ctx context.Context) error
	Close() error
	GetKey() string
	SetKey(key string)
	GetType() string
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(id string, opts ...Option) *Publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, task Task, queue string) error

func (*Publisher) Request

func (p *Publisher) Request(ctx context.Context, task Task, queue string) Result

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

type QueueTask

type QueueTask struct {
	// contains filtered or unexported fields
}

type QueuedTask

type QueuedTask struct {
	Message    *codec.Message
	RetryCount int
}

type Result

type Result struct {
	CreatedAt       time.Time       `json:"created_at"`
	ProcessedAt     time.Time       `json:"processed_at,omitempty"`
	Latency         string          `json:"latency"`
	Error           error           `json:"-"` // Keep error as an error type
	Topic           string          `json:"topic"`
	TaskID          string          `json:"task_id"`
	Status          Status          `json:"status"`
	ConditionStatus string          `json:"condition_status"`
	Ctx             context.Context `json:"-"`
	Payload         json.RawMessage `json:"payload"`
}

func HandleError

func HandleError(ctx context.Context, err error, status ...Status) Result

func (Result) MarshalJSON

func (r Result) MarshalJSON() ([]byte, error)

func (Result) Unmarshal

func (r Result) Unmarshal(data any) error

func (*Result) UnmarshalJSON

func (r *Result) UnmarshalJSON(data []byte) error

func (Result) WithData

func (r Result) WithData(status Status, data []byte) Result

type Schedule

type Schedule struct {
	Interval   time.Duration
	DayOfWeek  []time.Weekday
	DayOfMonth []int
	TimeOfDay  time.Time
	Recurring  bool
	CronSpec   string
}

func (*Schedule) ToHumanReadable

func (s *Schedule) ToHumanReadable() string

type ScheduleOptions

type ScheduleOptions struct {
	Handler   Handler
	Callback  Callback
	Overlap   bool
	Interval  time.Duration
	Recurring bool
}

type ScheduledTask

type ScheduledTask struct {
	// contains filtered or unexported fields
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(pool *Pool) *Scheduler

func (*Scheduler) AddTask

func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption)

func (*Scheduler) PrintAllTasks

func (s *Scheduler) PrintAllTasks()

func (*Scheduler) PrintExecutionHistory

func (s *Scheduler) PrintExecutionHistory(taskID string)

func (*Scheduler) RemoveTask

func (s *Scheduler) RemoveTask(payloadID string)

func (*Scheduler) Start

func (s *Scheduler) Start()

type SchedulerConfig

type SchedulerConfig struct {
	Callback Callback
	Overlap  bool
}

type SchedulerOption

type SchedulerOption func(*ScheduleOptions)

func WithInterval

func WithInterval(interval time.Duration) SchedulerOption

func WithOverlap

func WithOverlap() SchedulerOption

func WithRecurring

func WithRecurring() SchedulerOption

func WithSchedulerCallback

func WithSchedulerCallback(callback Callback) SchedulerOption

func WithSchedulerHandler

func WithSchedulerHandler(handler Handler) SchedulerOption

Helper functions to create SchedulerOptions

type Status added in v0.0.2

type Status string
const (
	Pending    Status = "Pending"
	Processing Status = "Processing"
	Completed  Status = "Completed"
	Failed     Status = "Failed"
)

type TLSConfig

type TLSConfig struct {
	CertPath string
	KeyPath  string
	CAPath   string
	UseTLS   bool
}

type Task

type Task struct {
	CreatedAt   time.Time       `json:"created_at"`
	ProcessedAt time.Time       `json:"processed_at"`
	Expiry      time.Time       `json:"expiry"`
	Error       error           `json:"error"`
	ID          string          `json:"id"`
	Topic       string          `json:"topic"`
	Status      string          `json:"status"`
	Payload     json.RawMessage `json:"payload"`
}

func NewTask

func NewTask(id string, payload json.RawMessage, nodeKey string) *Task

type TaskStorage

type TaskStorage interface {
	SaveTask(task *QueueTask) error
	GetTask(taskID string) (*QueueTask, error)
	DeleteTask(taskID string) error
	GetAllTasks() ([]*QueueTask, error)
	FetchNextTask() (*QueueTask, error)
	CleanupExpiredTasks() error
}

Directories

Path Synopsis
dag
v1
internal
services module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL