schema_registry

package
v1.2.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: Jan 2, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package schema_registry provides integration with Confluent Schema Registry.

This package enables schema management, validation, and evolution for Apache Kafka messages and other streaming data platforms. It supports multiple serialization formats including Avro, Protobuf, and JSON Schema.

Architecture

This package follows the "accept interfaces, return structs" design pattern:

  • Registry interface: Defines the contract for schema registry operations
  • Client struct: Concrete implementation of the Registry interface
  • NewClient constructor: Returns *Client (concrete type)
  • FX module: Provides both *Client and Registry interface for dependency injection

Core Features:

  • HTTP client for Confluent Schema Registry
  • Schema registration and retrieval with caching
  • Compatibility checking for schema evolution
  • Confluent wire format encoding/decoding
  • Serializers for Avro, Protobuf, and JSON Schema
  • Generic wrapper for custom serializers

Direct Usage (Without FX)

For simple applications or tests, create a client directly:

import (
    "log"
    "time"

    "github.com/Aleph-Alpha/std/v1/schema_registry"
)

// Create schema registry client (returns concrete *Client)
client, err := schema_registry.NewClient(schema_registry.Config{
    URL:      "http://localhost:8081",
    Username: "user",     // Optional
    Password: "password", // Optional
    Timeout:  10 * time.Second,
})
if err != nil {
    log.Fatal(err)
}

// Register a schema
avroSchema := `{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}`

schemaID, err := client.RegisterSchema("users-value", avroSchema, "AVRO")
if err != nil {
    log.Fatal(err)
}
log.Printf("registered schema with ID %d", schemaID)

FX Module Integration

For production applications using Uber's fx, use FXModule, which provides both the concrete type and the interface:

import (
    "log"
    "os"
    "time"

    "go.uber.org/fx"
    "github.com/Aleph-Alpha/std/v1/schema_registry"
)

app := fx.New(
    schema_registry.FXModule, // Provides *Client and Registry interface
    fx.Provide(
        func() schema_registry.Config {
            return schema_registry.Config{
                URL:      os.Getenv("SCHEMA_REGISTRY_URL"),
                Username: os.Getenv("SCHEMA_REGISTRY_USER"),
                Password: os.Getenv("SCHEMA_REGISTRY_PASSWORD"),
                Timeout:  30 * time.Second,
            }
        },
    ),
    fx.Invoke(func(client *schema_registry.Client) error {
        // Use the concrete type directly; a returned error aborts startup.
        // avroSchema is the schema string from the Direct Usage example.
        schemaID, err := client.RegisterSchema("users-value", avroSchema, "AVRO")
        if err != nil {
            return err
        }
        log.Printf("registered schema with ID %d", schemaID)
        return nil
    }),
)
app.Run()

Observability (Observer Hook)

Schema Registry supports optional observability through the Observer interface from the observability package. This allows external systems to track schema operations without coupling the package to specific metrics/tracing implementations.

Using WithObserver (non-FX usage):

client, err := schema_registry.NewClient(config)
if err != nil {
    return err
}
client = client.WithObserver(myObserver).WithLogger(myLogger)

Using FX (automatic injection):

app := fx.New(
    schema_registry.FXModule,
    logger.FXModule,  // Optional: provides logger
    fx.Provide(
        func() schema_registry.Config { return loadConfig() },
        func() observability.Observer { return myObserver },  // Optional
    ),
)

The observer receives events for all schema operations:

  • Component: "schema_registry"
  • Operations: "get_schema_by_id", "get_latest_schema", "register_schema", "check_compatibility"
  • Resource: subject name (or "registry" for ID lookups)
  • SubResource: schema ID or version
  • Duration: operation duration
  • Error: any error that occurred
  • Metadata: operation-specific details (e.g., cache_hit, schema_type, schema_id, is_compatible)

Type Aliases in Consumer Code

To simplify your code and make it registry-agnostic, use type aliases:

package myapp

import stdRegistry "github.com/Aleph-Alpha/std/v1/schema_registry"

// Use type alias to reference std's interface
type SchemaRegistry = stdRegistry.Registry

// Now use SchemaRegistry throughout your codebase
func MyFunction(registry SchemaRegistry, schemaID int) (string, error) {
    return registry.GetSchemaByID(schemaID)
}

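Because SchemaRegistry is an alias rather than a new named type, any value that satisfies stdRegistry.Registry can be passed without an adapter, and retargeting the alias later only requires changing this one definition.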

Using with Avro

import (
    "fmt"
    "log"

    "github.com/linkedin/goavro/v2"
)

// Create Avro codec
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
    log.Fatal(err)
}

// Create Avro serializer
serializer, err := schema_registry.NewAvroSerializer(
    schema_registry.AvroSerializerConfig{
        Registry: client,
        Subject:  "users-value",
        Schema:   avroSchema,
        MarshalFunc: func(data interface{}) ([]byte, error) {
            return codec.BinaryFromNative(nil, data)
        },
    },
)
if err != nil {
    log.Fatal(err)
}

// Serialize data
user := map[string]interface{}{
    "name": "John Doe",
    "age":  30,
}
encoded, err := serializer.Serialize(user)
if err != nil {
    log.Fatal(err)
}
// encoded contains: [magic_byte][schema_id][avro_payload]

// Create Avro deserializer
deserializer, err := schema_registry.NewAvroDeserializer(
    schema_registry.AvroDeserializerConfig{
        Registry: client,
        UnmarshalFunc: func(data []byte, target interface{}) error {
            native, _, err := codec.NativeFromBinary(data)
            if err != nil {
                return err
            }
            // goavro decodes records as map[string]interface{}; copy the
            // result into the caller's *map[string]interface{} target.
            out, ok := target.(*map[string]interface{})
            if !ok {
                return fmt.Errorf("unsupported target type %T", target)
            }
            record, ok := native.(map[string]interface{})
            if !ok {
                return fmt.Errorf("unexpected Avro value type %T", native)
            }
            *out = record
            return nil
        },
    },
)
if err != nil {
    log.Fatal(err)
}

// Deserialize data
var result map[string]interface{}
if err := deserializer.Deserialize(encoded, &result); err != nil {
    log.Fatal(err)
}

Using with Protobuf

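import (
    "fmt"
    "log"

    "google.golang.org/protobuf/proto"
)

// Create Protobuf serializer; proto.Marshal takes a proto.Message, so adapt it
serializer, err := schema_registry.NewProtobufSerializer(
    schema_registry.ProtobufSerializerConfig{
        Registry: client,
        Subject:  "users-value",
        Schema:   protoSchema, // .proto file content as string
        MarshalFunc: func(v interface{}) ([]byte, error) {
            msg, ok := v.(proto.Message)
            if !ok {
                return nil, fmt.Errorf("expected proto.Message, got %T", v)
            }
            return proto.Marshal(msg)
        },
    },
)
if err != nil {
    log.Fatal(err)
}

// Serialize protobuf message (pb is your generated protobuf package)
protoMsg := &pb.User{Name: "Jane", Age: 25}
encoded, err := serializer.Serialize(protoMsg)
if err != nil {
    log.Fatal(err)
}

// Create Protobuf deserializer; adapt proto.Unmarshal the same way
deserializer, err := schema_registry.NewProtobufDeserializer(
    schema_registry.ProtobufDeserializerConfig{
        Registry: client,
        UnmarshalFunc: func(data []byte, target interface{}) error {
            msg, ok := target.(proto.Message)
            if !ok {
                return fmt.Errorf("expected proto.Message, got %T", target)
            }
            return proto.Unmarshal(data, msg)
        },
    },
)
if err != nil {
    log.Fatal(err)
}

var user pb.User // Deserialize into a generated message
if err := deserializer.Deserialize(encoded, &user); err != nil {
    log.Fatal(err)
}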

Using with JSON Schema

// Create JSON serializer
serializer, err := schema_registry.NewJSONSerializer(
    schema_registry.JSONSerializerConfig{
        Registry: client,
        Subject:  "users-value",
        Schema: `{
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "age": {"type": "integer"}
            }
        }`,
    },
)
if err != nil {
    log.Fatal(err)
}

// Serialize JSON
user := struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}{Name: "Alice", Age: 28}
encoded, err := serializer.Serialize(user)
if err != nil {
    log.Fatal(err)
}

// Deserialize
deserializer, err := schema_registry.NewJSONDeserializer(
    schema_registry.JSONDeserializerConfig{
        Registry: client,
    },
)
if err != nil {
    log.Fatal(err)
}
var result struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}
if err := deserializer.Deserialize(encoded, &result); err != nil {
    log.Fatal(err)
}

Wire Format

All serializers produce messages in Confluent wire format:

[magic_byte (1 byte)] [schema_id (4 bytes, big-endian)] [payload]

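The magic byte is always 0x0, followed by the schema ID, then the serialized payload. This is the framing used by Confluent's own serializers and deserializers. Note that Confluent's Protobuf serializer additionally writes message indexes between the schema ID and the payload, so verify Protobuf interoperability with Confluent clients separately.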
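To make the framing concrete, here is a minimal standard-library sketch of building and parsing the 5-byte header. The helpers encodeWireFormat and decodeWireFormat are hypothetical and only illustrate the layout described above; in application code the package's EncodeSchemaID and DecodeSchemaID functions serve this purpose. Rejecting a non-zero magic byte is this sketch's own choice, not documented behavior of DecodeSchemaID.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// magicByte is the first byte of every Confluent wire-format message.
const magicByte byte = 0x0

// encodeWireFormat (hypothetical) prepends the 5-byte header
// (magic byte + big-endian schema ID) to an already serialized payload.
func encodeWireFormat(schemaID uint32, payload []byte) []byte {
	out := make([]byte, 5, 5+len(payload))
	out[0] = magicByte
	binary.BigEndian.PutUint32(out[1:5], schemaID)
	return append(out, payload...)
}

// decodeWireFormat (hypothetical) validates the header and returns the
// schema ID together with the remaining payload.
func decodeWireFormat(data []byte) (uint32, []byte, error) {
	if len(data) < 5 {
		return 0, nil, errors.New("message shorter than 5-byte header")
	}
	if data[0] != magicByte {
		return 0, nil, fmt.Errorf("unexpected magic byte 0x%x", data[0])
	}
	return binary.BigEndian.Uint32(data[1:5]), data[5:], nil
}

func main() {
	msg := encodeWireFormat(42, []byte("payload"))
	fmt.Printf("% x\n", msg[:5]) // 00 00 00 00 2a

	id, payload, err := decodeWireFormat(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(id, string(payload)) // 42 payload
}
```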

Schema Caching

The client automatically caches schemas by ID and subject to minimize network calls to the Schema Registry. Caches are thread-safe and maintained in-memory for the lifetime of the client.

For more information, see the SCHEMA_REGISTRY.md documentation file.

Index

Constants

This section is empty.

Variables

var FXModule = fx.Module("schema_registry",
	fx.Provide(
		NewClientWithDI,

		fx.Annotate(
			func(c *Client) Registry { return c },
			fx.As(new(Registry)),
		),
	),
	fx.Invoke(RegisterSchemaRegistryLifecycle),
)

FXModule is an fx.Module that provides and configures the Schema Registry client. This module registers the Schema Registry client with the Fx dependency injection framework, making it available to other components in the application.

The module provides:

  1. *Client (concrete type) for direct use
  2. Registry interface for dependency injection
  3. Lifecycle management for proper initialization

Usage:

app := fx.New(
    schema_registry.FXModule,
    fx.Provide(
        func() schema_registry.Config {
            return schema_registry.Config{
                URL:      "http://localhost:8081",
                Username: "user",
                Password: "pass",
            }
        },
    ),
)

Functions

func DecodeSchemaID

func DecodeSchemaID(data []byte) (int, []byte, error)

DecodeSchemaID decodes a schema ID from the Confluent wire format. It returns the schema ID and the remaining payload (after the 5-byte header), or an error if the data cannot be decoded.

func EncodeSchemaID

func EncodeSchemaID(schemaID int) []byte

EncodeSchemaID encodes a schema ID as the Confluent wire format header. Format: [magic_byte][schema_id]

  • magic_byte: 0x0 (1 byte)
  • schema_id: 4 bytes (big-endian)

func RegisterSchemaRegistryLifecycle

func RegisterSchemaRegistryLifecycle(params SchemaRegistryLifecycleParams)

RegisterSchemaRegistryLifecycle registers the Schema Registry client with the fx lifecycle system. This function sets up proper initialization and graceful shutdown of the Schema Registry client.

Parameters:

  • params: The lifecycle parameters containing the Schema Registry client

The function:

  1. On application start: Logs that the registry client is ready
  2. On application stop: Currently no cleanup needed (HTTP client is stateless)

This keeps the Schema Registry client available for the application's entire lifetime and gives future cleanup logic a single place to live.

Types

type AvroDeserializer

type AvroDeserializer struct {
	*WrapperDeserializer
}

AvroDeserializer is a convenience wrapper for Avro deserialization with Schema Registry support.

func NewAvroDeserializer

func NewAvroDeserializer(config AvroDeserializerConfig) (*AvroDeserializer, error)

NewAvroDeserializer creates an Avro deserializer with Schema Registry support.

type AvroDeserializerConfig

type AvroDeserializerConfig struct {
	Registry      Registry
	UnmarshalFunc func([]byte, interface{}) error // Avro decoding function
}

AvroDeserializerConfig holds the configuration for an Avro deserializer.

type AvroSerializer

type AvroSerializer struct {
	*WrapperSerializer
}

AvroSerializer is a convenience wrapper for Avro serialization with Schema Registry support.

func NewAvroSerializer

func NewAvroSerializer(config AvroSerializerConfig) (*AvroSerializer, error)

NewAvroSerializer creates an Avro serializer with Schema Registry support.

type AvroSerializerConfig

type AvroSerializerConfig struct {
	Registry    Registry
	Subject     string
	Schema      string
	MarshalFunc func(interface{}) ([]byte, error) // Avro encoding function
}

AvroSerializerConfig holds the configuration for an Avro serializer.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the default implementation of Registry that communicates with Confluent Schema Registry over HTTP.

func NewClient

func NewClient(config Config) (*Client, error)

NewClient creates a new Schema Registry client. It returns the concrete *Client type.

func NewClientWithDI

func NewClientWithDI(params SchemaRegistryParams) (*Client, error)

NewClientWithDI creates a new Schema Registry client using dependency injection. This function is designed to be used with Uber's fx dependency injection framework where dependencies are automatically provided via the SchemaRegistryParams struct.

Returns the concrete *Client type.

Parameters:

  • params: A SchemaRegistryParams struct that contains the Config instance required to initialize the Schema Registry client. This struct embeds fx.In to enable automatic injection of these dependencies.

Returns:

  • *Client: A fully initialized Schema Registry client ready for use.

Example usage with fx:

app := fx.New(
    schema_registry.FXModule,
    fx.Provide(
        func() schema_registry.Config {
            return schema_registry.Config{
                URL:      os.Getenv("SCHEMA_REGISTRY_URL"),
                Username: os.Getenv("SCHEMA_REGISTRY_USER"),
                Password: os.Getenv("SCHEMA_REGISTRY_PASSWORD"),
                Timeout:  30 * time.Second,
            }
        },
    ),
)

func (*Client) CheckCompatibility

func (c *Client) CheckCompatibility(subject, schema, schemaType string) (bool, error)

CheckCompatibility checks whether a schema is compatible with the latest registered version for a subject.

func (*Client) GetLatestSchema

func (c *Client) GetLatestSchema(subject string) (*Metadata, error)

GetLatestSchema retrieves the latest version of a schema for a subject.

func (*Client) GetSchemaByID

func (c *Client) GetSchemaByID(id int) (string, error)

GetSchemaByID retrieves a schema from the registry by its ID.

func (*Client) RegisterSchema

func (c *Client) RegisterSchema(subject, schema, schemaType string) (int, error)

RegisterSchema registers a new schema under a subject and returns its schema ID.

func (*Client) WithLogger added in v0.13.0

func (c *Client) WithLogger(logger Logger) *Client

WithLogger sets the logger for this client and returns the client for method chaining. The logger is used for structured logging of client operations and errors.

Example:

client = client.WithObserver(myObserver).WithLogger(myLogger)

func (*Client) WithObserver added in v0.13.0

func (c *Client) WithObserver(observer observability.Observer) *Client

WithObserver sets the observer for this client and returns the client for method chaining. The observer receives events about schema registry operations (e.g., register, get, check compatibility).

Example:

client = client.WithObserver(myObserver).WithLogger(myLogger)

type Config

type Config struct {
	// URL is the schema registry endpoint (e.g., "http://localhost:8081")
	URL string

	// Username for basic auth (optional)
	Username string

	// Password for basic auth (optional)
	Password string

	// Timeout for HTTP requests
	Timeout time.Duration
}

Config holds the configuration for the Schema Registry client.

type Deserializer

type Deserializer interface {
	Deserialize(data []byte, target interface{}) error
}

Deserializer is the interface for decoding data.

type JSONDeserializer

type JSONDeserializer struct {
	*WrapperDeserializer
}

JSONDeserializer is a convenience wrapper for JSON Schema deserialization with Schema Registry support.

func NewJSONDeserializer

func NewJSONDeserializer(config JSONDeserializerConfig) (*JSONDeserializer, error)

NewJSONDeserializer creates a JSON deserializer with Schema Registry support.

type JSONDeserializerConfig

type JSONDeserializerConfig struct {
	Registry Registry
}

JSONDeserializerConfig holds the configuration for a JSON deserializer.

type JSONSerializer

type JSONSerializer struct {
	*WrapperSerializer
}

JSONSerializer is a convenience wrapper for JSON Schema serialization with Schema Registry support.

func NewJSONSerializer

func NewJSONSerializer(config JSONSerializerConfig) (*JSONSerializer, error)

NewJSONSerializer creates a JSON serializer with Schema Registry support.

type JSONSerializerConfig

type JSONSerializerConfig struct {
	Registry Registry
	Subject  string
	Schema   string
}

JSONSerializerConfig holds the configuration for a JSON serializer.

type Logger added in v0.13.0

type Logger interface {
	// InfoWithContext logs an informational message with trace context.
	InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

	// WarnWithContext logs a warning message with trace context.
	WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

	// ErrorWithContext logs an error message with trace context.
	ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}

Logger is an interface that matches the std/v1/logger.Logger interface. It provides context-aware structured logging with optional error and field parameters.

type Metadata

type Metadata struct {
	ID      int    `json:"id"`
	Version int    `json:"version"`
	Schema  string `json:"schema"`
	Subject string `json:"subject"`
	Type    string `json:"schemaType,omitempty"`
}

Metadata contains metadata about a registered schema.

type ProtobufDeserializer

type ProtobufDeserializer struct {
	*WrapperDeserializer
}

ProtobufDeserializer is a convenience wrapper for Protobuf deserialization with Schema Registry support.

func NewProtobufDeserializer

func NewProtobufDeserializer(config ProtobufDeserializerConfig) (*ProtobufDeserializer, error)

NewProtobufDeserializer creates a Protobuf deserializer with Schema Registry support.

type ProtobufDeserializerConfig

type ProtobufDeserializerConfig struct {
	Registry      Registry
	UnmarshalFunc func([]byte, interface{}) error // Protobuf decoding function
}

ProtobufDeserializerConfig holds the configuration for a Protobuf deserializer.

type ProtobufSerializer

type ProtobufSerializer struct {
	*WrapperSerializer
}

ProtobufSerializer is a convenience wrapper for Protobuf serialization with Schema Registry support.

func NewProtobufSerializer

func NewProtobufSerializer(config ProtobufSerializerConfig) (*ProtobufSerializer, error)

NewProtobufSerializer creates a Protobuf serializer with Schema Registry support.

type ProtobufSerializerConfig

type ProtobufSerializerConfig struct {
	Registry    Registry
	Subject     string
	Schema      string
	MarshalFunc func(interface{}) ([]byte, error) // Protobuf encoding function
}

ProtobufSerializerConfig holds the configuration for a Protobuf serializer.

type Registry

type Registry interface {
	// GetSchemaByID retrieves a schema by its ID
	GetSchemaByID(id int) (string, error)

	// GetLatestSchema retrieves the latest version of a schema for a subject
	GetLatestSchema(subject string) (*Metadata, error)

	// RegisterSchema registers a new schema for a subject
	RegisterSchema(subject, schema, schemaType string) (int, error)

	// CheckCompatibility checks if a schema is compatible with the latest version
	CheckCompatibility(subject, schema, schemaType string) (bool, error)
}

Registry provides an interface for interacting with a Confluent Schema Registry. It handles schema registration, retrieval, and caching for efficient serialization.
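Because consumers depend on the Registry interface rather than *Client, tests can substitute an in-memory fake. The sketch below is hypothetical (fakeRegistry is not part of this package): it copies Registry and Metadata locally, assigns sequential IDs, returns the existing ID when the same schema is registered again under a subject, and reports every schema as compatible.

```go
package main

import (
	"fmt"
	"sync"
)

// Metadata and Registry mirror the package's types so this runs standalone.
type Metadata struct {
	ID      int
	Version int
	Schema  string
	Subject string
	Type    string
}

type Registry interface {
	GetSchemaByID(id int) (string, error)
	GetLatestSchema(subject string) (*Metadata, error)
	RegisterSchema(subject, schema, schemaType string) (int, error)
	CheckCompatibility(subject, schema, schemaType string) (bool, error)
}

// fakeRegistry (hypothetical) is a concurrency-safe in-memory Registry.
type fakeRegistry struct {
	mu       sync.Mutex
	nextID   int
	byID     map[int]string
	versions map[string][]Metadata // subject -> versions, oldest first
}

func newFakeRegistry() *fakeRegistry {
	return &fakeRegistry{nextID: 1, byID: map[int]string{}, versions: map[string][]Metadata{}}
}

func (f *fakeRegistry) GetSchemaByID(id int) (string, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	s, ok := f.byID[id]
	if !ok {
		return "", fmt.Errorf("schema %d not found", id)
	}
	return s, nil
}

func (f *fakeRegistry) GetLatestSchema(subject string) (*Metadata, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	vs := f.versions[subject]
	if len(vs) == 0 {
		return nil, fmt.Errorf("subject %q not found", subject)
	}
	latest := vs[len(vs)-1] // copy so callers cannot mutate stored state
	return &latest, nil
}

func (f *fakeRegistry) RegisterSchema(subject, schema, schemaType string) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, m := range f.versions[subject] {
		if m.Schema == schema {
			return m.ID, nil // re-registering the same schema reuses its ID
		}
	}
	id := f.nextID
	f.nextID++
	f.byID[id] = schema
	f.versions[subject] = append(f.versions[subject], Metadata{
		ID: id, Version: len(f.versions[subject]) + 1, Schema: schema, Subject: subject, Type: schemaType,
	})
	return id, nil
}

// CheckCompatibility always succeeds in this fake.
func (f *fakeRegistry) CheckCompatibility(subject, schema, schemaType string) (bool, error) {
	return true, nil
}

var _ Registry = (*fakeRegistry)(nil)

func main() {
	var r Registry = newFakeRegistry()
	id, _ := r.RegisterSchema("users-value", `"string"`, "AVRO")
	again, _ := r.RegisterSchema("users-value", `"string"`, "AVRO")
	latest, _ := r.GetLatestSchema("users-value")
	fmt.Println(id, again, latest.Version) // 1 1 1
}
```

A *fakeRegistry can then be passed anywhere a Registry is accepted, such as the Registry field of the serializer configs.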

type SchemaRegistryLifecycleParams

type SchemaRegistryLifecycleParams struct {
	fx.In

	Lifecycle fx.Lifecycle
	Client    *Client
}

SchemaRegistryLifecycleParams groups the dependencies needed for Schema Registry lifecycle management.

type SchemaRegistryParams

type SchemaRegistryParams struct {
	fx.In

	Config   Config
	Logger   Logger                 `optional:"true"`
	Observer observability.Observer `optional:"true"`
}

SchemaRegistryParams groups the dependencies needed to create a Schema Registry client.

type Serializer

type Serializer interface {
	Serialize(data interface{}) ([]byte, error)
}

Serializer is the interface for encoding data.

type WrapperDeserializer

type WrapperDeserializer struct {
	// contains filtered or unexported fields
}

WrapperDeserializer wraps any deserializer with schema registry support. It automatically retrieves schemas and strips the Confluent wire format header.

func NewWrapperDeserializer

func NewWrapperDeserializer(config WrapperDeserializerConfig) (*WrapperDeserializer, error)

NewWrapperDeserializer creates a new Schema Registry-aware deserializer.

func (*WrapperDeserializer) Deserialize

func (d *WrapperDeserializer) Deserialize(data []byte, target interface{}) error

Deserialize strips the Schema Registry header and decodes the remaining payload.

type WrapperDeserializerConfig

type WrapperDeserializerConfig struct {
	Registry          Registry
	InnerDeserializer Deserializer // Optional: for decoding data after schema registry unwrapping
}

WrapperDeserializerConfig holds the configuration for a Schema Registry deserializer.

type WrapperSerializer

type WrapperSerializer struct {
	// contains filtered or unexported fields
}

WrapperSerializer wraps any serializer with schema registry support. It automatically registers schemas and prepends the Confluent wire format header.

func NewWrapperSerializer

func NewWrapperSerializer(config WrapperSerializerConfig) (*WrapperSerializer, error)

NewWrapperSerializer creates a new Schema Registry-aware serializer.

func (*WrapperSerializer) Serialize

func (s *WrapperSerializer) Serialize(data interface{}) ([]byte, error)

Serialize encodes data and prepends the Schema Registry header.

type WrapperSerializerConfig

type WrapperSerializerConfig struct {
	Registry        Registry
	Subject         string
	SchemaType      string // "AVRO", "PROTOBUF", "JSON"
	Schema          string
	InnerSerializer Serializer // Optional: for encoding data before schema registry wrapping
}

WrapperSerializerConfig holds the configuration for a Schema Registry serializer.
