EdgeX Eventhub to Delta Pipeline

This article describes how to run a pipeline that batch reads EdgeX data from an Eventhub and writes it to a Delta Table locally using the RTDIP SDK. The pipeline was tested on an M2 MacBook Pro using VS Code in a Python (3.10) environment.

Prerequisites

This pipeline job requires the RTDIP SDK package (rtdip-sdk), which provides the rtdip_sdk modules imported in the example below.

Components

Name | Description
SparkEventhubSource | Reads data from an Eventhub.
BinaryToStringTransformer | Transforms Spark DataFrame column to string.
EdgeXOPCUAJsonToPCDMTransformer | Transforms EdgeX to PCDM.
SparkDeltaDestination | Writes to Delta.
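
To make the data flow through these components more concrete, the following plain-Python sketch mimics the two transformation steps on a single message. It is an illustration only, not the SDK's implementation: the sample payload, the helper names `binary_to_string` and `edgex_to_pcdm_rows`, the assumption that `origin` is a nanosecond epoch timestamp, and the default status value are all hypothetical, and the output is reduced to four PCDM-style fields (TagName, EventTime, Status, Value).

```python
import json
from datetime import datetime, timezone

# Hypothetical EdgeX-style event, as it might arrive in the binary "body" column.
raw_body = json.dumps(
    {
        "deviceName": "device-01",
        "readings": [
            {"resourceName": "temperature", "origin": 1700000000000000000, "value": "21.5"},
            {"resourceName": "pressure", "origin": 1700000001000000000, "value": "101.3"},
        ],
    }
).encode("utf-8")


def binary_to_string(body: bytes) -> str:
    """Decode the binary message body into a string (assumes UTF-8)."""
    return body.decode("utf-8")


def edgex_to_pcdm_rows(body: str, status: str = "Good") -> list[dict]:
    """Flatten each reading of the event into one PCDM-style row."""
    event = json.loads(body)
    rows = []
    for reading in event["readings"]:
        # Assumption: "origin" is nanoseconds since the Unix epoch.
        seconds = reading["origin"] // 1_000_000_000
        rows.append(
            {
                "TagName": reading["resourceName"],
                "EventTime": datetime.fromtimestamp(seconds, tz=timezone.utc),
                "Status": status,
                "Value": reading["value"],
            }
        )
    return rows


pcdm_rows = edgex_to_pcdm_rows(binary_to_string(raw_body))
for row in pcdm_rows:
    print(row)
```

In the actual pipeline these steps run on Spark DataFrames, so one Eventhub message with several readings becomes several rows in the Delta Table.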

Common Errors

Error | Solution
[com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: org/apache/spark/ErrorClassesJsonReader] | The Delta version configured in the Spark Session must be compatible with your local PySpark version. See the Delta Lake releases documentation for version compatibility.
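
When diagnosing this error, it can help to first confirm which versions are installed locally. The sketch below uses only the Python standard library; the helper name `installed_version` is hypothetical, and it assumes the relevant packages were installed with pip under the names `pyspark` and `delta-spark`. If Delta is instead pulled in as a Spark package when the session starts, the version to check is the one set in the Spark configuration.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package: str):
    """Return the installed version of a pip package, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Print the versions that need to be compared against the compatibility table.
for name in ("pyspark", "delta-spark"):
    print(f"{name}: {installed_version(name) or 'not installed'}")
```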

Example

Below is an example of how to batch read EdgeX data from an Eventhub and write it to a Delta Table locally using Spark.

from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import (
    BinaryToStringTransformer,
)
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination
from rtdip_sdk.pipelines.transformers.spark.edgex_opcua_json_to_pcdm import (
    EdgeXOPCUAJsonToPCDMTransformer,
)
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json


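def pipeline():
    # Create a local Spark session; extra Spark settings can be passed via config
    spark = SparkSessionUtility(config={}).execute()

    # Eventhub connection settings; replace the {placeholders} with your own values
    ehConf = {
        "eventhubs.connectionString": "{EventhubConnectionString}",
        "eventhubs.consumerGroup": "{EventhubConsumerGroup}",
        # The starting position is passed as a JSON-encoded string
        "eventhubs.startingPosition": json.dumps(
            {"offset": "0", "seqNo": -1, "enqueuedTime": None, "isInclusive": True}
        ),
    }

    # Batch read the raw events from the Eventhub
    source = SparkEventhubSource(spark, ehConf).read_batch()
    # Convert the binary "body" column to a string
    string_data = BinaryToStringTransformer(source, "body", "body").transform()
    # Convert the EdgeX JSON in "body" to the PCDM schema
    PCDM_data = EdgeXOPCUAJsonToPCDMTransformer(string_data, "body").transform()
    # Write the result to a local Delta Table
    SparkDeltaDestination(
        data=PCDM_data, options={}, destination="{path/to/table}"
    ).write_batch()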


if __name__ == "__main__":
    pipeline()
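
The starting position in the example above is a JSON-encoded string rather than a nested dictionary. As a minimal sketch, the configuration can be built with a small helper so the encoding is not repeated by hand. The helper name `starting_position` and its parameters are hypothetical; the keys and default values are copied from the example above, and how the connector interprets other values should be checked against the Eventhub Spark connector documentation.

```python
import json
from typing import Optional


def starting_position(
    offset: Optional[str] = "0",
    enqueued_time: Optional[str] = None,
    inclusive: bool = True,
) -> str:
    """Return the JSON string expected for "eventhubs.startingPosition"."""
    return json.dumps(
        {
            "offset": offset,
            "seqNo": -1,
            "enqueuedTime": enqueued_time,  # None is serialized as JSON null
            "isInclusive": inclusive,
        }
    )


# Same configuration as in the example, with placeholders left for your own values.
eh_conf = {
    "eventhubs.connectionString": "{EventhubConnectionString}",
    "eventhubs.consumerGroup": "{EventhubConsumerGroup}",
    "eventhubs.startingPosition": starting_position(),
}

print(eh_conf["eventhubs.startingPosition"])
```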