Compartir a través de


Ejemplos de código para Databricks Connect para Python

Nota:

Este artículo describe Databricks Connect para Databricks Runtime 13.3 LTS y versiones posteriores.

En este artículo se proporcionan ejemplos de código que usan Databricks Connect para Python. Databricks Connect le permite conectar los clústeres de Azure Databricks a entornos de desarrollo integrado populares, servidores de cuadernos y otras aplicaciones personalizadas. Consulte ¿Qué es Databricks Connect?. Para obtener la versión de Scala de este artículo, consulte Ejemplos de código para Databricks Connect para Scala.

Antes de empezar a usar Databricks Connect, debe configurar el cliente de Databricks Connect.

En los ejemplos siguientes se supone que usa la autenticación predeterminada para la configuración del cliente de Databricks Connect.

Ejemplo: Leer una tabla

Este ejemplo de código simple consulta la tabla especificada y, a continuación, muestra las primeras 5 filas de la tabla especificada.

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

Ejemplo: Crear un dataframe

El ejemplo de código siguiente:

  1. Crea un DataFrame en memoria.
  2. Crea una tabla con el nombre zzz_demo_temps_table dentro del esquema default. Si la tabla con este nombre ya existe, primero se elimina la tabla. Para usar un esquema o tabla diferente, ajuste las llamadas a spark.sql, temps.write.saveAsTable o ambas.
  3. Guarda el contenido del dataframe en la tabla.
  4. Ejecuta una SELECT consulta en el contenido de la tabla.
  5. Muestra el resultado de la consulta.
  6. Elimina la tabla.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Ejemplo: Uso de DatabricksSesssion o SparkSession

En el ejemplo siguiente se describe cómo escribir código portátil entre Databricks Connect para Databricks Runtime 13.3 LTS y versiones posteriores en entornos en los que la DatabricksSession clase no está disponible, en cuyo caso usa la SparkSession clase en su lugar para consultar la tabla especificada y devolver las primeras 5 filas. Este ejemplo usa la variable de entorno SPARK_REMOTE para la autenticación.

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)

Recursos adicionales

Databricks proporciona aplicaciones de ejemplo adicionales que muestran cómo usar Databricks Connect en el repositorio de GitHub de Databricks Connect, incluido lo siguiente: