DataBricksAutoLoaderSource
Bases: SourceInterface
The Spark Auto Loader is used to read new data files as they arrive in cloud storage. Further information on Auto Loader is available here
Example
from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
options = {}
path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}
format = "{DESIRED-FILE-FORMAT}"
DataBricksAutoLoaderSource(spark, options, path, format).read_stream()
OR
DataBricksAutoLoaderSource(spark, options, path, format).read_batch()
from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
options = {}
path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"
format = "{DESIRED-FILE-FORMAT}"
DataBricksAutoLoaderSource(spark, options, path, format).read_stream()
OR
DataBricksAutoLoaderSource(spark, options, path, format).read_batch()
from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
options = {}
path = "gs://{BUCKET-NAME}/{FILE-PATH}"
format = "{DESIRED-FILE-FORMAT}"
DataBricksAutoLoaderSource(spark, options, path, format).read_stream()
OR
DataBricksAutoLoaderSource(spark, options, path, format).read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session required to read data from cloud storage | required |
| options | dict | Options that can be specified for configuring the Auto Loader. Further information on the available options is here | required |
| path | str | The cloud storage path | required |
| format | str | Specifies the file format to be read. Supported formats are available here | required |
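For illustration, the options dictionary can carry standard Databricks Auto Loader (cloudFiles.*) options; the keys and paths below are placeholders and assumptions, not values prescribed by this class.
from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
# cloudFiles.* keys are standard Databricks Auto Loader options (assumed, not class-specific)
options = {
    "cloudFiles.schemaLocation": "{PATH-TO-SCHEMA-LOCATION}",
    "cloudFiles.maxFilesPerTrigger": "1000",
}
path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}"
format = "json"
df = DataBricksAutoLoaderSource(spark, options, path, format).read_stream()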
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
system_type()
staticmethod
Attributes:
| Name | Type | Description |
|---|---|---|
| SystemType | Environment | Requires PYSPARK on Databricks |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
read_batch()
Raises:
| Type | Description |
|---|---|
| NotImplementedError | Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream accordingly. |
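A minimal sketch of that pattern, assuming a Delta sink and the availableNow trigger of the plain PySpark write API (neither is prescribed by this class):
df = DataBricksAutoLoaderSource(spark, options, path, format).read_stream()
(
    df.writeStream
    .format("delta")
    .option("checkpointLocation", "{CHECKPOINT-PATH}")
    .trigger(availableNow=True)  # process everything currently available, then stop
    .start("{DESTINATION-PATH}")
)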
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
read_stream()
Performs streaming reads of files in cloud storage.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
SparkDeltaSharingSource
Bases: SourceInterface
The Spark Delta Sharing Source is used to read data from a Delta table where Delta sharing is configured
Example
#Delta Sharing Source for Streaming Queries
from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
delta_sharing_source = SparkDeltaSharingSource(
spark=spark,
options={
"maxFilesPerTrigger": 1000,
"ignoreChanges: True,
"startingVersion": 0
},
table_name="{YOUR-DELTA-TABLE-PATH}"
)
delta_sharing_source.read_stream()
#Delta Sharing Source for Batch Queries
from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
delta_sharing_source = SparkDeltaSharingSource(
spark=spark,
options={
"versionAsOf": 0,
"timestampAsOf": "yyyy-mm-dd hh:mm:ss[.fffffffff]"
},
table_name="{YOUR-DELTA-TABLE-PATH}"
)
delta_sharing_source.read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session required to read data from a Delta table | required |
| options | dict | Options that can be specified for a Delta Table read operation (See Attributes table below). Further information on the options is available here | required |
| table_path | str | Path to credentials file and Delta table to query | required |
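As a hedged illustration of the table path, Delta Sharing typically addresses a table as the profile (credentials) file followed by share.schema.table; the placeholders below are assumptions, not values prescribed by this class.
table_path = "/{PATH-TO}/config.share#{SHARE-NAME}.{SCHEMA-NAME}.{TABLE-NAME}"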
Attributes:
| Name | Type | Description |
|---|---|---|
| ignoreDeletes | bool str | Ignore transactions that delete data at partition boundaries. (Streaming) |
| ignoreChanges | bool str | Re-process updates if files had to be rewritten in the source table due to a data changing operation. (Streaming) |
| startingVersion | int str | The Delta Lake version to start from. (Streaming) |
| startingTimestamp | datetime str | The timestamp to start from. (Streaming) |
| maxFilesPerTrigger | int | How many new files to be considered in every micro-batch. The default is 1000. (Streaming) |
| maxBytesPerTrigger | int | How much data gets processed in each micro-batch. (Streaming) |
| readChangeFeed | bool str | Stream read the change data feed of the shared table. (Batch & Streaming) |
| timestampAsOf | datetime str | Query the Delta Table from a specific point in time. (Batch) |
| versionAsOf | int str | Query the Delta Table from a specific version. (Batch) |
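For example, a sketch of streaming the change data feed of a shared table using the readChangeFeed and startingVersion options above (the table path is a placeholder):
from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
# Options taken from the Attributes table above; values are illustrative
cdf_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        "readChangeFeed": "true",
        "startingVersion": 0
    },
    table_name="{YOUR-DELTA-TABLE-PATH}"
)
cdf_source.read_stream()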
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
system_type()
staticmethod
Attributes:
| Name | Type | Description |
|---|---|---|
| SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
read_batch()
Reads batch data from Delta. Most of the options provided by the Apache Spark DataFrame read API are supported for performing batch reads on Delta tables.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
read_stream()
Reads streaming data from Delta. All of the data in the table is processed as well as any new data that arrives after the stream started. .load() can take table name or path.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
SparkEventhubSource
Bases: SourceInterface
This Spark source class is used to read batch or streaming data from Eventhubs. Eventhub configurations need to be specified as options in a dictionary. Additionally, there are more optional configurations which can be found here. If using startingPosition or endingPosition make sure to check out the Event Position section for more details and examples.
Example
#Eventhub Source for Streaming Queries
from rtdip_sdk.pipelines.sources import SparkEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
startingEventPosition = {
"offset": -1,
"seqNo": -1,
"enqueuedTime": None,
"isInclusive": True
}
eventhub_source = SparkEventhubSource(
spark=spark,
options = {
"eventhubs.connectionString": connectionString,
"eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
"eventhubs.startingPosition": json.dumps(startingEventPosition),
"maxEventsPerTrigger" : 1000
}
)
eventhub_source.read_stream()
#Eventhub Source for Batch Queries
from rtdip_sdk.pipelines.sources import SparkEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
startingEventPosition = {
"offset": -1,
"seqNo": -1,
"enqueuedTime": None,
"isInclusive": True
}
endTime = "{END-TIME}"  # ISO 8601 timestamp string used for the ending position
endingEventPosition = {
"offset": None,
"seqNo": -1,
"enqueuedTime": endTime,
"isInclusive": True
}
eventhub_source = SparkEventhubSource(
spark,
options = {
"eventhubs.connectionString": connectionString,
"eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
"eventhubs.startingPosition": json.dumps(startingEventPosition),
"eventhubs.endingPosition": json.dumps(endingEventPosition)
}
)
eventhub_source.read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session | required |
| options | dict | A dictionary of Eventhub configurations (See Attributes table below) | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| eventhubs.connectionString | str | Eventhubs connection string is required to connect to the Eventhubs service. (Streaming and Batch) |
| eventhubs.consumerGroup | str | A consumer group is a view of an entire eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch) |
| eventhubs.startingPosition | JSON str | The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch) |
| eventhubs.endingPosition | JSON str | The ending position of a batch query. This works the same as startingPosition. (Batch) |
| maxEventsPerTrigger | long | Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream) |
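The connector returns the event payload in a binary body column; a common follow-up step (plain PySpark, an assumption rather than part of this class) is to cast it to a string before parsing:
from pyspark.sql.functions import col
df = eventhub_source.read_stream()
# Cast the binary Eventhub payload to a string for downstream parsing (e.g. JSON)
decoded_df = df.withColumn("body", col("body").cast("string"))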
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
system_type()
staticmethod
Attributes:
| Name | Type | Description |
|---|---|---|
| SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
read_batch()
Reads batch data from Eventhubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
read_stream()
Reads streaming data from Eventhubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
SparkIoThubSource
Bases: SourceInterface
This Spark source class is used to read batch or streaming data from an IoT Hub. IoT Hub configurations need to be specified as options in a dictionary. Additionally, there are more optional configurations which can be found here. If using startingPosition or endingPosition make sure to check out the Event Position section for more details and examples.
Example
#IoT Hub Source for Streaming Queries
from rtdip_sdk.pipelines.sources import SparkIoThubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
startingEventPosition = {
"offset": -1,
"seqNo": -1,
"enqueuedTime": None,
"isInclusive": True
}
iot_hub_source = SparkIoThubSource(
spark=spark,
options = {
"eventhubs.connectionString": connectionString,
"eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
"eventhubs.startingPosition": json.dumps(startingEventPosition),
"maxEventsPerTrigger" : 1000
}
)
iot_hub_source.read_stream()
#IoT Hub Source for Batch Queries
from rtdip_sdk.pipelines.sources import SparkIoThubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
startingEventPosition = {
"offset": -1,
"seqNo": -1,
"enqueuedTime": None,
"isInclusive": True
}
endTime = "{END-TIME}"  # ISO 8601 timestamp string used for the ending position
endingEventPosition = {
"offset": None,
"seqNo": -1,
"enqueuedTime": endTime,
"isInclusive": True
}
iot_hub_source = SparkIoThubSource(
spark,
options = {
"eventhubs.connectionString": connectionString,
"eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
"eventhubs.startingPosition": json.dumps(startingEventPosition),
"eventhubs.endingPosition": json.dumps(endingEventPosition)
}
)
iot_hub_source.read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session | required |
| options | dict | A dictionary of IoT Hub configurations (See Attributes table below) | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| eventhubs.connectionString | str | IoT Hub connection string is required to connect to the Eventhubs service. (Streaming and Batch) |
| eventhubs.consumerGroup | str | A consumer group is a view of an entire IoT Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch) |
| eventhubs.startingPosition | JSON str | The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch) |
| eventhubs.endingPosition | JSON str | The ending position of a batch query. This works the same as startingPosition. (Batch) |
| maxEventsPerTrigger | long | Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream) |
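As a hedged note on the connection string: for IoT Hub this is generally the hub's built-in Event Hub-compatible endpoint (found under Built-in endpoints in the Azure portal); the values below are placeholders, not values prescribed by this class.
connectionString = (
    "Endpoint=sb://{EVENT-HUB-COMPATIBLE-ENDPOINT};"
    "SharedAccessKeyName={ACCESS_KEY_NAME};"
    "SharedAccessKey={ACCESS_KEY};"
    "EntityPath={EVENT-HUB-COMPATIBLE-NAME}"
)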
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
system_type()
staticmethod
Attributes:
| Name | Type | Description |
|---|---|---|
| SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
read_batch()
Reads batch data from IoT Hubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
read_stream()
Reads streaming data from IoT Hubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
SparkKafkaSource
Bases: SourceInterface
This Spark source class is used to read batch or streaming data from Kafka. Required and optional configurations can be found in the Attributes tables below.
Additionally, there are more optional configurations which can be found here.
Example
#Kafka Source for Streaming Queries
from rtdip_sdk.pipelines.sources import SparkKafkaSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
kafka_source = SparkKafkaSource(
spark=spark,
options={
"kafka.bootstrap.servers": "{HOST_1}:{PORT_1},{HOST_2}:{PORT_2}",
"subscribe": "{TOPIC_1},{TOPIC_2}",
"includeHeaders", "true"
}
)
kafka_source.read_stream()
#Kafka Source for Batch Queries
from rtdip_sdk.pipelines.sources import SparkKafkaSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
kafka_source = SparkKafkaSource(
spark=spark,
options={
"kafka.bootstrap.servers": "{HOST_1}:{PORT_1},{HOST_2}:{PORT_2}",
"subscribe": "{TOPIC_1},{TOPIC_2}",
"startingOffsets": "earliest",
"endingOffsets": "latest"
}
)
kafka_source.read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session | required |
| options | dict | A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see here | required |
The following attributes are the most common configurations for Kafka.
The only configuration that must be set for the Kafka source for both batch and streaming queries is listed below.
Attributes:
| Name | Type | Description |
|---|---|---|
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. (Streaming and Batch) |
There are multiple ways of specifying which topics to subscribe to. You should provide only one of these attributes:
Attributes:
| Name | Type | Description |
|---|---|---|
| assign | json string {"topicA":[0,1],"topicB":[2,4]} | Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch) |
| subscribe | A comma-separated list of topics | The topic list to subscribe to. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch) |
| subscribePattern | Java regex string | The pattern used to subscribe to topic(s). Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch) |
The following configurations are optional:
Attributes:
| Name | Type | Description |
|---|---|---|
| startingTimestamp | timestamp str | The start point of timestamp when a query is started, a string specifying a starting timestamp for all partitions in topics being subscribed. Please refer to the note on starting timestamp offset options below. (Streaming and Batch) |
| startingOffsetsByTimestamp | JSON str | The start point of timestamp when a query is started, a json string specifying a starting timestamp for each TopicPartition. Please refer to the note on starting timestamp offset options below. (Streaming and Batch) |
| startingOffsets | "earliest", "latest" (streaming only), or JSON string | The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. |
| endingTimestamp | timestamp str | The end point when a batch query is ended, a json string specifying an ending timestamp for all partitions in topics being subscribed. Please refer to the note on ending timestamp offset options below. (Batch) |
| endingOffsetsByTimestamp | JSON str | The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. Please refer to the note on ending timestamp offset options below. (Batch) |
| endingOffsets | latest or JSON str | The end point when a batch query is ended, either "latest" which refers to the latest offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1 as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed. (Batch) |
| maxOffsetsPerTrigger | long | Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming) |
| minOffsetsPerTrigger | long | Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming) |
| failOnDataLoss | bool | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected. |
| minPartitions | int | Desired minimum number of partitions to read from Kafka. By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. (Streaming and Batch) |
| includeHeaders | bool | Whether to include the Kafka headers in the row. (Streaming and Batch) |
Starting Timestamp Offset Note
If Kafka doesn't return the matched offset, the behavior follows the value of the option startingOffsetsByTimestampStrategy. startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets. For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
Ending Timestamp Offset Note
If Kafka doesn't return the matched offset, the offset will be set to latest. endingOffsetsByTimestamp takes precedence over endingOffsets.
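As an illustration of the JSON-valued options above (topic names are placeholders and the timestamps are epoch milliseconds, as expected by the Spark Kafka connector):
import json
# Placeholder topic and partition names; values are epoch milliseconds
starting_offsets_by_timestamp = {
    "{TOPIC_1}": {"0": 1672531200000, "1": 1672531200000}
}
options = {
    "kafka.bootstrap.servers": "{HOST_1}:{PORT_1}",
    "subscribe": "{TOPIC_1}",
    "startingOffsetsByTimestamp": json.dumps(starting_offsets_by_timestamp)
}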
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka.py
system_type()
staticmethod
Attributes:
| Name | Type | Description |
|---|---|---|
| SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka.py
read_batch()
Reads batch data from Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka.py
read_stream()
Reads streaming data from Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka.py
SparkKafkaEventhubSource
Bases: SourceInterface
This Spark source class is used to read batch or streaming data from an Eventhub using the Kafka protocol. This enables Eventhubs to be used as a source in applications like Delta Live Tables or Databricks Serverless Jobs as the Spark Eventhubs JAR is not supported in these scenarios.
The dataframe returned is transformed to ensure the schema is as close to the Eventhub Spark source as possible. There are some minor differences:
- offset is dependent on x-opt-offset being populated in the headers provided. If this is not found in the headers, the value will be null
- publisher is dependent on x-opt-publisher being populated in the headers provided. If this is not found in the headers, the value will be null
- partitionKey is dependent on x-opt-partition-key being populated in the headers provided. If this is not found in the headers, the value will be null
- systemProperties are identified according to the list provided in the Eventhub documentation and IoT Hub documentation
Default settings will be specified if not provided in the options parameter:
- kafka.sasl.mechanism will be set to PLAIN
- kafka.security.protocol will be set to SASL_SSL
- kafka.request.timeout.ms will be set to 60000
- kafka.session.timeout.ms will be set to 60000
Examples
#Kafka Source for Streaming Queries
from rtdip_sdk.pipelines.sources import SparkKafkaEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
consumerGroup = "{YOUR-CONSUMER-GROUP}"
kafka_eventhub_source = SparkKafkaEventhubSource(
spark=spark,
options={
"startingOffsets": "earliest",
"maxOffsetsPerTrigger": 10000,
"failOnDataLoss": "false",
},
connection_string=connectionString,
consumer_group=consumerGroup
)
kafka_eventhub_source.read_stream()
#Kafka Source for Batch Queries
from rtdip_sdk.pipelines.sources import SparkKafkaEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
consumerGroup = "{YOUR-CONSUMER-GROUP}"
kafka_eventhub_source = SparkKafkaEventhubSource(
spark=spark,
options={
"startingOffsets": "earliest",
"endingOffsets": "latest",
"failOnDataLoss": "false"
},
connection_string=connectionString,
consumer_group=consumerGroup
)
kafka_eventhub_source.read_batch()
Required and optional configurations can be found in the Attributes and Parameter tables below. Additionally, there are more optional configurations which can be found here.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session | required |
| options | dict | A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see here | required |
| connection_string | str | Eventhubs connection string is required to connect to the Eventhubs service. This must include the Eventhub name as the EntityPath parameter | required |
| consumer_group | str | The Eventhub consumer group to use for the connection | required |
| decode_kafka_headers_to_amqp_properties | optional bool | Perform decoding of Kafka headers into their AMQP properties. Default is True | True |
The only configuration that must be set for the Kafka source for both batch and streaming queries is listed below.
Attributes:
| Name | Type | Description |
|---|---|---|
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. (Streaming and Batch) |
There are multiple ways of specifying which topics to subscribe to. You should provide only one of these parameters:
Attributes:
| Name | Type | Description |
|---|---|---|
| assign | json string {"topicA":[0,1],"topicB":[2,4]} | Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch) |
| subscribe | A comma-separated list of topics | The topic list to subscribe to. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch) |
| subscribePattern | Java regex string | The pattern used to subscribe to topic(s). Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch) |
The following configurations are optional:
Attributes:
| Name | Type | Description |
|---|---|---|
| startingTimestamp | timestamp str | The start point of timestamp when a query is started, a string specifying a starting timestamp for all partitions in topics being subscribed. Please refer to the note on starting timestamp offset options below. (Streaming and Batch) |
| startingOffsetsByTimestamp | JSON str | The start point of timestamp when a query is started, a json string specifying a starting timestamp for each TopicPartition. Please refer to the note on starting timestamp offset options below. (Streaming and Batch) |
| startingOffsets | "earliest", "latest" (streaming only), or JSON string | The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. |
| endingTimestamp | timestamp str | The end point when a batch query is ended, a json string specifying an ending timestamp for all partitions in topics being subscribed. Please refer to the note on ending timestamp offset options below. (Batch) |
| endingOffsetsByTimestamp | JSON str | The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. Please refer to the note on ending timestamp offset options below. (Batch) |
| endingOffsets | latest or JSON str | The end point when a batch query is ended, either "latest" which refers to the latest offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1 as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed. (Batch) |
| maxOffsetsPerTrigger | long | Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming) |
| minOffsetsPerTrigger | long | Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming) |
| failOnDataLoss | bool | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected. |
| minPartitions | int | Desired minimum number of partitions to read from Kafka. By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. (Streaming and Batch) |
| includeHeaders | bool | Whether to include the Kafka headers in the row. (Streaming and Batch) |
Starting Timestamp Offset Note
If Kafka doesn't return the matched offset, the behavior follows the value of the option startingOffsetsByTimestampStrategy. startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets. For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
Ending Timestamp Offset Note
If Kafka doesn't return the matched offset, the offset will be set to latest. endingOffsetsByTimestamp takes precedence over endingOffsets.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
system_type()
staticmethod
Attributes:
| Name | Type | Description |
|---|---|---|
| SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
read_batch()
Reads batch data from Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
read_stream()
Reads streaming data from Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
SparkKinesisSource
Bases: SourceInterface
The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment. Structured streaming from Kinesis is not supported in open source Spark.
Example
from rtdip_sdk.pipelines.sources import SparkKinesisSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
kinesis_source = SparkKinesisSource(
spark=spark,
options={
"awsAccessKey": "{AWS-ACCESS-KEY}",
"awsSecretKey": "{AWS-SECRET-KEY}",
"streamName": "{STREAM-NAME}",
"region": "{REGION}",
"endpoint": "https://kinesis.{REGION}.amazonaws.com",
"initialPosition": "earliest"
}
)
kinesis_source.read_stream()
OR
kinesis_source.read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session required to read data from Kinesis | required |
| options | dict | Options that can be specified for a Kinesis read operation (See Attributes table below). Further information on the options is available here | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| awsAccessKey | str | AWS access key. |
| awsSecretKey | str | AWS secret access key corresponding to the access key. |
| streamName | List[str] | The stream names to subscribe to. |
| region | str | The region the streams are defined in. |
| endpoint | str | The regional endpoint for Kinesis Data Streams. |
| initialPosition | str | The point to start reading from; earliest, latest, or at_timestamp. |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
system_type()
staticmethod
Attributes:
| Name | Type | Description |
|---|---|---|
| SystemType | Environment | Requires PYSPARK_DATABRICKS |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
read_batch()
Raises:
| Type | Description |
|---|---|
| NotImplementedError | Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream accordingly. |
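A minimal sketch of that pattern, assuming a Delta sink and the availableNow trigger of the plain PySpark write API (neither is prescribed by this class):
df = kinesis_source.read_stream()
(
    df.writeStream
    .format("delta")
    .option("checkpointLocation", "{CHECKPOINT-PATH}")
    .trigger(availableNow=True)  # drain the data currently available, then stop
    .start("{DESTINATION-PATH}")
)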
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
read_stream()
Reads streaming data from Kinesis. All of the data in the stream is processed, as well as any new data that arrives after the stream started.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
BaseISOSource
Bases: SourceInterface
Base class for all the ISO Sources. It provides common functionality and helps in reducing the code redundancy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session instance | required |
| options | dict | A dictionary of ISO Source specific configurations | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
pre_read_validation()
Ensures all the required options are provided and performs other validations. Returns: True if all checks are passed.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
read_batch()
Spark entrypoint. It executes the entire process of pulling, transforming & fixing data. Returns: Final Spark DataFrame converted from Pandas DataFrame post-execution.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
read_stream()
By default, the streaming operation is not supported but child classes can override if ISO supports streaming.
Returns:
| Type | Description |
|---|---|
| DataFrame | Final Spark DataFrame after all the processing. |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
ERCOTDailyLoadISOSource
Bases: BaseISOSource
The ERCOT Daily Load ISO Source is used to read daily load data from ERCOT using web scraping. It supports actual and forecast data. To read more about the reports, visit the following URLs (the URLs are only accessible if the requester/client is in the US):
For load type actual: Actual System Load by Weather Zone
For load type forecast: Seven-Day Load Forecast by Weather Zone
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session instance | required |
| options | dict | A dictionary of ISO Source specific configurations (See Attributes table below) | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| load_type | list | Must be one of actual or forecast. |
| date | str | Must be in |
| certificate_pfx_key | str | The certificate key data or password received from ERCOT. |
| certificate_pfx_key_contents | str | The certificate data received from ERCOT; it can be base64 encoded. |
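A hedged usage sketch mirroring the other ISO sources (the option values are placeholders; the exact accepted values and formats are defined by the source itself):
from rtdip_sdk.pipelines.sources import ERCOTDailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
# Placeholder option values; see the Attributes table above
ercot_source = ERCOTDailyLoadISOSource(
    spark=spark,
    options={
        "load_type": "actual",
        "date": "{DATE}",
        "certificate_pfx_key": "{CERTIFICATE-KEY}",
        "certificate_pfx_key_contents": "{CERTIFICATE-KEY-CONTENTS}"
    }
)
ercot_source.read_batch()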
Please check the BaseISOSource for available methods.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/ercot_daily_load_iso.py
MISODailyLoadISOSource
Bases: BaseISOSource
The MISO Daily Load ISO Source is used to read daily load data from MISO API. It supports both Actual and Forecast data.
To read more about the available reports from MISO API, download the file - Market Reports
From the list of reports in the file, it pulls the report named Daily Forecast and Actual Load by Local Resource Zone.
Actual data is available for the day prior to the given date.
Forecast data is available for the next 6 days (inclusive of the given date).
Example
from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
miso_source = MISODailyLoadISOSource(
spark=spark,
options={
"load_type": "actual",
"date": "20230520",
}
)
miso_source.read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session instance | required |
| options | dict | A dictionary of ISO Source specific configurations (See Attributes table below) | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| load_type | str | Must be one of actual or forecast. |
| date | str | Must be in YYYYMMDD format. |
Please check the BaseISOSource for available methods.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/miso_daily_load_iso.py
MISOHistoricalLoadISOSource
Bases: MISODailyLoadISOSource
The MISO Historical Load ISO Source is used to read historical load data from MISO API.
To read more about the available reports from MISO API, download the file - Market Reports
From the list of reports in the file, it pulls the report named Historical Daily Forecast and Actual Load by Local Resource Zone.
Example
from rtdip_sdk.pipelines.sources import MISOHistoricalLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
miso_source = MISOHistoricalLoadISOSource(
spark=spark,
options={
"start_date": "20230510",
"end_date": "20230520",
}
)
miso_source.read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session instance | required |
| options | dict | A dictionary of ISO Source specific configurations (See Attributes table below) | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| start_date | str | Must be in YYYYMMDD format. |
| end_date | str | Must be in YYYYMMDD format. |
| fill_missing | str | Set to |
Please check the BaseISOSource for available methods.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/miso_historical_load_iso.py
PJMDailyLoadISOSource
Bases: BaseISOSource
The PJM Daily Load ISO Source is used to read daily load data from PJM API. It supports both Actual and Forecast data. Actual will return 1 day, Forecast will return 7 days.
To read more about the reports, visit the following URLs -
Actual doc: ops_sum_prev_period
Forecast doc: load_frcstd_7_day
Example
from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
pjm_source = PJMDailyLoadISOSource(
spark=spark,
options={
"api_key": "{api_key}",
"load_type": "actual"
}
)
pjm_source.read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session instance | required |
| options | dict | A dictionary of ISO Source specific configurations (See Attributes table below) | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| api_key | str | Must be a valid key from PJM, see api url |
| load_type | str | Must be one of actual or forecast. |
Please check the BaseISOSource for available methods.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_daily_load_iso.py
PJMDailyPricingISOSource
Bases: BaseISOSource
The PJM Daily Pricing ISO Source is used to retrieve Real-Time and Day-Ahead hourly data from PJM API. Real-Time will return data for T - 3 to T days and Day-Ahead will return T - 3 to T + 1 days data.
API: https://api.pjm.com/api/v1/ (requires a valid API key from PJM)
Real-Time doc: https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition
Day-Ahead doc: https://dataminer2.pjm.com/feed/da_hrl_lmps/definition
Example
from rtdip_sdk.pipelines.sources import PJMDailyPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
pjm_source = PJMDailyPricingISOSource(
spark=spark,
options={
"api_key": "{api_key}",
"load_type": "real_time"
}
)
pjm_source.read_batch()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark Session instance | required |
| options | dict | A dictionary of ISO Source specific configurations (See Attributes table below) | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| api_key | str | Must be a valid key from PJM, see api url |
| load_type | str | Must be one of real_time or day_ahead. |
Please check the BaseISOSource for available methods.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_daily_pricing_iso.py
PJMHistoricalPricingISOSource
Bases: PJMDailyPricingISOSource
The PJM Historical Pricing ISO Source is used to retrieve historical Real-Time and Day-Ahead hourly data from the PJM API.
API: https://api.pjm.com/api/v1/ (requires a valid API key from PJM)
Real-Time doc: https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition
Day-Ahead doc: https://dataminer2.pjm.com/feed/da_hrl_lmps/definition
The PJM Historical Pricing ISO Source accesses the same PJM endpoints as the daily pricing source but is tailored for retrieving data within a specified historical range defined by the start_date and end_date attributes.
Example
from rtdip_sdk.pipelines.sources import PJMHistoricalPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
pjm_source = PJMHistoricalPricingISOSource(
spark=spark,
options={
"api_key": "{api_key}",
"start_date": "2023-05-10",
"end_date": "2023-05-20",
}
)
pjm_source.read_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | The Spark Session instance. | required |
options | dict | A dictionary of ISO Source specific configurations. | required |
Attributes:
Name | Type | Description |
---|---|---|
api_key | str | A valid key from PJM required for authentication. |
load_type | str | The type of data to retrieve, either |
start_date | str | Must be in |
end_date | str | Must be in |
Please refer to the BaseISOSource for available methods and further details.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_historical_pricing_iso.py
PJMHistoricalLoadISOSource
Bases: PJMDailyLoadISOSource
The PJM Historical Load ISO Source is used to read historical load data from PJM API.
To read more about the reports, visit the following URLs -
Actual doc: ops_sum_prev_period
Forecast doc: load_frcstd_7_day
Historical uses the same PJM endpoint as Actual, but it is called repeatedly across the range established by the start_date & end_date attributes.
Example
from rtdip_sdk.pipelines.sources import PJMHistoricalLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
pjm_source = PJMHistoricalLoadISOSource(
spark=spark,
options={
"api_key": "{api_key}",
"start_date": "20230510",
"end_date": "20230520",
}
)
pjm_source.read_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session instance | required |
options | dict | A dictionary of ISO Source specific configurations (See Attributes table below) | required |
Attributes:
Name | Type | Description |
---|---|---|
api_key | str | Must be a valid key from PJM, see PJM documentation |
start_date | str | Must be in |
end_date | str | Must be in |
query_batch_days | int | (optional) Number of days must be < 160 as per PJM & is defaulted to |
sleep_duration | int | (optional) Number of seconds to sleep between requests, defaulted to |
request_count | int | (optional) Number of requests made to PJM endpoint before sleep_duration, currently defaulted to |
Please check the BaseISOSource for available methods.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_historical_load_iso.py
CAISODailyLoadISOSource
Bases: BaseISOSource
The CAISO Daily Load ISO Source is used to read daily load data from CAISO API.
It supports multiple types of data. Check the load_types attribute.
To read more about the available reports from CAISO API, download the file - Interface Specification
From the list of reports in the file, it pulls the report named CAISO Demand Forecast.
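Example (a minimal sketch, assuming the class is importable from rtdip_sdk.pipelines.sources like the other ISO sources; the option values are placeholders, see the Attributes table below for the permitted load_types and the expected date format):
from rtdip_sdk.pipelines.sources import CAISODailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
caiso_source = CAISODailyLoadISOSource(
    spark=spark,
    options={
        "load_types": ["{LOAD-TYPE}"],  # placeholder, must be a subset of the permitted load types
        "date": "{DATE}"  # placeholder, must match the date format described below
    }
)
caiso_source.read_batch()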
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session instance | required |
options | dict | A dictionary of ISO Source specific configurations (See Attributes table below) | required |
Attributes:
Name | Type | Description |
---|---|---|
load_types | list | Must be a subset of [ |
date | str | Must be in |
Please check the BaseISOSource for available methods.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/caiso_daily_load_iso.py
CAISOHistoricalLoadISOSource
Bases: CAISODailyLoadISOSource
The CAISO Historical Load ISO Source is used to read load data for an interval of dates between start_date and end_date inclusive from CAISO API.
It supports multiple types of data. Check the load_types attribute.
To read more about the available reports from CAISO API, download the file - Interface Specification
From the list of reports in the file, it pulls the report named CAISO Demand Forecast.
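Example (a minimal sketch under the same assumptions as the daily source above; start_date and end_date are placeholders, see the Attributes table below for the expected formats):
from rtdip_sdk.pipelines.sources import CAISOHistoricalLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
caiso_source = CAISOHistoricalLoadISOSource(
    spark=spark,
    options={
        "load_types": ["{LOAD-TYPE}"],  # placeholder
        "start_date": "{START-DATE}",   # placeholder
        "end_date": "{END-DATE}"        # placeholder
    }
)
caiso_source.read_batch()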
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session instance | required |
options | dict | A dictionary of ISO Source specific configurations (See Attributes table below) | required |
Attributes:
Name | Type | Description |
---|---|---|
load_types | list | Must be a subset of [ |
start_date | str | Must be in |
end_date | str | Must be in |
Please check the BaseISOSource for available methods.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/caiso_historical_load_iso.py
SparkWeatherCompanyBaseWeatherSource
Bases: BaseISOSource
Base class for all the Weather related sources. Provides common functionality.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session instance | required |
options | dict | A dictionary of Weather Source specific configurations. | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/the_weather_company/base_weather.py
SparkWeatherCompanyForecastAPIV1Source
Bases: SparkWeatherCompanyBaseWeatherSource
The Weather Forecast API V1 Source is used to read a 15-day forecast from the Weather API.
URL: https://api.weather.com/v1/geocode/32.3667/-95.4/forecast/hourly/360hour.json
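Example (a minimal sketch, assuming the class is importable from rtdip_sdk.pipelines.sources like the other source components and that read_batch() is the entry point, as for the other BaseISOSource-derived sources; the coordinates are the ones from the URL above and the API key is a placeholder):
from rtdip_sdk.pipelines.sources import SparkWeatherCompanyForecastAPIV1Source
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
weather_source = SparkWeatherCompanyForecastAPIV1Source(
    spark=spark,
    options={
        "lat": "32.3667",
        "lon": "-95.4",
        "api_key": "{API-KEY}"  # placeholder
    }
)
weather_source.read_batch()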
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session instance | required |
options | dict | A dictionary of ISO Source specific configurations (See Attributes table below). | required |
Attributes:
Name | Type | Description |
---|---|---|
lat | str | Latitude of the Weather Station. |
lon | str | Longitude of the Weather Station. |
api_key | str | Weather API key. |
language | str | API response language. Defaults to |
units | str | Unit of measurements. Defaults to |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/the_weather_company/weather_forecast_api_v1.py
SparkWeatherCompanyForecastAPIV1MultiSource
Bases: SparkWeatherCompanyForecastAPIV1Source
The Weather Forecast API V1 Multi Source is used to read a 15-day forecast from the Weather API. It can pull weather data for multiple stations and returns all of them in a single DataFrame.
URL for one station: https://api.weather.com/v1/geocode/32.3667/-95.4/forecast/hourly/360hour.json
It takes a list of Weather Stations. Each station item must contain comma separated Latitude & Longitude.
Examples
["32.3667,-95.4", "51.52,-0.11"]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session instance | required |
options | dict | A dictionary of ISO Source specific configurations (See Attributes table below). | required |
Attributes:
Name | Type | Description |
---|---|---|
stations | list[str] | List of Weather Stations. |
api_key | str | Weather API key. |
language | str | API response language. Defaults to |
units | str | Unit of measurements. Defaults to |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/the_weather_company/weather_forecast_api_v1_multi.py
PythonDeltaSource
Bases: SourceInterface
The Python Delta Source is used to read data from a Delta table without using Apache Spark, returning a Polars LazyFrame.
Example
from rtdip_sdk.pipelines.sources import PythonDeltaSource
path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}
python_delta_source = PythonDeltaSource(
path=path,
version=None,
storage_options={
"azure_storage_account_name": "{AZURE-STORAGE-ACCOUNT-NAME}",
"azure_storage_account_key": "{AZURE-STORAGE-ACCOUNT-KEY}"
},
pyarrow_options=None,
without_files=False
)
python_delta_source.read_batch()
from rtdip_sdk.pipelines.sources import PythonDeltaSource
path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"
python_delta_source = PythonDeltaSource(
path=path,
version=None,
storage_options={
"aws_access_key_id": "{AWS-ACCESS-KEY-ID}",
"aws_secret_access_key": "{AWS-SECRET-ACCESS-KEY}"
},
pyarrow_options=None,
without_files=False
)
python_delta_source.read_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | Path to the Delta table. Can be local or in S3/Azure storage | required |
version | optional int | Specify the Delta table version to read from. Defaults to the latest version | None |
storage_options | optional dict | Used to read from AWS/Azure storage. For AWS use format {"aws_access_key_id": "<>", "aws_secret_access_key":"<>"}. For Azure use format {"azure_storage_account_name": "<>", "azure_storage_account_key": "<>"}. | None |
pyarrow_options | optional dict | Data Access and Efficiency options when reading from Delta. See to_pyarrow_dataset. | None |
without_files | optional bool | If True loads the table without tracking files | False |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYTHON |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py
read_batch()
Reads data from a Delta table into a Polars LazyFrame
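Because the result is a Polars LazyFrame, no data is read until the frame is collected. A minimal sketch (the path is a placeholder and the keyword defaults follow the parameters table above):
from rtdip_sdk.pipelines.sources import PythonDeltaSource

lazy_frame = PythonDeltaSource(
    path="{PATH-TO-DELTA-TABLE}",  # local path shown; cloud paths additionally need storage_options
    version=None,
    storage_options=None,
    pyarrow_options=None,
    without_files=False
).read_batch()
df = lazy_frame.collect()  # executes the lazy query and returns a Polars DataFrame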
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py
read_stream()
Raises:
Type | Description |
---|---|
NotImplementedError | Reading from a Delta table using Python is only possible for batch reads. To perform a streaming read, use the read_stream method of the SparkDeltaSource component. |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py
PythonDeltaSharingSource
Bases: SourceInterface
The Python Delta Sharing Source is used to read data from a Delta table with Delta Sharing configured, without using Apache Spark.
Example
from rtdip_sdk.pipelines.sources import PythonDeltaSharingSource
python_delta_sharing_source = PythonDeltaSharingSource(
profile_path="{CREDENTIAL-FILE-LOCATION}",
share_name="{SHARE-NAME}",
schema_name="{SCHEMA-NAME}",
table_name="{TABLE-NAME}"
)
python_delta_sharing_source.read_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
profile_path | str | Location of the credential file. Can be any URL supported by FSSPEC | required |
share_name | str | The value of 'share=' for the table | required |
schema_name | str | The value of 'schema=' for the table | required |
table_name | str | The value of 'name=' for the table | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/python/delta_sharing.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYTHON |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/python/delta_sharing.py
read_batch()
Reads data from a Delta table with Delta Sharing into a Polars LazyFrame.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/python/delta_sharing.py
read_stream()
Raises:
Type | Description |
---|---|
NotImplementedError | Reading from a Delta table with Delta Sharing using Python is only possible for batch reads. |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/python/delta_sharing.py
SparkECMWFBaseMarsSource
Downloads nc files from the ECMWF MARS server using the ECMWF Python API. Data is downloaded in parallel using joblib.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
save_path | str | Path to local directory where the nc files will be stored, in format "yyyy-mm-dd_HH.nc" | required |
date_start | str | Start date of extraction in "YYYY-MM-DD HH:MM:SS" format | required |
date_end | str | End date of extraction in "YYYY-MM-DD HH:MM:SS" format | required |
ecmwf_api_key | str | API key for ECMWF MARS server | required |
ecmwf_api_email | str | Email for ECMWF MARS server | required |
ecmwf_api_url | str | URL for ECMWF MARS server | 'https://api.ecmwf.int/v1' |
run_frequency | str | Frequency format of runs to download, e.g. "H" | 'H' |
run_interval | str | Interval of runs, e.g. a run_frequency of "H" and run_interval of "12" will extract the data of the 00 and 12 run for each day. | '12' |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/ecmwf/base_mars.py
retrieve(mars_dict, n_jobs=None, backend='loky', tries=5, cost=False)
Retrieve the data from the server.
The function uses the ECMWF API to download the data from the server. Note that MARS allows a maximum of two active requests per user and 20 queued requests. Data is downloaded in parallel using joblib. A usage sketch follows the parameters table below.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mars_dict | dict | Dictionary of MARS parameters. | required |
n_jobs | int | Number of parallel download jobs; by default None, i.e. no parallelization | None |
backend | str, optional | Specify the parallelization backend implementation in joblib, by default "loky" | 'loky' |
tries | int | Number of tries for each request if it fails, by default 5 | 5 |
cost | bool | Pass a cost request to MARS to estimate the size and efficiency of your request without actually downloading the data. Can be useful for defining requests, by default False. | False |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/ecmwf/base_mars.py
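A usage sketch (assuming SparkECMWFBaseMarsSource is importable from rtdip_sdk.pipelines.sources like the other source components and that the constructor keywords match the parameters table above; the mars_dict keys are placeholders for a real MARS request):
from rtdip_sdk.pipelines.sources import SparkECMWFBaseMarsSource

source = SparkECMWFBaseMarsSource(
    save_path="{LOCAL-SAVE-PATH}",
    date_start="2024-01-01 00:00:00",
    date_end="2024-01-02 00:00:00",
    ecmwf_api_key="{API-KEY}",
    ecmwf_api_email="{EMAIL}"
)
# Placeholder request; replace with real MARS keywords and values
mars_dict = {"{MARS-KEYWORD}": "{VALUE}"}
# cost=True asks MARS to estimate the request instead of downloading the data
source.retrieve(mars_dict=mars_dict, n_jobs=2, cost=True)
successes = source.info()  # pd.Series with 1 for each successful run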
info()
Return info on each ECMWF request.
Returns:
Type | Description |
---|---|
Series | pd.Series: Successful request for each run == 1. |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/ecmwf/base_mars.py
SparkECMWFWeatherForecastSource
Bases: SourceInterface
The Weather Forecast API V1 Source class downloads nc files from the ECMWF MARS server using the ECMWF Python API.
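Example (a minimal sketch, assuming the class is importable from rtdip_sdk.pipelines.sources like the other source components; all values in braces are placeholders and the keyword names follow the parameters table below):
from rtdip_sdk.pipelines.sources import SparkECMWFWeatherForecastSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
weather_forecast_source = SparkECMWFWeatherForecastSource(
    spark=spark,
    save_path="{LOCAL-SAVE-PATH}",
    date_start="2024-01-01 00:00:00",
    date_end="2024-01-02 00:00:00",
    ecmwf_class="{ECMWF-CLASS}",
    stream="{STREAM}",
    expver="{EXPVER}",
    leveltype="{LEVEL-TYPE}",
    ec_vars=["{VARIABLE}"],
    forecast_area=["{N}", "{W}", "{S}", "{E}"],  # N/W/S/E coordinates of the forecast area
    ecmwf_api_key="{API-KEY}",
    ecmwf_api_email="{EMAIL}"
)
weather_forecast_source.read_batch()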
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session instance | required |
save_path | str | Path to local directory where the nc files will be stored, in format "yyyy-mm-dd_HH.nc" | required |
date_start | str | Start date of extraction in "YYYY-MM-DD HH:MM:SS" format | required |
date_end | str | End date of extraction in "YYYY-MM-DD HH:MM:SS" format | required |
ecmwf_class | str | ecmwf classification of data | required |
stream | str | Operational model stream | required |
expver | str | Version of data | required |
leveltype | str | Surface level forecasts | required |
ec_vars | list | Variables of forecast measurements. | required |
forecast_area | list | N/W/S/E coordinates of the forecast area | required |
ecmwf_api_key | str | API key for ECMWF API | required |
ecmwf_api_email | str | Email for ECMWF API | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/ecmwf/weather_forecast.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/ecmwf/weather_forecast.py
read_batch()
Pulls data from the Weather API and returns it as .nc files.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/ecmwf/weather_forecast.py
SparkDeltaSource
Bases: SourceInterface
The Spark Delta Source is used to read data from a Delta table.
Example
#Delta Source for Streaming Queries
from rtdip_sdk.pipelines.sources import SparkDeltaSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
delta_source = SparkDeltaSource(
spark=spark,
options={
"maxFilesPerTrigger": 1000,
"ignoreChanges: True,
"startingVersion": 0
},
table_name="{YOUR-DELTA-TABLE-PATH}"
)
delta_source.read_stream()
#Delta Source for Batch Queries
from rtdip_sdk.pipelines.sources import SparkDeltaSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
delta_source = SparkDeltaSource(
spark=spark,
options={
"versionAsOf": 0,
"timestampAsOf": "yyyy-mm-dd hh:mm:ss[.fffffffff]"
},
table_name="{YOUR-DELTA-TABLE-PATH}"
)
delta_source.read_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session required to read data from a Delta table. | required |
options | dict | | required |
table_name | str | Name of the Hive Metastore or Unity Catalog Delta Table | required |
Attributes:
Name | Type | Description |
---|---|---|
maxFilesPerTrigger | int | How many new files to be considered in every micro-batch. The default is 1000. (Streaming) |
maxBytesPerTrigger | int | How much data gets processed in each micro-batch. (Streaming) |
ignoreDeletes | bool str | Ignore transactions that delete data at partition boundaries. (Streaming) |
ignoreChanges | bool str | Pre-process updates if files had to be rewritten in the source table due to a data changing operation. (Streaming) |
startingVersion | int str | The Delta Lake version to start from. (Streaming) |
startingTimestamp | datetime str | The timestamp to start from. (Streaming) |
withEventTimeOrder | bool str | Whether the initial snapshot should be processed with event time order. (Streaming) |
timestampAsOf | datetime str | Query the Delta Table from a specific point in time. (Batch) |
versionAsOf | int str | Query the Delta Table from a specific version. (Batch) |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta.py
read_batch()
Reads batch data from Delta. Most of the options provided by the Apache Spark DataFrame read API are supported for performing batch reads on Delta tables.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta.py
read_stream()
Reads streaming data from Delta. All of the data in the table is processed as well as any new data that arrives after the stream started. .load() can take table name or path.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta.py
BinaryToStringTransformer
Bases: TransformerInterface
Converts a dataframe body column from a binary to a string.
Example
from rtdip_sdk.pipelines.transformers import BinaryToStringTransformer
binary_to_string_transformer = BinaryToStringTransformer(
data=df,
source_column_name="body",
target_column_name="body"
)
result = binary_to_string_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | DataFrame | Dataframe to be transformed | required |
source_column_name | str | Spark Dataframe column containing the Binary data | required |
target_column_name | str | Spark Dataframe column name to be used for the String data | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/binary_to_string.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/binary_to_string.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame | DataFrame | A dataframe with the body column converted to string. |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/binary_to_string.py
OPCPublisherOPCUAJsonToPCDMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created by OPC Publisher to the Process Control Data Model.
Example
from rtdip_sdk.pipelines.transformers import OPCPublisherOPCUAJsonToPCDMTransformer
opc_publisher_opcua_json_to_pcdm_transformer = OPCPublisherOPCUAJsonToPCDMTransformer(
data=df,
source_column_name="body",
multiple_rows_per_message=True,
status_null_value="Good",
change_type_value="insert",
timestamp_formats=[
"yyyy-MM-dd'T'HH:mm:ss.SSSX",
"yyyy-MM-dd'T'HH:mm:ssX"
],
filter=None
)
result = opc_publisher_opcua_json_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | DataFrame | Dataframe containing the column with Json OPC UA data | required |
source_column_name | str | Spark Dataframe column containing the OPC Publisher Json OPC UA data | required |
multiple_rows_per_message | optional bool | Each Dataframe Row contains an array of/multiple OPC UA messages. The list of Json will be exploded into rows in the Dataframe. | True |
status_null_value | optional str | If populated, will replace null values in the Status column with the specified value. | None |
change_type_value | optional str | If populated, will replace 'insert' in the ChangeType column with the specified value. | 'insert' |
timestamp_formats | optional list[str] | Specifies the timestamp formats to be used for converting the timestamp string to a Timestamp Type. For more information on formats, refer to this documentation. | ["yyyy-MM-dd'T'HH:mm:ss.SSSX", "yyyy-MM-dd'T'HH:mm:ssX"] |
filter | optional str | Enables providing a filter to the data which can be required in certain scenarios. For example, it would be possible to filter on IoT Hub Device Id and Module by providing a filter in SQL format such as | None |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcua_json_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcua_json_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame | DataFrame | A dataframe with the specified column converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcua_json_to_pcdm.py
OPCPublisherOPCAEJsonToPCDMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created by OPC Publisher for A&E (Alarm & Events) data to the Process Control Data Model.
Example
from rtdip_sdk.pipelines.transformers import OPCPublisherOPCAEJsonToPCDMTransformer
opc_publisher_opcae_json_to_pcdm_transformer = OPCPublisherOPCAEJsonToPCDMTransformer(
data=df,
source_column_name="body",
timestamp_formats=[
"yyyy-MM-dd'T'HH:mm:ss.SSSX",
"yyyy-MM-dd'T'HH:mm:ssX"
],
filter=None
)
result = opc_publisher_opcae_json_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | DataFrame | Dataframe containing the column with Json OPC AE data | required |
source_column_name | str | Spark Dataframe column containing the OPC Publisher Json OPC AE data | required |
timestamp_formats | optional list[str] | Specifies the timestamp formats to be used for converting the timestamp string to a Timestamp Type. For more information on formats, refer to this documentation. | None |
filter | optional str | Enables providing a filter to the data which can be required in certain scenarios. For example, it would be possible to filter on IoT Hub Device Id and Module by providing a filter in SQL format such as | None |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame | DataFrame | A dataframe with the OPC Publisher A&E data converted to the Process Control Data Model |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.py
FledgeOPCUAJsonToPCDMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created by Fledge to the Process Control Data Model.
Example
from rtdip_sdk.pipelines.transformers import FledgeOPCUAJsonToPCDMTransformer
fledge_opcua_json_to_pcdm_transformer = FledgeOPCUAJsonToPCDMTransformer(
data=df,
source_column_name="body",
status_null_value="Good",
change_type_value="insert",
timestamp_formats=[
"yyyy-MM-dd'T'HH:mm:ss.SSSX",
"yyyy-MM-dd'T'HH:mm:ssX",
]
)
result = fledge_opcua_json_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | DataFrame | Dataframe containing the column with Json Fledge data | required |
source_column_name | str | Spark Dataframe column containing the OPC Publisher Json OPC UA data | required |
status_null_value | str | If populated, will replace 'Good' in the Status column with the specified value. | 'Good' |
change_type_value | optional str | If populated, will replace 'insert' in the ChangeType column with the specified value. | 'insert' |
timestamp_formats | list[str] | Specifies the timestamp formats to be used for converting the timestamp string to a Timestamp Type. For more information on formats, refer to this documentation. | ["yyyy-MM-dd'T'HH:mm:ss.SSSX", "yyyy-MM-dd'T'HH:mm:ssX"] |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame | DataFrame | A dataframe with the specified column converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.py
SSIPPIBinaryFileToPCDMTransformer
Bases: TransformerInterface
Converts a Spark DataFrame column containing binaryFile parquet data to the Process Control Data Model.
This DataFrame should contain a path and the binary data. Typically this can be done using the Autoloader source component and specifying "binaryFile" as the format.
For more information about the SSIP PI Batch Connector, please see here.
Example
from rtdip_sdk.pipelines.transformers import SSIPPIBinaryFileToPCDMTransformer
ssip_pi_binary_file_to_pcdm_transformer = SSIPPIBinaryFileToPCDMTransformer(
data=df
)
result = ssip_pi_binary_file_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | DataFrame | DataFrame containing the path and binaryFile data | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame | DataFrame | A dataframe with the provided Binary data converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.py
SSIPPIJsonStreamToPCDMTransformer
Bases: TransformerInterface
Converts a Spark DataFrame containing Binary JSON data and related Properties to the Process Control Data Model
For more information about the SSIP PI Streaming Connector, please see here.
Example
from rtdip_sdk.pipelines.transformers import SSIPPIJsonStreamToPCDMTransformer
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
ssip_pi_json_stream_to_pcdm_transformer = SSIPPIJsonStreamToPCDMTransformer(
spark=spark,
data=df,
source_column_name="body",
properties_column_name="",
metadata_delta_table=None
)
result = ssip_pi_json_stream_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session | required |
data | DataFrame | DataFrame containing the path and binaryFile data | required |
source_column_name | str | Spark Dataframe column containing the Binary json data | required |
properties_column_name | str | Spark Dataframe struct typed column containing an element with the PointType | required |
metadata_delta_table | optional str | Name of a metadata table that can be used for PointType mappings | None |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_json_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_json_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame | DataFrame | A dataframe with the provided Binary data converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_json_to_pcdm.py
AIOJsonToPCDMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created by AIO to the Process Control Data Model.
Example
from rtdip_sdk.pipelines.transformers import AIOJsonToPCDMTransformer
aio_json_to_pcdm_transformer = AIOJsonToPCDMTransformer(
data=df,
source_column_name="body",
status_null_value="Good",
change_type_value="insert"
)
result = aio_json_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | DataFrame | Dataframe containing the column with Json AIO data | required |
source_column_name | str | Spark Dataframe column containing the Json AIO data | required |
status_null_value | str | If populated, will replace 'Good' in the Status column with the specified value. | 'Good' |
change_type_value | optional str | If populated, will replace 'insert' in the ChangeType column with the specified value. | 'insert' |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame | DataFrame | A dataframe with the specified column converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/aio_json_to_pcdm.py
OPCUAJsonToPCDMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created by Open Source OPC UA to the Process Control Data Model.
Example
from rtdip_sdk.pipelines.transformers import OPCUAJsonToPCDMTransformer
opcua_json_to_pcdm_transformer = OPCUAJsonToPCDMTransformer(
data=df,
source_column_name="body",
status_null_value="Good",
change_type_value="insert"
)
result = opcua_json_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | DataFrame | Dataframe containing the column with Json OPC UA data | required |
source_column_name | str | Spark Dataframe column containing the OPC Publisher Json OPC UA data | required |
status_null_value | str | If populated, will replace 'Good' in the Status column with the specified value. | 'Good' |
change_type_value | optional str | If populated, will replace 'insert' in the ChangeType column with the specified value. | 'insert' |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame | DataFrame | A dataframe with the specified column converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opcua_json_to_pcdm.py
CAISOToMDMTransformer
Bases: BaseRawToMDMTransformer
Converts CAISO Raw data into Meters Data Model.
Please check the BaseRawToMDMTransformer for the required arguments and methods.
Example
from rtdip_sdk.pipelines.transformers import CAISOToMDMTransformer
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
caiso_to_mdm_transformer = CAISOToMDMTransformer(
spark=spark,
data=df,
output_type="usage",
name=None,
description=None,
value_type=None,
version=None,
series_id=None,
series_parent_id=None
)
result = caiso_to_mdm_transformer.transform()
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/caiso_to_mdm.py
ERCOTToMDMTransformer
Bases: BaseRawToMDMTransformer
Converts ERCOT Raw data into Meters Data Model.
Please check the BaseRawToMDMTransformer for the required arguments and methods.
Example
from rtdip_sdk.pipelines.transformers import ERCOTToMDMTransformer
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
ercot_to_mdm_transformer = ERCOTToMDMTransformer(
spark=spark,
data=df,
output_type="usage",
name=None,
description=None,
value_type=None,
version=None,
series_id=None,
series_parent_id=None
)
result = ercot_to_mdm_transformer.transform()
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/ercot_to_mdm.py
MISOToMDMTransformer
Bases: BaseRawToMDMTransformer
Converts MISO Raw data into Meters Data Model.
Please check the BaseRawToMDMTransformer for the required arguments and methods.
Example
from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
miso_to_mdm_transformer = MISOToMDMTransformer(
spark=spark,
data=df,
output_type="usage",
name=None,
description=None,
value_type=None,
version=None,
series_id=None,
series_parent_id=None
)
result = miso_to_mdm_transformer.transform()
BaseRawToMDMTransformer
BaseRawToMDMTransformer
Bases: TransformerInterface
Base class for all the Raw to Meters Data Model Transformers.
Meters Data Model requires two outputs:
UsageData: To store measurement (value) as timeseries data.
MetaData: To store meters related meta information.
It supports the generation of both outputs as they share some common properties.
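A sketch of producing both outputs through a concrete subclass (MISOToMDMTransformer is used purely for illustration; the second permitted output_type value is elided in the table below, so it is left as a placeholder):
from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
# df is the raw DataFrame read by the corresponding ISO source
usage_df = MISOToMDMTransformer(spark=spark, data=df, output_type="usage").transform()
meta_df = MISOToMDMTransformer(spark=spark, data=df, output_type="{META-OUTPUT-TYPE}").transform()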
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session instance. | required |
data | DataFrame | Dataframe containing the raw MISO data. | required |
output_type | str | Must be one of | required |
name | str | Set this to override default | None |
description | str | Set this to override default | None |
value_type | ValueType | Set this to override default | None |
version | str | Set this to override default | None |
series_id | str | Set this to override default | None |
series_parent_id | str | Set this to override default | None |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/base_raw_to_mdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A dataframe with the raw data converted into MDM. |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/base_raw_to_mdm.py
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/miso_to_mdm.py
PJMToMDMTransformer
Bases: BaseRawToMDMTransformer
Converts PJM Raw data into Meters Data Model.
Please check the BaseRawToMDMTransformer for the required arguments and methods.
Example
from rtdip_sdk.pipelines.transformers import PJMToMDMTransformer
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
pjm_to_mdm_transformer = PJMToMDMTransformer(
spark=spark,
data=df,
output_type="usage",
name=None,
description=None,
value_type=None,
version=None,
series_id=None,
series_parent_id=None
)
result = pjm_to_mdm_transformer.transform()
BaseRawToMDMTransformer
Bases: TransformerInterface
Base class for all the Raw to Meters Data Model Transformers.
Meters Data Model requires two outputs:
UsageData: To store measurement (value) as timeseries data.
MetaData: To store meters related meta information.
It supports the generation of both the outputs as they share some common properties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark |
SparkSession
|
Spark Session instance. |
required |
data |
DataFrame
|
Dataframe containing the raw ISO data. |
required |
output_type |
str
|
Must be one of usage or meta. |
required |
name |
str
|
Set this to override default |
None
|
description |
str
|
Set this to override default |
None
|
value_type |
ValueType
|
Set this to override default |
None
|
version |
str
|
Set this to override default |
None
|
series_id |
str
|
Set this to override default |
None
|
series_parent_id |
str
|
Set this to override default |
None
|
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/base_raw_to_mdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A dataframe with the raw data converted into MDM. |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/base_raw_to_mdm.py
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/pjm_to_mdm.py
EdgeXOPCUAJsonToPCDMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created by EdgeX to the Process Control Data Model.
Example
from rtdip_sdk.pipelines.transformers import EdgeXOPCUAJsonToPCDMTransformer
edge_opcua_json_to_pcdm_transformer = EdgeXOPCUAJsonToPCDMTransformer(
data=df,
source_column_name="body",
status_null_value="Good",
change_type_value="insert"
)
result = edge_opcua_json_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe containing the column with EdgeX data |
required |
source_column_name |
str
|
Spark Dataframe column containing the OPC Publisher Json OPC UA data |
required |
status_null_value |
optional str
|
If populated, will replace 'Good' in the Status column with the specified value. |
'Good'
|
change_type_value |
optional str
|
If populated, will replace 'insert' in the ChangeType column with the specified value. |
'insert'
|
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A dataframe with the specified column converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.py
ECMWFExtractBaseToWeatherDataModel
Bases: TransformerInterface
Base class for extracting forecast data downloaded in .nc format from ECMWF MARS Server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
load_path |
str
|
Path to local directory where the nc files will be stored, in format "yyyy-mm-dd_HH.nc" |
required |
date_start |
str
|
Start date of extraction in "YYYY-MM-DD HH:MM:SS" format |
required |
date_end |
str
|
End date of extraction in "YYYY-MM-DD HH:MM:SS" format |
required |
run_frequency |
str
|
Frequency format of runs to download, e.g. "H" |
required |
run_interval |
str
|
Interval of runs, e.g. a run_frequency of "H" and run_interval of "12" will extract the data of the 00 and 12 run for each day. |
required |
lat |
DataArray
|
Latitude values to extract from nc files |
required |
lon |
DataArray
|
Longitude values to extract from nc files |
required |
utc |
bool = True
|
Whether to convert the time to UTC or not |
True
|
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ecmwf/nc_extractbase_to_weather_data_model.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ecmwf/nc_extractbase_to_weather_data_model.py
transform(tag_prefix, variables, method='nearest')
Extract raw data from stored nc files downloaded via ECMWF MARS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tag_prefix |
str
|
Prefix of the tag names of raw tags to be added to the dataframe |
required |
variables |
list
|
List of variable names of raw tags to be extracted from the nc files |
required |
method |
str
|
The method used to match latitude/longitude in xarray using .sel(), by default "nearest" |
'nearest'
|
Returns:
Name | Type | Description |
---|---|---|
df |
DataFrame
|
Raw data extracted with lat, lon, run_time, target_time as a pd.multiindex and variables as columns. |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ecmwf/nc_extractbase_to_weather_data_model.py
ECMWFExtractGridToWeatherDataModel
Bases: ECMWFExtractBaseToWeatherDataModel
Extract a grid from a local .nc file downloaded from ECMWF via MARS
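No example is rendered for this class on this page; the hedged sketch below follows the pattern of the other transformers, with placeholder coordinates, dates, tag prefix and variable names, and assumes the import path used elsewhere on this page.
from rtdip_sdk.pipelines.transformers import ECMWFExtractGridToWeatherDataModel
ecmwf_grid_extract = ECMWFExtractGridToWeatherDataModel(
    lat_min=51.0,
    lat_max=54.0,
    lon_min=-3.0,
    lon_max=1.0,
    grid_step=0.1,
    load_path="{PATH-TO-NC-FILES}",
    date_start="2023-01-01 00:00:00",
    date_end="2023-01-02 00:00:00",
    run_frequency="H",
    run_interval="12",
    utc=True
)
# transform is inherited from ECMWFExtractBaseToWeatherDataModel
df = ecmwf_grid_extract.transform(
    tag_prefix="{TAG-PREFIX}",
    variables=["u10", "v10"],  # placeholder variable names present in the nc files
    method="nearest"
)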
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lat_min |
float
|
Minimum latitude of grid to extract |
required |
lat_max |
float
|
Maximum latitude of grid to extract |
required |
lon_min |
float
|
Minimum longitude of grid to extract |
required |
lon_max |
float
|
Maximum longitude of grid to extract |
required |
grid_step |
float
|
The grid length to use to define the grid, e.g. 0.1. |
required |
load_path |
str
|
Path to local directory with nc files downloaded in format "yyyy-mm-dd_HH.nc" |
required |
date_start |
str
|
Start date of extraction in "YYYY-MM-DD HH:MM:SS" format |
required |
date_end |
str
|
End date of extraction in "YYYY-MM-DD HH:MM:SS" format |
required |
run_frequency |
str
|
Frequency format of runs to download, e.g. "H" |
required |
run_interval |
str
|
Interval of runs, e.g. a run_frequency of "H" and run_interval of "12" will extract the data of the 00 and 12 run for each day. |
required |
utc |
bool
|
Add utc to the datetime indexes? Defaults to True. |
True
|
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ecmwf/nc_extractgrid_to_weather_data_model.py
ECMWFExtractPointToWeatherDataModel
Bases: ECMWFExtractBaseToWeatherDataModel
Extract a single point from a local .nc file downloaded from ECMWF via MARS
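No example is rendered for this class on this page; the hedged sketch below follows the pattern of the other transformers, with placeholder coordinates, dates, tag prefix and variable names, and assumes the import path used elsewhere on this page.
from rtdip_sdk.pipelines.transformers import ECMWFExtractPointToWeatherDataModel
ecmwf_point_extract = ECMWFExtractPointToWeatherDataModel(
    lat=52.5,
    lon=-1.9,
    load_path="{PATH-TO-NC-FILES}",
    date_start="2023-01-01 00:00:00",
    date_end="2023-01-02 00:00:00",
    run_frequency="H",
    run_interval="12",
    utc=True
)
# transform is inherited from ECMWFExtractBaseToWeatherDataModel
df = ecmwf_point_extract.transform(
    tag_prefix="{TAG-PREFIX}",
    variables=["u10", "v10"],  # placeholder variable names present in the nc files
    method="nearest"
)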
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lat |
float
|
Latitude of point to extract |
required |
lon |
float
|
Longitude of point to extract |
required |
load_path |
str
|
Path to local directory with nc files downloaded in format "yyyy-mm-dd_HH.nc" |
required |
date_start |
str
|
Start date of extraction in "YYYY-MM-DD HH:MM:SS" format |
required |
date_end |
str
|
End date of extraction in "YYYY-MM-DD HH:MM:SS" format |
required |
run_frequency |
str
|
Frequency format of runs to download, e.g. "H" |
required |
run_interval |
str
|
Interval of runs, e.g. a run_frequency of "H" and run_interval of "12" will extract the data of the 00 and 12 run for each day. |
required |
utc |
bool
|
Add utc to the datetime indexes? Defaults to True. |
True
|
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ecmwf/nc_extractpoint_to_weather_data_model.py
RawForecastToWeatherDataModel
Bases: TransformerInterface
Converts a raw forecast into weather data model.
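No example is rendered for this class on this page; the following is a hedged sketch in the pattern of the other transformers, assuming the import path used elsewhere on this page and that df is a raw forecast Dataframe.
from rtdip_sdk.pipelines.transformers import RawForecastToWeatherDataModel
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
raw_forecast_to_weather_data_model = RawForecastToWeatherDataModel(
    spark=spark,
    data=df
)
result = raw_forecast_to_weather_data_model.transform()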
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark |
SparkSession
|
Spark Session instance. |
required |
data |
DataFrame
|
Dataframe to be transformed |
required |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A Forecast dataframe converted into Weather Data Model |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.py
PCDMToHoneywellAPMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe in PCDM format to Honeywell APM format.
Example
from rtdip_sdk.pipelines.transformers import PCDMToHoneywellAPMTransformer
pcdm_to_honeywell_apm_transformer = PCDMToHoneywellAPMTransformer(
data=df,
quality="Good",
history_samples_per_message=1,
compress_payload=True
)
result = pcdm_to_honeywell_apm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Dataframe
|
Spark Dataframe in PCDM format |
required |
quality |
str
|
Value for quality inside HistorySamples |
'Good'
|
history_samples_per_message |
int
|
The number of HistorySamples for each row in the DataFrame (Batch Only) |
1
|
compress_payload |
bool
|
If True compresses CloudPlatformEvent with gzip compression |
True
|
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pcdm_to_honeywell_apm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pcdm_to_honeywell_apm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A dataframe with rows in Honeywell APM format |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pcdm_to_honeywell_apm.py
HoneywellAPMJsonToPCDMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created by Honeywell APM to the Process Control Data Model.
Example
from rtdip_sdk.pipelines.transformers import HoneywellAPMJsonToPCDMTransformer
honeywell_apm_json_to_pcdm_transformer = HoneywellAPMJsonToPCDMTransformer(
data=df,
source_column_name="body",
status_null_value="Good",
change_type_value="insert"
)
result = honeywell_apm_json_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe containing the column with Honeywell APM data |
required |
source_column_name |
str
|
Spark Dataframe column containing the Json Honeywell APM data |
required |
status_null_value |
optional str
|
If populated, will replace 'Good' in the Status column with the specified value. |
'Good'
|
change_type_value |
optional str
|
If populated, will replace 'insert' in the ChangeType column with the specified value. |
'insert'
|
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A dataframe with the specified column converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py
SEMJsonToPCDMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created by SEM to the Process Control Data Model.
Example
from rtdip_sdk.pipelines.transformers import SEMJsonToPCDMTransformer
sem_json_to_pcdm_transformer = SEMJsonToPCDMTransformer(
data=df,
source_column_name="body",
version=10,
status_null_value="Good",
change_type_value="insert"
)
result = sem_json_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe containing the column with SEM data |
required |
source_column_name |
str
|
Spark Dataframe column containing the Json SEM data |
required |
version |
int
|
The version for the OBC field mappings. The latest version is 10. |
required |
status_null_value |
optional str
|
If populated, will replace 'Good' in the Status column with the specified value. |
'Good'
|
change_type_value |
optional str
|
If populated, will replace 'insert' in the ChangeType column with the specified value. |
'insert'
|
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/sem_json_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/sem_json_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A dataframe with the specified column converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/sem_json_to_pcdm.py
MiricoJsonToPCDMTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created from Mirico to the Process Control Data Model.
Example
from rtdip_sdk.pipelines.transformers import MiricoJsonToPCDMTransformer
mirico_json_to_pcdm_transformer = MiricoJsonToPCDMTransformer(
data=df,
source_column_name="body",
status_null_value="Good",
change_type_value="insert",
tagname_field="test"
)
result = mirico_json_to_pcdm_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe containing the column with Mirico data |
required |
source_column_name |
str
|
Spark Dataframe column containing the Json Mirico data |
required |
status_null_value |
optional str
|
If populated, will replace 'Good' in the Status column with the specified value. |
'Good'
|
change_type_value |
optional str
|
If populated, will replace 'insert' in the ChangeType column with the specified value. |
'insert'
|
tagname_field |
optional str
|
If populated, will add the specified field to the TagName column. |
None
|
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A dataframe with the specified column converted to PCDM |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py
MiricoJsonToMetadataTransformer
Bases: TransformerInterface
Converts a Spark Dataframe column containing a json string created from Mirico to the Metadata Model.
Example
from rtdip_sdk.pipelines.transformers import MiricoJsonToMetadataTransformer
mirico_json_to_metadata_transformer = MiricoJsonToMetadataTransformer(
data=df,
source_column_name="body"
)
result = mirico_json_to_metadata_transformer.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe containing the column with Mirico data |
required |
source_column_name |
str
|
Spark Dataframe column containing the Json Mirico data |
required |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_metadata.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_metadata.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A dataframe with the specified column converted to Metadata model |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_metadata.py
PandasToPySparkTransformer
Bases: TransformerInterface
Converts a Pandas DataFrame to a PySpark DataFrame.
Example
from rtdip_sdk.pipelines.transformers import PandasToPySparkTransformer
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
pandas_to_pyspark = PandasToPySparkTransformer(
spark=spark,
df=df,
)
result = pandas_to_pyspark.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark |
SparkSession
|
Spark Session required to convert DataFrame |
required |
df |
DataFrame
|
Pandas DataFrame to be converted |
required |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pandas_to_pyspark.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pandas_to_pyspark.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A PySpark dataframe converted from a Pandas DataFrame. |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pandas_to_pyspark.py
PySparkToPandasTransformer
Bases: TransformerInterface
Converts a PySpark DataFrame to a Pandas DataFrame.
Example
from rtdip_sdk.pipelines.transformers import PySparkToPandasTransformer
pyspark_to_pandas = PySparkToPandasTransformer(
df=df
)
result = pyspark_to_pandas.transform()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
DataFrame
|
PySpark DataFrame to be converted |
required |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pyspark_to_pandas.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pyspark_to_pandas.py
transform()
Returns:
Name | Type | Description |
---|---|---|
DataFrame |
DataFrame
|
A Pandas dataframe converted from a PySpark DataFrame. |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pyspark_to_pandas.py
SparkDeltaDestination
Bases: DestinationInterface
The Spark Delta Destination is used to write data to a Delta table.
Examples
#Delta Destination for Streaming Queries
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
delta_destination = SparkDeltaDestination(
data=df,
options={
"checkpointLocation": "/{CHECKPOINT-LOCATION}/"
},
destination="DELTA-TABLE-PATH",
mode="append",
trigger="10 seconds",
query_name="DeltaDestination",
query_wait_interval=None
)
delta_destination.write_stream()
#Delta Destination for Batch Queries
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
delta_destination = SparkDeltaDestination(
data=df,
options={
"overwriteSchema": True
},
destination="DELTA-TABLE-PATH",
mode="append",
trigger="10 seconds",
query_name="DeltaDestination",
query_wait_interval=None
)
delta_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe to be written to Delta |
required |
options |
dict
|
Options that can be specified for a Delta Table write operation (See Attributes table below). |
required |
destination |
str
|
Either the name of the Hive Metastore or Unity Catalog Delta Table or the path to the Delta table |
required |
mode |
optional str
|
Method of writing to Delta Table - append/overwrite (batch), append/update/complete (stream). Default is append |
'append'
|
trigger |
optional str
|
Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds |
'10 seconds'
|
query_name |
optional str
|
Unique name for the query in associated SparkSession. (stream) Default is DeltaDestination |
'DeltaDestination'
|
query_wait_interval |
optional int
|
If set, waits for the streaming query to complete before returning. (stream) Default is None |
None
|
Attributes:
Name | Type | Description |
---|---|---|
checkpointLocation |
str
|
Path to checkpoint files. (Streaming) |
txnAppId |
str
|
A unique string that you can pass on each DataFrame write. (Batch & Streaming) |
txnVersion |
str
|
A monotonically increasing number that acts as transaction version. (Batch & Streaming) |
maxRecordsPerFile |
int str
|
Specify the maximum number of records to write to a single file for a Delta Lake table. (Batch) |
replaceWhere |
str
|
Condition(s) for overwriting. (Batch) |
partitionOverwriteMode |
str
|
When set to dynamic, overwrites all existing data in each logical partition for which the write will commit new data. Default is static. (Batch) |
overwriteSchema |
bool str
|
If True, overwrites the schema as well as the table data. (Batch) |
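As an illustration of how the batch attributes above are passed through the options dictionary, the sketch below performs an idempotent, predicate-scoped overwrite; the txnAppId value, the EventDate column and the date predicate are illustrative placeholders, not part of the component's documentation.
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
delta_destination = SparkDeltaDestination(
    data=df,
    options={
        "txnAppId": "{YOUR-APP-ID}",  # reuse the same app id for idempotent writes
        "txnVersion": "1",            # increase the version on each new write
        "replaceWhere": "EventDate >= '2023-01-01' AND EventDate < '2023-02-01'"
    },
    destination="DELTA-TABLE-PATH",
    mode="overwrite"
)
delta_destination.write_batch()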
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta.py
write_batch()
Writes batch data to Delta. Most of the options provided by the Apache Spark DataFrame write API are supported for performing batch writes on tables.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta.py
write_stream()
Writes streaming data to Delta. Exactly-once processing is guaranteed.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta.py
SparkDeltaMergeDestination
Bases: DestinationInterface
The Spark Delta Merge Destination is used to merge data into a Delta table. Refer to this documentation for more information about Delta Merge.
Examples
#Delta Merge Destination for Streaming Queries
from rtdip_sdk.pipelines.destinations import SparkDeltaMergeDestination
delta_merge_destination = SparkDeltaMergeDestination(
data=df,
destination="DELTA-TABLE-PATH",
options={
"checkpointLocation": "/{CHECKPOINT-LOCATION}/"
},
merge_condition="source.id = target.id",
when_matched_update_list=None,
when_matched_delete_list=None,
when_not_matched_insert_list=None,
when_not_matched_by_source_update_list=None,
when_not_matched_by_source_delete_list=None,
try_broadcast_join=False,
trigger="10 seconds",
query_name="DeltaDestination",
query_wait_interval=None
)
delta_merge_destination.write_stream()
#Delta Merge Destination for Batch Queries
from rtdip_sdk.pipelines.destinations import SparkDeltaMergeDestination
delta_merge_destination = SparkDeltaMergeDestination(
data=df,
destination="DELTA-TABLE-PATH",
options={},
merge_condition="source.id = target.id",
when_matched_update_list=None,
when_matched_delete_list=None,
when_not_matched_insert_list=None,
when_not_matched_by_source_update_list=None,
when_not_matched_by_source_delete_list=None,
try_broadcast_join=False,
trigger="10 seconds",
query_name="DeltaDestination",
query_wait_interval=None
)
delta_merge_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe to be merged into a Delta Table |
required |
destination |
str
|
Either the name of the Hive Metastore or Unity Catalog Delta Table or the path to the Delta table |
required |
options |
dict
|
Options that can be specified for the Delta Merge write operation (See Attributes table below). |
required |
merge_condition |
str
|
Condition for matching records between dataframe and delta table. Reference Dataframe columns as source and Delta Table columns as target. |
required |
when_matched_update_list |
optional list[DeltaMergeConditionValues]
|
Conditions(optional) and values to be used when updating rows that match the merge_condition. |
None
|
when_matched_delete_list |
optional list[DeltaMergeCondition]
|
Conditions(optional) to be used when deleting rows that match the merge_condition. |
None
|
when_not_matched_insert_list |
optional list[DeltaMergeConditionValues]
|
Conditions(optional) and values to be used when inserting rows that do not match the merge_condition. |
None
|
when_not_matched_by_source_update_list |
optional list[DeltaMergeConditionValues]
|
Conditions(optional) and values to be used when updating rows that do not match the merge_condition. |
None
|
when_not_matched_by_source_delete_list |
optional list[DeltaMergeCondition]
|
Conditions(optional) to be used when deleting rows that do not match the merge_condition. |
None
|
try_broadcast_join |
optional bool
|
Attempts to perform a broadcast join in the merge which can leverage data skipping using partition pruning and file pruning automatically. Can fail if dataframe being merged is large and therefore more suitable for streaming merges than batch merges |
False
|
trigger |
optional str
|
Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds |
'10 seconds'
|
query_name |
optional str
|
Unique name for the query in associated SparkSession |
'DeltaMergeDestination'
|
query_wait_interval |
optional int
|
If set, waits for the streaming query to complete before returning. (stream) Default is None |
None
|
Attributes:
Name | Type | Description |
---|---|---|
checkpointLocation |
str
|
Path to checkpoint files. (Streaming) |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta_merge.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta_merge.py
write_batch()
Merges batch data into a Delta Table.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta_merge.py
write_stream()
Merges streaming data to Delta using foreachBatch
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta_merge.py
SparkEventhubDestination
Bases: DestinationInterface
This Spark destination class is used to write batch or streaming data to Eventhubs. Eventhub configurations need to be specified as options in a dictionary. Additionally, there are more optional configurations which can be found here. If using startingPosition or endingPosition make sure to check out Event Position section for more details and examples.
Examples
#Eventhub Destination for Streaming Queries
from rtdip_sdk.pipelines.destinations import SparkEventhubDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
eventhub_destination = SparkEventhubDestination(
spark=spark,
data=df,
options={
"eventhubs.connectionString": connectionString,
"eventhubs.consumerGroup": "{YOUR-EVENTHUB-CONSUMER-GROUP}",
"checkpointLocation": "/{CHECKPOINT-LOCATION}/"
},
trigger="10 seconds",
query_name="EventhubDestination",
query_wait_interval=None
)
eventhub_destination.write_stream()
#Eventhub Destination for Batch Queries
from rtdip_sdk.pipelines.destinations import SparkEventhubDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
eventhub_destination = SparkEventhubDestination(
spark=spark,
data=df,
options={
"eventhubs.connectionString": connectionString,
"eventhubs.consumerGroup": "{YOUR-EVENTHUB-CONSUMER-GROUP}"
},
trigger="10 seconds",
query_name="EventhubDestination",
query_wait_interval=None
)
eventhub_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark |
SparkSession
|
Spark Session |
required |
data |
DataFrame
|
Dataframe to be written to Eventhub |
required |
options |
dict
|
A dictionary of Eventhub configurations (See Attributes table below). All Configuration options for Eventhubs can be found here. |
required |
trigger |
optional str
|
Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds |
'10 seconds'
|
query_name |
str
|
Unique name for the query in associated SparkSession |
'EventhubDestination'
|
query_wait_interval |
optional int
|
If set, waits for the streaming query to complete before returning. (stream) Default is None |
None
|
Attributes:
Name | Type | Description |
---|---|---|
checkpointLocation |
str
|
Path to checkpoint files. (Streaming) |
eventhubs.connectionString |
str
|
Eventhubs connection string is required to connect to the Eventhubs service. (Streaming and Batch) |
eventhubs.consumerGroup |
str
|
A consumer group is a view of an entire eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch) |
eventhubs.startingPosition |
JSON str
|
The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch) |
eventhubs.endingPosition |
JSON str
|
The ending position of a batch query. This works the same as startingPosition. (Batch) |
maxEventsPerTrigger |
long
|
Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream) |
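Where a starting or ending position is needed, it is supplied as a JSON string in the options dictionary. The sketch below shows one commonly used shape (start from the beginning of the stream); the exact fields are an assumption and should be checked against the Event Position documentation referenced above, and connectionString is assumed to be defined as in the examples above.
import json
starting_event_position = {
    "offset": "-1",        # start of stream
    "seqNo": -1,           # not in use
    "enqueuedTime": None,  # not in use
    "isInclusive": True
}
options = {
    "eventhubs.connectionString": connectionString,
    "eventhubs.startingPosition": json.dumps(starting_event_position)
}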
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py
write_batch()
Writes batch data to Eventhubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py
write_stream()
Writes streaming data to Eventhubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py
SparkKafkaDestination
Bases: DestinationInterface
This Spark destination class is used to write batch or streaming data to Kafka. Required and optional configurations can be found in the Attributes tables below.
Additionally, there are more optional configurations which can be found here.
For compatibility between Spark and Kafka, the columns in the input dataframe are concatenated into one 'value' column of JSON string.
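That concatenation is conceptually equivalent to the Spark transformation below; this is a sketch of the idea, not the component's internal code, and assumes df is the input dataframe.
from pyspark.sql.functions import struct, to_json
# every column is packed into a struct and serialised into a single JSON 'value' column
value_df = df.select(to_json(struct(*df.columns)).alias("value"))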
Example
from rtdip_sdk.pipelines.destinations import SparkKafkaDestination
kafka_destination = SparkKafkaDestination(
data=df,
options={
"kafka.bootstrap.servers": "host1:port1,host2:port2"
},
trigger="10 seconds",
query_name="KafkaDestination",
query_wait_interval=None
)
kafka_destination.write_stream()
OR
kafka_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe to be written to Kafka |
required |
options |
dict
|
A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see here |
required |
trigger |
optional str
|
Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds |
'10 seconds'
|
query_name |
str
|
Unique name for the query in associated SparkSession |
'KafkaDestination'
|
query_wait_interval |
optional int
|
If set, waits for the streaming query to complete before returning. (stream) Default is None |
None
|
The following options must be set for the Kafka destination for both batch and streaming queries.
Attributes:
Name | Type | Description |
---|---|---|
kafka.bootstrap.servers |
A comma-separated list of host︰port
|
The Kafka "bootstrap.servers" configuration. (Streaming and Batch) |
The following configurations are optional:
Attributes:
Name | Type | Description |
---|---|---|
topic |
str
|
Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. (Streaming and Batch) |
includeHeaders |
bool
|
Whether to include the Kafka headers in the row. (Streaming and Batch) |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
write_batch()
Writes batch data to Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
write_stream()
Writes streaming data to Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
SparkKinesisDestination
Bases: DestinationInterface
This Kinesis destination class is used to write batch or streaming data to Kinesis. Kinesis configurations need to be specified as options in a dictionary.
Example
from rtdip_sdk.pipelines.destinations import SparkKinesisDestination
kinesis_destination = SparkKinesisDestination(
data=df,
options={
"endpointUrl": "https://kinesis.{REGION}.amazonaws.com",
"awsAccessKey": "{YOUR-AWS-ACCESS-KEY}",
"awsSecretKey": "{YOUR-AWS-SECRET-KEY}",
"streamName": "{YOUR-STREAM-NAME}"
},
mode="update",
trigger="10 seconds",
query_name="KinesisDestination",
query_wait_interval=None
)
kinesis_destination.write_stream()
OR
kinesis_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe to be written to Kinesis |
required |
options |
dict
|
A dictionary of Kinesis configurations (See Attributes table below). All Configuration options for Kinesis can be found here. |
required |
mode |
str
|
Method of writing to Kinesis - append, complete, update |
'update'
|
trigger |
optional str
|
Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds |
'10 seconds'
|
query_name |
str
|
Unique name for the query in associated SparkSession |
'KinesisDestination'
|
query_wait_interval |
optional int
|
If set, waits for the streaming query to complete before returning. (stream) Default is None |
None
|
Attributes:
Name | Type | Description |
---|---|---|
endpointUrl |
str
|
Endpoint of the kinesis stream. |
awsAccessKey |
str
|
AWS access key. |
awsSecretKey |
str
|
AWS secret access key corresponding to the access key. |
streamName |
List[str]
|
Name of the streams in Kinesis to write to. |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK_DATABRICKS |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
write_batch()
Writes batch data to Kinesis.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
write_stream()
Writes streaming data to Kinesis.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
SparkRestAPIDestination
Bases: DestinationInterface
The Spark Rest API Destination is used to write data to a Rest API.
The payload sent to the API is constructed by converting each row in the DataFrame to Json.
Note
While it is possible to use the write_batch
method, it is easy to overwhlem a Rest API with large volumes of data.
Consider reducing data volumes when writing to a Rest API in Batch mode to prevent API errors including throtting.
Example
#Rest API Destination for Streaming Queries
from rtdip_sdk.pipelines.destinations import SparkRestAPIDestination
rest_api_destination = SparkRestAPIDestination(
data=df,
options={
"checkpointLocation": "{/CHECKPOINT-LOCATION/}"
},
url="{REST-API-URL}",
headers = {
'Authorization': 'Bearer {}'.format("{TOKEN}")
},
batch_size=100,
method="POST",
parallelism=8,
trigger="1 minute",
query_name="DeltaRestAPIDestination",
query_wait_interval=None
)
rest_api_destination.write_stream()
#Rest API Destination for Batch Queries
from rtdip_sdk.pipelines.destinations import SparkRestAPIDestination
rest_api_destination = SparkRestAPIDestination(
data=df,
options={},
url="{REST-API-URL}",
headers = {
'Authorization': 'Bearer {}'.format("{TOKEN}")
},
batch_size=10,
method="POST",
parallelism=4,
trigger="1 minute",
query_name="DeltaRestAPIDestination",
query_wait_interval=None
)
rest_api_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe to be written to the Rest API |
required |
options |
dict
|
A dictionary of options for streaming writes |
required |
url |
str
|
The Rest API Url |
required |
headers |
dict
|
A dictionary of headers to be provided to the Rest API |
required |
batch_size |
int
|
The number of DataFrame rows to be used in each Rest API call |
required |
method |
str
|
The method to be used when calling the Rest API. Allowed values are POST, PATCH and PUT |
'POST'
|
parallelism |
int
|
The number of concurrent calls to be made to the Rest API |
8
|
trigger |
optional str
|
Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds |
'1 minutes'
|
query_name |
str
|
Unique name for the query in associated SparkSession |
'DeltaRestAPIDestination'
|
query_wait_interval |
optional int
|
If set, waits for the streaming query to complete before returning. (stream) Default is None |
None
|
Attributes:
Name | Type | Description |
---|---|---|
checkpointLocation |
str
|
Path to checkpoint files. (Streaming) |
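Conceptually, rows are converted to Json, grouped into chunks of batch_size and sent to the API with the chosen method. The standalone sketch below uses the requests library and a hypothetical list-of-dicts payload layout to illustrate the idea outside of Spark; it is not the component's implementation.
import json
import requests

def post_in_batches(rows, url, headers, batch_size=100):
    # rows is a list of dicts, e.g. [row.asDict() for row in df.collect()]
    for i in range(0, len(rows), batch_size):
        chunk = rows[i:i + batch_size]
        response = requests.post(url, headers=headers, data=json.dumps(chunk))
        response.raise_for_status()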
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType |
Environment
|
Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
write_batch()
Writes batch data to a Rest API
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
write_stream()
Writes streaming data to a Rest API
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
SparkPCDMToDeltaDestination
Bases: DestinationInterface
The Process Control Data Model written to Delta.
Example
#PCDM To Delta Destination for Streaming Queries
from rtdip_sdk.pipelines.destinations import SparkPCDMToDeltaDestination
pcdm_to_delta_destination = SparkPCDMToDeltaDestination(
data=df,
options={
"checkpointLocation": "{/CHECKPOINT-LOCATION/}"
},
destination_float="{DELTA_TABLE_PATH_FLOAT}",
destination_string="{DELTA_TABLE_PATH_STRING}",
destination_integer="{DELTA_TABLE_PATH_INTEGER}",
mode="append",
trigger="10 seconds",
query_name="PCDMToDeltaDestination",
query_wait_interval=None,
merge=True,
try_broadcast_join=False,
remove_nanoseconds=False,
remove_duplicates=True
)
pcdm_to_delta_destination.write_stream()
#PCDM To Delta Destination for Batch Queries
from rtdip_sdk.pipelines.destinations import SparkPCDMToDeltaDestination
pcdm_to_delta_destination = SparkPCDMToDeltaDestination(
data=df,
options={
"maxRecordsPerFile": "10000"
},
destination_float="{DELTA_TABLE_PATH_FLOAT}",
destination_string="{DELTA_TABLE_PATH_STRING}",
destination_integer="{DELTA_TABLE_PATH_INTEGER}",
mode="overwrite",
trigger="10 seconds",
query_name="PCDMToDeltaDestination",
query_wait_interval=None,
merge=True,
try_broadcast_join=False,
remove_nanoseconds=False,
remove_duplicates=True
)
pcdm_to_delta_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
DataFrame
|
Dataframe to be merged into a Delta Table |
required |
options |
dict
|
Options that can be specified for writing to Delta (See Attributes table below). |
required |
destination_float |
str
|
Either the name of the Hive Metastore or Unity Catalog Delta Table or the path to the Delta table to store float values. |
required |
destination_string |
Optional str
|
Either the name of the Hive Metastore or Unity Catalog Delta Table or the path to the Delta table to store string values. |
None
|
destination_integer |
Optional str
|
Either the name of the Hive Metastore or Unity Catalog Delta Table or the path to the Delta table to store integer values |
None
|
mode |
str
|
Method of writing to Delta Table - append/overwrite (batch), append/complete (stream) |
None
|
trigger |
optional str
|
Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds |
'10 seconds'
|
query_name |
str
|
Unique name for the query in associated SparkSession |
'PCDMToDeltaDestination'
|
query_wait_interval |
optional int
|
If set, waits for the streaming query to complete before returning. (stream) Default is None |
None
|
merge |
bool
|
Use Delta Merge to perform inserts, updates and deletes |
True
|
try_broadcast_join |
bool
|
Attempts to perform a broadcast join in the merge which can leverage data skipping using partition pruning and file pruning automatically. Can fail if dataframe being merged is large and therefore more suitable for streaming merges than batch merges |
False
|
remove_nanoseconds |
bool
|
Removes nanoseconds from the EventTime column and replaces with zeros |
False
|
remove_duplicates |
bool
|
Removes duplicates before writing the data |
True
|
Attributes:
Name | Type | Description |
---|---|---|
checkpointLocation |
str
|
Path to checkpoint files. (Streaming) |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
161 162 163 164 165 166 167 |
|
write_batch()
Writes Process Control Data Model data to Delta
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
|
write_stream()
Writes streaming Process Control Data Model data to Delta using foreachBatch
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 |
|
SparkPCDMLatestToDeltaDestination
Bases: DestinationInterface
Writes the latest values of the Process Control Data Model to Delta.
Example
#PCDM Latest To Delta Destination for Streaming Queries
from rtdip_sdk.pipelines.destinations import SparkPCDMLatestToDeltaDestination
pcdm_latest_to_delta_destination = SparkPCDMLatestToDeltaDestination(
data=df,
options={
"checkpointLocation": "{/CHECKPOINT-LOCATION/}"
},
destination="{DELTA_TABLE_PATH}",
mode="append",
trigger="10 seconds",
query_name="PCDMLatestToDeltaDestination",
query_wait_interval=None
)
pcdm_latest_to_delta_destination.write_stream()
#PCDM Latest To Delta Destination for Batch Queries
from rtdip_sdk.pipelines.destinations import SparkPCDMLatestToDeltaDestination
pcdm_latest_to_delta_destination = SparkPCDMLatestToDeltaDestination(
data=df,
options={
"maxRecordsPerFile", "10000"
},
destination="{DELTA_TABLE_PATH}",
mode="overwrite",
trigger="10 seconds",
query_name="PCDMLatestToDeltaDestination",
query_wait_interval=None
)
pcdm_latest_to_delta_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | DataFrame | Dataframe to be merged into a Delta Table | required |
options | dict | | required |
destination | str | Either the name of the Hive Metastore or Unity Catalog Delta Table or the path to the Delta table to store the latest values | required |
mode | str | Method of writing to Delta Table - append/overwrite (batch), append/complete (stream) | None |
trigger | optional str | Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds | '10 seconds' |
query_name | str | Unique name for the query in associated SparkSession | 'PCDMLatestToDeltaDestination' |
query_wait_interval | optional int | If set, waits for the streaming query to complete before returning. (stream) Default is None | None |
Attributes:
Name | Type | Description |
---|---|---|
checkpointLocation | str | Path to checkpoint files. (Streaming) |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py
126 127 128 129 130 131 132 |
|
write_batch()
Writes Process Control Data Model data to Delta
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py
240 241 242 243 244 245 246 247 248 249 250 251 252 |
|
write_stream()
Writes streaming Process Control Data Model data to Delta using foreachBatch
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_latest_to_delta.py
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
|
SparkKafkaEventhubDestination
Bases: DestinationInterface
This Spark Destination class is used to write batch or streaming data to an Eventhub using the Kafka protocol. This enables Eventhubs to be used as a destination in applications like Delta Live Tables or Databricks Serverless Jobs as the Spark Eventhubs JAR is not supported in these scenarios.
Default settings will be specified if not provided in the options
parameter:
kafka.sasl.mechanism
will be set toPLAIN
kafka.security.protocol
will be set toSASL_SSL
kafka.request.timeout.ms
will be set to60000
kafka.session.timeout.ms
will be set to60000
Example
from rtdip_sdk.pipelines.destinations import SparkKafkaEventhubDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
eventhub_destination = SparkKafkaEventhubDestination(
spark=spark,
data=df,
options={
"kafka.bootstrap.servers": "host1:port1,host2:port2"
},
connection_string=connectionString,
consumer_group="{YOUR-EVENTHUB-CONSUMER-GROUP}",
trigger="10 seconds",
query_name="KafkaEventhubDestination",
query_wait_interval=None
)
eventhub_destination.write_stream()
OR
eventhub_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session |  required |
data | DataFrame | Any columns not listed in the required schema here will be merged into a single column named "value", or ignored if "value" is an existing column | required |
connection_string | str | Eventhubs connection string is required to connect to the Eventhubs service. This must include the Eventhub name as the EntityPath parameter | required |
options | dict | A dictionary of Kafka configurations (See Attributes tables below) | required |
consumer_group | str | The Eventhub consumer group to use for the connection | required |
trigger | optional str | Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds | '10 seconds' |
query_name | optional str | Unique name for the query in associated SparkSession | 'KafkaEventhubDestination' |
query_wait_interval | optional int | If set, waits for the streaming query to complete before returning. (stream) Default is None | None |
The following are commonly used parameters that may be included in the options dict. kafka.bootstrap.servers is the only required config. A full list of configs can be found here
Attributes:
Name | Type | Description |
---|---|---|
kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. (Streaming and Batch) |
topic | string | Required if there is no existing topic column in your DataFrame. Sets the topic that all rows will be written to in Kafka. (Streaming and Batch) |
includeHeaders | bool | Determines whether to include the Kafka headers in the row; defaults to False. (Streaming and Batch) |
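A minimal, hedged sketch of preparing the DataFrame and the options above (the DataFrame df, the namespace and the Event Hub name are placeholders; port 9093 is the standard Kafka endpoint for Event Hubs and is not specific to this SDK):
from pyspark.sql.functions import to_json, struct
# Serialise all columns into a single "value" column so nothing is merged implicitly
kafka_df = df.select(to_json(struct(*df.columns)).alias("value"))
options = {
    "kafka.bootstrap.servers": "{NAMESPACE}.servicebus.windows.net:9093",
    "topic": "{EVENT_HUB_NAME}",   # only needed if the DataFrame has no topic column
    "includeHeaders": "false"
}
These options and kafka_df would then be passed as the options and data arguments of SparkKafkaEventhubDestination, as in the example above.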
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
131 132 133 134 135 136 137 |
|
write_batch()
Writes batch data to Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
289 290 291 292 293 294 295 296 297 298 299 300 301 302 |
|
write_stream()
Writes streaming data to Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
|
EVMContractDestination
Bases: DestinationInterface
The EVM Contract Destination is used to write to a smart contract blockchain.
Examples
from rtdip_sdk.pipelines.destinations import EVMContractDestination
evm_contract_destination = EVMContractDestination(
url="https://polygon-mumbai.g.alchemy.com/v2/⟨API_KEY⟩",
account="{ACCOUNT-ADDRESS}",
private_key="{PRIVATE-KEY}",
abi="{SMART-CONTRACT'S-ABI}",
contract="{SMART-CONTRACT-ADDRESS}",
function_name="{SMART-CONTRACT-FUNCTION}",
function_params=({PARAMETER_1}, {PARAMETER_2}, {PARAMETER_3}),
transaction={'gas': {GAS}, 'gasPrice': {GAS-PRICE}},
)
evm_contract_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | Blockchain network URL e.g. 'https://polygon-mumbai.g.alchemy.com/v2/{API_KEY}' | required |
account | str | Address of the sender that will be signing the transaction. | required |
private_key | str | Private key for your blockchain account. | required |
abi | json str | Smart contract's ABI. | required |
contract | str | Address of the smart contract. | None |
function_name | str | Smart contract method to call on. | None |
function_params | tuple | Parameters of given function. | None |
transaction | dict | A dictionary containing a set of instructions to interact with a smart contract deployed on the blockchain (See common parameters in Attributes table below). | None |
Attributes:
Name | Type | Description |
---|---|---|
data | hexadecimal str | Additional information stored in the transaction. |
from | hexadecimal str | Address of the sender for a transaction. |
gas | int | Amount of gas units to perform a transaction. |
gasPrice | int Wei | Price to pay for each unit of gas. Integers are specified in Wei; web3's to_wei function can be used to specify the amount in a different currency. |
nonce | int | The number of transactions sent from a given address. |
to | hexadecimal str | Address of the recipient for a transaction. |
value | int Wei | Value being transferred in a transaction. Integers are specified in Wei; web3's to_wei function can be used to specify the amount in a different currency. |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/blockchain/evm.py
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
|
write_batch()
Writes to a smart contract deployed in a blockchain and returns the transaction hash.
Example:
from web3 import Web3
web3 = Web3(Web3.HTTPProvider("https://polygon-mumbai.g.alchemy.com/v2/<API_KEY>"))
x = EVMContractDestination(
url="https://polygon-mumbai.g.alchemy.com/v2/<API_KEY>",
account='<ACCOUNT>',
private_key='<PRIVATE_KEY>',
contract='<CONTRACT>',
function_name='transferFrom',
function_params=('<FROM_ACCOUNT>', '<TO_ACCOUNT>', 0),
abi = 'ABI',
transaction={
'gas': 100000,
'gasPrice': 1000000000 # or web3.to_wei('1', 'gwei')
},
)
print(x.write_batch())
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/blockchain/evm.py
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
|
write_stream()
Raises:
Type | Description |
---|---|
NotImplementedError | Write stream is not supported. |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/blockchain/evm.py
160 161 162 163 164 165 |
|
PythonDeltaDestination
Bases: DestinationInterface
The Python Delta Destination is used to write data to a Delta table from a Polars LazyFrame.
Example
from rtdip_sdk.pipelines.destinations import PythonDeltaDestination
path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}
python_delta_destination = PythonDeltaDestination(
data=LazyFrame
path=path,
storage_options={
"azure_storage_account_name": "{AZURE-STORAGE-ACCOUNT-NAME}",
"azure_storage_account_key": "{AZURE-STORAGE-ACCOUNT-KEY}"
},
mode=:error",
overwrite_schema=False,
delta_write_options=None
)
python_delta_destination.read_batch()
from rtdip_sdk.pipelines.destinations import PythonDeltaDestination
path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"
python_delta_destination = PythonDeltaDestination(
data=LazyFrame,
path=path,
options={
"aws_access_key_id": "{AWS-ACCESS-KEY-ID}",
"aws_secret_access_key": "{AWS-SECRET-ACCESS-KEY}"
},
mode="error",
overwrite_schema=False,
delta_write_options=None
)
python_delta_destination.write_batch()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | LazyFrame | Polars LazyFrame to be written to Delta | required |
path | str | Path to the Delta table to be written to; either local or remote. Locally, if the table doesn't exist one will be created, but to write to AWS or Azure you must have an existing Delta Table | required |
options | Optional dict | Used if writing to a remote location. For AWS use format {"aws_access_key_id": "<>", "aws_secret_access_key": "<>"}. For Azure use format {"azure_storage_account_name": "storageaccountname", "azure_storage_access_key": "<>"} | None |
mode | Literal['error', 'append', 'overwrite', 'ignore'] | Defaults to 'error' if the table exists; 'ignore' won't write anything if the table exists | 'error' |
overwrite_schema | bool | If True will allow for the table schema to be overwritten | False |
delta_write_options | dict | Options when writing to a Delta table. See here for all options | None |
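As a hedged sketch of a purely local write (where, per the path description above, the table is created if it does not exist), the remote options can be omitted; the LazyFrame contents here are illustrative:
import polars as pl
from rtdip_sdk.pipelines.destinations import PythonDeltaDestination
lf = pl.LazyFrame({"TagName": ["Sensor1"], "Value": [1.0]})  # any Polars LazyFrame
python_delta_destination = PythonDeltaDestination(
    data=lf,
    path="/tmp/delta/local_table",  # local path; created if it does not exist
    options=None,                   # only required for AWS/Azure locations
    mode="append"
)
python_delta_destination.write_batch()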
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYTHON |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py
105 106 107 108 109 110 111 |
|
write_batch()
Writes batch data to Delta without using Spark.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
|
write_stream()
Raises:
Type | Description |
---|---|
NotImplementedError | Writing to a Delta table using Python is only possible for batch writes. To perform a streaming write, use the write_stream method of the SparkDeltaDestination component. |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py
146 147 148 149 150 151 152 153 |
|
DatabricksSecrets
Bases: SecretsInterface
Retrieves secrets from Databricks Secret Scopes. For more information about Databricks Secret Scopes, see here.
Example
# Reads Secrets from Databricks Secret Scopes
from rtdip_sdk.pipelines.secrets import DatabricksSecrets
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
get_databricks_secret = DatabricksSecrets(
spark=spark,
vault="{NAME-OF-DATABRICKS-SECRET-SCOPE}"
key="{KEY-NAME-OF-SECRET}",
)
get_databricks_secret.get()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session required to read data from a Delta table | required |
vault | str | Name of the Databricks Secret Scope | required |
key | str | Name/Key of the secret in the Databricks Secret Scope | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/databricks.py
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK on Databricks |
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/databricks.py
60 61 62 63 64 65 66 |
|
get()
Retrieves the secret from the Databricks Secret Scope
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/databricks.py
77 78 79 80 81 82 |
|
set()
Sets the secret in the Secret Scope. Raises: NotImplementedError - Will be implemented at a later point in time
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/databricks.py
84 85 86 87 88 89 90 |
|
HashiCorpVaultSecrets
Bases: SecretsInterface
Retrieves and creates/updates secrets in a Hashicorp Vault. For more information about Hashicorp Vaults, see here.
Example
# Retrieves Secrets from HashiCorp Vault
from rtdip_sdk.pipelines.secrets import HashiCorpVaultSecrets
get_hashicorp_secret = HashiCorpVaultSecrets(
vault="http://127.0.0.1:8200",
key="{KEY}",
secret=None,
credential="{CREDENTIAL}",
kwargs=None
)
get_hashicorp_secret.get()
# Creates or Updates Secrets in Hashicorp Vault
from rtdip_sdk.pipelines.secrets import HashiCorpVaultSecrets
set_hashicorp_secret = HashiCorpVaultSecrets(
vault="http://127.0.0.1:8200",
key="{KEY}",
secret="{SECRET-TO-BE-SET}",
credential="{CREDENTIAL}",
kwargs=None
)
set_hashicorp_secret.set()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
vault | str | Hashicorp Vault URL | required |
key | str | Name/Key of the secret in the Hashicorp Vault | required |
secret | str | Secret or Password to be stored in the Hashicorp Vault | None |
credential | str | Token for authentication with the Hashicorp Vault | None |
kwargs | dict | List of additional parameters to be passed when creating a Hashicorp Vault Client. Please see here for more details on parameters that can be provided to the client | {} |
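A hedged cross-component sketch, assuming get() returns the secret value as a plain string, showing a retrieved secret feeding the SparkADLSGen2SPNConnectUtility documented later on this page:
from rtdip_sdk.pipelines.secrets import HashiCorpVaultSecrets
from rtdip_sdk.pipelines.utilities import SparkADLSGen2SPNConnectUtility, SparkSessionUtility
spark = SparkSessionUtility(config={}).execute()
# Retrieve the service principal secret from the vault (return type assumed to be str)
client_secret = HashiCorpVaultSecrets(
    vault="http://127.0.0.1:8200",
    key="{KEY}",
    credential="{CREDENTIAL}"
).get()
# Use it to configure Spark for ADLS Gen 2 access
SparkADLSGen2SPNConnectUtility(
    spark=spark,
    storage_account="{STORAGE-ACCOUNT-NAME}",
    tenant_id="{TENANT-ID}",
    client_id="{CLIENT-ID}",
    client_secret=client_secret
).execute()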
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/hashicorp_vault.py
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYTHON |
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/hashicorp_vault.py
87 88 89 90 91 92 93 |
|
get()
Retrieves the secret from the Hashicorp Vault
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/hashicorp_vault.py
108 109 110 111 112 113 |
|
set()
Creates or updates a secret in the Hashicorp Vault
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/hashicorp_vault.py
115 116 117 118 119 120 121 122 123 |
|
AzureKeyVaultSecrets
Bases: SecretsInterface
Retrieves and creates/updates secrets in Azure Key Vault. For more information about Azure Key Vaults, see here.
Example
# Retrieves Secrets from Azure Key Vault
from rtdip_sdk.pipelines.secrets import AzureKeyVaultSecrets
get_key_vault_secret = AzureKeyVaultSecrets(
vault="https://{YOUR-KEY-VAULT}.azure.net/",
key="{KEY}",
secret=None,
credential="{CREDENTIAL}",
kwargs=None
)
get_key_vault_secret.get()
# Creates or Updates Secrets in Azure Key Vault
from rtdip_sdk.pipelines.secrets import AzureKeyVaultSecrets
set_key_vault_secret = AzureKeyVaultSecrets(
vault="https://{YOUR-KEY-VAULT}.azure.net/",
key="{KEY}",
secret="{SECRET-TO-BE-SET}",
credential="{CREDENTIAL}",
kwargs=None
)
set_key_vault_secret.set()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
vault | str | Azure Key Vault URL | required |
key | str | Key for the secret | required |
secret | str | Secret or Password to be set in the Azure Key Vault | None |
credential | str | Credential for authenticating with Azure Key Vault | None |
kwargs | dict | List of additional parameters to be passed when creating an Azure Key Vault Client. Please see here for more details on parameters that can be provided to the client | None |
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/azure_key_vault.py
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYTHON |
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/azure_key_vault.py
88 89 90 91 92 93 94 |
|
get()
Retrieves the secret from the Azure Key Vault
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/azure_key_vault.py
113 114 115 116 117 118 |
|
set()
Creates or updates a secret in the Azure Key Vault
Source code in src/sdk/python/rtdip_sdk/pipelines/secrets/azure_key_vault.py
120 121 122 123 124 125 |
|
SystemType
Bases: Enum
The type of the system.
Source code in src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/models.py
20 21 22 23 24 25 26 27 28 |
|
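As a hedged illustration (the member names PYTHON, PYSPARK and PYSPARK_DATABRICKS are assumed from the "Requires ..." descriptions on this page, and the module path is taken from the source location above):
from rtdip_sdk.pipelines._pipeline_utils.models import SystemType
from rtdip_sdk.pipelines.secrets import AzureKeyVaultSecrets
# Every RTDIP component exposes a static system_type() describing its required environment
if AzureKeyVaultSecrets.system_type() == SystemType.PYTHON:
    print("This component runs in a plain Python environment")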
DeltaTableCreateUtility
Bases: UtilitiesInterface
Creates a Delta Table in a Hive Metastore or in Databricks Unity Catalog.
Example
from rtdip_sdk.pipelines.utilities.spark.delta_table_create import DeltaTableCreateUtility, DeltaTableColumn
table_create_utility = DeltaTableCreateUtility(
spark=spark_session,
table_name="delta_table",
columns=[
DeltaTableColumn(name="EventDate", type="date", nullable=False, metadata={"delta.generationExpression": "CAST(EventTime AS DATE)"}),
DeltaTableColumn(name="TagName", type="string", nullable=False),
DeltaTableColumn(name="EventTime", type="timestamp", nullable=False),
DeltaTableColumn(name="Status", type="string", nullable=True),
DeltaTableColumn(name="Value", type="float", nullable=True)
],
partitioned_by=["EventDate"],
properties={"delta.logRetentionDuration": "7 days", "delta.enableChangeDataFeed": "true"},
comment="Creation of Delta Table"
)
result = table_create_utility.execute()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session required to read data from cloud storage | required |
table_name | str | Name of the table, including catalog and schema if table is to be created in Unity Catalog | required |
columns | list[DeltaTableColumn] | List of columns and their related column properties | required |
partitioned_by | list[str] | List of column names to partition the table by | None |
location | str | Path to storage location | None |
properties | dict | Properties that can be specified for a Delta Table. Further information on the options available is here | None |
comment | str | Provides a comment on the table metadata | None |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_create.py
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_create.py
100 101 102 103 104 105 106 |
|
DeltaTableOptimizeUtility
Bases: UtilitiesInterface
Optimizes a Delta Table.
Example
from rtdip_sdk.pipelines.utilities.spark.delta_table_optimize import DeltaTableOptimizeUtility
table_optimize_utility = DeltaTableOptimizeUtility(
spark=spark_session,
table_name="delta_table",
where="EventDate<=current_date()",
zorder_by=["EventDate"]
)
result = table_optimize_utility.execute()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session required to read data from cloud storage | required |
table_name | str | Name of the table, including catalog and schema if table is to be created in Unity Catalog | required |
where | str | Apply a partition filter to limit optimize to specific partitions. Example, "date='2021-11-18'" or "EventDate<=current_date()" | None |
zorder_by | list[str] | List of column names to zorder the table by. For more information, see here. | None |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_optimize.py
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_optimize.py
69 70 71 72 73 74 75 |
|
DeltaTableVacuumUtility
Bases: UtilitiesInterface
Vacuums a Delta Table.
Example
from rtdip_sdk.pipelines.utilities.spark.delta_table_vacuum import DeltaTableVacuumUtility
table_vacuum_utility = DeltaTableVacuumUtility(
spark=spark_session,
table_name="delta_table",
retention_hours=168
)
result = table_vacuum_utility.execute()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session required to read data from cloud storage | required |
table_name | str | Name of the table, including catalog and schema if table is to be created in Unity Catalog | required |
retention_hours | int | Sets the retention threshold in hours. | None |
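Optimize and vacuum are commonly run together as routine table maintenance; a hedged sketch combining the two utilities documented above (table name, filter and retention period are illustrative):
from rtdip_sdk.pipelines.utilities.spark.delta_table_optimize import DeltaTableOptimizeUtility
from rtdip_sdk.pipelines.utilities.spark.delta_table_vacuum import DeltaTableVacuumUtility
# Compact and Z-order recent partitions, then remove files outside the retention window
DeltaTableOptimizeUtility(
    spark=spark_session,
    table_name="delta_table",
    where="EventDate<=current_date()",
    zorder_by=["EventDate"]
).execute()
DeltaTableVacuumUtility(
    spark=spark_session,
    table_name="delta_table",
    retention_hours=168
).execute()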
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_vacuum.py
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_vacuum.py
61 62 63 64 65 66 67 |
|
SparkConfigurationUtility
Bases: UtilitiesInterface
Sets configuration key value pairs to a Spark Session
Example
from rtdip_sdk.pipelines.utilities import SparkConfigurationUtility
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
configuration_utility = SparkConfigurationUtility(
spark=spark,
config={}
)
result = configuration_utility.execute()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session required to read data from cloud storage | required |
config | dict | Dictionary of spark configuration to be applied to the spark session | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/configuration.py
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/configuration.py
63 64 65 66 67 68 69 |
|
execute()
Executes configuration key value pairs to a Spark Session
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/configuration.py
80 81 82 83 84 85 86 87 88 89 90 91 92 |
|
SparkADLSGen2SPNConnectUtility
Bases: UtilitiesInterface
Configures Spark to Connect to an ADLS Gen 2 Storage Account using a Service Principal.
Example
from rtdip_sdk.pipelines.utilities import SparkADLSGen2SPNConnectUtility
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()
adls_gen2_connect_utility = SparkADLSGen2SPNConnectUtility(
spark=spark,
storage_account="YOUR-STORAGAE-ACCOUNT-NAME",
tenant_id="YOUR-TENANT-ID",
client_id="YOUR-CLIENT-ID",
client_secret="YOUR-CLIENT-SECRET"
)
result = adls_gen2_connect_utility.execute()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session required to read data from cloud storage | required |
storage_account | str | Name of the ADLS Gen 2 Storage Account | required |
tenant_id | str | Tenant ID of the Service Principal | required |
client_id | str | Service Principal Client ID | required |
client_secret | str | Service Principal Client Secret | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/adls_gen2_spn_connect.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/adls_gen2_spn_connect.py
76 77 78 79 80 81 82 |
|
execute()
Executes spark configuration to connect to an ADLS Gen 2 Storage Account using a service principal
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/adls_gen2_spn_connect.py
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
|
ADLSGen2DirectoryACLUtility
Bases: UtilitiesInterface
Assigns Azure AD Groups to ACLs on directories in an Azure Data Lake Store Gen 2 storage account.
Example
from rtdip_sdk.pipelines.utilities import ADLSGen2DirectoryACLUtility
adls_gen2_directory_acl_utility = ADLSGen2DirectoryACLUtility(
storage_account="YOUR-STORAGAE-ACCOUNT-NAME",
container="YOUR-ADLS_CONTAINER_NAME",
credential="YOUR-TOKEN-CREDENTIAL",
directory="DIRECTORY",
group_object_id="GROUP-OBJECT",
folder_permissions="r-x",
parent_folder_permissions="r-x",
root_folder_permissions="r-x",
set_as_default_acl=True,
create_directory_if_not_exists=True
)
result = adls_gen2_directory_acl_utility.execute()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
storage_account | str | ADLS Gen 2 Storage Account Name | required |
container | str | ADLS Gen 2 Container Name | required |
credential | TokenCredential | Credentials to authenticate with ADLS Gen 2 Storage Account | required |
directory | str | Directory to assign ACLs to in an ADLS Gen 2 storage account | required |
group_object_id | str | Azure AD Group Object ID to be assigned to Directory | required |
folder_permissions | optional str | Folder Permissions to assign to the directory | 'r-x' |
parent_folder_permissions | optional str | Folder Permissions to assign to parent directories. Parent Folder ACLs not set if None | 'r-x' |
root_folder_permissions | optional str | Folder Permissions to assign to the root directory. Root Folder ACL not set if None | 'r-x' |
set_as_default_acl | bool | Sets the ACL as the default ACL on the folder | True |
create_directory_if_not_exists | bool | Creates the directory (and parent directories) if it does not exist | True |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/azure/adls_gen2_acl.py
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYTHON |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/azure/adls_gen2_acl.py
117 118 119 120 121 122 123 |
|
AzureAutoloaderResourcesUtility
Bases: UtilitiesInterface
Creates the required Azure Resources for the Databricks Autoloader Notification Mode.
Example
from rtdip_sdk.pipelines.utilities import AzureAutoloaderResourcesUtility
azure_autoloader_resources_utility = AzureAutoloaderResourcesUtility(
subscription_id="YOUR-SUBSCRIPTION-ID",
resource_group_name="YOUR-RESOURCE-GROUP",
storage_account="YOUR-STORAGE-ACCOUNT-NAME",
container="YOUR-CONTAINER-NAME",
directory="DIRECTORY",
credential="YOUR-CLIENT-ID",
event_subscription_name="YOUR-EVENT-SUBSCRIPTION",
queue_name="YOUR-QUEUE-NAME",
system_topic_name=None
)
result = azure_autoloader_resources_utility.execute()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
subscription_id | str | Azure Subscription ID | required |
resource_group_name | str | Resource Group Name of Subscription | required |
storage_account | str | Storage Account Name | required |
container | str | Container Name | required |
directory | str | Directory to be used for filtering messages in the Event Subscription. This will be equivalent to the Databricks Autoloader Path | required |
credential | TokenCredential | Credentials to authenticate with Storage Account | required |
event_subscription_name | str | Name of the Event Subscription | required |
queue_name | str | Name of the queue that will be used for the Endpoint of the Messages | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/azure/autoloader_resources.py
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYTHON |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/azure/autoloader_resources.py
98 99 100 101 102 103 104 |
|
PipelineComponentsGetUtility
Bases: UtilitiesInterface
Gets the list of imported RTDIP components. Returns the libraries and settings of the components to be used in the pipeline.
Call this component after all imports of the RTDIP components to ensure that the components can be determined.
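No example is given for this utility, so the following is only a hedged sketch; the import paths are assumed from the source locations on this page, and the exact shape of the returned libraries and settings is not shown here:
from rtdip_sdk.pipelines.destinations import SparkKafkaEventhubDestination  # import pipeline components first
from rtdip_sdk.pipelines.utilities import PipelineComponentsGetUtility
# Called after the component imports so it can detect them in the calling module
result = PipelineComponentsGetUtility().execute()
print(result)  # inspect the returned libraries and settings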
Parameters:
Name | Type | Description | Default |
---|---|---|---|
module | optional str | Provide the module to use for imports of rtdip-sdk components. If not populated, it will use the calling module to check for imports | None |
spark_config | optional dict | Additional spark configuration to be applied to the spark session | None |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/pipeline_components.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYTHON |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/pipeline_components.py
44 45 46 47 48 49 50 |
|
SparkSessionUtility
Bases: UtilitiesInterface
Creates or Gets a Spark Session and uses settings and libraries of the imported RTDIP components to populate the spark configuration and jars in the spark session.
Call this component after all imports of the RTDIP components to ensure that the spark session is configured correctly.
Example
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
spark_session_utility = SparkSessionUtility(
config={},
module=None,
remote=None
)
result = spark_session_utility.execute()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | optional dict | Dictionary of spark configuration to be applied to the spark session | None |
module | optional str | Provide the module to use for imports of rtdip-sdk components. If not populated, it will use the calling module to check for imports | None |
remote | optional str | Specify the remote parameters if intending to use Spark Connect | None |
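A hedged sketch of the remote parameter for Spark Connect (the "sc://" connection-string format and the default port 15002 come from Spark Connect itself, not from this SDK; the host is a placeholder):
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
spark = SparkSessionUtility(
    config={},
    remote="sc://{SPARK-CONNECT-HOST}:15002"
).execute()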
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/session.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
|
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/session.py
69 70 71 72 73 74 75 |
|
execute()
Creates or gets the Spark Session and applies the configuration and libraries of the imported RTDIP components.
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/session.py
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
|
PipelineJobFromJsonConverter
Bases: ConverterInterface
Converts a json string into a Pipeline Job.
Example
from rtdip_sdk.pipelines.converters import PipelineJobFromJsonConverter
convert_json_string_to_pipeline_job = PipelineJobFromJsonConverter(
pipeline_json="{JSON-STRING}"
)
convert_json_string_to_pipeline_job.convert()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_json | str | Json representing PipelineJob information, including tasks and related steps | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
|
convert()
Converts a json string to a Pipeline Job
Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
|
PipelineJobToJsonConverter
Bases: ConverterInterface
Converts a Pipeline Job into a json string.
Example
from rtdip_sdk.pipelines.converters import PipelineJobToJsonConverter
convert_pipeline_job_to_json_string = PipelineJobToJsonConverter(
pipeline_job=PipelineJob
)
convert_pipeline_job_to_json_string.convert()
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_job | PipelineJob | A Pipeline Job consisting of tasks and steps | required |
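A hedged round-trip sketch using both converters documented here (the import path and the existing pipeline_job object are assumptions):
from rtdip_sdk.pipelines.converters import PipelineJobFromJsonConverter, PipelineJobToJsonConverter
# pipeline_job is assumed to be an existing PipelineJob of tasks and steps
pipeline_json = PipelineJobToJsonConverter(pipeline_job=pipeline_job).convert()
# ...and back from the json string to a PipelineJob
restored_job = PipelineJobFromJsonConverter(pipeline_json=pipeline_json).convert()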
Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
|
convert()
Converts a Pipeline Job to a json string
Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
|