Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DecodeProcessor ¶ added in v0.13.4
type DecodeProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example ¶
p := NewDecodeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Decode a base64 encoded string",
Description: `This example decodes the base64 encoded string stored in
` + "`.Payload.After`" + `. Note that the result is a string, so if you want to
further process the result (e.g. parse the string as JSON), you need to chain
other processors (e.g. [` + "`json.decode`" + `](/docs/using/processors/builtin/json.decode)).`,
Config: config.Config{
"field": ".Payload.After.foo",
},
Have: opencdc.Record{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.RawData("test-key"),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"foo": "YmFy",
},
},
},
Want: sdk.SingleRecord{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.RawData("test-key"),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"foo": "bar",
},
},
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { "key1": "val1" }, "key": "test-key", "payload": { "before": null, "after": { - "foo": "YmFy" + "foo": "bar" } } }
func NewDecodeProcessor ¶
func NewDecodeProcessor(log.CtxLogger) *DecodeProcessor
func (*DecodeProcessor) Process ¶ added in v0.13.4
func (p *DecodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*DecodeProcessor) Specification ¶ added in v0.13.4
func (p *DecodeProcessor) Specification() (sdk.Specification, error)
type EncodeProcessor ¶ added in v0.13.4
type EncodeProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example (RawData) ¶
p := NewEncodeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Encode record key to base64",
Description: `TThis example takes a record containing raw data in
` + "`.Key`" + ` and converts it into a base64 encoded string.`,
Config: config.Config{
"field": ".Key",
},
Have: opencdc.Record{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.RawData("test-key"),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"foo": "bar",
},
},
},
Want: sdk.SingleRecord{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.RawData("dGVzdC1rZXk="),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"foo": "bar",
},
},
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { "key1": "val1" }, - "key": "test-key", + "key": "dGVzdC1rZXk=", "payload": { "before": null, "after": { "foo": "bar" } } }
Example (StringField) ¶
p := NewEncodeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Encode nested value to base64",
Description: `This example takes a record containing a string in
` + "`.Payload.Before.foo`" + ` and converts it into a base64 encoded string.`,
Config: config.Config{
"field": ".Payload.After.foo",
},
Have: opencdc.Record{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.RawData("test-key"),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"foo": "bar",
},
},
},
Want: sdk.SingleRecord{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.RawData("test-key"),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"foo": "YmFy",
},
},
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { "key1": "val1" }, "key": "test-key", "payload": { "before": null, "after": { - "foo": "bar" + "foo": "YmFy" } } }
func NewEncodeProcessor ¶
func NewEncodeProcessor(log.CtxLogger) *EncodeProcessor
func (*EncodeProcessor) Process ¶ added in v0.13.4
func (p *EncodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*EncodeProcessor) Specification ¶ added in v0.13.4
func (p *EncodeProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.