Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DecodeProcessor ¶ added in v0.13.4
type DecodeProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (RawKey) ¶
p := NewDecodeProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: `Decode record key as JSON`, Description: `This example takes a record containing a raw JSON string in ` + "`.Key`" + ` and converts it into structured data.`, Config: config.Config{"field": ".Key"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Key: opencdc.RawData(`{"after":{"data":4,"id":3}}`), }, Want: sdk.SingleRecord{ Operation: opencdc.OperationCreate, Key: opencdc.StructuredData{ "after": map[string]interface{}{"data": float64(4), "id": float64(3)}, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,10 +1,15 @@ { "position": null, "operation": "create", "metadata": null, - "key": "{\"after\":{\"data\":4,\"id\":3}}", + "key": { + "after": { + "data": 4, + "id": 3 + } + }, "payload": { "before": null, "after": null } }
Example (RawPayloadField) ¶
p := NewDecodeProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Decode nested field as JSON", Description: `This example takes a record containing a raw JSON string in ` + "`.Payload.Before.foo`" + ` and converts it into a map.`, Config: config.Config{"field": ".Payload.Before.foo"}, Have: opencdc.Record{ Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{ Before: opencdc.StructuredData{ "foo": `{"before":{"data":4,"id":3},"baz":"bar"}`, }, }, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{ Before: opencdc.StructuredData{ "foo": map[string]any{ "before": map[string]any{"data": float64(4), "id": float64(3)}, "baz": "bar", }, }, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,12 +1,18 @@ { "position": null, "operation": "snapshot", "metadata": null, "key": null, "payload": { "before": { - "foo": "{\"before\":{\"data\":4,\"id\":3},\"baz\":\"bar\"}" + "foo": { + "baz": "bar", + "before": { + "data": 4, + "id": 3 + } + } }, "after": null } }
func NewDecodeProcessor ¶
func NewDecodeProcessor(log.CtxLogger) *DecodeProcessor
func (*DecodeProcessor) Process ¶ added in v0.13.4
func (p *DecodeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*DecodeProcessor) Specification ¶ added in v0.13.4
func (p *DecodeProcessor) Specification() (sdk.Specification, error)
type EncodeProcessor ¶ added in v0.13.4
type EncodeProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (MapToJSON) ¶
p := NewEncodeProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Encode nested field to JSON", Description: `This example takes a record containing a map in ` + "`.Payload.Before.foo`" + ` and converts it into a raw JSON string.`, Config: config.Config{"field": ".Payload.Before.foo"}, Have: opencdc.Record{ Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{ Before: opencdc.StructuredData{ "foo": map[string]any{ "before": map[string]any{"data": float64(4), "id": float64(3)}, "baz": "bar", }, }, }, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{ Before: opencdc.StructuredData{ "foo": `{"baz":"bar","before":{"data":4,"id":3}}`, }, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,18 +1,12 @@ { "position": null, "operation": "snapshot", "metadata": null, "key": null, "payload": { "before": { - "foo": { - "baz": "bar", - "before": { - "data": 4, - "id": 3 - } - } + "foo": "{\"baz\":\"bar\",\"before\":{\"data\":4,\"id\":3}}" }, "after": null } }
Example (StructuredKey) ¶
p := NewEncodeProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Encode record key to JSON", Description: `This example takes a record containing structured data in ` + "`.Key`" + ` and converts it into a raw JSON string.`, Config: config.Config{"field": ".Key"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Key: opencdc.StructuredData{ "tables": []string{"table1,table2"}, }, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationCreate, Key: opencdc.RawData(`{"tables":["table1,table2"]}`), }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,10 @@ { "position": null, "operation": "create", "metadata": null, - "key": { - "tables": [ - "table1,table2" - ] - }, + "key": "{\"tables\":[\"table1,table2\"]}", "payload": { "before": null, "after": null } }
func NewEncodeProcessor ¶
func NewEncodeProcessor(log.CtxLogger) *EncodeProcessor
func (*EncodeProcessor) Process ¶ added in v0.13.4
func (p *EncodeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*EncodeProcessor) Specification ¶ added in v0.13.4
func (p *EncodeProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.