 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConvertProcessor ¶ added in v0.13.4
type ConvertProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example (FloatToString) ¶
p := NewConvertProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Convert `float` to `string`",
	Description: "This example takes the `float` in field `.Key.id` and changes its data type to `string`.",
	Config:      config.Config{"field": ".Key.id", "type": "string"},
	Have: opencdc.Record{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": 123.345},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": "123.345"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "update", "metadata": null, "key": { - "id": 123.345 + "id": "123.345" }, "payload": { "before": null, "after": { "foo": "bar" } } }
Example (IntToBool) ¶
p := NewConvertProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Convert `int` to `bool`",
	Description: "This example takes the `int` in field `.Payload.After.done` and changes its data type to `bool`.",
	Config:      config.Config{"field": ".Payload.After.done", "type": "bool"},
	Have: opencdc.Record{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": "123"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"done": 1}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": "123"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"done": true}},
	},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "update", "metadata": null, "key": { "id": "123" }, "payload": { "before": null, "after": { - "done": 1 + "done": true } } }
Example (IntTotime) ¶
p := NewConvertProcessor(log.Nop())
timeObj := time.Date(2024, 1, 2, 12, 34, 56, 123456789, time.UTC)
exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Convert `int` to `time`",
	Description: "This example takes an `int` in field `.Payload.After.createdAt` and parses it as a unix timestamp into a `time.Time` value.",
	Config:      config.Config{"field": ".Payload.After.createdAt", "type": "time"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Key:       opencdc.StructuredData{"id": 123.345},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"createdAt": timeObj.UnixNano()}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Key:       opencdc.StructuredData{"id": 123.345},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"createdAt": timeObj}},
	},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "create", "metadata": null, "key": { "id": 123.345 }, "payload": { "before": null, "after": { - "createdAt": 1704198896123456789 + "createdAt": "2024-01-02T12:34:56.123456789Z" } } }
Example (StringToInt) ¶
p := NewConvertProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Convert `string` to `int`",
	Description: "This example takes the string in field `.Key.id` and changes its data type to `int`.",
	Config:      config.Config{"field": ".Key.id", "type": "int"},
	Have: opencdc.Record{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": "123"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": 123},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "update", "metadata": null, "key": { - "id": "123" + "id": 123 }, "payload": { "before": null, "after": { "foo": "bar" } } }
func NewConvertProcessor ¶
func NewConvertProcessor(log.CtxLogger) *ConvertProcessor
func (*ConvertProcessor) Process ¶ added in v0.13.4
func (p *ConvertProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ConvertProcessor) Specification ¶ added in v0.13.4
func (p *ConvertProcessor) Specification() (sdk.Specification, error)
type ExcludeProcessor ¶ added in v0.13.4
type ExcludeProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example (MultipleFields) ¶
p := NewExcludeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Exclude multiple fields`,
	Description: `It's possible to exclude multiple fields by providing a
comma-separated list of fields. In this example, we exclude ` + "`.Metadata`" + `,
` + "`.Payload.After.foo`" + ` and ` + "`.Key.key1`" + `.`,
	Config: config.Config{"fields": ".Metadata,.Payload.After.foo,.Key.key1"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"source": "s3"},
		Key:       opencdc.StructuredData{"key1": "val1", "key2": "val2"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar", "foobar": "baz"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{},
		Key:       opencdc.StructuredData{"key2": "val2"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foobar": "baz"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
})
Output: processor transformed record: --- before +++ after @@ -1,20 +1,16 @@ { "position": null, "operation": "create", - "metadata": { - "source": "s3" - }, + "metadata": {}, - "key": { - "key1": "val1", + "key": { "key2": "val2" }, "payload": { "before": { "bar": "baz" }, - "after": { - "foo": "bar", + "after": { "foobar": "baz" } } }
Example (OneField) ¶
p := NewExcludeProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Exclude all fields in payload",
	Description: "Excluding all fields in `.Payload` results in an empty payload.",
	Config:      config.Config{"fields": ".Payload"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
	},
})
Output: processor transformed record: --- before +++ after @@ -1,16 +1,12 @@ { "position": null, "operation": "create", "metadata": { "key1": "val1" }, "key": null, "payload": { - "before": { - "bar": "baz" - }, + "before": null, - "after": { - "foo": "bar" - } + "after": null } }
func NewExcludeProcessor ¶
func NewExcludeProcessor(log.CtxLogger) *ExcludeProcessor
func (*ExcludeProcessor) Process ¶ added in v0.13.4
func (p *ExcludeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ExcludeProcessor) Specification ¶ added in v0.13.4
func (p *ExcludeProcessor) Specification() (sdk.Specification, error)
type RenameProcessor ¶ added in v0.13.4
type RenameProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example (Rename1) ¶
p := NewRenameProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Rename multiple fields`,
	Description: `This example renames the fields in ` + "`.Metadata`" + ` and
` + "`.Payload.After`" + ` as specified in the ` + "`mapping`" + ` configuration parameter.`,
	Config: config.Config{"mapping": ".Metadata.key1:newKey,.Payload.After.foo:newFoo"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"newKey": "val1"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"newFoo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
})
Output: processor transformed record: --- before +++ after @@ -1,16 +1,16 @@ { "position": null, "operation": "create", "metadata": { - "key1": "val1" + "newKey": "val1" }, "key": null, "payload": { "before": { "bar": "baz" }, "after": { - "foo": "bar" + "newFoo": "bar" } } }
func NewRenameProcessor ¶
func NewRenameProcessor(log.CtxLogger) *RenameProcessor
func (*RenameProcessor) Process ¶ added in v0.13.4
func (p *RenameProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*RenameProcessor) Specification ¶ added in v0.13.4
func (p *RenameProcessor) Specification() (sdk.Specification, error)
type SetProcessor ¶ added in v0.13.4
type SetProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example (AddField) ¶
p := NewSetProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Add field`,
	Description: `This example adds a new field to the record. The field is
added to ` + "`.Payload.After`" + ` and is set to ` + "`bar`" + `.`,
	Config: config.Config{"field": ".Payload.After.foo", "value": "bar"},
	Have: opencdc.Record{
		Operation: opencdc.OperationSnapshot,
		Key:       opencdc.StructuredData{"my-key": "id"},
	},
	Want: sdk.SingleRecord{
		Key:       opencdc.StructuredData{"my-key": "id"},
		Operation: opencdc.OperationSnapshot,
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
})
Output: processor transformed record: --- before +++ after @@ -1,12 +1,14 @@ { "position": null, "operation": "snapshot", "metadata": null, "key": { "my-key": "id" }, "payload": { "before": null, - "after": null + "after": { + "foo": "bar" + } } }
Example (SetOperation) ¶
p := NewSetProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Sets the record operation to `update`",
	Description: "This example sets the `.Operation` field to `update` for all records.",
	Config:      config.Config{"field": ".Operation", "value": "update"},
	Have:        opencdc.Record{Operation: opencdc.OperationCreate},
	Want:        sdk.SingleRecord{Operation: opencdc.OperationUpdate},
})
Output: processor transformed record: --- before +++ after @@ -1,10 +1,10 @@ { "position": null, - "operation": "create", + "operation": "update", "metadata": null, "key": null, "payload": { "before": null, "after": null } }
Example (Template) ¶
p := NewSetProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary:     `Set field using Go template`,
	Description: "This example sets the `.Payload.After.postgres` field to `true` if the `.Metadata.table` field contains `postgres`.",
	Config:      config.Config{"field": ".Payload.After.postgres", "value": "{{ eq .Metadata.table \"postgres\" }}"},
	Have: opencdc.Record{
		Metadata:  map[string]string{"table": "postgres"},
		Operation: opencdc.OperationSnapshot,
		Payload:   opencdc.Change{After: opencdc.StructuredData{"postgres": "false"}},
	},
	Want: sdk.SingleRecord{
		Metadata:  map[string]string{"table": "postgres"},
		Operation: opencdc.OperationSnapshot,
		Payload:   opencdc.Change{After: opencdc.StructuredData{"postgres": "true"}},
	},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": null, "operation": "snapshot", "metadata": { "table": "postgres" }, "key": null, "payload": { "before": null, "after": { - "postgres": "false" + "postgres": "true" } } }
func NewSetProcessor ¶
func NewSetProcessor(log.CtxLogger) *SetProcessor
func (*SetProcessor) Process ¶ added in v0.13.4
func (p *SetProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*SetProcessor) Specification ¶ added in v0.13.4
func (p *SetProcessor) Specification() (sdk.Specification, error)
 Click to show internal directories. 
   Click to hide internal directories.