Databricks Lakehouse

This example demonstrates how to use the Databricks Lakehouse connector in ydata-sdk.

Don't forget to set up your license key

    import os

    os.environ['YDATA_LICENSE_KEY'] = '{add-your-key}'

Example Code

"""Example file on how to use the connector for the Databricks Unity
Catalog."""
from ydata.connectors import DatabricksLakehouse

HOST = 'insert-databricks-host'
TOKEN = 'insert-token'

AWS_ACCESS_KEY = 'insert-aws-key'
SECRET_ACCESS_KEY = 'insert-secret'

CATALOG = 'catalog'
SCHEMA = 'schema'
TABLE = 'table'
WAREHOUSE = 'Starter Warehouse'

if __name__ == "__main__":

    # The Databricks Lakehouse connector requires several inputs:
    # - Credentials for one of the supported staging object storages (AWS S3 or Azure Blob Storage)
    # - The Databricks host and an access token
    conn = DatabricksLakehouse(
        host=HOST,
        access_token=TOKEN,
        staging_credentials={'access_key_id': AWS_ACCESS_KEY,
                             'secret_access_key': SECRET_ACCESS_KEY},
        catalog=CATALOG,
        schema=SCHEMA,
        cloud='aws'
    )

    # Based on the provided Databricks host and access token,
    # list all the available SQL Warehouses (query engines).
    # It returns a dictionary with the following structure:
    # {'warehouse_name': {'host': '', 'http': '', 'port': ''}}
    # i.e. the name of each warehouse along with its ODBC connection parameters
    print(conn.list_sqlwarehouses())
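
    # A minimal sketch of how the returned dictionary can be used to look up the
    # connection parameters of a specific warehouse (this assumes a SQL Warehouse
    # named WAREHOUSE actually exists in the workspace):
    warehouses = conn.list_sqlwarehouses()
    if WAREHOUSE in warehouses:
        print(warehouses[WAREHOUSE])  # {'host': ..., 'http': ..., 'port': ...}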

    # list all the available Catalogs
    # It returns a list with the names of the available Databricks Delta Lake catalogs
    print(conn.list_catalogs())

    # list all the available schemas for a given catalog
    # It returns a list with the names of the available schemas
    print(conn.list_schemas(catalog=CATALOG))

    # list all the available tables for a given schema
    # It returns a list with the names of the available tables
    print(conn.list_tables(catalog=CATALOG,
                           schema=SCHEMA))
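
    # A small sketch combining the listing helpers above to enumerate every table
    # visible to this connector, fully qualified as catalog.schema.table.
    # It only reuses the list_* calls shown above.
    for catalog_name in conn.list_catalogs():
        for schema_name in conn.list_schemas(catalog=catalog_name):
            for table_name in conn.list_tables(catalog=catalog_name, schema=schema_name):
                print(f"{catalog_name}.{schema_name}.{table_name}")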

    # Get the data from a table
    # It returns a Dataset with all the data from the table
    data = conn.get_table(table='new_validation',
                          warehouse=WAREHOUSE)

    print(data)

    # Get a sample of the data from a table
    # It returns a Dataset with a subset of the records from the table
    data = conn.get_table_sample(table='cardio',
                                 warehouse=WAREHOUSE)

    # Get the data returned by a SQL query
    # It returns a Dataset with all the retrieved data
    data = conn.query(query="SELECT * FROM ydata.default.trans;",
                      warehouse=WAREHOUSE)

    print(data)

    # Get a sample of the data returned by a SQL query
    # It returns a Dataset with up to sample_size records from the retrieved data
    data = conn.query_sample(query="SELECT * FROM ydata.default.trans;",
                             warehouse=WAREHOUSE,
                             sample_size=5000)

    print(data)

    # Write the data to a chosen table
    # It requires the path of the staging area as well as the catalog details:
    # catalog, schema and table names
    # Ingesting data into Databricks Delta Lake is a two-step process:
    # write to the file storage (staging area) > copy the data from staging into Delta Lake
    conn.write_table(data=data,
                     staging_path='s3://path-to-file/table',
                     catalog=CATALOG,
                     schema=SCHEMA,
                     warehouse=WAREHOUSE,
                     table=TABLE,
                     if_exists='replace')
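
    # A quick sanity check: a sketch that reuses the get_table call shown above to
    # read the freshly written table back and print it. This assumes the write
    # succeeded and that get_table reads from the catalog/schema the connector was
    # configured with.
    written = conn.get_table(table=TABLE,
                             warehouse=WAREHOUSE)
    print(written)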