Introduction¶
An efficient way to migrate Snowflake data to Databricks is to export the Snowflake tables to Parquet and then import them into Databricks, using a Cloud storage container in the same Cloud region as the Snowflake and Databricks tenants to hold the intermediate Parquet objects (for speed and to minimize egress charges). This is substantially faster and cheaper than a typical database-to-database SQL connection setup, and it is consistent with the method described in the Databricks AI Summit session on Snowflake migration practices.
Keep in mind that this method is for transferring data that is no longer changing: either historical data, or a point-in-time migration of data that will afterwards be wired up with ongoing processing logic.
The data validation steps are minimal and would need expanding in a real-world setting: we get a row count from the export and from the import, which we compare visually, and we visually compare the first 10 rows of the Snowflake source and Databricks destination tables. (A sketch of a simple automated count check appears near the end of the notebook.)
Both platforms provide a COPY INTO statement that performs the heavy lifting of exporting and then importing the Parquet objects. The "legacy" process within Databricks for accessing external objects is to mount the cloud store under DBFS (Databricks File System) and then reference objects by their DBFS path. That approach has been deprecated; the preferred method is to use a URI along with an access credential to reach the object. The documentation for making this work with Azure ADLS is lacking, with most of the material and examples based on AWS S3, so I had to experiment to "back into" the correct methods detailed in this notebook.
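As a minimal sketch of the URI-plus-credential pattern (the account and container names below are placeholders, and the SAS token is assumed to live in the dbmigrate secret scope created later), a container can be listed directly with no DBFS mount:
# Minimal sketch of direct URI access with a SAS token -- no DBFS mount required.
# Account/container names are placeholders; the notebook itself passes the SAS
# token inline to COPY INTO rather than setting it in the Spark conf.
azAccount, azContainer = "<storage account>", "<container>"
sasToken = dbutils.secrets.get(scope="dbmigrate", key="ADLS_SAS_TOKEN")
spark.conf.set(f"fs.azure.sas.{azContainer}.{azAccount}.blob.core.windows.net", sasToken)
display(dbutils.fs.ls(f"wasbs://{azContainer}@{azAccount}.blob.core.windows.net/"))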
The access credentials for interfacing with Snowflake and the Cloud storage are hidden within Databricks Secrets as a best practice.
Kurt Rosenfeld kurtr@ctidata.com
Note: This notebook was developed and runs within the native Databricks Notebook. It can likely be made to work within other IDEs (e.g. Visual Studio or native Jupyter) by installing the Databricks IDE package into your desktop environment.
How it works¶
The notebook migrates a Snowflake table, called tableName, to a Databricks Unity Catalog table having the same name.
Snowflake employs a 3-level namespace of DATABASE.SCHEMA.TABLE, and the tables we want to export reside in patient.silver. Databricks Unity Catalog also employs a 3-level namespace of CATALOG.SCHEMA.TABLE. We create a new catalog and schema, patient_import.silver, into which the tables will be imported under the same names.
For example:
Snowflake table | Databricks table | Parquet Objects |
---|---|---|
patient.silver.admissions | patient_import.silver.admissions | admissions/admissions_1.snappy.parquet, admissions/admissions_2.snappy.parquet |
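In code, the mapping is just a catalog rename. The helper below is hypothetical and only illustrates the naming convention; it is not used elsewhere in the notebook:
# Hypothetical helper: map a fully qualified Snowflake name to its Unity Catalog target
def to_unity_name(snowflake_fqn: str, target_catalog: str = "patient_import") -> str:
    database, schema, table = snowflake_fqn.split(".")
    return f"{target_catalog}.{schema}.{table}"

to_unity_name("patient.silver.admissions")   # -> 'patient_import.silver.admissions'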
Export process¶
Each table is exported from Snowflake as a collection of Snappy-compressed Parquet objects, each about 20 MB, labelled:
tableName_< increment count >.snappy.parquet
The Parquet objects are stored in an ADLS blob container, with each collection stored under a folder named tableName within the container.
This per-table collection structure is generated automatically by the methods used in the code; the sketch below shows where it comes from.
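In brief: the export cell later in the notebook writes to a stage location of the form @stage/tableName/tableName, so Snowflake creates a tableName folder in the container and uses tableName as the file prefix. A sketch, borrowing the snowStage and tableName variables defined later:
# Illustrative only, using variables defined later in the notebook (snowStage, tableName)
stage_location = f"@{snowStage}/{tableName}/{tableName}"
# e.g. "@DATABRICKS_IMPORT/ADMISSIONS/ADMISSIONS" yields blobs like
#      ADMISSIONS/ADMISSIONS_1.snappy.parquet, ADMISSIONS/ADMISSIONS_2.snappy.parquet, ...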
Import process¶
The import process takes each folder collection and automatically ingests all of its Parquet objects, reconstituting the table within Databricks.
Databricks needs a hierarchical (directory-style) namespace, similar to HDFS:
- The Azure WASBS interface overlays an emulated hierarchy on Azure Blob Storage by incorporating "/" in the blob names, similar to the way AWS S3 works.
- Azure ABFSS uses the more efficient hierarchy features built into ADLS Gen2, which are available if the storage account is created with the hierarchical namespace enabled.
- Since WASBS works either way, we use it for simplicity. However, we also ran the code using the ABFSS access method, which is shown commented out; the sketch after this list contrasts the two URL forms.
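For reference, the two URL forms for the same container look like this (placeholder names; the real values are built from secrets in the Connection parameters cell below):
# Placeholder account/container names; the notebook builds the real URLs from secrets later
azAccount, azContainer = "<storage account>", "<container>"
wasbs_url = f"wasbs://{azContainer}@{azAccount}.blob.core.windows.net"  # blob endpoint, works with or without hierarchical namespace
abfss_url = f"abfss://{azContainer}@{azAccount}.dfs.core.windows.net"   # dfs endpoint, uses the ADLS Gen2 hierarchical namespace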
Prerequisites¶
- An ADLS Storage Account provisioned with a Storage Container and a generated SAS token saved in Databricks Secrets.
- Access credentials to the Snowflake data, saved in Databricks Secrets.
Packages & technical doc references¶
%pip install --upgrade snowflake-connector-python
%pip install --upgrade azure-storage-blob
"""
https://docs.snowflake.com/en/sql-reference/sql/copy-into-location
https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
https://docs.databricks.com/en/ingestion/cloud-object-storage/copy-into/temporary-credentials.html
https://docs.databricks.com/en/sql/language-manual/sql-ref-datatype-rules.html
"""
%restart_python
Initialize platform connections¶
Credential secrets¶
secretStore = "dbmigrate"
print(f"""
-- Use the databricks CLI to create the secrets.
-- Run these commands from the databricks cluster web terminal
databricks secrets create-scope {secretStore}
databricks secrets list-scopes
databricks secrets put-secret {secretStore} ADLS_ACCOUNT --string-value "<YOUR ADLS ACCOUNT NAME>"
databricks secrets put-secret {secretStore} ADLS_SAS_TOKEN --string-value "<YOUR ADLS SAS TOKEN>"
databricks secrets put-secret {secretStore} SNOW_USER --string-value "<YOUR SNOWFLAKE USER ACCOUNT>"
databricks secrets put-secret {secretStore} SNOW_PWD --string-value "<YOUR SNOWFLAKE USER PASSWORD>"
databricks secrets put-secret {secretStore} SNOW_ACCOUNT --string-value "<YOUR SNOWFLAKE ACCOUNT>"
databricks secrets list-secrets {secretStore}
-- To retrieve a secret, first install jq to parse JSON output
apt-get update
apt-get install jq
-- Then retrieve the secret value and convert it from base64 to plain text
databricks secrets get-secret {secretStore} SNOW_USER | jq -r '.value' | base64 --decode; echo
""")
# Load all the secrets into a dictionary for convenience
secretStore = "dbmigrate"
secrets = {s.key: dbutils.secrets.get(scope=secretStore, key=s.key)
for s in dbutils.secrets.list(secretStore)}
secrets
Connection parameters¶
### ADLS container identifiers
azStorageAccount, azSAStoken = secrets['ADLS_ACCOUNT'], secrets['ADLS_SAS_TOKEN']
azBlobContainer = "kurtssnowbrickexchange"
# Snowflake ADLS access
snowBlobURL = f'azure://{azStorageAccount}.blob.core.windows.net/{azBlobContainer}'
snowCreds = { 'warehouse': 'DEMO_WH',
'user': secrets['SNOW_USER'],
'account': secrets['SNOW_ACCOUNT'],
'password': secrets['SNOW_PWD'] }
snowStage = "DATABRICKS_IMPORT"
snowSchema = "patient.silver"
# Databricks ADLS access
# dbxBlobURL = f"abfss://{azBlobContainer}@{azStorageAccount}.dfs.core.windows.net"
dbxBlobURL = f"wasbs://{azBlobContainer}@{azStorageAccount}.blob.core.windows.net"
dbxSchema = "patient_import.silver" # Unity Catalog name for the import location
# Populate spark keys that can serve as variables in the notebook SQL cells later
spark.conf.set("stagefile.accessToken", azSAStoken)
spark.conf.set("stagefile.catalog", dbxSchema.split('.')[0])
spark.conf.set("stagefile.schema", dbxSchema)
Snowflake connection¶
# Set up the Snowflake connection used to execute the per-table exports in the main cycle
import snowflake.connector
# Even though we fully qualify the schema.table in the SQL statements later,
# we also force the schema default in the session connection for redundancy
snowCreds |= { 'database': snowSchema.split(".")[0], 'schema': snowSchema.split(".")[1] }
snow_con = snowflake.connector.connect(**snowCreds)
# Helper: run a SQL statement on the Snowflake connection and return the column names plus all fetched rows
def snow_exec(sql, snow_con=snow_con):
cur = snow_con.cursor().execute(sql)
result = { 'columns': [desc[0] for desc in cur.description],
'data': cur.fetchall() }
cur.close()
return result
# Packaging SQL results in a pandas DataFrame gets them nicely displayed in the Databricks notebook
import pandas as pd
pd.set_option('display.max_colwidth', None)
prettify = lambda sql_result: pd.DataFrame(**sql_result)
Azure ADLS Container¶
# See what's already in the Azure container
from azure.storage.blob import ContainerClient
azGetBlobs = ContainerClient(
account_url=f"https://{azStorageAccount}.blob.core.windows.net",
container_name=azBlobContainer, credential=azSAStoken)
# Iterate through the blobs and print the names
for blob in azGetBlobs.list_blobs(): print(blob.name)
Setup export and import points¶
# Create the Snowflake stage connected to the external Azure container
# This only needs to be run once
prettify(snow_exec(f"""
CREATE STAGE IF NOT EXISTS {snowStage}
URL = '{snowBlobURL}',
CREDENTIALS = ( AZURE_SAS_TOKEN = '{azSAStoken}' )
DIRECTORY = ( ENABLE = true );
""" ))
%sql
-- Make sure the Unity Catalog Schema exists, this only needs to be run once
CREATE CATALOG IF NOT EXISTS ${stagefile.catalog};
CREATE SCHEMA IF NOT EXISTS ${stagefile.schema};
-- DROP CATALOG IF EXISTS ${stagefile.catalog} CASCADE;
Main "per table" process¶
""" Initial tables of interest:
PATIENTS
ADMISSIONS
DRGCODES
D_ICD_DIAGNOSES
DIAGNOSES_ICD
D_ICD_PROCEDURES
PROCEDURES_ICD
NOTE_DISCHARGE
TRANSFERS
"""
# the table to export and then import, change this to whatever you want
# tableName = "DIAGNOSES_ICD"
tableName = input("tableName")
# Setup table specific processing variables that can be used in the SQL notebook cells
spark.conf.set("stagefile.import", dbxBlobURL + "/" + tableName) # ADLS container folder path
spark.conf.set("stagefile.target", dbxSchema + "." + tableName) # Unity Catalog table path
# Dump out all the spark keys that will be used in the SQL for debugging purposes
{k: v for k, v in spark.conf.getAll.items() if k.startswith("stagefile.")}
# Execute the Snowflake COPY INTO to export tableName into the ADLS container as a Parquet file collection
# Make note of the total rows and compare with the import cell later
prettify(snow_exec(f"""
COPY INTO @{snowStage}/{tableName}/{tableName}
FROM {tableName}
OVERWRITE = TRUE
HEADER = TRUE
FILE_FORMAT = (TYPE = PARQUET);
"""))
# Check that the table's blobs now exist in the container
for blob in azGetBlobs.list_blobs(name_starts_with=tableName): print(blob.name)
%sql
-- Set up the target table in Unity Catalog; creating it without columns lets COPY INTO infer the schema via mergeSchema
-- DROP TABLE ${stagefile.target};
CREATE TABLE IF NOT EXISTS ${stagefile.target};
%sql
-- COPY INTO will insert into an existing table, so clear it out in case an old copy is there
TRUNCATE TABLE ${stagefile.target};
COPY INTO ${stagefile.target}
FROM "${stagefile.import}"
WITH ( CREDENTIAL( AZURE_SAS_TOKEN = "${stagefile.accessToken}" ))
FILEFORMAT = PARQUET
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true', 'force' = 'true'); -- force reloads files even if they were loaded previously
# Snowflake meta-data for the table
prettify(snow_exec(f"""
DESCRIBE TABLE {snowSchema}.{tableName}
"""))
# Snowflake row counts for the table (total vs distinct SUBJECT_ID)
prettify(snow_exec(f"""
SELECT COUNT(SUBJECT_ID), COUNT(DISTINCT SUBJECT_ID)
FROM {snowSchema}.{tableName}
"""))
# Databricks meta-data for the table
display(spark.sql(f"""
DESCRIBE TABLE {dbxSchema}.{tableName}
"""))
%sql
-- The same thing using a SQL magic cell
DESCRIBE TABLE ${stagefile.target}
%sql
-- Databricks row counts for the table (total vs distinct SUBJECT_ID)
SELECT COUNT(SUBJECT_ID), COUNT(DISTINCT SUBJECT_ID)
FROM ${stagefile.target}
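As noted in the introduction, the counts above are compared visually. A minimal sketch of an automated check, reusing the snow_exec helper and the variables defined earlier (a real migration would also compare schemas and sampled values):
# Minimal automated validation sketch: compare total row counts between source and target
snow_rows = snow_exec(f"SELECT COUNT(*) FROM {snowSchema}.{tableName}")['data'][0][0]
dbx_rows = spark.sql(f"SELECT COUNT(*) FROM {dbxSchema}.{tableName}").collect()[0][0]
assert snow_rows == dbx_rows, f"Row count mismatch: Snowflake={snow_rows}, Databricks={dbx_rows}"
print(f"Row counts match: {snow_rows}")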
Business SQL Test¶
# Top 5 DRGs with the highest number of pneumonia diagnoses, along with mortality counts
business_sql_test = \
"""
SELECT
concat(DR.DESCRIPTION,' (', DR.DRG_CODE, ')') AS "DRG Description",
count(DISTINCT A.HADM_ID) AS "Pneumonia Diagnosis",
count(
DISTINCT CASE
WHEN A.HOSPITAL_EXPIRE_FLAG = TRUE
THEN A.HADM_ID
ELSE NULL
END
) AS "Mortality Count"
FROM ADMISSIONS AS A
JOIN DIAGNOSES_ICD AS D ON A.HADM_ID = D.HADM_ID
JOIN D_ICD_DIAGNOSES AS DI ON D.ICD_VER_CODE = DI.ICD_VER_CODE
JOIN DRGCODES AS DR ON A.HADM_ID = DR.HADM_ID
WHERE
EXTRACT(YEAR FROM A.ADMITTIME) = 2124
AND DI.LONG_TITLE ILIKE '%pneumonia%'
GROUP BY
DR.DESCRIPTION,
DR.DRG_CODE
ORDER BY
"Pneumonia Diagnosis" DESC NULLS LAST
LIMIT 5
"""
# Databricks Top 5 DRGs with the highest number of pneumonia diagnoses
spark.sql(f"USE {dbxSchema}")
display(spark.sql(business_sql_test.replace('"','`'))) # Databricks quotes identifiers with backticks where Snowflake uses double quotes
# Snowflake Top 5 DRGs with the highest number of pneumonia diagnoses
snow_exec(f"USE {snowSchema}")
prettify(snow_exec(business_sql_test))