Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConvertProcessor ¶ added in v0.13.4
type ConvertProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (FloatToString) ¶
p := NewConvertProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Convert `float` to `string`", Description: "This example takes the `float` in field `.Key.id` and changes its data type to `string`.", Config: config.Config{"field": ".Key.id", "type": "string"}, Have: opencdc.Record{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": 123.345}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}}, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": "123.345"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}}, }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "update", "metadata": null, "key": { - "id": 123.345 + "id": "123.345" }, "payload": { "before": null, "after": { "foo": "bar" } } }
Example (IntToBool) ¶
p := NewConvertProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Convert `int` to `bool`", Description: "This example takes the `int` in field `.Payload.After.done` and changes its data type to `bool`.", Config: config.Config{"field": ".Payload.After.done", "type": "bool"}, Have: opencdc.Record{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": "123"}, Payload: opencdc.Change{After: opencdc.StructuredData{"done": 1}}, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": "123"}, Payload: opencdc.Change{After: opencdc.StructuredData{"done": true}}, }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "update", "metadata": null, "key": { "id": "123" }, "payload": { "before": null, "after": { - "done": 1 + "done": true } } }
Example (IntTotime) ¶
p := NewConvertProcessor(log.Nop()) timeObj := time.Date(2024, 1, 2, 12, 34, 56, 123456789, time.UTC) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Convert `int` to `time`", Description: "This example takes an `int` in field `.Payload.After.createdAt` and parses it as a unix timestamp into a `time.Time` value.", Config: config.Config{"field": ".Payload.After.createdAt", "type": "time"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Key: opencdc.StructuredData{"id": 123.345}, Payload: opencdc.Change{After: opencdc.StructuredData{"createdAt": timeObj.UnixNano()}}, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationCreate, Key: opencdc.StructuredData{"id": 123.345}, Payload: opencdc.Change{After: opencdc.StructuredData{"createdAt": timeObj}}, }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "create", "metadata": null, "key": { "id": 123.345 }, "payload": { "before": null, "after": { - "createdAt": 1704198896123456789 + "createdAt": "2024-01-02T12:34:56.123456789Z" } } }
Example (StringToInt) ¶
p := NewConvertProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Convert `string` to `int`", Description: "This example takes the string in field `.Key.id` and changes its data type to `int`.", Config: config.Config{"field": ".Key.id", "type": "int"}, Have: opencdc.Record{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": "123"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}}, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": 123}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}}, }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "update", "metadata": null, "key": { - "id": "123" + "id": 123 }, "payload": { "before": null, "after": { "foo": "bar" } } }
func NewConvertProcessor ¶
func NewConvertProcessor(log.CtxLogger) *ConvertProcessor
func (*ConvertProcessor) Process ¶ added in v0.13.4
func (p *ConvertProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ConvertProcessor) Specification ¶ added in v0.13.4
func (p *ConvertProcessor) Specification() (sdk.Specification, error)
type ExcludeProcessor ¶ added in v0.13.4
type ExcludeProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (MultipleFields) ¶
p := NewExcludeProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: `Exclude multiple fields`, Description: `It's possible to exclude multiple fields by providing a comma-separated list of fields. In this example, we exclude ` + "`.Metadata`" + `, ` + "`.Payload.After.foo`" + ` and ` + "`.Key.key1`" + `.`, Config: config.Config{"fields": ".Metadata,.Payload.After.foo,.Key.key1"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"source": "s3"}, Key: opencdc.StructuredData{"key1": "val1", "key2": "val2"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar", "foobar": "baz"}, Before: opencdc.StructuredData{"bar": "baz"}}, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationCreate, Metadata: map[string]string{}, Key: opencdc.StructuredData{"key2": "val2"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foobar": "baz"}, Before: opencdc.StructuredData{"bar": "baz"}}, }, })
Output: processor transformed record: --- before +++ after @@ -1,20 +1,16 @@ { "position": null, "operation": "create", - "metadata": { - "source": "s3" - }, + "metadata": {}, - "key": { - "key1": "val1", + "key": { "key2": "val2" }, "payload": { "before": { "bar": "baz" }, - "after": { - "foo": "bar", + "after": { "foobar": "baz" } } }
Example (OneField) ¶
p := NewExcludeProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Exclude all fields in payload", Description: "Excluding all fields in `.Payload` results in an empty payload.", Config: config.Config{"fields": ".Payload"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}}, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, }, })
Output: processor transformed record: --- before +++ after @@ -1,16 +1,12 @@ { "position": null, "operation": "create", "metadata": { "key1": "val1" }, "key": null, "payload": { - "before": { - "bar": "baz" - }, + "before": null, - "after": { - "foo": "bar" - } + "after": null } }
func NewExcludeProcessor ¶
func NewExcludeProcessor(log.CtxLogger) *ExcludeProcessor
func (*ExcludeProcessor) Process ¶ added in v0.13.4
func (p *ExcludeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ExcludeProcessor) Specification ¶ added in v0.13.4
func (p *ExcludeProcessor) Specification() (sdk.Specification, error)
type RenameProcessor ¶ added in v0.13.4
type RenameProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (Rename1) ¶
p := NewRenameProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: `Rename multiple fields`, Description: `This example renames the fields in ` + "`.Metadata`" + ` and ` + "`.Payload.After`" + ` as specified in the ` + "`mapping`" + ` configuration parameter.`, Config: config.Config{"mapping": ".Metadata.key1:newKey,.Payload.After.foo:newFoo"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}}, }, Want: sdk.SingleRecord{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"newKey": "val1"}, Payload: opencdc.Change{After: opencdc.StructuredData{"newFoo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}}, }, })
Output: processor transformed record: --- before +++ after @@ -1,16 +1,16 @@ { "position": null, "operation": "create", "metadata": { - "key1": "val1" + "newKey": "val1" }, "key": null, "payload": { "before": { "bar": "baz" }, "after": { - "foo": "bar" + "newFoo": "bar" } } }
func NewRenameProcessor ¶
func NewRenameProcessor(log.CtxLogger) *RenameProcessor
func (*RenameProcessor) Process ¶ added in v0.13.4
func (p *RenameProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*RenameProcessor) Specification ¶ added in v0.13.4
func (p *RenameProcessor) Specification() (sdk.Specification, error)
type SetProcessor ¶ added in v0.13.4
type SetProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (AddField) ¶
p := NewSetProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: `Add field`, Description: `This example adds a new field to the record. The field is added to ` + "`.Payload.After`" + ` and is set to ` + "`bar`" + `.`, Config: config.Config{"field": ".Payload.After.foo", "value": "bar"}, Have: opencdc.Record{ Operation: opencdc.OperationSnapshot, Key: opencdc.StructuredData{"my-key": "id"}, }, Want: sdk.SingleRecord{ Key: opencdc.StructuredData{"my-key": "id"}, Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}}, }, })
Output: processor transformed record: --- before +++ after @@ -1,12 +1,14 @@ { "position": null, "operation": "snapshot", "metadata": null, "key": { "my-key": "id" }, "payload": { "before": null, - "after": null + "after": { + "foo": "bar" + } } }
Example (SetOperation) ¶
p := NewSetProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Sets the record operation to `update`", Description: "This example sets the `.Operation` field to `update` for all records.", Config: config.Config{"field": ".Operation", "value": "update"}, Have: opencdc.Record{Operation: opencdc.OperationCreate}, Want: sdk.SingleRecord{Operation: opencdc.OperationUpdate}, })
Output: processor transformed record: --- before +++ after @@ -1,10 +1,10 @@ { "position": null, - "operation": "create", + "operation": "update", "metadata": null, "key": null, "payload": { "before": null, "after": null } }
Example (Template) ¶
p := NewSetProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: `Set field using Go template`, Description: "This example sets the `.Payload.After.postgres` field to `true` if the `.Metadata.table` field contains `postgres`.", Config: config.Config{"field": ".Payload.After.postgres", "value": "{{ eq .Metadata.table \"postgres\" }}"}, Have: opencdc.Record{ Metadata: map[string]string{"table": "postgres"}, Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{After: opencdc.StructuredData{"postgres": "false"}}, }, Want: sdk.SingleRecord{ Metadata: map[string]string{"table": "postgres"}, Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{After: opencdc.StructuredData{"postgres": "true"}}, }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "snapshot", "metadata": { "table": "postgres" }, "key": null, "payload": { "before": null, "after": { - "postgres": "false" + "postgres": "true" } } }
func NewSetProcessor ¶
func NewSetProcessor(log.CtxLogger) *SetProcessor
func (*SetProcessor) Process ¶ added in v0.13.4
func (p *SetProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*SetProcessor) Specification ¶ added in v0.13.4
func (p *SetProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.