Write to Kafka
SparkKafkaDestination
Bases: DestinationInterface
This Spark destination class is used to write batch or streaming data to Kafka. Required and optional configurations can be found in the Attributes tables below.
Additionally, there are more optional configurations which can be found in the Apache Spark Kafka integration documentation.
For compatibility between Spark and Kafka, the columns in the input dataframe are concatenated into one 'value' column of JSON string.
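As a rough sketch, that concatenation is equivalent to the following plain PySpark transformation (illustrative only, not the library's exact code; assumes an input dataframe `df`):

```python
from pyspark.sql import functions as F

# Collapse all input columns into a single JSON string column named 'value',
# the shape the Spark Kafka sink expects.
kafka_ready_df = df.select(
    F.to_json(F.struct(*[F.col(c) for c in df.columns])).alias("value")
)
```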
Example
```python
from rtdip_sdk.pipelines.destinations import SparkKafkaDestination

kafka_destination = SparkKafkaDestination(
    data=df,
    options={
        "kafka.bootstrap.servers": "host1:port1,host2:port2"
    },
    trigger="10 seconds",
    query_name="KafkaDestination",
    query_wait_interval=None
)

kafka_destination.write_stream()

# OR

kafka_destination.write_batch()
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `DataFrame` | Dataframe to be written to Kafka | required |
| `options` | `dict` | A dictionary of Kafka configurations (see the Attributes tables below). For more information on configuration options, see the Apache Spark Kafka integration documentation | required |
| `trigger` | `optional str` | Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds" or "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) | `'10 seconds'` |
| `query_name` | `str` | Unique name for the query in the associated SparkSession | `'KafkaDestination'` |
| `query_wait_interval` | `optional int` | If set, waits for the streaming query to complete before returning. (stream) | `None` |
The following options must be set for the Kafka destination for both batch and streaming queries.
Attributes:
| Name | Type | Description |
|---|---|---|
| `kafka.bootstrap.servers` | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. (Streaming and Batch) |
The following configurations are optional:
Attributes:
| Name | Type | Description |
|---|---|---|
| `topic` | `str` | Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. (Streaming and Batch) |
| `includeHeaders` | `bool` | Whether to include the Kafka headers in the row. (Streaming and Batch) |
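For example, the topic can be supplied through the options dictionary when the dataframe does not carry a `topic` column (a sketch; server and topic values are illustrative):

```python
from rtdip_sdk.pipelines.destinations import SparkKafkaDestination

kafka_destination = SparkKafkaDestination(
    data=df,
    options={
        "kafka.bootstrap.servers": "host1:port1,host2:port2",
        "topic": "my_topic",  # illustrative topic name
    },
)

kafka_destination.write_batch()
```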
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
system_type()
staticmethod
Attributes:
| Name | Type | Description |
|---|---|---|
| `SystemType` | `Environment` | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
write_batch()
Writes batch data to Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
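For orientation, a batch write via this destination is comparable to a plain PySpark batch write to the Kafka sink (a sketch, not the class's exact implementation; assumes `df` already has a `value` column, and the option values are illustrative):

```python
# Equivalent plain PySpark batch write (illustrative).
(
    df.write.format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "host1:port1,host2:port2",
        "topic": "my_topic",  # illustrative
    })
    .save()
)
```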
write_stream()
Writes streaming data to Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
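For orientation, a streaming write via this destination is comparable to a plain Structured Streaming write to the Kafka sink (a sketch, not the class's exact implementation; Spark requires a checkpoint location for Kafka sinks, and the path shown is illustrative):

```python
# Equivalent plain PySpark streaming write (illustrative).
query = (
    streaming_df.writeStream.format("kafka")
    .trigger(processingTime="10 seconds")
    .queryName("KafkaDestination")
    .options(**{
        "kafka.bootstrap.servers": "host1:port1,host2:port2",
        "checkpointLocation": "/tmp/checkpoints/kafka_destination",  # illustrative path
    })
    .start()
)

# Optionally block until the streaming query stops.
query.awaitTermination()
```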