mq

package module
v0.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2025 License: MIT Imports: 30 Imported by: 2

README

Introduction MQ (Message Queue Broker)

A simple, in-memory Pub/Sub system for task processing. It uses a centralized server (the broker) to manage consumers and publishers.

Examples:

Run server

go run server.go

Run consumer

go run consumer.go

Run publisher

go run publisher.go

tasks.go

package tasks

import (
	"context"
	"github.com/oarkflow/json"
	"fmt"
	"log"

	"github.com/oarkflow/mq"
)

// Node1 is a pass-through handler: the result carries the task's payload
// unchanged together with the ID of the task it came from.
func Node1(ctx context.Context, task *mq.Task) mq.Result {
	var out mq.Result
	out.TaskID = task.ID
	out.Payload = task.Payload
	return out
}

// Node2 echoes its input: it copies the task's ID and payload into a fresh
// Result and returns it without touching the payload.
func Node2(ctx context.Context, task *mq.Task) mq.Result {
	id, body := task.ID, task.Payload
	return mq.Result{
		TaskID:  id,
		Payload: body,
	}
}

// Node3 decodes the payload as a JSON object, classifies the user by the
// numeric "age" field and writes the verdict back under the "status" key.
//
// The verdict is "PASS" when age > 20 and "FAIL" otherwise. It is returned in
// Result.ConditionStatus, which is the plain string field of mq.Result;
// Result.Status is of type mq.Status and cannot be assigned a string variable.
// NOTE(review): ConditionStatus is presumably what the DAG's AddCondition
// mapping ("PASS"/"FAIL") keys on — confirm against the dag package.
//
// A payload that cannot be decoded, has no numeric "age", or cannot be
// re-encoded yields a Result whose Error field is set and whose Payload is
// empty, instead of panicking.
func Node3(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	if err := json.Unmarshal(task.Payload, &user); err != nil {
		return mq.Result{Error: fmt.Errorf("node3: decoding payload: %w", err)}
	}
	// Checked assertion: a missing key, a nil map, or a non-number must not panic.
	age, ok := user["age"].(float64)
	if !ok {
		return mq.Result{Error: fmt.Errorf("node3: payload field %q is missing or not a number", "age")}
	}
	status := "FAIL"
	if int(age) > 20 {
		status = "PASS"
	}
	user["status"] = status
	resultPayload, err := json.Marshal(user)
	if err != nil {
		return mq.Result{Error: fmt.Errorf("node3: encoding payload: %w", err)}
	}
	return mq.Result{Payload: resultPayload, ConditionStatus: status}
}

// Node4 decodes the payload as a JSON object, sets its "final" key to "D"
// and returns the re-encoded object as the result payload.
//
// Decode and encode failures are reported through Result.Error. A payload
// that decodes to a nil map (for example JSON null) is treated as an empty
// object so that the write below cannot panic.
func Node4(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	if err := json.Unmarshal(task.Payload, &user); err != nil {
		return mq.Result{Error: fmt.Errorf("node4: decoding payload: %w", err)}
	}
	if user == nil {
		// Writing to a nil map panics; start from an empty object instead.
		user = make(map[string]any)
	}
	user["final"] = "D"
	resultPayload, err := json.Marshal(user)
	if err != nil {
		return mq.Result{Error: fmt.Errorf("node4: encoding payload: %w", err)}
	}
	return mq.Result{Payload: resultPayload}
}

// Node5 decodes the payload as a JSON object, sets its "salary" key to "E"
// and returns the re-encoded object as the result payload.
//
// Decode and encode failures are reported through Result.Error. A payload
// that decodes to a nil map (for example JSON null) is treated as an empty
// object so that the write below cannot panic.
func Node5(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	if err := json.Unmarshal(task.Payload, &user); err != nil {
		return mq.Result{Error: fmt.Errorf("node5: decoding payload: %w", err)}
	}
	if user == nil {
		// Writing to a nil map panics; start from an empty object instead.
		user = make(map[string]any)
	}
	user["salary"] = "E"
	resultPayload, err := json.Marshal(user)
	if err != nil {
		return mq.Result{Error: fmt.Errorf("node5: encoding payload: %w", err)}
	}
	return mq.Result{Payload: resultPayload}
}

// Node6 decodes the payload as a JSON object and returns it wrapped in a new
// object under the "storage" key, i.e. {"storage": <decoded payload>}.
//
// Decode and encode failures are reported through Result.Error rather than
// being silently turned into {"storage":null}.
func Node6(ctx context.Context, task *mq.Task) mq.Result {
	var user map[string]any
	if err := json.Unmarshal(task.Payload, &user); err != nil {
		return mq.Result{Error: fmt.Errorf("node6: decoding payload: %w", err)}
	}
	resultPayload, err := json.Marshal(map[string]any{"storage": user})
	if err != nil {
		return mq.Result{Error: fmt.Errorf("node6: encoding payload: %w", err)}
	}
	return mq.Result{Payload: resultPayload}
}

// Callback prints the task ID, payload (as text), error and topic of the
// received result to standard output, then returns a zero-value Result.
func Callback(ctx context.Context, task mq.Result) mq.Result {
	body := string(task.Payload)
	fmt.Println("Received task", task.TaskID, "Payload", body, task.Error, task.Topic)

	var empty mq.Result
	return empty
}

// NotifyResponse logs the task ID, payload and topic of a final DAG result.
//
// It has the shape of mq.Callback (func(context.Context, mq.Result) error) so
// that it can be passed to mq.WithNotifyResponse; it always returns nil.
func NotifyResponse(ctx context.Context, result mq.Result) error {
	log.Printf("DAG Final response: TaskID: %s, Payload: %s, Topic: %s", result.TaskID, result.Payload, result.Topic)
	return nil
}

Start Server

server.go

package main

import (
	"context"
	
	"github.com/oarkflow/mq"
	"github.com/oarkflow/mq/examples/tasks"
)

// main creates a broker that reports results to tasks.Callback, declares the
// queues "queue1" and "queue2", and starts the broker.
//
// Start returns an error; it is surfaced with a panic (as the other examples
// do) instead of being dropped, so a failed start does not exit silently.
func main() {
	b := mq.NewBroker(mq.WithCallback(tasks.Callback))
	b.NewQueue("queue1")
	b.NewQueue("queue2")
	if err := b.Start(context.Background()); err != nil {
		panic(err)
	}
}

Start Consumer

consumer.go

package main

import (
	"context"

	"github.com/oarkflow/mq"

	"github.com/oarkflow/mq/examples/tasks"
)

// main starts two consumers: "consumer-1" on "queue1" handled by tasks.Node1
// and "consumer-2" on "queue2" handled by tasks.Node2. The first runs in its
// own goroutine; the second runs on the main goroutine and keeps the process
// alive for as long as its Consume call does not return.
//
// Consume returns an error; either consumer failing is surfaced with a panic
// instead of being ignored.
func main() {
	ctx := context.Background()
	consumer1 := mq.NewConsumer("consumer-1", "queue1", tasks.Node1)
	consumer2 := mq.NewConsumer("consumer-2", "queue2", tasks.Node2)
	// consumer := mq.NewConsumer("consumer-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"))
	go func() {
		if err := consumer1.Consume(ctx); err != nil {
			panic(err)
		}
	}()
	if err := consumer2.Consume(ctx); err != nil {
		panic(err)
	}
}

Publish tasks

publisher.go

package main

import (
	"context"
	"fmt"
	
	"github.com/oarkflow/mq"
)

// main publishes two tasks to "queue1": the first with Publish, which only
// reports whether publishing failed, and the second with Request, which
// returns the task's Result.
//
// Argument order follows the documented signatures
// Publish(ctx, task, queue) error and Request(ctx, task, queue) Result.
// Request has no separate error return, so failure is read from Result.Error.
func main() {
	ctx := context.Background()
	publisher := mq.NewPublisher("publish-1")

	payload := []byte(`{"message":"Message Publisher \n Task"}`)
	task := mq.Task{
		Payload: payload,
	}
	if err := publisher.Publish(ctx, task, "queue1"); err != nil {
		panic(err)
	}
	fmt.Println("Async task published successfully")

	payload = []byte(`{"message":"Fire-and-Forget \n Task"}`)
	task = mq.Task{
		Payload: payload,
	}
	result := publisher.Request(ctx, task, "queue1")
	if result.Error != nil {
		panic(result.Error)
	}
	fmt.Printf("Sync task published. Result: %v\n", string(result.Payload))
}

DAG (Directed Acyclic Graph)

In this package, you can use the DAG feature to create a directed acyclic graph of tasks. The DAG feature allows you to define a sequence of tasks that need to be executed in a specific order.

Example

dag.go

package main

import (
	"context"
	"github.com/oarkflow/json"
	"github.com/oarkflow/mq/consts"
	"github.com/oarkflow/mq/examples/tasks"
	"io"
	"net/http"

	"github.com/oarkflow/mq"
	"github.com/oarkflow/mq/dag"
)

var (
	d = dag.NewDAG(mq.WithSyncMode(false), mq.WithNotifyResponse(tasks.NotifyResponse))
	// d = dag.NewDAG(mq.WithSyncMode(true), mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"), mq.WithCAPath("./certs/ca.cert"))
)

// main wires up the example DAG on the package-level d, registers the two
// HTTP entry points and starts the DAG on ":8083". A Start error is fatal.
func main() {
	// Six nodes keyed "A".."F", each backed by a handler from the tasks package.
	// NOTE(review): the extra `true` on "A" presumably marks the start node — confirm against dag.AddNode.
	d.AddNode("A", tasks.Node1, true)
	d.AddNode("B", tasks.Node2)
	d.AddNode("C", tasks.Node3)
	d.AddNode("D", tasks.Node4)
	d.AddNode("E", tasks.Node5)
	d.AddNode("F", tasks.Node6)

	// Wiring: A -> B (loop edge), B -> C, C branches to D on "PASS" and to E
	// on "FAIL", and both D and E lead to F.
	d.AddEdge("A", "B", dag.LoopEdge)
	d.AddCondition("C", map[string]string{"PASS": "D", "FAIL": "E"})
	d.AddEdge("B", "C")
	d.AddEdge("D", "F")
	d.AddEdge("E", "F")

	// HTTP entry points; both use requestHandler, differing only in request type.
	http.HandleFunc("POST /publish", requestHandler("publish"))
	http.HandleFunc("POST /request", requestHandler("request"))

	if err := d.Start(context.TODO(), ":8083"); err != nil {
		panic(err)
	}
}

// requestHandler returns an HTTP handler that reads the request body, passes
// it to d.ProcessTask as the task payload and writes the result as JSON.
//
// requestType selects the mode: for "request" the context carries the header
// consts.AwaitResponseKey = "true"; for any other value no header is set.
//
// Responses:
//   - 405 if the method is not POST.
//   - 400 if the body is missing, empty, or cannot be read.
//   - 500 if the result cannot be encoded as JSON.
//   - otherwise the JSON-encoded result with Content-Type application/json.
func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
			return
		}
		if r.Body == nil {
			http.Error(w, "Empty request body", http.StatusBadRequest)
			return
		}
		defer r.Body.Close()
		payload, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Failed to read request body", http.StatusBadRequest)
			return
		}
		// Server requests always carry a non-nil Body, so an absent body shows
		// up here as zero bytes rather than as r.Body == nil.
		if len(payload) == 0 {
			http.Error(w, "Empty request body", http.StatusBadRequest)
			return
		}
		// The task context is deliberately detached from r.Context().
		ctx := context.Background()
		if requestType == "request" {
			ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"})
		}
		// ctx = context.WithValue(ctx, "initial_node", "E")
		rs := d.ProcessTask(ctx, payload)
		// Encode before touching the response so an encoding failure can still
		// be reported with a proper status code.
		body, err := json.Marshal(rs)
		if err != nil {
			http.Error(w, "Failed to encode response", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		// Trailing newline matches what an Encoder would have written. A write
		// error means the client went away; there is nobody left to tell.
		_, _ = w.Write(append(body, '\n'))
	}
}

TODOS

  • Backend for task persistence
  • Task scheduling

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Config is a package-level *DynamicConfig populated with the values below.
// NOTE(review): presumably these act as the defaults for pools/consumers —
// confirm where Config is read.
var Config = &DynamicConfig{
	Timeout:         10 * time.Second,
	BatchSize:       1,
	// 100 * 1024 * 1024 = 104,857,600; presumably bytes (100 MiB) — TODO confirm unit.
	MaxMemoryLoad:   100 * 1024 * 1024,
	IdleTimeout:     5 * time.Minute,
	BackoffDuration: 2 * time.Second,
	MaxRetries:      3,
	ReloadInterval:  30 * time.Second,
	WarningThreshold: WarningThresholds{
		// 1 * 1024 * 1024 = 1,048,576; presumably bytes (1 MiB) — TODO confirm unit.
		HighMemory:    1 * 1024 * 1024,
		LongExecution: 2 * time.Second,
	},
	NumberOfWorkers: 5,
}

Functions

func GetAwaitResponse

func GetAwaitResponse(ctx context.Context) (string, bool)

func GetConnection

func GetConnection(addr string, config TLSConfig) (net.Conn, error)

GetConnection returns a connection to addr, reusing an existing connection if it is still valid.

func GetConsumerID

func GetConsumerID(ctx context.Context) (string, bool)

func GetContentType

func GetContentType(ctx context.Context) (string, bool)

func GetHeader

func GetHeader(ctx context.Context, key string) (string, bool)

func GetHeaders

func GetHeaders(ctx context.Context) (storage.IMap[string, string], bool)

func GetPublisherID

func GetPublisherID(ctx context.Context) (string, bool)

func GetQueue

func GetQueue(ctx context.Context) (string, bool)

func GetTriggerNode

func GetTriggerNode(ctx context.Context) (string, bool)

func HeadersWithConsumerID

func HeadersWithConsumerID(ctx context.Context, id string) map[string]string

func HeadersWithConsumerIDAndQueue

func HeadersWithConsumerIDAndQueue(ctx context.Context, id, queue string) map[string]string

func IsClosed

func IsClosed(conn net.Conn) bool

func NewID

func NewID() string

func RecoverPanic

func RecoverPanic(labelGenerator func() string)

func RecoverTitle

func RecoverTitle() string

func SetHeaders

func SetHeaders(ctx context.Context, headers map[string]string) context.Context

func WithHeaders

func WithHeaders(ctx context.Context, headers map[string]string) map[string]string

func WrapError

func WrapError(err error, msg, op string) error

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(opts ...Option) *Broker

func (*Broker) AddConsumer

func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Conn) string

func (*Broker) AdjustConsumerWorkers added in v0.0.11

func (b *Broker) AdjustConsumerWorkers(noOfWorkers int, consumerID ...string)

func (*Broker) Close added in v0.0.2

func (b *Broker) Close() error

func (*Broker) HandleCallback

func (b *Broker) HandleCallback(ctx context.Context, msg *codec.Message)

func (*Broker) MessageAck

func (b *Broker) MessageAck(ctx context.Context, msg *codec.Message)

func (*Broker) MessageDeny

func (b *Broker) MessageDeny(ctx context.Context, msg *codec.Message)

func (*Broker) MessageResponseHandler

func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message)

func (*Broker) NewQueue

func (b *Broker) NewQueue(name string) *Queue

func (*Broker) NotifyHandler

func (b *Broker) NotifyHandler() func(context.Context, Result) error

func (*Broker) OnClose

func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error

func (*Broker) OnConsumerPause

func (b *Broker) OnConsumerPause(ctx context.Context, _ *codec.Message)

func (*Broker) OnConsumerResume

func (b *Broker) OnConsumerResume(ctx context.Context, _ *codec.Message)

func (*Broker) OnConsumerStop

func (b *Broker) OnConsumerStop(ctx context.Context, _ *codec.Message)

func (*Broker) OnConsumerUpdated added in v0.0.11

func (b *Broker) OnConsumerUpdated(ctx context.Context, msg *codec.Message)

func (*Broker) OnError

func (b *Broker) OnError(_ context.Context, conn net.Conn, err error)

func (*Broker) OnMessage

func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn)

func (*Broker) Options

func (b *Broker) Options() *Options

func (*Broker) PauseConsumer

func (b *Broker) PauseConsumer(ctx context.Context, consumerID string, queues ...string)

func (*Broker) Publish

func (b *Broker) Publish(ctx context.Context, task *Task, queue string) error

func (*Broker) PublishHandler

func (b *Broker) PublishHandler(ctx context.Context, conn net.Conn, msg *codec.Message)

func (*Broker) RemoveConsumer

func (b *Broker) RemoveConsumer(consumerID string, queues ...string)

func (*Broker) ResumeConsumer

func (b *Broker) ResumeConsumer(ctx context.Context, consumerID string, queues ...string)

func (*Broker) SetNotifyHandler

func (b *Broker) SetNotifyHandler(callback Callback)

func (*Broker) SetURL added in v0.0.2

func (b *Broker) SetURL(url string)

func (*Broker) Start

func (b *Broker) Start(ctx context.Context) error

func (*Broker) StopConsumer

func (b *Broker) StopConsumer(ctx context.Context, consumerID string, queues ...string)

func (*Broker) SubscribeHandler

func (b *Broker) SubscribeHandler(ctx context.Context, conn net.Conn, msg *codec.Message)

func (*Broker) SyncMode

func (b *Broker) SyncMode() bool

func (*Broker) TLSConfig

func (b *Broker) TLSConfig() TLSConfig

func (*Broker) URL

func (b *Broker) URL() string

func (*Broker) UpdateConsumer added in v0.0.11

func (b *Broker) UpdateConsumer(ctx context.Context, consumerID string, config DynamicConfig, queues ...string) error

type Callback

type Callback func(ctx context.Context, result Result) error

type CircuitBreakerConfig added in v0.0.11

type CircuitBreakerConfig struct {
	Enabled          bool
	FailureThreshold int
	ResetTimeout     time.Duration
}

type CompletionCallback

type CompletionCallback func()

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Conn

func (c *Consumer) Conn() net.Conn

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context) error

func (*Consumer) ConsumeMessage

func (c *Consumer) ConsumeMessage(ctx context.Context, msg *codec.Message, conn net.Conn)

func (*Consumer) GetKey

func (c *Consumer) GetKey() string

func (*Consumer) GetType

func (c *Consumer) GetType() string

func (*Consumer) Metrics

func (c *Consumer) Metrics() Metrics

func (*Consumer) OnClose

func (c *Consumer) OnClose(_ context.Context, _ net.Conn) error

func (*Consumer) OnError

func (c *Consumer) OnError(_ context.Context, conn net.Conn, err error)

func (*Consumer) OnMessage

func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn) error

func (*Consumer) OnResponse

func (c *Consumer) OnResponse(ctx context.Context, result Result) error

func (*Consumer) Pause

func (c *Consumer) Pause(ctx context.Context) error

func (*Consumer) ProcessTask

func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result

func (*Consumer) Resume

func (c *Consumer) Resume(ctx context.Context) error

func (*Consumer) SetKey

func (c *Consumer) SetKey(key string)

func (*Consumer) Stop

func (c *Consumer) Stop(ctx context.Context) error

func (*Consumer) Update added in v0.0.11

func (c *Consumer) Update(ctx context.Context, payload []byte) error

type CronSchedule

type CronSchedule struct {
	Minute     string
	Hour       string
	DayOfMonth string
	Month      string
	DayOfWeek  string
}

func (CronSchedule) String

func (c CronSchedule) String() string

type DeadLetterQueue added in v0.0.11

type DeadLetterQueue struct {
	// contains filtered or unexported fields
}

func NewDeadLetterQueue added in v0.0.11

func NewDeadLetterQueue() *DeadLetterQueue

func (*DeadLetterQueue) Add added in v0.0.11

func (dlq *DeadLetterQueue) Add(task *QueueTask)

func (*DeadLetterQueue) Tasks added in v0.0.11

func (dlq *DeadLetterQueue) Tasks() []*QueueTask

type DefaultPlugin added in v0.0.11

type DefaultPlugin struct{}

func (*DefaultPlugin) AfterTask added in v0.0.11

func (dp *DefaultPlugin) AfterTask(task *QueueTask, result Result)

func (*DefaultPlugin) BeforeTask added in v0.0.11

func (dp *DefaultPlugin) BeforeTask(task *QueueTask)

func (*DefaultPlugin) Initialize added in v0.0.11

func (dp *DefaultPlugin) Initialize(config interface{}) error

type DynamicConfig added in v0.0.11

// DynamicConfig groups the tunable settings listed below. The package-level
// Config variable is a *DynamicConfig, and Broker.UpdateConsumer and
// Pool.UpdateConfig both accept a value of this type.
type DynamicConfig struct {
	Timeout          time.Duration
	BatchSize        int
	MaxMemoryLoad    int64
	IdleTimeout      time.Duration
	BackoffDuration  time.Duration
	MaxRetries       int
	ReloadInterval   time.Duration
	WarningThreshold WarningThresholds
	NumberOfWorkers  int // worker count
}

type ExecutionHistory

type ExecutionHistory struct {
	Timestamp time.Time
	Result    Result
}

type Handler

type Handler func(context.Context, *Task) Result

type InMemoryMetricsRegistry added in v0.0.11

type InMemoryMetricsRegistry struct {
	// contains filtered or unexported fields
}

func NewInMemoryMetricsRegistry added in v0.0.11

func NewInMemoryMetricsRegistry() *InMemoryMetricsRegistry

func (*InMemoryMetricsRegistry) Get added in v0.0.11

