morpheus

package module
v1.0.0 Latest
Published: Mar 7, 2026 | License: MIT | Imports: 11 | Imported by: 0

README

Morpheus

A data transformation library for Go

Features | Quick Start | Documentation | Examples | Contributing


What It Does

Morpheus transforms data. It gives you:

  • 50+ built-in transformers — string, numeric, datetime, validation, normalization, privacy, schema, aggregation
  • YAML-driven rule engine — define transformation logic in config, not code
  • DAG pipeline orchestration — multi-step transformations with parallel execution
  • PII detection and masking — email, SSN, phone, credit card pattern detection with masking
  • Format conversion — JSON, YAML, XML bidirectional conversion
  • Stream parsing — token-based JSON, XML, CSV parsing

It uses hexagonal architecture (ports and adapters), so everything is behind interfaces and testable.

What It Is Not

This is not an application framework. It does not replace your HTTP server, database layer, message broker, or deployment infrastructure. It is a library you import and call.


Features

Data Transformers

50+ transformers across 10 categories. Each implements the Transformer interface. Same API, composable, chainable.

| Category | Transformers | Example |
|---|---|---|
| String | upper, lower, trim, replace, split, join, pad, truncate, slug | hello -> HELLO |
| Numeric | add, subtract, multiply, divide, round | 3.14159 -> 3.14 |
| Datetime | format, parse, add duration, subtract duration, days between | 2024-01-01 -> Jan 1, 2024 |
| Validation | email, URL, phone, IP address | bad@ -> error |
| Normalization | phone, email, name, address | +1 (555) 123-4567 -> +15551234567 |
| Privacy | mask email, mask phone, hash, tokenize | john@example.com -> j***@example.com |
| Schema | validate against schema, apply defaults, type checks | Missing required -> error |
| Mapping | field rename, pick, omit, set | first_name: Alice -> firstName: Alice |
| Aggregation | sum, avg, min, max, count, median | [1,2,3,4,5] -> sum: 15 |
| Utility | flatten, unflatten, jsonpath, template, reverse | {a:{b:1}} -> {a.b:1} |

Rule Engine

Define transformation rules in YAML. The engine compiles them, indexes fields, and evaluates data against conditions.
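The evaluation step can be pictured with a small standard-library sketch. The `condition` and `rule` types and the `evaluate` function below are hypothetical stand-ins, not the library's types; YAML parsing and field indexing are left out, and only the `greater_than` operator from the examples in this README is handled.

```go
package main

import "fmt"

// condition and rule are hypothetical types used only in this sketch.
type condition struct {
	Field    string
	Operator string
	Value    float64
}

type rule struct {
	Name       string
	Conditions []condition
	Set        map[string]any // fields to set when every condition holds
}

// toFloat converts a few common numeric types to float64.
func toFloat(v any) (float64, bool) {
	switch n := v.(type) {
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	case float64:
		return n, true
	}
	return 0, false
}

// matches reports whether every condition of r holds for data.
func (r rule) matches(data map[string]any) bool {
	for _, c := range r.Conditions {
		got, ok := toFloat(data[c.Field])
		if !ok {
			return false
		}
		switch c.Operator {
		case "greater_than":
			if got <= c.Value {
				return false
			}
		default:
			return false // unknown operators never match in this sketch
		}
	}
	return true
}

// evaluate applies every matching rule to data in place and returns the
// names of the rules that matched.
func evaluate(rules []rule, data map[string]any) []string {
	var matched []string
	for _, r := range rules {
		if !r.matches(data) {
			continue
		}
		for k, v := range r.Set {
			data[k] = v
		}
		matched = append(matched, r.Name)
	}
	return matched
}

func main() {
	rules := []rule{{
		Name:       "high_value",
		Conditions: []condition{{Field: "total", Operator: "greater_than", Value: 1000}},
		Set:        map[string]any{"priority": "high"},
	}}
	data := map[string]any{"total": 1500}
	matched := evaluate(rules, data)
	fmt.Println(matched, data["priority"]) // [high_value] high
}
```

A real engine would likely index rules by the fields they reference so that only relevant rules are checked; this sketch simply scans all of them.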

DAG Pipeline

Multi-step transformation pipelines where stages can depend on each other. Independent stages run in parallel automatically.
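As a rough illustration of how independent stages can run in parallel, the sketch below groups stages into waves with Kahn's algorithm and runs each wave concurrently. The `waves` and `runWave` functions and the stage names are assumptions made for this example, not the orchestrator's API.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// waves groups stages into levels: every stage in a wave depends only on
// stages from earlier waves. deps maps a stage name to its dependencies.
func waves(deps map[string][]string) ([][]string, error) {
	indegree := make(map[string]int, len(deps))
	dependents := make(map[string][]string)
	for stage, ds := range deps {
		indegree[stage] = len(ds)
		for _, d := range ds {
			if _, ok := deps[d]; !ok {
				return nil, fmt.Errorf("stage %q depends on unknown stage %q", stage, d)
			}
			dependents[d] = append(dependents[d], stage)
		}
	}
	var ready []string
	for stage, n := range indegree {
		if n == 0 {
			ready = append(ready, stage)
		}
	}
	var out [][]string
	done := 0
	for len(ready) > 0 {
		sort.Strings(ready) // deterministic order within a wave
		out = append(out, ready)
		done += len(ready)
		var next []string
		for _, s := range ready {
			for _, dep := range dependents[s] {
				indegree[dep]--
				if indegree[dep] == 0 {
					next = append(next, dep)
				}
			}
		}
		ready = next
	}
	if done != len(deps) {
		return nil, errors.New("dependency cycle detected")
	}
	return out, nil
}

// runWave runs every stage of one wave concurrently and waits for all of them.
func runWave(stages []string, run func(string)) {
	var wg sync.WaitGroup
	for _, s := range stages {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			run(name)
		}(s)
	}
	wg.Wait()
}

func main() {
	deps := map[string][]string{
		"extract":   nil,
		"lookup":    nil,
		"transform": {"extract", "lookup"},
		"load":      {"transform"},
	}
	ws, err := waves(deps)
	if err != nil {
		panic(err)
	}
	for i, w := range ws {
		runWave(w, func(name string) { /* stage work would go here */ })
		fmt.Println("wave", i, w)
	}
}
```

Here `extract` and `lookup` share the first wave because neither depends on the other, while `transform` and `load` each wait for the wave before them.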

PII Detection and Masking

Regex-based detection for email, SSN, phone, credit card, passport. Configurable masking strategies (partial, full, hash). AES-256 tokenization with reversible detokenization. Audit logging for compliance.
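To make the partial masking strategy concrete, here is a minimal standard-library sketch for email addresses. The pattern and the `maskEmail` helper are illustrative assumptions; they are not the patterns or functions Morpheus ships.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// emailPattern is a deliberately simple pattern for illustration only.
var emailPattern = regexp.MustCompile(`^[^@\s]+@[^@\s]+\.[^@\s]+$`)

// maskEmail keeps the first byte of the local part and the full domain,
// replacing the rest of the local part with "***". Values that do not look
// like an email address are returned unchanged. It assumes the local part
// starts with an ASCII character.
func maskEmail(s string) string {
	if !emailPattern.MatchString(s) {
		return s
	}
	at := strings.IndexByte(s, '@')
	return s[:1] + "***" + s[at:]
}

func main() {
	fmt.Println(maskEmail("john@example.com")) // j***@example.com
	fmt.Println(maskEmail("not-an-email"))     // not-an-email
}
```

A full or hash strategy would replace the return expression with a fixed placeholder or a digest of the input, respectively.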

Caching

In-memory (LRU with TTL), Redis, or multi-level (L1 memory + L2 Redis). Stampede protection. Cache warming. Redis adapters support standalone, cluster, sentinel, and ring topologies via redis.UniversalClient.
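The combination of LRU eviction and TTL expiry can be sketched with `container/list` from the standard library. The `lruCache` type below is a hypothetical, non-thread-safe illustration and says nothing about how the memory adapter is actually implemented; stampede protection and cache warming are not shown.

```go
package main

import (
	"container/list"
	"fmt"
	"time"
)

// entry is one cached value together with its expiry time.
type entry struct {
	key     string
	value   any
	expires time.Time
}

// lruCache is a hypothetical LRU cache with a per-entry TTL.
type lruCache struct {
	capacity int
	ttl      time.Duration
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // key -> element in order
}

func newLRUCache(capacity int, ttl time.Duration) *lruCache {
	return &lruCache{
		capacity: capacity,
		ttl:      ttl,
		order:    list.New(),
		items:    make(map[string]*list.Element),
	}
}

// Set stores value under key and evicts the least recently used entry
// when the cache grows beyond its capacity.
func (c *lruCache) Set(key string, value any) {
	if el, ok := c.items[key]; ok {
		e := el.Value.(*entry)
		e.value, e.expires = value, time.Now().Add(c.ttl)
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(&entry{key, value, time.Now().Add(c.ttl)})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

// Get returns the value for key if it is present and not expired.
func (c *lruCache) Get(key string) (any, bool) {
	el, ok := c.items[key]
	if !ok {
		return nil, false
	}
	e := el.Value.(*entry)
	if time.Now().After(e.expires) {
		c.order.Remove(el)
		delete(c.items, key)
		return nil, false
	}
	c.order.MoveToFront(el)
	return e.value, true
}

func main() {
	c := newLRUCache(2, time.Minute)
	c.Set("a", 1)
	c.Set("b", 2)
	c.Get("a")    // "a" becomes the most recently used entry
	c.Set("c", 3) // evicts "b", the least recently used entry
	_, ok := c.Get("b")
	fmt.Println(ok) // false
}
```

A production cache would add a mutex (or sharding) around these operations and usually a background sweep for expired entries.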

Resilience

Circuit breaker (closed/open/half-open), retry with exponential backoff and jitter, dead letter queue for failed jobs, error coordinator that composes all three.
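Of these patterns, retry with exponential backoff and jitter is the easiest to show in isolation. The sketch below uses only the standard library; `backoff`, `retry`, and `errTransient` are names invented for this example, and the "full jitter" variant (a random delay between zero and the capped exponential value) is one common choice, not necessarily the one the library uses.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// errTransient is a placeholder error used by the example.
var errTransient = errors.New("transient failure")

// backoff returns the delay before the given retry attempt (0-based):
// base * 2^attempt, capped at max, then randomized over [0, delay].
// base and max are assumed to be non-negative.
func backoff(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return time.Duration(rand.Int63n(int64(d) + 1))
}

// retry calls fn until it succeeds or the attempts are exhausted,
// sleeping for a jittered backoff between attempts.
func retry(attempts int, base, max time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(backoff(i, base, max))
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	err := retry(5, time.Millisecond, 20*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```

Jitter matters when many workers fail at the same moment: without it they would all retry in lockstep and hit the recovering dependency together.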


Quick Start

Installation
go get github.com/abhipray-cpu/morpheus

The simplest way to use Morpheus is through the top-level package: a single import, sensible defaults, and all built-in transformers pre-loaded:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/abhipray-cpu/morpheus"
)

func main() {
    m := morpheus.New()
    ctx := context.Background()

    // Single transform
    result, err := m.Transform("uppercase", "hello world")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(result) // HELLO WORLD

    // Chain transforms
    result, err = m.TransformChain([]string{"trim", "uppercase"}, "  hello  ")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(result) // HELLO

    // Detect PII
    detections, err := m.DetectPII(ctx, map[string]any{
        "email": "john@example.com",
        "ssn":   "123-45-6789",
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(detections), "detections")

    // Mask PII
    masked, err := m.MaskPII(ctx, map[string]any{
        "email": "john@example.com",
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(masked)

    // Rule evaluation
    rules := []string{`
rule: high_value
conditions:
  - field: total
    operator: greater_than
    value: 1000
transformations:
  - field: priority
    transformer: set_value
    params:
      value: high
`}
    data := map[string]any{"total": 1500}
    updated, matched, err := m.EvaluateRules(ctx, rules, data)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(updated["priority"], matched) // high [high_value]

    // Format conversion
    yamlOut, err := m.Convert(ctx, `{"name":"Alice"}`, morpheus.JSON, morpheus.YAML)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(yamlOut)
}

With Options

import (
    "time"
    "github.com/abhipray-cpu/morpheus"
    "github.com/abhipray-cpu/morpheus/pkg/adapter/cache/memory"
)

cache := memory.NewMemoryCache(1000, 5*time.Minute)
m := morpheus.New(
    morpheus.WithCache(cache),
    morpheus.WithCacheTTL(10*time.Minute),
)

Redis Cluster / Sentinel

All Redis adapters (cache, queue, enrichment) accept redis.UniversalClient, which works with standalone, cluster, sentinel, and ring topologies:

import (
    "github.com/redis/go-redis/v9"
    rediscache "github.com/abhipray-cpu/morpheus/pkg/adapter/cache/redis"
)

// Standalone (single node)
client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
cache := rediscache.NewRedisCacheFromClient(client, 5*time.Minute)

// Cluster (multiple nodes)
cluster := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{"node1:6379", "node2:6379", "node3:6379"},
})
cache = rediscache.NewRedisCacheFromClient(cluster, 5*time.Minute)

// Sentinel (automatic failover)
failover := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:    "mymaster",
    SentinelAddrs: []string{"sentinel1:26379", "sentinel2:26379"},
})
cache = rediscache.NewRedisCacheFromClient(failover, 5*time.Minute)

// Auto-detect via UniversalClient
universal := redis.NewUniversalClient(&redis.UniversalOptions{
    Addrs:      []string{"localhost:6379"},      // 1 addr = standalone, 2+ = cluster
    MasterName: "",                               // set for sentinel mode
})
cache = rediscache.NewRedisCacheFromClient(universal, 5*time.Minute)

Direct Transformer Usage

For finer control, use transformers directly:

import "github.com/abhipray-cpu/morpheus/pkg/domain/entity"

upper := entity.NewUpperCaseTransformer()
ctx := entity.NewTransformContext()
result, err := upper.Transform(ctx, "hello world")
// result = "HELLO WORLD"

Chain Transformers

registry := entity.NewTransformerRegistry()
registry.Register(entity.NewTrimTransformer())
registry.Register(entity.NewUpperCaseTransformer())

chain := entity.NewChainTransformer(registry, []string{"trim", "uppercase"})
result, _ := chain.Transform(entity.NewTransformContext(), "  Hello World  ")
// result: "HELLO WORLD"
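
The chaining idea itself is small enough to sketch without the library. In the example below, `transformer`, `stringFunc`, `newRegistry`, and `chain` are hypothetical stand-ins written for illustration (the real interface also takes a transform context); the point is only that each step's output becomes the next step's input.

```go
package main

import (
	"fmt"
	"strings"
)

// transformer is a hypothetical stand-in for the library's Transformer interface.
type transformer interface {
	Name() string
	Transform(input any) (any, error)
}

// stringFunc adapts a plain string function to the transformer interface.
type stringFunc struct {
	name string
	fn   func(string) string
}

func (s stringFunc) Name() string { return s.name }

func (s stringFunc) Transform(input any) (any, error) {
	str, ok := input.(string)
	if !ok {
		return nil, fmt.Errorf("%s: expected string, got %T", s.name, input)
	}
	return s.fn(str), nil
}

// newRegistry returns a name -> transformer map with two example entries.
func newRegistry() map[string]transformer {
	registry := map[string]transformer{}
	for _, t := range []transformer{
		stringFunc{"trim", strings.TrimSpace},
		stringFunc{"uppercase", strings.ToUpper},
	} {
		registry[t.Name()] = t
	}
	return registry
}

// chain looks up each name in the registry and feeds the output of one
// transformer into the next, stopping at the first error.
func chain(registry map[string]transformer, names []string, input any) (any, error) {
	current := input
	for _, name := range names {
		t, ok := registry[name]
		if !ok {
			return nil, fmt.Errorf("unknown transformer %q", name)
		}
		out, err := t.Transform(current)
		if err != nil {
			return nil, err
		}
		current = out
	}
	return current, nil
}

func main() {
	out, err := chain(newRegistry(), []string{"trim", "uppercase"}, "  Hello World  ")
	fmt.Println(out, err) // HELLO WORLD <nil>
}
```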

Use the Rule Engine

import (
    "context"
    "time"

    "github.com/abhipray-cpu/morpheus/pkg/adapter/cache/memory"
    "github.com/abhipray-cpu/morpheus/pkg/adapter/rule"
)

cache := memory.NewMemoryCache(1000, 5*time.Minute)
engine := rule.NewRuleEngine(cache, nil)

rules := []string{`
rule: high_value_order
conditions:
  - field: total
    operator: greater_than
    value: 1000
transformations:
  - field: priority
    transformer: set_value
    params:
      value: high
`}

ctx := context.Background()
engine.Compile(ctx, rules)
result, matched, _ := engine.Evaluate(ctx, map[string]any{
    "total": 1500,
})
// result["priority"] = "high", matched = ["high_value_order"]

Documentation

| Document | Description |
|---|---|
| Architecture | Architecture diagrams, class diagrams, sequence diagrams, package structure, design decisions |
| Usage Guide | Complete usage guide with code examples for all features |
| Contributing | How to contribute |
| Changelog | Version history |
| Security | Vulnerability reporting |
| License | MIT License |

Examples

7 runnable examples covering all features:

| # | Example | What It Demonstrates |
|---|---|---|
| 01 | Config-Driven Cache | Memory, Redis, multi-level cache |
| 02 | Config-Driven Queue | Priority queuing, worker pools |
| 03 | Transformer Factory | Built-in + custom transformers |
| 04 | Rule Engine | YAML rules, condition evaluation |
| 05 | DAG Pipeline | Parallel execution, dependency resolution |
| 06 | PII Workflow | Detection, masking, audit logging |
| 07 | Production E-Commerce | End-to-end with resilience patterns |

make examples           # Run all examples
make run-example E=01   # Run a specific example

Architecture

Morpheus uses hexagonal architecture with three layers:

Domain Layer (entities, services, value objects)
  Transformer interface + 50+ implementations
  Pipeline, Stage, Job entities
  TransformationService, ValidationService
  FormatConverter, StreamParsers

Port Layer (interfaces)
  TransformerPort, PipelineExecutorPort
  RuleEnginePort, PrivacyPort
  CachePort, MetricsPort

Adapter Layer (implementations)
  Rule Engine, Pipeline Orchestrator
  PII Detector/Masker/Tokenizer
  Memory/Redis/Multi-level Cache
  Circuit Breaker, Retry, DLQ

See Architecture Documentation for full diagrams.


Dependencies

Core transformers have zero external dependencies. Adapters that connect to external systems pull in:

| Dependency | Purpose |
|---|---|
| gopkg.in/yaml.v2 | YAML parsing for rule engine and format converter |
| github.com/redis/go-redis/v9 | Redis cache and queue adapters |
| golang.org/x/time | Rate limiting |
| github.com/mattn/go-sqlite3 | SQLite storage adapter |

Development

make setup              # Install dependencies
make test               # Run all tests
make test-race          # Race detector
make test-coverage      # Coverage report
make lint               # Lint
make validate           # All checks
make fmt                # Format code

Test Suite

  • 1300+ tests across unit, integration, and e2e
  • Race detector clean (on non-Redis tests)

Contributing

See CONTRIBUTING.md for guidelines.

git checkout -b feature/my-feature
# make your changes
make validate
git commit -m "feat: add my feature"
git push origin feature/my-feature
# open a PR

License

MIT. See LICENSE.

Documentation

Overview

Package morpheus provides a single entry point for the Morpheus data transformation library.

Usage:

m := morpheus.New()
result, err := m.Transform("uppercase", "hello world")
// result = "HELLO WORLD"

result, err = m.TransformChain([]string{"trim", "uppercase"}, "  hello  ")
// result = "HELLO"

detections, err := m.DetectPII(ctx, data)
masked, err := m.MaskPII(ctx, data)

pipeline := morpheus.NewPipeline("etl").
    AddStage(morpheus.NewStage("extract", "extractor", nil)).
    AddStage(morpheus.NewStage("transform", "transformer", nil).DependsOn("extract"))
result, err := m.RunPipeline(pipeline, input)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultRegistry

func DefaultRegistry() *entity.TransformerRegistry

DefaultRegistry returns a TransformerRegistry pre-loaded with all built-in zero-configuration transformers. Transformers that require parameters (like Replace, Add, Substring, SchemaValidator) are not included — register them yourself with the specific config you need.

registry := morpheus.DefaultRegistry()
t, _ := registry.Get("uppercase")
result, _ := t.Transform(ctx, "hello")

Types

type Closeable

type Closeable interface {
	Close() error
}

Closeable is an optional interface that adapters can implement for graceful shutdown. If a cache or metrics adapter implements this, Shutdown() will call Close().
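
The optional-interface pattern described here usually comes down to a type assertion at shutdown time. The following is a sketch under that assumption, with a local copy of the interface and two made-up adapter types (`redisLikeCache`, `memoryMetrics`); it is not the library's `Shutdown` implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Closeable mirrors the interface above so the sketch is self-contained.
type Closeable interface {
	Close() error
}

// shutdown closes every adapter that implements Closeable, skips the rest,
// and returns all Close errors joined together (nil if there were none).
func shutdown(adapters ...any) error {
	var errs []error
	for _, a := range adapters {
		if c, ok := a.(Closeable); ok {
			if err := c.Close(); err != nil {
				errs = append(errs, err)
			}
		}
	}
	return errors.Join(errs...)
}

// redisLikeCache is a hypothetical adapter that holds a resource to release.
type redisLikeCache struct{ closed bool }

func (r *redisLikeCache) Close() error {
	r.closed = true
	return nil
}

// memoryMetrics is a hypothetical adapter with nothing to close.
type memoryMetrics struct{}

func main() {
	cache := &redisLikeCache{}
	err := shutdown(cache, memoryMetrics{})
	fmt.Println(cache.closed, err) // true <nil>
}
```

Adapters without external resources simply omit `Close` and are skipped, which keeps the port interfaces small.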

type Format

type Format string

Format represents a data format for conversion.

const (
	JSON Format = "json"
	YAML Format = "yaml"
	XML  Format = "xml"
)

type Morpheus

type Morpheus struct {
	// contains filtered or unexported fields
}

Morpheus is the main entry point for the library. Create one with New() and use it to transform data.

func New

func New(opts ...Option) *Morpheus

New creates a new Morpheus instance with all built-in transformers registered. Use functional options to configure caching, metrics, etc.

m := morpheus.New()
m := morpheus.New(morpheus.WithCacheTTL(10 * time.Minute))

func (*Morpheus) Convert

func (m *Morpheus) Convert(ctx context.Context, input any, from, to Format) (any, error)

Convert converts data between formats.

yamlOut, err := m.Convert(ctx, jsonData, morpheus.JSON, morpheus.YAML)

func (*Morpheus) DetectPII

func (m *Morpheus) DetectPII(ctx context.Context, data any) ([]port.PIIDetectionResult, error)

DetectPII scans data for personally identifiable information. Returns a list of detections with field paths and PII types.

detections, err := m.DetectPII(ctx, map[string]any{
    "email": "john@example.com",
    "ssn":   "123-45-6789",
})

func (*Morpheus) EvaluateRules

func (m *Morpheus) EvaluateRules(ctx context.Context, rules []string, data map[string]any) (map[string]any, []string, error)

EvaluateRules compiles YAML rules and evaluates data against them. Returns the modified data and a list of matched rule names.

ctx := context.Background()
rules := []string{`
rule: high_value
conditions:
  - field: total
    operator: greater_than
    value: 1000
transformations:
  - field: priority
    transformer: set_value
    params:
      value: high
`}
result, matched, err := m.EvaluateRules(ctx, rules, data)

func (*Morpheus) ListTransformers

func (m *Morpheus) ListTransformers() []string

ListTransformers returns all available transformer names.

func (*Morpheus) MaskPII

func (m *Morpheus) MaskPII(ctx context.Context, data any) (any, error)

MaskPII detects and masks PII in the data.

masked, err := m.MaskPII(ctx, map[string]any{
    "email": "john@example.com",
})
// masked["email"] = "j***@example.com"

func (*Morpheus) RegisterTransformer

func (m *Morpheus) RegisterTransformer(t entity.Transformer) error

RegisterTransformer adds a custom transformer to this instance.

m.RegisterTransformer(myCustomTransformer)

func (*Morpheus) Registry

func (m *Morpheus) Registry() *entity.TransformerRegistry

Registry returns the transformer registry for advanced usage (registering custom transformers, listing available ones, etc.)

func (*Morpheus) Shutdown

func (m *Morpheus) Shutdown() error

Shutdown gracefully shuts down all adapters that support it. Call this when your application is terminating.

m := morpheus.New(morpheus.WithCache(redisCache))
defer m.Shutdown()

func (*Morpheus) Transform

func (m *Morpheus) Transform(name string, input any, params ...Params) (any, error)

Transform applies a named transformer to the input value.

result, err := m.Transform("upper_case", "hello")
// result = "HELLO"

result, err := m.Transform("add", 5, morpheus.Params{"amount": 3})
// result = 8

func (*Morpheus) TransformChain

func (m *Morpheus) TransformChain(names []string, input any) (any, error)

TransformChain applies multiple transformers in sequence. The output of each transformer becomes the input of the next.

result, err := m.TransformChain([]string{"trim", "upper_case", "slug"}, "  Hello World  ")
// result = "hello-world"

type Option

type Option func(*Morpheus)

Option configures a Morpheus instance.
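
For readers new to functional options, the pattern behind `Option` can be sketched in a few lines. The `client` type and the lowercase option names are hypothetical; only the 5-minute default TTL is taken from the `WithCacheTTL` description.

```go
package main

import (
	"fmt"
	"time"
)

// client is a hypothetical configurable type standing in for Morpheus.
type client struct {
	cacheTTL time.Duration
	metrics  bool
}

// option mutates a client during construction.
type option func(*client)

// withCacheTTL overrides the default cache TTL.
func withCacheTTL(ttl time.Duration) option {
	return func(c *client) { c.cacheTTL = ttl }
}

// withMetrics switches on metrics recording.
func withMetrics() option {
	return func(c *client) { c.metrics = true }
}

// newClient applies defaults first, then each option in order, so later
// options win over earlier ones.
func newClient(opts ...option) *client {
	c := &client{cacheTTL: 5 * time.Minute}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(newClient().cacheTTL)                              // 5m0s
	fmt.Println(newClient(withCacheTTL(10 * time.Minute)).cacheTTL) // 10m0s
}
```

The benefit is that new settings can be added later without changing the constructor's signature.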

func WithCache

func WithCache(cache port.CachePort) Option

WithCache sets a cache port for caching transformation results.

m := morpheus.New(morpheus.WithCache(myCache))

func WithCacheTTL

func WithCacheTTL(ttl time.Duration) Option

WithCacheTTL sets the default cache TTL for transformation results. Default is 5 minutes.

m := morpheus.New(morpheus.WithCacheTTL(10 * time.Minute))

func WithMetrics

func WithMetrics(metrics port.MetricsPort) Option

WithMetrics sets a metrics port for recording transformation metrics.

m := morpheus.New(morpheus.WithMetrics(myMetrics))

type Params

type Params map[string]any

Params is a convenience type for passing parameters to transformers.

Directories

| Path | Synopsis |
|---|---|
| examples/05_dag_pipeline | command |
| examples/06_pii_workflow | command |
| pkg/adapter/cache/memory | Package memory provides an in-memory LRU cache with TTL expiration. |
| pkg/adapter/cache/multi_level | Package multilevel provides a multi-level cache (L1 memory + L2 Redis) with cache warming, stampede protection, and promotion between levels. |
| pkg/adapter/cache/redis | Package redis provides a Redis-backed cache adapter implementing CachePort. |
| pkg/adapter/enrichment | Package enrichment provides OPTIONAL infrastructure adapters for data enrichment: fetching supplementary data from external sources (APIs, databases, Redis) to augment transformation inputs. |
| pkg/adapter/enrichment/api | Package api provides an HTTP API-based data enrichment adapter with rate limiting, health checks, and circuit breaker support. |
| pkg/adapter/enrichment/database | Package database provides a database-backed data enrichment adapter with connection pooling and query building. |
| pkg/adapter/enrichment/redis | Package redis provides a Redis-backed data enrichment adapter for fast key-value lookups. |
| pkg/adapter/enrichment/shared | Package shared provides shared utilities for enrichment adapters including circuit breaker and fallback strategies. |
| pkg/adapter/error | pkg/adapter/error/backoff.go |
| pkg/adapter/function/local | Package local provides a local function executor that runs Go functions in-process without network calls. |
| pkg/adapter/function/serverless | Package serverless provides serverless function executors for AWS Lambda, Azure Functions, and GCP Cloud Functions. |
| pkg/adapter/metrics | Package metrics provides OPTIONAL infrastructure adapters for recording transformation metrics (in-memory, Prometheus, or async). |
| pkg/adapter/metrics/async | Package async provides a non-blocking metrics adapter with configurable buffer size and sampling rate for high-throughput applications. |
| pkg/adapter/metrics/memory | Package memory provides an in-memory metrics adapter for testing and development. |
| pkg/adapter/metrics/prometheus | Package prometheus provides a Prometheus-compatible metrics adapter that exposes counters, gauges, and histograms via the Prometheus client. |
| pkg/adapter/pipeline/dag | Package dag provides a directed acyclic graph implementation for pipeline stage dependency resolution with cycle detection and topological sorting. |
| pkg/adapter/pipeline/orchestrator | Package orchestrator provides a DAG pipeline orchestrator that executes pipeline stages in topologically-sorted waves, running independent stages in parallel. |
| pkg/adapter/privacy | pkg/adapter/privacy/audit_logger.go |
| pkg/adapter/queue | Package queue provides OPTIONAL infrastructure adapters for job queuing (in-memory or Redis-backed) with priority routing and backpressure. |
| pkg/adapter/queue/memory | Package memory provides an in-memory priority queue with worker pools, circuit breaker, and backpressure handling. |
| pkg/adapter/queue/redis | Package redis provides a Redis-backed queue adapter with priority routing and distributed worker support. |
| pkg/adapter/queue/shared | Package shared provides shared types and utilities for queue adapters including configuration, backpressure, circuit breaker, and priority routing. |
| pkg/adapter/rule | Package rule provides a YAML-driven rule engine that compiles rules, indexes fields for fast lookup, evaluates conditions against data, and applies transformations when conditions match. |
| pkg/adapter/storage | Package storage provides OPTIONAL infrastructure adapters for persistent key-value storage (file-based or SQLite database). |
| pkg/adapter/storage/database | Package database provides a SQLite-backed key-value storage adapter. |
| pkg/adapter/storage/file | Package file provides a file system-backed key-value storage adapter. |
| pkg/domain/entity | Package entity defines the core domain entities for the Morpheus library. |
| pkg/domain/format | pkg/domain/format/converter.go |
| pkg/domain/port | pkg/domain/port/audit_port.go |
| pkg/domain/service | Package service provides domain services that orchestrate transformer execution. |
| pkg/domain/streaming | pkg/domain/streaming/csv_parser.go |
| pkg/domain/valueobject | Package valueobject defines immutable value objects used across the domain layer. |
| test/integration | test/integration/privacy_integration_helper.go |
| test/mocks | Package mocks is a generated GoMock package. |
