Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DecodeProcessor ¶ added in v0.13.4
type DecodeProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example (RawKey) ¶
p := NewDecodeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: `Decode record key as JSON`,
Description: `This example takes a record containing a raw JSON string in
` + "`.Key`" + ` and converts it into structured data.`,
Config: config.Config{"field": ".Key"},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Key: opencdc.RawData(`{"after":{"data":4,"id":3}}`),
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationCreate,
Key: opencdc.StructuredData{
"after": map[string]interface{}{"data": float64(4), "id": float64(3)},
},
},
})
Output: processor transformed record: --- before +++ after @@ -1,10 +1,15 @@ { "position": null, "operation": "create", "metadata": null, - "key": "{\"after\":{\"data\":4,\"id\":3}}", + "key": { + "after": { + "data": 4, + "id": 3 + } + }, "payload": { "before": null, "after": null } }
Example (RawPayloadField) ¶
p := NewDecodeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Decode nested field as JSON",
Description: `This example takes a record containing a raw JSON string in
` + "`.Payload.Before.foo`" + ` and converts it into a map.`,
Config: config.Config{"field": ".Payload.Before.foo"},
Have: opencdc.Record{
Operation: opencdc.OperationSnapshot,
Payload: opencdc.Change{
Before: opencdc.StructuredData{
"foo": `{"before":{"data":4,"id":3},"baz":"bar"}`,
},
},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationSnapshot,
Payload: opencdc.Change{
Before: opencdc.StructuredData{
"foo": map[string]any{
"before": map[string]any{"data": float64(4), "id": float64(3)},
"baz": "bar",
},
},
},
},
})
Output: processor transformed record: --- before +++ after @@ -1,12 +1,18 @@ { "position": null, "operation": "snapshot", "metadata": null, "key": null, "payload": { "before": { - "foo": "{\"before\":{\"data\":4,\"id\":3},\"baz\":\"bar\"}" + "foo": { + "baz": "bar", + "before": { + "data": 4, + "id": 3 + } + } }, "after": null } }
func NewDecodeProcessor ¶
func NewDecodeProcessor(log.CtxLogger) *DecodeProcessor
func (*DecodeProcessor) Process ¶ added in v0.13.4
func (p *DecodeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*DecodeProcessor) Specification ¶ added in v0.13.4
func (p *DecodeProcessor) Specification() (sdk.Specification, error)
type EncodeProcessor ¶ added in v0.13.4
type EncodeProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example (MapToJSON) ¶
p := NewEncodeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Encode nested field to JSON",
Description: `This example takes a record containing a map in
` + "`.Payload.Before.foo`" + ` and converts it into a raw JSON string.`,
Config: config.Config{"field": ".Payload.Before.foo"},
Have: opencdc.Record{
Operation: opencdc.OperationSnapshot,
Payload: opencdc.Change{
Before: opencdc.StructuredData{
"foo": map[string]any{
"before": map[string]any{"data": float64(4), "id": float64(3)},
"baz": "bar",
},
},
},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationSnapshot,
Payload: opencdc.Change{
Before: opencdc.StructuredData{
"foo": `{"baz":"bar","before":{"data":4,"id":3}}`,
},
},
},
})
Output: processor transformed record: --- before +++ after @@ -1,18 +1,12 @@ { "position": null, "operation": "snapshot", "metadata": null, "key": null, "payload": { "before": { - "foo": { - "baz": "bar", - "before": { - "data": 4, - "id": 3 - } - } + "foo": "{\"baz\":\"bar\",\"before\":{\"data\":4,\"id\":3}}" }, "after": null } }
Example (StructuredKey) ¶
p := NewEncodeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Encode record key to JSON",
Description: `This example takes a record containing structured data in
` + "`.Key`" + ` and converts it into a raw JSON string.`,
Config: config.Config{"field": ".Key"},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Key: opencdc.StructuredData{
"tables": []string{"table1,table2"},
},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationCreate,
Key: opencdc.RawData(`{"tables":["table1,table2"]}`),
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,10 @@ { "position": null, "operation": "create", "metadata": null, - "key": { - "tables": [ - "table1,table2" - ] - }, + "key": "{\"tables\":[\"table1,table2\"]}", "payload": { "before": null, "after": null } }
func NewEncodeProcessor ¶
func NewEncodeProcessor(log.CtxLogger) *EncodeProcessor
func (*EncodeProcessor) Process ¶ added in v0.13.4
func (p *EncodeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*EncodeProcessor) Specification ¶ added in v0.13.4
func (p *EncodeProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.