Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DebeziumProcessor ¶ added in v0.13.4
type DebeziumProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example ¶
p := NewDebeziumProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Unwrap a Debezium record", Description: `This example shows how to unwrap a Debezium record from a field nested in a record's ` + "`.Payload.After`" + ` field. It additionally shows how the key is unwrapped, and how the metadata is merged.`, Config: config.Config{ "field": ".Payload.After.nested", }, Have: opencdc.Record{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Key: opencdc.RawData(`{"payload":"27"}`), Metadata: opencdc.Metadata{"metadata-key": "metadata-value"}, Payload: opencdc.Change{ After: opencdc.StructuredData{ "nested": `{ "payload": { "after": { "description": "test1", "id": 27 }, "before": null, "op": "c", "source": { "opencdc.readAt": "1674061777225877000", "opencdc.version": "v1" }, "transaction": null, "ts_ms": 1674061777225 }, "schema": {} }`, }, }, }, Want: sdk.SingleRecord{ Position: opencdc.Position("test-position"), Key: opencdc.RawData("27"), Operation: opencdc.OperationCreate, Metadata: opencdc.Metadata{ "metadata-key": "metadata-value", "opencdc.readAt": "1674061777225877000", "opencdc.version": "v1", }, Payload: opencdc.Change{ After: opencdc.StructuredData{ "description": "test1", "id": float64(27), }, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,17 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { - "metadata-key": "metadata-value" + "metadata-key": "metadata-value", - }, - "key": "{\"payload\":\"27\"}", - "payload": { - "before": null, - "after": { - "nested": "{\n \"payload\": {\n \"after\": {\n \"description\": \"test1\",\n \"id\": 27\n },\n \"before\": null,\n \"op\": \"c\",\n \"source\": {\n \"opencdc.readAt\": \"1674061777225877000\",\n \"opencdc.version\": \"v1\"\n },\n \"transaction\": null,\n \"ts_ms\": 1674061777225\n },\n \"schema\": {}\n}" + "opencdc.readAt": "1674061777225877000", + "opencdc.version": "v1" + }, + "key": "27", + "payload": { + "before": null, + "after": { + "description": "test1", + "id": 27 } } }
func NewDebeziumProcessor ¶
func NewDebeziumProcessor(logger log.CtxLogger) *DebeziumProcessor
func (*DebeziumProcessor) Process ¶ added in v0.13.4
func (d *DebeziumProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*DebeziumProcessor) Specification ¶ added in v0.13.4
func (d *DebeziumProcessor) Specification() (sdk.Specification, error)
type KafkaConnectProcessor ¶ added in v0.13.4
type KafkaConnectProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example ¶
p := NewKafkaConnectProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Unwrap a Kafka Connect record", Description: `This example shows how to unwrap a Kafka Connect record. The Kafka Connect record is serialized as a JSON string in the ` + "`.Payload.After`" + ` field (raw data). The Kafka Connect record's payload will replace the [OpenCDC record](https://conduit.io/docs/using/opencdc-record)'s payload. We also see how the key is unwrapped. In this case, the key comes in as structured data.`, Config: config.Config{}, Have: opencdc.Record{ Position: opencdc.Position("test position"), Operation: opencdc.OperationCreate, Metadata: opencdc.Metadata{ "metadata-key": "metadata-value", }, Key: opencdc.StructuredData{ "payload": map[string]interface{}{ "id": 27, }, "schema": map[string]interface{}{}, }, Payload: opencdc.Change{ After: opencdc.RawData(`{ "payload": { "description": "test2" }, "schema": {} }`), }, }, Want: sdk.SingleRecord{ Position: opencdc.Position("test position"), Operation: opencdc.OperationCreate, Metadata: opencdc.Metadata{ "metadata-key": "metadata-value", }, Key: opencdc.StructuredData{"id": 27}, Payload: opencdc.Change{ After: opencdc.StructuredData{ "description": "test2", }, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,17 +1,16 @@ { "position": "dGVzdCBwb3NpdGlvbg==", "operation": "create", "metadata": { "metadata-key": "metadata-value" }, "key": { - "payload": { - "id": 27 + "id": 27 - }, + }, - "schema": {} - }, - "payload": { + "payload": { "before": null, - "after": "{\n\"payload\": {\n \"description\": \"test2\"\n},\n\"schema\": {}\n}" + "after": { + "description": "test2" + } } }
func NewKafkaConnectProcessor ¶
func NewKafkaConnectProcessor(log.CtxLogger) *KafkaConnectProcessor
func (*KafkaConnectProcessor) Process ¶ added in v0.13.4
func (u *KafkaConnectProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*KafkaConnectProcessor) Specification ¶ added in v0.13.4
func (u *KafkaConnectProcessor) Specification() (sdk.Specification, error)
type OpenCDCProcessor ¶ added in v0.13.4
type OpenCDCProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example ¶
p := NewOpenCDCProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Unwrap an [OpenCDC record](https://conduit.io/docs/using/opencdc-record)", Description: "In this example we use the `unwrap.opencdc` processor to unwrap the [OpenCDC record](https://conduit.io/docs/using/opencdc-record) found in the " + "record's `.Payload.After` field.", Config: config.Config{}, Have: opencdc.Record{ Position: opencdc.Position("wrapping position"), Key: opencdc.RawData("wrapping key"), Operation: opencdc.OperationCreate, Metadata: map[string]string{}, Payload: opencdc.Change{ Before: nil, After: opencdc.StructuredData{ "position": opencdc.Position("test-position"), "operation": opencdc.OperationUpdate, "key": map[string]interface{}{ "id": "test-key", }, "metadata": opencdc.Metadata{}, "payload": opencdc.Change{ After: opencdc.StructuredData{ "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", "sensor_id": 1250383582, "triggered": false, }, }, }, }, }, Want: sdk.SingleRecord{ Position: opencdc.Position("wrapping position"), Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{ "id": "test-key", }, Metadata: opencdc.Metadata{}, Payload: opencdc.Change{ After: opencdc.StructuredData{ "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", "sensor_id": 1250383582, "triggered": false, }, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,25 +1,16 @@ { "position": "d3JhcHBpbmcgcG9zaXRpb24=", - "operation": "create", + "operation": "update", "metadata": {}, - "key": "wrapping key", + "key": { + "id": "test-key" + }, "payload": { "before": null, "after": { - "key": { - "id": "test-key" - }, - "metadata": {}, - "operation": "update", - "payload": { - "before": null, - "after": { - "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", - "sensor_id": 1250383582, - "triggered": false - } - }, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "sensor_id": 1250383582, - "position": "dGVzdC1wb3NpdGlvbg==" + "triggered": false } } }
func NewOpenCDCProcessor ¶
func NewOpenCDCProcessor(logger log.CtxLogger) *OpenCDCProcessor
func (*OpenCDCProcessor) Process ¶ added in v0.13.4
func (u *OpenCDCProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*OpenCDCProcessor) Specification ¶ added in v0.13.4
func (u *OpenCDCProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.