airflow

package
v0.5.0 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

+marmot:name=Airflow
+marmot:description=Ingests metadata from Apache Airflow including DAGs, tasks, and dataset lineage.
+marmot:status=experimental
+marmot:features=Assets, Lineage, Run History

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type APIError

// APIError represents an error response from the Airflow API.
// Each field is decoded from the JSON key named in its tag
// (detail, status, title, type).
type APIError struct {
	Detail string `json:"detail"`
	Status int    `json:"status"`
	Title  string `json:"title"`
	Type   string `json:"type"`
}

APIError represents an error response from the Airflow API

type AirflowDAGFields

// AirflowDAGFields represents Airflow DAG-specific metadata fields.
// Every field carries three tags: `json` (wire name), `metadata`
// (metadata key, identical to the JSON name for every field here) and
// `description` (human-readable text for the field).
//
// All date/time values are carried as strings; their format is not
// fixed by this declaration.
type AirflowDAGFields struct {
	DagID            string  `json:"dag_id" metadata:"dag_id" description:"Unique DAG identifier"`
	Description      string  `json:"description" metadata:"description" description:"DAG description"`
	FilePath         string  `json:"file_path" metadata:"file_path" description:"Path to DAG definition file"`
	ScheduleInterval string  `json:"schedule_interval" metadata:"schedule_interval" description:"DAG schedule (cron expression or preset)"`
	IsPaused         bool    `json:"is_paused" metadata:"is_paused" description:"Whether DAG is paused"`
	IsActive         bool    `json:"is_active" metadata:"is_active" description:"Whether DAG is active"`
	Owners           string  `json:"owners" metadata:"owners" description:"DAG owners (comma-separated)"`
	LastRunState     string  `json:"last_run_state" metadata:"last_run_state" description:"State of the last DAG run (success, failed, running)"`
	LastRunID        string  `json:"last_run_id" metadata:"last_run_id" description:"ID of the last DAG run"`
	LastRunDate      string  `json:"last_run_date" metadata:"last_run_date" description:"Execution date of the last DAG run"`
	NextRunDate      string  `json:"next_run_date" metadata:"next_run_date" description:"Next scheduled run date"`
	LastParsedTime   string  `json:"last_parsed_time" metadata:"last_parsed_time" description:"Last time the DAG file was parsed"`
	SuccessRate      float64 `json:"success_rate" metadata:"success_rate" description:"Success rate percentage over the lookback period"`
	RunCount         int     `json:"run_count" metadata:"run_count" description:"Number of runs in the lookback period"`
}

AirflowDAGFields represents Airflow DAG-specific metadata fields +marmot:metadata

type AirflowDAGRunFields

// AirflowDAGRunFields represents Airflow DAG run-specific metadata fields.
// Each field's `metadata` key matches its JSON name, and the
// `description` tag holds the human-readable text for the field.
// All values, including the dates, are carried as strings.
type AirflowDAGRunFields struct {
	DagRunID      string `json:"dag_run_id" metadata:"dag_run_id" description:"Unique identifier for the DAG run"`
	State         string `json:"state" metadata:"state" description:"Run state (queued, running, success, failed)"`
	ExecutionDate string `json:"execution_date" metadata:"execution_date" description:"Logical execution date"`
	StartDate     string `json:"start_date" metadata:"start_date" description:"Actual start time of the run"`
	EndDate       string `json:"end_date" metadata:"end_date" description:"End time of the run"`
	RunType       string `json:"run_type" metadata:"run_type" description:"Type of run (scheduled, manual, backfill)"`
}

AirflowDAGRunFields represents Airflow DAG run-specific metadata fields +marmot:metadata

type AirflowDatasetFields

// AirflowDatasetFields represents Airflow Dataset-specific metadata fields.
// Each field's `metadata` key matches its JSON name, and the
// `description` tag holds the human-readable text for the field.
type AirflowDatasetFields struct {
	URI           string `json:"uri" metadata:"uri" description:"Dataset URI identifier"`
	CreatedAt     string `json:"created_at" metadata:"created_at" description:"Dataset creation timestamp"`
	UpdatedAt     string `json:"updated_at" metadata:"updated_at" description:"Dataset last update timestamp"`
	ProducerCount int    `json:"producer_count" metadata:"producer_count" description:"Number of tasks that produce this dataset"`
	ConsumerCount int    `json:"consumer_count" metadata:"consumer_count" description:"Number of DAGs that consume this dataset"`
}

AirflowDatasetFields represents Airflow Dataset-specific metadata fields +marmot:metadata

type AirflowTaskFields

// AirflowTaskFields represents Airflow task-specific metadata fields.
// Each field's `metadata` key matches its JSON name, and the
// `description` tag holds the human-readable text for the field.
type AirflowTaskFields struct {
	TaskID          string   `json:"task_id" metadata:"task_id" description:"Task identifier within the DAG"`
	DagID           string   `json:"dag_id" metadata:"dag_id" description:"Parent DAG ID"`
	OperatorName    string   `json:"operator_name" metadata:"operator_name" description:"Airflow operator class name (e.g., BashOperator, PythonOperator)"`
	TriggerRule     string   `json:"trigger_rule" metadata:"trigger_rule" description:"Task trigger rule (e.g., all_success, one_success)"`
	Retries         int      `json:"retries" metadata:"retries" description:"Number of retries configured for the task"`
	Pool            string   `json:"pool" metadata:"pool" description:"Execution pool for the task"`
	DownstreamTasks []string `json:"downstream_tasks" metadata:"downstream_tasks" description:"List of downstream task IDs"`
}

AirflowTaskFields represents Airflow task-specific metadata fields +marmot:metadata

type Client

// Client is an Airflow REST API client.
// Its fields are unexported, so they are not shown in this listing.
type Client struct {
	// contains filtered or unexported fields
}

Client is an Airflow REST API client

func NewClient

func NewClient(config ClientConfig) *Client

NewClient creates a new Airflow API client

func (*Client) GetDAG

func (c *Client) GetDAG(ctx context.Context, dagID string) (*DAG, error)

GetDAG returns a specific DAG by ID

func (*Client) GetDatasetEvents

func (c *Client) GetDatasetEvents(ctx context.Context, datasetURI string) ([]DatasetEvent, error)

GetDatasetEvents returns events for a specific dataset

func (*Client) ListDAGRuns

func (c *Client) ListDAGRuns(ctx context.Context, dagID string, days int) ([]DAGRun, error)

ListDAGRuns returns DAG runs for a specific DAG within the specified number of days

func (*Client) ListDAGs

func (c *Client) ListDAGs(ctx context.Context, onlyActive bool) ([]DAG, error)

ListDAGs returns all DAGs from Airflow

func (*Client) ListDatasets

func (c *Client) ListDatasets(ctx context.Context) ([]Dataset, error)

ListDatasets returns all datasets from Airflow (requires Airflow 2.4+)

func (*Client) ListTasks

func (c *Client) ListTasks(ctx context.Context, dagID string) ([]Task, error)

ListTasks returns all tasks for a specific DAG

type ClientConfig

// ClientConfig holds configuration for the Airflow API client.
// It is the argument NewClient accepts.
//
// NOTE(review): both Username/Password and APIToken are present; which
// one takes precedence when both are set is not visible here — confirm
// against the client implementation.
type ClientConfig struct {
	BaseURL  string
	Username string
	Password string
	APIToken string
	Timeout  time.Duration
}

ClientConfig holds configuration for the Airflow API client

type Config

// Config is the configuration for the Airflow plugin.
//
// Struct tags drive the plugin framework: `description` documents the
// option, `default` gives its default value as a string, `validate`
// carries validation rules and `sensitive:"true"` marks secrets.
type Config struct {
	plugin.BaseConfig `json:",inline"`

	// Connection and authentication. Host is the only field tagged as
	// required; the credential fields are all omitempty.
	Host     string `json:"host" description:"Airflow webserver URL (e.g., http://localhost:8080)" validate:"required,url"`
	Username string `json:"username,omitempty" description:"Username for basic authentication"`
	Password string `json:"password,omitempty" description:"Password for basic authentication" sensitive:"true"`
	APIToken string `json:"api_token,omitempty" description:"API token for authentication (alternative to basic auth)" sensitive:"true"`

	// Discovery toggles; each is tagged with default "true".
	DiscoverDAGs     bool `json:"discover_dags" description:"Discover Airflow DAGs as Pipeline assets" default:"true"`
	DiscoverTasks    bool `json:"discover_tasks" description:"Discover tasks within DAGs" default:"true"`
	DiscoverDatasets bool `json:"discover_datasets" description:"Discover Airflow Datasets for lineage (requires Airflow 2.4+)" default:"true"`

	// Run-history collection; the lookback window is tagged with
	// default "7" (days).
	IncludeRunHistory bool `json:"include_run_history" description:"Include DAG run history in metadata" default:"true"`
	RunHistoryDays    int  `json:"run_history_days" description:"Number of days of run history to fetch" default:"7"`

	// DAG selection.
	// NOTE(review): the only_active description equates "active" with
	// "unpaused", while the DAG type exposes is_active and is_paused as
	// separate flags — confirm which flag this option actually filters on.
	DAGFilter  *plugin.Filter `json:"dag_filter,omitempty" description:"Filter DAGs by ID pattern (include/exclude regex)"`
	OnlyActive bool           `json:"only_active" description:"Only discover active (unpaused) DAGs" default:"true"`
}

Config for Airflow plugin +marmot:config

type DAG

// DAG represents an Airflow DAG from the REST API.
//
// Pointer-typed fields (*string, *ScheduleInterval) can distinguish a
// JSON null or absent key (nil) from a present value; plain string/bool/int
// fields fall back to their zero value instead.
type DAG struct {
	DagID            string            `json:"dag_id"`
	DagDisplayName   string            `json:"dag_display_name,omitempty"`
	Description      *string           `json:"description"`
	FileToken        string            `json:"file_token"`
	Fileloc          string            `json:"fileloc"`
	IsPaused         bool              `json:"is_paused"`
	IsActive         bool              `json:"is_active"`
	IsSubdag         bool              `json:"is_subdag"`
	LastParsedTime   *string           `json:"last_parsed_time"`
	LastPickled      *string           `json:"last_pickled"`
	LastExpiredTime  *string           `json:"last_expired"`
	SchedulerLock    *string           `json:"scheduler_lock"`
	PickleID         *string           `json:"pickle_id"`
	DefaultView      string            `json:"default_view"`
	Owners           []string          `json:"owners"`
	Tags             []Tag             `json:"tags"`
	ScheduleInterval *ScheduleInterval `json:"schedule_interval"`
	TimetableDesc    *string           `json:"timetable_description"`
	NextDagRun       *string           `json:"next_dagrun"`
	// NextDagRunTime is decoded from next_dagrun_data_interval_start,
	// not from a key named after the field.
	NextDagRunTime  *string `json:"next_dagrun_data_interval_start"`
	MaxActiveRuns   int     `json:"max_active_runs"`
	MaxActiveTasks  int     `json:"max_active_tasks"`
	HasTaskConcur   bool    `json:"has_task_concurrency_limits"`
	HasImportErrors bool    `json:"has_import_errors"`
}

DAG represents an Airflow DAG from the REST API

type DAGCollection

// DAGCollection represents the API response for listing DAGs.
// DAGs is decoded from the "dags" key and TotalCount from "total_entries".
type DAGCollection struct {
	DAGs       []DAG `json:"dags"`
	TotalCount int   `json:"total_entries"`
}

DAGCollection represents the API response for listing DAGs

type DAGRun

// DAGRun represents an Airflow DAG run from the REST API.
//
// LogicalDate and ExecutionDate are decoded from two separate keys
// (logical_date and execution_date); this declaration does not establish
// which of them a given server populates.
// Pointer-typed fields are nil when the key is null or absent.
type DAGRun struct {
	DagRunID          string                 `json:"dag_run_id"`
	DagID             string                 `json:"dag_id"`
	LogicalDate       string                 `json:"logical_date"`
	ExecutionDate     string                 `json:"execution_date"`
	StartDate         *string                `json:"start_date"`
	EndDate           *string                `json:"end_date"`
	DataIntervalStart *string                `json:"data_interval_start"`
	DataIntervalEnd   *string                `json:"data_interval_end"`
	LastSchedulingDec *string                `json:"last_scheduling_decision"`
	RunType           string                 `json:"run_type"`
	State             string                 `json:"state"`
	ExternalTrigger   bool                   `json:"external_trigger"`
	Conf              map[string]interface{} `json:"conf"`
	Note              *string                `json:"note"`
}

DAGRun represents an Airflow DAG run from the REST API

type DAGRunCollection

// DAGRunCollection represents the API response for listing DAG runs.
// DagRuns is decoded from the "dag_runs" key and TotalCount from
// "total_entries".
type DAGRunCollection struct {
	DagRuns    []DAGRun `json:"dag_runs"`
	TotalCount int      `json:"total_entries"`
}

DAGRunCollection represents the API response for listing DAG runs

type DagRef

// DagRef represents a reference to a DAG, identified by its dag_id.
// It is the element type of Dataset.ConsumingDags and
// DatasetEvent.CreatedDagruns.
type DagRef struct {
	DagID string `json:"dag_id"`
}

DagRef represents a reference to a DAG

type Dataset

// Dataset represents an Airflow Dataset from the REST API (Airflow 2.4+).
// ConsumingDags and ProducingTasks hold references to the DAGs and tasks
// listed under the consuming_dags and producing_tasks keys.
type Dataset struct {
	ID             int                    `json:"id"`
	URI            string                 `json:"uri"`
	Extra          map[string]interface{} `json:"extra"`
	CreatedAt      string                 `json:"created_at"`
	UpdatedAt      string                 `json:"updated_at"`
	ConsumingDags  []DagRef               `json:"consuming_dags"`
	ProducingTasks []TaskRef              `json:"producing_tasks"`
}

Dataset represents an Airflow Dataset from the REST API (Airflow 2.4+)

type DatasetCollection

// DatasetCollection represents the API response for listing datasets.
// Datasets is decoded from the "datasets" key and TotalCount from
// "total_entries".
type DatasetCollection struct {
	Datasets   []Dataset `json:"datasets"`
	TotalCount int       `json:"total_entries"`
}

DatasetCollection represents the API response for listing datasets

type DatasetEvent

// DatasetEvent represents a dataset event from the REST API.
// The Source* pointer fields are nil when the corresponding key is null
// or absent. CreatedDagruns keeps only the dag_id of each entry, because
// DagRef declares no other field.
type DatasetEvent struct {
	ID             int                    `json:"id"`
	DatasetID      int                    `json:"dataset_id"`
	DatasetURI     string                 `json:"dataset_uri"`
	SourceDagID    *string                `json:"source_dag_id"`
	SourceTaskID   *string                `json:"source_task_id"`
	SourceRunID    *string                `json:"source_run_id"`
	SourceMapIndex int                    `json:"source_map_index"`
	CreatedDagruns []DagRef               `json:"created_dagruns"`
	Timestamp      string                 `json:"timestamp"`
	Extra          map[string]interface{} `json:"extra"`
}

DatasetEvent represents a dataset event from the REST API

type DatasetEventCollection

// DatasetEventCollection represents the API response for listing dataset
// events. DatasetEvents is decoded from the "dataset_events" key and
// TotalCount from "total_entries".
type DatasetEventCollection struct {
	DatasetEvents []DatasetEvent `json:"dataset_events"`
	TotalCount    int            `json:"total_entries"`
}

DatasetEventCollection represents the API response for listing dataset events

type ExtraLink

// ExtraLink represents an extra link on a task.
// It is the element type of Task.ExtraLinks.
type ExtraLink struct {
	// NOTE(review): Airflow's REST API appears to describe class_ref as an
	// object (module path plus class name) rather than a string — confirm
	// that real payloads decode into this field without error.
	ClassRef string `json:"class_ref"`
}

ExtraLink represents an extra link on a task

type RetryDelay

// RetryDelay represents a task's retry delay configuration.
// The delay is split into day, second and microsecond components, and
// Type is decoded from the "__type" discriminator key.
type RetryDelay struct {
	Type    string `json:"__type"`
	Days    int    `json:"days"`
	Seconds int    `json:"seconds"`
	Micros  int    `json:"microseconds"`
}

RetryDelay represents a task's retry delay configuration

type ScheduleInterval

// ScheduleInterval represents a DAG's schedule interval.
// Type is decoded from the "__type" discriminator key and Value from
// "value".
//
// NOTE(review): only a string "value" is modelled; if the server can
// return schedule shapes without that key, Value will be empty — confirm
// against the Airflow API schema.
type ScheduleInterval struct {
	Type  string `json:"__type"`
	Value string `json:"value"`
}

ScheduleInterval represents a DAG's schedule interval

type Source

// Source implements the Airflow plugin.
// It exposes the Discover and Validate methods; its fields are unexported,
// so they are not shown in this listing.
type Source struct {
	// contains filtered or unexported fields
}

Source implements the Airflow plugin.

func (*Source) Discover

func (s *Source) Discover(ctx context.Context, rawConfig plugin.RawPluginConfig) (*plugin.DiscoveryResult, error)

Discover discovers Airflow DAGs, tasks, and datasets.

func (*Source) Validate

func (s *Source) Validate(rawConfig plugin.RawPluginConfig) (plugin.RawPluginConfig, error)

Validate validates and normalizes the plugin configuration.

type SubDAG

// SubDAG represents a sub-DAG reference, identified by its dag_id.
// It is the type of Task.SubDag.
type SubDAG struct {
	DagID string `json:"dag_id"`
}

SubDAG represents a sub-DAG reference

type Tag

// Tag represents a DAG tag.
// It is the element type of DAG.Tags and carries only the tag's name.
type Tag struct {
	Name string `json:"name"`
}

Tag represents a DAG tag

type Task

// Task represents an Airflow task from the REST API.
//
// Pointer-typed fields are nil when the corresponding key is null or
// absent. DownstreamTaskIDs and UpstreamTaskIDs list neighbouring tasks
// by ID.
type Task struct {
	TaskID          string `json:"task_id"`
	TaskDisplayName string `json:"task_display_name,omitempty"`
	OperatorName    string `json:"operator_name"`
	// ClassName is decoded from the class_ref key.
	// NOTE(review): Airflow's REST API appears to describe class_ref as an
	// object (module path plus class name) rather than a string — confirm
	// that real payloads decode into this field without error.
	ClassName string `json:"class_ref,omitempty"`
	Pool      string `json:"pool"`
	PoolSlots int    `json:"pool_slots"`
	// NOTE(review): execution_timeout is modelled as a nullable string,
	// while retry_delay uses a structured RetryDelay — confirm the server
	// sends a string here.
	ExecutionTimeout    *string     `json:"execution_timeout"`
	TriggerRule         string      `json:"trigger_rule"`
	Retries             int         `json:"retries"`
	RetryDelay          *RetryDelay `json:"retry_delay"`
	RetryExponentialBac bool        `json:"retry_exponential_backoff"`
	PriorityWeight      int         `json:"priority_weight"`
	// Weight is decoded from the weight_rule key.
	Weight            string                 `json:"weight_rule"`
	Queue             string                 `json:"queue"`
	DownstreamTaskIDs []string               `json:"downstream_task_ids"`
	UpstreamTaskIDs   []string               `json:"upstream_task_ids"`
	DependsOnPast     bool                   `json:"depends_on_past"`
	WaitForDownstream bool                   `json:"wait_for_downstream"`
	StartDate         *string                `json:"start_date"`
	EndDate           *string                `json:"end_date"`
	UIColor           string                 `json:"ui_color"`
	UIFgcolor         string                 `json:"ui_fgcolor"`
	TemplateFields    []string               `json:"template_fields"`
	ExtraLinks        []ExtraLink            `json:"extra_links"`
	SubDag            *SubDAG                `json:"sub_dag,omitempty"`
	Params            map[string]interface{} `json:"params,omitempty"`
}

Task represents an Airflow task from the REST API

type TaskCollection

// TaskCollection represents the API response for listing tasks.
// Tasks is decoded from the "tasks" key and TotalCount from
// "total_entries".
type TaskCollection struct {
	Tasks      []Task `json:"tasks"`
	TotalCount int    `json:"total_entries"`
}

TaskCollection represents the API response for listing tasks

type TaskRef

// TaskRef represents a reference to a task, identified by the pair of
// dag_id and task_id. It is the element type of Dataset.ProducingTasks.
type TaskRef struct {
	DagID  string `json:"dag_id"`
	TaskID string `json:"task_id"`
}

TaskRef represents a reference to a task

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL