Share via


Test the Databricks ODBC Driver (Simba)

This page describes how to test code that uses the Databricks ODBC Driver.

Use any test framework for ODBC-compatible languages. The following examples use pyodbc, pytest, and unittest.mock to test ODBC driver connections. This code is based on the example in Connect Python and pyodbc to Azure Databricks.

Helper functions

The helpers.py file contains utility functions for working with ODBC connections:

  • connect_to_dsn: Opens a connection to a Azure Databricks compute resource.
  • get_cursor_from_connection: Obtains a cursor for executing queries.
  • select_from_nyctaxi_trips: Queries the specified number of rows from samples.nyctaxi.trips.
  • print_rows: Prints the result set contents to the console.
# helpers.py

from pyodbc import connect, Connection, Cursor

def connect_to_dsn(
  connstring: str,
  autocommit: bool
) -> Connection:

  connection = connect(
    connstring,
    autocommit = autocommit
  )

  return connection

def get_cursor_from_connection(
  connection: Connection
) -> Cursor:

  cursor = connection.cursor()
  return cursor

def select_from_nyctaxi_trips(
  cursor: Cursor,
  num_rows: int
) -> Cursor:

  select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  return select_cursor

def print_rows(cursor: Cursor):
  for row in cursor.fetchall():
    print(row)

Main class

The main.py file calls the helper functions to connect and query data:

# main.py

from helpers import *

connection = connect_to_dsn(
  connstring = "DSN=<your-dsn-name>",
  autocommit = True
)

cursor = get_cursor_from_connection(
  connection = connection)

select_cursor = select_from_nyctaxi_trips(
  cursor = cursor,
  num_rows = 2
)

print_rows(
  cursor = select_cursor
)

Unit tests with mocking

The test_helpers.py file uses pytest and unittest.mock to test the select_from_nyctaxi_trips function. Mocking simulates database connections without using actual compute resources, so tests run in seconds without affecting your Azure Databricks workspaces.

# test_helpers.py

from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime

@patch("helpers.connect_to_dsn")
def test_connect_to_dsn(mock_connection):
  mock_connection.return_value.getinfo.return_value = "Spark SQL"

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  assert mock_connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"

@patch('helpers.get_cursor_from_connection')
def test_get_cursor_from_connection(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_cursor.return_value.rowcount = -1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  assert mock_cursor.rowcount == -1

@patch('helpers.select_from_nyctaxi_trips')
def test_select_from_nyctaxi_trips(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.arraysize = 1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  assert mock_select_cursor.arraysize == 1

@patch('helpers.print_rows')
def test_print_rows(mock_connection, capsys):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.fetchall.return_value = [
    (datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
    (datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
  ]

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  print_rows(
    cursor = mock_select_cursor
  )

  captured = capsys.readouterr()
  assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
                         "(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"

Because select_from_nyctaxi_trips only runs a SELECT statement, mocking isn't strictly required here. However, mocking is especially useful when testing functions that modify data (INSERT INTO, UPDATE, DELETE FROM), as you can run tests repeatedly without affecting table state.