Monitor dbt pipelines with Elastic Observability

Learn how to set up a dbt monitoring system with Elastic that proactively alerts on data processing cost spikes, anomalies in rows per table, and data quality test failures

In the Data Analytics team within the Observability organization at Elastic, we use dbt (dbt™, data build tool) to execute our SQL data transformation pipelines. dbt is a SQL-first transformation workflow that lets teams quickly and collaboratively deploy analytics code. In particular, we use dbt Core, the open-source project that lets you develop from the command line and run your dbt project.

Our data transformation pipelines run daily and process the data that feed our internal dashboards, reports, analyses, and Machine Learning (ML) models.

There have been incidents in the past where a pipeline failed, a source table contained wrong data, or a change we introduced into our SQL code caused data quality issues, and we only realized it when a weekly report showed an anomalous number of records. That’s why we have built a monitoring system that proactively alerts us about these types of incidents as soon as they happen and helps us, with visualizations and analyses, understand their root cause, saving us several hours or days of manual investigation.

We have leveraged our own Observability Solution to help solve this challenge, monitoring the entire lifecycle of our dbt implementation. This setup enables us to track the behavior of our models and conduct data quality testing on the final tables. We export dbt process logs from run jobs and tests into Elasticsearch and utilize Kibana to create dashboards, set up alerts, and configure Machine Learning jobs to monitor and assess issues.

The following diagram shows our complete architecture. In a follow-up article, we’ll also cover how we observe our Python data processing and ML model workflows using OpenTelemetry (OTel) and Elastic, so stay tuned.

Why monitor dbt pipelines with Elastic?

With every invocation, dbt generates and saves one or more JSON files called artifacts containing log data on the invocation results.

dbt run and dbt test invocation logs are stored in the file run_results.json, as per the dbt documentation:

This file contains information about a completed invocation of dbt, including timing and status info for each node (model, test, etc) that was executed. In aggregate, many run_results.json can be combined to calculate average model runtime, test failure rates, the number of record changes captured by snapshots, etc.
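
For reference, here is a heavily abridged sketch of the shape of run_results.json, limited to the fields used later in this post and shown here as a Python dict; the exact contents vary with your dbt version and adapter:

# Heavily abridged, illustrative shape of run_results.json (as a Python dict).
# Field names follow what the helper function below relies on; real files contain more.
run_results_example = {
    "metadata": {"generated_at": "2025-01-01T05:00:00Z"},  # invocation timestamp
    "elapsed_time": 123.4,  # total seconds for the whole invocation
    "args": {},  # CLI arguments used for the invocation (abridged)
    "results": [
        {
            "unique_id": "model.my_project.my_table",  # hypothetical model name
            "status": "success",
            "execution_time": 12.3,
            "adapter_response": {
                "bytes_processed": 1234567,
                "bytes_billed": 2097152,
                "slot_ms": 4567,
                "rows_affected": 1000,
            },
        }
    ],
}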

Monitoring dbt run invocation logs can help solve several issues, including tracking and alerting about table volumes, detecting excessive slot time from resource-intensive models, identifying cost spikes due to slot time or volume, and pinpointing slow execution times that may indicate scheduling issues. This system was crucial when we merged a PR with a change in our code that had an issue, producing a sudden drop in the number of daily rows in upstream Table A. By ingesting the dbt run logs into Elastic, our anomaly detection job quickly identified anomalies in the daily row counts for Table A and its downstream tables B, C, and D. The Data Analytics team received an alert notification about the issue, allowing us to promptly troubleshoot, fix, and backfill the tables before it affected the weekly dashboards and downstream ML models.

Monitoring dbt test invocation logs can also address several issues, such as identifying duplicates in tables, detecting unnoticed alterations in allowed values for specific fields through validation of all enum fields, and resolving various other data processing and quality concerns. With dashboards and alerts on data quality tests, we proactively identify issues like duplicate keys, unexpected category values, and increased nulls, ensuring data integrity. In our team, we had an issue where a change in one of our raw lookup tables produced duplicated rows in our user table, doubling the number of users reported. By ingesting the dbt test logs into Elastic, our rules detected that some uniqueness tests had failed. The team received an alert notification about the issue, allowing us to troubleshoot it right away by finding the upstream table that was the root cause. These duplicates meant that downstream tables had to process 2x the amount of data, creating a spike in the bytes processed and slot time. The anomaly detection and alerts on the dbt run logs also helped us spot these spikes for individual tables and allowed us to quantify the impact on our billing.

Processing our dbt logs with Elastic and Kibana allows us to obtain real-time insights, helps us quickly troubleshoot potential issues, and keeps our data transformation processes running smoothly. We set up anomaly detection jobs and alerts in Kibana to monitor the number of rows processed by dbt, the slot time, and the results of the tests. This lets us catch real-time incidents, and by promptly identifying and fixing these issues, Elastic makes our data pipeline more resilient and our models more cost-effective, helping us stay on top of cost spikes or data quality issues.

We can also correlate this information with other events ingested into Elastic. For example, using the Elastic GitHub connector, we can correlate data quality test failures or other anomalies with code changes to find the commit or PR that caused the issue. By ingesting application logs into Elastic, we can also use APM to analyze whether issues in our pipelines have affected downstream applications by increasing latency, throughput, or error rates. By ingesting billing, revenue, or web traffic data, we could also see the impact on business metrics.

How to export dbt invocation logs to Elasticsearch

We use the Python Elasticsearch client to send the dbt invocation logs to Elastic after we run our dbt run and dbt test processes daily in production. The setup just requires you to install the Elasticsearch Python client and obtain your Elastic Cloud ID (go to https://cloud.elastic.co/deployments/, select your deployment, and find the Cloud ID) and an Elastic Cloud API key (following this guide).

This Python helper function will index the results from your run_results.json file into the specified indices. You just need to export the following variables to the environment:

  • RESULTS_FILE: path to your run_results.json file
  • DBT_RUN_LOGS_INDEX: the name you want to give to the dbt run logs index in Elastic, e.g. dbt_run_logs
  • DBT_TEST_LOGS_INDEX: the name you want to give to the dbt test logs index in Elastic, e.g. dbt_test_logs
  • ES_CLUSTER_CLOUD_ID: the Cloud ID of your Elastic deployment
  • ES_CLUSTER_API_KEY: your Elastic Cloud API key

Then call the function log_dbt_es from your Python code, or save this code as a Python script and run it after executing your dbt run or dbt test commands:

from elasticsearch import Elasticsearch, helpers
import os
import sys
import json

def log_dbt_es():
    RESULTS_FILE = os.environ["RESULTS_FILE"]
    DBT_RUN_LOGS_INDEX = os.environ["DBT_RUN_LOGS_INDEX"]
    DBT_TEST_LOGS_INDEX = os.environ["DBT_TEST_LOGS_INDEX"]
    es_cluster_cloud_id = os.environ["ES_CLUSTER_CLOUD_ID"]
    es_cluster_api_key = os.environ["ES_CLUSTER_API_KEY"]

    # Connect to the Elastic Cloud deployment
    es_client = Elasticsearch(
        cloud_id=es_cluster_cloud_id,
        api_key=es_cluster_api_key,
        request_timeout=120,
    )

    if not os.path.exists(RESULTS_FILE):
        print(f"ERROR: {RESULTS_FILE} not found. No dbt run results to index.")
        sys.exit(1)

    with open(RESULTS_FILE, "r") as json_file:
        results = json.load(json_file)
        # Invocation-level fields, added to every document
        timestamp = results["metadata"]["generated_at"]
        metadata = results["metadata"]
        elapsed_time = results["elapsed_time"]
        args = results["args"]
        docs = []
        for result in results["results"]:
            # Route test results and run results to their own indices
            if result["unique_id"].split(".")[0] == "test":
                result["_index"] = DBT_TEST_LOGS_INDEX
            else:
                result["_index"] = DBT_RUN_LOGS_INDEX
            result["@timestamp"] = timestamp
            result["metadata"] = metadata
            result["elapsed_time"] = elapsed_time
            result["args"] = args
            docs.append(result)
        # Bulk index all documents in one request
        _ = helpers.bulk(es_client, docs)
    return "Done"

# Call the function
log_dbt_es()

If you want to add or remove any other fields from run_results.json, you can modify the above function to do it.
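
To run this daily in production, a thin wrapper along the following lines can execute dbt and then ship the artifacts with the helper above. This is a minimal sketch: the paths and index names are illustrative, it assumes log_dbt_es is defined in the same script, and it deliberately avoids check=True because dbt exits with a non-zero code when a model or test fails and we still want to index those results:

import os
import subprocess

# Illustrative defaults; adjust to your project layout and naming.
os.environ.setdefault("RESULTS_FILE", "target/run_results.json")  # dbt's default artifact location
os.environ.setdefault("DBT_RUN_LOGS_INDEX", "dbt_run_logs")
os.environ.setdefault("DBT_TEST_LOGS_INDEX", "dbt_test_logs")
# ES_CLUSTER_CLOUD_ID and ES_CLUSTER_API_KEY are expected to be set already.

subprocess.run(["dbt", "run"])   # no check=True: index the results even if a model fails
log_dbt_es()                     # indexes the run results into DBT_RUN_LOGS_INDEX

subprocess.run(["dbt", "test"])  # no check=True: index the results even if a test fails
log_dbt_es()                     # indexes the test results into DBT_TEST_LOGS_INDEX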

Once the results are indexed, you can use Kibana to create Data Views for both indexes and start exploring them in Discover.

Go to Discover, click on the data view selector on the top left, and select “Create a data view”.

Now you can create a data view with your preferred name. Do this for both the dbt run (DBT_RUN_LOGS_INDEX in your code) and dbt test (DBT_TEST_LOGS_INDEX in your code) indices:

Going back to Discover, you’ll be able to select the Data Views and explore the data.

dbt run alerts, dashboards and ML jobs

The invocation of dbt run executes compiled SQL model files against the current database. dbt run invocation logs contain the following fields:

  • unique_id: Unique model identifier
  • execution_time: Total time spent executing this model run

The logs also contain the following metrics about the job execution from the adapter:

  • adapter_response.bytes_processed
  • adapter_response.bytes_billed
  • adapter_response.slot_ms
  • adapter_response.rows_affected

We have used Kibana to set up Anomaly Detection jobs on the above-mentioned metrics. You can configure a multi-metric job split by unique_id to be alerted when the sum of rows affected, slot time consumed, or bytes billed is anomalous per table; you can create one job per metric. If you have built a dashboard of the metrics per table, you can use this shortcut to create the Anomaly Detection job directly from the visualization. After the jobs are created and running on incoming data, you can view them and add them to a dashboard using the three-dots button in the anomaly timeline:

We have used the ML jobs to set up alerts that send us email/Slack messages when anomalies are detected. Alerts can be created directly from the Jobs page (Machine Learning > Anomaly Detection Jobs) by clicking on the three dots at the end of the ML job row:

We also use Kibana dashboards to visualize the anomaly detection job results and related metrics per table, to identify which tables consume most of our resources, to have visibility into their temporal evolution, and to measure aggregated metrics that can help us understand month-over-month changes.
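
For quick ad hoc checks outside Kibana, the same per-table metrics can be pulled straight from the indexed logs with an aggregation. The sketch below assumes the 8.x Elasticsearch Python client, the dbt_run_logs index name used earlier, and default dynamic mappings (hence unique_id.keyword):

import os
from elasticsearch import Elasticsearch

es = Elasticsearch(
    cloud_id=os.environ["ES_CLUSTER_CLOUD_ID"],
    api_key=os.environ["ES_CLUSTER_API_KEY"],
)

# Daily sum of rows affected per model: the same signal the anomaly detection
# job split by unique_id monitors.
resp = es.search(
    index="dbt_run_logs",  # the DBT_RUN_LOGS_INDEX used earlier
    size=0,
    aggs={
        "per_model": {
            "terms": {"field": "unique_id.keyword", "size": 50},
            "aggs": {
                "daily": {
                    "date_histogram": {"field": "@timestamp", "calendar_interval": "day"},
                    "aggs": {"rows": {"sum": {"field": "adapter_response.rows_affected"}}},
                }
            },
        }
    },
)

for bucket in resp["aggregations"]["per_model"]["buckets"]:
    daily_rows = [(d["key_as_string"], d["rows"]["value"]) for d in bucket["daily"]["buckets"]]
    print(bucket["key"], daily_rows)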

dbt test alerts and dashboards

You may already be familiar with tests in dbt, but if you’re not, dbt data tests are assertions you make about your models. Using the command dbt test, dbt will tell you if each test in your project passes or fails. Here is an example of how to set them up. In our team, we use the out-of-the-box dbt tests (unique, not_null, accepted_values, and relationships) and the packages dbt_utils and dbt_expectations for some extra tests. When the command dbt test is run, it generates logs that are stored in run_results.json.

dbt test logs contain the following fields:

  • unique_id: Unique test identifier; tests contain the “test” prefix in their unique identifier
  • status: Result of the test, pass or fail
  • execution_time: Total time spent executing this test
  • failures: Will be 0 if the test passes and 1 if the test fails
  • message: If the test fails, the reason why it failed

The logs also contain the metrics about the job execution from the adapter.

We have set up alerts on document count (see guide) that send us an email/Slack message when there are any failed tests. The rule for the alerts is set up on the dbt test Data View that we created earlier, with the query filtering on status:fail to obtain the logs for the tests that have failed, and the rule condition set to a document count greater than 0. Whenever any test fails in production, we get an alert with links to the alert details and dashboards so we can troubleshoot it:
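
The query behind this rule is equivalent to a search along these lines (a sketch assuming the 8.x Elasticsearch Python client, the dbt_test_logs index name used earlier, and default dynamic mappings for the status field):

import os
from elasticsearch import Elasticsearch

es = Elasticsearch(
    cloud_id=os.environ["ES_CLUSTER_CLOUD_ID"],
    api_key=os.environ["ES_CLUSTER_API_KEY"],
)

# Failed dbt tests in the last 24 hours; the alert fires when this count is > 0.
resp = es.search(
    index="dbt_test_logs",  # the DBT_TEST_LOGS_INDEX used earlier
    query={
        "bool": {
            "filter": [
                {"match": {"status": "fail"}},
                {"range": {"@timestamp": {"gte": "now-24h"}}},
            ]
        }
    },
)
print(resp["hits"]["total"]["value"], "failed tests in the last 24 hours")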

We have also built a dashboard to visualize the tests run, the tests failed, and their execution time and slot time, giving us a historical view of the test runs:

Finding Root Causes with the AI Assistant

The most effective way for us to analyze these multiple sources of information is to use the AI Assistant to help us troubleshoot incidents. In our case, we got an alert about a test failure, and we used the AI Assistant to give us context on what had happened. Then we asked if there were any downstream consequences, and the AI Assistant interpreted the results of the Anomaly Detection job, which indicated a spike in slot time for one of our downstream tables, and quantified the increase in slot time versus the baseline. Then we asked for the root cause, and the AI Assistant was able to find and provide us with a link to a PR from our GitHub changelog that matched the start of the incident and was the most probable cause.

Conclusion

As a Data Analytics team, we are responsible for guaranteeing that the tables, charts, models, reports, and dashboards we provide to stakeholders are accurate and contain the right sources of information. As teams grow, the number of models we own becomes larger and more interconnected, and it isn’t easy to guarantee that everything is running smoothly and providing accurate results. Having a monitoring system that proactively alerts us on cost spikes, anomalies in row counts, or data quality test failures is like having a trusted companion that will alert you in advance if something goes wrong and help you get to the root cause of the issue.

dbt invocation logs are a crucial source of information about the status of our data pipelines, and Elastic is the perfect tool to extract the maximum potential out of them. Use this blog post as a starting point for utilizing your dbt logs to help your team achieve greater reliability and peace of mind, allowing them to focus on more strategic tasks rather than worrying about potential data issues.