func (m *InMemoryMetricsRegistry) Get(metricName string) interface{}

func (*InMemoryMetricsRegistry) Increment added in v0.0.11

func (m *InMemoryMetricsRegistry) Increment(metricName string)

func (*InMemoryMetricsRegistry) Register added in v0.0.11

func (m *InMemoryMetricsRegistry) Register(metricName string, value interface{})

type MemoryTaskStorage

type MemoryTaskStorage struct {
	// contains filtered or unexported fields
}

func NewMemoryTaskStorage

func NewMemoryTaskStorage(expiryTime time.Duration) *MemoryTaskStorage

func (*MemoryTaskStorage) CleanupExpiredTasks

func (m *MemoryTaskStorage) CleanupExpiredTasks() error

func (*MemoryTaskStorage) DeleteTask

func (m *MemoryTaskStorage) DeleteTask(taskID string) error

func (*MemoryTaskStorage) FetchNextTask

func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)

func (*MemoryTaskStorage) GetAllTasks

func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)

func (*MemoryTaskStorage) GetTask

func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)

func (*MemoryTaskStorage) SaveTask

func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error

type Metrics

type Metrics struct {
	TotalTasks      int64
	CompletedTasks  int64
	ErrorCount      int64
	TotalMemoryUsed int64
	TotalScheduled  int64
	ExecutionTime   int64
}

type MetricsRegistry added in v0.0.11

type MetricsRegistry interface {
	Register(metricName string, value interface{})
	Increment(metricName string)
	Get(metricName string) interface{}
}

type Option

type Option func(*Options)

func DisableBrokerRateLimit added in v0.0.11

func DisableBrokerRateLimit() Option

func DisableConsumerRateLimit added in v0.0.11

func DisableConsumerRateLimit() Option

func WithBrokerRateLimiter added in v0.0.11

func WithBrokerRateLimiter(rate int, burst int) Option

func WithBrokerURL

func WithBrokerURL(url string) Option

WithBrokerURL -

func WithCAPath

func WithCAPath(caPath string) Option

WithCAPath - Option to set the path of the CA certificate used for TLS

func WithCallback

func WithCallback(val ...func(context.Context, Result) Result) Option

WithCallback -

func WithCleanTaskOnComplete

func WithCleanTaskOnComplete() Option

WithCleanTaskOnComplete -

func WithConsumerOnClose

func WithConsumerOnClose(handler func(ctx context.Context, topic, consumerName string)) Option

func WithConsumerOnSubscribe

func WithConsumerOnSubscribe(handler func(ctx context.Context, topic, consumerName string)) Option

func WithConsumerRateLimiter added in v0.0.11

func WithConsumerRateLimiter(rate int, burst int) Option

func WithInitialDelay

func WithInitialDelay(val time.Duration) Option

WithInitialDelay -

func WithJitterPercent

func WithJitterPercent(val float64) Option

WithJitterPercent -

func WithLogger added in v0.0.10

func WithLogger(log logger.Logger) Option

func WithMaxBackoff

func WithMaxBackoff(val time.Duration) Option

WithMaxBackoff -

func WithMaxRetries

func WithMaxRetries(val int) Option

WithMaxRetries -

func WithNotifyResponse

func WithNotifyResponse(callback Callback) Option

func WithRespondPendingResult

func WithRespondPendingResult(mode bool) Option

WithRespondPendingResult -

func WithSyncMode

func WithSyncMode(mode bool) Option

WithSyncMode -

func WithTLS

func WithTLS(enableTLS bool, certPath, keyPath string) Option

WithTLS - Option to enable/disable TLS

func WithWorkerPool

func WithWorkerPool(queueSize, numOfWorkers int, maxMemoryLoad int64) Option

type Options

type Options struct {
	BrokerRateLimiter   *RateLimiter // new field for broker rate limiting
	ConsumerRateLimiter *RateLimiter // new field for consumer rate limiting
	// contains filtered or unexported fields
}

func SetupOptions

func SetupOptions(opts ...Option) *Options

func (*Options) BrokerAddr added in v0.0.10

func (o *Options) BrokerAddr() string

func (*Options) CleanTaskOnComplete

func (o *Options) CleanTaskOnComplete() bool

func (*Options) Logger added in v0.0.10

func (o *Options) Logger() logger.Logger

func (*Options) MaxMemoryLoad

func (o *Options) MaxMemoryLoad() int64

func (*Options) NumOfWorkers

func (o *Options) NumOfWorkers() int

func (*Options) QueueSize

func (o *Options) QueueSize() int

func (*Options) SetSyncMode

func (o *Options) SetSyncMode(sync bool)

func (*Options) Storage

func (o *Options) Storage() TaskStorage

type Plugin added in v0.0.11

type Plugin interface {
	Initialize(config interface{}) error
	BeforeTask(task *QueueTask)
	AfterTask(task *QueueTask, result Result)
}

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func NewPool

func NewPool(numOfWorkers int, opts ...PoolOption) *Pool

func (*Pool) AdjustWorkerCount

func (wp *Pool) AdjustWorkerCount(newWorkerCount int)

func (*Pool) DLQ added in v0.0.11

func (wp *Pool) DLQ() *DeadLetterQueue

func (*Pool) Dispatch

func (wp *Pool) Dispatch(event func())

func (*Pool) EnqueueTask

func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error

func (*Pool) Metrics

func (wp *Pool) Metrics() Metrics

func (*Pool) Pause

func (wp *Pool) Pause()

func (*Pool) Resume

func (wp *Pool) Resume()

func (*Pool) Scheduler

func (wp *Pool) Scheduler() *Scheduler

func (*Pool) SetBatchSize

func (wp *Pool) SetBatchSize(size int)

func (*Pool) Start

func (wp *Pool) Start(numWorkers int)

func (*Pool) Stop

func (wp *Pool) Stop()

func (*Pool) UpdateConfig added in v0.0.11

func (wp *Pool) UpdateConfig(newConfig *DynamicConfig) error

New method to update pool configuration via POOL_UPDATE command.

type PoolOption

type PoolOption func(*Pool)

func WithBatchSize

func WithBatchSize(batchSize int) PoolOption

func WithCircuitBreaker added in v0.0.11

func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption

func WithCompletionCallback

func WithCompletionCallback(callback func()) PoolOption

func WithDiagnostics added in v0.0.11

func WithDiagnostics(enabled bool) PoolOption

func WithGracefulShutdown added in v0.0.11

func WithGracefulShutdown(timeout time.Duration) PoolOption

func WithHandler

func WithHandler(handler Handler) PoolOption

func WithHealthServicePort added in v0.0.11

func WithHealthServicePort(port int) PoolOption

func WithMaxMemoryLoad

func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption

func WithMetricsRegistry added in v0.0.11

func WithMetricsRegistry(registry MetricsRegistry) PoolOption

func WithPlugin added in v0.0.11

func WithPlugin(plugin Plugin) PoolOption

func WithPoolCallback

func WithPoolCallback(callback Callback) PoolOption

func WithTaskQueueSize

func WithTaskQueueSize(size int) PoolOption

func WithTaskStorage

func WithTaskStorage(storage TaskStorage) PoolOption

func WithTaskTimeout

func WithTaskTimeout(t time.Duration) PoolOption

func WithWarningThresholds added in v0.0.11

func WithWarningThresholds(thresholds ThresholdConfig) PoolOption

type PriorityQueue

type PriorityQueue []*QueueTask

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type Processor

type Processor interface {
	ProcessTask(ctx context.Context, msg *Task) Result
	Consume(ctx context.Context) error
	Pause(ctx context.Context) error
	Resume(ctx context.Context) error
	Stop(ctx context.Context) error
	Close() error
	GetKey() string
	SetKey(key string)
	GetType() string
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(id string, opts ...Option) *Publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, task Task, queue string) error

Publish method that uses the persistent connection.

func (*Publisher) Request

func (p *Publisher) Request(ctx context.Context, task Task, queue string) Result

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

type QueueTask

type QueueTask struct {
	// contains filtered or unexported fields
}

type QueuedTask

type QueuedTask struct {
	Message    *codec.Message
	RetryCount int
}

type RateLimiter added in v0.0.11

type RateLimiter struct {
	C chan struct{}
}

NEW: RateLimiter implementation

func NewRateLimiter added in v0.0.11

func NewRateLimiter(rate int, burst int) *RateLimiter

Modified RateLimiter: use blocking send to avoid discarding tokens.

func (*RateLimiter) Wait added in v0.0.11

func (rl *RateLimiter) Wait()

type Result

type Result struct {
	CreatedAt       time.Time       `json:"created_at"`
	ProcessedAt     time.Time       `json:"processed_at,omitempty"`
	Latency         string          `json:"latency"`
	Error           error           `json:"-"` // Keep error as an error type
	Topic           string          `json:"topic"`
	TaskID          string          `json:"task_id"`
	Status          Status          `json:"status"`
	ConditionStatus string          `json:"condition_status"`
	Ctx             context.Context `json:"-"`
	Payload         json.RawMessage `json:"payload"`
	Last            bool
}

func HandleError

func HandleError(ctx context.Context, err error, status ...Status) Result

func (Result) MarshalJSON

func (r Result) MarshalJSON() ([]byte, error)

func (Result) Unmarshal

func (r Result) Unmarshal(data any) error

func (*Result) UnmarshalJSON

func (r *Result) UnmarshalJSON(data []byte) error

func (Result) WithData

func (r Result) WithData(status Status, data []byte) Result

type Schedule

type Schedule struct {
	TimeOfDay  time.Time
	CronSpec   string
	DayOfWeek  []time.Weekday
	DayOfMonth []int
	Interval   time.Duration
	Recurring  bool
}

func (*Schedule) ToHumanReadable

func (s *Schedule) ToHumanReadable() string

type ScheduleOptions

type ScheduleOptions struct {
	Handler   Handler
	Callback  Callback
	Interval  time.Duration
	Overlap   bool
	Recurring bool
}

type ScheduledTask

type ScheduledTask struct {
	// contains filtered or unexported fields
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(pool *Pool) *Scheduler

func (*Scheduler) AddTask

func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption)

func (*Scheduler) PrintAllTasks

func (s *Scheduler) PrintAllTasks()

func (*Scheduler) PrintExecutionHistory

func (s *Scheduler) PrintExecutionHistory(taskID string)

func (*Scheduler) RemoveTask

func (s *Scheduler) RemoveTask(payloadID string)

func (*Scheduler) Start

func (s *Scheduler) Start()

type SchedulerConfig

type SchedulerConfig struct {
	Callback Callback
	Overlap  bool
}

type SchedulerOption

type SchedulerOption func(*ScheduleOptions)

func WithInterval

func WithInterval(interval time.Duration) SchedulerOption

func WithOverlap

func WithOverlap() SchedulerOption

func WithRecurring

func WithRecurring() SchedulerOption

func WithSchedulerCallback

func WithSchedulerCallback(callback Callback) SchedulerOption

func WithSchedulerHandler

func WithSchedulerHandler(handler Handler) SchedulerOption

WithSchedulerHandler Helper functions to create SchedulerOptions

type Status added in v0.0.2

type Status string
const (
	Pending    Status = "Pending"
	Processing Status = "Processing"
	Completed  Status = "Completed"
	Failed     Status = "Failed"
	Cancelled  Status = "Cancelled"
)

type TLSConfig

type TLSConfig struct {
	CertPath string
	KeyPath  string
	CAPath   string
	UseTLS   bool
}

type Task

type Task struct {
	CreatedAt   time.Time       `json:"created_at"`
	ProcessedAt time.Time       `json:"processed_at"`
	Expiry      time.Time       `json:"expiry"`
	Error       error           `json:"error"`
	ID          string          `json:"id"`
	Topic       string          `json:"topic"`
	Status      string          `json:"status"`
	Payload     json.RawMessage `json:"payload"`
	// contains filtered or unexported fields
}

func NewTask

func NewTask(id string, payload json.RawMessage, nodeKey string, opts ...TaskOption) *Task

func (*Task) GetFlow added in v0.0.10

func (t *Task) GetFlow() any

type TaskOption added in v0.0.10

type TaskOption func(*Task)

TaskOption defines a function type for setting options.

func WithDAG added in v0.0.10

func WithDAG(dag any) TaskOption

type TaskStorage

type TaskStorage interface {
	SaveTask(task *QueueTask) error
	GetTask(taskID string) (*QueueTask, error)
	DeleteTask(taskID string) error
	GetAllTasks() ([]*QueueTask, error)
	FetchNextTask() (*QueueTask, error)
	CleanupExpiredTasks() error
}

type ThresholdConfig added in v0.0.11

type ThresholdConfig struct {
	HighMemory    int64
	LongExecution time.Duration
}

type WarningThresholds added in v0.0.11

type WarningThresholds struct {
	HighMemory    int64
	LongExecution time.Duration
}

Directories

Path Synopsis
context_keys.go
context_keys.go
internal
phuslulog.go
phuslulog.go
services module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL