Published: May 12, 2025. License: Apache-2.0.

Conduit Processor for Benthos

Conduit processor that integrates with Benthos (via the Bento fork) to leverage its rich processing capabilities for transforming Conduit records.

How to Build

Run make build to build the processor.

Testing

Run make test to run all the unit tests.

Functionality

This processor passes Conduit OpenCDC records to an embedded Benthos stream, applies the Benthos processors you configure, and returns the processed records to Conduit.

Benthos offers a wide range of processors for data transformation, filtering, enrichment, and more, making it a powerful tool for complex data processing pipelines.

Processor Configuration
benthosYAML (required)
    YAML configuration for the Benthos processors section. This defines the
    Benthos processors applied to each record. You can use any Benthos
    processor documented at benthos.dev.

bufferSize (optional, default: 10)
    Size of the internal channels used for processing records, i.e. how many
    records can be buffered while waiting for processing. Higher values can
    improve throughput but use more memory.

threadCount (optional, default: 1)
    Number of parallel processing threads in the Benthos pipeline. Higher
    values can improve throughput for CPU-bound processors; for IO-bound
    processors, increasing this may not help.

logLevel (optional, default: "INFO")
    Verbosity of Benthos internal logs. Valid values: NONE, ERROR, WARN,
    INFO, DEBUG, TRACE.
Example Configurations
Basic Text Transformation
processors:
  - id: benthos-processor
    type: benthos
    config:
      # Required: Benthos pipeline configuration in YAML format
      benthosYAML: |
        pipeline:
          processors:
            - mapping: |
                root.payload.after = this.payload.after.string().uppercase().bytes()

      # Optional performance tuning parameters
      bufferSize: 20          # Increase for higher throughput (default: 10)
      threadCount: 4          # Use multiple threads for parallel processing (default: 1)
      logLevel: "INFO"        # Control Benthos log verbosity (default: "INFO")
JSON Transformation
processors:
  - id: json-transform
    type: benthos
    config:
      benthosYAML: |
        pipeline:
          processors:
            - bloblang: |
                # Parse JSON from the payload
                let data = this.payload.after.string().parse_json()

                # Transform and create a new JSON structure
                # (let variables are referenced with $)
                root.payload.after = {
                  "id": $data.id,
                  "user": {
                    "name": $data.name.uppercase(),
                    "email": $data.email.lowercase(),
                    "role": if $data.admin { "ADMIN" } else { "USER" }
                  },
                  "metadata": {
                    "processed_at": now().format_timestamp(),
                    "source": meta("source").or("unknown")
                  }
                }.encode_json().bytes()
HTTP Enrichment
processors:
  - id: http-enrichment
    type: benthos
    config:
      benthosYAML: |
        pipeline:
          processors:
            # Extract the user ID and stash the original payload; the http
            # processor below replaces message content with the response body
            - bloblang: |
                let data = this.payload.after.string().parse_json()
                meta user_id = $data.user_id
                meta original = this.payload.after.string()

            # Make an HTTP request to the external API
            - http:
                url: https://api.example.com/users/${! meta("user_id") }
                verb: GET
                headers:
                  Authorization: Bearer ${! env("API_TOKEN") }
                timeout: 5s

            # Merge the response with the original data
            - bloblang: |
                let original = meta("original").parse_json()
                let user_data = content().string().parse_json()

                root.payload.after = {
                  "id": $original.id,
                  "transaction": $original.transaction,
                  "user": $user_data,
                  "enriched": true
                }.encode_json().bytes()
