Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DebeziumProcessor ¶ added in v0.13.4
type DebeziumProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example ¶
p := NewDebeziumProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Unwrap a Debezium record",
Description: `This example shows how to unwrap a Debezium record from a field nested in a record's
` + "`.Payload.After`" + ` field. It additionally shows how the key is unwrapped, and the metadata merged.`,
Config: config.Config{
"field": ".Payload.After.nested",
},
Have: opencdc.Record{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Key: opencdc.RawData(`{"payload":"27"}`),
Metadata: opencdc.Metadata{"metadata-key": "metadata-value"},
Payload: opencdc.Change{
After: opencdc.StructuredData{
"nested": `{
"payload": {
"after": {
"description": "test1",
"id": 27
},
"before": null,
"op": "c",
"source": {
"opencdc.readAt": "1674061777225877000",
"opencdc.version": "v1"
},
"transaction": null,
"ts_ms": 1674061777225
},
"schema": {}
}`,
},
},
},
Want: sdk.SingleRecord{
Position: opencdc.Position("test-position"),
Key: opencdc.RawData("27"),
Operation: opencdc.OperationCreate,
Metadata: opencdc.Metadata{
"metadata-key": "metadata-value",
"opencdc.readAt": "1674061777225877000",
"opencdc.version": "v1",
},
Payload: opencdc.Change{
After: opencdc.StructuredData{
"description": "test1",
"id": float64(27),
},
},
},
})
Output: processor transformed record: --- before +++ after @@ -1,14 +1,17 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { - "metadata-key": "metadata-value" + "metadata-key": "metadata-value", - }, - "key": "{\"payload\":\"27\"}", - "payload": { - "before": null, - "after": { - "nested": "{\n \"payload\": {\n \"after\": {\n \"description\": \"test1\",\n \"id\": 27\n },\n \"before\": null,\n \"op\": \"c\",\n \"source\": {\n \"opencdc.readAt\": \"1674061777225877000\",\n \"opencdc.version\": \"v1\"\n },\n \"transaction\": null,\n \"ts_ms\": 1674061777225\n },\n \"schema\": {}\n}" + "opencdc.readAt": "1674061777225877000", + "opencdc.version": "v1" + }, + "key": "27", + "payload": { + "before": null, + "after": { + "description": "test1", + "id": 27 } } }
func NewDebeziumProcessor ¶
func NewDebeziumProcessor(logger log.CtxLogger) *DebeziumProcessor
func (*DebeziumProcessor) Process ¶ added in v0.13.4
func (d *DebeziumProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*DebeziumProcessor) Specification ¶ added in v0.13.4
func (d *DebeziumProcessor) Specification() (sdk.Specification, error)
type KafkaConnectProcessor ¶ added in v0.13.4
type KafkaConnectProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example ¶
p := NewKafkaConnectProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Unwrap a Kafka Connect record",
Description: `This example shows how to unwrap a Kafka Connect record.
The Kafka Connect record is serialized as a JSON string in the ` + "`.Payload.After`" + ` field (raw data).
The Kafka Connect record's payload will replace the [OpenCDC record](https://conduit.io/docs/using/opencdc-record)'s payload.
We also see how the key is unwrapped. In this case, the key comes in as structured data.`,
Config: config.Config{},
Have: opencdc.Record{
Position: opencdc.Position("test position"),
Operation: opencdc.OperationCreate,
Metadata: opencdc.Metadata{
"metadata-key": "metadata-value",
},
Key: opencdc.StructuredData{
"payload": map[string]interface{}{
"id": 27,
},
"schema": map[string]interface{}{},
},
Payload: opencdc.Change{
After: opencdc.RawData(`{
"payload": {
"description": "test2"
},
"schema": {}
}`),
},
},
Want: sdk.SingleRecord{
Position: opencdc.Position("test position"),
Operation: opencdc.OperationCreate,
Metadata: opencdc.Metadata{
"metadata-key": "metadata-value",
},
Key: opencdc.StructuredData{"id": 27},
Payload: opencdc.Change{
After: opencdc.StructuredData{
"description": "test2",
},
},
},
})
Output: processor transformed record: --- before +++ after @@ -1,17 +1,16 @@ { "position": "dGVzdCBwb3NpdGlvbg==", "operation": "create", "metadata": { "metadata-key": "metadata-value" }, "key": { - "payload": { - "id": 27 + "id": 27 - }, + }, - "schema": {} - }, - "payload": { + "payload": { "before": null, - "after": "{\n\"payload\": {\n \"description\": \"test2\"\n},\n\"schema\": {}\n}" + "after": { + "description": "test2" + } } }
func NewKafkaConnectProcessor ¶
func NewKafkaConnectProcessor(log.CtxLogger) *KafkaConnectProcessor
func (*KafkaConnectProcessor) Process ¶ added in v0.13.4
func (u *KafkaConnectProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*KafkaConnectProcessor) Specification ¶ added in v0.13.4
func (u *KafkaConnectProcessor) Specification() (sdk.Specification, error)
type OpenCDCProcessor ¶ added in v0.13.4
type OpenCDCProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example ¶
p := NewOpenCDCProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Unwrap an [OpenCDC record](https://conduit.io/docs/using/opencdc-record)",
Description: "In this example we use the `unwrap.opencdc` processor to unwrap the [OpenCDC record](https://conduit.io/docs/using/opencdc-record) found in the " +
"record's `.Payload.After` field.",
Config: config.Config{},
Have: opencdc.Record{
Position: opencdc.Position("wrapping position"),
Key: opencdc.RawData("wrapping key"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{},
Payload: opencdc.Change{
Before: nil,
After: opencdc.StructuredData{
"position": opencdc.Position("test-position"),
"operation": opencdc.OperationUpdate,
"key": map[string]interface{}{
"id": "test-key",
},
"metadata": opencdc.Metadata{},
"payload": opencdc.Change{
After: opencdc.StructuredData{
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"sensor_id": 1250383582,
"triggered": false,
},
},
},
},
},
Want: sdk.SingleRecord{
Position: opencdc.Position("wrapping position"),
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{
"id": "test-key",
},
Metadata: opencdc.Metadata{},
Payload: opencdc.Change{
After: opencdc.StructuredData{
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"sensor_id": 1250383582,
"triggered": false,
},
},
},
})
Output: processor transformed record: --- before +++ after @@ -1,25 +1,16 @@ { "position": "d3JhcHBpbmcgcG9zaXRpb24=", - "operation": "create", + "operation": "update", "metadata": {}, - "key": "wrapping key", + "key": { + "id": "test-key" + }, "payload": { "before": null, "after": { - "key": { - "id": "test-key" - }, - "metadata": {}, - "operation": "update", - "payload": { - "before": null, - "after": { - "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", - "sensor_id": 1250383582, - "triggered": false - } - }, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "sensor_id": 1250383582, - "position": "dGVzdC1wb3NpdGlvbg==" + "triggered": false } } }
func NewOpenCDCProcessor ¶
func NewOpenCDCProcessor(logger log.CtxLogger) *OpenCDCProcessor
func (*OpenCDCProcessor) Process ¶ added in v0.13.4
func (u *OpenCDCProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*OpenCDCProcessor) Specification ¶ added in v0.13.4
func (u *OpenCDCProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.