Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConvertProcessor ¶ added in v0.13.4
type ConvertProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example (FloatToString) ¶
p := NewConvertProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Convert `float` to `string`",
Description: "This example takes the `float` in field `.Key.id` and changes its data type to `string`.",
Config: config.Config{"field": ".Key.id", "type": "string"},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": 123.345},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": "123.345"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "update", "metadata": null, "key": { - "id": 123.345 + "id": "123.345" }, "payload": { "before": null, "after": { "foo": "bar" } } }
Example (IntToBool) ¶
p := NewConvertProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Convert `int` to `bool`",
Description: "This example takes the `int` in field `.Payload.After.done` and changes its data type to `bool`.",
Config: config.Config{"field": ".Payload.After.done", "type": "bool"},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": "123"},
Payload: opencdc.Change{After: opencdc.StructuredData{"done": 1}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": "123"},
Payload: opencdc.Change{After: opencdc.StructuredData{"done": true}},
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "update", "metadata": null, "key": { "id": "123" }, "payload": { "before": null, "after": { - "done": 1 + "done": true } } }
Example (IntTotime) ¶
p := NewConvertProcessor(log.Nop())
timeObj := time.Date(2024, 1, 2, 12, 34, 56, 123456789, time.UTC)
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Convert `int` to `time`",
Description: "This example takes an `int` in field `.Payload.After.createdAt` and parses it as a unix timestamp into a `time.Time` value.",
Config: config.Config{"field": ".Payload.After.createdAt", "type": "time"},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Key: opencdc.StructuredData{"id": 123.345},
Payload: opencdc.Change{After: opencdc.StructuredData{"createdAt": timeObj.UnixNano()}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationCreate,
Key: opencdc.StructuredData{"id": 123.345},
Payload: opencdc.Change{After: opencdc.StructuredData{"createdAt": timeObj}},
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "create", "metadata": null, "key": { "id": 123.345 }, "payload": { "before": null, "after": { - "createdAt": 1704198896123456789 + "createdAt": "2024-01-02T12:34:56.123456789Z" } } }
Example (StringToInt) ¶
p := NewConvertProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Convert `string` to `int`",
Description: "This example takes the string in field `.Key.id` and changes its data type to `int`.",
Config: config.Config{"field": ".Key.id", "type": "int"},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": "123"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": 123},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "update", "metadata": null, "key": { - "id": "123" + "id": 123 }, "payload": { "before": null, "after": { "foo": "bar" } } }
func NewConvertProcessor ¶
func NewConvertProcessor(log.CtxLogger) *ConvertProcessor
func (*ConvertProcessor) Process ¶ added in v0.13.4
func (p *ConvertProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ConvertProcessor) Specification ¶ added in v0.13.4
func (p *ConvertProcessor) Specification() (sdk.Specification, error)
type ExcludeProcessor ¶ added in v0.13.4
type ExcludeProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example (MultipleFields) ¶
p := NewExcludeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: `Exclude multiple fields`,
Description: `It's possible to exclude multiple fields by providing a
comma-separated list of fields. In this example, we exclude ` + "`.Metadata`" + `,
` + "`.Payload.After.foo`" + ` and ` + "`.Key.key1`" + `.`,
Config: config.Config{"fields": ".Metadata,.Payload.After.foo,.Key.key1"},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"source": "s3"},
Key: opencdc.StructuredData{"key1": "val1", "key2": "val2"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar", "foobar": "baz"}, Before: opencdc.StructuredData{"bar": "baz"}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{},
Key: opencdc.StructuredData{"key2": "val2"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foobar": "baz"}, Before: opencdc.StructuredData{"bar": "baz"}},
},
})
Output: processor transformed record: --- before +++ after @@ -1,20 +1,16 @@ { "position": null, "operation": "create", - "metadata": { - "source": "s3" - }, + "metadata": {}, - "key": { - "key1": "val1", + "key": { "key2": "val2" }, "payload": { "before": { "bar": "baz" }, - "after": { - "foo": "bar", + "after": { "foobar": "baz" } } }
Example (OneField) ¶
p := NewExcludeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Exclude all fields in payload",
Description: "Excluding all fields in `.Payload` results in an empty payload.",
Config: config.Config{"fields": ".Payload"},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
},
})
Output: processor transformed record: --- before +++ after @@ -1,16 +1,12 @@ { "position": null, "operation": "create", "metadata": { "key1": "val1" }, "key": null, "payload": { - "before": { - "bar": "baz" - }, + "before": null, - "after": { - "foo": "bar" - } + "after": null } }
func NewExcludeProcessor ¶
func NewExcludeProcessor(log.CtxLogger) *ExcludeProcessor
func (*ExcludeProcessor) Process ¶ added in v0.13.4
func (p *ExcludeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ExcludeProcessor) Specification ¶ added in v0.13.4
func (p *ExcludeProcessor) Specification() (sdk.Specification, error)
type RenameProcessor ¶ added in v0.13.4
type RenameProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example (Rename1) ¶
p := NewRenameProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: `Rename multiple fields`,
Description: `This example renames the fields in ` + "`.Metadata`" + ` and
` + "`.Payload.After`" + ` as specified in the ` + "`mapping`" + ` configuration parameter.`,
Config: config.Config{"mapping": ".Metadata.key1:newKey,.Payload.After.foo:newFoo"},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"newKey": "val1"},
Payload: opencdc.Change{After: opencdc.StructuredData{"newFoo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
},
})
Output: processor transformed record: --- before +++ after @@ -1,16 +1,16 @@ { "position": null, "operation": "create", "metadata": { - "key1": "val1" + "newKey": "val1" }, "key": null, "payload": { "before": { "bar": "baz" }, "after": { - "foo": "bar" + "newFoo": "bar" } } }
func NewRenameProcessor ¶
func NewRenameProcessor(log.CtxLogger) *RenameProcessor
func (*RenameProcessor) Process ¶ added in v0.13.4
func (p *RenameProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*RenameProcessor) Specification ¶ added in v0.13.4
func (p *RenameProcessor) Specification() (sdk.Specification, error)
type SetProcessor ¶ added in v0.13.4
type SetProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example (AddField) ¶
p := NewSetProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: `Add field`,
Description: `This example adds a new field to the record. The field is
added to ` + "`.Payload.After`" + ` and is set to ` + "`bar`" + `.`,
Config: config.Config{"field": ".Payload.After.foo", "value": "bar"},
Have: opencdc.Record{
Operation: opencdc.OperationSnapshot,
Key: opencdc.StructuredData{"my-key": "id"},
},
Want: sdk.SingleRecord{
Key: opencdc.StructuredData{"my-key": "id"},
Operation: opencdc.OperationSnapshot,
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
},
})
Output: processor transformed record: --- before +++ after @@ -1,12 +1,14 @@ { "position": null, "operation": "snapshot", "metadata": null, "key": { "my-key": "id" }, "payload": { "before": null, - "after": null + "after": { + "foo": "bar" + } } }
Example (SetOperation) ¶
p := NewSetProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Sets the record operation to `update`",
Description: "This example sets the `.Operation` field to `update` for all records.",
Config: config.Config{"field": ".Operation", "value": "update"},
Have: opencdc.Record{Operation: opencdc.OperationCreate},
Want: sdk.SingleRecord{Operation: opencdc.OperationUpdate},
})
Output: processor transformed record: --- before +++ after @@ -1,10 +1,10 @@ { "position": null, - "operation": "create", + "operation": "update", "metadata": null, "key": null, "payload": { "before": null, "after": null } }
Example (Template) ¶
p := NewSetProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: `Set field using Go template`,
Description: "This example sets the `.Payload.After.postgres` field to `true` if the `.Metadata.table` field contains `postgres`.",
Config: config.Config{"field": ".Payload.After.postgres", "value": "{{ eq .Metadata.table \"postgres\" }}"},
Have: opencdc.Record{
Metadata: map[string]string{"table": "postgres"},
Operation: opencdc.OperationSnapshot,
Payload: opencdc.Change{After: opencdc.StructuredData{"postgres": "false"}},
},
Want: sdk.SingleRecord{
Metadata: map[string]string{"table": "postgres"},
Operation: opencdc.OperationSnapshot,
Payload: opencdc.Change{After: opencdc.StructuredData{"postgres": "true"}},
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "snapshot", "metadata": { "table": "postgres" }, "key": null, "payload": { "before": null, "after": { - "postgres": "false" + "postgres": "true" } } }
func NewSetProcessor ¶
func NewSetProcessor(log.CtxLogger) *SetProcessor
func (*SetProcessor) Process ¶ added in v0.13.4
func (p *SetProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*SetProcessor) Specification ¶ added in v0.13.4
func (p *SetProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.