Conditional Processing
processors:
  - id: conditional-processor
    type: benthos
    config:
      benthosYAML: |
        pipeline:
          processors:
            - bloblang: |
                let data = this.payload.after.string().parse_json()

                # Add metadata for routing (let variables are referenced with $)
                meta transaction_type = $data.type
                meta amount = $data.amount

                # Keep the original data
                root = this

            # Apply different processing based on conditions
            - switch:
                cases:
                  - check: meta("transaction_type") == "purchase" && meta("amount").number() > 1000
                    processors:
                      - bloblang: |
                          let data = this.payload.after.string().parse_json()
                          root.payload.after = {
                            "id": $data.id,
                            "type": "high_value_purchase",
                            "amount": $data.amount,
                            "needs_review": true
                          }.encode_json().bytes()

                  - check: meta("transaction_type") == "refund"
                    processors:
                      - bloblang: |
                          let data = this.payload.after.string().parse_json()
                          root.payload.after = {
                            "id": $data.id,
                            "type": "refund_processed",
                            "amount": $data.amount,
                            "processed_at": now()
                          }.encode_json().bytes()
Example Usage
Simple Transformation
// Create and configure the processor
p := benthos.NewProcessor()
p.Configure(ctx, config.Config{
    "benthosYAML": `
pipeline:
  processors:
    - mapping: |
        root.payload.after = this.payload.after.string().uppercase().bytes()
`,
})

// Open the processor
p.Open(ctx)
defer p.Teardown(ctx)

// Process records
results := p.Process(ctx, records)
JSON Transformation
// Create and configure the processor
p := benthos.NewProcessor()
p.Configure(ctx, config.Config{
    "benthosYAML": `
pipeline:
  processors:
    - mapping: |
        let parsed = this.payload.after.string().parse_json()
        root.payload.after = {
          "id": $parsed.id,
          "name": $parsed.name.uppercase(),
          "summary": "User " + $parsed.name + " is " + $parsed.age.string() + " years old"
        }.encode_json().bytes()
`,
})

// Open the processor
p.Open(ctx)
defer p.Teardown(ctx)

// Process records
results := p.Process(ctx, records)

Features

  • Full Benthos Integration: Uses the Bento library to provide access to all Benthos processors
  • Memory Efficient: Uses object pools to reduce GC pressure
  • Conduit Batching: Works seamlessly with Conduit's native batch processing
  • Parallel Processing: Configurable thread count for CPU-bound workloads
  • Hot Reload: Update processor configuration without restarting
  • Error Handling: Comprehensive error handling with detailed logging

Benthos Bloblang

Bloblang is Benthos's powerful data mapping language, designed specifically for data transformation. It's a key feature that makes this processor so versatile.

Key Bloblang Features
  • Type-safe: Bloblang is strongly typed, helping prevent runtime errors
  • Declarative: Focus on what you want to transform, not how
  • Composable: Build complex transformations from simple functions
  • Powerful: Rich set of functions for string, numeric, and collection operations
Common Bloblang Patterns
Accessing Record Data
# Access the payload.after field from a Conduit record
let data = this.payload.after.string().parse_json()

# Access specific fields (let variables are referenced with $)
let user_id = $data.user_id
let timestamp = $data.timestamp
Transforming Data
# Create a new structure
root.payload.after = {
  "id": $data.id,
  "user": {
    "name": $data.name.uppercase(),
    "email": $data.email.lowercase()
  },
  "processed": true,
  "timestamp": now()
}.encode_json().bytes()
Conditional Logic
# If-else expressions
let status = if $data.amount > 1000 {
  "high_value"
} else if $data.amount > 500 {
  "medium_value"
} else {
  "low_value"
}

# Bloblang has no ternary operator; assign boolean expressions directly
let is_admin = $data.role == "admin"
Working with Metadata
# Set metadata
meta source = "benthos_processor"
meta timestamp = now().format_timestamp()

# Get metadata
let source = meta("source")

For more details on Bloblang, see the official documentation.

Advanced Usage

Hot Reloading Configuration

The processor supports updating the Benthos configuration at runtime. You can configure all aspects of the Benthos stream through a single comprehensive configuration:

Configuration Structure

The Benthos processor configuration is a single YAML string that includes all Benthos configuration except for the input/output sections (which are handled automatically):

# Complete Benthos configuration (excluding input/output)
yaml: |
  # Pipeline section with processors
  pipeline:
    processors:
      - mapping: |
          root = this
          root.payload.after = this.payload.after.string().capitalize().bytes()

  # Resources section
  resources:
    caches:
      - label: my_cache
        memory: {}
    rate_limits:
      - label: my_rate_limit
        local:
          count: 10
          interval: 1s

  # Buffer configuration
  buffer:
    memory:
      limit: 10000000

  # Metrics configuration
  metrics:
    prometheus:
      prefix: benthos

  # Tracer configuration
  tracer:
    jaeger:
      agent_address: localhost:6831
      service_name: benthos

  # Logger configuration
  logger:
    level: DEBUG

# Thread count (handled separately)
threadCount: 4
Updating Configuration in Code

You can update the configuration programmatically by calling SetupBenthosStream:

// Update the configuration with a complete YAML string
completeYAML := `
pipeline:
  processors:
    - mapping: |
        root = this
        root.payload.after = this.payload.after.string().capitalize().bytes()

resources:
  caches:
    - label: my_cache
      memory: {}
  rate_limits:
    - label: my_rate_limit
      local:
        count: 10
        interval: 1s

logger:
  level: DEBUG
`

err := processor.SetupBenthosStream(ctx, benthosproc.BenthosConfig{
    YAML: completeYAML,
    ThreadCount: 4,
})
if err != nil {
    log.Fatalf("Failed to update configuration: %v", err)
}

The SetupBenthosStream method combines your YAML with the managed input/output configuration and (re)builds the Benthos stream, so a single call covers every configuration aspect instead of requiring a separate method for each one.

Updating Configuration via NATS API

You can update the configuration using the NATS API:

# Update the complete configuration
nats req 'server-{serverid}.benthos.updateStream' '{
  "processorId": "pipeline-id:processor-id",
  "yaml": "pipeline:\n  processors:\n    - mapping: |\n        root = this\n        root.payload.after = this.payload.after.string().capitalize().bytes()\n\nresources:\n  caches:\n    - label: my_cache\n      memory: {}\n  rate_limits:\n    - label: my_rate_limit\n      local:\n        count: 10\n        interval: 1s\n\nbuffer:\n  memory:\n    limit: 10000000\n\nmetrics:\n  prometheus:\n    prefix: benthos\n\ntracer:\n  jaeger:\n    agent_address: localhost:6831\n    service_name: benthos\n\nlogger:\n  level: DEBUG",
  "threadCount": 4
}'

updateStream is the only endpoint for updating the Benthos configuration; it replaces the entire configuration in a single call.

Known Issues & Limitations

  • Some advanced Benthos features may require additional configuration
  • Very large messages may cause memory pressure despite pooling

Future Enhancements

  • Support for Benthos plugins
  • Dynamic scaling of processing threads
  • Circuit breaker pattern for error handling
  • Improved schema handling and validation

Documentation

Overview

Example (ComplexTransformation)
p := NewBenthosProcessor()

// In a real implementation, this would be a Benthos YAML configuration
// that defines a complex transformation pipeline
p.Configure(nil, config.Config{
	"benthosYAML": `
input:
  generate:
    mapping: 'root = {"test":"data"}'
    interval: ""
    count: 1
pipeline:
  processors:
    - mapping: |
        let parsed = this.payload.after.string().parse_json()
        root.payload.after = {
          "id": $parsed.id,
          "name": $parsed.name.uppercase(),
          "summary": "User " + $parsed.name + " is " + $parsed.age.string() + " years old"
        }.encode_json().bytes()
`,
})

// Open the processor
p.Open(nil)
defer p.Teardown(nil)

// Create a test record with JSON data
record := opencdc.Record{
	Position:  opencdc.Position("pos-1"),
	Operation: opencdc.OperationCreate,
	Payload: opencdc.Change{
		After: opencdc.RawData(`{"id": 123, "name": "john", "age": 30}`),
	},
}

// Process the record
results := p.Process(nil, []opencdc.Record{record})

// In a real implementation with actual Benthos integration,
// the result would be transformed according to the YAML config
result := results[0].(sdk.SingleRecord)
fmt.Println("Processed by:", result.Metadata["processed_by"])
Output:
Processed by: benthos
Example (Uppercase)
p := NewBenthosProcessor()

// Configure the processor with a simple uppercase transformation
p.Configure(nil, config.Config{
	"benthosYAML": `
input:
  generate:
    mapping: 'root = {"test":"data"}'
    interval: ""
    count: 1
pipeline:
  processors:
    - mapping: |
        root.payload.after = this.payload.after.string().uppercase().bytes()
`,
})

// Open the processor
p.Open(nil)
defer p.Teardown(nil)

// Create a test record
record := opencdc.Record{
	Position:  opencdc.Position("pos-1"),
	Operation: opencdc.OperationCreate,
	Metadata:  opencdc.Metadata{"source": "example"},
	Payload: opencdc.Change{
		After: opencdc.RawData("hello world"),
	},
}

// Process the record
results := p.Process(nil, []opencdc.Record{record})

// Print the result
result := results[0].(sdk.SingleRecord)
fmt.Println("Processed payload:", string(result.Payload.After.Bytes()))
fmt.Println("Metadata:", result.Metadata["processed_by"])
Output:
Processed payload: HELLO WORLD
Metadata: benthos

Constants

const (
	BenthosConfigBatchSize         = "batchSize"
	BenthosConfigChannelBufferSize = "channelBufferSize"
	BenthosConfigThreadCount       = "threadCount"
	BenthosConfigYaml              = "yaml"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BenthosConfig

type BenthosConfig struct {
	// YAML is the complete Benthos configuration (excluding input/output)
	// This includes processors, resources, buffer, metrics, etc.
	YAML string `json:"yaml" validate:"required"`

	// BatchSize controls the maximum number of records to process in a single Benthos batch
	// Higher values can improve throughput but may increase memory usage
	BatchSize int `json:"batchSize" default:"100" validate:"gt=0"`

	// ChannelBufferSize controls the size of internal channels for communication
	// Higher values can improve throughput but use more memory
	ChannelBufferSize int `json:"channelBufferSize" default:"10"`

	// ThreadCount controls the number of parallel processing threads in the Benthos pipeline
	// Higher values can improve throughput for CPU-bound processors
	ThreadCount int `json:"threadCount" default:"1"`
}

BenthosConfig represents the configuration for the Benthos processor. It is used both for initial configuration and for runtime updates.

func (BenthosConfig) Parameters

func (BenthosConfig) Parameters() map[string]config.Parameter

type BenthosProcessor

type BenthosProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}

func GetProcessorByID

func GetProcessorByID(id string) (*BenthosProcessor, bool)

GetProcessorByID returns a BenthosProcessor instance by its ID. Returns the processor and a boolean indicating if it was found.

This function is used by external API calls (like UpdateBenthosStream) to find a processor instance by its ID so that its configuration can be updated.

The processor ID typically follows the format "pipelineID:processorID" and is set by Conduit when the processor is opened.

func NewBenthosProcessor

func NewBenthosProcessor(logger log.CtxLogger) *BenthosProcessor

NewBenthosProcessor creates a new Benthos processor with the provided logger.

func (*BenthosProcessor) Configure

func (p *BenthosProcessor) Configure(ctx context.Context, cfg config.Config) error

func (*BenthosProcessor) Open

func (p *BenthosProcessor) Open(ctx context.Context) error

func (*BenthosProcessor) Process

func (p *BenthosProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*BenthosProcessor) SetupBenthosStream

func (p *BenthosProcessor) SetupBenthosStream(ctx context.Context, config BenthosConfig) error

SetupBenthosStream handles creating or updating the Benthos stream with the provided configuration. This method is thread-safe and can be called both during initialization and for runtime updates.

func (*BenthosProcessor) Specification

func (p *BenthosProcessor) Specification() (sdk.Specification, error)

func (*BenthosProcessor) Teardown

func (p *BenthosProcessor) Teardown(ctx context.Context) error

Directories

Path    Synopsis
cmd     processor command
