impl

package
v0.15.0-nightly.20250711 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewErrorProcessor added in v0.10.1

func NewErrorProcessor(log.CtxLogger) sdk.Processor

Types

type CloneProcessor added in v0.14.0

type CloneProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example (Simple)
p := NewCloneProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Clone record into multiple records",
	Description: "This example takes a record and clones it once, producing 2 records, each containing the same data, except for the metadata field `clone.index`.",
	Config:      config.Config{"count": "1"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"foo": "bar"},
		Key:       opencdc.StructuredData{"id": 123},
		Payload: opencdc.Change{After: opencdc.StructuredData{
			"name": "Alice",
			"age":  30,
		}},
	},
	Want: sdk.MultiRecord{
		opencdc.Record{
			Operation: opencdc.OperationCreate,
			Metadata: map[string]string{
				"foo":         "bar",
				"clone.index": "0",
			},
			Key: opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"name": "Alice",
				"age":  30,
			}},
		},
		opencdc.Record{
			Operation: opencdc.OperationCreate,
			Metadata: map[string]string{
				"foo":         "bar",
				"clone.index": "1",
			},
			Key: opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"name": "Alice",
				"age":  30,
			}},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,17 +1,38 @@
-{
+[
+  {
-  "position": null,
+    "position": null,
-  "operation": "create",
+    "operation": "create",
-  "metadata": {
+    "metadata": {
-    "foo": "bar"
+      "clone.index": "0",
+      "foo": "bar"
+    },
+    "key": {
+      "id": 123
+    },
+    "payload": {
+      "before": null,
+      "after": {
+        "age": 30,
+        "name": "Alice"
+      }
+    }
+  },
+  {
+    "position": null,
+    "operation": "create",
+    "metadata": {
+      "clone.index": "1",
+      "foo": "bar"
-  },
+    },
-  "key": {
+    "key": {
-    "id": 123
+      "id": 123
-  },
+    },
-  "payload": {
+    "payload": {
-    "before": null,
+      "before": null,
-    "after": {
+      "after": {
-      "age": 30,
+        "age": 30,
-      "name": "Alice"
+        "name": "Alice"
-    }
+      }
-  }
+    }
-}
+  }
+]

func NewCloneProcessor added in v0.14.0

func NewCloneProcessor(log.CtxLogger) *CloneProcessor

func (*CloneProcessor) Configure added in v0.14.0

func (p *CloneProcessor) Configure(ctx context.Context, c config.Config) error

func (*CloneProcessor) Process added in v0.14.0

func (p *CloneProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*CloneProcessor) Specification added in v0.14.0

func (p *CloneProcessor) Specification() (sdk.Specification, error)

type ErrorProcessor added in v0.13.4

type ErrorProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example
p := NewErrorProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Error record with custom error message`,
	Description: `This example shows how to configure the error processor to
return a custom error message for a record using a Go template.`,
	Config: config.Config{
		"message": "custom error message with data from record: {{.Metadata.foo}}",
	},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"foo": "bar"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.ErrorRecord{
		Error: cerrors.New("custom error message with data from record: bar"),
	},
})
Output:

processor returned error: custom error message with data from record: bar

func (*ErrorProcessor) Configure added in v0.13.4

func (p *ErrorProcessor) Configure(ctx context.Context, cfg config.Config) error

func (*ErrorProcessor) MiddlewareOptions added in v0.13.4

func (*ErrorProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption

func (*ErrorProcessor) Process added in v0.13.4

func (p *ErrorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*ErrorProcessor) Specification added in v0.13.4

func (p *ErrorProcessor) Specification() (sdk.Specification, error)

type FilterProcessor added in v0.13.4

type FilterProcessor struct {
	sdk.UnimplementedProcessor
}
Example
p := NewFilterProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Filter out the record`,
	Config:  config.Config{},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.FilterRecord{},
})
Output:

processor filtered record out

func NewFilterProcessor

func NewFilterProcessor(log.CtxLogger) *FilterProcessor

func (*FilterProcessor) MiddlewareOptions added in v0.13.4

func (*FilterProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption

func (*FilterProcessor) Process added in v0.13.4

func (p *FilterProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*FilterProcessor) Specification added in v0.13.4

func (p *FilterProcessor) Specification() (sdk.Specification, error)

type SplitProcessor added in v0.14.0

type SplitProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example (Simple)
p := NewSplitProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Split array into multiple records",
	Description: "This example takes the array in field `.Payload.After.users` and splits it into separate records, each containing one element.",
	Config:      config.Config{"field": ".Payload.After.users"},
	Have: opencdc.Record{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": 123},
		Payload: opencdc.Change{After: opencdc.StructuredData{
			"users": []map[string]any{
				{"name": "Alice", "age": 30},
				{"name": "Bob", "age": 25},
				{"name": "Charlie", "age": 35},
			},
		}},
	},
	Want: sdk.MultiRecord{
		opencdc.Record{
			Operation: opencdc.OperationUpdate,
			Metadata:  map[string]string{"split.index": "0"},
			Key:       opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"users": map[string]any{"name": "Alice", "age": 30},
			}},
		},
		opencdc.Record{
			Operation: opencdc.OperationUpdate,
			Metadata:  map[string]string{"split.index": "1"},
			Key:       opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"users": map[string]any{"name": "Bob", "age": 25},
			}},
		},
		opencdc.Record{
			Operation: opencdc.OperationUpdate,
			Metadata:  map[string]string{"split.index": "2"},
			Key:       opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"users": map[string]any{"name": "Charlie", "age": 35},
			}},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,27 +1,59 @@
-{
+[
+  {
-  "position": null,
+    "position": null,
-  "operation": "update",
+    "operation": "update",
-  "metadata": null,
+    "metadata": {
+      "split.index": "0"
+    },
-  "key": {
+    "key": {
-    "id": 123
+      "id": 123
-  },
+    },
-  "payload": {
-    "before": null,
-    "after": {
-      "users": [
-        {
-          "age": 30,
-          "name": "Alice"
-        },
-        {
-          "age": 25,
+    "payload": {
+      "before": null,
+      "after": {
+        "users": {
+          "age": 30,
+          "name": "Alice"
+        }
+      }
+    }
+  },
+  {
+    "position": null,
+    "operation": "update",
+    "metadata": {
+      "split.index": "1"
+    },
+    "key": {
+      "id": 123
+    },
+    "payload": {
+      "before": null,
+      "after": {
+        "users": {
+          "age": 25,
+          "name": "Bob"
+        }
+      }
+    }
+  },
+  {
+    "position": null,
+    "operation": "update",
+    "metadata": {
+      "split.index": "2"
+    },
+    "key": {
+      "id": 123
+    },
+    "payload": {
+      "before": null,
-          "name": "Bob"
+      "after": {
-        },
-        {
+        "users": {
           "age": 35,
           "name": "Charlie"
         }
-      ]
+      }
     }
   }
-}
+]

func NewSplitProcessor added in v0.14.0

func NewSplitProcessor(log.CtxLogger) *SplitProcessor

func (*SplitProcessor) Configure added in v0.14.0

func (p *SplitProcessor) Configure(ctx context.Context, c config.Config) error

func (*SplitProcessor) Process added in v0.14.0

func (p *SplitProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*SplitProcessor) Specification added in v0.14.0

func (p *SplitProcessor) Specification() (sdk.Specification, error)

Directories

Path Synopsis
Package avro is a generated GoMock package.
Package avro is a generated GoMock package.
mock
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL