Bases: PJMDailyPricingISOSource
The PJM Historical Pricing ISO Source is used to retrieve historical Real-Time and Day-Ahead hourly data from the PJM API.
API: https://api.pjm.com/api/v1/ (requires a valid API key from PJM)
Real-Time doc: https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition
Day-Ahead doc: https://dataminer2.pjm.com/feed/da_hrl_lmps/definition
The PJM Historical Pricing ISO Source accesses the same PJM endpoints as the daily pricing source but is tailored for retrieving data within a specified historical range defined by the start_date and end_date attributes.
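The way the date range and load type translate into a request can be sketched with the standard library alone. This is a minimal illustration that mirrors the `_pull_data` listing further down this page; `build_query_window`, `USER_DATE_FORMAT`, and `FEEDS` are hypothetical names introduced here and are not part of the SDK.

```python
from datetime import datetime

# Same format string the class stores in `user_datetime_format`.
USER_DATE_FORMAT = "%Y-%m-%d"

# Feed names taken from the Real-Time and Day-Ahead doc URLs above.
FEEDS = {"real_time": "rt_hrl_lmps", "day_ahead": "da_hrl_lmps"}


def build_query_window(start_date: str, end_date: str, load_type: str):
    """Hypothetical helper: return (feed, first_hour, last_hour) for a request."""
    # The range starts at 00:00 on start_date and ends at 23:00 on end_date,
    # so both boundary days are covered hour by hour.
    first_hour = datetime.strptime(start_date, USER_DATE_FORMAT).replace(hour=0, minute=0)
    last_hour = datetime.strptime(end_date, USER_DATE_FORMAT).replace(hour=23)
    # Anything other than "day_ahead" falls back to the real-time feed,
    # matching the if/else in the source listing.
    feed = FEEDS.get(load_type, FEEDS["real_time"])
    return feed, first_hour, last_hour


feed, first_hour, last_hour = build_query_window("2023-05-10", "2023-05-20", "day_ahead")
print(feed, first_hour.isoformat(), last_hour.isoformat())
# da_hrl_lmps 2023-05-10T00:00:00 2023-05-20T23:00:00
```

Both dates are inclusive in this sketch: a request for a single day uses the same value for start_date and end_date.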
Example
from rtdip_sdk.pipelines.sources import PJMHistoricalPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
pjm_source = PJMHistoricalPricingISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "load_type": "real_time",  # listed in required_options; either "real_time" or "day_ahead"
        "start_date": "2023-05-10",
        "end_date": "2023-05-20",
    }
)
pjm_source.read_batch()
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| spark | SparkSession | The Spark Session instance. | required |
| options | dict | A dictionary of ISO Source specific configurations. | required |
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| api_key | str | A valid key from PJM required for authentication. |
| load_type | str | The type of data to retrieve, either `real_time` or `day_ahead`. |
| start_date | str | Must be in `YYYY-MM-DD` format. |
| end_date | str | Must be in `YYYY-MM-DD` format. |
Please refer to the BaseISOSource for available methods and further details.
BaseISOSource: ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso
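The date rules enforced by `_validate_options` (see the source listing below) can be checked before a source is constructed. The following is a minimal stand-alone sketch of those rules, not the SDK's own code: `check_date_range` and its injectable `now` argument are hypothetical and exist only to make the behaviour easy to test.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

USER_DATE_FORMAT = "%Y-%m-%d"


def check_date_range(start_date: str, end_date: str, now: Optional[datetime] = None) -> bool:
    """Hypothetical stand-alone version of the date checks; raises ValueError on failure."""
    try:
        start = datetime.strptime(start_date, USER_DATE_FORMAT)
    except ValueError:
        raise ValueError(
            f"Unable to parse Start date. Please specify in {USER_DATE_FORMAT} format."
        )
    try:
        end = datetime.strptime(end_date, USER_DATE_FORMAT)
    except ValueError:
        raise ValueError(
            f"Unable to parse End date. Please specify in {USER_DATE_FORMAT} format."
        )

    # `now` is injectable for testing; by default use the current UTC time as a naive datetime.
    if now is None:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    latest_allowed = now - timedelta(days=1)

    if start > latest_allowed:
        raise ValueError("Start date can't be in future.")
    if start > end:
        raise ValueError("Start date can't be ahead of End date.")
    if end > latest_allowed:
        raise ValueError("End date can't be in future.")
    return True


fixed_now = datetime(2023, 6, 1, 12, 0)
print(check_date_range("2023-05-10", "2023-05-20", now=fixed_now))  # True
```

Note the one-day margin: with these rules, both dates must lie at least one day before the current UTC time, so a request for today's date is rejected even though it is not strictly in the future.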
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_historical_pricing_iso.py
class PJMHistoricalPricingISOSource(PJMDailyPricingISOSource):
    """
    The PJM Historical Pricing ISO Source is used to retrieve historical Real-Time and Day-Ahead hourly data from the PJM API.

    API: <a href="https://api.pjm.com/api/v1/">https://api.pjm.com/api/v1/</a> (requires a valid API key from PJM)

    Real-Time doc: <a href="https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition">https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition</a>

    Day-Ahead doc: <a href="https://dataminer2.pjm.com/feed/da_hrl_lmps/definition">https://dataminer2.pjm.com/feed/da_hrl_lmps/definition</a>

    The PJM Historical Pricing ISO Source accesses the same PJM endpoints as the daily pricing source but is tailored for retrieving data within a specified historical range defined by the `start_date` and `end_date` attributes.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import PJMHistoricalPricingISOSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    pjm_source = PJMHistoricalPricingISOSource(
        spark=spark,
        options={
            "api_key": "{api_key}",
            "load_type": "real_time",  # listed in required_options; either "real_time" or "day_ahead"
            "start_date": "2023-05-10",
            "end_date": "2023-05-20",
        }
    )

    pjm_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): The Spark Session instance.
        options (dict): A dictionary of ISO Source specific configurations.

    Attributes:
        api_key (str): A valid key from PJM required for authentication.
        load_type (str): The type of data to retrieve, either `real_time` or `day_ahead`.
        start_date (str): Must be in `YYYY-MM-DD` format.
        end_date (str): Must be in `YYYY-MM-DD` format.

    Please refer to the BaseISOSource for available methods and further details.

    BaseISOSource: ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso"""

    spark: SparkSession
    options: dict
    required_options = ["api_key", "load_type", "start_date", "end_date"]

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.spark: SparkSession = spark
        self.options: dict = options
        self.start_date: str = self.options.get("start_date", "")
        self.end_date: str = self.options.get("end_date", "")
        self.user_datetime_format = "%Y-%m-%d"

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls historical pricing data from the PJM API within the specified date range.

        Returns:
            pd.DataFrame: A DataFrame containing the raw historical pricing data retrieved from the PJM API.
        """

        logging.info(
            f"Historical data requested from {self.start_date} to {self.end_date}"
        )

        # The range runs from 00:00 on the start date to 23:00 on the end date.
        start_date_str = datetime.strptime(
            self.start_date, self.user_datetime_format
        ).replace(hour=0, minute=0)
        end_date_str = datetime.strptime(
            self.end_date, self.user_datetime_format
        ).replace(hour=23)

        # Any load_type other than "day_ahead" uses the real-time feed.
        if self.load_type == "day_ahead":
            url_suffix = "da_hrl_lmps"
        else:
            url_suffix = "rt_hrl_lmps"

        data = self._fetch_paginated_data(url_suffix, start_date_str, end_date_str)

        df = pd.DataFrame(data)
        logging.info(f"Data fetched successfully: {len(df)} rows")

        return df

    def _validate_options(self) -> bool:
        """
        Validates all parameters, including the following checks:
            - `start_date` and `end_date` must be in the correct format.
            - `start_date` must not be later than `end_date`.
            - `start_date` and `end_date` must be at least one day before the current time (UTC).

        Returns:
            True if all checks pass; otherwise raises an exception.
        """
        super()._validate_options()

        try:
            start_date = datetime.strptime(self.start_date, self.user_datetime_format)
        except ValueError:
            raise ValueError(
                f"Unable to parse Start date. Please specify in {self.user_datetime_format} format."
            )

        try:
            end_date = datetime.strptime(self.end_date, self.user_datetime_format)
        except ValueError:
            raise ValueError(
                f"Unable to parse End date. Please specify in {self.user_datetime_format} format."
            )

        # Dates are compared against "now minus one day" in UTC, as naive datetimes.
        if start_date > datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(
            days=1
        ):
            raise ValueError("Start date can't be in future.")

        if start_date > end_date:
            raise ValueError("Start date can't be ahead of End date.")

        if end_date > datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(
            days=1
        ):
            raise ValueError("End date can't be in future.")

        return True