# PJM Pipeline using RTDIP
This article provides a guide on how to execute a PJM pipeline using RTDIP. This pipeline was tested on an M2 MacBook Pro using VS Code in a Conda (Python 3.11) environment.
## Prerequisites
This pipeline assumes you have a valid API key from PJM and have followed the installation instructions as specified in the Getting Started section. In particular, ensure you have installed the following:
### RTDIP SDK Installation

Ensure you have installed the RTDIP SDK with the `pipelines` and `pyspark` extras as follows:
```bash
pip install "rtdip-sdk[pipelines,pyspark]"
```
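To confirm the installation succeeded, you can check the installed version and that a pipeline component imports cleanly. A minimal sketch:

```python
# confirm the SDK is installed and report its version
from importlib.metadata import version

print(version("rtdip-sdk"))

# importing a pipeline component verifies the [pipelines] extra is present
from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
```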
## Components
|Name|Description|
|---|---|
|PJMDailyLoadISOSource|Reads daily load data from the PJM API.|
|PJMToMDMTransformer|Converts PJM raw data into the Meters Data Model.|
|SparkDeltaDestination|Writes the data to a Delta table.|
## Example
Below is an example of how to set up a pipeline to read daily load data from the PJM API, transform it into the Meters Data Model and write it to a Delta table.
```python
from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
from rtdip_sdk.pipelines.transformers import PJMToMDMTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
from pyspark.sql import SparkSession


def pipeline():
    # Create a Delta-enabled Spark session
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )

    # Read actual daily load data from the PJM API
    source_df = PJMDailyLoadISOSource(
        spark=spark,
        options={
            "api_key": "{api_key}",
            "load_type": "actual",
        },
    ).read_batch()

    # Transform the raw PJM data into Meters Data Model usage records
    transform_value_df = PJMToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="usage",
    ).transform()

    # Transform the raw PJM data into Meters Data Model metadata records
    transform_meta_df = PJMToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="meta",
    ).transform()

    # Write the usage data to a Delta table, partitioned by timestamp
    SparkDeltaDestination(
        data=transform_value_df,
        options={"partitionBy": "timestamp"},
        destination="pjm_usage_data",
    ).write_batch()

    # Write the metadata to a Delta table, partitioned by timestamp
    SparkDeltaDestination(
        data=transform_meta_df,
        options={"partitionBy": "timestamp"},
        destination="pjm_meta_data",
    ).write_batch()


if __name__ == "__main__":
    pipeline()
```
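After the pipeline has run, you can read the tables back to confirm the writes succeeded. A minimal sketch, assuming the same Delta-enabled Spark session configuration as above and that the tables were registered in the default catalog:

```python
# inspect the Delta tables written by write_batch()
spark.read.table("pjm_usage_data").show(5)
spark.read.table("pjm_meta_data").printSchema()
```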
## Using environments
If you are running inside a Conda (or other virtual) environment, include the following lines at the top of your script so that the Spark driver and workers use the same Python interpreter; otherwise Spark may fail with a Python version mismatch between driver and workers:
```python
import sys, os

# point Spark's driver and worker processes at the current interpreter
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
```
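These variables only take effect if they are set before the SparkSession is created. A sketch of how the top of the pipeline script above would then begin (the rest of the script is unchanged):

```python
import sys, os

# set before the SparkSession is created so workers inherit this interpreter
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
# ... remaining imports and the pipeline() definition as above
```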