goflow

package module
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2026 License: MIT Imports: 0 Imported by: 0

README

goflow

Go Reference Go Report Card CI codecov

A Go library for concurrent applications with rate limiting, task scheduling, and streaming.

Features

Rate Limiting (pkg/ratelimit)

  • Token bucket and leaky bucket algorithms
  • Concurrency limiting
  • Prometheus metrics

Task Scheduling (pkg/scheduling)

  • Worker pools with graceful shutdown
  • Cron-based scheduling
  • Multi-stage pipelines
  • Context-aware timeouts

Streaming (pkg/streaming)

  • Functional stream operations
  • Background buffering
  • Backpressure control
  • Channel utilities

Installation

go get github.com/1mb-dev/goflow

Usage

package main

import (
    "context"
    "fmt"
    "log"
    "time"
    
    "github.com/1mb-dev/goflow/pkg/ratelimit/bucket"
    "github.com/1mb-dev/goflow/pkg/scheduling/workerpool"
    "github.com/1mb-dev/goflow/pkg/scheduling/scheduler"
)

func main() {
    limiter, err := bucket.NewSafe(10, 20) // 10 RPS, burst 20
    if err != nil {
        log.Fatal(err)
    }

    pool, err := workerpool.NewWithConfigSafe(workerpool.Config{
        WorkerCount: 5,
        QueueSize:   100,
        TaskTimeout: 30 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer func() { <-pool.Shutdown() }()

    if limiter.Allow() {
        task := workerpool.TaskFunc(func(ctx context.Context) error {
            fmt.Println("Processing request...")
            return nil
        })

        if err := pool.Submit(task); err != nil {
            log.Printf("Failed to submit task: %v", err)
        }
    }
}

Components

Rate Limiters

  • bucket.NewSafe(rate, burst) - Token bucket with burst capacity
  • leakybucket.New(rate) - Smooth rate limiting
  • concurrency.NewSafe(limit) - Concurrent operations control

Scheduling

  • workerpool.NewSafe(workers, queueSize) - Background task processing
  • scheduler.New() - Cron and interval scheduling

Streaming

  • stream.FromSlice(data) - Functional data processing
  • writer.New(config) - Async buffered writing

Documentation

Development

make install-hooks  # Install pre-commit hook
make test           # Run tests with race detection
make lint           # Run linter
make benchmark      # Run performance benchmarks

The pre-commit hook automatically:

  • Checks for potential secrets
  • Formats Go code with goimports and gofmt
  • Runs golangci-lint on changed files
  • Verifies the build succeeds

Contributing

See Contributing for contribution guidelines.

License

MIT License - see LICENSE for details.

Documentation

Overview

Package goflow provides a Go library for concurrent applications with rate limiting, task scheduling, and streaming.

Rate Limiting (pkg/ratelimit):

  • bucket: Token bucket rate limiter with burst capacity
  • leakybucket: Smooth rate limiting without bursts
  • concurrency: Control concurrent operations
  • distributed: Multi-instance rate limiting with Redis

Task Scheduling (pkg/scheduling):

  • workerpool: Background task processing
  • scheduler: Cron and interval-based scheduling
  • pipeline: Multi-stage data processing

Streaming (pkg/streaming):

  • stream: Functional data operations
  • writer: Async buffered writing

Example usage:

import (
	"github.com/1mb-dev/goflow/pkg/ratelimit/bucket"
	"github.com/1mb-dev/goflow/pkg/scheduling/workerpool"
)

limiter, _ := bucket.NewSafe(10, 20) // 10 RPS, burst 20
pool, _ := workerpool.NewSafe(5, 100) // 5 workers, queue 100

if limiter.Allow() {
	pool.Submit(task)
}

Directories

Path Synopsis
examples
rate_limiter command
Rate limiting example demonstrating different types of limiters
Rate limiting example demonstrating different types of limiters
streaming command
Streaming example demonstrating data processing with streams and async writing
Streaming example demonstrating data processing with streams and async writing
web-service command
Package main demonstrates a complete web service using multiple goflow modules: - Rate limiting for API endpoints - Worker pools for background tasks - Concurrency limiting for resource protection - Pipeline processing for data transformation - Scheduled tasks for maintenance
Package main demonstrates a complete web service using multiple goflow modules: - Rate limiting for API endpoints - Worker pools for background tasks - Concurrency limiting for resource protection - Pipeline processing for data transformation - Scheduled tasks for maintenance
worker_pool command
Worker pool example demonstrating background task processing
Worker pool example demonstrating background task processing
internal
testutil
Package testutil provides common testing utilities for the goflow library.
Package testutil provides common testing utilities for the goflow library.
pkg
common/errors
Package gferrors provides common error types and utilities for the goflow library.
Package gferrors provides common error types and utilities for the goflow library.
common/validation
Package validation provides common validation utilities for configuration parameters across the goflow library.
Package validation provides common validation utilities for configuration parameters across the goflow library.
ratelimit
Package ratelimit provides rate limiting primitives for Go applications.
Package ratelimit provides rate limiting primitives for Go applications.
ratelimit/concurrency
Package concurrency provides concurrency limiting for Go applications.
Package concurrency provides concurrency limiting for Go applications.
ratelimit/leakybucket
Package leakybucket provides leaky bucket rate limiting for Go applications.
Package leakybucket provides leaky bucket rate limiting for Go applications.
scheduling
Package scheduling provides task scheduling and execution primitives for Go applications.
Package scheduling provides task scheduling and execution primitives for Go applications.
scheduling/pipeline
Package pipeline provides multi-stage data processing with error handling and monitoring.
Package pipeline provides multi-stage data processing with error handling and monitoring.
scheduling/scheduler
Package scheduler provides task scheduling with support for delayed execution, repeating tasks, and cron expressions.
Package scheduler provides task scheduling with support for delayed execution, repeating tasks, and cron expressions.
scheduling/workerpool
Package workerpool provides a worker pool for executing tasks concurrently with a fixed number of goroutines and bounded queue.
Package workerpool provides a worker pool for executing tasks concurrently with a fixed number of goroutines and bounded queue.
streaming
Package streaming offers a new take on streaming data, providing higher-level abstractions than standard Go readers and writers.
Package streaming offers a new take on streaming data, providing higher-level abstractions than standard Go readers and writers.
streaming/channel
Package channel provides backpressure-aware channels for flow control in concurrent applications.
Package channel provides backpressure-aware channels for flow control in concurrent applications.
streaming/stream
Package stream provides a powerful and expressive API for processing sequences of data in Go.
Package stream provides a powerful and expressive API for processing sequences of data in Go.
streaming/writer
Package writer provides asynchronous buffered writing for Go applications.
Package writer provides asynchronous buffered writing for Go applications.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL