unwrap

package
v0.15.0-nightly.20250912 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DebeziumProcessor added in v0.13.4

type DebeziumProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example
p := NewDebeziumProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Unwrap a Debezium record",
	Description: `This example how to unwrap a Debezium record from a field nested in a record's
` + "`.Payload.After`" + ` field. It additionally shows how the key is unwrapped, and the metadata merged.`,
	Config: config.Config{
		"field": ".Payload.After.nested",
	},
	Have: opencdc.Record{
		Position:  opencdc.Position("test-position"),
		Operation: opencdc.OperationCreate,
		Key:       opencdc.RawData(`{"payload":"27"}`),
		Metadata:  opencdc.Metadata{"metadata-key": "metadata-value"},
		Payload: opencdc.Change{
			After: opencdc.StructuredData{
				"nested": `{
  "payload": {
    "after": {
      "description": "test1",
      "id": 27
    },
    "before": null,
    "op": "c",
    "source": {
      "opencdc.readAt": "1674061777225877000",
      "opencdc.version": "v1"
    },
    "transaction": null,
    "ts_ms": 1674061777225
  },
  "schema": {}
}`,
			},
		},
	},
	Want: sdk.SingleRecord{
		Position:  opencdc.Position("test-position"),
		Key:       opencdc.RawData("27"),
		Operation: opencdc.OperationCreate,
		Metadata: opencdc.Metadata{
			"metadata-key":    "metadata-value",
			"opencdc.readAt":  "1674061777225877000",
			"opencdc.version": "v1",
		},
		Payload: opencdc.Change{
			After: opencdc.StructuredData{
				"description": "test1",
				"id":          float64(27),
			},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,14 +1,17 @@
 {
   "position": "dGVzdC1wb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
-    "metadata-key": "metadata-value"
+    "metadata-key": "metadata-value",
-  },
-  "key": "{\"payload\":\"27\"}",
-  "payload": {
-    "before": null,
-    "after": {
-      "nested": "{\n  \"payload\": {\n    \"after\": {\n      \"description\": \"test1\",\n      \"id\": 27\n    },\n    \"before\": null,\n    \"op\": \"c\",\n    \"source\": {\n      \"opencdc.readAt\": \"1674061777225877000\",\n      \"opencdc.version\": \"v1\"\n    },\n    \"transaction\": null,\n    \"ts_ms\": 1674061777225\n  },\n  \"schema\": {}\n}"
+    "opencdc.readAt": "1674061777225877000",
+    "opencdc.version": "v1"
+  },
+  "key": "27",
+  "payload": {
+    "before": null,
+    "after": {
+      "description": "test1",
+      "id": 27
     }
   }
 }

func NewDebeziumProcessor

func NewDebeziumProcessor(logger log.CtxLogger) *DebeziumProcessor

func (*DebeziumProcessor) Configure added in v0.13.4

func (d *DebeziumProcessor) Configure(ctx context.Context, c config.Config) error

func (*DebeziumProcessor) Process added in v0.13.4

func (d *DebeziumProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*DebeziumProcessor) Specification added in v0.13.4

func (d *DebeziumProcessor) Specification() (sdk.Specification, error)

type KafkaConnectProcessor added in v0.13.4

type KafkaConnectProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example
p := NewKafkaConnectProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Unwrap a Kafka Connect record",
	Description: `This example shows how to unwrap a Kafka Connect record.

The Kafka Connect record is serialized as a JSON string in the ` + "`.Payload.After`" + ` field (raw data).
The Kafka Connect record's payload will replace the [OpenCDC record](https://conduit.io/docs/using/opencdc-record)'s payload.

We also see how the key is unwrapped too. In this case, the key comes in as structured data.`,
	Config: config.Config{},
	Have: opencdc.Record{
		Position:  opencdc.Position("test position"),
		Operation: opencdc.OperationCreate,
		Metadata: opencdc.Metadata{
			"metadata-key": "metadata-value",
		},
		Key: opencdc.StructuredData{
			"payload": map[string]interface{}{
				"id": 27,
			},
			"schema": map[string]interface{}{},
		},
		Payload: opencdc.Change{
			After: opencdc.RawData(`{
"payload": {
  "description": "test2"
},
"schema": {}
}`),
		},
	},
	Want: sdk.SingleRecord{
		Position:  opencdc.Position("test position"),
		Operation: opencdc.OperationCreate,
		Metadata: opencdc.Metadata{
			"metadata-key": "metadata-value",
		},
		Key: opencdc.StructuredData{"id": 27},
		Payload: opencdc.Change{
			After: opencdc.StructuredData{
				"description": "test2",
			},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,17 +1,16 @@
 {
   "position": "dGVzdCBwb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
     "metadata-key": "metadata-value"
   },
   "key": {
-    "payload": {
-      "id": 27
+    "id": 27
-    },
+  },
-    "schema": {}
-  },
-  "payload": {
+  "payload": {
     "before": null,
-    "after": "{\n\"payload\": {\n  \"description\": \"test2\"\n},\n\"schema\": {}\n}"
+    "after": {
+      "description": "test2"
+    }
   }
 }

func NewKafkaConnectProcessor

func NewKafkaConnectProcessor(log.CtxLogger) *KafkaConnectProcessor

func (*KafkaConnectProcessor) Configure added in v0.13.4

func (u *KafkaConnectProcessor) Configure(ctx context.Context, c config.Config) error

func (*KafkaConnectProcessor) Process added in v0.13.4

func (*KafkaConnectProcessor) Specification added in v0.13.4

func (u *KafkaConnectProcessor) Specification() (sdk.Specification, error)

type OpenCDCProcessor added in v0.13.4

type OpenCDCProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example
p := NewOpenCDCProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Unwrap an [OpenCDC record](https://conduit.io/docs/using/opencdc-record)",
	Description: "In this example we use the `unwrap.opencdc` processor to unwrap the [OpenCDC record](https://conduit.io/docs/using/opencdc-record) found in the " +
		"record's `.Payload.After` field.",
	Config: config.Config{},
	Have: opencdc.Record{
		Position:  opencdc.Position("wrapping position"),
		Key:       opencdc.RawData("wrapping key"),
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{},
		Payload: opencdc.Change{
			Before: nil,
			After: opencdc.StructuredData{
				"position":  opencdc.Position("test-position"),
				"operation": opencdc.OperationUpdate,
				"key": map[string]interface{}{
					"id": "test-key",
				},
				"metadata": opencdc.Metadata{},
				"payload": opencdc.Change{
					After: opencdc.StructuredData{
						"msg":       "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
						"sensor_id": 1250383582,
						"triggered": false,
					},
				},
			},
		},
	},
	Want: sdk.SingleRecord{
		Position:  opencdc.Position("wrapping position"),
		Operation: opencdc.OperationUpdate,
		Key: opencdc.StructuredData{
			"id": "test-key",
		},
		Metadata: opencdc.Metadata{},
		Payload: opencdc.Change{
			After: opencdc.StructuredData{
				"msg":       "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
				"sensor_id": 1250383582,
				"triggered": false,
			},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,25 +1,16 @@
 {
   "position": "d3JhcHBpbmcgcG9zaXRpb24=",
-  "operation": "create",
+  "operation": "update",
   "metadata": {},
-  "key": "wrapping key",
+  "key": {
+    "id": "test-key"
+  },
   "payload": {
     "before": null,
     "after": {
-      "key": {
-        "id": "test-key"
-      },
-      "metadata": {},
-      "operation": "update",
-      "payload": {
-        "before": null,
-        "after": {
-          "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
-          "sensor_id": 1250383582,
-          "triggered": false
-        }
-      },
+      "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
+      "sensor_id": 1250383582,
-      "position": "dGVzdC1wb3NpdGlvbg=="
+      "triggered": false
     }
   }
 }

func NewOpenCDCProcessor

func NewOpenCDCProcessor(logger log.CtxLogger) *OpenCDCProcessor

func (*OpenCDCProcessor) Configure added in v0.13.4

func (u *OpenCDCProcessor) Configure(ctx context.Context, c config.Config) error

func (*OpenCDCProcessor) Process added in v0.13.4

func (u *OpenCDCProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*OpenCDCProcessor) Specification added in v0.13.4

func (u *OpenCDCProcessor) Specification() (sdk.Specification, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL