Documentation
¶
Overview ¶
Example (ComplexTransformation) ¶
p := NewBenthosProcessor()
// In a real implementation, this would be a Benthos YAML configuration
// that defines a complex transformation pipeline
p.Configure(nil, config.Config{
"benthosYAML": `
input:
generate:
mapping: 'root = {"test":"data"}'
interval: ""
count: 1
pipeline:
processors:
- mapping: |
let parsed = content().payload.after.string().parse_json()
root.payload.after = {
"id": parsed.id,
"name": parsed.name.uppercase(),
"summary": "User " + parsed.name + " is " + parsed.age.string() + " years old"
}.encode_json().bytes()
`,
})
// Open the processor
p.Open(nil)
defer p.Teardown(nil)
// Create a test record with JSON data
record := opencdc.Record{
Position: opencdc.Position("pos-1"),
Operation: opencdc.OperationCreate,
Payload: opencdc.Change{
After: opencdc.RawData(`{"id": 123, "name": "john", "age": 30}`),
},
}
// Process the record
results := p.Process(nil, []opencdc.Record{record})
// In a real implementation with actual Benthos integration,
// the result would be transformed according to the YAML config
result := results[0].(sdk.SingleRecord)
fmt.Println("Processed by:", result.Metadata["processed_by"])
Output: Processed by: benthos
Example (Uppercase) ¶
p := NewBenthosProcessor()
// Configure the processor with a simple uppercase transformation
p.Configure(nil, config.Config{
"benthosYAML": `
input:
generate:
mapping: 'root = {"test":"data"}'
interval: ""
count: 1
pipeline:
processors:
- mapping: |
root.payload.after = content().payload.after.string().uppercase().bytes()
`,
})
// Open the processor
p.Open(nil)
defer p.Teardown(nil)
// Create a test record
record := opencdc.Record{
Position: opencdc.Position("pos-1"),
Operation: opencdc.OperationCreate,
Metadata: opencdc.Metadata{"source": "example"},
Payload: opencdc.Change{
After: opencdc.RawData("hello world"),
},
}
// Process the record
results := p.Process(nil, []opencdc.Record{record})
// Print the result
result := results[0].(sdk.SingleRecord)
fmt.Println("Processed payload:", string(result.Payload.After.Bytes()))
fmt.Println("Metadata:", result.Metadata["processed_by"])
Output:
Processed payload: HELLO WORLD
Metadata: benthos
Index ¶
- Constants
- type BenthosConfig
  - func (BenthosConfig) Parameters() map[string]config.Parameter
- type BenthosProcessor
  - func GetProcessorByID(id string) (*BenthosProcessor, bool)
  - func NewBenthosProcessor(logger log.CtxLogger) *BenthosProcessor
  - func (p *BenthosProcessor) Configure(ctx context.Context, cfg config.Config) error
  - func (p *BenthosProcessor) Open(ctx context.Context) error
  - func (p *BenthosProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
  - func (p *BenthosProcessor) SetupBenthosStream(ctx context.Context, config BenthosConfig) error
  - func (p *BenthosProcessor) Specification() (sdk.Specification, error)
  - func (p *BenthosProcessor) Teardown(ctx context.Context) error
Examples ¶
- Package (ComplexTransformation)
- Package (Uppercase)
Constants ¶
const ( BenthosConfigBatchSize = "batchSize" BenthosConfigChannelBufferSize = "channelBufferSize" BenthosConfigThreadCount = "threadCount" BenthosConfigYaml = "yaml" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BenthosConfig ¶
type BenthosConfig struct {
// YAML is the complete Benthos configuration (excluding input/output)
// This includes processors, resources, buffer, metrics, etc.
YAML string `json:"yaml" validate:"required"`
// BatchSize controls the maximum number of records to process in a single Benthos batch
// Higher values can improve throughput but may increase memory usage
BatchSize int `json:"batchSize" default:"100" validate:"gt=0"`
// ChannelBufferSize controls the size of internal channels for communication
// Higher values can improve throughput but use more memory
ChannelBufferSize int `json:"channelBufferSize" default:"10"`
// ThreadCount controls the number of parallel processing threads in the Benthos pipeline
// Higher values can improve throughput for CPU-bound processors
ThreadCount int `json:"threadCount" default:"1"`
}
BenthosConfig represents the configuration for the Benthos processor. It's used both for initial configuration and for updates.
func (BenthosConfig) Parameters ¶
func (BenthosConfig) Parameters() map[string]config.Parameter
type BenthosProcessor ¶
type BenthosProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
func GetProcessorByID ¶
func GetProcessorByID(id string) (*BenthosProcessor, bool)
GetProcessorByID looks up a BenthosProcessor instance by its ID. It returns the processor and a boolean reporting whether it was found.
This function is used by external API calls (like UpdateBenthosStream) to find a processor instance by its ID so that its configuration can be updated.
The processor ID typically follows the format "pipelineID:processorID" and is set by Conduit when the processor is opened.
func NewBenthosProcessor ¶
func NewBenthosProcessor(logger log.CtxLogger) *BenthosProcessor
NewBenthosProcessor creates a new Benthos processor with the provided logger.
func (*BenthosProcessor) Process ¶
func (p *BenthosProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*BenthosProcessor) SetupBenthosStream ¶
func (p *BenthosProcessor) SetupBenthosStream(ctx context.Context, config BenthosConfig) error
SetupBenthosStream creates or updates the Benthos stream with the provided configuration. This method is thread-safe and can be called both during initialization and for runtime updates.
func (*BenthosProcessor) Specification ¶
func (p *BenthosProcessor) Specification() (sdk.Specification, error)