Read from Amazon Kinesis Data Streams
SparkKinesisSource
Bases: SourceInterface
The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment. Structured streaming from Kinesis is not supported in open source Spark.
Example
```python
from rtdip_sdk.pipelines.sources import SparkKinesisSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

kinesis_source = SparkKinesisSource(
    spark=spark,
    options={
        "awsAccessKey": "{AWS-ACCESS-KEY}",
        "awsSecretKey": "{AWS-SECRET-KEY}",
        "streamName": "{STREAM-NAME}",
        "region": "{REGION}",
        "endpoint": "https://kinesis.{REGION}.amazonaws.com",
        "initialPosition": "earliest"
    }
)

kinesis_source.read_stream()
```
Parameters:
Name | Type | Description | Default
---|---|---|---
`spark` | `SparkSession` | Spark Session required to read data from Kinesis | required
`options` | `dict` | Options that can be specified for a Kinesis read operation (see the Attributes table below). Further information on the options is available here | required
Attributes:
Name | Type | Description
---|---|---
`awsAccessKey` | `str` | AWS access key.
`awsSecretKey` | `str` | AWS secret access key corresponding to the access key.
`streamName` | `List[str]` | The stream names to subscribe to.
`region` | `str` | The region the streams are defined in.
`endpoint` | `str` | The regional endpoint for Kinesis Data Streams.
`initialPosition` | `str` | The point to start reading from; earliest, latest, or at_timestamp.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
system_type()
staticmethod
Attributes:
Name | Type | Description
---|---|---
`SystemType` | `Environment` | Requires PYSPARK_DATABRICKS
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
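Because `system_type()` is a static method, the required environment can be checked without constructing the source. A minimal sketch (the printed enum member is expected to indicate PYSPARK_DATABRICKS per the table above, though the exact representation may vary by SDK version):

```python
from rtdip_sdk.pipelines.sources import SparkKinesisSource

# No instance is needed; system_type() reports the environment this
# component requires before a pipeline is assembled.
print(SparkKinesisSource.system_type())
```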
read_batch()
Raises:
Type | Description
---|---
`NotImplementedError` | Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True`.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
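A hedged sketch of the batch-like workaround described above, using plain Structured Streaming writer APIs rather than a specific RTDIP destination, and assuming a Databricks Runtime where `Trigger.AvailableNow` is supported for the Kinesis connector (the checkpoint and output paths are placeholders):

```python
# Read the stream, then write with an availableNow trigger so the query
# processes the data currently in the stream once and then terminates.
df = kinesis_source.read_stream()

query = (
    df.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/kinesis")  # placeholder path
    .trigger(availableNow=True)
    .start("/tmp/delta/kinesis")  # placeholder output path
)
```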
read_stream()
Reads streaming data from Kinesis. All of the data in the stream is processed, as well as any new data that arrives after the stream has started.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
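As a usage sketch, the returned streaming DataFrame can be decoded before further processing. This assumes the Databricks Kinesis connector's typical output schema, where the record payload arrives as a binary `data` column alongside metadata columns such as `streamName` and `approximateArrivalTimestamp`; the column names are assumptions and may differ by runtime version:

```python
df = kinesis_source.read_stream()

# Cast the binary payload to a string for downstream parsing.
decoded_df = df.selectExpr(
    "CAST(data AS STRING) AS body",
    "streamName",
    "approximateArrivalTimestamp",
)
```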