lineage

package
v0.2.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2025 | License: MIT | Imports: 10 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// SupportedJobs lists the job types known to this package, keyed by an
// upper-case identifier ("AIRFLOW", "SPARK", "DBT", "GREAT_EXPECTATIONS",
// "ASYNCAPI"). Each entry records the job type's display name, its service
// name, a version string, the facet names associated with it, and the data
// source names listed for it.
//
// NOTE(review): how this table is consulted is not visible here — presumably
// JobDetector.DetectJob matches a job's facet keys against Facets; confirm.
var SupportedJobs = map[string]JobType{
	"AIRFLOW": {
		Name:       "Airflow",
		Service:    "airflow",
		MinVersion: "1.10",
		Facets: []string{
			"airflow_version",
			"airflow_runArgs",
		},
		DataSources: []string{
			"postgresql", "mysql", "snowflake", "athena", "redshift",
			"sagemaker", "bigquery", "gcs", "greatexpectations",
		},
	},
	"SPARK": {
		Name:       "Spark",
		Service:    "spark",
		MinVersion: "2.4",
		Facets: []string{
			"spark.logicalPlan",
			"spark.metrics",
		},
		DataSources: []string{
			"jdbc", "hdfs", "gcs", "bigquery", "s3",
			"azure_blob", "azure_datalake", "azure_synapse",
		},
	},
	"DBT": {
		Name:       "dbt",
		Service:    "dbt",
		MinVersion: "0.20",
		Facets: []string{
			"dbt_version",
			"sql",
		},
		DataSources: []string{
			"snowflake", "bigquery",
		},
	},
	"GREAT_EXPECTATIONS": {
		Name:       "Great Expectations",
		Service:    "great_expectations",
		MinVersion: "0.13",
		Facets: []string{
			"dataQualityMetrics",
			"dataQualityAssertions",
		},
		// No data sources are listed for this job type; the slice is
		// empty but non-nil.
		DataSources: []string{},
	},
	"ASYNCAPI": {
		Name:       "AsyncAPI",
		Service:    "asyncapi",
		MinVersion: "2.0.0",
		Facets: []string{
			"asyncapi",
		},
		DataSources: []string{
			"kafka", "aws", "rabbitmq", "mqtt",
		},
	},
}

Functions

This section is empty.

Types

type Job

// Job identifies a job by Namespace and Name and optionally carries Facets,
// a map from string keys to undecoded JSON values. Facets is omitted from the
// JSON encoding when empty.
type Job struct {
	Namespace string                     `json:"namespace"`
	Name      string                     `json:"name"`
	Facets    map[string]json.RawMessage `json:"facets,omitempty"`
}

type JobDetector

// JobDetector is created by NewJobDetector from a facet map and a *Job. Its
// methods are DetectJob, DetectAsset and ExtractMetadata. All of its fields
// are unexported.
type JobDetector struct {
	// contains filtered or unexported fields
}

func NewJobDetector

func NewJobDetector(facets map[string]json.RawMessage, job *Job) *JobDetector

func (*JobDetector) DetectAsset

func (d *JobDetector) DetectAsset(pipeline *JobType, job *Job) *asset.CreateInput

func (*JobDetector) DetectJob

func (d *JobDetector) DetectJob() (*JobType, error)

func (*JobDetector) ExtractMetadata

func (d *JobDetector) ExtractMetadata() map[string]interface{}

type JobType

// JobType describes one kind of job; it is the value type of SupportedJobs.
//
// Name is the display name (e.g. "Airflow") and Service is a lower-case
// service identifier (e.g. "airflow"). MinVersion is a version string such as
// "1.10" — presumably the minimum supported version; confirm. Facets and
// DataSources hold the facet names and data source names listed for the job
// type.
type JobType struct {
	Name        string
	Service     string
	MinVersion  string
	Facets      []string
	DataSources []string
}

type LineageEdge

// LineageEdge is one edge of a lineage graph, linking Source to Target. ID
// identifies the edge and Type is a string label for it. JobMRN is optional
// and is omitted from the JSON encoding when empty.
//
// NOTE(review): Source and Target are presumably asset MRNs (compare the
// sourceMRN/targetMRN parameters of CreateDirectLineage) — confirm.
type LineageEdge struct {
	ID     string `json:"id"`
	Source string `json:"source"`
	Target string `json:"target"`
	Type   string `json:"type"`
	JobMRN string `json:"job_mrn,omitempty"`
}

type LineageNode

// LineageNode is one node of a lineage graph: an ID, a Type string, a pointer
// to the associated asset, and a Depth.
//
// NOTE(review): Depth is presumably the distance from the asset the lineage
// was requested for — confirm against the repository implementation.
type LineageNode struct {
	ID    string       `json:"id"`
	Type  string       `json:"type"`
	Asset *asset.Asset `json:"asset"`
	Depth int          `json:"depth"`
}

type LineageResponse

// LineageResponse is the result type of GetAssetLineage: the nodes and edges
// of a lineage graph, encoded to JSON as "nodes" and "edges".
type LineageResponse struct {
	Nodes []LineageNode `json:"nodes"`
	Edges []LineageEdge `json:"edges"`
}

type Logger

// Logger is a minimal logging interface with an informational and an error
// level.
type Logger interface {
	// Info logs msg together with any additional fields.
	Info(msg string, fields ...interface{})
	// Error logs msg together with err and any additional fields.
	Error(msg string, err error, fields ...interface{})
}

type MetricsClient

// MetricsClient is the metrics sink accepted by WithMetrics.
type MetricsClient interface {
	// Count records an integer value under name, with optional tags.
	Count(name string, value int64, tags ...string)
	// Timing records a duration under name, with optional tags.
	Timing(name string, value time.Duration, tags ...string)
}

type PostgresRepository

// PostgresRepository declares the same five methods as the Repository
// interface (GetAssetLineage, CreateDirectLineage, EdgeExists,
// DeleteDirectLineage, GetDirectLineage). All of its fields are unexported.
//
// NOTE(review): NewPostgresRepository takes a *pgxpool.Pool and returns a
// Repository — presumably backed by this type; confirm.
type PostgresRepository struct {
	// contains filtered or unexported fields
}

func (*PostgresRepository) CreateDirectLineage

func (r *PostgresRepository) CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)

func (*PostgresRepository) DeleteDirectLineage

func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID string) error

func (*PostgresRepository) EdgeExists

func (r *PostgresRepository) EdgeExists(ctx context.Context, source, target string) (bool, error)

func (*PostgresRepository) GetAssetLineage

func (r *PostgresRepository) GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)

func (*PostgresRepository) GetDirectLineage

func (r *PostgresRepository) GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)

type Repository

// Repository is the storage-facing interface for lineage data. It is the
// return type of NewPostgresRepository and the first argument of NewService.
type Repository interface {
	// GetAssetLineage returns the lineage graph for assetID.
	// NOTE(review): the accepted values of direction and the exact meaning
	// of limit are not visible here — confirm against the implementation.
	GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
	// CreateDirectLineage creates an edge from sourceMRN to targetMRN and
	// returns a string — presumably the new edge's ID; confirm.
	CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
	// EdgeExists reports whether an edge between source and target exists.
	EdgeExists(ctx context.Context, source, target string) (bool, error)
	// DeleteDirectLineage deletes the edge identified by edgeID.
	DeleteDirectLineage(ctx context.Context, edgeID string) error
	// GetDirectLineage returns the edge identified by edgeID.
	GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
}

func NewPostgresRepository

func NewPostgresRepository(db *pgxpool.Pool) Repository

type Service

// Service is the lineage service interface returned by NewService. Its method
// set is identical to that of Repository.
type Service interface {
	// GetAssetLineage returns the lineage graph for assetID.
	// NOTE(review): the accepted values of direction and the exact meaning
	// of limit are not visible here — confirm against the implementation.
	GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
	// CreateDirectLineage creates an edge from sourceMRN to targetMRN and
	// returns a string — presumably the new edge's ID; confirm.
	CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
	// EdgeExists reports whether an edge between source and target exists.
	EdgeExists(ctx context.Context, source, target string) (bool, error)
	// DeleteDirectLineage deletes the edge identified by edgeID.
	DeleteDirectLineage(ctx context.Context, edgeID string) error
	// GetDirectLineage returns the edge identified by edgeID.
	GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
}

func NewService

func NewService(repo Repository, assetSvc asset.Service, opts ...ServiceOption) Service

type ServiceOption

// ServiceOption is a function that receives the package's unexported service
// value. Options are passed variadically to NewService; WithMetrics returns
// one.
type ServiceOption func(*service)

func WithMetrics

func WithMetrics(metrics MetricsClient) ServiceOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL