Google BigQuery is a “serverless” data warehouse platform hosted on Google Cloud Platform. The serverless approach means you don’t have to maintain servers yourself, Google looks after security and maintenance, and the platform can scale to handle massive volumes of data.
Like other data warehouse platforms, BigQuery is built specifically for storing data and querying it quickly. It’s intended to be used alongside other platforms, such as Google Data Studio, where BigQuery data can be queried, visualised and used to create reports and data exploration tools.
Here’s how you can get up and running with Google BigQuery using Python, Pandas, and MySQL. However, the same guidance will also work for a wide range of other databases that can be queried using SQLAlchemy’s database drivers.
Google provides a Python API for authenticating and querying BigQuery, which you can install from PyPi using Pip. As well as the google-cloud-bigquery package, you’ll also need to install google-cloud-bigquery-storage.
!pip3 install google-cloud-bigquery
!pip3 install google-cloud-bigquery-storage
To get up and running, you’ll need to log in to your Google account, visit the Google Cloud Platform console and create a new project. You’ll then need to enable the BigQuery API, create a Service Account user and key, and save the keyfile in JSON format. The Service Account user needs “Project > Owner” privileges.
Create a Python script or Jupyter notebook and load up the packages required for this project. We’ll be using OS, Pandas, SQLAlchemy, and the Google Cloud API.
import os
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from sqlalchemy import create_engine
Next, place the JSON keyfile in a location on your machine, then use os.environ to store its location in the GOOGLE_APPLICATION_CREDENTIALS environment variable, which the BigQuery client reads when authenticating.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/home/matt/.config/GoogleBigQuery/google-big-query.json"
To start off, we need to create a dataset in BigQuery. We’ll write a reusable function that creates a BigQuery Client() connection and then creates a dataset of a given name if it doesn’t already exist. We’ll also define the location to use for the BigQuery dataset. I’ve opted for europe-west2, which is a data centre in London. The function uses a Python try/except block to catch the NotFound error raised when the dataset doesn’t exist. If you run the function and visit the BigQuery console you should see your dataset.
def create_dataset(dataset_id, region_name):
    """Create a dataset in Google BigQuery if it does not exist.

    :param dataset_id: Name of dataset
    :param region_name: Region name for data center, i.e. europe-west2 for London
    """

    client = bigquery.Client()
    reference = client.dataset(dataset_id)

    try:
        client.get_dataset(reference)
    except NotFound:
        dataset = bigquery.Dataset(reference)
        dataset.location = region_name
        dataset = client.create_dataset(dataset)

create_dataset('competitors', 'europe-west2')
There are a few different ways to get BigQuery to “ingest” data. One of the easiest is to load data into a table from a Pandas dataframe. Here, you use the load_table_from_dataframe() function and pass it the Pandas dataframe and the name of the destination table (i.e. competitors.products). BigQuery will read the dataframe and create the schema for you.
def insert(df, table):
    """Insert data from a Pandas dataframe into Google BigQuery.

    :param df: Name of Pandas dataframe
    :param table: Name of BigQuery dataset and table, i.e. competitors.products
    :return: BigQuery job object
    """

    client = bigquery.Client()
    return client.load_table_from_dataframe(df, table)
Since Pandas can import a wide range of data sources into a dataframe, this function gives you the ability to import almost anything into BigQuery. For example, we can use read_csv() to read a CSV file and create a new table. There’s also read_json() if you want to import JSON data to BigQuery, read_excel() for Microsoft Excel spreadsheets, read_html() to scrape HTML, read_sas() for SAS files, read_spss() for SPSS files, read_stata() for Stata DTA files, and several more.
df = pd.read_csv('products.csv')
job = insert(df, 'competitors.products')
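Note that load_table_from_dataframe() returns a job object rather than waiting for the load to finish, so it’s worth blocking on the job before querying the new table. Here’s a minimal sketch, assuming the insert() function above; the helper name is my own:

```python
def wait_for_load(job):
    """Block until a BigQuery load job completes and return the number
    of rows loaded. Raises an exception if the load job failed."""

    job.result()  # Blocks until the job finishes, re-raising any job errors
    return job.output_rows
```

For example, `rows = wait_for_load(insert(df, 'competitors.products'))` would pause until the CSV data above has actually landed in the table.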
We can use a similar approach with MySQL by using the SQLAlchemy database toolkit. By combining SQLAlchemy with Pandas’ read_sql() function, you can create simple Extract, Transform, Load (or ETL) data pipelines that take data from a database, pull it into a Pandas dataframe and push it into BigQuery. SQLAlchemy supports a range of databases, including MySQL, PostgreSQL, SQLite, Oracle, MS-SQL, Firebird, Sybase and several others, so you can create ETL pipelines for most systems and push data to BigQuery. To use SQLAlchemy with MySQL you will need to use Pip to install the sqlalchemy and pymysql packages.
engine = create_engine('mysql+pymysql://root:PASSWORD@172.17.0.2:3306/database_name')
query = """
SELECT
    DATE_FORMAT(shoporders.datecreated, '%%Y%%m') AS period,
    COUNT(DISTINCT(shoporders.id)) AS orders,
    COUNT(DISTINCT(shoporders.customerid)) AS customers,
    SUM(shoporders.ordertotal) AS revenue,
    ROUND((SUM(shoporders.ordertotal) / COUNT(DISTINCT(shoporders.customerid))), 2) AS aov,
    shoporderorigins.title AS channel
FROM shoporders
LEFT JOIN shoporderorigins ON shoporders.orderoriginid = shoporderorigins.id
WHERE shoporderorigins.title = 'Amazon'
GROUP BY DATE_FORMAT(shoporders.datecreated, '%%Y%%m')
ORDER BY DATE_FORMAT(shoporders.datecreated, '%%Y%%m') DESC
"""
df = pd.read_sql(query, con=engine)
job = insert(df, 'orders.monthly')
BigQuery is queried using SQL via the query() function. You need to include the dataset and the table name in your SQL statement. If you append to_dataframe(), the BigQuery result set is returned as a Pandas dataframe.
def select(sql):
    """Select data from Google BigQuery table and return a Pandas dataframe.

    :param sql: Google BigQuery SQL statement
    :return: Pandas dataframe
    """

    client = bigquery.Client()
    return client.query(sql).to_dataframe()
sql = """
SELECT *
FROM `competitors.products`
LIMIT 100
"""
df = select(sql)
BigQuery doesn’t include a TRUNCATE command, but you can DELETE all of the content from a table using WHERE 1=1. To perform a DELETE operation you need to use BigQuery’s Data Manipulation Language, or DML. To use the DML you need to enable billing, and you’ll be limited to a certain number of DML queries per day. Here’s the code to truncate a table.
def truncate(table):
    """Truncate a Google BigQuery table.

    :param table: Google BigQuery dataset and table, i.e. competitors.products
    """

    client = bigquery.Client()
    query = "DELETE FROM " + table + " WHERE 1=1"
    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    query_job = client.query(query, job_config=job_config)

truncate('competitors.products')
To completely remove a dataset you can use the delete_dataset() function. Unless you pass the delete_contents=True parameter, you’ll only be able to delete the dataset if it’s empty. If you add this parameter, it will remove the dataset and all of the tables within it, so you need to use it with caution.
def delete_dataset(dataset_id):
    """Delete a dataset in Google BigQuery if it exists, including its tables.

    :param dataset_id: Name of dataset
    """

    client = bigquery.Client()
    client.delete_dataset(
        dataset_id, delete_contents=True, not_found_ok=True
    )
    print("Deleted dataset '{}'.".format(dataset_id))

delete_dataset('ecommerce')
delete_dataset('ecommerce')
Since you’re paying for the data you query, it’s best practice to perform a dry run of your queries before you run them, to determine how many bytes of your quota each query will use. You can do that with the code below.
To avoid going over your quota or incurring high costs, you need to focus on writing efficient queries. Only SELECT the columns you require and avoid SELECT *. Rather than using LIMIT, which still scans the whole table, the recommended approach is to split up your dataset with partitions.
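As a rough illustration of partitioning, a date-partitioned copy of an existing table can be created with a CREATE TABLE … PARTITION BY DDL statement. This is a sketch under my own assumptions: the _partitioned suffix and the date column name are hypothetical, so adjust them for your schema.

```python
def partitioned_table_ddl(table, date_column):
    """Build a DDL statement that creates a date-partitioned copy of a table.

    :param table: BigQuery dataset and table, i.e. competitors.products
    :param date_column: TIMESTAMP or DATETIME column to partition on (hypothetical name)
    """

    return (
        "CREATE TABLE `{}_partitioned` "
        "PARTITION BY DATE({}) "
        "AS SELECT * FROM `{}`".format(table, date_column, table)
    )

# To run it, pass the statement to an authenticated client:
# client = bigquery.Client()
# client.query(partitioned_table_ddl('competitors.products', 'created')).result()
```

Queries that filter on the partitioning column then only scan the partitions they need, rather than the whole table.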
def select_dry_run(sql):
    """Perform a dry run of a select query on a Google BigQuery table
    and return the number of bytes the query will use.

    :param sql: Google BigQuery SQL statement
    :return: Number of bytes the query will use
    """

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    query_job = client.query(sql, job_config=job_config)
    return query_job.total_bytes_processed

query_bytes = select_dry_run('SELECT * FROM competitors.products')
query_bytes
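The raw byte count isn’t very intuitive on its own, so you may want to convert it to gigabytes and a rough cost estimate. The sketch below assumes BigQuery’s on-demand pricing model; the $5 per terabyte figure is an assumption, so check Google’s current pricing for your region.

```python
def estimate_query_cost(bytes_processed, price_per_tb=5.0):
    """Convert a dry run's total_bytes_processed into gigabytes and an
    approximate on-demand cost in dollars.

    :param bytes_processed: Value returned by select_dry_run()
    :param price_per_tb: Assumed on-demand price per terabyte scanned
    """

    gigabytes = bytes_processed / 1024 ** 3
    cost = (bytes_processed / 1024 ** 4) * price_per_tb
    return round(gigabytes, 2), round(cost, 4)

# estimate_query_cost(select_dry_run('SELECT * FROM competitors.products'))
```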
To check the schema BigQuery created when it ingested your data, you can fetch the table with get_table() and return its schema attribute.
def get_table_schema(dataset_id, table_id):
    """Return the schema of a Google BigQuery table.

    :param dataset_id: Dataset ID, i.e. competitors
    :param table_id: Table ID, i.e. products
    :return: Schema of table
    """

    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    table_reference = bigquery.table.TableReference(dataset_ref, table_id)
    table = client.get_table(table_reference)
    return table.schema

schema = get_table_schema('competitors', 'products')
schema
Matt Clarke, Thursday, March 04, 2021