MISO Pipeline using RTDIP
This article explains how to run a MISO pipeline using RTDIP. The pipeline was tested on an M2 MacBook Pro using VS Code in a Conda (Python 3.11) environment.
Prerequisites
This pipeline assumes you have followed the installation instructions in the Getting Started section. In particular, ensure you have installed the following:
RTDIP SDK Installation
Ensure you have installed the RTDIP SDK as follows:
pip install "rtdip-sdk[pipelines,pyspark]"
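As a quick sanity check after installation, the sketch below looks up the installed version of the `rtdip-sdk` distribution using only the Python standard library. The helper name `installed_version` is hypothetical and is not part of RTDIP.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("rtdip-sdk")
    if found is None:
        print('rtdip-sdk is not installed; run: pip install "rtdip-sdk[pipelines,pyspark]"')
    else:
        print(f"rtdip-sdk {found} is installed")
```

If the check reports that the package is missing, confirm that the Conda environment you installed into is the one selected in your editor.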
Components
Name | Description |
---|---|
MISODailyLoadISOSource | Reads daily load data from the MISO API. |
MISOToMDMTransformer | Converts raw MISO data into the Meters Data Model. |
SparkDeltaDestination | Writes to a Delta table. |
Example
Below is an example of how to set up a pipeline to read daily load data from the MISO API, transform it into the Meters Data Model and write it to a Delta table.
from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource
from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
from pyspark.sql import SparkSession

def pipeline():
    # Create a Spark session with the Delta Lake package and extensions enabled
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )

    # Read the actual daily load for the given date from the MISO API
    source_df = MISODailyLoadISOSource(
        spark=spark,
        options={
            "load_type": "actual",
            "date": "20230520",
        },
    ).read_batch()

    # Transform the raw data into Meters Data Model usage records
    transform_value_df = MISOToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="usage",
    ).transform()

    # Transform the same raw data into Meters Data Model metadata records
    transform_meta_df = MISOToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="meta",
    ).transform()

    # Write the usage records to a Delta table
    SparkDeltaDestination(
        data=transform_value_df,
        options={
            "partitionBy": "timestamp",
        },
        destination="miso_usage_data",
    ).write_batch()

    # Write the metadata records to a separate Delta table
    SparkDeltaDestination(
        data=transform_meta_df,
        options={
            "partitionBy": "timestamp",
        },
        destination="miso_meta_data",
    ).write_batch()

if __name__ == "__main__":
    pipeline()
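The example above reads a single day. To load several days, one option is to generate one `options` dictionary per date. The following is a minimal sketch using only the standard library; the helper `daily_load_options` is hypothetical, and it assumes the `date` option takes the `YYYYMMDD` form seen in `"20230520"`.

```python
from datetime import date, timedelta
from typing import Dict, Iterator


def daily_load_options(
    start: date, end: date, load_type: str = "actual"
) -> Iterator[Dict[str, str]]:
    """Yield one options dict per day from start to end, inclusive."""
    if end < start:
        raise ValueError("end must not be earlier than start")
    current = start
    while current <= end:
        # Assumption: the source expects dates formatted as YYYYMMDD.
        yield {"load_type": load_type, "date": current.strftime("%Y%m%d")}
        current += timedelta(days=1)


if __name__ == "__main__":
    for options in daily_load_options(date(2023, 5, 20), date(2023, 5, 22)):
        print(options)
```

Each dictionary could then be passed as the `options` argument of `MISODailyLoadISOSource`, in the same way as the single-day dictionary in the example.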
Using environments
If you are running in a virtual environment (for example, Conda), include the following lines at the top of your script so that the Spark worker and driver use the same Python version:
import sys, os
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
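The same two assignments can be wrapped in a small function, which makes it easier to call them before the Spark session is created and to confirm what was set. This is only a sketch; the name `pin_pyspark_python` is hypothetical and is not part of RTDIP or PySpark.

```python
import os
import sys
from typing import Dict

_PYSPARK_VARS = ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON")


def pin_pyspark_python(executable: str = sys.executable) -> Dict[str, str]:
    """Point the PySpark driver and workers at the same Python interpreter."""
    for name in _PYSPARK_VARS:
        os.environ[name] = executable
    # Return the values actually stored so the caller can log or inspect them.
    return {name: os.environ[name] for name in _PYSPARK_VARS}


if __name__ == "__main__":
    # Call this before building the SparkSession.
    print(pin_pyspark_python())
```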