 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Index ¶
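- func NewErrorProcessor(log.CtxLogger) sdk.Processor
- type CloneProcessor
- func NewCloneProcessor(log.CtxLogger) *CloneProcessor
- func (p *CloneProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *CloneProcessor) Specification() (sdk.Specification, error)
- type ErrorProcessor
- func (p *ErrorProcessor) Configure(ctx context.Context, cfg config.Config) error
- func (*ErrorProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
- func (p *ErrorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *ErrorProcessor) Specification() (sdk.Specification, error)
- type FilterProcessor
- func NewFilterProcessor(log.CtxLogger) *FilterProcessor
- func (*FilterProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
- func (p *FilterProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *FilterProcessor) Specification() (sdk.Specification, error)
- type SplitProcessor
- func NewSplitProcessor(log.CtxLogger) *SplitProcessor
- func (p *SplitProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *SplitProcessor) Specification() (sdk.Specification, error)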
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CloneProcessor ¶ added in v0.14.0
type CloneProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example (Simple) ¶
p := NewCloneProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Clone record into multiple records",
	Description: "This example takes a record and clones it once, producing 2 records, each containing the same data, except for the metadata field `clone.index`.",
	Config:      config.Config{"count": "1"},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"foo": "bar"},
		Key:       opencdc.StructuredData{"id": 123},
		Payload: opencdc.Change{After: opencdc.StructuredData{
			"name": "Alice",
			"age":  30,
		}},
	},
	Want: sdk.MultiRecord{
		opencdc.Record{
			Operation: opencdc.OperationCreate,
			Metadata: map[string]string{
				"foo":         "bar",
				"clone.index": "0",
			},
			Key: opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"name": "Alice",
				"age":  30,
			}},
		},
		opencdc.Record{
			Operation: opencdc.OperationCreate,
			Metadata: map[string]string{
				"foo":         "bar",
				"clone.index": "1",
			},
			Key: opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"name": "Alice",
				"age":  30,
			}},
		},
	},
})
Output:
processor transformed record:
--- before
+++ after
@@ -1,17 +1,38 @@
-{
+[
+  {
-  "position": null,
+    "position": null,
-  "operation": "create",
+    "operation": "create",
-  "metadata": {
+    "metadata": {
-    "foo": "bar"
+      "clone.index": "0",
+      "foo": "bar"
+    },
+    "key": {
+      "id": 123
+    },
+    "payload": {
+      "before": null,
+      "after": {
+        "age": 30,
+        "name": "Alice"
+      }
+    }
+  },
+  {
+    "position": null,
+    "operation": "create",
+    "metadata": {
+      "clone.index": "1",
+      "foo": "bar"
-  },
+    },
-  "key": {
+    "key": {
-    "id": 123
+      "id": 123
-  },
+    },
-  "payload": {
+    "payload": {
-    "before": null,
+      "before": null,
-    "after": {
+      "after": {
-      "age": 30,
+        "age": 30,
-      "name": "Alice"
+        "name": "Alice"
-    }
+      }
-  }
+    }
-}
+  }
+]
func NewCloneProcessor ¶ added in v0.14.0
func NewCloneProcessor(log.CtxLogger) *CloneProcessor
func (*CloneProcessor) Process ¶ added in v0.14.0
func (p *CloneProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*CloneProcessor) Specification ¶ added in v0.14.0
func (p *CloneProcessor) Specification() (sdk.Specification, error)
type ErrorProcessor ¶ added in v0.13.4
type ErrorProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example ¶
p := NewErrorProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Error record with custom error message`,
	Description: `This example shows how to configure the error processor to
return a custom error message for a record using a Go template.`,
	Config: config.Config{
		"message": "custom error message with data from record: {{.Metadata.foo}}",
	},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"foo": "bar"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.ErrorRecord{
		Error: cerrors.New("custom error message with data from record: bar"),
	},
})
Output: processor returned error: custom error message with data from record: bar
func (*ErrorProcessor) MiddlewareOptions ¶ added in v0.13.4
func (*ErrorProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
func (*ErrorProcessor) Process ¶ added in v0.13.4
func (p *ErrorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ErrorProcessor) Specification ¶ added in v0.13.4
func (p *ErrorProcessor) Specification() (sdk.Specification, error)
type FilterProcessor ¶ added in v0.13.4
type FilterProcessor struct {
	sdk.UnimplementedProcessor
}
    Example ¶
p := NewFilterProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary: `Filter out the record`,
	Config:  config.Config{},
	Have: opencdc.Record{
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}},
	},
	Want: sdk.FilterRecord{},
})
Output: processor filtered record out
func NewFilterProcessor ¶
func NewFilterProcessor(log.CtxLogger) *FilterProcessor
func (*FilterProcessor) MiddlewareOptions ¶ added in v0.13.4
func (*FilterProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
func (*FilterProcessor) Process ¶ added in v0.13.4
func (p *FilterProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*FilterProcessor) Specification ¶ added in v0.13.4
func (p *FilterProcessor) Specification() (sdk.Specification, error)
type SplitProcessor ¶ added in v0.14.0
type SplitProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
    Example (Simple) ¶
p := NewSplitProcessor(log.Nop())
exampleutil.RunExample(p, exampleutil.Example{
	Summary:     "Split array into multiple records",
	Description: "This example takes the array in field `.Payload.After.users` and splits it into separate records, each containing one element.",
	Config:      config.Config{"field": ".Payload.After.users"},
	Have: opencdc.Record{
		Operation: opencdc.OperationUpdate,
		Key:       opencdc.StructuredData{"id": 123},
		Payload: opencdc.Change{After: opencdc.StructuredData{
			"users": []map[string]any{
				{"name": "Alice", "age": 30},
				{"name": "Bob", "age": 25},
				{"name": "Charlie", "age": 35},
			},
		}},
	},
	Want: sdk.MultiRecord{
		opencdc.Record{
			Operation: opencdc.OperationUpdate,
			Metadata:  map[string]string{"split.index": "0"},
			Key:       opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"users": map[string]any{"name": "Alice", "age": 30},
			}},
		},
		opencdc.Record{
			Operation: opencdc.OperationUpdate,
			Metadata:  map[string]string{"split.index": "1"},
			Key:       opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"users": map[string]any{"name": "Bob", "age": 25},
			}},
		},
		opencdc.Record{
			Operation: opencdc.OperationUpdate,
			Metadata:  map[string]string{"split.index": "2"},
			Key:       opencdc.StructuredData{"id": 123},
			Payload: opencdc.Change{After: opencdc.StructuredData{
				"users": map[string]any{"name": "Charlie", "age": 35},
			}},
		},
	},
})
Output:
processor transformed record:
--- before
+++ after
@@ -1,27 +1,59 @@
-{
+[
+  {
-  "position": null,
+    "position": null,
-  "operation": "update",
+    "operation": "update",
-  "metadata": null,
+    "metadata": {
+      "split.index": "0"
+    },
-  "key": {
+    "key": {
-    "id": 123
+      "id": 123
-  },
+    },
-  "payload": {
-    "before": null,
-    "after": {
-      "users": [
-        {
-          "age": 30,
-          "name": "Alice"
-        },
-        {
-          "age": 25,
+    "payload": {
+      "before": null,
+      "after": {
+        "users": {
+          "age": 30,
+          "name": "Alice"
+        }
+      }
+    }
+  },
+  {
+    "position": null,
+    "operation": "update",
+    "metadata": {
+      "split.index": "1"
+    },
+    "key": {
+      "id": 123
+    },
+    "payload": {
+      "before": null,
+      "after": {
+        "users": {
+          "age": 25,
+          "name": "Bob"
+        }
+      }
+    }
+  },
+  {
+    "position": null,
+    "operation": "update",
+    "metadata": {
+      "split.index": "2"
+    },
+    "key": {
+      "id": 123
+    },
+    "payload": {
+      "before": null,
-          "name": "Bob"
+      "after": {
-        },
-        {
+        "users": {
           "age": 35,
           "name": "Charlie"
         }
-      ]
+      }
     }
   }
-}
+]
func NewSplitProcessor ¶ added in v0.14.0
func NewSplitProcessor(log.CtxLogger) *SplitProcessor
func (*SplitProcessor) Process ¶ added in v0.14.0
func (p *SplitProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*SplitProcessor) Specification ¶ added in v0.14.0
func (p *SplitProcessor) Specification() (sdk.Specification, error)
       Source Files
      ¶
      Source Files
      ¶
    
   Click to show internal directories. 
   Click to hide internal directories.