Read from Delta Sharing
SparkDeltaSharingSource
Bases: SourceInterface
The Spark Delta Sharing Source is used to read data from a Delta table where Delta Sharing is configured.
Example
```python
# Delta Sharing Source for Streaming Queries

from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

delta_sharing_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        "maxFilesPerTrigger": 1000,
        "ignoreChanges": True,
        "startingVersion": 0
    },
    table_path="{YOUR-DELTA-TABLE-PATH}"
)

delta_sharing_source.read_stream()
```
```python
# Delta Sharing Source for Batch Queries

from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

delta_sharing_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        # Specify either versionAsOf or timestampAsOf, not both
        "versionAsOf": 0,
        # "timestampAsOf": "yyyy-mm-dd hh:mm:ss[.fffffffff]"
    },
    table_path="{YOUR-DELTA-TABLE-PATH}"
)

delta_sharing_source.read_batch()
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Spark Session required to read data from a Delta table | required |
| `options` | `dict` | Options that can be specified for a Delta Table read operation (see the Attributes table below). Further information on the options is available in the Delta Sharing documentation | required |
| `table_path` | `str` | Path to the credentials file and Delta table to query | required |
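As a point of reference, in the open Delta Sharing protocol the table path typically combines the location of a share profile file with the share, schema, and table names. The profile path and names below are illustrative placeholders only:

```python
# Illustrative placeholders only: Delta Sharing table paths follow the pattern
# "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
table_path = "/path/to/config.share#my_share.my_schema.my_table"
```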
Attributes:

| Name | Type | Description |
|---|---|---|
| `ignoreDeletes` | `bool str` | Ignore transactions that delete data at partition boundaries. (Streaming) |
| `ignoreChanges` | `bool str` | Pre-process updates if files had to be rewritten in the source table due to a data changing operation. (Streaming) |
| `startingVersion` | `int str` | The Delta Lake version to start from. (Streaming) |
| `startingTimestamp` | `datetime str` | The timestamp to start from. (Streaming) |
| `maxFilesPerTrigger` | `int` | How many new files to be considered in every micro-batch. The default is 1000. (Streaming) |
| `maxBytesPerTrigger` | `int` | How much data gets processed in each micro-batch. (Streaming) |
| `readChangeFeed` | `bool str` | Stream read the change data feed of the shared table. (Batch & Streaming) |
| `timestampAsOf` | `datetime str` | Query the Delta Table from a specific point in time. (Batch) |
| `versionAsOf` | `int str` | Query the Delta Table from a specific version. (Batch) |
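As a hedged sketch of combining the streaming attributes above, the example below reads the change data feed of a shared table starting from a chosen version; the option values are assumptions for illustration:

```python
from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

# Assumed example values: stream the change data feed from version 1,
# capping each micro-batch by file count.
cdf_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        "readChangeFeed": "true",
        "startingVersion": 1,
        "maxFilesPerTrigger": 500
    },
    table_path="{YOUR-DELTA-TABLE-PATH}"
)
df = cdf_source.read_stream()
```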
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
system_type()
staticmethod
Attributes:

| Name | Type | Description |
|---|---|---|
| `SystemType` | `Environment` | Requires PYSPARK |
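Since `system_type()` is a staticmethod, it can be called without instantiating the source; a minimal sketch:

```python
from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource

# Reports the environment this component requires (PySpark)
print(SparkDeltaSharingSource.system_type())
```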
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
read_batch()
Reads batch data from Delta. Most of the options provided by the Apache Spark DataFrame read API are supported for performing batch reads on Delta tables.
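As a sketch of what follows a batch read, the call returns a PySpark DataFrame, so the standard DataFrame API applies from there; the pinned version below is an assumed example value:

```python
from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

batch_source = SparkDeltaSharingSource(
    spark=spark,
    options={"versionAsOf": 0},  # assumed example: pin the read to version 0
    table_path="{YOUR-DELTA-TABLE-PATH}"
)

df = batch_source.read_batch()
df.show(10)  # standard PySpark DataFrame operations from here on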
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
read_stream()
Reads streaming data from Delta. All of the data in the table is processed, as well as any new data that arrives after the stream starts. `.load()` can take either a table name or a path.
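The returned streaming DataFrame is consumed with Spark Structured Streaming; a minimal sketch, assuming a console sink purely for demonstration:

```python
from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

stream_source = SparkDeltaSharingSource(
    spark=spark,
    options={"ignoreChanges": True, "startingVersion": 0},
    table_path="{YOUR-DELTA-TABLE-PATH}"
)
streaming_df = stream_source.read_stream()

# Console sink for demonstration only; in practice write to an RTDIP
# destination or another Structured Streaming sink.
query = streaming_df.writeStream.format("console").start()
query.awaitTermination()
```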
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py