# EdgeX Eventhub to Delta Pipeline
This article provides a guide on how to execute a pipeline that reads EdgeX data from an Eventhub in batch mode and writes it to a Delta Table locally using the RTDIP SDK. This pipeline was tested on an M2 MacBook Pro using VS Code in a Python 3.10 environment.
## Prerequisites
This pipeline job requires the `rtdip-sdk` package (used for all of the imports in the example below), which can be installed with `pip install rtdip-sdk`.
## Components
|Name|Description|
|---|---|
|SparkEventhubSource|Reads data from an Eventhub.|
|BinaryToStringTransformer|Converts a binary Spark DataFrame column to a string column.|
|EdgeXOPCUAJsonToPCDMTransformer|Converts EdgeX OPC UA JSON data to the Process Control Data Model (PCDM).|
|SparkDeltaDestination|Writes a Spark DataFrame to a Delta Table.|
## Common Errors
|Error|Solution|
|---|---|
|`com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: org/apache/spark/ErrorClassesJsonReader`|The Delta version in the Spark Session must be compatible with your local PySpark version. See the [Delta Lake releases](https://docs.delta.io/latest/releases.html) page for version compatibility.|
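If you encounter the `NoClassDefFoundError` above, it usually means the Delta package loaded onto the Spark classpath was built for a different Spark version than the PySpark installed locally. As a minimal sketch, assuming `SparkSessionUtility` passes `config` entries through to the Spark session builder (the `spark.jars.packages` override and the version numbers below are illustrative assumptions and must be matched to your own PySpark version), the Delta package can be pinned explicitly:

```python
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Assumption: config entries are passed through as Spark configuration.
# io.delta:delta-core_2.12:2.4.0 pairs with PySpark 3.4.x; adjust both together.
spark = SparkSessionUtility(
    config={"spark.jars.packages": "io.delta:delta-core_2.12:2.4.0"}
).execute()

# Check the local Spark version the Delta package must be compatible with
print(spark.version)
```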
## Example
Below is an example of how to set up a pipeline that reads EdgeX data from an Eventhub, transforms it to PCDM and writes it to a Delta Table locally.
```python
from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import (
    BinaryToStringTransformer,
)
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination
from rtdip_sdk.pipelines.transformers.spark.edgex_opcua_json_to_pcdm import (
    EdgeXOPCUAJsonToPCDMTransformer,
)
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json


def pipeline():
    # Create a local Spark Session with the libraries required by the components
    spark = SparkSessionUtility(config={}).execute()

    # Eventhub configuration, starting from the beginning of the stream
    ehConf = {
        "eventhubs.connectionString": "{EventhubConnectionString}",
        "eventhubs.consumerGroup": "{EventhubConsumerGroup}",
        "eventhubs.startingPosition": json.dumps(
            {"offset": "0", "seqNo": -1, "enqueuedTime": None, "isInclusive": True}
        ),
    }

    # Batch read the EdgeX events from the Eventhub
    source = SparkEventhubSource(spark, ehConf).read_batch()

    # Decode the binary "body" column to a string
    string_data = BinaryToStringTransformer(source, "body", "body").transform()

    # Transform the EdgeX OPC UA JSON payload to PCDM
    PCDM_data = EdgeXOPCUAJsonToPCDMTransformer(string_data, "body").transform()

    # Write the PCDM data to a local Delta Table
    SparkDeltaDestination(
        data=PCDM_data, options={}, destination="{path/to/table}"
    ).write_batch()


if __name__ == "__main__":
    pipeline()
```
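Two follow-up notes on running this example. First, the Azure Event Hubs Spark connector expects the `eventhubs.connectionString` value to be encrypted; if a plain connection string is rejected, it can be encrypted through the connector's JVM helper, as sketched below (this reuses the `spark` session and `ehConf` dictionary from the example above, and the placeholder connection string is illustrative):

```python
# Encrypt the Eventhub connection string via the connector's JVM helper
connection_string = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={KEY_NAME};SharedAccessKey={KEY};EntityPath={EVENT_HUB_NAME}"
ehConf["eventhubs.connectionString"] = (
    spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        connection_string
    )
)
```

Second, once the pipeline has run, the output can be verified with a standard Delta read using the same Spark session:

```python
# Read the Delta Table back and inspect the PCDM records
spark.read.format("delta").load("{path/to/table}").show()
```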