json

package
Version: v0.15.0-nightly.20251009
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2025 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DecodeProcessor added in v0.13.4

type DecodeProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example (RawKey)
p := NewDecodeProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Decode record key as JSON`,
	Description: `This example takes a record containing a raw JSON string in
` + "`.Key`" + ` and converts it into structured data.`,
	Config: config.Config{"field": ".Key"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Key:       opencdc.RawData(`{"after":{"data":4,"id":3}}`),
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Key: opencdc.StructuredData{
			"after": map[string]interface{}{"data": float64(4), "id": float64(3)},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,10 +1,15 @@
 {
   "position": null,
   "operation": "create",
   "metadata": null,
-  "key": "{\"after\":{\"data\":4,\"id\":3}}",
+  "key": {
+    "after": {
+      "data": 4,
+      "id": 3
+    }
+  },
   "payload": {
     "before": null,
     "after": null
   }
 }
Example (RawPayloadField)
p := NewDecodeProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Decode nested field as JSON",
	Description: `This example takes a record containing a raw JSON string in
` + "`.Payload.Before.foo`" + ` and converts it into a map.`,
	Config: config.Config{"field": ".Payload.Before.foo"},
	Have: opencdc.Record{
		Operation: opencdc.OperationSnapshot,
		Payload: opencdc.Change{
			Before: opencdc.StructuredData{
				"foo": `{"before":{"data":4,"id":3},"baz":"bar"}`,
			},
		},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationSnapshot,
		Payload: opencdc.Change{
			Before: opencdc.StructuredData{
				"foo": map[string]any{
					"before": map[string]any{"data": float64(4), "id": float64(3)},
					"baz":    "bar",
				},
			},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,12 +1,18 @@
 {
   "position": null,
   "operation": "snapshot",
   "metadata": null,
   "key": null,
   "payload": {
     "before": {
-      "foo": "{\"before\":{\"data\":4,\"id\":3},\"baz\":\"bar\"}"
+      "foo": {
+        "baz": "bar",
+        "before": {
+          "data": 4,
+          "id": 3
+        }
+      }
     },
     "after": null
   }
 }

func NewDecodeProcessor

func NewDecodeProcessor(log.CtxLogger) *DecodeProcessor

func (*DecodeProcessor) Configure added in v0.13.4

func (p *DecodeProcessor) Configure(ctx context.Context, c config.Config) error

func (*DecodeProcessor) Process added in v0.13.4

func (p *DecodeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*DecodeProcessor) Specification added in v0.13.4

func (p *DecodeProcessor) Specification() (sdk.Specification, error)

type EncodeProcessor added in v0.13.4

type EncodeProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example (MapToJSON)
p := NewEncodeProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Encode nested field to JSON",
	Description: `This example takes a record containing a map in
` + "`.Payload.Before.foo`" + ` and converts it into a raw JSON string.`,
	Config: config.Config{"field": ".Payload.Before.foo"},
	Have: opencdc.Record{
		Operation: opencdc.OperationSnapshot,
		Payload: opencdc.Change{
			Before: opencdc.StructuredData{
				"foo": map[string]any{
					"before": map[string]any{"data": float64(4), "id": float64(3)},
					"baz":    "bar",
				},
			},
		},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationSnapshot,
		Payload: opencdc.Change{
			Before: opencdc.StructuredData{
				"foo": `{"baz":"bar","before":{"data":4,"id":3}}`,
			},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,18 +1,12 @@
 {
   "position": null,
   "operation": "snapshot",
   "metadata": null,
   "key": null,
   "payload": {
     "before": {
-      "foo": {
-        "baz": "bar",
-        "before": {
-          "data": 4,
-          "id": 3
-        }
-      }
+      "foo": "{\"baz\":\"bar\",\"before\":{\"data\":4,\"id\":3}}"
     },
     "after": null
   }
 }
Example (StructuredKey)
p := NewEncodeProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Encode record key to JSON",
	Description: `This example takes a record containing structured data in
` + "`.Key`" + ` and converts it into a raw JSON string.`,
	Config: config.Config{"field": ".Key"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Key: opencdc.StructuredData{
			"tables": []string{"table1,table2"},
		},
	},
	Want: sdk.SingleRecord{
		Operation: opencdc.OperationCreate,
		Key:       opencdc.RawData(`{"tables":["table1,table2"]}`),
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,14 +1,10 @@
 {
   "position": null,
   "operation": "create",
   "metadata": null,
-  "key": {
-    "tables": [
-      "table1,table2"
-    ]
-  },
+  "key": "{\"tables\":[\"table1,table2\"]}",
   "payload": {
     "before": null,
     "after": null
   }
 }

func NewEncodeProcessor

func NewEncodeProcessor(log.CtxLogger) *EncodeProcessor

func (*EncodeProcessor) Configure added in v0.13.4

func (p *EncodeProcessor) Configure(ctx context.Context, c config.Config) error

func (*EncodeProcessor) Process added in v0.13.4

func (p *EncodeProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*EncodeProcessor) Specification added in v0.13.4

func (p *EncodeProcessor) Specification() (sdk.Specification, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL