field

package
v0.15.0-nightly.20250904 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConvertProcessor added in v0.13.4

type ConvertProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example (FloatToString)
p := NewConvertProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Convert `float` to `string`",
	Description: "This example takes the `float` in field `.Key.id` and changes its data type to `string`.",
	Config:      config.Config{"field": ".Key.id", "type": "string"},
	Have: opencdc.Record{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": 123.345},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": "123.345"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,14 +1,14 @@
 {
   "position": null,
   "operation": "update",
   "metadata": null,
   "key": {
-    "id": 123.345
+    "id": "123.345"
   },
   "payload": {
     "before": null,
     "after": {
       "foo": "bar"
     }
   }
 }
Example (IntToBool)
p := NewConvertProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Convert `int` to `bool`",
	Description: "This example takes the `int` in field `.Payload.After.done` and changes its data type to `bool`.",
	Config:      config.Config{"field": ".Payload.After.done", "type": "bool"},
	Have: opencdc.Record{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": "123"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"done": 1}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": "123"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"done": true}},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,14 +1,14 @@
 {
   "position": null,
   "operation": "update",
   "metadata": null,
   "key": {
     "id": "123"
   },
   "payload": {
     "before": null,
     "after": {
-      "done": 1
+      "done": true
     }
   }
 }
Example (IntTotime)
p := NewConvertProcessor(log.Nop())

timeObj := time.Date(2024, 1, 2, 12, 34, 56, 123456789, time.UTC)

exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Convert `int` to `time`",
	Description: "This example takes an `int` in field `.Payload.After.createdAt` and parses it as a unix timestamp into a `time.Time` value.",
	Config:      config.Config{"field": ".Payload.After.createdAt", "type": "time"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Key:       opencdc.StructuredData{"id": 123.345},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"createdAt": timeObj.UnixNano()}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Key:       opencdc.StructuredData{"id": 123.345},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"createdAt": timeObj}},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,14 +1,14 @@
 {
   "position": null,
   "operation": "create",
   "metadata": null,
   "key": {
     "id": 123.345
   },
   "payload": {
     "before": null,
     "after": {
-      "createdAt": 1704198896123456789
+      "createdAt": "2024-01-02T12:34:56.123456789Z"
     }
   }
 }
Example (StringToInt)
p := NewConvertProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Convert `string` to `int`",
	Description: "This example takes the string in field `.Key.id` and changes its data type to `int`.",
	Config:      config.Config{"field": ".Key.id", "type": "int"},
	Have: opencdc.Record{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": "123"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": 123},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,14 +1,14 @@
 {
   "position": null,
   "operation": "update",
   "metadata": null,
   "key": {
-    "id": "123"
+    "id": 123
   },
   "payload": {
     "before": null,
     "after": {
       "foo": "bar"
     }
   }
 }

func NewConvertProcessor

func NewConvertProcessor(log.CtxLogger) *ConvertProcessor

func (*ConvertProcessor) Configure added in v0.13.4

func (p *ConvertProcessor) Configure(ctx context.Context, c config.Config) error

func (*ConvertProcessor) Process added in v0.13.4

func (p *ConvertProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*ConvertProcessor) Specification added in v0.13.4

func (p *ConvertProcessor) Specification() (sdk.Specification, error)

type ExcludeProcessor added in v0.13.4

type ExcludeProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example (MultipleFields)
p := NewExcludeProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Exclude multiple fields`,
	Description: `It's possible to exclude multiple fields by providing a
comma-separated list of fields. In this example, we exclude ` + "`.Metadata`" + `,
` + "`.Payload.After.foo`" + ` and ` + "`.Key.key1`" + `.`,
	Config: config.Config{"fields": ".Metadata,.Payload.After.foo,.Key.key1"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"source": "s3"},
		Key:       opencdc.StructuredData{"key1": "val1", "key2": "val2"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar", "foobar": "baz"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{},
		Key:       opencdc.StructuredData{"key2": "val2"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foobar": "baz"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,20 +1,16 @@
 {
   "position": null,
   "operation": "create",
-  "metadata": {
-    "source": "s3"
-  },
+  "metadata": {},
-  "key": {
-    "key1": "val1",
+  "key": {
     "key2": "val2"
   },
   "payload": {
     "before": {
       "bar": "baz"
     },
-    "after": {
-      "foo": "bar",
+    "after": {
       "foobar": "baz"
     }
   }
 }
Example (OneField)
p := NewExcludeProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Exclude all fields in payload",
	Description: "Excluding all fields in `.Payload` results in an empty payload.",
	Config:      config.Config{"fields": ".Payload"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,16 +1,12 @@
 {
   "position": null,
   "operation": "create",
   "metadata": {
     "key1": "val1"
   },
   "key": null,
   "payload": {
-    "before": {
-      "bar": "baz"
-    },
+    "before": null,
-    "after": {
-      "foo": "bar"
-    }
+    "after": null
   }
 }

func NewExcludeProcessor

func NewExcludeProcessor(log.CtxLogger) *ExcludeProcessor

func (*ExcludeProcessor) Configure added in v0.13.4

func (p *ExcludeProcessor) Configure(ctx context.Context, c config.Config) error

func (*ExcludeProcessor) Process added in v0.13.4

func (p *ExcludeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*ExcludeProcessor) Specification added in v0.13.4

func (p *ExcludeProcessor) Specification() (sdk.Specification, error)

type RenameProcessor added in v0.13.4

type RenameProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example (Rename1)
p := NewRenameProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Rename multiple fields`,
	Description: `This example renames the fields in ` + "`.Metadata`" + ` and
` + "`.Payload.After`" + ` as specified in the ` + "`mapping`" + ` configuration parameter.`,
	Config: config.Config{"mapping": ".Metadata.key1:newKey,.Payload.After.foo:newFoo"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"newKey": "val1"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"newFoo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,16 +1,16 @@
 {
   "position": null,
   "operation": "create",
   "metadata": {
-    "key1": "val1"
+    "newKey": "val1"
   },
   "key": null,
   "payload": {
     "before": {
       "bar": "baz"
     },
     "after": {
-      "foo": "bar"
+      "newFoo": "bar"
     }
   }
 }

func NewRenameProcessor

func NewRenameProcessor(log.CtxLogger) *RenameProcessor

func (*RenameProcessor) Configure added in v0.13.4

func (p *RenameProcessor) Configure(ctx context.Context, c config.Config) error

func (*RenameProcessor) Process added in v0.13.4

func (p *RenameProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*RenameProcessor) Specification added in v0.13.4

func (p *RenameProcessor) Specification() (sdk.Specification, error)

type SetProcessor added in v0.13.4

type SetProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example (AddField)
p := NewSetProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Add field`,
	Description: `This example adds a new field to the record. The field is
added to ` + "`.Payload.After`" + ` and is set to ` + "`bar`" + `.`,
	Config: config.Config{"field": ".Payload.After.foo", "value": "bar"},
	Have: opencdc.Record{
		Operation: opencdc.OperationSnapshot,
		Key:       opencdc.StructuredData{"my-key": "id"},
	},
	Want: sdk.SingleRecord{
		Key:       opencdc.StructuredData{"my-key": "id"},
		Operation: opencdc.OperationSnapshot,
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,12 +1,14 @@
 {
   "position": null,
   "operation": "snapshot",
   "metadata": null,
   "key": {
     "my-key": "id"
   },
   "payload": {
     "before": null,
-    "after": null
+    "after": {
+      "foo": "bar"
+    }
   }
 }
Example (SetOperation)
p := NewSetProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Sets the record operation to `update`",
	Description: "This example sets the `.Operation` field to `update` for all records.",
	Config:      config.Config{"field": ".Operation", "value": "update"},
	Have:        opencdc.Record{Operation: opencdc.OperationCreate},
	Want:        sdk.SingleRecord{Operation: opencdc.OperationUpdate},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,10 +1,10 @@
 {
   "position": null,
-  "operation": "create",
+  "operation": "update",
   "metadata": null,
   "key": null,
   "payload": {
     "before": null,
     "after": null
   }
 }
Example (Template)
p := NewSetProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary:     `Set field using Go template`,
	Description: "This example sets the `.Payload.After.postgres` field to `true` if the `.Metadata.table` field contains `postgres`.",
	Config:      config.Config{"field": ".Payload.After.postgres", "value": "{{ eq .Metadata.table \"postgres\" }}"},
	Have: opencdc.Record{
		Metadata:  map[string]string{"table": "postgres"},
		Operation: opencdc.OperationSnapshot,
		Payload:   opencdc.Change{After: opencdc.StructuredData{"postgres": "false"}},
	},
	Want: sdk.SingleRecord{
		Metadata:  map[string]string{"table": "postgres"},
		Operation: opencdc.OperationSnapshot,
		Payload:   opencdc.Change{After: opencdc.StructuredData{"postgres": "true"}},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,14 +1,14 @@
 {
   "position": null,
   "operation": "snapshot",
   "metadata": {
     "table": "postgres"
   },
   "key": null,
   "payload": {
     "before": null,
     "after": {
-      "postgres": "false"
+      "postgres": "true"
     }
   }
 }

func NewSetProcessor

func NewSetProcessor(log.CtxLogger) *SetProcessor

func (*SetProcessor) Configure added in v0.13.4

func (p *SetProcessor) Configure(ctx context.Context, c config.Config) error

func (*SetProcessor) Process added in v0.13.4

func (p *SetProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*SetProcessor) Specification added in v0.13.4

func (p *SetProcessor) Specification() (sdk.Specification, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL