tasker

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2025 License: MIT Imports: 11 Imported by: 3

README

gotasker

Go Report Card GoDoc License

gotasker 是一个基于 Go 语言开发的轻量级任务处理工具,主要用于执行和管理本地任务。它提供了简单、高效的方式来执行和监控本地任务,如文件复制、系统命令执行等。

特性

  • 并发任务执行:支持高并发任务处理,可配置并发数
  • 进度条显示:实时显示任务执行进度和剩余时间
  • 错误重试机制:自动重试失败的任务,可配置最大重试次数
  • 灵活的任务模型:支持自定义任务类型和执行逻辑
  • 同步/异步执行模式:支持同步顺序执行和异步并发执行
  • 跨平台支持:支持 Windows、Linux、macOS 等主流操作系统

安装

go get github.com/wxnacy/go-tasker

快速开始

文件复制示例
package main

import (
    "fmt"
    "time"
    "github.com/wxnacy/go-tasker"
)

func main() {
    srcPath := "~/Downloads/source"
    dstPath := fmt.Sprintf("~/Downloads/backup_%d", time.Now().Unix())
    
    // 创建文件复制任务器并执行
    err := tasker.NewCopyFSTasker(srcPath, dstPath).Exec(false)
    if err != nil {
        fmt.Printf("任务执行失败: %v\n", err)
    } else {
        fmt.Println("任务执行成功!")
    }
}
文件剪切示例
package main

import (
    "fmt"
    "github.com/wxnacy/go-tasker"
)

func main() {
    srcPath := "~/Downloads/temp"
    dstPath := "~/Documents/backup"
    
    // 创建文件剪切任务器并执行
    err := tasker.NewCutFSTasker(srcPath, dstPath).Exec(false)
    if err != nil {
        fmt.Printf("任务执行失败: %v\n", err)
    } else {
        fmt.Println("任务执行成功!")
    }
}

核心概念

任务模型 (Task)

Task 是任务执行的基本单元,包含以下字段:

  • Info: 任务数据,可以是任意类型
  • IsDone: 任务是否已完成
  • Err: 任务执行过程中发生的错误
  • RetryTime: 重试次数
任务执行器接口 (ITasker)

ITasker 定义了任务执行器的标准接口:

type ITasker interface {
    Build() error
    BuildTasks() error
    AddTask(*Task)
    GetTasks() []*Task
    RunTask(*Task) error
    Run(TaskFunc) error
    SyncRun(TaskFunc) error
    AfterRun() error
    BeforeRun() error
    Progress() float64
}
执行模式

gotasker 支持两种执行模式:

  1. 异步并发执行 (Run): 使用 goroutine 并发执行任务
  2. 同步顺序执行 (SyncRun): 按顺序逐个执行任务

SDK 使用方法

1. 基本文件操作
复制文件
// 创建复制任务器
tasker := tasker.NewCopyFSTasker("~/source", "~/destination")

// 异步执行(推荐)
err := tasker.Exec(false)

// 同步执行
err := tasker.Exec(true)
剪切文件
// 创建剪切任务器
tasker := tasker.NewCutFSTasker("~/source", "~/destination")

// 执行任务
err := tasker.Exec(false)
2. 自定义任务执行器

可以通过实现 ITasker 接口创建自定义任务执行器:

type CustomTasker struct {
    *tasker.Tasker
    // 自定义字段
}

func (c *CustomTasker) Build() error {
    // 构建前的准备工作
    return nil
}

func (c *CustomTasker) BuildTasks() error {
    // 构建任务列表
    return nil
}

func (c *CustomTasker) RunTask(task *tasker.Task) error {
    // 执行单个任务的逻辑
    return nil
}

func (c *CustomTasker) BeforeRun() error {
    // 执行前的准备工作
    return nil
}

func (c *CustomTasker) AfterRun() error {
    // 执行后的清理工作
    return nil
}
3. 配置调整

可以通过 TaskerConfig 调整任务执行器的行为:

// 创建任务执行器
tasker := tasker.NewTasker()

// 调整配置
tasker.Config.ProcessNum = 10      // 设置并发数为10
tasker.Config.RetryMaxTime = 5     // 设置最大重试次数为5
tasker.Config.UseProgressBar = false // 禁用进度条

// 添加任务
tasker.AddTask(tasker.NewTask("task data", false))

// 执行任务
err := tasker.Run(func(task *tasker.Task) error {
    // 自定义任务执行逻辑
    return nil
})
4. 进度监控
// 获取任务进度 (0.0 - 1.0)
progress, err := tasker.GetTaskerProgress(myTasker)

// 实时监控进度
go func() {
    for {
        progress := myTasker.Progress()
        fmt.Printf("当前进度: %.2f%%\n", progress*100)
        time.Sleep(time.Second)
    }
}()
5. 错误处理
// 执行任务
err := tasker.Exec(false)
if err != nil {
    fmt.Printf("任务执行失败: %v\n", err)
}

// 获取失败的任务
failedTasks := tasker.GetErrorTasks()
for _, task := range failedTasks {
    fmt.Printf("任务失败: %v, 错误: %v, 重试次数: %d\n", 
        task.Info, task.Err, task.RetryTime)
}

高级用法

1. 自定义任务类型
// 定义自定义任务信息
type CustomTaskInfo struct {
    URL    string
    Output string
}

// 创建自定义任务执行器
type DownloadTasker struct {
    *tasker.Tasker
    tasks []*tasker.Task
}

func (d *DownloadTasker) BuildTasks() error {
    // 构建下载任务
    info := CustomTaskInfo{URL: "http://example.com/file.zip", Output: "file.zip"}
    task := tasker.NewTask(info, false)
    d.AddTask(task)
    return nil
}

func (d *DownloadTasker) RunTask(task *tasker.Task) error {
    info := task.Info.(CustomTaskInfo)
    // 执行下载逻辑
    return downloadFile(info.URL, info.Output)
}
2. 批量任务处理
// 创建任务列表
tasks := make([]*tasker.Task, 0)

// 添加多个任务
for i := 0; i < 100; i++ {
    task := tasker.NewTask(fmt.Sprintf("task_%d", i), false)
    tasks = append(tasks, task)
}

// 执行批量任务
err := tasker.SimpleExecTasks(tasks, func(task *tasker.Task) error {
    // 自定义任务处理逻辑
    fmt.Printf("处理任务: %v\n", task.Info)
    return nil
})

API 参考

主要类型
TaskerConfig
type TaskerConfig struct {
    ProcessNum     int  // 并发协程数,默认20
    RetryMaxTime   int  // 最大重试次数,默认1000
    UseProgressBar bool // 是否使用进度条,默认true
}
Task
type Task struct {
    RetryTime int         // 重试次数
    Err       error       // 错误信息
    IsDone    bool        // 是否完成
    Info      interface{} // 任务数据
}
主要函数
NewTasker
func NewTasker() *Tasker

创建新的任务执行器

NewTask
func NewTask(info interface{}, done bool) *Task

创建新的任务

NewCopyFSTasker
func NewCopyFSTasker(from, to string) *FileSystemTasker

创建文件复制任务执行器

NewCutFSTasker
func NewCutFSTasker(from, to string) *FileSystemTasker

创建文件剪切任务执行器

性能优化建议

  1. 合理设置并发数:根据系统资源和任务类型调整 ProcessNum
  2. 使用适当缓冲区:文件操作时使用合适的缓冲区大小
  3. 复用 Tasker 实例:避免频繁创建/销毁任务执行器
  4. 预估任务数量:精确设置任务完成状态以提升进度计算效率

许可证

MIT License,详见 LICENSE 文件。

Documentation

Overview

Package godler provides ...

Index

Constants

This section is empty.

Variables

View Source
var BUFFERSIZE int64

Functions

func Copy added in v1.0.0

func Copy(src, dst string, BUFFERSIZE int64) error

func ExecTasker added in v0.0.4

func ExecTasker(t ITasker, isSync bool) error

执行任务

func FileExists added in v1.0.0

func FileExists(filepath string) bool

判断地址是否存在

func GetTaskerProgress added in v0.0.6

func GetTaskerProgress(t ITasker) (p float64, err error)

获取任务进度 建议 NewTask 方法精确的传入 done 参数

func SimpleExecTasks added in v0.0.6

func SimpleExecTasks(tasks []*Task, runFunc TaskFunc) error

Types

type Action added in v1.0.0

type Action int
const (
	ActionCopy Action = iota
	ActionCut
	ActionZip

	PermFile fs.FileMode = 0666
	PermDir              = 0755
)

type BubblesProgressBar added in v1.0.0

type BubblesProgressBar struct {
	// contains filtered or unexported fields
}

BubblesProgressBar 基于 charmbracelet/bubbles 的进度条实现 为了简化实现,这里使用简单的控制台输出方式

func (*BubblesProgressBar) Finish added in v1.0.0

func (b *BubblesProgressBar) Finish()

Finish 完成进度条显示

func (*BubblesProgressBar) GetProgress added in v1.0.0

func (b *BubblesProgressBar) GetProgress() float64

GetProgress 获取当前进度百分比

func (*BubblesProgressBar) Increment added in v1.0.0

func (b *BubblesProgressBar) Increment()

Increment 增加一个单位的进度

func (*BubblesProgressBar) SetProgress added in v1.0.0

func (b *BubblesProgressBar) SetProgress(current int)

SetProgress 设置当前进度值

func (*BubblesProgressBar) Start added in v1.0.0

func (b *BubblesProgressBar) Start(total int) error

Start 启动进度条显示

type FileSystemTaskInfo added in v1.0.0

type FileSystemTaskInfo struct {
	From string
	To   string
}

type FileSystemTasker added in v1.0.0

type FileSystemTasker struct {
	*Tasker
	From   string
	To     string
	Action Action
}

func NewCopyFSTasker added in v1.0.0

func NewCopyFSTasker(from, to string) *FileSystemTasker

func NewCutFSTasker added in v1.0.0

func NewCutFSTasker(from, to string) *FileSystemTasker

func (*FileSystemTasker) AfterRun added in v1.0.0

func (f *FileSystemTasker) AfterRun() error

func (*FileSystemTasker) BeforeRun added in v1.0.0

func (f *FileSystemTasker) BeforeRun() error

func (*FileSystemTasker) Build added in v1.0.0

func (f *FileSystemTasker) Build() error

func (*FileSystemTasker) BuildTasks added in v1.0.0

func (f *FileSystemTasker) BuildTasks() error

func (*FileSystemTasker) Exec added in v1.0.0

func (f *FileSystemTasker) Exec(isSync bool) error

func (*FileSystemTasker) RunTask added in v1.0.0

func (f *FileSystemTasker) RunTask(task *Task) error

type ITasker

type ITasker interface {
	Build() error
	BuildTasks() error
	AddTask(*Task)
	GetTasks() []*Task
	RunTask(*Task) error
	Run(TaskFunc) error
	SyncRun(TaskFunc) error
	AfterRun() error
	BeforeRun() error
	Progress() float64
}

type NullProgressBar added in v1.0.0

type NullProgressBar struct {
	// contains filtered or unexported fields
}

NullProgressBar 空进度条实现,用于不需要显示进度条的场景

func (*NullProgressBar) Finish added in v1.0.0

func (n *NullProgressBar) Finish()

Finish 空实现

func (*NullProgressBar) GetProgress added in v1.0.0

func (n *NullProgressBar) GetProgress() float64

GetProgress 返回当前进度

func (*NullProgressBar) Increment added in v1.0.0

func (n *NullProgressBar) Increment()

Increment 空实现

func (*NullProgressBar) SetProgress added in v1.0.0

func (n *NullProgressBar) SetProgress(current int)

SetProgress 空实现

func (*NullProgressBar) Start added in v1.0.0

func (n *NullProgressBar) Start(total int) error

Start 空实现

type ProgressBar added in v1.0.0

type ProgressBar interface {
	// Start 启动进度条,传入总任务数量
	Start(total int) error

	// Increment 增加进度,每次调用增加1个单位
	Increment()

	// SetProgress 设置当前进度值
	SetProgress(current int)

	// Finish 完成进度条显示
	Finish()

	// GetProgress 获取当前进度百分比 (0.0-1.0)
	GetProgress() float64
}

ProgressBar 进度条接口,定义进度条的基本操作 此接口设计为通用接口,方便后续更换其他进度条实现

func NewBubblesProgressBar added in v1.0.0

func NewBubblesProgressBar() ProgressBar

NewBubblesProgressBar 创建新的 bubbles 进度条实例

func NewNullProgressBar added in v1.0.0

func NewNullProgressBar() ProgressBar

NewNullProgressBar 创建空进度条实例

func NewProgressBar added in v1.0.0

func NewProgressBar() ProgressBar

NewProgressBar 工厂函数:创建默认进度条实例 此函数作为进度条的入口点,方便后续更换不同的进度条实现

type Task

type Task struct {
	RetryTime int         `json:"retry_count"`
	Err       error       `json:"err"`
	IsDone    bool        `json:"done"`
	Info      interface{} `json:"info"`
}

func NewTask added in v0.0.6

func NewTask(info interface{}, done bool) *Task

新建任务体 info: 具体的任务 done: 该任务是否已经完成,正确的传入此值可以提升效率

func (*Task) Done added in v0.0.6

func (t *Task) Done() *Task

type TaskFunc

type TaskFunc func(*Task) error

type Tasker

type Tasker struct {
	TaskId string
	Config *TaskerConfig
	// contains filtered or unexported fields
}

func NewTasker

func NewTasker() *Tasker

func (*Tasker) AddTask

func (t *Tasker) AddTask(task *Task)

func (*Tasker) AfterRun

func (t *Tasker) AfterRun() error

func (*Tasker) BeforeRun

func (t *Tasker) BeforeRun() error

func (*Tasker) Build

func (t *Tasker) Build() error

func (*Tasker) BuildTasks

func (t *Tasker) BuildTasks() error

func (*Tasker) GetErrorTasks added in v0.0.3

func (t *Tasker) GetErrorTasks() []*Task

func (*Tasker) GetTasks

func (t *Tasker) GetTasks() []*Task

func (*Tasker) Progress added in v0.0.6

func (t *Tasker) Progress() float64

获取进度

func (*Tasker) Run

func (t *Tasker) Run(runTaskFunc TaskFunc) error

func (*Tasker) RunTask

func (t *Tasker) RunTask(task *Task) error

func (*Tasker) SyncRun

func (t *Tasker) SyncRun(runTaskFunc TaskFunc) error

type TaskerConfig

type TaskerConfig struct {
	ProcessNum     int
	RetryMaxTime   int
	UseProgressBar bool
}

func NewTaskerConfig

func NewTaskerConfig() *TaskerConfig

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL