Documentation
¶
Overview ¶
Package avro is a generated GoMock package.
Index ¶
- type DecodeProcessor
- func (p *DecodeProcessor) Configure(ctx context.Context, c config.Config) error
- func (p *DecodeProcessor) Open(context.Context) error
- func (p *DecodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *DecodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)
- func (p *DecodeProcessor) Specification() (sdk.Specification, error)
- func (p *DecodeProcessor) Teardown(ctx context.Context) error
- type EncodeProcessor
- func (p *EncodeProcessor) Configure(ctx context.Context, c config.Config) error
- func (p *EncodeProcessor) Open(context.Context) error
- func (p *EncodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *EncodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)
- func (p *EncodeProcessor) Specification() (sdk.Specification, error)
- func (p *EncodeProcessor) Teardown(context.Context) error
- type MockDecoder
- type MockDecoderDecodeCall
- func (c *MockDecoderDecodeCall) Do(f func(context.Context, opencdc.RawData) (opencdc.StructuredData, error)) *MockDecoderDecodeCall
- func (c *MockDecoderDecodeCall) DoAndReturn(f func(context.Context, opencdc.RawData) (opencdc.StructuredData, error)) *MockDecoderDecodeCall
- func (c *MockDecoderDecodeCall) Return(arg0 opencdc.StructuredData, arg1 error) *MockDecoderDecodeCall
- type MockDecoderMockRecorder
- type MockEncoder
- type MockEncoderEncodeCall
- func (c *MockEncoderEncodeCall) Do(f func(context.Context, opencdc.StructuredData) (opencdc.RawData, error)) *MockEncoderEncodeCall
- func (c *MockEncoderEncodeCall) DoAndReturn(f func(context.Context, opencdc.StructuredData) (opencdc.RawData, error)) *MockEncoderEncodeCall
- func (c *MockEncoderEncodeCall) Return(arg0 opencdc.RawData, arg1 error) *MockEncoderEncodeCall
- type MockEncoderMockRecorder
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DecodeProcessor ¶ added in v0.13.4
type DecodeProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example ¶
url, cleanup := schemaregistrytest.ExampleSchemaRegistryURL("ExampleDecodeProcessor", 54322)
defer cleanup()
client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url))
if err != nil {
panic(fmt.Sprintf("failed to create schema registry client: %v", err))
}
_, err = client.CreateSchema(context.Background(), "example-decode", sr.Schema{
Type: sr.TypeAvro,
Schema: `
{
"type":"record",
"name":"record",
"fields":[
{"name":"myString","type":"string"},
{"name":"myInt","type":"int"}
]
}`,
})
if err != nil {
panic(fmt.Sprintf("failed to create schema: %v", err))
}
p := NewDecodeProcessor(log.Nop())
p.SetSchemaRegistry(client)
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Decode a record field in Avro format",
Description: `This example shows the usage of the ` + "`avro.decode`" + ` processor.
The processor decodes the record's ` + "`.Key`" + ` field using the schema that is
downloaded from the schema registry and needs to exist under the subject ` + "`example-decode`" + `.
In this example we use the following schema:
` + "```json" + `
{
"type":"record",
"name":"record",
"fields":[
{"name":"myString","type":"string"},
{"name":"myInt","type":"int"}
]
}
` + "```",
Config: config.Config{
"field": ".Key",
},
Have: opencdc.Record{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.RawData([]byte{0, 0, 0, 0, 1, 6, 98, 97, 114, 2}),
},
Want: sdk.SingleRecord{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.StructuredData{
"myString": "bar",
"myInt": 1,
},
},
})
Output:
processor transformed record:
--- before
+++ after
@@ -1,12 +1,15 @@
 {
   "position": "dGVzdC1wb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
     "key1": "val1"
   },
-  "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002",
+  "key": {
+    "myInt": 1,
+    "myString": "bar"
+  },
   "payload": {
     "before": null,
     "after": null
   }
 }
func NewDecodeProcessor ¶
func NewDecodeProcessor(logger log.CtxLogger) *DecodeProcessor
func (*DecodeProcessor) Open ¶ added in v0.13.4
func (p *DecodeProcessor) Open(context.Context) error
func (*DecodeProcessor) Process ¶ added in v0.13.4
func (p *DecodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*DecodeProcessor) SetSchemaRegistry ¶ added in v0.13.4
func (p *DecodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)
func (*DecodeProcessor) Specification ¶ added in v0.13.4
func (p *DecodeProcessor) Specification() (sdk.Specification, error)
type EncodeProcessor ¶ added in v0.13.4
type EncodeProcessor struct {
sdk.UnimplementedProcessor
// contains filtered or unexported fields
}
Example (AutoRegister) ¶
url, cleanup := schemaregistrytest.ExampleSchemaRegistryURL("ExampleEncodeProcessor_autoRegister", 54322)
defer cleanup()
client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url))
if err != nil {
panic(fmt.Sprintf("failed to create schema registry client: %v", err))
}
p := NewEncodeProcessor(log.Nop())
p.SetSchemaRegistry(client)
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Auto-register schema",
Description: `This example shows the usage of the ` + "`avro.encode`" + ` processor
with the ` + "`autoRegister`" + ` schema strategy. The processor encodes the record's
` + "`.Payload.After`" + ` field using the schema that is extracted from the data
and registered on the fly under the subject ` + "`example-autoRegister`" + `.`,
Config: config.Config{
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "example-autoRegister",
},
Have: opencdc.Record{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Payload: opencdc.Change{
After: opencdc.StructuredData{
"myString": "bar",
"myInt": 1,
"myFloat": 2.3,
"myMap": map[string]any{
"foo": true,
"bar": 2.2,
},
"myStruct": opencdc.StructuredData{
"foo": 1,
"bar": false,
},
},
},
},
Want: sdk.SingleRecord{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Payload: opencdc.Change{
After: opencdc.RawData([]byte{0, 0, 0, 0, 1, 102, 102, 102, 102, 102, 102, 2, 64, 2, 154, 153, 153, 153, 153, 153, 1, 64, 1, 6, 98, 97, 114, 0, 2}),
},
},
})
Output:
processor transformed record:
--- before
+++ after
@@ -1,24 +1,12 @@
 {
   "position": "dGVzdC1wb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
     "key1": "val1"
   },
   "key": null,
   "payload": {
     "before": null,
-    "after": {
-      "myFloat": 2.3,
-      "myInt": 1,
-      "myMap": {
-        "bar": 2.2,
-        "foo": true
-      },
-      "myString": "bar",
-      "myStruct": {
-        "bar": false,
-        "foo": 1
-      }
-    }
+    "after": "\u0000\u0000\u0000\u0000\u0001ffffff\u0002@\u0002\ufffd\ufffd\ufffd\ufffd\ufffd\ufffd\u0001@\u0001\u0006bar\u0000\u0002"
   }
 }
Example (PreRegistered) ¶
url, cleanup := schemaregistrytest.ExampleSchemaRegistryURL("ExampleEncodeProcessor_preRegistered", 54322)
defer cleanup()
client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url))
if err != nil {
panic(fmt.Sprintf("failed to create schema registry client: %v", err))
}
_, err = client.CreateSchema(context.Background(), "example-preRegistered", sr.Schema{
Type: sr.TypeAvro,
Schema: `
{
"type":"record",
"name":"record",
"fields":[
{"name":"myString","type":"string"},
{"name":"myInt","type":"int"}
]
}`,
})
if err != nil {
panic(fmt.Sprintf("failed to create schema: %v", err))
}
p := NewEncodeProcessor(log.Nop())
p.SetSchemaRegistry(client)
exampleutil.RunExample(p, exampleutil.Example{
Summary: "Pre-register schema",
Description: `This example shows the usage of the ` + "`avro.encode`" + ` processor
with the ` + "`preRegistered`" + ` schema strategy. When using this strategy, the
schema has to be manually pre-registered. In this example we use the following schema:
` + "```json" + `
{
"type":"record",
"name":"record",
"fields":[
{"name":"myString","type":"string"},
{"name":"myInt","type":"int"}
]
}
` + "```" + `
The processor encodes the record's ` + "`.Key`" + ` field using the above schema.`,
Config: config.Config{
"schema.strategy": "preRegistered",
"schema.preRegistered.subject": "example-preRegistered",
"schema.preRegistered.version": "1",
"field": ".Key",
},
Have: opencdc.Record{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.StructuredData{
"myString": "bar",
"myInt": 1,
},
},
Want: sdk.SingleRecord{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.RawData([]byte{0, 0, 0, 0, 1, 6, 98, 97, 114, 2}),
},
})
Output:
processor transformed record:
--- before
+++ after
@@ -1,15 +1,12 @@
 {
   "position": "dGVzdC1wb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
     "key1": "val1"
   },
-  "key": {
-    "myInt": 1,
-    "myString": "bar"
-  },
+  "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002",
   "payload": {
     "before": null,
     "after": null
   }
 }
func NewEncodeProcessor ¶
func NewEncodeProcessor(logger log.CtxLogger) *EncodeProcessor
func (*EncodeProcessor) Open ¶ added in v0.13.4
func (p *EncodeProcessor) Open(context.Context) error
func (*EncodeProcessor) Process ¶ added in v0.13.4
func (p *EncodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*EncodeProcessor) SetSchemaRegistry ¶ added in v0.13.4
func (p *EncodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)
func (*EncodeProcessor) Specification ¶ added in v0.13.4
func (p *EncodeProcessor) Specification() (sdk.Specification, error)
type MockDecoder ¶
type MockDecoder struct {
// contains filtered or unexported fields
}
MockDecoder is a mock of decoder interface.
func NewMockDecoder ¶
func NewMockDecoder(ctrl *gomock.Controller) *MockDecoder
NewMockDecoder creates a new mock instance.
func (*MockDecoder) Decode ¶
func (m *MockDecoder) Decode(ctx context.Context, b opencdc.RawData) (opencdc.StructuredData, error)
Decode mocks base method.
func (*MockDecoder) EXPECT ¶
func (m *MockDecoder) EXPECT() *MockDecoderMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockDecoderDecodeCall ¶ added in v0.11.0
MockDecoderDecodeCall wraps *gomock.Call.
func (*MockDecoderDecodeCall) Do ¶ added in v0.11.0
func (c *MockDecoderDecodeCall) Do(f func(context.Context, opencdc.RawData) (opencdc.StructuredData, error)) *MockDecoderDecodeCall
Do rewrites *gomock.Call.Do.
func (*MockDecoderDecodeCall) DoAndReturn ¶ added in v0.11.0
func (c *MockDecoderDecodeCall) DoAndReturn(f func(context.Context, opencdc.RawData) (opencdc.StructuredData, error)) *MockDecoderDecodeCall
DoAndReturn rewrites *gomock.Call.DoAndReturn.
func (*MockDecoderDecodeCall) Return ¶ added in v0.11.0
func (c *MockDecoderDecodeCall) Return(arg0 opencdc.StructuredData, arg1 error) *MockDecoderDecodeCall
Return rewrites *gomock.Call.Return.
type MockDecoderMockRecorder ¶
type MockDecoderMockRecorder struct {
// contains filtered or unexported fields
}
MockDecoderMockRecorder is the mock recorder for MockDecoder.
func (*MockDecoderMockRecorder) Decode ¶
func (mr *MockDecoderMockRecorder) Decode(ctx, b any) *MockDecoderDecodeCall
Decode indicates an expected call of Decode.
type MockEncoder ¶
type MockEncoder struct {
// contains filtered or unexported fields
}
MockEncoder is a mock of encoder interface.
func NewMockEncoder ¶
func NewMockEncoder(ctrl *gomock.Controller) *MockEncoder
NewMockEncoder creates a new mock instance.
func (*MockEncoder) EXPECT ¶
func (m *MockEncoder) EXPECT() *MockEncoderMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEncoder) Encode ¶
func (m *MockEncoder) Encode(ctx context.Context, sd opencdc.StructuredData) (opencdc.RawData, error)
Encode mocks base method.
type MockEncoderEncodeCall ¶ added in v0.11.0
MockEncoderEncodeCall wraps *gomock.Call.
func (*MockEncoderEncodeCall) Do ¶ added in v0.11.0
func (c *MockEncoderEncodeCall) Do(f func(context.Context, opencdc.StructuredData) (opencdc.RawData, error)) *MockEncoderEncodeCall
Do rewrites *gomock.Call.Do.
func (*MockEncoderEncodeCall) DoAndReturn ¶ added in v0.11.0
func (c *MockEncoderEncodeCall) DoAndReturn(f func(context.Context, opencdc.StructuredData) (opencdc.RawData, error)) *MockEncoderEncodeCall
DoAndReturn rewrites *gomock.Call.DoAndReturn.
func (*MockEncoderEncodeCall) Return ¶ added in v0.11.0
func (c *MockEncoderEncodeCall) Return(arg0 opencdc.RawData, arg1 error) *MockEncoderEncodeCall
Return rewrites *gomock.Call.Return.
type MockEncoderMockRecorder ¶
type MockEncoderMockRecorder struct {
// contains filtered or unexported fields
}
MockEncoderMockRecorder is the mock recorder for MockEncoder.
func (*MockEncoderMockRecorder) Encode ¶
func (mr *MockEncoderMockRecorder) Encode(ctx, sd any) *MockEncoderEncodeCall
Encode indicates an expected call of Encode.