orbit

package module v0.0.2
Published: May 9, 2025 License: MIT Imports: 6 Imported by: 0

README

🚀 Orbit Scheduler – Task Scheduling Made Easy!


Orbit is a powerful yet intuitive job scheduler written entirely in Go. Effortlessly schedule, run, monitor, and manage your tasks, ensuring reliability and efficiency.

Orbit is a lightweight, dependency-free Go library: no external modules required!


✨ Why Orbit?

  • 🔧 Simple API: Quickly set up scheduled jobs using clean and intuitive methods.
  • ⚡ High Performance: Leverages Go's concurrency model to run thousands of tasks effortlessly.
  • 📈 Built-In Monitoring: Track job execution in real time with built-in monitoring hooks.
  • 🎯 Flexible Scheduling: Supports both interval-based and cron-based schedules.
  • 🧠 Intelligent Control: Pause, resume, and stop jobs on the fly; control any task interactively, like media playback.
  • 🔒 Safe & Reliable: Panic recovery and error isolation ensure your scheduler never crashes.

📦 Installation

Simply install with:

go get github.com/osmike/orbit

🚦 Quick Start

Here's how easy it is to get started:

package main

import (
  "context"
  "fmt"
  "github.com/osmike/orbit"
  "time"
)

func main() {
  ctx := context.Background()

  pool, _ := orbit.CreatePool(ctx, orbit.PoolConfig{
    // MaxWorkers sets the maximum number of concurrent workers allowed to execute jobs simultaneously.
    // Higher values can improve throughput for CPU-bound or I/O-bound tasks, but might consume more system resources.
    // Default value: 1000 (DEFAULT_NUM_WORKERS).
    MaxWorkers: 1000,
    // IdleTimeout specifies the duration after which a job that remains idle
    // (not executed or scheduled for immediate execution) will be marked as inactive.
    // This helps optimize resource usage and prevents accumulation of stale tasks.
    // Default value: 100 hours (DEFAULT_IDLE_TIMEOUT).
    IdleTimeout: 100 * time.Hour,
    // CheckInterval defines how frequently the pool checks for jobs that are ready for execution or require status updates.
    // Short intervals result in more responsive job execution at the expense of slightly increased CPU utilization.
    CheckInterval: 100 * time.Millisecond,
  }, nil)

  jobCfg := orbit.Job{
    ID:   "hello-world",
    Name: "Print Hello World",
    Fn: func(ctrl orbit.FnControl) error {
      fmt.Println("Hello, World!")
      return nil
    },
    Interval: orbit.Interval{Time: 5 * time.Second},
  }

  pool.AddJob(jobCfg)
  // Run starts the main controlling goroutine for the pool.
  // It continuously manages job scheduling, execution, and lifecycle events.
  pool.Run()

  select {} // Keep running indefinitely
}

🛠 Features

🎮 Live Control: Pause, Resume, or Stop jobs dynamically, as easily as managing a video or audio track.
📅 Advanced Job Example: Cron-Based Execution with State & Control

This example demonstrates how to:

  • initialize job state on start (OnStart)
  • perform incremental batch processing
  • support pause/resume interaction
  • run on a cron schedule

jobID := "weekly-upload"

// onStartFn runs before the main job function (registered below as the OnStart hook,
// hence the Hook function signature).
// It initializes the state with the total number of rows to process.
onStartFn := func(ctrl orbit.FnControl, err error) error {
    rowCnt := db.GetRowCount() // Get total rows to process from DB

    // Save the job's initial state
    ctrl.SaveData(map[string]interface{}{
        "rowCnt":  rowCnt,
        "uploaded": 0, // progress tracker
    })

    fmt.Printf("Job %s started, row count: %d\n", jobID, rowCnt)
    return nil
}


// mainFn handles the batch upload in chunks of 1000 rows.
// It supports pause/resume, stateful progress, and clean shutdown.
mainFn := func(ctrl orbit.FnControl) error {
    for {
        select {
        case <-ctrl.PauseChan():
            fmt.Println("Paused... waiting for resume")

            <-ctrl.ResumeChan()
            fmt.Println("Resumed, reconnecting to DB...")
            reconnectToDB()

        case <-ctrl.Context().Done():
            return ctrl.Context().Err() // exit cleanly if context is cancelled

        default:
            // Read current data
            data := ctrl.GetData()

            uploaded := data["uploaded"].(int)
            total := data["rowCnt"].(int)

            // Simulate batch processing
            uploaded += db.BatchInsert(1000)

            // Persist updated data in job state
            ctrl.SaveData(map[string]interface{}{
                "rowCnt":  total,
                "uploaded": uploaded,
            })

            // Stop the job when all data is uploaded
            if uploaded >= total {
                fmt.Println("✅ All data uploaded successfully.")
                return nil
            }
        }
    }
}
  

📌 This pattern is ideal for large ETL-like jobs, syncing data from external systems, or anything that may be paused and resumed on demand.

Then configure the job like this:

jobCfg := orbit.Job{
    ID:   jobID,
    Name: "Weekly DB Upload",
    Fn:   mainFn,
    Hooks: orbit.Hooks{
        OnStart: orbit.Hook{Fn: onStartFn},
    },
    Interval: orbit.Interval{CronExpr: "0 20 * * 5"}, // Every Friday at 20:00 (8 PM)
}

And add it to a running pool:

pool.AddJob(jobCfg)
🧠 Key Concepts
  • Persistent Job State: Use ctrl.SaveData() and ctrl.GetData() to persist a job's data between runs or across iterations inside a job.
  • Pause & Resume: You can call PauseJob(id string, timeout time.Duration) and ResumeJob(id string) at runtime from your application logic. The job can listen on ctrl.PauseChan() and ctrl.ResumeChan() to handle reconnections or resume where it left off.
  • Graceful Shutdown: Jobs respect ctrl.Context().Done() to terminate cleanly.
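The Persistent Job State concept relies on SaveData merging new keys into the job's existing state (see the FnControl reference below). A minimal, library-free sketch of that merge semantics — the mergeData helper is illustrative only, not part of Orbit:

```go
package main

import "fmt"

// mergeData mimics the merge semantics described for ctrl.SaveData:
// keys in update are added to state, overwriting existing entries,
// while all other keys are preserved between iterations.
func mergeData(state, update map[string]interface{}) {
	for k, v := range update {
		state[k] = v
	}
}

func main() {
	state := map[string]interface{}{"rowCnt": 10000, "uploaded": 0}

	// One batch iteration updates only the progress key.
	mergeData(state, map[string]interface{}{"uploaded": 1000})

	fmt.Println(state["rowCnt"], state["uploaded"]) // 10000 1000
}
```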
🕹 Runtime Control Example: Pause & Resume the Job

You can pause and resume a running job dynamically using the pool's control methods:

// Pause the job with a timeout of 10 seconds.
// This timeout ensures two things:
//
// 1. If the job **never reads** from ctrl.PauseChan(), it will be auto-resumed after 10s.
// 2. It gives the job enough time to reach the point where it starts listening
//    for the pause signal (ctrl.PauseChan()) before it auto-resumes.
//
// This protects your system from accidental "stuck" states
// and provides a safe buffer for graceful transitions.
err := pool.PauseJob("weekly-upload", 10*time.Second)
if err != nil {
    log.Println("Pause failed:", err)
}

// Resume the job early (before the timeout ends)
err = pool.ResumeJob("weekly-upload")
if err != nil {
    log.Println("Resume failed:", err)
}

🔎 Why the Timeout Matters

In Orbit, a job becomes paused only when it explicitly reads from ctrl.PauseChan(). But some jobs might:

  • never reach that line
  • reach it too late (e.g., stuck in DB logic or long loops)

The timeout solves both cases by:

  • automatically resuming the job if it does not respond to pause
  • allowing you time to coordinate other actions (like logging, alerts, or data flush) before pause is actually picked up

Concurrency Control: Limit how many jobs run simultaneously.
🧪 Example: Concurrency Control in Action

You can control how many jobs run in parallel using the MaxWorkers option in your pool configuration.

๐Ÿ” Scenario

Let's say you want only one job at a time to run. You add 3 jobs with the same interval, but with slight delays in when they're registered:


pool, _ := orbit.CreatePool(context.Background(), orbit.PoolConfig{
    MaxWorkers:    1,                      // allow only 1 job at a time
    CheckInterval: 50 * time.Millisecond,  // quick job scanning
}, nil)

createJob := func(id string) orbit.Job {
    return orbit.Job{
        ID:       id,
        Name:     fmt.Sprintf("Job %s", id),
        Interval: orbit.Interval{Time: time.Second}, // runs every 1s
        Fn: func(ctrl orbit.FnControl) error {
            fmt.Printf("[%s] Started at %v\n", id, time.Now())
            time.Sleep(time.Second)
            fmt.Printf("[%s] Finished at %v\n", id, time.Now())
            return nil
        },
    }
}

// Register the jobs with slight delays between them.
pool.AddJob(createJob("job1"))
time.Sleep(100 * time.Millisecond)
pool.AddJob(createJob("job2"))
time.Sleep(100 * time.Millisecond)
pool.AddJob(createJob("job3"))

pool.Run()
select {}
๐Ÿ” Output (approximate):
[job1] Started at 00:00:00
[job1] Finished at 00:00:01
[job2] Started at 00:00:01
[job2] Finished at 00:00:02
[job3] Started at 00:00:02
[job3] Finished at 00:00:03
📌 Takeaway

Even though all jobs have the same interval (1s), they are executed sequentially because MaxWorkers = 1. The order of execution is determined by the order in which jobs were added to the pool.
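Worker caps like this are commonly implemented with a counting semaphore. A self-contained sketch (a model of the behavior, not Orbit's internals) that measures peak concurrency under a MaxWorkers-style limit:

```go
package main

import (
	"fmt"
	"sync"
)

// peakConcurrency runs `jobs` goroutines through a semaphore of size
// maxWorkers and reports the highest number that ran at the same time.
func peakConcurrency(maxWorkers, jobs int) int {
	sem := make(chan struct{}, maxWorkers) // counting semaphore

	var mu sync.Mutex
	running, peak := 0, 0

	var wg sync.WaitGroup
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{} // acquire a worker slot
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			// ... job body would run here ...

			mu.Lock()
			running--
			mu.Unlock()
			<-sem // release the slot
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(peakConcurrency(1, 3)) // 1: jobs are forced to run sequentially
}
```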


Lifecycle Hooks: Customize behavior with hooks (OnStart, OnSuccess, OnError, etc.).

Orbit gives you full control over the job lifecycle by providing hooks for key events. Hooks allow you to log, trace, or augment the behavior of jobs without touching the core logic.

stateDiagram-v2

    state Running {
        [*] --> OnStart
        OnStart --> Executing : run Fn()
        Executing --> OnSuccess : no error
        Executing --> OnError : error occurred
        OnSuccess --> Finally
        OnError --> Finally
        Finally --> [*]
    }
Supported Hooks:
  • OnStart – triggered when the job starts
  • OnPause / OnResume – triggered on manual pause/resume
  • OnStop – triggered on manual stop
  • OnSuccess – after successful execution
  • OnError – when an error occurs during job execution
  • Finally – always executed at the end, regardless of result
๐Ÿ” Example: Logging Job Lifecycle Events

This example demonstrates a full lifecycle job that logs execution through each phase:

var logger = log.New(os.Stdout, "", log.LstdFlags)

job := orbit.Job{
    ID:   "hooked-job",
    Name: "Lifecycle Logging Demo",
    Interval: orbit.Interval{Time: 1 * time.Second},
    Fn: func(ctrl orbit.FnControl) error {
        logger.Println("[main] syncing data...")
        time.Sleep(300 * time.Millisecond)
        return errors.New("sync failed") // force error for demo
    },
    Hooks: orbit.Hooks{
        OnStart: orbit.Hook{
            Fn: func(ctrl orbit.FnControl, err error) error {
                logger.Println("[hook] OnStart: job starting...")
                return nil
            },
        },
        OnPause: orbit.Hook{
            Fn: func(ctrl orbit.FnControl, err error) error {
                logger.Println("[hook] OnPause: job paused")
                return nil
            },
        },
        OnResume: orbit.Hook{
            Fn: func(ctrl orbit.FnControl, err error) error {
                logger.Println("[hook] OnResume: job resumed")
                return nil
            },
        },
        OnStop: orbit.Hook{
            Fn: func(ctrl orbit.FnControl, err error) error {
                logger.Println("[hook] OnStop: job stopped")
                return nil
            },
        },
        OnSuccess: orbit.Hook{
            Fn: func(ctrl orbit.FnControl, err error) error {
                logger.Println("[hook] OnSuccess: job succeeded")
                return nil
            },
        },
        OnError: orbit.Hook{
            Fn: func(ctrl orbit.FnControl, err error) error {
                logger.Printf("[hook] OnError: job failed: %v\n", err)
                return nil
            },
        },
        Finally: orbit.Hook{
            Fn: func(ctrl orbit.FnControl, err error) error {
                logger.Println("[hook] Finally: job finished")
                return nil
            },
        },
    },
}
✅ Sample Output:
[hook] OnStart: job starting...
[main] syncing data...
[hook] OnError: job failed: sync failed
[hook] Finally: job finished
🧠 Best Practices
  • Use hooks to log, monitor, or recover state cleanly.
  • Return errors inside hooks if you want the execution to halt (unless IgnoreError is set).
  • Combine with ctrl.SaveData() to track job-level metrics.

โ™ป๏ธRetry Mechanism: Automatically retry failed tasks with configurable strategy.

Orbit allows you to easily configure retry behavior for your jobs. You can define whether a job should retry on failure, how many times, or even retry infinitely!

🧪 Example: Retry Scenarios

Let's configure three jobs with different retry strategies:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/osmike/orbit"
)

func main() {


    pool, _ := orbit.CreatePool(context.Background(), orbit.PoolConfig{
        MaxWorkers:    3,
        CheckInterval: 10 * time.Millisecond,
    }, nil)

    failFn := func(ctrl orbit.FnControl) error {
        return errors.New("oops, failed!")
    }

    // Define three jobs:
    jobs := []orbit.Job{
        {
            ID: "no-retry",
            Fn: failFn,
            Retry: orbit.Retry{
                Active: false, // No retries: fail once and stop
            },
            Interval: orbit.Interval{Time: 50 * time.Millisecond},
        },
        {
            ID: "three-retry",
            Fn: failFn,
            Retry: orbit.Retry{
                Active: true,
                Count:  3, // Retry up to 3 times after initial failure
            },
            Interval: orbit.Interval{Time: 50 * time.Millisecond},
        },
        {
            ID: "infinite-retry",
            Fn: failFn,
            Retry: orbit.Retry{
                Active: true,
                Count:  0, // 0 means infinite retries!
            },
            Interval: orbit.Interval{Time: 50 * time.Millisecond},
        },
    }

    pool.Run()

    // Add jobs to the pool
    for _, job := range jobs {
        _ = pool.AddJob(job)
    }

    // Let them run for a while
    time.Sleep(600 * time.Millisecond)

    // Retrieve metrics
    metrics := pool.GetMetrics()

    for id, m := range metrics {
        fmt.Printf("Job %s: failures = %d, status = %s\n", id, m.(orbit.JobState).Failure, m.(orbit.JobState).Status)
    }
}

Job ID          Behavior                      Failure Count     Final Status
no-retry        Fails once, no retries        1                 error
three-retry     Fails, then retries 3 times   4 (1 + 3)         error
infinite-retry  Keeps retrying indefinitely   ≥5 and growing    running or error, depending on timing
📌 Takeaway
  • Retry.Active: false – the job fails once and is automatically removed from the pool.
  • Retry.Count > 0 – the job retries up to Count times; after that, it is removed from the pool.
  • Retry.Count == 0 – the job retries infinitely until it is manually stopped.
  • Retry.ResetOnSuccess: true – after a successful execution, the retry counter is reset, allowing fresh retries if future failures occur.
  • Automatic job cleanup: Orbit ensures that jobs which are no longer retryable do not clog the pool; they are deleted automatically after exhausting retries, or immediately if retries are disabled.
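The rules above can be condensed into a small decision function. This is a sketch of the documented semantics, not Orbit's actual code:

```go
package main

import "fmt"

// shouldRetry encodes the documented rules: retries must be active,
// Count == 0 means unlimited, and Count > 0 allows that many retries
// after the initial failure.
func shouldRetry(active bool, count, failures int) bool {
	if !active {
		return false // fail once and stop
	}
	if count == 0 {
		return true // infinite retries
	}
	return failures <= count // retries remaining
}

func main() {
	fmt.Println(shouldRetry(false, 0, 1)) // false: no-retry job is removed
	fmt.Println(shouldRetry(true, 3, 3))  // true: third retry still allowed (4 failures total)
	fmt.Println(shouldRetry(true, 3, 4))  // false: retries exhausted
	fmt.Println(shouldRetry(true, 0, 99)) // true: retries forever
}
```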

⚡ Tip: Monitor your retry settings carefully to balance fault tolerance against resource usage.

⚡ Important

  • Retries are scheduled naturally: After a failure, the job returns to the Waiting state and follows the normal CheckInterval + Interval or Cron scheduling for the next run.
  • Jobs that exhaust retries or have no retries configured are cleanly removed to prevent resource leaks.

🛑 Graceful Shutdown: Safely terminate all jobs with Kill()

Orbit provides a reliable way to gracefully shutdown the entire pool and stop all running or waiting jobs safely via the Kill() method.

When you call pool.Kill():

  • All jobs are canceled through the shared pool.Context().
  • Each job updates its metrics with
    • Status: stopped
    • Error: errs.ErrPoolShutdown
  • All jobs are removed from the pool.
  • The pool is marked as killed and cannot be restarted.
🧪 Example: Graceful Kill

Let's create a simple pool with one long-running job and gracefully shut it down:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/osmike/orbit"
)

func main() {


    pool, _ := orbit.CreatePool(context.Background(), orbit.PoolConfig{
        MaxWorkers:    1,
        CheckInterval: 50 * time.Millisecond,
    }, nil)

    longJob := orbit.Job{
        ID: "long-running-job",
        Fn: func(ctrl orbit.FnControl) error {
            fmt.Println("Job started")
            <-ctrl.Context().Done()
            fmt.Println("Job canceled:", ctrl.Context().Err())
            return ctrl.Context().Err()
        },
        Interval: orbit.Interval{Time: 1 * time.Hour}, // Never intended to complete
    }

    _ = pool.AddJob(longJob)

    pool.Run()

    time.Sleep(100 * time.Millisecond) // Give some time to start

    pool.Kill() // Gracefully shutdown

    time.Sleep(100 * time.Millisecond) // Wait a little for cleanup

    // Trying to run the pool again will fail
    if err := pool.Run(); err != nil {
        fmt.Println("Cannot restart killed pool:", err)
    }
}
Step                        What Happens
Kill() called               Pool context is canceled; all jobs receive the Done() signal
Jobs react to cancellation  Jobs detect ctx.Done() and exit (if implemented correctly)
Metrics update              Status = stopped, Error = ErrPoolShutdown recorded in metrics
Pool cleanup                Jobs are removed from internal storage
Pool is killed              Run() returns an error if called again
📌 Takeaway
  • Kill() does not forcefully kill goroutines; it signals cancellation via context.
  • Jobs should respect ctx.Done() to exit gracefully.
  • After Kill(), the pool is permanently shut down.
  • Metrics will reflect the final stopped state with the shutdown reason.
⚡ Tip: Always design your jobs to listen on ctrl.Context().Done() if you want graceful shutdown behavior!
⚡ Quick Recap
p.Kill()
→ ctx.Cancel() → jobs detect Done()
→ update metrics (stopped + shutdown error)
→ remove jobs
→ pool is permanently dead

📊 Monitoring & Metrics

Orbit automatically tracks the state of each job in real time.

Collected metrics include:

Metric                  Description
🕰 Start Time           When the job starts execution
🕰 End Time             When the job finishes execution
🏃 Execution Time       Total execution duration
✅ Success Count        Number of successful executions
❌ Failure Count        Number of failed executions
📌 Custom User Metrics  User-defined metadata (SaveData)
🚦 Status               Current job status (Waiting, Running, Completed, Error, Paused, Stopped, Ended)

โš™๏ธ How monitoring works

Monitoring updates happen automatically at every significant lifecycle event and periodically while a job is running:

Event                             What gets saved
Job creation                      Status set to Waiting, initial state saved
Before start (Waiting state)      Every CheckInterval (default 100ms), Orbit checks whether StartAt has arrived
Job start                         Status changed to Running, StartAt recorded, NextRun scheduled, Data wiped clean
During execution (Running state)  Every CheckInterval, Orbit updates ExecutionTime and saves metrics
Execution error                   Status set to Error, execution error saved
Successful completion             Status set to Completed
Retry after failure               Status goes Error → Completed → Waiting automatically
Exhausted retries                 Status set to Ended, job removed from the pool
Pause/Resume                      Status updated to Paused and back to Running
Pool shutdown                     All jobs forcibly set to Stopped, error ErrPoolShutdown attached

✅ On every state change or execution-time update, SaveMetrics() is called automatically.

🧩 Custom Monitoring

Orbit uses a Monitoring interface!

You can implement your own Monitoring system easily (for example, sending metrics to Prometheus, DataDog, logs, etc.).

Just pass your custom implementation when creating a Pool:

myMon := NewCustomMonitoring()

pool, _ := orbit.CreatePool(context.Background(), orbit.PoolConfig{
    MaxWorkers: 10,
}, myMon)

If you don't provide a custom monitor, Orbit uses a built-in in-memory monitoring system by default.

To fetch metrics, simply call:

metrics := pool.GetMetrics()

📜 Full Job Lifecycle

flowchart TD
    classDef waiting fill:#ffeeba,stroke:#f0ad4e,color:#003366,stroke-width:2px
    classDef running fill:#c3e6cb,stroke:#28a745,color:#000000,stroke-width:2px
    classDef completed fill:#bee5eb,stroke:#17a2b8,color:#003366,stroke-width:2px
    classDef error fill:#f5c6cb,stroke:#dc3545,color:#000000,stroke-width:2px
    classDef ended fill:#d6d8db,stroke:#6c757d,color:#000000,stroke-width:2px
    classDef stopped fill:#e2e3e5,stroke:#6c757d,color:#003366,stroke-width:2px

    A[Job created<br/>SaveMetrics] --> B[Status: Waiting]
    B -->|CheckInterval| C{StartAt reached?}
    C -- No --> B
    C -- Yes --> D[Status: Running<br/>Wipe Data<br/>SaveMetrics]
    D -->|CheckInterval| E{Execution continues}
    E -- No error --> F[Status: Completed<br/>SaveMetrics]
    F -->|NextRun scheduled| B
    E -- Error --> G[Status: Error<br/>SaveMetrics]
    G -- Retry available --> F
    G -- Retries exhausted --> H[Status: Ended<br/>SaveMetrics<br/>Remove job]
    D -- Timeout or shutdown --> I[Status: Stopped<br/>SaveMetrics]

    class B waiting
    class D running
    class F completed
    class G error
    class H ended
    class I stopped

📌 Key Points

  • Orbit checks jobs every CheckInterval (default 100ms).
  • ExecutionTime is updated periodically while running, not just at the end!
  • Data is wiped clean before every new execution (Data = {} at start).
  • Monitoring is fully pluggable: just implement the Monitoring interface!

🧩 Example: Custom Monitoring with Prometheus

Orbit allows you to define your own custom Monitoring implementations.

You can seamlessly integrate Orbit with external observability systems like Prometheus, DataDog, New Relic, or any other monitoring backend.

Here's a real-world example of integrating Orbit with Prometheus using its official Go client:

package mymonitoring

import (
    "github.com/osmike/orbit"
    "github.com/prometheus/client_golang/prometheus"
)

// PrometheusMonitoring implements the Monitoring interface and exports Orbit job metrics to Prometheus.
type PrometheusMonitoring struct {
    jobExecutionTime *prometheus.GaugeVec
    jobSuccessCount  *prometheus.CounterVec
    jobFailureCount  *prometheus.CounterVec
}

// NewPrometheusMonitoring initializes Prometheus metrics and registers them.
func NewPrometheusMonitoring() *PrometheusMonitoring {
    pm := &PrometheusMonitoring{
        jobExecutionTime: prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: "orbit_job_execution_time_seconds",
                Help: "Execution time of a job in seconds",
            },
            []string{"job_id"},
        ),
        jobSuccessCount: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "orbit_job_success_total",
                Help: "Total number of successful job completions",
            },
            []string{"job_id"},
        ),
        jobFailureCount: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "orbit_job_failure_total",
                Help: "Total number of failed job executions",
            },
            []string{"job_id"},
        ),
    }

    // Register metrics
    prometheus.MustRegister(pm.jobExecutionTime)
    prometheus.MustRegister(pm.jobSuccessCount)
    prometheus.MustRegister(pm.jobFailureCount)

    return pm
}

// SaveMetrics collects and exports metrics from the given JobState.
func (pm *PrometheusMonitoring) SaveMetrics(dto orbit.JobState) {
    jobID := dto.JobID

    pm.jobExecutionTime.WithLabelValues(jobID).Set(float64(dto.ExecutionTime) / 1e9) // nanoseconds -> seconds
    pm.jobSuccessCount.WithLabelValues(jobID).Add(float64(dto.Success))
    pm.jobFailureCount.WithLabelValues(jobID).Add(float64(dto.Failure))
}

// GetMetrics (optional) returns dummy information since Prometheus pulls metrics externally.
func (pm *PrometheusMonitoring) GetMetrics() map[string]interface{} {
    return map[string]interface{}{
        "info": "Metrics are available at the Prometheus endpoint",
    }
}
🔥 Key points:
  • SaveMetrics is automatically called every time a job's state changes (Start, Completion, Error, Retry, Pause, Resume, etc).
  • You can track execution times, success/failure counters, status, and even custom runtime metadata.
  • Prometheus scrapes the registered metrics automatically via the /metrics HTTP endpoint.
📌 Important
  • Monitoring is fully pluggable via the Monitoring interface.
  • You can provide your own implementation for Prometheus, DataDog, New Relic, InfluxDB, or even log them manually.
  • If no custom Monitoring is passed when creating a Pool, Orbit uses its default lightweight in-memory Monitoring.

🗂 Project Structure

orbit/
├── internal/
│   ├── domain/     # Core domain definitions
│   ├── error/      # Custom error handling
│   ├── job/        # Job execution and lifecycle management
│   └── pool/       # Job pool management
├── mon.go      # Implementation of default monitoring storage
└── orbit.go    # Main scheduler API entry point

📚 Public API Reference

1. Constants
Constant                Description
DEFAULT_NUM_WORKERS     Default number of concurrent workers per pool (1000).
DEFAULT_CHECK_INTERVAL  Default interval between job-state checks (100ms).
DEFAULT_IDLE_TIMEOUT    Default timeout before a job is considered idle (100h).
DEFAULT_PAUSE_TIMEOUT   Default timeout for pause acknowledgment (1s).
MAX_END_AT              Maximum end time for job execution (year 9999).
2. Types

JobStatus string

Represents a job's lifecycle state:

Status Description
waiting The job is waiting to be executed.
running The job is currently executing. The pool monitors a job in the Running state, checking for execution timeouts or runtime errors. If the job exceeds its configured timeout, it is marked as Error, triggering finalization and metric recording.
completed The job has finished execution successfully. On completion, the pool checks whether the job has future scheduled executions. If another execution is pending, the job is reset to Waiting; otherwise it is marked as Ended, indicating no further executions are planned.
paused The job is temporarily paused. The Pause method attempts to pause the currently running job by sending a non-blocking signal to pauseCh (retrieved via FnControl.PauseChan() <-chan struct{}). Once the pause signal is sent, a timeout watcher is started in a separate goroutine. If the job fails to acknowledge the pause (by reading from pauseCh) within the specified timeout, its status is automatically reverted to Running.
stopped The job has been explicitly stopped and will not run unless restarted. Stopping triggers immediate cancellation via the job's execution context. When the job is resumed via pool.ResumeJob(id string), its context is recreated from the pool context.
ended The job reached its defined end condition (e.g., end time or retry limit). When ended, the job is removed from the pool, triggering graceful cleanup: its context is canceled and its channels are closed.
error The job encountered an error during execution. If retries are available, the job is rescheduled by setting its status to Completed. If retries are exhausted or disabled, the job is finalized (Ended) and removed from the pool. (Note: in this case, execution errors are not stored in the final state.)

JobState struct

Represents the runtime state of a job.

Field Type Description
JobID string Unique identifier of the job associated with this state.
StartAt time.Time Timestamp when the job execution started.
EndAt time.Time Timestamp when the job execution ended (zero if still running).
Error StateError Captures any execution errors and lifecycle hook errors.
Status JobStatus Current lifecycle status (waiting, running, completed, error, etc.).
ExecutionTime int64 Duration of the job's execution (in nanoseconds).
Data map[string]interface{} User-defined runtime key-value data captured during execution via FnControl.SaveData().
Success int Number of times the job has completed successfully.
Failure int Number of times the job has failed.
NextRun time.Time Scheduled time for the next execution (zero for one-time jobs).

StateError struct

Captures all types of errors that can occur during job execution.

Field Type Description
JobError error Error returned by the main job function (Fn).
HookError HookError Struct containing errors from lifecycle hooks (OnStart, OnError, Finally, etc.).

Method:

  • IsEmpty() bool: Returns true if both JobError and all HookError fields are nil, meaning no errors occurred during job execution or hooks.

HookError struct

Field Type Description
OnStart error Error from OnStart hook.
OnStop error Error from OnStop hook.
OnError error Error from OnError hook.
OnSuccess error Error from OnSuccess hook.
OnPause error Error from OnPause hook.
OnResume error Error from OnResume hook.
Finally error Error from Finally hook.

Job struct

Configuration for creating a new job.

Field Type Description
ID string Required. Unique identifier of the job.
Name string Optional human-readable name (defaults to ID if empty).
Fn func(ctrl FnControl) error Main execution function of the job, accepting a FnControl.
Interval Interval Defines when and how often the job should run (either interval or cron).
Timeout time.Duration Maximum allowed time for job execution before cancellation.
StartAt time.Time Earliest time when the job can start (defaults to time.Now() if unset).
EndAt time.Time Latest time when the job can run (defaults to MAX_END_AT if unset).
Retry Retry Retry behavior configuration on execution failure.
Hooks Hooks Optional lifecycle hooks for additional job logic at specific stages.

Retry struct

Defines retry behavior for failed jobs.

Field Type Description
Active bool Enables or disables retry logic. If Active: false → no retries.
Count int Number of retry attempts allowed. 0 means infinite retries. If Count > 0 → retry up to Count times, then finalize the job.
ResetOnSuccess bool If true, resets retry counter after a successful execution.

Interval

Defines job scheduling options.

Field Type Description
Time time.Duration Fixed duration between executions (e.g., every 5 minutes).
CronExpr string Cron expression (e.g., "0 0 * * *" for midnight daily).

Only one of Time or CronExpr should be set; setting both results in an error.
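For example, the same Job can be scheduled either way (field names as documented above; the commented-out form is the invalid combination):

```go
// Interval-based: run every 5 minutes.
job.Interval = orbit.Interval{Time: 5 * time.Minute}

// Cron-based: run at midnight every day.
job.Interval = orbit.Interval{CronExpr: "0 0 * * *"}

// Invalid: setting both Time and CronExpr results in an error.
// job.Interval = orbit.Interval{Time: time.Minute, CronExpr: "0 0 * * *"}
```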

Hooks

Lifecycle hooks that can run custom logic at different points during a job's life.

Field Type Description
OnStart Hook Triggered right before the job starts.
OnStop Hook Triggered when the job is stopped.
OnError Hook Triggered if the main job function returns an error.
OnSuccess Hook Triggered if the job completes successfully.
OnPause Hook Triggered when the job is paused.
OnResume Hook Triggered when the job is resumed.
Finally Hook Always triggered after job finishes, regardless of success or failure.

Hook

Field Type Description
Fn func(ctrl FnControl, err error) error The function executed during the lifecycle stage. It receives the current FnControl and an optional error (used for error handling in the OnError hook), and should return an error if the hook fails.
IgnoreError bool Determines whether hook execution errors are ignored. If true, the job proceeds even if the hook fails. If false, the job halts and treats the hook error as fatal.
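As a sketch, a non-critical hook can be marked with IgnoreError so that its failures are recorded but never halt the job. Here pushToAuditLog is a hypothetical helper, not part of Orbit:

```go
Hooks: orbit.Hooks{
    Finally: orbit.Hook{
        Fn: func(ctrl orbit.FnControl, err error) error {
            // pushToAuditLog is hypothetical and may itself fail.
            return pushToAuditLog(err)
        },
        IgnoreError: true, // the job proceeds even if this hook fails
    },
},
```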

FnControl

An interface providing control and metadata access inside job logic.

Method Description
SaveData(map[string]interface{}) Save custom runtime data (merged into JobState.Data).
GetData() map[string]interface{} Retrieve a copy of the saved runtime data.
Context() context.Context Execution context to monitor for cancellation or timeout.
PauseChan() <-chan struct{} Channel signaling the job to pause.
ResumeChan() <-chan struct{} Channel signaling the job to resume after pause.

PoolConfig

Defines configuration settings for initializing a scheduler execution pool.

| Field | Type | Description |
| --- | --- | --- |
| `MaxWorkers` | `int` | Maximum number of concurrent workers allowed. If 0, defaults to 1000. |
| `CheckInterval` | `time.Duration` | How frequently the pool checks jobs for execution eligibility. If 0, defaults to 100ms. |
| `IdleTimeout` | `time.Duration` | Maximum time a job can remain idle before being marked as inactive. If 0, defaults to 100h. |

This configuration is passed to `orbit.CreatePool(...)` and determines how the pool handles parallel execution and job polling frequency.

Pool

Represents a job execution engine. Manages job scheduling, concurrency, and lifecycle control.

| Method | Description |
| --- | --- |
| `AddJob(job Job) error` | Adds a job to the pool. Fails if the ID already exists or the job's status is not `Waiting`. |
| `RemoveJob(id string) error` | Removes a job from the pool by ID. Does not stop the job explicitly. |
| `PauseJob(id string, timeout time.Duration) error` | Sends a pause signal to a running job. The timeout defines how long the job has to acknowledge the pause. |
| `ResumeJob(id string) error` | Resumes a job from the `Paused` or `Stopped` state. Recreates the internal context from the pool context. |
| `StopJob(id string) error` | Cancels job execution and updates the status to `Stopped`. |
| `Run() error` | Starts the pool scheduler loop. Checks job states every `CheckInterval` and enforces `MaxWorkers`. |
| `Kill()` | Immediately stops all jobs, cancels the context, and marks the pool as permanently shut down. |
| `GetMetrics() map[string]interface{}` | Returns collected metrics from the provided `Monitoring` implementation. |

Orbit

Main orchestrator for pool and job lifecycle.

| Method | Description |
| --- | --- |
| `CreatePool(ctx context.Context, cfg PoolConfig, mon Monitoring) (Pool, error)` | Initializes a pool with the given configuration and monitoring strategy. |

โš–๏ธ License

MIT License


🚀 Ready to schedule smarter? Get Orbit now!


Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultMon

type DefaultMon struct {
	// contains filtered or unexported fields
}

DefaultMon provides an in-memory, thread-safe implementation of the domain.Monitoring interface.

It stores and retrieves execution metrics for scheduled jobs using a concurrent-safe map (`sync.Map`). This basic implementation is suitable for internal debugging, testing, and simple runtime analytics. For production scenarios, it is advisable to extend or replace this implementation with more advanced solutions.

func (*DefaultMon) GetMetrics

func (m *DefaultMon) GetMetrics() map[string]interface{}

GetMetrics retrieves all stored job execution metrics.

Returns:

  • A map with JobID as keys and JobState values representing the captured metrics and state details of each job execution.

func (*DefaultMon) SaveMetrics

func (m *DefaultMon) SaveMetrics(dto JobState)

SaveMetrics stores execution metrics from the provided JobState into the monitoring storage.

Metrics are indexed by the job's unique identifier (JobID), allowing efficient retrieval.

Parameters:

  • dto: JobState containing execution details of the job.

type FnControl

type FnControl = domain.FnControl

FnControl provides job execution control and runtime metadata storage.

Offers control mechanisms:

  • Context: Execution context (cancellation, deadlines).
  • PauseChan: Channel signaling pause requests.
  • ResumeChan: Channel signaling resume requests.

Methods:

  • SaveData(map[string]interface{}): Stores custom job runtime metadata.
  • GetData() map[string]interface{}: Returns a copy of the stored runtime metadata.

type Hook

type Hook = domain.Hook

Hook defines a function that is executed during a specific lifecycle stage of a job.

Each Hook consists of:

  • Fn: the hook function to be executed.
  • IgnoreError: whether to suppress errors returned by the hook.

Used in HooksFunc to define custom actions for events like start, stop, error, success, etc.

type Hooks

type Hooks = domain.Hooks

Hooks provides lifecycle hooks to inject custom logic at different job execution stages.

Available hooks:

  • OnStart: Executed before job starts.
  • OnStop: Executed when job is explicitly stopped.
  • OnError: Executed if job execution encounters an error.
  • OnSuccess: Executed when job completes successfully.
  • OnPause: Executed when job is paused.
  • OnResume: Executed when job resumes after pause.
  • Finally: Always executed after job ends (successful, error, paused, stopped).

type Interval

type Interval = domain.Interval

Interval encapsulates job scheduling settings.

Parameters:

  • Time: Time between job executions (set 0 if using cron expression).
  • CronExpr: Cron expression defining job execution schedule (leave empty if using interval).

type Job

type Job = domain.JobDTO

Job defines a job's configuration and execution details.

Parameters:

  • ID: Unique identifier for the job.
  • Name: Human-readable name of the job.
  • Fn: The function executed by the job.
  • Schedule: Scheduling parameters (interval or cron expression).
  • Timeout: Maximum allowed execution duration for the job.
  • StartAt: Earliest time when the job is allowed to start.
  • EndAt: Latest time when the job can still run.
  • Retry: Retry behavior in case of job execution failures.
  • Hooks: Lifecycle hooks for custom logic execution.

type JobState

type JobState = domain.StateDTO

JobState represents the current runtime state of a job, including metadata such as:

  • StartAt / EndAt timestamps
  • Execution duration
  • Current status (Waiting, Running, Completed, etc.)
  • Success/failure counts
  • Custom key-value data stored via FnControl
  • Any error or hook-related failure that occurred

This type is what Monitoring implementations store and what GetMetrics() returns per job; it is useful for logging, monitoring, debugging, or exposing job status via an API.

type JobStatus

type JobStatus = domain.JobStatus

JobStatus represents the current lifecycle status of a job.

Possible statuses include:

  • Waiting
  • Running
  • Completed
  • Error
  • Ended
  • Stopped
  • Paused

Use this type to monitor and control job state transitions.

type Monitoring

type Monitoring interface {
	// SaveMetrics stores execution metrics derived from a job's execution state (StateDTO).
	//
	// Implementations typically capture metrics like execution timestamps, duration,
	// final status, encountered errors, and user-defined runtime metadata.
	//
	// Parameters:
	//   - dto: StateDTO instance containing the job's execution details and metadata.
	SaveMetrics(dto JobState)

	// GetMetrics retrieves all stored execution metrics.
	//
	// Returns:
	//   - A map keyed by JobID, where each entry contains the StateDTO representing
	//     execution metrics for a specific job.
	GetMetrics() map[string]interface{}
}

Monitoring defines an interface for collecting, storing, and retrieving metrics related to job execution.

Implementations of this interface can persist metrics in various ways, such as:

  • In-memory storage for simple debugging and development purposes.
  • Real-time logging for operational monitoring.
  • External systems like dashboards, time-series databases, or analytics platforms.

type Pool

type Pool interface {
	AddJob(cfg Job) error
	Run() error
	Kill()
	StopJob(jobID string) error
	PauseJob(jobID string, timeout time.Duration) error
	ResumeJob(jobID string) error
	RemoveJob(jobID string) error
	GetMetrics() map[string]interface{}
}

Pool represents a job execution pool.

Provides methods for managing job execution lifecycle, concurrency, job state control, and interaction with monitoring.

func CreatePool (added in v0.0.2)

func CreatePool(ctx context.Context, cfg PoolConfig, mon Monitoring) (Pool, error)

CreatePool creates and configures a new job execution pool.

Parameters:

  • cfg: PoolConfig specifying MaxWorkers, CheckInterval, and IdleTimeout.
  • mon: Implementation of Monitoring interface for metrics collection. Defaults to internal in-memory monitoring if nil.

Returns:

  • Initialized and ready-to-use Pool instance.

type PoolConfig

type PoolConfig = domain.Pool

PoolConfig encapsulates the configuration settings required to initialize a new scheduler pool.

Parameters:

  • MaxWorkers: Maximum number of concurrent workers executing jobs. Defaults to 1000 if set to 0.
  • CheckInterval: Interval at which the scheduler checks for jobs ready to execute. Defaults to 100ms if set to 0.
  • IdleTimeout: Duration after which an idle job is marked as inactive. Defaults to 100 hours if set to 0.

type Retry

type Retry = domain.Retry

Retry defines retry behavior for a job.

Parameters:

  • Count: Number of allowed retries after an execution failure.
  • Time: Time interval between retries.
  • ResetOnSuccess: Flag to reset the retry count after a successful job execution.

Directories

Path Synopsis
internal
job
pool
Package pool implements the core orchestration engine responsible for executing and managing scheduled jobs.
