Fledge Pipeline using Dagster
This article provides a guide on how to deploy a pipeline in Dagster using the RTDIP SDK. This pipeline was tested on an M2 MacBook Pro using VS Code in a Python 3.10 environment.
Prerequisites
This pipeline job requires the rtdip-sdk, dagster, dagster-webserver and dagster-pyspark packages.
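These can be installed with pip (Apple silicon users should install Dagster as described in the next section); the package names are inferred from the imports in the example below:
pip install rtdip-sdk dagster dagster-webserver dagster-pyspark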
Dagster Installation
For Mac users with an M1 or M2 chip, Dagster should be installed as follows:
pip install dagster dagster-webserver --find-links=https://github.com/dagster-io/build-grpcio/wiki/Wheels
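To confirm the installation succeeded, run:
dagster --version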
Components
Name | Description |
---|---|
SparkEventhubSource | Reads data from an Eventhub. |
BinaryToStringTransformer | Converts a Spark DataFrame column from binary to string. |
FledgeOPCUAJsonToPCDMTransformer | Converts a Spark DataFrame column containing a JSON string to the Process Control Data Model (PCDM). |
SparkDeltaDestination | Writes to a Delta table. |
Example
Below is an example of how to set up a pipeline that reads Fledge data from an Eventhub, transforms it into RTDIP's PCDM model and writes it to a Delta table on your machine.
import json
from datetime import datetime as dt
from dagster import Definitions, graph, op
from dagster_pyspark.resources import pyspark_resource
from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
from rtdip_sdk.pipelines.transformers.spark.fledge_opcua_json_to_pcdm import FledgeOPCUAJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination
# PySpark cluster configuration
packages = "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22,io.delta:delta-core_2.12:2.4.0"
my_pyspark_resource = pyspark_resource.configured(
    {
        "spark_conf": {
            "spark.default.parallelism": 1,
            "spark.jars.packages": packages,
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        }
    }
)
# EventHub configuration
eventhub_connection_string = "{eventhub_connection_string}"
eventhub_consumer_group = "{eventhub_consumer_group}"
startOffset = "-1"  # an offset of -1 reads from the start of the Eventhub
endTime = dt.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")  # current UTC time, used as the end of the batch window
startingEventPosition = {
    "offset": startOffset,    # start from the earliest available event
    "seqNo": -1,              # not in use
    "enqueuedTime": None,     # not in use
    "isInclusive": True
}

endingEventPosition = {
    "offset": None,           # not in use
    "seqNo": -1,              # not in use
    "enqueuedTime": endTime,  # stop at the current time
    "isInclusive": True
}
ehConf = {
    'eventhubs.connectionString': eventhub_connection_string,
    'eventhubs.consumerGroup': eventhub_consumer_group,
    'eventhubs.startingPosition': json.dumps(startingEventPosition),
    'eventhubs.endingPosition': json.dumps(endingEventPosition),
    'maxEventsPerTrigger': 1000
}
# Pipeline
@op(required_resource_keys={"spark"})
def pipeline(context):
    # The resource key must match the name given in resource_defs below
    spark = context.resources.spark.spark_session
    source = SparkEventhubSource(spark, ehConf).read_batch()
    transformer = BinaryToStringTransformer(source, "{source_column_name}", "{target_column_name}").transform()
    transformer = FledgeOPCUAJsonToPCDMTransformer(transformer, "{source_column_name}").transform()
    SparkDeltaDestination(transformer, {}, "{path_to_table}").write_batch()
@graph
def fledge_pipeline():
    pipeline()

fledge_pipeline_job = fledge_pipeline.to_job(
    resource_defs={
        "spark": my_pyspark_resource
    }
)
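# Register the job in a Definitions object so that `dagster dev` can discover it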
defs = Definitions(jobs=[fledge_pipeline_job])
Deploy
The following command starts a local Dagster instance and loads the pipeline:
dagster dev -f <path/to/file.py>
Open the link printed by the command above, click on Launchpad and select Launch Run to run the pipeline.
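Alternatively, for a quick smoke test without the web UI, Dagster jobs can be executed in-process from Python. This is a minimal sketch, assuming it is appended to the example file above:
if __name__ == "__main__":
    # Execute the job synchronously in the current process with the configured resources
    result = fledge_pipeline_job.execute_in_process()
    assert result.success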