Documentation
¶
Index ¶
- Variables
- type Job
- type JobDetector
- type JobType
- type LineageEdge
- type LineageNode
- type LineageResponse
- type Logger
- type MetricsClient
- type PostgresRepository
- func (r *PostgresRepository) CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
- func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID string) error
- func (r *PostgresRepository) EdgeExists(ctx context.Context, source, target string) (bool, error)
- func (r *PostgresRepository) GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
- func (r *PostgresRepository) GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
- type Repository
- type Service
- type ServiceOption
Constants ¶
This section is empty.
Variables ¶
View Source
var SupportedJobs = map[string]JobType{ "AIRFLOW": { Name: "Airflow", Service: "airflow", MinVersion: "1.10", Facets: []string{ "airflow_version", "airflow_runArgs", }, DataSources: []string{ "postgresql", "mysql", "snowflake", "athena", "redshift", "sagemaker", "bigquery", "gcs", "greatexpectations", }, }, "SPARK": { Name: "Spark", Service: "spark", MinVersion: "2.4", Facets: []string{ "spark.logicalPlan", "spark.metrics", }, DataSources: []string{ "jdbc", "hdfs", "gcs", "bigquery", "s3", "azure_blob", "azure_datalake", "azure_synapse", }, }, "DBT": { Name: "dbt", Service: "dbt", MinVersion: "0.20", Facets: []string{ "dbt_version", "sql", }, DataSources: []string{ "snowflake", "bigquery", }, }, "GREAT_EXPECTATIONS": { Name: "Great Expectations", Service: "great_expectations", MinVersion: "0.13", Facets: []string{ "dataQualityMetrics", "dataQualityAssertions", }, DataSources: []string{}, }, "ASYNCAPI": { Name: "AsyncAPI", Service: "asyncapi", MinVersion: "2.0.0", Facets: []string{ "asyncapi", }, DataSources: []string{ "kafka", "aws", "rabbitmq", "mqtt", }, }, }
Functions ¶
This section is empty.
Types ¶
type Job ¶
type Job struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
Facets map[string]json.RawMessage `json:"facets,omitempty"`
}
type JobDetector ¶
type JobDetector struct {
// contains filtered or unexported fields
}
func NewJobDetector ¶
func NewJobDetector(facets map[string]json.RawMessage, job *Job) *JobDetector
func (*JobDetector) DetectAsset ¶
func (d *JobDetector) DetectAsset(pipeline *JobType, job *Job) *asset.CreateInput
func (*JobDetector) DetectJob ¶
func (d *JobDetector) DetectJob() (*JobType, error)
func (*JobDetector) ExtractMetadata ¶
func (d *JobDetector) ExtractMetadata() map[string]interface{}
type LineageEdge ¶
type LineageNode ¶
type LineageResponse ¶
type LineageResponse struct {
Nodes []LineageNode `json:"nodes"`
Edges []LineageEdge `json:"edges"`
}
type MetricsClient ¶
type PostgresRepository ¶
type PostgresRepository struct {
// contains filtered or unexported fields
}
func (*PostgresRepository) CreateDirectLineage ¶
func (*PostgresRepository) DeleteDirectLineage ¶
func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID string) error
func (*PostgresRepository) EdgeExists ¶
func (*PostgresRepository) GetAssetLineage ¶
func (r *PostgresRepository) GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
func (*PostgresRepository) GetDirectLineage ¶
func (r *PostgresRepository) GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
type Repository ¶
type Repository interface {
GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
EdgeExists(ctx context.Context, source, target string) (bool, error)
DeleteDirectLineage(ctx context.Context, edgeID string) error
GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
}
func NewPostgresRepository ¶
func NewPostgresRepository(db *pgxpool.Pool) Repository
type Service ¶
type Service interface {
GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
EdgeExists(ctx context.Context, source, target string) (bool, error)
DeleteDirectLineage(ctx context.Context, edgeID string) error
GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
}
func NewService ¶
func NewService(repo Repository, assetSvc asset.Service, opts ...ServiceOption) Service
type ServiceOption ¶
type ServiceOption func(*service)
func WithMetrics ¶
func WithMetrics(metrics MetricsClient) ServiceOption
Click to show internal directories.
Click to hide internal directories.