Why would you do such a thing?
Python packages are easy to test in isolation. But if packaging your code is not an option and you still want to automatically verify that it actually works, you can run your Databricks notebook from Azure DevOps directly using the databricks-cli.
It is important to know whether your notebook has side effects; if it does, it is advisable to parameterize the notebook so that the output or side effect can be controlled to some extent.
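For example, a notebook can expose its output location as a widget, so a test run can redirect any side effect to a throwaway folder. Here is a minimal sketch; the output_path widget name is purely illustrative and is not used elsewhere in this post:
# Databricks notebook source
# Expose the output location as a widget with a sensible default,
# so a caller (e.g. a test notebook or job) can override where data is written.
dbutils.widgets.text('output_path', 'tmp/demo-output')
output_path = dbutils.widgets.get('output_path')

# COMMAND ----------
# The side effect now targets the configurable location.
df = spark.read.csv('tmp/demo', header=True)
df.write.mode('overwrite').csv(output_path, header=True)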
The setup consists of three pieces, all running in a Databricks workspace:
- sampledata.csv: the input data
- notebook.py: reads and transforms sampledata.csv into an output file
- tests.py: a notebook that triggers the first notebook and performs some checks on the output data
Create the following project structure:
➜ tree -a -L 2
.
├── tmp
│   └── demo
│       └── sampledata.csv
└── src
    ├── notebook.py
    └── tests.py
3 directories, 3 files
Here’s the minimum amount of data that we provide to our notebook:
➜ cat tmp/demo/sampledata.csv
id,firstname,lastname
1,Stefan,Schenk
Here’s our notebook that concatenates the columns firstname and lastname to produce a column fullname, and writes the result to a temporary output folder:
# Databricks notebook source
from pyspark.sql import functions as f
# COMMAND ----------
df = spark.read.csv('tmp/demo', header=True)
# COMMAND ----------
(
    df
    .withColumn('fullname',
                f.format_string('%s %s', f.col('firstname'), f.col('lastname')))
    .write
    .mode('overwrite')
    .csv('tmp/demo-output', header=True)
)
Our tests.py notebook, which runs notebook.py and performs some checks on the output data:
# Databricks notebook source
# MAGIC %run /Shared/tmp/notebook
# COMMAND ----------
df = spark.read.csv('tmp/demo-output', header=True)
# COMMAND ----------
assert df.limit(1).collect().pop().asDict() == {
    'id': '1',
    'firstname': 'Stefan',
    'lastname': 'Schenk',
    'fullname': 'Stefan Schenk'
}
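If you want slightly broader coverage, you could also assert on the columns and the row count. These extra checks are not part of the original tests.py, just a small sketch of how the notebook could be extended:
# COMMAND ----------
# Optional extra checks: verify that the expected columns are present
# and that the transformation did not drop or duplicate any rows.
assert set(df.columns) == {'id', 'firstname', 'lastname', 'fullname'}
assert df.count() == 1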
Finally, we need a DevOps pipeline that copies the data and notebooks to a Databricks workspace and runs the tests notebook. Create a new azure-pipelines.yml file, then copy and paste the following code block:
resources:
- repo: self

trigger:
- master

variables:
  databricks-host: 'https://westeurope.azuredatabricks.net'
  notebook-folder: '/Shared/tmp/'
  cluster-id: '1234-567890-bobby123'
  notebook-name: 'tests'

steps:
- task: UsePythonVersion@0
  displayName: 'Use Python 3.x'

- script: |
    pip install databricks-cli
  displayName: 'Install databricks-cli'

- script: |
    databricks fs cp tmp/demo/ dbfs:/tmp/demo --recursive --overwrite
    databricks workspace import_dir src/ $(notebook-folder) -o

    JOB_ID=$(databricks jobs create --json '{
      "name": "Testrun",
      "existing_cluster_id": "$(cluster-id)",
      "timeout_seconds": 3600,
      "max_retries": 1,
      "notebook_task": {
        "notebook_path": "$(notebook-folder)$(notebook-name)",
        "base_parameters": {}
      }
    }' | jq '.job_id')

    RUN_ID=$(databricks jobs run-now --job-id $JOB_ID | jq '.run_id')

    job_status="PENDING"
    while [ $job_status = "RUNNING" ] || [ $job_status = "PENDING" ]
    do
      sleep 2
      job_status=$(databricks runs get --run-id $RUN_ID | jq -r '.state.life_cycle_state')
      echo Status $job_status
    done

    RESULT=$(databricks runs get-output --run-id $RUN_ID)
    RESULT_STATE=$(echo $RESULT | jq -r '.metadata.state.result_state')
    RESULT_MESSAGE=$(echo $RESULT | jq -r '.metadata.state.state_message')

    if [ $RESULT_STATE = "FAILED" ]
    then
      echo "##vso[task.logissue type=error;]$RESULT_MESSAGE"
      echo "##vso[task.complete result=Failed;done=true;]$RESULT_MESSAGE"
    fi

    echo $RESULT | jq .
  displayName: 'Run Databricks Notebook'
  env:
    DATABRICKS_TOKEN: $(databricks-token)
    DATABRICKS_HOST: $(databricks-host)
In Azure DevOps, create a new pipeline from this yml file after committing and pushing it to your repository. Then create a new Databricks token and add it as a secret variable called databricks-token to the build pipeline.
The pipeline looks complicated, but it’s just a collection of databricks-cli commands:
- copy the sample data to DBFS and import the notebooks into the workspace
- create a job that runs the tests notebook on the existing cluster, trigger it, and capture the RUN_ID
- poll the run every few seconds until its life cycle state is no longer PENDING or RUNNING
- fetch the run output and fail the build if the result state is FAILED
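If you’d rather not shell out to the CLI, the same create/run/poll flow can also be written against the Databricks Jobs REST API 2.0. Below is a minimal Python sketch using the requests library, assuming the same host, token, cluster id and notebook path as the pipeline above (error handling omitted):
import os
import time
import requests

host = os.environ['DATABRICKS_HOST']
headers = {'Authorization': f"Bearer {os.environ['DATABRICKS_TOKEN']}"}

# Create a job that runs the tests notebook on an existing cluster.
job = requests.post(f'{host}/api/2.0/jobs/create', headers=headers, json={
    'name': 'Testrun',
    'existing_cluster_id': '1234-567890-bobby123',
    'notebook_task': {'notebook_path': '/Shared/tmp/tests'},
}).json()

# Trigger the job and remember the run id.
run = requests.post(f'{host}/api/2.0/jobs/run-now', headers=headers,
                    json={'job_id': job['job_id']}).json()

# Poll until the run leaves the PENDING/RUNNING states.
state = {'life_cycle_state': 'PENDING'}
while state['life_cycle_state'] in ('PENDING', 'RUNNING'):
    time.sleep(2)
    state = requests.get(f'{host}/api/2.0/jobs/runs/get', headers=headers,
                         params={'run_id': run['run_id']}).json()['state']

# Fail loudly if the notebook run did not succeed.
assert state['result_state'] == 'SUCCESS', state.get('state_message')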
The output of the pipeline run will look something like this:
If you scroll all the way down to the bottom, there’s a run link, which takes you to the Databricks run:
And that’s it!