Databricks Lakehouse
This example demonstrates how to use the Databricks Lakehouse connector in ydata-sdk.
Don't forget to set up your license key before running the example.
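One common way to do this is to export the key as an environment variable before importing any ydata modules. The sketch below assumes the variable is named YDATA_LICENSE_KEY; this name is an assumption, so adjust it to whatever your ydata-sdk license setup expects.

import os

# Make the license key available before importing ydata modules.
# The variable name below is an assumption; use the one required by your setup.
os.environ['YDATA_LICENSE_KEY'] = 'insert-license-key'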
Example Code
"""Example file on how to use the connector for the Databricks Unity
Catalog."""
from ydata.connectors import DatabricksLakehouse
HOST = 'insert-databricks-host'
TOKEN = 'insert-token'
AWS_ACCESS_KEY = 'insert-aws-key'
SECRET_ACCESS_KEY = 'insert-secret'
CATALOG = 'catalog'
SCHEMA = 'schema'
TABLE = 'table'
WAREHOUSE = 'Starter Warehouse'
if __name__ == "__main__":
    # The Databricks Lakehouse connector requires several inputs:
    # - credentials for one of the supported staging object storages (AWS S3 or Azure Blob Storage)
    # - the Databricks host and an access token
    conn = DatabricksLakehouse(host=HOST,
                               access_token=TOKEN,
                               staging_credentials={'access_key_id': AWS_ACCESS_KEY,
                                                    'secret_access_key': SECRET_ACCESS_KEY},
                               catalog=CATALOG,
                               schema=SCHEMA,
                               cloud='aws')
    # Based on the provided Databricks host and access token,
    # list all the available SQL Warehouses (query engines).
    # It returns a dictionary with the following structure:
    # {'warehouse_name': {'host': '', 'http': '', 'port': ''}}
    # i.e. the name of each warehouse along with its ODBC connection parameters.
    print(conn.list_sqlwarehouses())

    # List all the available catalogs.
    # It returns a list with the names of the available Databricks Delta Lake catalogs.
    print(conn.list_catalogs())

    # List all the available schemas for a given catalog.
    # It returns a list with the names of the available schemas.
    print(conn.list_schemas(catalog=CATALOG))

    # List all the available tables for a given schema.
    # It returns a list with the names of the available tables.
    print(conn.list_tables(catalog=CATALOG,
                           schema=SCHEMA))
    # Get the data from a table.
    # It returns a Dataset with all the data from the table.
    data = conn.get_table(table=TABLE,
                          warehouse=WAREHOUSE)
    print(data)

    # Get a sample of the data from a table.
    # It returns a Dataset with a sample of the table's records.
    data = conn.get_table_sample(table=TABLE,
                                 warehouse=WAREHOUSE)
    print(data)
    # Get the data returned by a query.
    # It returns a Dataset with all the retrieved data.
    data = conn.query(query="SELECT * FROM ydata.default.trans;",
                      warehouse=WAREHOUSE)
    print(data)

    # Get a sample of the data returned by a query.
    # It returns a Dataset with n records from the retrieved data.
    data = conn.query_sample(query="SELECT * FROM ydata.default.trans;",
                             warehouse=WAREHOUSE,
                             sample_size=5000)
    print(data)
    # Write the data to a chosen table.
    # It requires the path of the staging area as well as the catalog details:
    # catalog, schema and table names.
    # Ingesting data into Databricks Delta Lake is a two-step process:
    # write the data to file storage (the staging area) > copy it from staging into the Delta Lake.
    conn.write_table(data=data,
                     staging_path='s3://path-to-file/table',
                     catalog=CATALOG,
                     schema=SCHEMA,
                     warehouse=WAREHOUSE,
                     table=TABLE,
                     if_exists='replace')
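As a quick sanity check, the newly written table can be read back with the same get_table call shown earlier. The snippet below is a sketch meant to be appended to the __main__ block above; it reuses the conn object and the constants defined there.

    # Read the table back from the Delta Lake to confirm the write succeeded
    written = conn.get_table(table=TABLE,
                             warehouse=WAREHOUSE)
    print(written)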