PocketFlowGo

module
v0.0.0-...-d4e4ec1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2025 License: Apache-2.0

README

PocketFlowGo

Over 80% of the code is generated by AI

PocketFlowGo is a lightweight, centralized task orchestration service written in Go. It lets you define business workflows as versioned graphs, execute tasks across remote workers over HTTP, and persist state and run history in SQLite for durability and recovery.

Highlights

  • Versioned workflow model: flowsflow_versionstasks
  • Rich node types: executor, choice, parallel, subflow, timer, foreach, wait_event, approval
  • Remote workers over HTTP with registration, heartbeat, and load-aware allocation
  • Durable state in SQLite: task cursor, shared state, retries, leases, and full node run logs
  • Crash-safe scheduling loop that resumes leased tasks
  • Async Queue (Pull Mode) for workers behind firewalls or long-running tasks

UI Preview

Flow Details

Task Details

Quick Start

Prerequisites: Go 1.20+

  1. Start the Scheduler
SCHEDULER_DB=./scheduler.db go run cmd/scheduler/main.go
  1. Start a Worker
REGISTRY_URL=http://localhost:8070 \
WORKER_URL=http://localhost:8080 \
go run cmd/worker/main.go
  1. Run the CLI demo
SCHEDULER_BASE=http://localhost:8070 go run cmd/cli/main.go

This will create example flows and tasks, then poll until tasks complete. Check the console output for task status and node run details.

Data Model

  • flows: logical workflow namespace (id, name)
  • flow_versions: concrete version with JSON graph definition and status (e.g., published)
  • tasks: execution instance bound to a flow_version with runtime fields like current_node_key, shared_json, lease_owner, etc.

Relationship: one flow has many flow_versions; one flow_version is referenced by many tasks.

Flow Definition (JSON)

Minimal example:

{
  "start": "decide",
  "nodes": {
    "decide": {"kind": "executor", "service": "route", "post": {"action_key": "action"}},
    "B": {"kind": "executor", "service": "transform", "prep": {"input_key": "$params.text"}, "post": {"output_key": "result", "action_static": "default"}},
    "C": {"kind": "executor", "service": "transform", "prep": {"input_key": "$params.text"}, "post": {"output_key": "result", "action_static": "default"}}
  },
  "edges": [
    {"from": "decide", "action": "goB", "to": "B"},
    {"from": "decide", "action": "goC", "to": "C"},
    {"from": "B", "action": "default", "to": ""},
    {"from": "C", "action": "default", "to": ""}
  ]
}

Key fields:

  • start: node key to begin execution
  • nodes: map of node definitions with kind, service, exec_type (default http, optional queue), prep, params, and post
  • edges: array of {from, action, to}; action="default" is the fallback

Async Queue Mode (Pull)

For workers that cannot be reached directly via HTTP (e.g., behind firewalls) or for long-running tasks, use exec_type: "queue".

  1. Define Node: Set "exec_type": "queue" in the node definition.
  2. Enqueue: The engine suspends the task and writes a job to the queue table.
  3. Poll: Workers call POST /queue/poll to fetch pending jobs.
  4. Complete: Workers process the job and call POST /queue/complete with the result.
  5. Resume: The engine resumes the flow execution with the reported result.

HTTP API

Worker Registry (compatible routes):

  • POST /workers/register
  • POST /workers/heartbeat
  • GET /workers/list?service=...
  • GET /workers/allocate?service=...

Flows & Versions:

  • POST /flows → create flow
  • POST /flows/version → create and publish version with definition JSON

Tasks:

  • POST /tasks → create task referencing latest published version of a flow
  • GET /tasks?status=... → list tasks
  • GET /tasks/get?id=... → task details
  • POST /tasks/cancel?id=... → mark as canceling
  • GET /tasks/runs?task_id=... → node run log
  • POST /tasks/signal → write a key/value into task shared state (for wait_event/approval)

Queue (Pull Mode):

  • POST /queue/poll → worker polls for pending tasks
  • POST /queue/complete → worker reports task completion with result

Components & Layout

  • cmd/scheduler: HTTP API + scheduling loop
  • cmd/worker: HTTP worker providing services like transform, sum, route
  • cmd/cli: loads demo flows, creates tasks, and polls for completion
  • pkg/server: HTTP handlers for flows, versions, tasks, workers
  • pkg/engine: execution engine for nodes and edges
  • pkg/store: SQLite schema and read/write operations

Testing

Run unit tests:

go test ./...

Notable tests include engine integration scenarios and utility functions.

Notes

  • SQLite is used by default; the model can be extended to Postgres.
  • Leases (lease_owner/lease_expiry) prevent double execution; expired leases are reclaimed.
  • See docs/architecture.md for a detailed design record.

Future Work

  • Worker SDK for Go / Python / JavaScript
  • Postgres support
  • Metrics and logging

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Directories

Path Synopsis
cmd
cli command
loadexamples command
scheduler command
worker command
worker-async command
pkg
server
Package server provides the HTTP API server for PocketFlowGo.
Package server provides the HTTP API server for PocketFlowGo.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL