
This page is about running the Ingestion Framework externally! There are two main ways of running the ingestion:
  1. Internally, by managing the workflows from OpenMetadata.
  2. Externally, by using any other tool capable of running Python code.
If you are looking for how to manage the ingestion process from OpenMetadata, you can follow this doc.

Run the ingestion from your Airflow

OpenMetadata integrates with Airflow to orchestrate ingestion workflows. You can use Airflow to extract metadata and [deploy workflows](/deployment/ingestion/openmetadata) directly. This guide explains how to run ingestion workflows in Airflow using three different operators:
  1. Python Operator
  2. Docker Operator
  3. Python Virtualenv Operator

Using the Python Operator

Prerequisites

Install the openmetadata-ingestion package in your Airflow environment. This approach works best if you have access to the Airflow host and can manage dependencies.

Installation Command:

pip3 install "openmetadata-ingestion[<plugin>]==x.y.z"

  • Replace <plugin> with the sources to ingest, such as mysql, snowflake, or s3.
  • Replace x.y.z with the OpenMetadata version matching your server (e.g., 1.6.1).

Example

pip3 install "openmetadata-ingestion[mysql,snowflake,s3]==1.6.1"

Example DAG

import yaml
from datetime import timedelta
from airflow import DAG

# PythonOperator moved in Airflow 2.x; fall back to the 1.x import path
try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

from metadata.workflow.metadata import MetadataWorkflow

default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60)
}

config = """
<your YAML configuration>
"""

def metadata_ingestion_workflow():
    # Parse the YAML definition and build the ingestion workflow
    workflow_config = yaml.safe_load(config)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    # Surface any step failures so the Airflow task is marked as failed
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs a OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
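
For reference, here is a minimal sketch of what the config string might contain for a MySQL source. The service name, credentials, host, and JWT token are all placeholder values, and the exact connection fields depend on your connector and OpenMetadata version, so treat this as a starting point rather than a complete configuration:

# Hypothetical MySQL ingestion config; every credential below is a placeholder
config = """
source:
  type: mysql
  serviceName: local_mysql
  serviceConnection:
    config:
      type: Mysql
      username: openmetadata_user
      authType:
        password: openmetadata_password
      hostPort: localhost:3306
  sourceConfig:
    config:
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: "<bot-jwt-token>"
"""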

Key Notes

  • Function Setup: The python_callable argument in the PythonOperator executes the metadata_ingestion_workflow function, which instantiates the workflow and runs the ingestion process.
  • Drawback: This method requires pre-installed dependencies, which may not always be feasible. Consider using the DockerOperator or PythonVirtualenvOperator as alternatives.
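
If you prefer to keep the YAML out of the DAG file, the callable can read it from disk instead of an inline string. Below is a sketch assuming the configuration is deployed alongside your DAGs at a hypothetical path such as /opt/airflow/dags/ingestion.yaml:

from pathlib import Path

import yaml

from metadata.workflow.metadata import MetadataWorkflow

# Hypothetical location; adjust to wherever you deploy the YAML
CONFIG_PATH = Path("/opt/airflow/dags/ingestion.yaml")

def metadata_ingestion_workflow():
    # Load the workflow definition from the file at task runtime
    workflow_config = yaml.safe_load(CONFIG_PATH.read_text())
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

Reading the file inside the callable (rather than at DAG parse time) keeps scheduler parsing fast and picks up configuration changes without redeploying the DAG.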

Next Steps

Docker & Virtualenv Operators

Run ingestion using the Docker Operator or Python Virtualenv Operator for isolated execution without pre-installing dependencies on the Airflow host.