Skip to content

TURBODBC SQL Connector

TURBODBCSQLConnection

Bases: ConnectionInterface

Turbodbc is a Python module used to access relational databases through an ODBC interface. It allows a user to connect to Databricks clusters or SQL Warehouses.

Turbodbc offers built-in NumPy support, allowing it to be much faster for processing compared to other connectors. To find the server_hostname and http_path details for a SQL Warehouse, refer to the SQL Warehouse tab in the documentation.

Parameters:

Name Type Description Default
server_hostname str

hostname for the cluster or SQL Warehouse

required
http_path str

Http path for the cluster or SQL Warehouse

required
access_token str

Azure AD Token

required
Note

More fields such as driver can be configured upon extension.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/turbodbc_sql_connector.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class TURBODBCSQLConnection(ConnectionInterface):
    """
    Turbodbc is a python module used to access relational databases through an ODBC interface. It will allow a user to connect to databricks clusters or sql warehouses.

    Turbodbc offers built-in NumPy support allowing it to be much faster for processing compared to other connectors.
    To find the server_hostname and http_path details for a SQL Warehouse, refer to the SQL Warehouse tab in the documentation.

    Args:
        server_hostname: hostname for the cluster or SQL Warehouse
        http_path: Http path for the cluster or SQL Warehouse
        access_token: Azure AD Token
        return_type: value handed to every cursor created by this connection to select the format of fetched results (defaults to ConnectionReturnType.Pandas)

    Note:
        More fields such as driver can be configured upon extension.
    """

    def __init__(
        self,
        server_hostname: str,
        http_path: str,
        access_token: str,
        return_type=ConnectionReturnType.Pandas,
    ) -> None:
        # Version gate for turbodbc; the helper is defined elsewhere in the
        # project (presumably it raises when the minimum is not met).
        _package_version_meets_minimum("turbodbc", "4.0.0")
        self.server_hostname = server_hostname
        self.http_path = http_path
        self.access_token = access_token
        self.return_type = return_type
        # The connection is opened eagerly; `open` tracks whether
        # `self.connection` is currently usable.
        self.connection = self._connect()
        self.open = True

    def _connect(self):
        """
        Connects to the endpoint.

        Returns:
            The connection object produced by turbodbc's ``connect``.

        Raises:
            Exception: any error raised while connecting is logged and re-raised.
        """
        try:
            options = make_options(
                autocommit=True, read_buffer_size=Megabytes(100), use_async_io=True
            )

            # The keyword arguments below are forwarded to the ODBC driver as
            # connection attributes.
            return connect(
                Driver="Simba Spark ODBC Driver",
                Host=self.server_hostname,
                Port=443,
                SparkServerType=3,
                ThriftTransport=2,
                SSL=1,
                AuthMech=11,
                Auth_AccessToken=self.access_token,
                Auth_Flow=0,
                HTTPPath=self.http_path,
                UseNativeQuery=1,
                FastSQLPrepare=1,
                ApplyFastSQLPrepareToAllQueries=1,
                DisableLimitZero=1,
                EnableAsyncExec=1,
                # Overridable through the environment. Note that os.getenv
                # yields a str when the variable is set and the int default
                # otherwise.
                RowsFetchedPerBlock=os.getenv("RTDIP_ODBC_ROW_BLOCK_SIZE", 500000),
                UserAgentEntry="RTDIP",
                turbodbc_options=options,
            )

        except Exception:
            logging.exception("error while connecting to the endpoint")
            raise

    def close(self) -> None:
        """
        Closes connection to database.

        The ``open`` flag is cleared only when the underlying close succeeds.

        Raises:
            Exception: any error raised while closing is logged and re-raised.
        """
        try:
            self.connection.close()
            self.open = False
        except Exception:
            logging.exception("error while closing the connection")
            raise

    def cursor(self) -> object:
        """
        Initiates the cursor and returns it.

        If the connection was closed earlier, it is re-established first.

        Returns:
          TURBODBCSQLCursor: Wrapper around a cursor of the underlying connection, configured with this connection's return_type.

        Raises:
            Exception: any error raised while (re)connecting or creating the cursor is logged and re-raised.
        """
        try:
            if not self.open:
                self.connection = self._connect()
                # Record the reconnect; without this every later call would
                # open yet another connection and drop the previous one.
                self.open = True
            return TURBODBCSQLCursor(
                self.connection.cursor(), return_type=self.return_type
            )
        except Exception:
            logging.exception("error with cursor object")
            raise

close()

Closes connection to database.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/turbodbc_sql_connector.py
89
90
91
92
93
94
95
96
def close(self) -> None:
    """
    Closes connection to database.

    On success the ``open`` flag of the instance is set to False. A failure
    is logged and the exception is propagated to the caller, leaving the
    flag untouched.
    """
    try:
        self.connection.close()
    except Exception as err:
        logging.exception("error while closing the connection")
        raise err
    else:
        # Only reached when the underlying close did not raise.
        self.open = False

cursor()

Initiates the cursor and returns it.

Returns:

Name Type Description
TURBODBCSQLCursor object

Object to represent a databricks workspace with methods to interact with clusters/jobs.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/turbodbc_sql_connector.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def cursor(self) -> object:
    """
    Initiates the cursor and returns it.

    If the connection was closed earlier, it is re-established first.

    Returns:
      TURBODBCSQLCursor: Wrapper around a cursor of the underlying connection, configured with this connection's return_type.

    Raises:
        Exception: any error raised while (re)connecting or creating the cursor is logged and re-raised.
    """
    try:
        if not self.open:
            self.connection = self._connect()
            # Record the reconnect; without this every later call would
            # open yet another connection and drop the previous one.
            self.open = True
        return TURBODBCSQLCursor(
            self.connection.cursor(), return_type=self.return_type
        )
    except Exception:
        logging.exception("error with cursor object")
        raise

TURBODBCSQLCursor

Bases: CursorInterface

Cursor wrapper with methods to execute SQL queries on a cluster or SQL Warehouse and fetch their results.

Parameters:

Name Type Description Default
cursor object

controls execution of commands on cluster or SQL Warehouse

required
Source code in src/sdk/python/rtdip_sdk/connectors/odbc/turbodbc_sql_connector.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class TURBODBCSQLCursor(CursorInterface):
    """
    Thin wrapper around a database cursor that runs queries and fetches their results.

    Args:
        cursor: controls execution of commands on cluster or SQL Warehouse
        return_type: selects the format returned by fetch_all (defaults to ConnectionReturnType.Pandas)
    """

    def __init__(self, cursor: object, return_type=ConnectionReturnType.Pandas) -> None:
        self.return_type = return_type
        self.cursor = cursor

    def execute(self, query: str) -> None:
        """
        Runs a query through the wrapped cursor.

        Args:
            query: sql query to execute on the cluster or SQL Warehouse

        Raises:
            Exception: any error from the wrapped cursor is logged and re-raised.
        """
        try:
            run_query = self.cursor.execute
            run_query(query)
        except Exception as err:
            logging.exception("error while executing the query")
            raise err

    def fetch_all(self) -> list:
        """
        Fetches every row of the executed query via the cursor's ``fetchallarrow``.

        Returns:
            The ``fetchallarrow`` result as-is when return_type is
            ConnectionReturnType.Pyarrow, its ``to_pandas()`` conversion when
            return_type is ConnectionReturnType.Pandas, and None for any other
            return_type.

        Raises:
            Exception: any error while fetching or converting is logged and re-raised.
        """
        try:
            arrow_result = self.cursor.fetchallarrow()
            if self.return_type == ConnectionReturnType.Pyarrow:
                return arrow_result
            if self.return_type == ConnectionReturnType.Pandas:
                return arrow_result.to_pandas()
            # Any other return_type falls through and yields None.
        except Exception as err:
            logging.exception("error while fetching the rows from the query")
            raise err

    def close(self) -> None:
        """Closes the wrapped cursor; a failure is logged and re-raised."""
        try:
            wrapped = self.cursor
            wrapped.close()
        except Exception as err:
            logging.exception("error while closing the cursor")
            raise err

execute(query)

Prepares and runs a database query.

Parameters:

Name Type Description Default
query str

sql query to execute on the cluster or SQL Warehouse

required
Source code in src/sdk/python/rtdip_sdk/connectors/odbc/turbodbc_sql_connector.py
128
129
130
131
132
133
134
135
136
137
138
139
def execute(self, query: str) -> None:
    """
    Runs a query through the wrapped cursor.

    Args:
        query: sql query to execute on the cluster or SQL Warehouse

    Raises:
        Exception: any error from the wrapped cursor is logged and re-raised.
    """
    try:
        run_query = self.cursor.execute
        run_query(query)
    except Exception as err:
        logging.exception("error while executing the query")
        raise err

fetch_all()

Gets all rows of a query.

Returns:

Name Type Description
list list

list of results

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/turbodbc_sql_connector.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def fetch_all(self) -> list:
    """
    Fetches every row of the executed query via the cursor's ``fetchallarrow``.

    Returns:
        The ``fetchallarrow`` result as-is when return_type is
        ConnectionReturnType.Pyarrow, its ``to_pandas()`` conversion when
        return_type is ConnectionReturnType.Pandas, and None for any other
        return_type.

    Raises:
        Exception: any error while fetching or converting is logged and re-raised.
    """
    try:
        arrow_result = self.cursor.fetchallarrow()
        if self.return_type == ConnectionReturnType.Pyarrow:
            return arrow_result
        if self.return_type == ConnectionReturnType.Pandas:
            return arrow_result.to_pandas()
        # Any other return_type falls through and yields None.
    except Exception as err:
        logging.exception("error while fetching the rows from the query")
        raise err

close()

Closes the cursor.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/turbodbc_sql_connector.py
158
159
160
161
162
163
164
def close(self) -> None:
    """Closes the wrapped cursor; a failure is logged and re-raised."""
    try:
        wrapped = self.cursor
        wrapped.close()
    except Exception as err:
        logging.exception("error while closing the cursor")
        raise err