External Ingestion Workflow Examples

This page contains code examples for running each workflow type externally. For framework setup, configuration, and JWT token handling, see the External Ingestion Overview. The examples below show how to write a function that runs each workflow. The same code can be executed inside a DAG, a GitHub Action, or a plain Python script, so it is not tied to a particular orchestrator.

Testing

You can test every YAML configuration with the metadata CLI from the Ingestion Framework, which you can install from PyPI. Each example below shows how to run the CLI, assuming you have a YAML file that contains the workflow configuration.

Metadata Workflow

This is the first workflow you have to configure and run. It fetches the metadata from your sources, whether they are Database Services, Dashboard Services, Pipelines, etc. The remaining workflows (Lineage, Profiler, ...) run on top of the metadata already available in the platform.
1. Adding the imports

The first step is to import the MetadataWorkflow class, which takes care of the full ingestion logic. The results are printed at the end with the workflow’s own print_status method.
2. Defining the YAML

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can read it from a file, parse secrets from your environment, or use any other approach you need. In the end, it’s just Python code.
You can find complete YAMLs in each connector’s docs, along with more information about the available configurations.
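The example configuration on this page leaves `{bot_jwt_token}` as a placeholder. One possible way to fill it from the environment is sketched below. The variable name `OPENMETADATA_JWT_TOKEN`, the helper `render_config`, and the trimmed template are assumptions made for illustration; they are not part of the Ingestion Framework.

```python
import os

# Trimmed-down template for illustration; only the part holding the placeholder is shown.
CONFIG_TEMPLATE = """
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""


def render_config(template: str, env_var: str = "OPENMETADATA_JWT_TOKEN") -> str:
    """Return the template with the JWT placeholder replaced by an environment value.

    The environment variable name is a hypothetical choice for this sketch.
    """
    token = os.environ.get(env_var)
    if not token:
        raise RuntimeError(f"Environment variable {env_var} is not set")
    # str.replace is used instead of str.format so that other braces in the
    # YAML (such as "config: {}") are left untouched.
    return template.replace("{bot_jwt_token}", token)
```

Failing early when the variable is missing avoids sending the literal placeholder to the server as a token.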
3. Preparing the Workflow

Finally, we’ll prepare a function that we can execute anywhere. It takes care of instantiating the workflow, executing it, and giving us the results.
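import yaml

# Module path as found in recent openmetadata-ingestion releases; verify it against your installed version
from metadata.workflow.metadata import MetadataWorkflow
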
CONFIG = """
source:
  type: snowflake
  serviceName: <service name>
  serviceConnection:
    config:
      type: Snowflake
      ...
  sourceConfig:
    config:
      type: DatabaseMetadata
      markDeletedTables: true
      includeTables: true
      ...
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""
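def run():
    # create() expects a dict, so parse the YAML string first
    workflow_config = yaml.safe_load(CONFIG)
    workflow = MetadataWorkflow.create(workflow_config)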
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
You can test the workflow via metadata ingest -c <path-to-yaml>.

Lineage Workflow

This workflow will take care of scanning your query history and defining lineage relationships between your tables. You can find more information about this workflow here.
1. Adding the imports

The first step is to import the MetadataWorkflow class, which takes care of the full ingestion logic. The results are printed at the end with the workflow’s own print_status method. Note that we are using the same class as in the Metadata Ingestion.
2. Defining the YAML

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can read it from a file, parse secrets from your environment, or use any other approach you need.
Note that we have not added the serviceConnection here. Since the service was created during the metadata ingestion, we can let the Ingestion Framework fetch the Service Connection information dynamically. If, however, you are configuring the workflow with storeServiceConnection: false, you’ll need to define the serviceConnection explicitly.
You can find complete YAMLs in each connector’s docs, along with more information about the available configurations.
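The rule about serviceConnection can be expressed as a small pre-flight check on an already-parsed configuration. This is only a sketch: `missing_service_connection` is a hypothetical helper, and the storeServiceConnection value is passed in as an argument rather than read from the configuration, since where that flag lives is not shown on this page.

```python
def missing_service_connection(config: dict, store_service_connection: bool) -> bool:
    """Return True when the config lacks a serviceConnection that it would need.

    `config` is the workflow configuration parsed into a dict.
    `store_service_connection` is the value of storeServiceConnection used
    for the workflow (supplied by the caller in this sketch).
    """
    has_connection = "serviceConnection" in config.get("source", {})
    # The connection can be fetched dynamically only when it was stored
    return not store_service_connection and not has_connection


# Minimal dict mirroring the shape of the lineage example on this page
lineage_config = {
    "source": {
        "type": "snowflake-lineage",
        "serviceName": "<service name>",
        "sourceConfig": {"config": {"type": "DatabaseLineage"}},
    }
}
```

With the dict above, the check passes when the connection is stored and flags a problem when it is not.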
3. Preparing the Workflow

Finally, we’ll prepare a function that we can execute anywhere. It takes care of instantiating the workflow, executing it, and giving us the results.
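import yaml

# Module path as found in recent openmetadata-ingestion releases; verify it against your installed version
from metadata.workflow.metadata import MetadataWorkflow
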
CONFIG = """
source:
  type: snowflake-lineage
  serviceName: <service name>
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogDuration: 1
      parsingTimeoutLimit: 300
      ...
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""
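def run():
    # create() expects a dict, so parse the YAML string first
    workflow_config = yaml.safe_load(CONFIG)
    workflow = MetadataWorkflow.create(workflow_config)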
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
You can test the workflow via metadata ingest -c <path-to-yaml>.

Usage Workflow

As with the lineage workflow, we’ll scan the query history, this time for DML statements. The goal is to ingest queries into the platform and work out the relevance of your assets and which tables are frequently joined.
1. Adding the imports

The first step is to import the UsageWorkflow class, which takes care of the full ingestion logic. The results are printed at the end with the workflow’s own print_status method.
2. Defining the YAML

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can read it from a file, parse secrets from your environment, or use any other approach you need.
Note that we have not added the serviceConnection here. Since the service was created during the metadata ingestion, we can let the Ingestion Framework fetch the Service Connection information dynamically. If, however, you are configuring the workflow with storeServiceConnection: false, you’ll need to define the serviceConnection explicitly.
You can find complete YAMLs in each connector’s docs, along with more information about the available configurations.
3. Preparing the Workflow

Finally, we’ll prepare a function that we can execute anywhere. It takes care of instantiating the workflow, executing it, and giving us the results.
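import yaml

# Module path as found in recent openmetadata-ingestion releases; verify it against your installed version
from metadata.workflow.usage import UsageWorkflow
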
CONFIG = """
source:
  type: snowflake-usage
  serviceName: <service name>
  sourceConfig:
    config:
      type: DatabaseUsage
      queryLogDuration: 1
      parsingTimeoutLimit: 300
      ...
processor:
  type: query-parser
  config: {}
stage:
  type: table-usage
  config:
    filename: "/tmp/snowflake_usage"
bulkSink:
  type: metadata-usage
  config:
    filename: "/tmp/snowflake_usage"
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""
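def run():
    # create() expects a dict, so parse the YAML string first
    workflow_config = yaml.safe_load(CONFIG)
    workflow = UsageWorkflow.create(workflow_config)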
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
You can test the workflow via metadata usage -c <path-to-yaml>.
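In the usage configuration above, the stage and bulkSink entries point at the same filename, presumably because the bulk sink picks up what the stage wrote. A small sanity check on an already-parsed configuration could look like the sketch below; `usage_filenames_match` and the inline dict are illustrative assumptions, not framework code.

```python
def usage_filenames_match(config: dict) -> bool:
    """Check that the stage and bulkSink sections share the same filename.

    `config` is the usage workflow configuration parsed into a dict.
    Returns False when either filename is missing.
    """
    stage_file = config.get("stage", {}).get("config", {}).get("filename")
    sink_file = config.get("bulkSink", {}).get("config", {}).get("filename")
    return stage_file is not None and stage_file == sink_file


# Dict mirroring the stage and bulkSink sections of the usage example above
usage_config = {
    "stage": {"type": "table-usage", "config": {"filename": "/tmp/snowflake_usage"}},
    "bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/snowflake_usage"}},
}
```

Running such a check before creating the workflow catches a mismatched path without touching the database.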

Profiler Workflow

This workflow executes queries against your database and sends the results to OpenMetadata. The goal is to compute metrics about your data and give you a high-level view of its shape, together with sample data. This is a useful preliminary step before creating Data Quality Workflows. You can find more information about this workflow here.
1. Adding the imports

The first step is to import the ProfilerWorkflow class, which takes care of the full ingestion logic. The results are printed at the end with the workflow’s own print_status method.
2. Defining the YAML

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can read it from a file, parse secrets from your environment, or use any other approach you need.
Note that we have not added the serviceConnection here. Since the service was created during the metadata ingestion, we can let the Ingestion Framework fetch the Service Connection information dynamically. If, however, you are configuring the workflow with storeServiceConnection: false, you’ll need to define the serviceConnection explicitly.
You can find complete YAMLs in each connector’s docs, along with more information about the available configurations.
3. Preparing the Workflow

Finally, we’ll prepare a function that we can execute anywhere. It takes care of instantiating the workflow, executing it, and giving us the results.
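import yaml

# Module path as found in recent openmetadata-ingestion releases; verify it against your installed version
from metadata.workflow.profiler import ProfilerWorkflow
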
CONFIG = """
source:
  type: snowflake
  serviceName: <service name>
  sourceConfig:
    config:
      type: Profiler
      generateSampleData: true
      ...
processor:
  type: orm-profiler
  config: {}
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""
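def run():
    # create() expects a dict, so parse the YAML string first
    workflow_config = yaml.safe_load(CONFIG)
    workflow = ProfilerWorkflow.create(workflow_config)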
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
You can test the workflow via metadata profile -c <path-to-yaml>.

Data Quality Workflow

This workflow executes the tests configured against your tables and sends the results to OpenMetadata. You can find more information about this workflow here.
1. Adding the imports

The first step is to import the TestSuiteWorkflow class, which takes care of the full ingestion logic. The results are printed at the end with the workflow’s own print_status method.
2. Defining the YAML

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can read it from a file, parse secrets from your environment, or use any other approach you need.
Note that we have not added the serviceConnection here. Since the service was created during the metadata ingestion, we can let the Ingestion Framework fetch the Service Connection information dynamically. If, however, you are configuring the workflow with storeServiceConnection: false, you’ll need to define the serviceConnection explicitly.
Moreover, note that we are not configuring any tests in the processor. You can do that, but even if nothing is defined in the YAML, all the tests configured against the table will be executed.
You can find complete YAMLs in each connector’s docs, along with more information about the available configurations.
3. Preparing the Workflow

Finally, we’ll prepare a function that we can execute anywhere. It takes care of instantiating the workflow, executing it, and giving us the results.
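import yaml

# Module path as found in recent openmetadata-ingestion releases; verify it against your installed version
from metadata.workflow.data_quality import TestSuiteWorkflow
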
CONFIG = """
source:
  type: TestSuite
  serviceName: <service name>
  sourceConfig:
    config:
      type: TestSuite
      entityFullyQualifiedName: <Table FQN, e.g., `service.database.schema.table`>
processor:
  type: orm-test-runner
  config: {}
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""
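def run():
    # create() expects a dict, so parse the YAML string first
    workflow_config = yaml.safe_load(CONFIG)
    workflow = TestSuiteWorkflow.create(workflow_config)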
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
You can test the workflow via metadata test -c <path-to-yaml>.
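The CLI commands mentioned on this page differ only in the subcommand. As a rough sketch, a helper could build and run the right command for each workflow type. The mapping only restates the commands listed in the sections above; `build_cli_command`, `run_cli`, and the workflow-type keys are hypothetical names chosen for this example.

```python
import subprocess

# Subcommands as listed in the sections above (lineage uses the same one as metadata)
CLI_SUBCOMMANDS = {
    "metadata": "ingest",
    "lineage": "ingest",
    "usage": "usage",
    "profiler": "profile",
    "data_quality": "test",
}


def build_cli_command(workflow_type: str, yaml_path: str) -> list:
    """Return the argument list for `metadata <subcommand> -c <path-to-yaml>`."""
    try:
        subcommand = CLI_SUBCOMMANDS[workflow_type]
    except KeyError:
        raise ValueError(f"Unknown workflow type: {workflow_type}") from None
    return ["metadata", subcommand, "-c", yaml_path]


def run_cli(workflow_type: str, yaml_path: str) -> int:
    """Run the CLI and return its exit code (requires the metadata CLI on PATH)."""
    return subprocess.run(build_cli_command(workflow_type, yaml_path)).returncode
```

Passing the command as a list rather than a single string avoids shell quoting issues with paths that contain spaces.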