Documentation ¶
Overview ¶
Package schema_registry provides integration with Confluent Schema Registry.
This package enables schema management, validation, and evolution for Apache Kafka messages and other streaming data platforms. It supports multiple serialization formats including Avro, Protobuf, and JSON Schema.
Architecture ¶
This package follows the "accept interfaces, return structs" design pattern:
- Registry interface: Defines the contract for schema registry operations
- Client struct: Concrete implementation of the Registry interface
- NewClient constructor: Returns *Client (concrete type)
- FX module: Provides both *Client and Registry interface for dependency injection
Core Features:
- HTTP client for Confluent Schema Registry
- Schema registration and retrieval with caching
- Compatibility checking for schema evolution
- Confluent wire format encoding/decoding
- Serializers for Avro, Protobuf, and JSON Schema
- Generic wrapper for custom serializers
Direct Usage (Without FX) ¶
For simple applications or tests, create a client directly:
import (
	"log"
	"time"

	"github.com/Aleph-Alpha/std/v1/schema_registry"
)

// Create schema registry client (returns concrete *Client)
client, err := schema_registry.NewClient(schema_registry.Config{
	URL:      "http://localhost:8081",
	Username: "user",     // Optional
	Password: "password", // Optional
	Timeout:  10 * time.Second,
})
if err != nil {
	log.Fatal(err)
}

// Register a schema
avroSchema := `{
	"type": "record",
	"name": "User",
	"fields": [
		{"name": "name", "type": "string"},
		{"name": "age", "type": "int"}
	]
}`
schemaID, err := client.RegisterSchema("users-value", avroSchema, "AVRO")
if err != nil {
	log.Fatal(err)
}
log.Printf("registered schema ID %d", schemaID)
FX Module Integration ¶
For production applications using Uber's fx, use the FXModule which provides both the concrete type and interface:
import (
	"log"
	"os"
	"time"

	"go.uber.org/fx"

	"github.com/Aleph-Alpha/std/v1/schema_registry"
)

app := fx.New(
	schema_registry.FXModule, // Provides *Client and Registry interface
	fx.Provide(
		func() schema_registry.Config {
			return schema_registry.Config{
				URL:      os.Getenv("SCHEMA_REGISTRY_URL"),
				Username: os.Getenv("SCHEMA_REGISTRY_USER"),
				Password: os.Getenv("SCHEMA_REGISTRY_PASSWORD"),
				Timeout:  30 * time.Second,
			}
		},
	),
	fx.Invoke(func(client *schema_registry.Client) error {
		// Use the concrete type directly. schema is assumed to hold the
		// schema definition; a returned error aborts application startup.
		schemaID, err := client.RegisterSchema("subject", schema, "AVRO")
		if err != nil {
			return err
		}
		log.Printf("registered schema ID %d", schemaID)
		return nil
	}),
)
Observability (Observer Hook) ¶
Schema Registry supports optional observability through the Observer interface from the observability package. This allows external systems to track schema operations without coupling the package to specific metrics/tracing implementations.
Using WithObserver (non-FX usage):
client, err := schema_registry.NewClient(config)
if err != nil {
	return err
}
client = client.WithObserver(myObserver).WithLogger(myLogger)
Using FX (automatic injection):
app := fx.New(
	schema_registry.FXModule,
	logger.FXModule, // Optional: provides logger
	fx.Provide(
		func() schema_registry.Config { return loadConfig() },
		func() observability.Observer { return myObserver }, // Optional
	),
)
The observer receives events for all schema operations:
- Component: "schema_registry"
- Operations: "get_schema_by_id", "get_latest_schema", "register_schema", "check_compatibility"
- Resource: subject name (or "registry" for ID lookups)
- SubResource: schema ID or version
- Duration: operation duration
- Error: any error that occurred
- Metadata: operation-specific details (e.g., cache_hit, schema_type, schema_id, is_compatible)
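As a rough illustration of what a consumer of these events might do, the sketch below tallies calls and failures per operation. The Observer interface itself is not shown in this document, so the operationEvent struct and the observe method name are hypothetical stand-ins that only reuse the field names listed above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// operationEvent is a hypothetical shape carrying the fields listed above.
// The real event type is defined by the observability package.
type operationEvent struct {
	Component   string
	Operation   string
	Resource    string
	SubResource string
	Duration    time.Duration
	Error       error
	Metadata    map[string]interface{}
}

// errUnavailable is a sample failure used by the demo in main.
var errUnavailable = errors.New("registry unavailable")

// countingObserver tallies calls and failures per operation name.
type countingObserver struct {
	mu       sync.Mutex
	calls    map[string]int
	failures map[string]int
}

func newCountingObserver() *countingObserver {
	return &countingObserver{
		calls:    make(map[string]int),
		failures: make(map[string]int),
	}
}

// observe records one event. The method name is an assumption, not the
// method required by observability.Observer.
func (o *countingObserver) observe(ev operationEvent) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.calls[ev.Operation]++
	if ev.Error != nil {
		o.failures[ev.Operation]++
	}
}

func main() {
	obs := newCountingObserver()
	obs.observe(operationEvent{
		Component:   "schema_registry",
		Operation:   "get_schema_by_id",
		Resource:    "registry",
		SubResource: "42",
		Duration:    3 * time.Millisecond,
		Metadata:    map[string]interface{}{"cache_hit": true},
	})
	obs.observe(operationEvent{
		Component: "schema_registry",
		Operation: "register_schema",
		Resource:  "users-value",
		Error:     errUnavailable,
	})
	fmt.Println(obs.calls["register_schema"], obs.failures["register_schema"])
}
```

A real observer would implement whatever method set observability.Observer declares and would typically forward these counts to a metrics backend.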
Type Aliases in Consumer Code ¶
To simplify your code and make it registry-agnostic, use type aliases:
package myapp

import stdRegistry "github.com/Aleph-Alpha/std/v1/schema_registry"

// Use type alias to reference std's interface
type SchemaRegistry = stdRegistry.Registry

// Now use SchemaRegistry throughout your codebase
func MyFunction(registry SchemaRegistry, schemaID int) (string, error) {
	return registry.GetSchemaByID(schemaID)
}
This eliminates the need for adapters and allows you to switch implementations by only changing the alias definition.
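One practical consequence of depending on the interface is that tests can substitute an in-memory fake. The sketch below is a minimal illustration under stated assumptions: Metadata and Registry are redeclared locally (mirroring the declarations in this package) so the example compiles on its own, and fakeRegistry, newFakeRegistry, and describe are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Metadata and Registry mirror this package's declarations so the sketch is
// self-contained. Real code would use the type alias instead.
type Metadata struct {
	ID      int
	Version int
	Schema  string
	Subject string
	Type    string
}

type Registry interface {
	GetSchemaByID(id int) (string, error)
	GetLatestSchema(subject string) (*Metadata, error)
	RegisterSchema(subject, schema, schemaType string) (int, error)
	CheckCompatibility(subject, schema, schemaType string) (bool, error)
}

// fakeRegistry is a hypothetical in-memory stand-in for tests.
type fakeRegistry struct {
	schemas map[int]string
	latest  map[string]*Metadata
}

func newFakeRegistry() *fakeRegistry {
	return &fakeRegistry{schemas: map[int]string{}, latest: map[string]*Metadata{}}
}

func (f *fakeRegistry) GetSchemaByID(id int) (string, error) {
	s, ok := f.schemas[id]
	if !ok {
		return "", errors.New("schema not found")
	}
	return s, nil
}

func (f *fakeRegistry) GetLatestSchema(subject string) (*Metadata, error) {
	m, ok := f.latest[subject]
	if !ok {
		return nil, errors.New("subject not found")
	}
	return m, nil
}

// RegisterSchema assigns sequential IDs and bumps the subject's version.
func (f *fakeRegistry) RegisterSchema(subject, schema, schemaType string) (int, error) {
	id := len(f.schemas) + 1
	version := 1
	if prev, ok := f.latest[subject]; ok {
		version = prev.Version + 1
	}
	f.schemas[id] = schema
	f.latest[subject] = &Metadata{ID: id, Version: version, Schema: schema, Subject: subject, Type: schemaType}
	return id, nil
}

// CheckCompatibility accepts every schema in this fake.
func (f *fakeRegistry) CheckCompatibility(subject, schema, schemaType string) (bool, error) {
	return true, nil
}

// describe depends only on the interface, so it accepts any implementation.
func describe(r Registry, subject string) (string, error) {
	m, err := r.GetLatestSchema(subject)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%s v%d (id %d)", m.Subject, m.Version, m.ID), nil
}

func main() {
	reg := newFakeRegistry()
	reg.RegisterSchema("users-value", `{"type":"string"}`, "AVRO")
	fmt.Println(describe(reg, "users-value"))
}
```

The fake deliberately skips behavior a real registry has, such as returning the existing ID when an identical schema is registered twice.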
Schema Operations ¶
import (
	"fmt"
	"log"

	"github.com/linkedin/goavro/v2"
)

// Create Avro codec
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
	log.Fatal(err)
}

// Create Avro serializer
serializer, err := schema_registry.NewAvroSerializer(
	schema_registry.AvroSerializerConfig{
		Registry: client,
		Subject:  "users-value",
		Schema:   avroSchema,
		MarshalFunc: func(data interface{}) ([]byte, error) {
			return codec.BinaryFromNative(nil, data)
		},
	},
)
if err != nil {
	log.Fatal(err)
}

// Serialize data
user := map[string]interface{}{
	"name": "John Doe",
	"age":  30,
}
encoded, err := serializer.Serialize(user)
if err != nil {
	log.Fatal(err)
}
// encoded contains: [magic_byte][schema_id][avro_payload]

// Create Avro deserializer
deserializer, err := schema_registry.NewAvroDeserializer(
	schema_registry.AvroDeserializerConfig{
		Registry: client,
		UnmarshalFunc: func(data []byte, target interface{}) error {
			native, _, err := codec.NativeFromBinary(data)
			if err != nil {
				return err
			}
			// Copy the decoded record into the caller's map
			record, ok := native.(map[string]interface{})
			if !ok {
				return fmt.Errorf("unexpected Avro value %T", native)
			}
			out, ok := target.(*map[string]interface{})
			if !ok {
				return fmt.Errorf("unsupported target type %T", target)
			}
			*out = record
			return nil
		},
	},
)
if err != nil {
	log.Fatal(err)
}

// Deserialize data
var result map[string]interface{}
err = deserializer.Deserialize(encoded, &result)
Using with Protobuf ¶
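import (
	"fmt"

	"google.golang.org/protobuf/proto"
)

// Create Protobuf serializer
serializer, err := schema_registry.NewProtobufSerializer(
	schema_registry.ProtobufSerializerConfig{
		Registry: client,
		Subject:  "users-value",
		Schema:   protoSchema, // .proto file content as string
		// proto.Marshal takes a proto.Message, so adapt it to func(interface{})
		MarshalFunc: func(data interface{}) ([]byte, error) {
			msg, ok := data.(proto.Message)
			if !ok {
				return nil, fmt.Errorf("expected proto.Message, got %T", data)
			}
			return proto.Marshal(msg)
		},
	},
)

// Serialize protobuf message
protoMsg := &pb.User{Name: "Jane", Age: 25}
encoded, err := serializer.Serialize(protoMsg)

// Create Protobuf deserializer
deserializer, err := schema_registry.NewProtobufDeserializer(
	schema_registry.ProtobufDeserializerConfig{
		Registry: client,
		// proto.Unmarshal also expects a proto.Message target
		UnmarshalFunc: func(data []byte, target interface{}) error {
			msg, ok := target.(proto.Message)
			if !ok {
				return fmt.Errorf("expected proto.Message, got %T", target)
			}
			return proto.Unmarshal(data, msg)
		},
	},
)

// Deserialize
var user pb.User
err = deserializer.Deserialize(encoded, &user)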
Using with JSON Schema ¶
// Create JSON serializer
serializer, err := schema_registry.NewJSONSerializer(
	schema_registry.JSONSerializerConfig{
		Registry: client,
		Subject:  "users-value",
		Schema: `{
			"$schema": "http://json-schema.org/draft-07/schema#",
			"type": "object",
			"properties": {
				"name": {"type": "string"},
				"age": {"type": "integer"}
			}
		}`,
	},
)

// Serialize JSON
user := struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}{Name: "Alice", Age: 28}
encoded, err := serializer.Serialize(user)

// Deserialize
deserializer, err := schema_registry.NewJSONDeserializer(
	schema_registry.JSONDeserializerConfig{
		Registry: client,
	},
)
var result struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}
err = deserializer.Deserialize(encoded, &result)
Wire Format ¶
All serializers produce messages in Confluent wire format:
[magic_byte (1 byte)] [schema_id (4 bytes, big-endian)] [payload]
The magic byte is always 0x00. It is followed by the 4-byte schema ID and then the serialized payload, which is the same 5-byte header layout that Confluent's serializers and deserializers use.
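The framing can be sketched with encoding/binary. This is a minimal illustration of the layout, not the package's implementation; encodeHeader and decodeHeader are hypothetical names standing in for EncodeSchemaID and DecodeSchemaID.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const magicByte = 0x00

// encodeHeader builds the 5-byte header: magic byte plus big-endian schema ID.
func encodeHeader(schemaID int) []byte {
	header := make([]byte, 5)
	header[0] = magicByte
	binary.BigEndian.PutUint32(header[1:], uint32(schemaID))
	return header
}

// decodeHeader validates the header and returns the schema ID and the payload
// that follows it.
func decodeHeader(data []byte) (int, []byte, error) {
	if len(data) < 5 {
		return 0, nil, errors.New("message shorter than 5-byte header")
	}
	if data[0] != magicByte {
		return 0, nil, fmt.Errorf("unexpected magic byte 0x%02x", data[0])
	}
	id := int(binary.BigEndian.Uint32(data[1:5]))
	return id, data[5:], nil
}

func main() {
	msg := append(encodeHeader(42), []byte("payload")...)
	id, payload, err := decodeHeader(msg)
	fmt.Println(id, string(payload), err) // prints: 42 payload <nil>
}
```

Checking the length and magic byte before reading the ID lets a consumer reject messages that were produced without the header instead of misreading their first bytes as a schema ID.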
Schema Caching ¶
The client automatically caches schemas by ID and subject to minimize network calls to the Schema Registry. Caches are thread-safe and maintained in-memory for the lifetime of the client.
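The general idea of a thread-safe, in-memory schema cache can be sketched as follows. This is an assumption-laden illustration rather than the client's actual code: schemaCache, newSchemaCache, and the fetch callback (standing in for the HTTP request) are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// schemaCache is a hypothetical read-through cache keyed by schema ID.
type schemaCache struct {
	mu    sync.RWMutex
	byID  map[int]string
	fetch func(id int) (string, error) // stands in for the HTTP call
}

func newSchemaCache(fetch func(int) (string, error)) *schemaCache {
	return &schemaCache{byID: make(map[int]string), fetch: fetch}
}

// get returns the cached schema and calls fetch only on a miss.
func (c *schemaCache) get(id int) (string, error) {
	c.mu.RLock()
	schema, ok := c.byID[id]
	c.mu.RUnlock()
	if ok {
		return schema, nil
	}

	schema, err := c.fetch(id)
	if err != nil {
		return "", err // failures are not cached
	}

	c.mu.Lock()
	c.byID[id] = schema
	c.mu.Unlock()
	return schema, nil
}

func main() {
	calls := 0
	cache := newSchemaCache(func(id int) (string, error) {
		calls++
		return fmt.Sprintf(`{"type":"string","id":%d}`, id), nil
	})
	cache.get(7)
	cache.get(7) // served from memory
	fmt.Println("fetch calls:", calls)
}
```

In this sketch two goroutines that miss at the same moment may both call fetch. Because a schema ID always maps to the same schema, the duplicate write is harmless, which is why the lock is not held across the fetch.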
For more information, see the SCHEMA_REGISTRY.md documentation file.
Index ¶
- Variables
- func DecodeSchemaID(data []byte) (int, []byte, error)
- func EncodeSchemaID(schemaID int) []byte
- func RegisterSchemaRegistryLifecycle(params SchemaRegistryLifecycleParams)
- type AvroDeserializer
- type AvroDeserializerConfig
- type AvroSerializer
- type AvroSerializerConfig
- type Client
- func (c *Client) CheckCompatibility(subject, schema, schemaType string) (bool, error)
- func (c *Client) GetLatestSchema(subject string) (*Metadata, error)
- func (c *Client) GetSchemaByID(id int) (string, error)
- func (c *Client) RegisterSchema(subject, schema, schemaType string) (int, error)
- func (c *Client) WithLogger(logger Logger) *Client
- func (c *Client) WithObserver(observer observability.Observer) *Client
- type Config
- type Deserializer
- type JSONDeserializer
- type JSONDeserializerConfig
- type JSONSerializer
- type JSONSerializerConfig
- type Logger
- type Metadata
- type ProtobufDeserializer
- type ProtobufDeserializerConfig
- type ProtobufSerializer
- type ProtobufSerializerConfig
- type Registry
- type SchemaRegistryLifecycleParams
- type SchemaRegistryParams
- type Serializer
- type WrapperDeserializer
- type WrapperDeserializerConfig
- type WrapperSerializer
- type WrapperSerializerConfig
Variables ¶
var FXModule = fx.Module("schema_registry",
	fx.Provide(
		NewClientWithDI,
		fx.Annotate(
			func(c *Client) Registry { return c },
			fx.As(new(Registry)),
		),
	),
	fx.Invoke(RegisterSchemaRegistryLifecycle),
)
FXModule is an fx.Module that provides and configures the Schema Registry client. This module registers the Schema Registry client with the Fx dependency injection framework, making it available to other components in the application.
The module provides:
1. *Client (concrete type) for direct use
2. Registry interface for dependency injection
3. Lifecycle management for proper initialization
Usage:
app := fx.New(
	schema_registry.FXModule,
	fx.Provide(
		func() schema_registry.Config {
			return schema_registry.Config{
				URL:      "http://localhost:8081",
				Username: "user",
				Password: "pass",
			}
		},
	),
)
Functions ¶
func DecodeSchemaID ¶
func DecodeSchemaID(data []byte) (int, []byte, error)
DecodeSchemaID decodes a schema ID from the Confluent wire format. It returns the schema ID and the remaining payload (everything after the 5-byte header).
func EncodeSchemaID ¶
func EncodeSchemaID(schemaID int) []byte
EncodeSchemaID encodes a schema ID in the Confluent wire format.
Format: [magic_byte][schema_id]
- magic_byte: 0x0 (1 byte)
- schema_id: 4 bytes (big-endian)
func RegisterSchemaRegistryLifecycle ¶
func RegisterSchemaRegistryLifecycle(params SchemaRegistryLifecycleParams)
RegisterSchemaRegistryLifecycle registers the Schema Registry client with the fx lifecycle system by installing start and stop hooks for it.
Parameters:
- params: The lifecycle parameters containing the Schema Registry client
The function:
- On application start: Logs that the registry client is ready
- On application stop: Does nothing for now, because the HTTP client is stateless and needs no cleanup
The hooks tie the client to the application's lifetime and give any future cleanup logic a place to live.
Types ¶
type AvroDeserializer ¶
type AvroDeserializer struct {
	*WrapperDeserializer
}
AvroDeserializer is a convenience wrapper for Avro with schema registry
func NewAvroDeserializer ¶
func NewAvroDeserializer(config AvroDeserializerConfig) (*AvroDeserializer, error)
NewAvroDeserializer creates an Avro deserializer with schema registry support
type AvroDeserializerConfig ¶
type AvroDeserializerConfig struct {
	Registry      Registry
	UnmarshalFunc func([]byte, interface{}) error // Avro decoding function
}
AvroDeserializerConfig holds configuration for Avro deserializer
type AvroSerializer ¶
type AvroSerializer struct {
	*WrapperSerializer
}
AvroSerializer is a convenience wrapper for Avro with schema registry
func NewAvroSerializer ¶
func NewAvroSerializer(config AvroSerializerConfig) (*AvroSerializer, error)
NewAvroSerializer creates an Avro serializer with schema registry support
type AvroSerializerConfig ¶
type AvroSerializerConfig struct {
	Registry    Registry
	Subject     string
	Schema      string
	MarshalFunc func(interface{}) ([]byte, error) // Avro encoding function
}
AvroSerializerConfig holds configuration for Avro serializer
type Client ¶
type Client struct {
	// contains filtered or unexported fields
}
Client is the default implementation of Registry that communicates with Confluent Schema Registry over HTTP.
func NewClientWithDI ¶
func NewClientWithDI(params SchemaRegistryParams) (*Client, error)
NewClientWithDI creates a new Schema Registry client using dependency injection. This function is designed to be used with Uber's fx dependency injection framework where dependencies are automatically provided via the SchemaRegistryParams struct.
Returns the concrete *Client type.
Parameters:
- params: A SchemaRegistryParams struct that contains the Config instance required to initialize the Schema Registry client. This struct embeds fx.In to enable automatic injection of these dependencies.
Returns:
- *Client: A fully initialized Schema Registry client ready for use.
Example usage with fx:
app := fx.New(
	schema_registry.FXModule,
	fx.Provide(
		func() schema_registry.Config {
			return schema_registry.Config{
				URL:      os.Getenv("SCHEMA_REGISTRY_URL"),
				Username: os.Getenv("SCHEMA_REGISTRY_USER"),
				Password: os.Getenv("SCHEMA_REGISTRY_PASSWORD"),
				Timeout:  30 * time.Second,
			}
		},
	),
)
func (*Client) CheckCompatibility ¶
func (c *Client) CheckCompatibility(subject, schema, schemaType string) (bool, error)
CheckCompatibility checks whether a schema is compatible with the existing schema for a subject.
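A common way to use this method is to gate registration on the check. The sketch below illustrates that pattern under stated assumptions: compatRegistrar is a locally declared subset of the Registry interface, and registerIfCompatible, errIncompatible, and stubRegistry are hypothetical names. How the real client reports a subject that has no registered versions yet is not covered by this document.

```go
package main

import (
	"errors"
	"fmt"
)

// compatRegistrar is the subset of Registry this sketch needs.
type compatRegistrar interface {
	CheckCompatibility(subject, schema, schemaType string) (bool, error)
	RegisterSchema(subject, schema, schemaType string) (int, error)
}

var errIncompatible = errors.New("schema is not compatible with the subject's existing schema")

// registerIfCompatible registers schema only when the compatibility check passes.
func registerIfCompatible(r compatRegistrar, subject, schema, schemaType string) (int, error) {
	ok, err := r.CheckCompatibility(subject, schema, schemaType)
	if err != nil {
		return 0, fmt.Errorf("compatibility check for %q: %w", subject, err)
	}
	if !ok {
		return 0, errIncompatible
	}
	return r.RegisterSchema(subject, schema, schemaType)
}

// stubRegistry is a hypothetical stand-in that returns a fixed verdict.
type stubRegistry struct{ compatible bool }

func (s stubRegistry) CheckCompatibility(subject, schema, schemaType string) (bool, error) {
	return s.compatible, nil
}

func (s stubRegistry) RegisterSchema(subject, schema, schemaType string) (int, error) {
	return 1, nil
}

func main() {
	_, err := registerIfCompatible(stubRegistry{compatible: false}, "users-value", `{"type":"string"}`, "AVRO")
	fmt.Println(errors.Is(err, errIncompatible)) // prints: true
}
```

Returning a sentinel error lets callers distinguish an incompatible schema from a transport failure with errors.Is.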
func (*Client) GetLatestSchema ¶
func (c *Client) GetLatestSchema(subject string) (*Metadata, error)
GetLatestSchema retrieves the latest version of a schema for a subject.
func (*Client) GetSchemaByID ¶
func (c *Client) GetSchemaByID(id int) (string, error)
GetSchemaByID retrieves a schema from the registry by its ID.
func (*Client) RegisterSchema ¶
func (c *Client) RegisterSchema(subject, schema, schemaType string) (int, error)
RegisterSchema registers a new schema with the schema registry.
func (*Client) WithLogger ¶ added in v0.13.0
func (c *Client) WithLogger(logger Logger) *Client
WithLogger sets the logger for this client and returns the client for method chaining. The logger is used for structured logging of client operations and errors.
Example:
client = client.WithObserver(myObserver).WithLogger(myLogger)
func (*Client) WithObserver ¶ added in v0.13.0
func (c *Client) WithObserver(observer observability.Observer) *Client
WithObserver sets the observer for this client and returns the client for method chaining. The observer receives events about schema registry operations (e.g., register, get, check compatibility).
Example:
client = client.WithObserver(myObserver).WithLogger(myLogger)
type Config ¶
type Config struct {
	// URL is the schema registry endpoint (e.g., "http://localhost:8081")
	URL string
	// Username for basic auth (optional)
	Username string
	// Password for basic auth (optional)
	Password string
	// Timeout for HTTP requests
	Timeout time.Duration
}
Config holds configuration for schema registry client
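One way to populate these fields is from environment variables, with a fallback timeout. The sketch below is only an illustration: Config is redeclared locally so the example compiles alone, configFrom is a hypothetical helper, and SCHEMA_REGISTRY_TIMEOUT is an assumed variable name (the other names follow the FX examples in this document). The lookup function is passed in so the helper can be exercised without touching the process environment.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"time"
)

// Config mirrors the struct documented above so the sketch is self-contained.
type Config struct {
	URL      string
	Username string
	Password string
	Timeout  time.Duration
}

// configFrom builds a Config from a lookup function such as os.Getenv.
func configFrom(getenv func(string) string) (Config, error) {
	cfg := Config{
		URL:      getenv("SCHEMA_REGISTRY_URL"),
		Username: getenv("SCHEMA_REGISTRY_USER"),
		Password: getenv("SCHEMA_REGISTRY_PASSWORD"),
		Timeout:  30 * time.Second, // fallback when no timeout is configured
	}
	if cfg.URL == "" {
		return Config{}, errors.New("SCHEMA_REGISTRY_URL is not set")
	}
	// SCHEMA_REGISTRY_TIMEOUT is a hypothetical variable, e.g. "5s" or "250ms".
	if raw := getenv("SCHEMA_REGISTRY_TIMEOUT"); raw != "" {
		d, err := time.ParseDuration(raw)
		if err != nil {
			return Config{}, fmt.Errorf("parse SCHEMA_REGISTRY_TIMEOUT: %w", err)
		}
		cfg.Timeout = d
	}
	return cfg, nil
}

func main() {
	cfg, err := configFrom(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(cfg.URL, cfg.Timeout)
}
```

Failing early on a missing URL surfaces misconfiguration at startup rather than on the first registry call.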
type Deserializer ¶
Deserializer is the interface for decoding data
type JSONDeserializer ¶
type JSONDeserializer struct {
	*WrapperDeserializer
}
JSONDeserializer is a convenience wrapper for JSON Schema with schema registry
func NewJSONDeserializer ¶
func NewJSONDeserializer(config JSONDeserializerConfig) (*JSONDeserializer, error)
NewJSONDeserializer creates a JSON deserializer with schema registry support
type JSONDeserializerConfig ¶
type JSONDeserializerConfig struct {
	Registry Registry
}
JSONDeserializerConfig holds configuration for JSON deserializer
type JSONSerializer ¶
type JSONSerializer struct {
	*WrapperSerializer
}
JSONSerializer is a convenience wrapper for JSON Schema with schema registry
func NewJSONSerializer ¶
func NewJSONSerializer(config JSONSerializerConfig) (*JSONSerializer, error)
NewJSONSerializer creates a JSON serializer with schema registry support
type JSONSerializerConfig ¶
JSONSerializerConfig holds configuration for JSON serializer
type Logger ¶ added in v0.13.0
type Logger interface {
	// InfoWithContext logs an informational message with trace context.
	InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
	// WarnWithContext logs a warning message with trace context.
	WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
	// ErrorWithContext logs an error message with trace context.
	ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}
Logger is an interface that matches the std/v1/logger.Logger interface. It provides context-aware structured logging with optional error and field parameters.
type Metadata ¶
type Metadata struct {
	ID      int    `json:"id"`
	Version int    `json:"version"`
	Schema  string `json:"schema"`
	Subject string `json:"subject"`
	Type    string `json:"schemaType,omitempty"`
}
Metadata contains metadata about a registered schema
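The JSON tags suggest how a registry response body maps onto this struct. The sketch below decodes a sample body with encoding/json. Metadata is copied locally so the example compiles alone, parseMetadata is a hypothetical helper, and the sample values are made up. Confluent's registry typically omits schemaType for Avro schemas, which is consistent with the omitempty tag, so an empty Type may need to be read as Avro.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Metadata is copied from the declaration above.
type Metadata struct {
	ID      int    `json:"id"`
	Version int    `json:"version"`
	Schema  string `json:"schema"`
	Subject string `json:"subject"`
	Type    string `json:"schemaType,omitempty"`
}

// parseMetadata decodes a single registry response body.
func parseMetadata(body []byte) (Metadata, error) {
	var m Metadata
	if err := json.Unmarshal(body, &m); err != nil {
		return Metadata{}, fmt.Errorf("decode schema metadata: %w", err)
	}
	return m, nil
}

func main() {
	// Sample body with illustrative values. The schema itself is a JSON string
	// nested inside the response, hence the escaped quotes.
	body := []byte(`{"subject":"users-value","version":3,"id":42,"schema":"{\"type\":\"string\"}"}`)
	m, err := parseMetadata(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(m.Subject, m.Version, m.ID, m.Schema)
}
```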
type ProtobufDeserializer ¶
type ProtobufDeserializer struct {
	*WrapperDeserializer
}
ProtobufDeserializer is a convenience wrapper for Protobuf with schema registry
func NewProtobufDeserializer ¶
func NewProtobufDeserializer(config ProtobufDeserializerConfig) (*ProtobufDeserializer, error)
NewProtobufDeserializer creates a Protobuf deserializer with schema registry support
type ProtobufDeserializerConfig ¶
type ProtobufDeserializerConfig struct {
	Registry      Registry
	UnmarshalFunc func([]byte, interface{}) error // Protobuf decoding function
}
ProtobufDeserializerConfig holds configuration for Protobuf deserializer
type ProtobufSerializer ¶
type ProtobufSerializer struct {
	*WrapperSerializer
}
ProtobufSerializer is a convenience wrapper for Protobuf with schema registry
func NewProtobufSerializer ¶
func NewProtobufSerializer(config ProtobufSerializerConfig) (*ProtobufSerializer, error)
NewProtobufSerializer creates a Protobuf serializer with schema registry support
type ProtobufSerializerConfig ¶
type ProtobufSerializerConfig struct {
	Registry    Registry
	Subject     string
	Schema      string
	MarshalFunc func(interface{}) ([]byte, error) // Protobuf encoding function
}
ProtobufSerializerConfig holds configuration for Protobuf serializer
type Registry ¶
type Registry interface {
	// GetSchemaByID retrieves a schema by its ID
	GetSchemaByID(id int) (string, error)
	// GetLatestSchema retrieves the latest version of a schema for a subject
	GetLatestSchema(subject string) (*Metadata, error)
	// RegisterSchema registers a new schema for a subject
	RegisterSchema(subject, schema, schemaType string) (int, error)
	// CheckCompatibility checks if a schema is compatible with the latest version
	CheckCompatibility(subject, schema, schemaType string) (bool, error)
}
Registry provides an interface for interacting with a Confluent Schema Registry. It handles schema registration, retrieval, and caching for efficient serialization.
type SchemaRegistryLifecycleParams ¶
SchemaRegistryLifecycleParams groups the dependencies needed for Schema Registry lifecycle management
type SchemaRegistryParams ¶
type SchemaRegistryParams struct {
	fx.In

	Config   Config
	Logger   Logger                 `optional:"true"`
	Observer observability.Observer `optional:"true"`
}
SchemaRegistryParams groups the dependencies needed to create a Schema Registry client
type Serializer ¶
Serializer is the interface for encoding data
type WrapperDeserializer ¶
type WrapperDeserializer struct {
	// contains filtered or unexported fields
}
WrapperDeserializer wraps any deserializer with schema registry support. It automatically retrieves schemas and strips the Confluent wire format header.
func NewWrapperDeserializer ¶
func NewWrapperDeserializer(config WrapperDeserializerConfig) (*WrapperDeserializer, error)
NewWrapperDeserializer creates a new schema registry-aware deserializer
func (*WrapperDeserializer) Deserialize ¶
func (d *WrapperDeserializer) Deserialize(data []byte, target interface{}) error
Deserialize strips the schema registry header and decodes data
type WrapperDeserializerConfig ¶
type WrapperDeserializerConfig struct {
	Registry          Registry
	InnerDeserializer Deserializer // Optional: for decoding data after schema registry unwrapping
}
WrapperDeserializerConfig holds configuration for schema registry deserializer
type WrapperSerializer ¶
type WrapperSerializer struct {
	// contains filtered or unexported fields
}
WrapperSerializer wraps any serializer with schema registry support. It automatically registers schemas and prepends the Confluent wire format header.
func NewWrapperSerializer ¶
func NewWrapperSerializer(config WrapperSerializerConfig) (*WrapperSerializer, error)
NewWrapperSerializer creates a new schema registry-aware serializer
func (*WrapperSerializer) Serialize ¶
func (s *WrapperSerializer) Serialize(data interface{}) ([]byte, error)
Serialize encodes data and prepends the schema registry header
type WrapperSerializerConfig ¶
type WrapperSerializerConfig struct {
	Registry        Registry
	Subject         string
	SchemaType      string // "AVRO", "PROTOBUF", "JSON"
	Schema          string
	InnerSerializer Serializer // Optional: for encoding data before schema registry wrapping
}
WrapperSerializerConfig holds configuration for schema registry serializer