kafkaq

Version: v0.0.13
Published: May 24, 2023 | License: Apache-2.0 | Imports: 2 | Imported by: 0

README

Simple, reliable, low-latency task queue library in Golang

Kafkaq is a small Go library for queueing tasks and processing them asynchronously with workers.

High-level overview of how Kafkaq works:

  • A client publishes tasks to the queue (a message broker such as Kafka or Redpanda).
  • A job executor (worker) polls the queue for tasks to process.
  • The states of acquired tasks are stored in a key-value store to track their execution status.
  • Unconfirmed tasks are distributed again after a timeout.
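
The flow above can be illustrated with a small in-memory simulation. This is only a sketch of the idea, not how Kafkaq is implemented: `memQueue`, `acquire`, `confirm`, and `redeliveries` are hypothetical names, a slice stands in for the message broker, and a map of deadlines stands in for the key-value store.

```go
package main

import (
	"fmt"
	"time"
)

// memQueue is a hypothetical in-memory stand-in for the broker plus the
// key-value state store. It is not part of kafkaq.
type memQueue struct {
	timeout  time.Duration
	pending  []string             // published, not yet acquired
	deadline map[string]time.Time // acquired, not yet confirmed
}

func newMemQueue(timeout time.Duration) *memQueue {
	return &memQueue{timeout: timeout, deadline: map[string]time.Time{}}
}

// publish appends a task to the queue.
func (q *memQueue) publish(task string) { q.pending = append(q.pending, task) }

// acquire returns the next task to run at time now. Tasks whose confirmation
// deadline has passed are handed out again before new ones.
func (q *memQueue) acquire(now time.Time) (string, bool) {
	for task, dl := range q.deadline {
		if !now.Before(dl) {
			q.deadline[task] = now.Add(q.timeout)
			return task, true
		}
	}
	if len(q.pending) == 0 {
		return "", false
	}
	task := q.pending[0]
	q.pending = q.pending[1:]
	q.deadline[task] = now.Add(q.timeout)
	return task, true
}

// confirm marks the task as done so that it is never distributed again.
func (q *memQueue) confirm(task string) { delete(q.deadline, task) }

// redeliveries acquires one task without confirming it and reports whether
// it is handed out again before the timeout, after the timeout, and after
// it has finally been confirmed.
func redeliveries() (before, after, afterConfirm bool) {
	q := newMemQueue(time.Second)
	q.publish("task 0")
	now := time.Now()

	q.acquire(now) // first delivery, left unconfirmed
	_, before = q.acquire(now.Add(500 * time.Millisecond))
	task, after := q.acquire(now.Add(2 * time.Second))
	q.confirm(task)
	_, afterConfirm = q.acquire(now.Add(10 * time.Second))
	return before, after, afterConfirm
}

func main() {
	before, after, afterConfirm := redeliveries()
	fmt.Println(before, after, afterConfirm) // prints: false true false
}
```

The sketch keys tasks by their content for brevity; a real queue would track them by a unique ID.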

The system consists of the following components:

  • The message broker, which persists tasks (Kafka or a similar log-based task store).
  • The task state storage (Redis, ZooKeeper, etcd, ...), which keeps track of the state of the tasks acquired for processing.
  • The Go library for publishing and consuming tasks on the queue.

Features

  • Guaranteed at-least-once execution of each task
  • Low latency (microseconds), bounded by whichever is slower: Kafka or the key-value storage (Redis, etcd)
  • Rescheduling of uncommitted tasks after a timeout
  • Highly reliable (limited by the reliability of the message broker and the key-value storage)
  • Highly scalable (each system component may be scaled horizontally)
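
Because execution is guaranteed at least once, a task can reach a worker more than once, so handlers with side effects usually need to tolerate duplicates. The sketch below shows one possible way to do that by remembering handled job IDs. It assumes that a redelivered task keeps the same job ID, which the documentation does not state explicitly; `seen`, `firstTime`, and `handleAll` are hypothetical names and not part of kafkaq.

```go
package main

import (
	"fmt"
	"sync"
)

// seen is a hypothetical helper (not part of kafkaq) that remembers which
// job IDs have already been handled.
type seen struct {
	mu  sync.Mutex
	ids map[string]bool
}

// firstTime reports whether id has not been recorded before, and records it.
func (s *seen) firstTime(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.ids[id] {
		return false
	}
	s.ids[id] = true
	return true
}

// handleAll applies a side effect once per distinct job ID and returns how
// many times the side effect ran.
func handleAll(ids []string) int {
	s := &seen{ids: map[string]bool{}}
	effects := 0
	for _, id := range ids {
		if s.firstTime(id) {
			effects++ // the real side effect would go here
		}
	}
	return effects
}

func main() {
	// "job-1" arrives twice, as may happen after a timeout.
	fmt.Println(handleAll([]string{"job-1", "job-2", "job-1"})) // prints: 2
}
```

With several worker processes, the set of handled IDs would have to live in shared storage rather than in process memory.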

Quickstart

Make sure you have Go installed. Version 1.19 or higher is recommended.

go get -u github.com/acquirecloud/kafkaq

Make sure you're running a Redis server and a Kafka server, either locally or in a Docker container.

NOTE: the scripts folder provides a Docker Compose setup that runs both Redis and Kafka in Docker.

Then try the following code. Don't worry if it takes some time to start; this is due to the default Kafka configuration:

package main

import (
	"context"
	"fmt"
	"github.com/acquirecloud/kafkaq"
	"github.com/acquirecloud/kafkaq/pkg/kafka"
	"github.com/go-redis/redis/v8"
	"sync"
	"time"
)

func main() {
	// Create the queue client. It allows publishing new tasks and receiving existing ones from the queue.
	q := kafka.NewKafkaRedis(kafka.GetDefaultQueueConfig(),
		// docker options
		&redis.Options{
			Addr:     "localhost:6379",
			Password: "", // no password set
			DB:       0,  // use default DB
		})
	// don't forget to start it...
	q.Init(context.Background())
	defer q.Shutdown() // q must be shut down

	ctx, cancel := context.WithCancel(context.Background())
	c := make(chan bool)
	done := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(wID int) {
			defer wg.Done()
			for ctx.Err() == nil {
				// getting the job to process
				j, err := q.GetJob(ctx)
				if err == nil {
					fmt.Println("worker ", wID, ": executing ", string(j.Task()))
					j.Done() // commit it
					select {
					case c <- true:
					case <-done:
						return
					}
				}
			}
		}(i)
	}

	total := 10

	for j := 0; j < 10; j++ {
		start := time.Now()
		for i := 0; i < total; i++ {
			fmt.Println("publishing task ", i)
			// Publish a new task; each task is just a "task X" string.
			q.Publish(kafkaq.Task(fmt.Sprintf("task %d", i)))
		}
		// Wait until the workers have confirmed every task of this round.
		for count := 0; count < total; count++ {
			<-c
		}
		diff := time.Since(start)
		fmt.Println("total ", diff, " ", diff/time.Duration(total), " per one request")
	}
	cancel()
	close(done)
	wg.Wait()
}

Documentation

Overview

Copyright 2023 The acquirecloud Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Types

type Job

type Job interface {
	// ID returns the Job ID
	ID() string

	// Task returns the task for the job.
	Task() Task

	// Done marks the task as processed, so it will not be retried by a timeout.
	// If Done() is called before the task timeout fires, the task will not
	// be executed again. Otherwise, the task can be run in another job.
	// The function may return an error if the request cannot be executed for whatever reason.
	// In this case the task timeout is still in effect and the task may be executed
	// again later.
	Done() error
}

Job is an object that allows signaling the completion of a task.
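
A worker should treat an error from Done() as "the task may come back". The following is a minimal sketch of that handling, not code from the library: the `Task` and `Job` declarations are copied from this page so the example compiles on its own, while `fakeJob`, `errCommit`, and `process` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Task and Job mirror the declarations documented on this page.
type Task []byte

type Job interface {
	ID() string
	Task() Task
	Done() error
}

// errCommit is a hypothetical error simulating an unavailable state store.
var errCommit = errors.New("store unavailable")

// fakeJob is a hypothetical test double; doneErr is what Done() returns.
type fakeJob struct {
	id      string
	task    Task
	doneErr error
}

func (j fakeJob) ID() string  { return j.id }
func (j fakeJob) Task() Task  { return j.task }
func (j fakeJob) Done() error { return j.doneErr }

// process runs the work and then commits the job. It returns false when the
// commit failed, meaning the task may be delivered again after its timeout.
func process(j Job, work func(Task)) bool {
	work(j.Task())
	if err := j.Done(); err != nil {
		fmt.Printf("job %s: commit failed, expect a retry: %v\n", j.ID(), err)
		return false
	}
	return true
}

func main() {
	work := func(t Task) { fmt.Println("executing", string(t)) }
	fmt.Println(process(fakeJob{id: "1", task: Task("task 1")}, work))
	fmt.Println(process(fakeJob{id: "2", task: Task("task 2"), doneErr: errCommit}, work))
}
```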

type JobController added in v0.0.12

type JobController interface {
	// Get returns the JobInfo for the job ID provided. The function returns
	// an error if the JobInfo cannot be obtained for whatever reason; otherwise
	// it returns the job status and some information about it.
	Get(jid string) (JobInfo, error)

	// Cancel stops future processing of the job with the given ID. If the job
	// has not been returned for processing yet, it will not be returned after
	// the call. If the job is already done, nothing happens. The call may be made
	// in the middle of the job processing; it does not interrupt the current run
	// and affects only further runs of the job, if any are scheduled.
	Cancel(jid string) (JobInfo, error)
}

JobController allows discovering information about jobs in the queue and cancelling their further processing.
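
One possible use of the two calls together is to give up on a job that keeps being rescheduled. The sketch below assumes nothing beyond the documented signatures: the type declarations are copied from this page, while `fakeController` and `cancelIfStuck` are hypothetical. The fake Cancel only records the request, since what the real Cancel returns is not specified here.

```go
package main

import (
	"fmt"
	"time"
)

// The declarations below mirror the ones documented on this page.
type JobStatus int

const (
	JobStatusUnknown JobStatus = iota
	JobStatusProcessing
	JobStatusDone
)

type JobInfo struct {
	ID              string
	Status          JobStatus
	Rescedules      int
	NextExecutionAt time.Time
}

type JobController interface {
	Get(jid string) (JobInfo, error)
	Cancel(jid string) (JobInfo, error)
}

// fakeController is a hypothetical in-memory JobController used only here.
type fakeController struct {
	jobs      map[string]JobInfo
	cancelled []string
}

func (f *fakeController) Get(jid string) (JobInfo, error) {
	if ji, ok := f.jobs[jid]; ok {
		return ji, nil
	}
	return JobInfo{ID: jid, Status: JobStatusUnknown}, nil
}

// Cancel only records the request and returns the stored info unchanged.
func (f *fakeController) Cancel(jid string) (JobInfo, error) {
	f.cancelled = append(f.cancelled, jid)
	return f.Get(jid)
}

// cancelIfStuck cancels a job that is still processing after it has been
// rescheduled at least maxReschedules times. It reports whether Cancel was
// called successfully.
func cancelIfStuck(jc JobController, jid string, maxReschedules int) (bool, error) {
	ji, err := jc.Get(jid)
	if err != nil {
		return false, err
	}
	if ji.Status != JobStatusProcessing || ji.Rescedules < maxReschedules {
		return false, nil
	}
	_, err = jc.Cancel(jid)
	return err == nil, err
}

func main() {
	jc := &fakeController{jobs: map[string]JobInfo{
		"a": {ID: "a", Status: JobStatusProcessing, Rescedules: 5, NextExecutionAt: time.Now().Add(time.Minute)},
		"b": {ID: "b", Status: JobStatusDone},
	}}
	for _, id := range []string{"a", "b", "c"} {
		ok, err := cancelIfStuck(jc, id, 3)
		fmt.Println(id, ok, err)
	}
}
```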

type JobInfo added in v0.0.12

type JobInfo struct {
	// ID contains the Job ID
	ID string
	// Status contains the known status for the job
	Status JobStatus
	// Rescedules contains how many times the job was returned by the rescheduling timeout
	Rescedules int
	// NextExecutionAt contains the timestamp when the job will be executed again
	// if it is not done before then.
	NextExecutionAt time.Time
}

JobInfo contains information about a job

type JobStatus added in v0.0.12

type JobStatus int

JobStatus contains the status of a job; see the constants below.

const (
	// JobStatusUnknown indicates that the job has not been seen yet, or that it was
	// done some time ago and the information about it is already gone. The
	// status means there is no record of the job, so it is not possible to say
	// whether the job has ever existed or has simply not been seen yet.
	JobStatusUnknown JobStatus = iota

	// JobStatusProcessing indicates that the job has been seen and distributed, but is not done yet
	// (for whatever reason). The job can be rescheduled and returned for the next run. The time when
	// the job can be run again may be found in the JobInfo object.
	JobStatusProcessing

	// JobStatusDone indicates that the job existed, it is done, and the record about it still exists
	// in the queue.
	JobStatusDone
)
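
JobStatus is a plain int, so it prints as a number by default. When logging job states, a String method can make the output readable. The sketch below redeclares the type and constants from this page so it compiles on its own; the `String` method is a hypothetical addition and is not declared by the package as documented here.

```go
package main

import "fmt"

// JobStatus and its constants mirror the declarations documented on this page.
type JobStatus int

const (
	JobStatusUnknown JobStatus = iota
	JobStatusProcessing
	JobStatusDone
)

// String is a hypothetical helper that names each status for logging.
func (s JobStatus) String() string {
	switch s {
	case JobStatusProcessing:
		return "processing"
	case JobStatusDone:
		return "done"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(JobStatusUnknown, JobStatusProcessing, JobStatusDone) // prints: unknown processing done
}
```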

type Queue

type Queue interface {
	// GetJob returns a job for a submitted task. The call blocks until
	// a task is found or the context is closed. The function returns an error
	// if the task cannot be retrieved (normally because the ctx is closed).
	GetJob(ctx context.Context) (Job, error)
}

The Queue interface represents the queue object: tasks may be published to the queue and retrieved as jobs for processing via the GetJob() function. Each task stays in the queue until it is marked as done. When a task is retrieved from the queue, a timeout for the task is started; if the task is not done by the time the timeout fires, a new job for the task is returned, and this repeats until the task is done.
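
A typical consumer is a loop that calls GetJob until the context is closed. The sketch below shows that pattern against a channel-backed fake rather than a real broker: the `Task`, `Job`, and `Queue` declarations are copied from this page, while `chanQueue`, `simpleJob`, `runWorker`, and `demo` are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Task, Job and Queue mirror the declarations documented on this page.
type Task []byte

type Job interface {
	ID() string
	Task() Task
	Done() error
}

type Queue interface {
	GetJob(ctx context.Context) (Job, error)
}

// chanQueue is a hypothetical Queue backed by a channel.
type chanQueue chan Job

// GetJob blocks until a job is available or ctx is closed.
func (q chanQueue) GetJob(ctx context.Context) (Job, error) {
	select {
	case j := <-q:
		return j, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// simpleJob is a hypothetical Job whose Done always succeeds.
type simpleJob struct {
	id   string
	task Task
}

func (j simpleJob) ID() string  { return j.id }
func (j simpleJob) Task() Task  { return j.task }
func (j simpleJob) Done() error { return nil }

// runWorker processes jobs until ctx is closed or limit jobs are committed,
// and returns the number of committed jobs.
func runWorker(ctx context.Context, q Queue, limit int) int {
	done := 0
	for done < limit {
		j, err := q.GetJob(ctx)
		if err != nil {
			return done // normally the context was closed
		}
		fmt.Println("executing", string(j.Task()))
		if j.Done() == nil {
			done++
		}
	}
	return done
}

// demo processes two queued jobs, then shows that a worker started with a
// closed context and an empty queue returns immediately.
func demo() (int, int) {
	q := make(chanQueue, 2)
	q <- simpleJob{id: "1", task: Task("task 1")}
	q <- simpleJob{id: "2", task: Task("task 2")}
	ctx, cancel := context.WithCancel(context.Background())
	first := runWorker(ctx, q, 2)
	cancel()
	second := runWorker(ctx, q, 1)
	return first, second
}

func main() {
	fmt.Println(demo()) // prints "2 0" after the two "executing" lines
}
```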

type Task

type Task []byte

Task abstraction represents a task which may be published to the queue and then consumed for further processing.

type TaskPublisher

type TaskPublisher interface {
	// Publish places the Task into the queue. The function returns the
	// new Job ID for the task which will be returned by Queue.GetJob() later
	Publish(Task) (string, error)
}

TaskPublisher allows submitting a new task into the queue. It is separated from the Queue object because not all publishers need to consume jobs from the queue.
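
Because publishing is a separate interface, producer code can depend on TaskPublisher alone and be tested without a broker. The sketch below illustrates this under stated assumptions: the `Task` and `TaskPublisher` declarations are copied from this page, while `memPublisher`, `publishAll`, and the "job-N" ID format are hypothetical.

```go
package main

import "fmt"

// Task and TaskPublisher mirror the declarations documented on this page.
type Task []byte

type TaskPublisher interface {
	Publish(Task) (string, error)
}

// memPublisher is a hypothetical in-memory publisher. The "job-N" ID format
// is made up for this example.
type memPublisher struct {
	tasks []Task
}

func (p *memPublisher) Publish(t Task) (string, error) {
	p.tasks = append(p.tasks, t)
	return fmt.Sprintf("job-%d", len(p.tasks)), nil
}

// publishAll publishes n tasks and returns their job IDs. It stops at the
// first error and returns the IDs collected so far.
func publishAll(p TaskPublisher, n int) ([]string, error) {
	ids := make([]string, 0, n)
	for i := 0; i < n; i++ {
		id, err := p.Publish(Task(fmt.Sprintf("task %d", i)))
		if err != nil {
			return ids, fmt.Errorf("publishing task %d: %w", i, err)
		}
		ids = append(ids, id)
	}
	return ids, nil
}

func main() {
	ids, err := publishAll(&memPublisher{}, 3)
	fmt.Println(ids, err) // prints: [job-1 job-2 job-3] <nil>
}
```

The returned IDs are the ones a JobController would later accept for status queries or cancellation.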

Directories

Path Synopsis
pkg
