# Fledge Eventhub to Delta Pipeline using Spark Connect
This article provides a guide on how to execute a pipeline that reads Fledge data from an Eventhub as a stream, transforms it to the Process Control Data Model (PCDM) and writes it to Delta via Spark Connect/Databricks Connect v2. This pipeline was tested on an M2 MacBook Pro using VS Code in a Python 3.11 environment.
## Prerequisites

This pipeline job requires the `rtdip-sdk` package.
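If it is not already available in your environment, it can be installed from PyPI:

```bash
pip install rtdip-sdk
```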
## Components

|Name|Description|
|---|---|
|SparkKafkaEventhubSource|Reads data from an Eventhub via its Kafka endpoint.|
|BinaryToStringTransformer|Converts a binary Spark DataFrame column to a string.|
|FledgeOPCUAJsonToPCDMTransformer|Transforms Fledge OPC UA JSON data to PCDM.|
|SparkPCDMToDeltaDestination|Writes to Delta in the Process Control Data Model (PCDM) format.|
## Example

Below is an example of reading from an Eventhub, transforming the data to PCDM and writing it to Delta using Spark Connect/Databricks Connect v2.
```python
from rtdip_sdk.pipelines.sources import SparkKafkaEventhubSource
from rtdip_sdk.pipelines.transformers import (
    FledgeOPCUAJsonToPCDMTransformer,
    BinaryToStringTransformer,
)
from rtdip_sdk.pipelines.destinations import SparkPCDMToDeltaDestination
from rtdip_sdk.pipelines.secrets import AzureKeyVaultSecrets
from rtdip_sdk.authentication.azure import DefaultAuth
from rtdip_sdk.pipelines.utilities import SparkSessionUtility


def pipeline():
    # Authenticate with Azure and obtain a token for the Azure Databricks
    # resource (2ff814a6-... is the well-known Azure Databricks application ID).
    auth = DefaultAuth().authenticate()
    token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token

    DATABRICKS_WORKSPACE = "adb-xxxxxxxxxx.x.azuredatabricks.net"
    DATABRICKS_CLUSTER_ID = "xxx-yyyyyy-zzzzzzzz"
    DATABRICKS_USER_ID = (
        "your_user_id@your_domain.com"  # required for Spark Connect on Windows
    )
    AZURE_KEYVAULT = "{YOUR-KEYVAULT-NAME}"
    AZURE_KEYVAULT_SECRET = "{YOUR-SECRET-NAME}"

    # Databricks Connect v2 remote connection string.
    spark_remote = "sc://{}:443/;token={};x-databricks-cluster-id={};user_id={}".format(
        DATABRICKS_WORKSPACE, token, DATABRICKS_CLUSTER_ID, DATABRICKS_USER_ID
    )

    # Retrieve the Eventhub connection string from Azure Key Vault.
    EVENTHUB_CONNECTION_STRING = AzureKeyVaultSecrets(
        vault=AZURE_KEYVAULT,
        key=AZURE_KEYVAULT_SECRET,
        credential=auth,
    ).get()
    EVENTHUB_CONSUMER_GROUP = "{YOUR-CONSUMER-GROUP}"
    DESTINATION_FLOAT = "{YOUR-FLOAT-DELTA-TABLE}"
    DESTINATION_STRING = "{YOUR-STRING-DELTA-TABLE}"
    DESTINATION_INTEGER = "{YOUR-INTEGER-DELTA-TABLE}"

    spark = SparkSessionUtility(config={}, remote=spark_remote).execute()

    # Read the Fledge events from the Eventhub via its Kafka endpoint.
    source_df = SparkKafkaEventhubSource(
        spark=spark,
        options={
            "startingOffsets": "earliest",
            "maxOffsetsPerTrigger": 500000,
            "failOnDataLoss": "false",
        },
        connection_string=EVENTHUB_CONNECTION_STRING,
        consumer_group=EVENTHUB_CONSUMER_GROUP,
        decode_kafka_headers_to_amqp_properties=False,
    ).read_stream()

    # Decode the binary message body into a JSON string.
    transform_df = BinaryToStringTransformer(
        data=source_df, source_column_name="body", target_column_name="body"
    ).transform()

    # Convert the Fledge OPC UA JSON payload to PCDM rows.
    transform_df = FledgeOPCUAJsonToPCDMTransformer(
        data=transform_df, source_column_name="body"
    ).transform()

    # Add an EventDate column derived from the EventTime timestamp.
    transform_df = transform_df.withColumn(
        "EventDate", transform_df["EventTime"].cast("date")
    )

    # Write the PCDM rows to the float, string and integer Delta tables.
    SparkPCDMToDeltaDestination(
        spark=spark,
        data=transform_df,
        options={
            "checkpointLocation": "dbfs:/checkpoints/rtdip-fledge-pcdm-stream-pipeline"
        },
        destination_float=DESTINATION_FLOAT,
        destination_string=DESTINATION_STRING,
        destination_integer=DESTINATION_INTEGER,
        mode="append",
        trigger="30 seconds",
        merge=False,
        try_broadcast_join=False,
        remove_nanoseconds=True,
        remove_duplicates=True,
        query_wait_interval=30,
    ).write_stream()


if __name__ == "__main__":
    pipeline()
```
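Once the stream is running, the output can be sanity-checked with a plain Spark read over a Spark Connect session. This is a minimal sketch, assuming the placeholder table name below is replaced with the same value used for `DESTINATION_FLOAT` above; `EventTime` is the event timestamp column produced by the transformer.

```python
from pyspark.sql import functions as F
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# The same style of Databricks Connect connection string constructed in the
# pipeline above (placeholders to be filled in).
spark_remote = "sc://{YOUR-WORKSPACE}:443/;token={YOUR-TOKEN};x-databricks-cluster-id={YOUR-CLUSTER-ID}"
spark = SparkSessionUtility(config={}, remote=spark_remote).execute()

# Read back the ten most recent float events written by the pipeline.
float_df = spark.read.table("{YOUR-FLOAT-DELTA-TABLE}")
float_df.orderBy(F.col("EventTime").desc()).limit(10).show(truncate=False)
```

Because the destination is configured with a 30 second trigger interval, new rows should appear in the destination tables shortly after the stream starts.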