Documentation
¶
Overview ¶
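Package catalog provides schema-registry and data-catalog modules (Confluent Schema Registry, DataHub, OpenMetadata), together with step implementations for schema registration and validation, dataset registration and search, lineage recording and querying, and data-contract validation.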
Index ¶
- func NewCatalogLineageQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewCatalogLineageStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewCatalogRegisterStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewCatalogSearchStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewContractValidateStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewDataHubModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func NewOpenMetadataModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func NewSchemaRegisterStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewSchemaRegistryModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func NewSchemaValidateStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func RegisterCatalogModule(name string, m any) error
- func RegisterContractDB(name string, db ContractDB)
- func RegisterSRModule(name string, m *SchemaRegistryModule) error
- func UnregisterCatalogModule(name string)
- func UnregisterContractDB(name string)
- func UnregisterSRModule(name string)
- type CatalogDataset
- type CatalogProvider
- type ContractDB
- type ContractField
- type DataContract
- type DataHubClient
- type DataHubConfig
- type DataHubModule
- type Dataset
- type DatasetRef
- type DatasetSchema
- type EntityRef
- type LineageEdge
- type LineageNode
- type LineageProvider
- type LineageQueryResult
- type LineageResult
- type MetadataProposal
- type OMColumn
- type OMEntityRef
- type OMLineageResult
- type OMSearchResult
- type OMTable
- type OMTag
- type OpenMetadataClient
- type OpenMetadataConfig
- type OpenMetadataModule
- type QualityCheck
- type RegisterDatasetRequest
- type SchemaDefinition
- type SchemaField
- type SchemaInfo
- type SchemaReference
- type SchemaRegistryClient
- type SchemaRegistryConfig
- type SchemaRegistryModule
- type SearchResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewCatalogLineageQueryStep ¶
func NewCatalogLineageQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)
NewCatalogLineageQueryStep creates a new step.catalog_lineage_query instance.
func NewCatalogLineageStep ¶
func NewCatalogLineageStep(name string, _ map[string]any) (sdk.StepInstance, error)
NewCatalogLineageStep creates a new step.catalog_lineage instance.
func NewCatalogRegisterStep ¶
func NewCatalogRegisterStep(name string, _ map[string]any) (sdk.StepInstance, error)
NewCatalogRegisterStep creates a new step.catalog_register instance.
func NewCatalogSearchStep ¶
func NewCatalogSearchStep(name string, _ map[string]any) (sdk.StepInstance, error)
NewCatalogSearchStep creates a new step.catalog_search instance.
func NewContractValidateStep ¶
func NewContractValidateStep(name string, _ map[string]any) (sdk.StepInstance, error)
NewContractValidateStep creates a new step.contract_validate instance.
func NewDataHubModule ¶
func NewDataHubModule(name string, config map[string]any) (sdk.ModuleInstance, error)
NewDataHubModule creates a new catalog.datahub module.
func NewOpenMetadataModule ¶
func NewOpenMetadataModule(name string, config map[string]any) (sdk.ModuleInstance, error)
NewOpenMetadataModule creates a new catalog.openmetadata module.
func NewSchemaRegisterStep ¶
func NewSchemaRegisterStep(name string, _ map[string]any) (sdk.StepInstance, error)
NewSchemaRegisterStep creates a new step.schema_register instance.
func NewSchemaRegistryModule ¶
func NewSchemaRegistryModule(name string, config map[string]any) (sdk.ModuleInstance, error)
NewSchemaRegistryModule creates a new catalog.schema_registry module.
func NewSchemaValidateStep ¶
func NewSchemaValidateStep(name string, _ map[string]any) (sdk.StepInstance, error)
NewSchemaValidateStep creates a new step.schema_validate instance.
func RegisterCatalogModule ¶
func RegisterCatalogModule(name string, m any) error
RegisterCatalogModule registers a DataHub or OpenMetadata module under the given name.
func RegisterContractDB ¶
func RegisterContractDB(name string, db ContractDB)
RegisterContractDB registers a ContractDB under the given name; it is intended for use in tests and for wiring dependencies.
func RegisterSRModule ¶
func RegisterSRModule(name string, m *SchemaRegistryModule) error
RegisterSRModule registers a SchemaRegistryModule under the given name.
func UnregisterCatalogModule ¶
func UnregisterCatalogModule(name string)
UnregisterCatalogModule removes a registered catalog module.
func UnregisterContractDB ¶
func UnregisterContractDB(name string)
UnregisterContractDB removes a ContractDB from the registry.
func UnregisterSRModule ¶
func UnregisterSRModule(name string)
UnregisterSRModule removes a registered SchemaRegistryModule.
Types ¶
type CatalogDataset ¶
type CatalogDataset struct {
Name string `json:"name"`
Platform string `json:"platform"`
Owner string `json:"owner"`
}
CatalogDataset is a uniform dataset representation across catalog backends.
type CatalogProvider ¶
type CatalogProvider interface {
RegisterDataset(ctx context.Context, req RegisterDatasetRequest) error
SearchDatasets(ctx context.Context, query string, limit int) ([]CatalogDataset, int, error)
}
CatalogProvider is the common interface for catalog backends (DataHub, OpenMetadata).
func LookupCatalogProvider ¶
func LookupCatalogProvider(name string) (CatalogProvider, error)
LookupCatalogProvider returns the CatalogProvider for the given module name.
type ContractDB ¶
ContractDB is an interface for executing SQL assertions during contract validation.
type ContractField ¶
type ContractField struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Nullable bool `yaml:"nullable"`
Required bool `yaml:"required"`
}
ContractField describes a field in the contract schema.
type DataContract ¶
type DataContract struct {
Dataset string `yaml:"dataset"`
Version string `yaml:"version"`
Schema []ContractField `yaml:"schema"`
Quality []QualityCheck `yaml:"quality"`
}
DataContract is the in-memory representation of a data contract YAML file.
type DataHubClient ¶
type DataHubClient interface {
GetDataset(ctx context.Context, urn string) (*Dataset, error)
SearchDatasets(ctx context.Context, query string, start, count int) (*SearchResult, error)
EmitMetadata(ctx context.Context, proposals []MetadataProposal) error
AddTag(ctx context.Context, urn, tag string) error
AddGlossaryTerm(ctx context.Context, urn, term string) error
SetOwner(ctx context.Context, urn, ownerUrn, ownerType string) error
SetLineage(ctx context.Context, upstream, downstream string) error
GetLineage(ctx context.Context, urn, direction string) (*LineageResult, error)
}
DataHubClient is the interface for DataHub GMS REST API operations.
func NewDataHubClient ¶
func NewDataHubClient(endpoint, token string, timeout time.Duration) DataHubClient
NewDataHubClient creates a new DataHub HTTP client.
type DataHubConfig ¶
type DataHubConfig struct {
Endpoint string `json:"endpoint" yaml:"endpoint"`
Token string `json:"token" yaml:"token"`
Timeout time.Duration `json:"timeout" yaml:"timeout"`
}
DataHubConfig holds configuration for the catalog.datahub module.
type DataHubModule ¶
type DataHubModule struct {
// contains filtered or unexported fields
}
DataHubModule implements the catalog.datahub module.
func (*DataHubModule) CatalogProvider ¶
func (m *DataHubModule) CatalogProvider() CatalogProvider
CatalogProvider returns this module as a CatalogProvider.
func (*DataHubModule) Client ¶
func (m *DataHubModule) Client() DataHubClient
Client returns the underlying DataHubClient.
type Dataset ¶
type Dataset struct {
URN string `json:"urn"`
Name string `json:"name"`
Platform string `json:"platform"`
Schema *DatasetSchema `json:"schema,omitempty"`
Properties map[string]string `json:"properties,omitempty"`
Tags []string `json:"tags,omitempty"`
Owner string `json:"owner,omitempty"`
}
Dataset represents a dataset entity in DataHub.
type DatasetRef ¶
DatasetRef identifies a dataset by name and platform.
type DatasetSchema ¶
type DatasetSchema struct {
Fields []SchemaField `json:"fields"`
}
DatasetSchema holds the schema fields of a dataset.
type LineageEdge ¶
type LineageEdge struct {
From string `json:"from"`
To string `json:"to"`
Pipeline string `json:"pipeline"`
}
LineageEdge is a directed edge in the lineage graph.
type LineageNode ¶
type LineageNode struct {
Dataset string `json:"dataset"`
Platform string `json:"platform"`
Depth int `json:"depth"`
}
LineageNode is a node in the lineage graph.
type LineageProvider ¶
type LineageProvider interface {
RecordLineage(ctx context.Context, upstreams, downstreams []DatasetRef, pipeline string) error
QueryLineage(ctx context.Context, dataset, direction string, depth int) (*LineageQueryResult, error)
}
LineageProvider is the interface for recording and querying dataset lineage.
func LookupLineageProvider ¶
func LookupLineageProvider(name string) (LineageProvider, error)
LookupLineageProvider returns the LineageProvider for the named catalog module.
type LineageQueryResult ¶
type LineageQueryResult struct {
Nodes []LineageNode
Edges []LineageEdge
}
LineageQueryResult holds the result of a lineage query.
type LineageResult ¶
type LineageResult struct {
URN string `json:"urn"`
Direction string `json:"direction"`
Entities []Dataset `json:"entities"`
}
LineageResult holds DataHub lineage information for a dataset, as returned by DataHubClient.GetLineage.
type MetadataProposal ¶
type MetadataProposal struct {
EntityURN string `json:"entityUrn"`
AspectName string `json:"aspectName"`
Aspect map[string]any `json:"aspect"`
}
MetadataProposal is a single aspect to emit to DataHub.
type OMEntityRef ¶
OMEntityRef is a reference to an OpenMetadata entity, such as the owner of a table.
type OMLineageResult ¶
type OMLineageResult struct {
FQN string `json:"fullyQualifiedName"`
Upstream []OMTable `json:"upstream,omitempty"`
Downstream []OMTable `json:"downstream,omitempty"`
}
OMLineageResult holds lineage information from OpenMetadata.
type OMSearchResult ¶
OMSearchResult is the result of an OpenMetadata search query.
type OMTable ¶
type OMTable struct {
ID string `json:"id,omitempty"`
Name string `json:"name"`
FullyQualifiedName string `json:"fullyQualifiedName,omitempty"`
Description string `json:"description,omitempty"`
Columns []OMColumn `json:"columns,omitempty"`
Tags []OMTag `json:"tags,omitempty"`
Owner *OMEntityRef `json:"owner,omitempty"`
}
OMTable represents a table entity in OpenMetadata.
type OMTag ¶
type OMTag struct {
TagFQN string `json:"tagFQN"`
}
OMTag is a tag attached to an entity in OpenMetadata.
type OpenMetadataClient ¶
type OpenMetadataClient interface {
GetTable(ctx context.Context, fqn string) (*OMTable, error)
SearchTables(ctx context.Context, query string, limit int) (*OMSearchResult, error)
CreateOrUpdateTable(ctx context.Context, table OMTable) error
AddTag(ctx context.Context, fqn, tag string) error
SetOwner(ctx context.Context, fqn, owner string) error
AddLineageEdge(ctx context.Context, from, to EntityRef) error
GetLineage(ctx context.Context, fqn string) (*OMLineageResult, error)
}
OpenMetadataClient is the interface for OpenMetadata REST API operations.
func NewOpenMetadataClient ¶
func NewOpenMetadataClient(endpoint, token string, timeout time.Duration) OpenMetadataClient
NewOpenMetadataClient creates a new OpenMetadata HTTP client.
type OpenMetadataConfig ¶
type OpenMetadataConfig struct {
Endpoint string `json:"endpoint" yaml:"endpoint"`
Token string `json:"token" yaml:"token"`
Timeout time.Duration `json:"timeout" yaml:"timeout"`
}
OpenMetadataConfig holds configuration for the catalog.openmetadata module.
type OpenMetadataModule ¶
type OpenMetadataModule struct {
// contains filtered or unexported fields
}
OpenMetadataModule implements the catalog.openmetadata module.
func (*OpenMetadataModule) CatalogProvider ¶
func (m *OpenMetadataModule) CatalogProvider() CatalogProvider
CatalogProvider returns this module as a CatalogProvider.
func (*OpenMetadataModule) Client ¶
func (m *OpenMetadataModule) Client() OpenMetadataClient
Client returns the underlying OpenMetadataClient.
func (*OpenMetadataModule) Init ¶
func (m *OpenMetadataModule) Init() error
Init validates the module configuration.
type QualityCheck ¶
type QualityCheck struct {
Name string `yaml:"name"`
Query string `yaml:"query"` // must return zero rows for the check to pass
Type string `yaml:"type"` // not_null, unique, custom
}
QualityCheck is a simple SQL-based quality assertion in the contract.
type RegisterDatasetRequest ¶
type RegisterDatasetRequest struct {
Dataset string
Schema []SchemaField
Owner string
Tags []string
Properties map[string]string
}
RegisterDatasetRequest carries all fields needed to register a dataset.
type SchemaDefinition ¶
type SchemaDefinition struct {
Schema string `json:"schema" yaml:"schema"`
SchemaType string `json:"schemaType,omitempty" yaml:"schemaType,omitempty"`
References []SchemaReference `json:"references,omitempty" yaml:"references,omitempty"`
}
SchemaDefinition describes a schema to register or validate against.
type SchemaField ¶
type SchemaField struct {
Name string `json:"fieldPath" yaml:"name"`
Type string `json:"nativeDataType" yaml:"type"`
Nullable bool `json:"nullable" yaml:"nullable"`
}
SchemaField is a single field in a dataset schema.
type SchemaInfo ¶
type SchemaInfo struct {
Subject string `json:"subject" yaml:"subject"`
Version int `json:"version" yaml:"version"`
ID int `json:"id" yaml:"id"`
Schema string `json:"schema" yaml:"schema"`
SchemaType string `json:"schemaType,omitempty" yaml:"schemaType,omitempty"`
}
SchemaInfo is the full metadata returned for a registered schema.
type SchemaReference ¶
type SchemaReference struct {
Name string `json:"name" yaml:"name"`
Subject string `json:"subject" yaml:"subject"`
Version int `json:"version" yaml:"version"`
}
SchemaReference is a reference to another schema used in a schema definition.
type SchemaRegistryClient ¶
type SchemaRegistryClient interface {
ListSubjects(ctx context.Context) ([]string, error)
RegisterSchema(ctx context.Context, subject string, schema SchemaDefinition) (int, error)
GetSchema(ctx context.Context, id int) (*SchemaDefinition, error)
GetLatestSchema(ctx context.Context, subject string) (*SchemaInfo, error)
GetSchemaByVersion(ctx context.Context, subject, version string) (*SchemaInfo, error)
DeleteSubject(ctx context.Context, subject string, permanent bool) ([]int, error)
CheckCompatibility(ctx context.Context, subject string, schema SchemaDefinition) (bool, error)
GetCompatibilityLevel(ctx context.Context, subject string) (string, error)
SetCompatibilityLevel(ctx context.Context, subject string, level string) error
ValidateSchema(ctx context.Context, schema SchemaDefinition, data []byte) error
}
SchemaRegistryClient is the interface for Confluent Schema Registry REST API operations.
func NewSchemaRegistryClient ¶
func NewSchemaRegistryClient(endpoint, username, password string, timeout time.Duration) SchemaRegistryClient
NewSchemaRegistryClient creates a Schema Registry HTTP client.
type SchemaRegistryConfig ¶
type SchemaRegistryConfig struct {
Provider string `json:"provider" yaml:"provider"`
Endpoint string `json:"endpoint" yaml:"endpoint"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
DefaultCompatibility string `json:"defaultCompatibility" yaml:"defaultCompatibility"`
Timeout time.Duration `json:"timeout" yaml:"timeout"`
}
SchemaRegistryConfig holds configuration for the catalog.schema_registry module.
type SchemaRegistryModule ¶
type SchemaRegistryModule struct {
// contains filtered or unexported fields
}
SchemaRegistryModule implements the catalog.schema_registry module.
func LookupSRModule ¶
func LookupSRModule(name string) (*SchemaRegistryModule, error)
LookupSRModule returns the registered SchemaRegistryModule by name.
func (*SchemaRegistryModule) Client ¶
func (m *SchemaRegistryModule) Client() SchemaRegistryClient
Client returns the underlying SchemaRegistryClient.
func (*SchemaRegistryModule) Config ¶
func (m *SchemaRegistryModule) Config() SchemaRegistryConfig
Config returns the module configuration.
func (*SchemaRegistryModule) Init ¶
func (m *SchemaRegistryModule) Init() error
Init validates the module configuration.