 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DebeziumProcessor ¶ added in v0.13.4
type DebeziumProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example ¶
p := NewDebeziumProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Unwrap a Debezium record",
	Description: `This example shows how to unwrap a Debezium record from a field nested in a record's
` + "`.Payload.After`" + ` field. It additionally shows how the key is unwrapped, and the metadata merged.`,
	Config: config.Config{
		"field": ".Payload.After.nested",
	},
	Have: opencdc.Record{
		Position:  opencdc.Position("test-position"),
		Operation: opencdc.OperationCreate,
		Key:       opencdc.RawData(`{"payload":"27"}`),
		Metadata:  opencdc.Metadata{"metadata-key": "metadata-value"},
		Payload: opencdc.Change{
			After: opencdc.StructuredData{
				"nested": `{
  "payload": {
    "after": {
      "description": "test1",
      "id": 27
    },
    "before": null,
    "op": "c",
    "source": {
      "opencdc.readAt": "1674061777225877000",
      "opencdc.version": "v1"
    },
    "transaction": null,
    "ts_ms": 1674061777225
  },
  "schema": {}
}`,
			},
		},
	},
	Want: sdk.SingleRecord{
		Position:  opencdc.Position("test-position"),
		Key:       opencdc.RawData("27"),
		Operation: opencdc.OperationCreate,
		Metadata: opencdc.Metadata{
			"metadata-key":    "metadata-value",
			"opencdc.readAt":  "1674061777225877000",
			"opencdc.version": "v1",
		},
		Payload: opencdc.Change{
			After: opencdc.StructuredData{
				"description": "test1",
				"id":          float64(27),
			},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,14 +1,17 @@
 {
   "position": "dGVzdC1wb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
-    "metadata-key": "metadata-value"
+    "metadata-key": "metadata-value",
-  },
-  "key": "{\"payload\":\"27\"}",
-  "payload": {
-    "before": null,
-    "after": {
-      "nested": "{\n  \"payload\": {\n    \"after\": {\n      \"description\": \"test1\",\n      \"id\": 27\n    },\n    \"before\": null,\n    \"op\": \"c\",\n    \"source\": {\n      \"opencdc.readAt\": \"1674061777225877000\",\n      \"opencdc.version\": \"v1\"\n    },\n    \"transaction\": null,\n    \"ts_ms\": 1674061777225\n  },\n  \"schema\": {}\n}"
+    "opencdc.readAt": "1674061777225877000",
+    "opencdc.version": "v1"
+  },
+  "key": "27",
+  "payload": {
+    "before": null,
+    "after": {
+      "description": "test1",
+      "id": 27
     }
   }
 }
func NewDebeziumProcessor ¶
func NewDebeziumProcessor(logger log.CtxLogger) *DebeziumProcessor
func (*DebeziumProcessor) Process ¶ added in v0.13.4
func (d *DebeziumProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*DebeziumProcessor) Specification ¶ added in v0.13.4
func (d *DebeziumProcessor) Specification() (sdk.Specification, error)
type KafkaConnectProcessor ¶ added in v0.13.4
type KafkaConnectProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example ¶
p := NewKafkaConnectProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Unwrap a Kafka Connect record",
	Description: `This example shows how to unwrap a Kafka Connect record.
The Kafka Connect record is serialized as a JSON string in the ` + "`.Payload.After`" + ` field (raw data).
The Kafka Connect record's payload will replace the [OpenCDC record](https://conduit.io/docs/using/opencdc-record)'s payload.
We also see how the key is unwrapped. In this case, the key comes in as structured data.`,
	Config: config.Config{},
	Have: opencdc.Record{
		Position:  opencdc.Position("test position"),
		Operation: opencdc.OperationCreate,
		Metadata: opencdc.Metadata{
			"metadata-key": "metadata-value",
		},
		Key: opencdc.StructuredData{
			"payload": map[string]interface{}{
				"id": 27,
			},
			"schema": map[string]interface{}{},
		},
		Payload: opencdc.Change{
			After: opencdc.RawData(`{
"payload": {
  "description": "test2"
},
"schema": {}
}`),
		},
	},
	Want: sdk.SingleRecord{
		Position:  opencdc.Position("test position"),
		Operation: opencdc.OperationCreate,
		Metadata: opencdc.Metadata{
			"metadata-key": "metadata-value",
		},
		Key: opencdc.StructuredData{"id": 27},
		Payload: opencdc.Change{
			After: opencdc.StructuredData{
				"description": "test2",
			},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,17 +1,16 @@
 {
   "position": "dGVzdCBwb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
     "metadata-key": "metadata-value"
   },
   "key": {
-    "payload": {
-      "id": 27
+    "id": 27
-    },
+  },
-    "schema": {}
-  },
-  "payload": {
+  "payload": {
     "before": null,
-    "after": "{\n\"payload\": {\n  \"description\": \"test2\"\n},\n\"schema\": {}\n}"
+    "after": {
+      "description": "test2"
+    }
   }
 }
func NewKafkaConnectProcessor ¶
func NewKafkaConnectProcessor(log.CtxLogger) *KafkaConnectProcessor
func (*KafkaConnectProcessor) Process ¶ added in v0.13.4
func (u *KafkaConnectProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*KafkaConnectProcessor) Specification ¶ added in v0.13.4
func (u *KafkaConnectProcessor) Specification() (sdk.Specification, error)
type OpenCDCProcessor ¶ added in v0.13.4
type OpenCDCProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example ¶
p := NewOpenCDCProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Unwrap an [OpenCDC record](https://conduit.io/docs/using/opencdc-record)",
	Description: "In this example we use the `unwrap.opencdc` processor to unwrap the [OpenCDC record](https://conduit.io/docs/using/opencdc-record) found in the " +
		"record's `.Payload.After` field.",
	Config: config.Config{},
	Have: opencdc.Record{
		Position:  opencdc.Position("wrapping position"),
		Key:       opencdc.RawData("wrapping key"),
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{},
		Payload: opencdc.Change{
			Before: nil,
			After: opencdc.StructuredData{
				"position":  opencdc.Position("test-position"),
				"operation": opencdc.OperationUpdate,
				"key": map[string]interface{}{
					"id": "test-key",
				},
				"metadata": opencdc.Metadata{},
				"payload": opencdc.Change{
					After: opencdc.StructuredData{
						"msg":       "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
						"sensor_id": 1250383582,
						"triggered": false,
					},
				},
			},
		},
	},
	Want: sdk.SingleRecord{
		Position:  opencdc.Position("wrapping position"),
		Operation: opencdc.OperationUpdate,
		Key: opencdc.StructuredData{
			"id": "test-key",
		},
		Metadata: opencdc.Metadata{},
		Payload: opencdc.Change{
			After: opencdc.StructuredData{
				"msg":       "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
				"sensor_id": 1250383582,
				"triggered": false,
			},
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,25 +1,16 @@
 {
   "position": "d3JhcHBpbmcgcG9zaXRpb24=",
-  "operation": "create",
+  "operation": "update",
   "metadata": {},
-  "key": "wrapping key",
+  "key": {
+    "id": "test-key"
+  },
   "payload": {
     "before": null,
     "after": {
-      "key": {
-        "id": "test-key"
-      },
-      "metadata": {},
-      "operation": "update",
-      "payload": {
-        "before": null,
-        "after": {
-          "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
-          "sensor_id": 1250383582,
-          "triggered": false
-        }
-      },
+      "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
+      "sensor_id": 1250383582,
-      "position": "dGVzdC1wb3NpdGlvbg=="
+      "triggered": false
     }
   }
 }
func NewOpenCDCProcessor ¶
func NewOpenCDCProcessor(logger log.CtxLogger) *OpenCDCProcessor
func (*OpenCDCProcessor) Process ¶ added in v0.13.4
func (u *OpenCDCProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*OpenCDCProcessor) Specification ¶ added in v0.13.4
func (u *OpenCDCProcessor) Specification() (sdk.Specification, error)
 Click to show internal directories. 
   Click to hide internal directories.