rtm

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2024 License: MIT Imports: 4 Imported by: 0

README

rtm

Resident Task Manager for golang

Use case

You want to run parallel background tasks and you don't care about in memory data loss.

Goodies
  • ✅ Offers parallel task execution by default
  • ✅ Refined cancellation mechanism to prevent runaway go-routines and leaks.
  • ✅ Crash recovery guarantees misbehaving and naughty tasks get booted off the pool
  • ✅ Unbounded FIFO ring-buffer based backup queue so you can run tasks later when execution pool size drops.
Caveats
  • ❌ In Memory Only: Tasks and data do not persist.
  • ❌ When cancelling task execution, the calling thread will block indefinitely to wait for all tasks in the executiono pool to finish. Note that tasks on the backup queue are simply dicarded since they never executed.

Example

package main

import (
	"fmt"
	"github.com/gintec-rdl/rtm"
	"math/rand"
	"os"
	"time"
)

func main() {
	tasks := 100
    poolSize := 5
	te := rtm.NewTaskExecutor(poolSize)

	task := func(ctx *rtm.TaskContext) {
		index := ctx.Args["index"].(int)
		fmt.Printf("[%d] enter\n", index)
        // perform some time consuming action
		time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)

        // simulate a crash
        if rand.Intn(2) == 1 {
			panic(fmt.Errorf("[%d] crashed", index))
		}

		fmt.Printf("[%d] leave\n", index)
	}

	for i := 0; i < tasks; i++ {
		params := rtm.TaskArguments{"index": i}
		te.Queue(task, params)
	}

	time.Sleep(5 * time.Second)
	te.Shutdown()
}

Documentation

Overview

Resident Task Manager

Provides in-process task execution, using go-routines.

Index

Constants

View Source
const (
	DEFAULT_MAX_TASKS = 100
)

Variables

This section is empty.

Functions

This section is empty.

Types

type TaskArguments

type TaskArguments map[string]any

type TaskContext

type TaskContext struct {
	// Cancellable context. One can also check the status of the context (Ctx.Err()) to check if the context
	// has been cancelled
	Ctx context.Context
	// Arguments passed to the task
	Args TaskArguments
	// contains filtered or unexported fields
}

type TaskExecutor

type TaskExecutor struct {
	// contains filtered or unexported fields
}

func NewTaskExecutor

func NewTaskExecutor(size ...int) *TaskExecutor

Create a new, one time use executor with a working pool of size "size".

Queueing tasks to the pool will cause the task to immediately run if the pool size is within the specified size, otherwise tasks are scheduled on a FIFO list, for later execution when space becomes available in the pool.

func (*TaskExecutor) Queue

func (te *TaskExecutor) Queue(fn TaskFunc, args TaskArguments)

Queue the task to run immediately or later when execution space becomes available.

func (*TaskExecutor) Shutdown

func (te *TaskExecutor) Shutdown()

Shuts down the executor and any running tasks.

The function will block until all running tasks in the pool have finished.

Tasks in the backup queue do not affect this function and are ignored.

type TaskFunc

type TaskFunc func(ctx *TaskContext)

The task function to be executed by the task manager

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL