Read from an Eventhub
SparkEventhubSource
Bases: SourceInterface
This Spark source class is used to read batch or streaming data from Eventhubs. Eventhub configurations must be passed as options in a dictionary (see the Attributes table below). Additional optional configurations are listed in the Azure Event Hubs Connector for Apache Spark documentation. If you use startingPosition or endingPosition, check the Event Position section for more details and examples.
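The connection string used in the examples follows the standard Eventhub format of semicolon-separated `key=value` pairs. The sketch below assembles one from its parts and splits it back apart; `build_connection_string` and `parse_connection_string` are hypothetical helpers for illustration, not part of rtdip_sdk, and the namespace, policy and hub names are placeholders.

```python
def build_connection_string(namespace: str, key_name: str, key: str, eventhub_name: str) -> str:
    """Assemble an Eventhub connection string from its parts (hypothetical helper)."""
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key};"
        f"EntityPath={eventhub_name}"
    )


def parse_connection_string(connection_string: str) -> dict:
    """Split a connection string into its key/value pairs (hypothetical helper)."""
    # Split each pair on the first '=' only, because base64 access keys often end in '='
    return dict(part.split("=", 1) for part in connection_string.split(";") if part)


# Placeholder values; "$Default" is the consumer group every Eventhub starts with
options = {
    "eventhubs.connectionString": build_connection_string(
        "my-namespace", "my-policy", "<ACCESS_KEY>", "my-eventhub"
    ),
    "eventhubs.consumerGroup": "$Default",
}
```

Splitting on the first `=` only is the design point here: a naive `split("=")` would truncate access keys that end in padding characters.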
Example
# Eventhub Source for Streaming Queries
from rtdip_sdk.pipelines.sources import SparkEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY};EntityPath={EVENT_HUB_NAME}"

# Start from the beginning of the stream
startingEventPosition = {
    "offset": "-1",
    "seqNo": -1,           # not in use
    "enqueuedTime": None,  # not in use
    "isInclusive": True
}

eventhub_source = SparkEventhubSource(
    spark=spark,
    options={
        "eventhubs.connectionString": connectionString,
        "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
        "eventhubs.startingPosition": json.dumps(startingEventPosition),
        "maxEventsPerTrigger": 1000
    }
)

df = eventhub_source.read_stream()
# Eventhub Source for Batch Queries
from rtdip_sdk.pipelines.sources import SparkEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import datetime as dt
import json

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY};EntityPath={EVENT_HUB_NAME}"

# Start from the beginning of the stream
startingEventPosition = {
    "offset": "-1",
    "seqNo": -1,           # not in use
    "enqueuedTime": None,  # not in use
    "isInclusive": True
}

# End at the current time (UTC)
endTime = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

endingEventPosition = {
    "offset": None,        # not in use
    "seqNo": -1,           # not in use
    "enqueuedTime": endTime,
    "isInclusive": True
}

eventhub_source = SparkEventhubSource(
    spark=spark,
    options={
        "eventhubs.connectionString": connectionString,
        "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
        "eventhubs.startingPosition": json.dumps(startingEventPosition),
        "eventhubs.endingPosition": json.dumps(endingEventPosition)
    }
)

df = eventhub_source.read_batch()
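Both position options are JSON-serialised EventPosition dictionaries with the four keys used above, of which only one of offset, seqNo or enqueuedTime is meant to be in use. A minimal sketch of a hypothetical `event_position` helper (not part of rtdip_sdk) that builds them; the conventions it relies on (offset `"-1"` for the start of the stream, `seqNo` of -1 meaning "not in use", enqueuedTime as a UTC timestamp with a `Z` suffix) follow the connector's PySpark guide and should be checked against the connector version you run.

```python
import datetime as dt
import json
from typing import Optional


def event_position(
    offset: Optional[str] = None,
    seq_no: int = -1,
    enqueued_time: Optional[dt.datetime] = None,
    is_inclusive: bool = True,
) -> str:
    """Return an EventPosition as a JSON string (hypothetical helper).

    Exactly one of offset, seq_no (>= 0) or enqueued_time should be set;
    the unused fields keep the placeholder values shown in the examples.
    """
    in_use = [offset is not None, seq_no >= 0, enqueued_time is not None]
    if sum(in_use) != 1:
        raise ValueError("set exactly one of offset, seq_no or enqueued_time")
    enqueued = None
    if enqueued_time is not None:
        # Convert to UTC (naive datetimes are treated as local time) and add a 'Z' suffix
        enqueued = enqueued_time.astimezone(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    return json.dumps(
        {"offset": offset, "seqNo": seq_no, "enqueuedTime": enqueued, "isInclusive": is_inclusive}
    )


# Start at the beginning of the stream and stop at the current time
starting = event_position(offset="-1")
ending = event_position(enqueued_time=dt.datetime.now(dt.timezone.utc))
```

The resulting strings can be passed directly as the `eventhubs.startingPosition` and `eventhubs.endingPosition` option values.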
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Spark Session | required |
options | dict | A dictionary of Eventhub configurations (see the Attributes table below) | required |
Attributes:
Name | Type | Description |
---|---|---|
eventhubs.connectionString | str | Connection string required to connect to the Eventhubs service. (Streaming and Batch) |
eventhubs.consumerGroup | str | A consumer group is a view of an entire Eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream and to read the stream independently, at their own pace and with their own offsets. (Streaming and Batch) |
eventhubs.startingPosition | JSON str | The starting position of the query. If no EventPosition is set for a partition using startingPositions, the EventPosition set in startingPosition is used. If neither option is set, consumption begins from the end of the partition. (Streaming and Batch) |
eventhubs.endingPosition | JSON str | The ending position of a batch query. It is specified in the same format as startingPosition. (Batch) |
maxEventsPerTrigger | long | Rate limit on the maximum number of events processed per trigger interval. The specified total is split across partitions in proportion to their volume. (Streaming) |
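The table marks some options as streaming-only or batch-only. A minimal sketch of a hypothetical `build_eventhub_options` helper (not part of rtdip_sdk) that assembles the options dictionary and rejects an option meant for the other query type. Raising an error here is a deliberate design choice of this sketch, intended to surface mistakes early; the connector itself may simply ignore an inapplicable option.

```python
from typing import Optional


def build_eventhub_options(
    connection_string: str,
    consumer_group: str,
    starting_position: Optional[str] = None,
    ending_position: Optional[str] = None,
    max_events_per_trigger: Optional[int] = None,
    streaming: bool = True,
) -> dict:
    """Build an options dictionary for SparkEventhubSource (hypothetical helper).

    Rejects options that the Attributes table lists for the other query type.
    """
    options = {
        "eventhubs.connectionString": connection_string,
        "eventhubs.consumerGroup": consumer_group,
    }
    if starting_position is not None:
        options["eventhubs.startingPosition"] = starting_position
    # endingPosition is batch-only; maxEventsPerTrigger is streaming-only
    if streaming and ending_position is not None:
        raise ValueError("eventhubs.endingPosition applies to batch queries only")
    if not streaming and max_events_per_trigger is not None:
        raise ValueError("maxEventsPerTrigger applies to streaming queries only")
    if ending_position is not None:
        options["eventhubs.endingPosition"] = ending_position
    if max_events_per_trigger is not None:
        options["maxEventsPerTrigger"] = max_events_per_trigger
    return options
```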
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
system_type() (staticmethod)
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
read_batch()
Reads batch data from Eventhubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
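For orientation, a minimal sketch of what a batch read with this connector looks like in plain PySpark, assuming the azure-event-hubs-spark connector is on the classpath and registers the `eventhubs` format. `read_eventhub_batch` is a hypothetical function, not the rtdip_sdk implementation. The encryption step follows the connector's PySpark guide, which requires the connection string to be encrypted with `EventHubsUtils.encrypt` from connector version 2.3.15 onward; `spark` is typed loosely so the sketch does not import pyspark.

```python
from typing import Any


def read_eventhub_batch(spark: Any, options: dict) -> Any:
    """Hypothetical sketch of a batch read; spark is assumed to be a SparkSession."""
    opts = dict(options)  # copy so the caller's dictionary is not modified
    key = "eventhubs.connectionString"
    if key in opts:
        # From PySpark, recent connector versions expect an encrypted connection string
        utils = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils
        opts[key] = utils.encrypt(opts[key])
    # Bounded read between eventhubs.startingPosition and eventhubs.endingPosition
    return spark.read.format("eventhubs").options(**opts).load()
```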
read_stream()
Reads streaming data from Eventhubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
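The streaming counterpart uses `spark.readStream` and returns an unbounded DataFrame. In the connector's schema the event payload arrives in a binary `body` column alongside metadata columns such as `enqueuedTime` and `sequenceNumber`, so a common first step is to cast `body` to a string. A minimal sketch under the same assumptions as a plain PySpark batch read; `read_eventhub_stream` and `decode_body` are hypothetical names, not rtdip_sdk functions.

```python
from typing import Any


def read_eventhub_stream(spark: Any, options: dict) -> Any:
    """Hypothetical sketch of a streaming read; spark is assumed to be a SparkSession."""
    opts = dict(options)  # copy so the caller's dictionary is not modified
    key = "eventhubs.connectionString"
    if key in opts:
        # From PySpark, recent connector versions expect an encrypted connection string
        utils = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils
        opts[key] = utils.encrypt(opts[key])
    # maxEventsPerTrigger, if present, is passed through unchanged
    return spark.readStream.format("eventhubs").options(**opts).load()


def decode_body(df: Any) -> Any:
    """Cast the binary `body` column to a string, keeping two metadata columns."""
    return df.selectExpr("CAST(body AS STRING) AS body", "enqueuedTime", "sequenceNumber")
```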