sasynq

package
v1.14.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 3, 2025 License: MIT Imports: 10 Imported by: 0

README

sasynq

sasynq is a wrapper around the excellent asynq library. It provides a simpler and more user-friendly SDK while remaining fully compatible with native asynq usage patterns. Its main features include:

  • Support for Redis Cluster and Sentinel for high availability and horizontal scalability.
  • Distributed task queues with support for priority queues, delayed queues, unique tasks (to prevent duplicate execution), cancel task, and periodic task scheduling.
  • Built-in mechanisms for task retries (with customizable retry counts), timeouts, and deadlines.
  • Flexible scheduling for immediate, delayed, or specific-time execution.
  • Unified logging using zap.

sasynq streamlines asynchronous and distributed task processing in Go, helping you write clean and maintainable background job code quickly and safely.


Example of use

Queues

Defining Task Payloads and Handlers

Here’s how to define task payloads and handlers in sasynq:

// example/common/task.go
package common

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/hibiken/asynq"
    "github.com/go-dev-frame/sponge/pkg/sasynq"
)

// ----------------------------- Definition Method 1 (recommended)----------------------------------

const TypeEmailSend = "email:send"

type EmailPayload struct {
    UserID  int    `json:"user_id"`
    Message string `json:"message"`
}

func HandleEmailTask(ctx context.Context, p *EmailPayload) error {
    fmt.Printf("[Email] Task for UserID %d completed successfully\n", p.UserID)
    return nil
}

// ----------------------------- Definition Method  2 ----------------------------------

const TypeSMSSend = "sms:send"

type SMSPayload struct {
    UserID  int    `json:"user_id"`
    Message string `json:"message"`
}

func (p *SMSPayload) ProcessTask(ctx context.Context, t *asynq.Task) error {
    fmt.Printf("[SMS] Task for UserID %d completed successfully\n", p.UserID)
    return nil
}

// ----------------------------- Definition Method  3 ----------------------------------

const TypeMsgNotification = "msg:notification"

type MsgNotificationPayload struct {
    UserID  int    `json:"user_id"`
    Message string `json:"message"`
}

func HandleMsgNotificationTask(ctx context.Context, t *asynq.Task) error {
    var p MsgNotificationPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("[MSG] Task for UserID %d completed successfully\n", p.UserID)
    return nil
}

const TypeUniqueEmailSend = "unique:email:send"

Producer Example

A producer enqueues tasks with various options like priority, delays, deadlines, and unique IDs.

// example/producer/main.go
package main

import (
    "fmt"
    "time"

    "github.com/go-dev-frame/sponge/pkg/sasynq"
    "example/common"
)

func runProducer(client *sasynq.Client) error {
    // Immediate enqueue with critical priority
    userPayload1 := &common.EmailPayload{
        UserID:  101,
        Message: "This is a message that is immediately queued, with critical priority",
    }
    _, info, err := client.EnqueueNow(common.TypeEmailSend, userPayload1,
        sasynq.WithQueue("critical"),
        sasynq.WithRetry(5),
    )
    if err != nil {
        return err
    }
    fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeEmailSend, info.ID, info.Queue)

    // Enqueue after a 5-second delay
    userPayload2 := &common.SMSPayload{
        UserID:  202,
        Message: "This is a message added to the queue after a 5-second delay, with default priority",
    }
    _, info, err = client.EnqueueIn(5*time.Second, common.TypeSMSSend, userPayload2,
        sasynq.WithQueue("default"),
        sasynq.WithRetry(3),
    )
    if err != nil {
        return err
    }
    fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeSMSSend, info.ID, info.Queue)

    // Enqueue to run at a specific time
    userPayload3 := &common.MsgNotificationPayload{
        UserID:  303,
        Message: "This is a message scheduled to run at a specific time, with low priority",
    }
    _, info, err = client.EnqueueAt(time.Now().Add(10*time.Second), common.TypeMsgNotification, userPayload3,
        sasynq.WithQueue("low"),
        sasynq.WithRetry(1),
    )
    if err != nil {
        return err
    }
    fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeMsgNotification, info.ID, info.Queue)

    // Example of using NewTask directly
    userPayload4 := &common.EmailPayload{
        UserID:  404,
        Message: "This is a test message, with low priority, a 15-second deadline, and a unique ID",
    }
    task, err := sasynq.NewTask(common.TypeEmailSend, userPayload4)
    if err != nil {
        return err
    }
    info, err = client.Enqueue(task,
        sasynq.WithQueue("low"),
        sasynq.WithMaxRetry(1),
        sasynq.WithDeadline(time.Now().Add(15*time.Second)),
        sasynq.WithTaskID("unique-id-xxxx-xxxx"),
    )
    if err != nil {
        return err
    }
    fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeEmailSend, info.ID, info.Queue)

    // Example of using EnqueueUnique
    userPayload5 := &common.EmailPayload{
        UserID:  505,
        Message: "This is a unique task, with default priority, a 1-minute deadline",
    }
    userPayload5 := &EmailPayload{UserID: 505, Message: "unique task"}
    _, info5, err := client.EnqueueUnique(time.Minute, common.TypeUniqueEmailSend, userPayload5,
        sasynq.WithQueue("default"),
        sasynq.WithMaxRetry(3))
    if err != nil {
        return err
    }
    fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeUniqueEmailSend, info5.ID, info5.Queue)

    return nil
}

func main() {
    cfg := sasynq.RedisConfig{
        Addr: "localhost:6379",
    }
    client := sasynq.NewClient(cfg)

    err := runProducer(client)
    if err != nil {
        panic(err)
    }
    defer client.Close()

    fmt.Println("All tasks enqueued.")
}

Consumer Example

A consumer server can register handlers in three different ways:

// example/consumer/main.go
package main

import (
    "github.com/go-dev-frame/sponge/pkg/sasynq"
    "github.com/go-dev-frame/sponge/pkg/logger"
    "example/common"
)

func runConsumer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
    serverCfg := sasynq.DefaultServerConfig(sasynq.WithLogger(logger.Get())) // Uses critical, default, and low queues by default
    srv := sasynq.NewServer(redisCfg, serverCfg)

    // Attach logging middleware
    srv.Use(sasynq.LoggingMiddleware(sasynq.WithLogger(logger.Get())))

    // Register task handlers (three methods available):
    sasynq.RegisterTaskHandler(srv.Mux(), common.TypeEmailSend, sasynq.HandleFunc(common.HandleEmailTask)) // Method 1 (recommended)
    srv.Register(common.TypeSMSSend, &common.SMSPayload{}) // Method 2: register struct as payload
    srv.RegisterFunc(common.TypeMsgNotification, common.HandleMsgNotificationTask) // Method 3: register function directly

    sasynq.RegisterTaskHandler(srv.Mux(), common.TypeUniqueEmailSend, sasynq.HandleFunc(common.HandleEmailTask))
    
    srv.Run()

    return srv, nil
}

func main() {
    cfg := sasynq.RedisConfig{
        Addr: "localhost:6379",
    }
    srv, err := runConsumer(cfg)
    if err != nil {
        panic(err)
    }
    srv.WaitShutdown()
}

Periodic Tasks

sasynq makes scheduling recurring tasks very simple.

package main

import (
    "context"
    "fmt"
    "github.com/go-dev-frame/sponge/pkg/sasynq"
    "github.com/go-dev-frame/sponge/pkg/logger"
)

const TypeScheduledGet = "scheduled:get"

type ScheduledGetPayload struct {
    URL string `json:"url"`
}

func handleScheduledGetTask(ctx context.Context, p *ScheduledGetPayload) error {
    fmt.Printf("[Get] Task for URL %s completed successfully\n", p.URL)
    return nil
}

// -----------------------------------------------------------------------

func registerSchedulerTasks(scheduler *sasynq.Scheduler) error {
    payload1 := &ScheduledGetPayload{URL: "https://google.com"}
    entryID1, err := scheduler.RegisterTask("@every 2s", TypeScheduledGet, payload1)
    if err != nil {
        return err
    }
    fmt.Printf("Registered periodic task with entry ID: %s\n", entryID1)

    payload2 := &ScheduledGetPayload{URL: "https://bing.com"}
    entryID2, err := scheduler.RegisterTask("@every 3s", TypeScheduledGet, payload2)
    if err != nil {
        return err
    }
    fmt.Printf("Registered periodic task with entry ID: %s\n", entryID2)

    scheduler.Run()

    return nil
}

func runServer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
    serverCfg := sasynq.DefaultServerConfig(sasynq.WithLogger(logger.Get()))
    srv := sasynq.NewServer(redisCfg, serverCfg)
    srv.Use(sasynq.LoggingMiddleware())

    // Register handler for scheduled tasks
    sasynq.RegisterTaskHandler(srv.Mux(), TypeScheduledGet, sasynq.HandleFunc(handleScheduledGetTask))

    srv.Run()

    return srv, nil
}

func main() {
    cfg := sasynq.RedisConfig{
        Addr: "localhost:6379",
    }

    scheduler := sasynq.NewScheduler(cfg, sasynq.WithSchedulerLogger(sasynq.WithLogger(logger.Get())))
    err := registerSchedulerTasks(scheduler)
    if err != nil {
        panic(err)
    }

    srv, err := runServer(cfg)
    if err != nil {
        panic(err)
    }
    srv.Shutdown()
}

Cancel Tasks

  1. For one-time tasks, the inspector.CancelTask(queue, taskID) function can be used to cancel the task. The example code is as follows:
package main

import (
    "fmt"
    "github.com/go-dev-frame/sponge/pkg/sasynq"
)

var inspector = sasynq.NewInspector(sasynq.DefaultServerConfig())

func main() {
    queue := "default"
    taskID := "task-id-xxxx-xxxx"
    isScheduled := false // set to true for scheduled tasks

    err := cancelTask(queue, taskID, isScheduled)
    if err != nil {
        fmt.Printf("Failed to cancel task: %v\n", err)
    }
}

func cancelTask(queue string, taskID string, isScheduled bool) error{
    var err error
    if isScheduled {
        err = inspector.CancelTask(queue, taskID)
    } else {
        err = inspector.CancelTask("", taskID) // queue is empty string for non-scheduled tasks
    }
    if err != nil {
        return err
    }

    return nil
}

  1. For periodic scheduled tasks, the scheduler.Unregister(entryID) function can be used to cancel scheduled tasks. The example code is as follows:
package main

import (
    "fmt"
    "github.com/go-dev-frame/sponge/pkg/sasynq"
)

var scheduler = sasynq.NewScheduler(sasynq.DefaultServerConfig())

func main() {
    entryID := "entry-id-xxxx-xxxx" // scheduler.RegisterTask() returns this ID

    err := scheduler.Unregister(entryID)
    if err != nil {
        fmt.Printf("Failed to unregister periodic scheduled tasks: %v\n", err)
    }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoggingMiddleware

func LoggingMiddleware(opts ...LoggerOption) func(next asynq.Handler) asynq.Handler

LoggingMiddleware logs information about each processed task.

func NewTask

func NewTask[P any](typeName string, payload P, opts ...asynq.Option) (*asynq.Task, error)

NewTask creates a new asynq.Task with a typed payload. It automatically marshals the payload into JSON.

func NewZapLogger

func NewZapLogger(l *zap.Logger, skip int) asynq.Logger

func RegisterTaskHandler

func RegisterTaskHandler[T any](mux *asynq.ServeMux, typeName string, handler TaskHandler[T])

RegisterTaskHandler registers a generic, type-safe task handler with the server's mux. It automatically unmarshals the JSON payload into the specified type.

func WithDeadline

func WithDeadline(t time.Time) asynq.Option

WithDeadline specifies the deadline for the task.

func WithMaxRetry added in v1.14.4

func WithMaxRetry(maxRetry int) asynq.Option

WithMaxRetry specifies the max number of times the task will be retried.

func WithQueue

func WithQueue(name string) asynq.Option

WithQueue specifies which queue the task should be sent to.

func WithTaskID added in v1.14.4

func WithTaskID(id string) asynq.Option

WithTaskID specifies the ID for the task, if another task with the same ID already exists, it will be rejeceted.

func WithTimeout

func WithTimeout(timeout time.Duration) asynq.Option

WithTimeout specifies the timeout duration for the task.

Types

type Client

type Client struct {
	*asynq.Client
}

Client is a wrapper around asynq.Client providing more convenient APIs.

func NewClient

func NewClient(cfg RedisConfig) *Client

NewClient creates a new producer client.

func NewFromClient added in v1.14.4

func NewFromClient(c *asynq.Client) *Client

NewFromClient creates a new producer client from an existing asynq.Client.

func (*Client) Enqueue

func (c *Client) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)

Enqueue enqueues the given task to a queue.

func (*Client) EnqueueAt

func (c *Client) EnqueueAt(t time.Time, typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error)

EnqueueAt enqueues a task to be processed at a specific time.

func (*Client) EnqueueIn

func (c *Client) EnqueueIn(delay time.Duration, typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error)

EnqueueIn enqueues a task to be processed after a specified delay.

func (*Client) EnqueueNow

func (c *Client) EnqueueNow(typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error)

EnqueueNow enqueues a task for immediate processing, parameter payload should be supported json.Marshal

func (*Client) EnqueueUnique added in v1.14.4

func (c *Client) EnqueueUnique(keepTime time.Duration, typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error)

EnqueueUnique enqueues a task with unique in the queue for a specified duration.

type Inspector added in v1.14.4

type Inspector struct {
	*asynq.Inspector
}

Inspector provides access to the Redis backend used by asynq.

func NewInspector added in v1.14.4

func NewInspector(cfg RedisConfig) *Inspector

NewInspector creates a new Inspector instance. Note: A new Redis connection will be created, in actual use, only once

func (*Inspector) CancelTask added in v1.14.4

func (i *Inspector) CancelTask(queue string, taskID string) error

CancelTask cancels the processing of a task.

func (*Inspector) Close added in v1.14.4

func (i *Inspector) Close() error

Close closes the inspector.

func (*Inspector) GetTaskInfo added in v1.14.4

func (i *Inspector) GetTaskInfo(queue string, taskID string) (*asynq.TaskInfo, error)

GetTaskInfo returns information about a task.

type LoggerOption

type LoggerOption func(*loggerOptions)

LoggerOption set options.

func WithLogger

func WithLogger(l *zap.Logger) LoggerOption

WithLogger sets the logger to use for logging.

func WithMaxLength

func WithMaxLength(l int) LoggerOption

WithMaxLength sets the maximum length of the payload to log.

func WithZapSkip added in v1.14.4

func WithZapSkip(s int) LoggerOption

WithZapSkip sets the number of callers to skip when logging.

type RedisConfig

type RedisConfig struct {
	Mode RedisMode `yaml:"mode"`

	// For Single Mode
	Addr string `yaml:"addr"`

	// For Sentinel Mode
	SentinelAddrs []string `yaml:"sentinelAddrs"`
	MasterName    string   `yaml:"masterName"`

	// For Cluster Mode
	ClusterAddrs []string `yaml:"clusterAddrs"`

	// Common options
	Username string `yaml:"username"`
	Password string `yaml:"password"`
	DB       int    `yaml:"db"`

	TLSConfig *tls.Config `yaml:"tlsConfig"`
}

RedisConfig holds all configurations for connecting to Redis.

func (RedisConfig) GetAsynqRedisConnOpt

func (c RedisConfig) GetAsynqRedisConnOpt() asynq.RedisConnOpt

GetAsynqRedisConnOpt converts RedisConfig to asynq's RedisConnOpt interface. This is the core of the high-availability switching logic.

type RedisMode

type RedisMode string

RedisMode defines the Redis connection mode.

const (
	// RedisModeSingle uses a single Redis instance.
	RedisModeSingle RedisMode = "single"
	// RedisModeSentinel uses Redis Sentinel for high availability.
	RedisModeSentinel RedisMode = "sentinel"
	// RedisModeCluster uses a Redis Cluster for horizontal scaling.
	RedisModeCluster RedisMode = "cluster"
)

type Scheduler

type Scheduler struct {
	*asynq.Scheduler
}

Scheduler is a wrapper around asynq.Scheduler.

func NewScheduler

func NewScheduler(cfg RedisConfig, opts ...SchedulerOption) *Scheduler

NewScheduler creates a new periodic task scheduler.

func (*Scheduler) Register

func (s *Scheduler) Register(cronSpec string, task *asynq.Task, opts ...asynq.Option) (entryID string, err error)

Register adds a new periodic task.

func (*Scheduler) RegisterTask

func (s *Scheduler) RegisterTask(cronSpec string, typeName string, payload any, opts ...asynq.Option) (entryID string, err error)

RegisterTask adds a new periodic task with a given type name.

func (*Scheduler) Run

func (s *Scheduler) Run()

Run runs the asynq Scheduler in a separate goroutine

func (*Scheduler) Shutdown

func (s *Scheduler) Shutdown()

Shutdown the Scheduler.

func (*Scheduler) Unregister

func (s *Scheduler) Unregister(entryID string) error

Unregister removes a periodic task, cancel task execution.

type SchedulerOption

type SchedulerOption func(*schedulerOptions)

SchedulerOption set options.

func WithSchedulerLogLevel

func WithSchedulerLogLevel(level asynq.LogLevel) SchedulerOption

WithSchedulerLogLevel sets the log level for the scheduler.

func WithSchedulerLogger

func WithSchedulerLogger(opts ...LoggerOption) SchedulerOption

WithSchedulerLogger sets the logger for the scheduler.

func WithSchedulerOptions

func WithSchedulerOptions(opts *asynq.SchedulerOpts) SchedulerOption

WithSchedulerOptions sets the options for the scheduler.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is a wrapper around asynq.Server providing integrated features.

func NewServer

func NewServer(redisCfg RedisConfig, serverCfg ServerConfig) *Server

NewServer creates a new consumer server.

func (*Server) Mux

func (s *Server) Mux() *asynq.ServeMux

Mux returns the underlying ServeMux to register handlers.

func (*Server) Register

func (s *Server) Register(typeName string, handler asynq.Handler)

Register a task processor

func (*Server) RegisterFunc

func (s *Server) RegisterFunc(typeName string, handlerFunc asynq.HandlerFunc)

RegisterFunc a task handler function

func (*Server) Run

func (s *Server) Run()

Run runs the asynq server in a separate goroutine

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown the server.

func (*Server) Use

func (s *Server) Use(middlewares ...asynq.MiddlewareFunc)

Use adds middleware to the server's handler chain.

func (*Server) WaitShutdown

func (s *Server) WaitShutdown()

WaitShutdown for interrupt signals for graceful shutdown the server.

type ServerConfig

type ServerConfig struct {
	*asynq.Config
}

ServerConfig holds configurations for the asynq server.

func DefaultServerConfig

func DefaultServerConfig(opts ...LoggerOption) ServerConfig

DefaultServerConfig returns a default server configuration.

type TaskHandleFunc

type TaskHandleFunc[T any] func(ctx context.Context, payload T) error

TaskHandleFunc is a function adapter for TaskHandler.

func (TaskHandleFunc[T]) Handle

func (f TaskHandleFunc[T]) Handle(ctx context.Context, payload T) error

Handle calls the wrapped function.

type TaskHandler

type TaskHandler[T any] interface {
	Handle(ctx context.Context, payload T) error
}

TaskHandler is a generic interface for handling a task with a specific payload type.

func HandleFunc

func HandleFunc[T any](f func(ctx context.Context, payloadType T) error) TaskHandler[T]

HandleFunc creates a TaskHandler from a function.

type ZapLogger

type ZapLogger struct {
	// contains filtered or unexported fields
}

func (*ZapLogger) Debug

func (l *ZapLogger) Debug(args ...interface{})

func (*ZapLogger) Error

func (l *ZapLogger) Error(args ...interface{})

func (*ZapLogger) Fatal

func (l *ZapLogger) Fatal(args ...interface{})

func (*ZapLogger) Info

func (l *ZapLogger) Info(args ...interface{})

func (*ZapLogger) Warn

func (l *ZapLogger) Warn(args ...interface{})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL