kafkaq

Version: v0.0.10
Published: May 17, 2023 License: Apache-2.0 Imports: 1 Imported by: 0

README


Simple, reliable, low-latency task queue library in Golang

Kafkaq is a small Go library for queueing tasks and processing them asynchronously with workers.

High-level overview of how Kafkaq works:

  • A client publishes tasks to the queue (a message broker such as Kafka or Redpanda).
  • A job executor (worker) polls the queue for tasks to process.
  • The states of acquired tasks are stored in a key-value store to track their execution status.
  • Unconfirmed tasks are distributed again after a timeout.

The system consists of the following components:

  • The message broker, which persists tasks (Kafka or a similar log-based task store).
  • The task state storage (Redis, Zookeeper, etcd...), which keeps track of the state of tasks acquired for processing.
  • The Go library for publishing tasks to the queue and consuming them from it.

Features

  • Each task is guaranteed to be executed at least once
  • Low latency (microseconds), limited by whichever is slower: Kafka or the key-value storage (Redis, etcd)
  • Uncommitted tasks are rescheduled after a timeout
  • Highly reliable (limited by the reliability of the message broker and the key-value storage)
  • Highly scalable (each system component may be scaled horizontally)
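
Because each task is executed at least once, the same task may occasionally be delivered more than once, for example when a task is not committed before its timeout. The following is a minimal sketch of a duplicate-tolerant handler, assuming every task carries a unique ID. The dedup type, the handle function and the ID scheme are hypothetical and are not part of kafkaq:

```go
package main

import (
	"fmt"
	"sync"
)

// dedup is a hypothetical helper (not part of kafkaq) that remembers which
// task IDs have already been handled in this process.
type dedup struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newDedup() *dedup { return &dedup{seen: make(map[string]bool)} }

// firstTime reports whether id has not been seen before and records it.
func (d *dedup) firstTime(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[id] {
		return false
	}
	d.seen[id] = true
	return true
}

// handle applies the side effect only for the first delivery of a task ID
// and returns true if the effect was applied.
func handle(d *dedup, id string, effect func()) bool {
	if !d.firstTime(id) {
		return false
	}
	effect()
	return true
}

func main() {
	d := newDedup()
	applied := 0
	// The same task ID arrives twice, as may happen after a timeout.
	for _, id := range []string{"task-1", "task-2", "task-1"} {
		if handle(d, id, func() { applied++ }) {
			fmt.Println("processed", id)
		} else {
			fmt.Println("skipped duplicate", id)
		}
	}
	fmt.Println("effects applied:", applied)
}
```

This sketch keeps the set of seen IDs in memory, so it only covers duplicates within one process; with several workers the set would have to live in shared storage.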

Quickstart

Make sure you have Go installed. Version 1.19 or higher is recommended.

go get -u github.com/acquirecloud/kafkaq

Make sure you're running a Redis server and a Kafka server, either locally or in Docker containers.

NOTE: the scripts folder provides a Docker Compose setup that runs both Redis and Kafka in Docker.

Then try the following code. Don't worry if it takes some time to start; this is due to the default Kafka configuration:

package main

import (
	"context"
	"fmt"
	"github.com/acquirecloud/kafkaq"
	"github.com/acquirecloud/kafkaq/pkg/kafka"
	"github.com/go-redis/redis/v8"
	"sync"
	"time"
)

func main() {
	// Create the queue client. It can publish new tasks and receive existing ones from the queue.
	q := kafka.NewKafkaRedis(kafka.GetDefaultQueueConfig(),
		// docker options
		&redis.Options{
			Addr:     "localhost:6379",
			Password: "", // no password set
			DB:       0,  // use default DB
		})
	// don't forget to start it...
	q.Init(context.Background())
	defer q.Shutdown() // q must be shut down

	ctx, cancel := context.WithCancel(context.Background())
	c := make(chan bool)
	done := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(wID int) {
			defer wg.Done()
			for ctx.Err() == nil {
				// getting the job to process
				j, err := q.GetJob(ctx)
				if err == nil {
					fmt.Println("worker ", wID, ": executing ", string(j.Task()))
					j.Done() // commit it
					select {
					case c <- true:
					case <-done:
						return
					}
				}
			}
		}(i)
	}

	total := 10

	for j := 0; j < 10; j++ {
		start := time.Now()
		for i := 0; i < total; i++ {
			fmt.Println("publishing task ", i)
			// Publishing new task, each task is just "task X" string
			q.Publish(kafkaq.Task(fmt.Sprintf("task %d", i)))
		}
		// wait until the workers have confirmed all published tasks
		for count := 0; count < total; count++ {
			<-c
		}
		diff := time.Since(start)
		fmt.Println("total ", diff, " ", diff/time.Duration(total), " per one request")
	}
	cancel()
	close(done)
	wg.Wait()
}

Documentation

Overview

Copyright 2023 The acquirecloud Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job interface {
	// Task returns the task for the job.
	Task() Task

	// Done marks the task as processed, so it will not be retried after the timeout.
	// If Done() is called before the task timeout fires, the task will not
	// be executed again. Otherwise, the task can be run in another job.
	// The function may return an error if the request cannot be executed for whatever reason.
	// In this case the task timeout remains in effect and the task may be
	// re-executed later.
	Done() error
}

Job is an object used to signal task completion.

type Queue

type Queue interface {
	// GetJob returns a job for a submitted task. The call blocks until
	// a task is found or the context is closed. The function returns an error
	// if the task cannot be retrieved (normally because ctx is closed).
	GetJob(ctx context.Context) (Job, error)
}

The Queue interface represents the queue object. Tasks may be published to the queue and retrieved as jobs for processing via the GetJob() function. Each task stays in the queue until its processing is done. When a task is retrieved from the queue, a timeout for the task starts; if the task is not done by the time the timeout fires, a new job for the task is returned, and this repeats until the task is done.

type Task

type Task []byte

Task abstraction represents a task which may be published to the queue and then consumed for further processing.

type TaskPublisher

type TaskPublisher interface {
	// Publish places the Task into the queue
	Publish(Task) error
}

TaskPublisher allows submitting a new task to the queue. It is separated from the Queue interface because not all publishers need to consume jobs from the queue.
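
One consequence of this separation is that producer-only code can depend on the narrow interface alone. The sketch below assumes locally redeclared Task and TaskPublisher types that mirror the documented ones; recorder and enqueueAll are hypothetical names, with recorder standing in for a real queue client:

```go
package main

import "fmt"

// Task and TaskPublisher mirror the kafkaq types documented above so that
// this sketch is self-contained.
type Task []byte

type TaskPublisher interface {
	Publish(Task) error
}

// recorder is a hypothetical in-memory publisher, handy for tests.
type recorder struct{ tasks []Task }

func (r *recorder) Publish(t Task) error {
	r.tasks = append(r.tasks, t)
	return nil
}

// enqueueAll publishes n tasks named "task 0" to "task n-1". It depends only
// on TaskPublisher, so it never needs access to GetJob. It returns the number
// of tasks published and the first error encountered, if any.
func enqueueAll(p TaskPublisher, n int) (int, error) {
	for i := 0; i < n; i++ {
		if err := p.Publish(Task(fmt.Sprintf("task %d", i))); err != nil {
			return i, fmt.Errorf("publishing task %d: %w", i, err)
		}
	}
	return n, nil
}

func main() {
	r := &recorder{}
	n, err := enqueueAll(r, 3)
	fmt.Println("published:", n, "err:", err)
	for _, t := range r.tasks {
		fmt.Println(string(t))
	}
}
```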

Directories

Path Synopsis
pkg
