catalog

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

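Package catalog provides data catalog modules (Confluent Schema Registry, DataHub, and OpenMetadata) together with steps for schema registration and validation, dataset registration and search, lineage recording and querying, and data contract validation.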

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewCatalogLineageQueryStep

func NewCatalogLineageQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewCatalogLineageQueryStep creates a new step.catalog_lineage_query instance.

func NewCatalogLineageStep

func NewCatalogLineageStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewCatalogLineageStep creates a new step.catalog_lineage instance.

func NewCatalogRegisterStep

func NewCatalogRegisterStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewCatalogRegisterStep creates a new step.catalog_register instance.

func NewCatalogSearchStep

func NewCatalogSearchStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewCatalogSearchStep creates a new step.catalog_search instance.

func NewContractValidateStep

func NewContractValidateStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewContractValidateStep creates a new step.contract_validate instance.

func NewDataHubModule

func NewDataHubModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewDataHubModule creates a new catalog.datahub module.

func NewOpenMetadataModule

func NewOpenMetadataModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewOpenMetadataModule creates a new catalog.openmetadata module.

func NewSchemaRegisterStep

func NewSchemaRegisterStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewSchemaRegisterStep creates a new step.schema_register instance.

func NewSchemaRegistryModule

func NewSchemaRegistryModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewSchemaRegistryModule creates a new catalog.schema_registry module.

func NewSchemaValidateStep

func NewSchemaValidateStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewSchemaValidateStep creates a new step.schema_validate instance.

func RegisterCatalogModule

func RegisterCatalogModule(name string, m any) error

RegisterCatalogModule registers a DataHub or OpenMetadata module under the given name.

func RegisterContractDB

func RegisterContractDB(name string, db ContractDB)

RegisterContractDB registers a ContractDB under a name for testing/wiring.

func RegisterSRModule

func RegisterSRModule(name string, m *SchemaRegistryModule) error

RegisterSRModule registers a SchemaRegistryModule under the given name.

func UnregisterCatalogModule

func UnregisterCatalogModule(name string)

UnregisterCatalogModule removes a registered catalog module.

func UnregisterContractDB

func UnregisterContractDB(name string)

UnregisterContractDB removes a ContractDB from the registry.

func UnregisterSRModule

func UnregisterSRModule(name string)

UnregisterSRModule removes a registered SchemaRegistryModule.

Types

type CatalogDataset

type CatalogDataset struct {
	Name     string `json:"name"`
	Platform string `json:"platform"`
	Owner    string `json:"owner"`
}

CatalogDataset is a uniform dataset representation across catalog backends.

type CatalogProvider

type CatalogProvider interface {
	RegisterDataset(ctx context.Context, req RegisterDatasetRequest) error
	SearchDatasets(ctx context.Context, query string, limit int) ([]CatalogDataset, int, error)
}

CatalogProvider is the common interface for catalog backends (DataHub, OpenMetadata).

func LookupCatalogProvider

func LookupCatalogProvider(name string) (CatalogProvider, error)

LookupCatalogProvider returns the CatalogProvider for the given module name.

type ContractDB

type ContractDB interface {
	QueryCount(ctx context.Context, query string) (int, error)
}

ContractDB is an interface for executing SQL assertions during contract validation.

type ContractField

type ContractField struct {
	Name     string `yaml:"name"`
	Type     string `yaml:"type"`
	Nullable bool   `yaml:"nullable"`
	Required bool   `yaml:"required"`
}

ContractField describes a field in the contract schema.

type DataContract

type DataContract struct {
	Dataset string          `yaml:"dataset"`
	Version string          `yaml:"version"`
	Schema  []ContractField `yaml:"schema"`
	Quality []QualityCheck  `yaml:"quality"`
}

DataContract is the YAML schema for a data contract file.

type DataHubClient

type DataHubClient interface {
	GetDataset(ctx context.Context, urn string) (*Dataset, error)
	SearchDatasets(ctx context.Context, query string, start, count int) (*SearchResult, error)
	EmitMetadata(ctx context.Context, proposals []MetadataProposal) error
	AddTag(ctx context.Context, urn, tag string) error
	AddGlossaryTerm(ctx context.Context, urn, term string) error
	SetOwner(ctx context.Context, urn, ownerUrn, ownerType string) error
	SetLineage(ctx context.Context, upstream, downstream string) error
	GetLineage(ctx context.Context, urn, direction string) (*LineageResult, error)
}

DataHubClient is the interface for DataHub GMS REST API operations.

func NewDataHubClient

func NewDataHubClient(endpoint, token string, timeout time.Duration) DataHubClient

NewDataHubClient creates a new DataHub HTTP client.

type DataHubConfig

type DataHubConfig struct {
	Endpoint string        `json:"endpoint" yaml:"endpoint"`
	Token    string        `json:"token"    yaml:"token"`
	Timeout  time.Duration `json:"timeout"  yaml:"timeout"`
}

DataHubConfig holds configuration for the catalog.datahub module.

type DataHubModule

type DataHubModule struct {
	// contains filtered or unexported fields
}

DataHubModule implements the catalog.datahub module.

func (*DataHubModule) CatalogProvider

func (m *DataHubModule) CatalogProvider() CatalogProvider

CatalogProvider returns this module as a CatalogProvider.

func (*DataHubModule) Client

func (m *DataHubModule) Client() DataHubClient

Client returns the underlying DataHubClient.

func (*DataHubModule) Init

func (m *DataHubModule) Init() error

Init validates configuration.

func (*DataHubModule) Start

func (m *DataHubModule) Start(_ context.Context) error

Start creates the client and registers the module.

func (*DataHubModule) Stop

func (m *DataHubModule) Stop(_ context.Context) error

Stop deregisters the module.

type Dataset

type Dataset struct {
	URN        string            `json:"urn"`
	Name       string            `json:"name"`
	Platform   string            `json:"platform"`
	Schema     *DatasetSchema    `json:"schema,omitempty"`
	Properties map[string]string `json:"properties,omitempty"`
	Tags       []string          `json:"tags,omitempty"`
	Owner      string            `json:"owner,omitempty"`
}

Dataset represents a dataset entity in DataHub.

type DatasetRef

type DatasetRef struct {
	Dataset  string
	Platform string
}

DatasetRef identifies a dataset by name and platform.

type DatasetSchema

type DatasetSchema struct {
	Fields []SchemaField `json:"fields"`
}

DatasetSchema holds the schema fields of a dataset.

type EntityRef

type EntityRef struct {
	FQN  string `json:"fullyQualifiedName"`
	Type string `json:"type"`
}

EntityRef is a lineage entity reference (FQN + type).

type LineageEdge

type LineageEdge struct {
	From     string `json:"from"`
	To       string `json:"to"`
	Pipeline string `json:"pipeline"`
}

LineageEdge is a directed edge in the lineage graph.

type LineageNode

type LineageNode struct {
	Dataset  string `json:"dataset"`
	Platform string `json:"platform"`
	Depth    int    `json:"depth"`
}

LineageNode is a node in the lineage graph.

type LineageProvider

type LineageProvider interface {
	RecordLineage(ctx context.Context, upstreams, downstreams []DatasetRef, pipeline string) error
	QueryLineage(ctx context.Context, dataset, direction string, depth int) (*LineageQueryResult, error)
}

LineageProvider is the interface for recording and querying dataset lineage.

func LookupLineageProvider

func LookupLineageProvider(name string) (LineageProvider, error)

LookupLineageProvider returns the LineageProvider for the named catalog module.

type LineageQueryResult

type LineageQueryResult struct {
	Nodes []LineageNode
	Edges []LineageEdge
}

LineageQueryResult holds the result of a lineage query.

type LineageResult

type LineageResult struct {
	URN       string    `json:"urn"`
	Direction string    `json:"direction"`
	Entities  []Dataset `json:"entities"`
}

LineageResult holds lineage information for a dataset.

type MetadataProposal

type MetadataProposal struct {
	EntityURN  string         `json:"entityUrn"`
	AspectName string         `json:"aspectName"`
	Aspect     map[string]any `json:"aspect"`
}

MetadataProposal is a single aspect to emit to DataHub.

type OMColumn

type OMColumn struct {
	Name     string `json:"name"`
	DataType string `json:"dataType"`
}

OMColumn is a single column in an OpenMetadata table.

type OMEntityRef

type OMEntityRef struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

OMEntityRef is a reference to an entity (e.g. owner).

type OMLineageResult

type OMLineageResult struct {
	FQN        string    `json:"fullyQualifiedName"`
	Upstream   []OMTable `json:"upstream,omitempty"`
	Downstream []OMTable `json:"downstream,omitempty"`
}

OMLineageResult holds lineage information from OpenMetadata.

type OMSearchResult

type OMSearchResult struct {
	Tables []OMTable `json:"tables"`
	Total  int       `json:"total"`
}

OMSearchResult is the result of an OpenMetadata search query.

type OMTable

type OMTable struct {
	ID                 string       `json:"id,omitempty"`
	Name               string       `json:"name"`
	FullyQualifiedName string       `json:"fullyQualifiedName,omitempty"`
	Description        string       `json:"description,omitempty"`
	Columns            []OMColumn   `json:"columns,omitempty"`
	Tags               []OMTag      `json:"tags,omitempty"`
	Owner              *OMEntityRef `json:"owner,omitempty"`
}

OMTable represents a table entity in OpenMetadata.

type OMTag

type OMTag struct {
	TagFQN string `json:"tagFQN"`
}

OMTag is a tag attached to an entity in OpenMetadata.

type OpenMetadataClient

type OpenMetadataClient interface {
	GetTable(ctx context.Context, fqn string) (*OMTable, error)
	SearchTables(ctx context.Context, query string, limit int) (*OMSearchResult, error)
	CreateOrUpdateTable(ctx context.Context, table OMTable) error
	AddTag(ctx context.Context, fqn, tag string) error
	SetOwner(ctx context.Context, fqn, owner string) error
	AddLineageEdge(ctx context.Context, from, to EntityRef) error
	GetLineage(ctx context.Context, fqn string) (*OMLineageResult, error)
}

OpenMetadataClient is the interface for OpenMetadata REST API operations.

func NewOpenMetadataClient

func NewOpenMetadataClient(endpoint, token string, timeout time.Duration) OpenMetadataClient

NewOpenMetadataClient creates a new OpenMetadata HTTP client.

type OpenMetadataConfig

type OpenMetadataConfig struct {
	Endpoint string        `json:"endpoint" yaml:"endpoint"`
	Token    string        `json:"token"    yaml:"token"`
	Timeout  time.Duration `json:"timeout"  yaml:"timeout"`
}

OpenMetadataConfig holds configuration for the catalog.openmetadata module.

type OpenMetadataModule

type OpenMetadataModule struct {
	// contains filtered or unexported fields
}

OpenMetadataModule implements the catalog.openmetadata module.

func (*OpenMetadataModule) CatalogProvider

func (m *OpenMetadataModule) CatalogProvider() CatalogProvider

CatalogProvider returns this module as a CatalogProvider.

func (*OpenMetadataModule) Client

func (m *OpenMetadataModule) Client() OpenMetadataClient

Client returns the underlying OpenMetadataClient.

func (*OpenMetadataModule) Init

func (m *OpenMetadataModule) Init() error

Init validates configuration.

func (*OpenMetadataModule) Start

func (m *OpenMetadataModule) Start(_ context.Context) error

Start creates the client and registers the module.

func (*OpenMetadataModule) Stop

func (m *OpenMetadataModule) Stop(_ context.Context) error

Stop deregisters the module.

type QualityCheck

type QualityCheck struct {
	Name  string `yaml:"name"`
	Query string `yaml:"query"` // query must return 0 rows for the check to pass
	Type  string `yaml:"type"`  // not_null, unique, custom
}

QualityCheck is a simple SQL-based quality assertion in the contract.

type RegisterDatasetRequest

type RegisterDatasetRequest struct {
	Dataset    string
	Schema     []SchemaField
	Owner      string
	Tags       []string
	Properties map[string]string
}

RegisterDatasetRequest carries all fields needed to register a dataset.

type SchemaDefinition

type SchemaDefinition struct {
	Schema     string            `json:"schema"               yaml:"schema"`
	SchemaType string            `json:"schemaType,omitempty" yaml:"schemaType,omitempty"`
	References []SchemaReference `json:"references,omitempty" yaml:"references,omitempty"`
}

SchemaDefinition describes a schema to register or validate against.

type SchemaField

type SchemaField struct {
	Name     string `json:"fieldPath" yaml:"name"`
	Type     string `json:"nativeDataType" yaml:"type"`
	Nullable bool   `json:"nullable"       yaml:"nullable"`
}

SchemaField is a single field in a dataset schema.

type SchemaInfo

type SchemaInfo struct {
	Subject    string `json:"subject"              yaml:"subject"`
	Version    int    `json:"version"              yaml:"version"`
	ID         int    `json:"id"                   yaml:"id"`
	Schema     string `json:"schema"               yaml:"schema"`
	SchemaType string `json:"schemaType,omitempty" yaml:"schemaType,omitempty"`
}

SchemaInfo is the full metadata returned for a registered schema.

type SchemaReference

type SchemaReference struct {
	Name    string `json:"name"    yaml:"name"`
	Subject string `json:"subject" yaml:"subject"`
	Version int    `json:"version" yaml:"version"`
}

SchemaReference is a reference to another schema used in a schema definition.

type SchemaRegistryClient

type SchemaRegistryClient interface {
	ListSubjects(ctx context.Context) ([]string, error)
	RegisterSchema(ctx context.Context, subject string, schema SchemaDefinition) (int, error)
	GetSchema(ctx context.Context, id int) (*SchemaDefinition, error)
	GetLatestSchema(ctx context.Context, subject string) (*SchemaInfo, error)
	GetSchemaByVersion(ctx context.Context, subject, version string) (*SchemaInfo, error)
	DeleteSubject(ctx context.Context, subject string, permanent bool) ([]int, error)
	CheckCompatibility(ctx context.Context, subject string, schema SchemaDefinition) (bool, error)
	GetCompatibilityLevel(ctx context.Context, subject string) (string, error)
	SetCompatibilityLevel(ctx context.Context, subject string, level string) error
	ValidateSchema(ctx context.Context, schema SchemaDefinition, data []byte) error
}

SchemaRegistryClient is the interface for Confluent Schema Registry REST API operations.

func NewSchemaRegistryClient

func NewSchemaRegistryClient(endpoint, username, password string, timeout time.Duration) SchemaRegistryClient

NewSchemaRegistryClient creates a Schema Registry HTTP client.

type SchemaRegistryConfig

type SchemaRegistryConfig struct {
	Provider             string        `json:"provider"             yaml:"provider"`
	Endpoint             string        `json:"endpoint"             yaml:"endpoint"`
	Username             string        `json:"username"             yaml:"username"`
	Password             string        `json:"password"             yaml:"password"`
	DefaultCompatibility string        `json:"defaultCompatibility" yaml:"defaultCompatibility"`
	Timeout              time.Duration `json:"timeout"              yaml:"timeout"`
}

SchemaRegistryConfig holds configuration for the catalog.schema_registry module.

type SchemaRegistryModule

type SchemaRegistryModule struct {
	// contains filtered or unexported fields
}

SchemaRegistryModule implements the catalog.schema_registry module.

func LookupSRModule

func LookupSRModule(name string) (*SchemaRegistryModule, error)

LookupSRModule returns the registered SchemaRegistryModule by name.

func (*SchemaRegistryModule) Client

func (m *SchemaRegistryModule) Client() SchemaRegistryClient

Client returns the underlying SchemaRegistryClient.

func (*SchemaRegistryModule) Config

Config returns the module configuration.

func (*SchemaRegistryModule) Init

func (m *SchemaRegistryModule) Init() error

Init validates the module configuration.

func (*SchemaRegistryModule) Start

func (m *SchemaRegistryModule) Start(ctx context.Context) error

Start verifies connectivity by listing subjects, then registers the module.

func (*SchemaRegistryModule) Stop

func (m *SchemaRegistryModule) Stop(_ context.Context) error

Stop deregisters the module.

type SearchResult

type SearchResult struct {
	Entities []Dataset `json:"entities"`
	Total    int       `json:"total"`
	Start    int       `json:"start"`
	Count    int       `json:"count"`
}

SearchResult is the result of a DataHub search query.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL