Jobs
In a production environment, pipelines are run as jobs: either batch jobs executed on a schedule, or streaming jobs that run continuously.
Build a Pipeline
Prerequisites
Ensure that you have followed the installation instructions in the Getting Started section, including the steps that cover the installation requirements for Pipelines. In particular:
- RTDIP SDK Installation
- Java - If any of your pipeline steps use PySpark, Java must be installed (a quick environment check is sketched below).
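If you are unsure whether Java is available in your environment, a minimal check along the following lines can be run first. This is an illustrative snippet using only the Python standard library; it is not part of the RTDIP SDK:

```python
import shutil
import subprocess

# Confirm a Java runtime is available on the PATH before running PySpark-based steps.
if shutil.which("java") is None:
    raise EnvironmentError(
        "Java was not found on the PATH. Install a supported JDK/JRE before running PySpark pipeline steps."
    )

# Print the detected Java version for confirmation (java writes this to stderr).
subprocess.run(["java", "-version"], check=True)
```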
RTDIP SDK Installation
At a minimum, ensure you have installed the RTDIP SDK as follows:
pip install "rtdip-sdk[pipelines]"
For all installation options please see the RTDIP SDK installation instructions.
Import
Import the required components of a Pipeline Job.
from rtdip_sdk.pipelines.execute import PipelineJob, PipelineStep, PipelineTask
from rtdip_sdk.pipelines.sources import SparkEventhubSource
from rtdip_sdk.pipelines.transformers import BinaryToStringTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
from rtdip_sdk.pipelines.secrets import PipelineSecret, DatabricksSecrets
import json
Steps
Pipeline steps are constructed from components and added to a Pipeline Task as a list. Each step is created as a PipelineStep and populated with the following information.
Parameter | Description | Requirements |
---|---|---|
Name | Each component requires a unique name, which is also used to define dependencies between components | Contains only letters, numbers and underscores |
Description | A brief description of each component | Will populate certain components of a runtime such as Delta Live Tables |
Component | The component Class | Populate with the Class Name |
Component Parameters | Configures the component with specific information, such as connection details and component-specific settings | Use Pipeline Secrets for sensitive information |
Depends On Step | Specifies any component names that must be executed prior to this component | A Python list of component names |
Provides Output To Step | Specifies any component names that require this component's output as an input | A Python list of component names |
step_list = []
# read step
eventhub_configuration = {
"eventhubs.connectionString": PipelineSecret(type=DatabricksSecrets, vault="test_vault", key="test_key"),
"eventhubs.consumerGroup": "$Default",
"eventhubs.startingPosition": json.dumps({"offset": "0", "seqNo": -1, "enqueuedTime": None, "isInclusive": True})
}
step_list.append(PipelineStep(
name="test_step1",
description="test_step1",
component=SparkEventhubSource,
component_parameters={"options": eventhub_configuration},
provide_output_to_step=["test_step2"]
))
# transform step
step_list.append(PipelineStep(
name="test_step2",
description="test_step2",
component=BinaryToStringTransformer,
component_parameters={
"source_column_name": "body",
"target_column_name": "body"
},
depends_on_step=["test_step1"],
provide_output_to_step=["test_step3"]
))
# write step
step_list.append(PipelineStep(
name="test_step3",
description="test_step3",
component=SparkDeltaDestination,
component_parameters={
"destination": "test_table",
"options": {},
"mode": "overwrite"
},
depends_on_step=["test_step2"]
))
Tasks
Tasks contain a list of steps. Each task is created as a PipelineTask
and populated with the following information.
Parameter | Description | Requirements |
---|---|---|
Name | Each task requires a unique name | Contains only letters, numbers and underscores |
Description | A brief description of the task | Will populate certain components of a runtime such as Delta Live Tables |
Step List | A Python list of the steps to be executed by the task | A list of PipelineStep objects |
Batch Task | Indicates whether the task should be executed as a batch task | Optional, defaults to False |
task = PipelineTask(
name="test_task",
description="test_task",
step_list=step_list,
batch_task=True
)
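The example above runs the task as a batch task. Since Batch Task defaults to False, omitting it produces a streaming task instead; a minimal sketch, assuming the source and destination components used in the steps support streaming execution (the task name below is illustrative):

```python
# Streaming variant: batch_task defaults to False, so the task runs as a
# continuously executing streaming task.
streaming_task = PipelineTask(
    name="test_streaming_task",
    description="test_streaming_task",
    step_list=step_list
)
```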
Jobs
Jobs contain a list of tasks. A job is created as a PipelineJob
and populated with the following information.
Parameter | Description | Requirements |
---|---|---|
Name | The job requires a unique name | Contains only letters, numbers and underscores |
Description | A brief description of the job | Will populate certain components of a runtime such as Delta Live Tables |
Version | Enables version control of the job for certain environments | Follow semantic versioning |
Task List | A Python list of the tasks to be executed by the job | A list of PipelineTask objects |
pipeline_job = PipelineJob(
name="test_job",
description="test_job",
version="0.0.1",
task_list=[task]
)
Execute
Pipeline Jobs can be executed directly if the environment where the code is written supports it. The Pipeline Job defined above can be executed as follows:
PySpark Installation
Ensure Java is installed in your environment and that PySpark has been installed using the command below:
pip install "rtdip-sdk[pipelines,pyspark]"
from rtdip_sdk.pipelines.execute import PipelineJobExecute
pipeline = PipelineJobExecute(pipeline_job)
result = pipeline.run()
Conclusion
The above sets out how a Pipeline Job can be constructed and executed. Most pipelines, however, will be executed by orchestration engines. See the Deploy section for more information about how Pipeline Jobs can be deployed and executed in this way.