MISO Pipeline using RTDIP and Databricks
This article provides a guide on how to deploy a MISO pipeline from a local file to a Databricks Workflow using the RTDIP SDK. It was tested on an M2 MacBook Pro using VS Code in a Conda (Python 3.11) environment. RTDIP Pipeline Components provide Databricks with all the required Python packages and JARs to execute each component; these are set up automatically during workflow creation.
Prerequisites
This pipeline assumes you have a Databricks workspace and have followed the installation instructions as specified in the Getting Started section. In particular, ensure you have installed the following:
RTDIP SDK Installation
Ensure you have installed the RTDIP SDK as follows:
pip install "rtdip-sdk[pipelines]"
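To confirm the installation in your environment, you can check the installed package version from Python. The snippet below is a minimal sketch using only the standard library and assumes the distribution name rtdip-sdk from the pip command above.

from importlib.metadata import version

# Print the installed RTDIP SDK version to confirm the environment is ready.
print(version("rtdip-sdk"))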
Components
Name | Description |
---|---|
MISODailyLoadISOSource | Reads daily load data from the MISO API. |
MISOToMDMTransformer | Converts raw MISO data into the Meters Data Model. |
SparkDeltaDestination | Writes data to a Delta table. |
DatabricksSDKDeploy | Deploys an RTDIP Pipeline to Databricks Workflows using the Databricks SDK. |
DeltaTableOptimizeUtility | Optimizes a Delta table. |
DeltaTableVacuumUtility | Vacuums a Delta table. |
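Each of these components also declares the Python packages and Maven JARs it needs at runtime; this is the metadata the deployment step uses when it configures the Databricks workflow. The snippet below is a minimal sketch of inspecting that metadata and assumes the static system_type() and libraries() methods exposed by RTDIP pipeline components in current SDK releases.

from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource

# Inspect the runtime requirements declared by a component. The deployment
# step reads the same metadata when it sets up the Databricks workflow.
print(MISODailyLoadISOSource.system_type())
print(MISODailyLoadISOSource.libraries())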
Example
Below is an example of how to set up a pipeline job to read daily load data from the MISO API, transform it into the Meters Data Model, and write the resulting usage and metadata to Delta tables.
from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource
from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination


def pipeline():
    # Read the actual daily load data for the given date from the MISO API.
    # `spark` is the SparkSession provided by the Databricks notebook environment.
    source_df = MISODailyLoadISOSource(
        spark=spark,
        options={
            "load_type": "actual",
            "date": "20230520",
        },
    ).read_batch()

    # Transform the raw MISO data into the Meters Data Model usage and metadata outputs.
    transform_value_df = MISOToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="usage",
    ).transform()

    transform_meta_df = MISOToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="meta",
    ).transform()

    # Write both outputs to Delta tables, partitioned by timestamp.
    SparkDeltaDestination(
        data=transform_value_df,
        options={
            "partitionBy": "timestamp",
        },
        destination="miso_usage_data",
    ).write_batch()

    SparkDeltaDestination(
        data=transform_meta_df,
        options={
            "partitionBy": "timestamp",
        },
        destination="miso_meta_data",
        mode="overwrite",
    ).write_batch()


if __name__ == "__main__":
    pipeline()
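After the pipeline job has run in Databricks, you can preview the output with standard Spark calls. The snippet below is a minimal sketch and assumes the two Delta tables were written to the default schema, as in the example above.

# Preview the usage data and meter metadata written by the pipeline.
spark.read.table("miso_usage_data").show(10)
spark.read.table("miso_meta_data").show(10)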
Maintenance
The RTDIP SDK can also be used to maintain Delta tables in Databricks. The example below sets up a maintenance job that optimizes and vacuums the MISO tables written in the previous example.
from rtdip_sdk.pipelines.utilities import DeltaTableOptimizeUtility, DeltaTableVacuumUtility


def maintenance():
    TABLE_NAMES = [
        "{path.to.table.miso_usage_data}",
        "{path.to.table.miso_meta_data}",
    ]

    for table in TABLE_NAMES:
        # Compact the table's small files into larger ones.
        DeltaTableOptimizeUtility(
            spark=spark,
            table_name=table,
        ).execute()

        # Remove data files that are no longer referenced by the table.
        DeltaTableVacuumUtility(
            spark=spark,
            table_name=table,
        ).execute()


if __name__ == "__main__":
    maintenance()
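To confirm the maintenance operations ran, you can inspect each table's Delta transaction history from a notebook. A minimal sketch, assuming the same table names as above:

# OPTIMIZE and VACUUM operations are recorded in the Delta transaction history.
spark.sql("DESCRIBE HISTORY {path.to.table.miso_usage_data}").show(truncate=False)
spark.sql("DESCRIBE HISTORY {path.to.table.miso_meta_data}").show(truncate=False)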
Deploy
Deployment to Databricks uses the Databricks SDK. Users can control the job's configuration, including the cluster specification and schedule.
from rtdip_sdk.pipelines.deploy import (
    DatabricksSDKDeploy,
    CreateJob,
    JobCluster,
    ClusterSpec,
    Task,
    NotebookTask,
    AutoScale,
    RuntimeEngine,
    DataSecurityMode,
    CronSchedule,
    Continuous,
    PauseStatus,
)
from rtdip_sdk.authentication.azure import DefaultAuth


def deploy():
    # Authenticate with Azure and obtain an access token for the Azure Databricks resource.
    credential = DefaultAuth().authenticate()
    access_token = credential.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
    DATABRICKS_WORKSPACE = "{databricks-workspace-url}"

    # Define the job cluster for the pipeline job.
    cluster_list = []
    cluster_list.append(JobCluster(
        job_cluster_key="pipeline-cluster",
        new_cluster=ClusterSpec(
            node_type_id="Standard_E4ds_v5",
            autoscale=AutoScale(min_workers=1, max_workers=8),
            spark_version="13.3.x-scala2.12",
            data_security_mode=DataSecurityMode.SINGLE_USER,
            runtime_engine=RuntimeEngine.STANDARD
        )
    ))

    # Define the task that runs the pipeline notebook.
    task_list = []
    task_list.append(Task(
        task_key="pipeline",
        job_cluster_key="pipeline-cluster",
        notebook_task=NotebookTask(
            notebook_path="{path/to/pipeline.py}"
        )
    ))

    # Create a continuously running Databricks Job for the pipeline task.
    job = CreateJob(
        name="rtdip-miso-batch-pipeline-job",
        job_clusters=cluster_list,
        tasks=task_list,
        continuous=Continuous(pause_status=PauseStatus.UNPAUSED)
    )

    # Deploy the pipeline job to Databricks.
    databricks_pipeline_job = DatabricksSDKDeploy(
        databricks_job=job,
        host=DATABRICKS_WORKSPACE,
        token=access_token,
        workspace_directory="{path/to/databricks/workspace/directory}"
    )
    databricks_pipeline_job.deploy()

    # Define the job cluster for the maintenance job.
    cluster_list = []
    cluster_list.append(JobCluster(
        job_cluster_key="maintenance-cluster",
        new_cluster=ClusterSpec(
            node_type_id="Standard_E4ds_v5",
            autoscale=AutoScale(min_workers=1, max_workers=3),
            spark_version="13.3.x-scala2.12",
            data_security_mode=DataSecurityMode.SINGLE_USER,
            runtime_engine=RuntimeEngine.PHOTON
        )
    ))

    # Define the task that runs the maintenance notebook.
    task_list = []
    task_list.append(Task(
        task_key="rtdip-miso-maintenance-task",
        job_cluster_key="maintenance-cluster",
        notebook_task=NotebookTask(
            notebook_path="{path/to/maintenance.py}"
        )
    ))

    # Create a scheduled Databricks Job for the maintenance task (Quartz cron syntax).
    job = CreateJob(
        name="rtdip-miso-maintenance-job",
        job_clusters=cluster_list,
        tasks=task_list,
        schedule=CronSchedule(
            quartz_cron_expression="4 * * * * ?",
            timezone_id="UTC",
            pause_status=PauseStatus.UNPAUSED
        )
    )

    # Deploy the maintenance job to Databricks.
    databricks_maintenance_job = DatabricksSDKDeploy(
        databricks_job=job,
        host=DATABRICKS_WORKSPACE,
        token=access_token,
        workspace_directory="{path/to/databricks/workspace/directory}"
    )
    databricks_maintenance_job.deploy()


if __name__ == "__main__":
    deploy()
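After deployment, both jobs appear under Workflows in the Databricks workspace. If you prefer to verify or trigger them programmatically, the snippet below is a minimal sketch that uses the Databricks SDK for Python directly; it assumes the workspace URL, access token and job name match the values used in the deploy step.

from databricks.sdk import WorkspaceClient

# Connect to the workspace with the same host and token used for deployment.
w = WorkspaceClient(host="{databricks-workspace-url}", token="{access-token}")

# Find the deployed pipeline job by name and trigger a run.
for job in w.jobs.list(name="rtdip-miso-batch-pipeline-job"):
    w.jobs.run_now(job_id=job.job_id)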