Fledge Pipeline using Dagster and Databricks Connect
This article provides a guide on how to deploy a pipeline in Dagster using the RTDIP SDK and Databricks Connect. This pipeline was tested on an M2 MacBook Pro using VS Code in a Python (3.10) environment.
Note
Reading from Eventhubs is currently not supported on Databricks Connect.
Prerequisites
Deployment using Databricks Connect requires:
- a Databricks workspace
- a cluster in the same workspace
- a personal access token
Further information on Databricks requirements can be found here.
This pipeline job requires the packages imported in the example below:
- rtdip-sdk
- databricks-connect
- dagster
Dagster Installation
For Mac users with an M1 or M2 chip, install Dagster as follows:
pip install dagster dagster-webserver --find-links=https://github.com/dagster-io/build-grpcio/wiki/Wheels
Components
Name | Description |
---|---|
SparkDeltaSource | Read data from a Delta table. |
BinaryToStringTransformer | Converts a Spark DataFrame column from binary to string. |
FledgeOPCUAJsonToPCDMTransformer | Converts a Spark DataFrame column containing a JSON string to the Process Control Data Model. |
SparkDeltaDestination | Writes to a Delta table. |
Authentication
For Databricks authentication, add the following fields to a configuration profile in your .databrickscfg file:
[PROFILE]
host = https://{workspace_instance}
token = dapi...
cluster_id = {cluster_id}
This profile should match the configuration of your DatabricksSession in the example below, as the Databricks extension in VS Code uses it to authenticate to your Databricks cluster.
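Before running the pipeline, it can help to confirm that the profile actually contains the three fields listed above. The following is a minimal sketch using only the Python standard library; the helper name `load_profile` and the `REQUIRED_KEYS` constant are hypothetical and are not part of the RTDIP SDK or Databricks Connect. It assumes the file lives at `~/.databrickscfg` unless another path is given.

```python
import configparser
from pathlib import Path

# Fields this article expects in the profile.
REQUIRED_KEYS = ("host", "token", "cluster_id")


def load_profile(profile, path=None):
    """Hypothetical helper: return the required fields of one .databrickscfg profile."""
    cfg_path = Path(path) if path else Path.home() / ".databrickscfg"

    # Disable interpolation so a '%' in a value is read literally.
    parser = configparser.ConfigParser(interpolation=None)
    if not parser.read(cfg_path):
        raise FileNotFoundError(f"Cannot read {cfg_path}")

    # configparser does not report [DEFAULT] as a regular section.
    if profile != parser.default_section and not parser.has_section(profile):
        raise KeyError(f"Profile [{profile}] not found in {cfg_path}")

    section = parser[profile]
    missing = [key for key in REQUIRED_KEYS if not section.get(key)]
    if missing:
        raise ValueError(f"Profile [{profile}] is missing: {', '.join(missing)}")

    return {key: section[key] for key in REQUIRED_KEYS}
```

Calling `load_profile("PROFILE")` returns a dictionary with `host`, `token` and `cluster_id`, or raises an error naming the fields that are absent.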
Example
Below is an example of how to set up a pipeline to read Fledge data from a Delta table, transform it to RTDIP's PCDM model and write it to a Delta table.
from dagster import Definitions, ResourceDefinition, graph, op
from databricks.connect import DatabricksSession
from rtdip_sdk.pipelines.sources.spark.delta import SparkDeltaSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
from rtdip_sdk.pipelines.transformers.spark.fledge_opcua_json_to_pcdm import FledgeOPCUAJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination

# Databricks cluster configuration
databricks_resource = ResourceDefinition.hardcoded_resource(
    DatabricksSession.builder.remote(
        host="https://{workspace_instance_name}",
        token="{token}",
        cluster_id="{cluster_id}",
    ).getOrCreate()
)

# Pipeline
@op(required_resource_keys={"databricks"})
def pipeline(context):
    spark = context.resources.databricks
    # Read the raw Fledge data from a Delta table
    source = SparkDeltaSource(spark, {}, "{path_to_table}").read_batch()
    # Convert the binary column to a string, then map it to PCDM
    transformer = BinaryToStringTransformer(source, "{source_column_name}", "{target_column_name}").transform()
    transformer = FledgeOPCUAJsonToPCDMTransformer(transformer, "{source_column_name}").transform()
    # Write the transformed data to a Delta table
    SparkDeltaDestination(transformer, {}, "{path_to_table}").write_batch()

@graph
def fledge_pipeline():
    pipeline()

fledge_pipeline_job = fledge_pipeline.to_job(
    resource_defs={
        "databricks": databricks_resource
    }
)

defs = Definitions(jobs=[fledge_pipeline_job])
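The example above writes the host, token and cluster ID directly into the source file. One possible alternative is to read the three values from environment variables and pass them to `DatabricksSession.builder.remote`. The sketch below is a suggestion rather than part of the original pipeline: the helper `remote_kwargs_from_env` is hypothetical, and the variable names `DATABRICKS_HOST`, `DATABRICKS_TOKEN` and `DATABRICKS_CLUSTER_ID` are assumptions that should be adjusted to your own setup.

```python
import os


def remote_kwargs_from_env(env=None):
    """Hypothetical helper: collect Databricks Connect settings from the environment."""
    env = os.environ if env is None else env

    # Assumed variable names, mapped to the keyword arguments used in the example.
    names = {
        "host": "DATABRICKS_HOST",
        "token": "DATABRICKS_TOKEN",
        "cluster_id": "DATABRICKS_CLUSTER_ID",
    }

    # Fail early with a clear message instead of connecting with empty values.
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))

    return {key: env[var] for key, var in names.items()}
```

With this helper, the session in the example could be built as `DatabricksSession.builder.remote(**remote_kwargs_from_env()).getOrCreate()`, which keeps the personal access token out of the pipeline file.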
Deploy
The following command starts a local Dagster instance and loads the pipeline from the given file:
dagster dev -f <path/to/file.py>
Open the link printed by the command above, click Launchpad and launch a run to start the pipeline.