Documentation
¶
Index ¶
- func NewErrorProcessor(log.CtxLogger) sdk.Processor
- type ErrorProcessor
- func (p *ErrorProcessor) Configure(ctx context.Context, cfg config.Config) error
- func (*ErrorProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
- func (p *ErrorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *ErrorProcessor) Specification() (sdk.Specification, error)
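- type FilterProcessor
- func NewFilterProcessor(log.CtxLogger) *FilterProcessor
- func (*FilterProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
- func (p *FilterProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *FilterProcessor) Specification() (sdk.Specification, error)
- type SplitProcessor
- func NewSplitProcessor(log.CtxLogger) *SplitProcessor
- func (p *SplitProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *SplitProcessor) Specification() (sdk.Specification, error)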
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewErrorProcessor ¶
func NewErrorProcessor(log.CtxLogger) sdk.Processor
Types ¶
type ErrorProcessor ¶ added in v0.13.4
type ErrorProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example ¶
p := NewErrorProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: `Error record with custom error message`,
Description: `This example shows how to configure the error processor to
return a custom error message for a record using a Go template.`,
Config: config.Config{
"message": "custom error message with data from record: {{.Metadata.foo}}",
},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"foo": "bar"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
},
Want: sdk.ErrorRecord{
Error: cerrors.New("custom error message with data from record: bar"),
},
})
Output: processor returned error: custom error message with data from record: bar
func (*ErrorProcessor) MiddlewareOptions ¶ added in v0.13.4
func (*ErrorProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
func (*ErrorProcessor) Process ¶ added in v0.13.4
func (p *ErrorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ErrorProcessor) Specification ¶ added in v0.13.4
func (p *ErrorProcessor) Specification() (sdk.Specification, error)
type FilterProcessor ¶ added in v0.13.4
type FilterProcessor struct {
sdk.UnimplementedProcessor
}
Example ¶
p := NewFilterProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: `Filter out the record`,
Config: config.Config{},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
},
Want: sdk.FilterRecord{},
})
Output: processor filtered record out
func NewFilterProcessor ¶
func NewFilterProcessor(log.CtxLogger) *FilterProcessor
func (*FilterProcessor) MiddlewareOptions ¶ added in v0.13.4
func (*FilterProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
func (*FilterProcessor) Process ¶ added in v0.13.4
func (p *FilterProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*FilterProcessor) Specification ¶ added in v0.13.4
func (p *FilterProcessor) Specification() (sdk.Specification, error)
type SplitProcessor ¶ added in v0.14.0
type SplitProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example (Simple) ¶
p := NewSplitProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Split array into multiple records",
Description: "This example takes the array in field `.Payload.After.users` and splits it into separate records, each containing one element.",
Config: config.Config{"field": ".Payload.After.users"},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": 123},
Payload: opencdc.Change{After: opencdc.StructuredData{
"users": []map[string]any{
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 25},
{"name": "Charlie", "age": 35},
},
}},
},
Want: sdk.MultiRecord{
opencdc.Record{
Operation: opencdc.OperationUpdate,
Metadata: map[string]string{"split.index": "0"},
Key: opencdc.StructuredData{"id": 123},
Payload: opencdc.Change{After: opencdc.StructuredData{
"users": map[string]any{"name": "Alice", "age": 30},
}},
},
opencdc.Record{
Operation: opencdc.OperationUpdate,
Metadata: map[string]string{"split.index": "1"},
Key: opencdc.StructuredData{"id": 123},
Payload: opencdc.Change{After: opencdc.StructuredData{
"users": map[string]any{"name": "Bob", "age": 25},
}},
},
opencdc.Record{
Operation: opencdc.OperationUpdate,
Metadata: map[string]string{"split.index": "2"},
Key: opencdc.StructuredData{"id": 123},
Payload: opencdc.Change{After: opencdc.StructuredData{
"users": map[string]any{"name": "Charlie", "age": 35},
}},
},
},
})
Output: processor transformed record:
--- before
+++ after
@@ -1,27 +1,59 @@
-{
+[
+  {
-  "position": null,
+    "position": null,
-  "operation": "update",
+    "operation": "update",
-  "metadata": null,
+    "metadata": {
+      "split.index": "0"
+    },
-  "key": {
+    "key": {
-    "id": 123
+      "id": 123
-  },
+    },
-  "payload": {
-    "before": null,
-    "after": {
-      "users": [
-        {
-          "age": 30,
-          "name": "Alice"
-        },
-        {
-          "age": 25,
+    "payload": {
+      "before": null,
+      "after": {
+        "users": {
+          "age": 30,
+          "name": "Alice"
+        }
+      }
+    }
+  },
+  {
+    "position": null,
+    "operation": "update",
+    "metadata": {
+      "split.index": "1"
+    },
+    "key": {
+      "id": 123
+    },
+    "payload": {
+      "before": null,
+      "after": {
+        "users": {
+          "age": 25,
+          "name": "Bob"
+        }
+      }
+    }
+  },
+  {
+    "position": null,
+    "operation": "update",
+    "metadata": {
+      "split.index": "2"
+    },
+    "key": {
+      "id": 123
+    },
+    "payload": {
+      "before": null,
-          "name": "Bob"
+      "after": {
-        },
-        {
+        "users": {
           "age": 35,
           "name": "Charlie"
         }
-      ]
+      }
     }
   }
-}
+]
func NewSplitProcessor ¶ added in v0.14.0
func NewSplitProcessor(log.CtxLogger) *SplitProcessor
func (*SplitProcessor) Process ¶ added in v0.14.0
func (p *SplitProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*SplitProcessor) Specification ¶ added in v0.14.0
func (p *SplitProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.