In terms of technical ingenuity, Google BigQuery is probably the most impressive data warehouse on the market. BigQuery differs from other data warehouses in that the underlying architecture is shared by everybody on Google Cloud, meaning you don't need to pay for a dedicated cluster of expensive servers to occasionally run queries for large-scale data analysis.

Moving data into data warehouses traditionally involves dumping a shit ton of unstructured or semi-structured files into storage such as S3, Google Cloud Storage, or a data lake before loading them into their destination. It's no surprise that the generations inspired by MapReduce perpetuate outdated processes in technology, as fans of Java typically do (don't @ me). Google upheld this status quo with BigQuery, which we painfully worked through in a previous tutorial. Luckily there are mysterious heroes among us, known only as "third-party developers."

PyBigQuery is a SQLAlchemy dialect for BigQuery, which allows us to connect to and query a BigQuery table as though it were a relational database, as you've certainly done with either PyMySQL or Psycopg2. It really is as easy as it sounds. To make things interesting, we’re going to build a small script to pipe data back and forth between BigQuery and MySQL, and we’ll do it by creating instances of a database-flavor-agnostic class for both sides. Now we’re getting saucy.

DISCLAIMER: Just because we can interact with BigQuery as easily as an ACID-compliant relational database doesn't mean we always should. It's your responsibility to understand the pros and cons of data warehouses, and recognize when to use them.

Making our First Big Query

Let's get you set up to do the bare minimum: pulling rows from BigQuery with SQLAlchemy. I’ll save us the agony of explaining what a gcloud credentials JSON file is and assume you already have one for your project.

The obvious packages we need to install are PyBigQuery and SQLAlchemy. I’m also going to have us grab both PyMySQL and Psycopg2-binary for the hell of it:

$ pip3 install pybigquery sqlalchemy pymysql psycopg2-binary

To connect to BigQuery we’ll need the name of the BigQuery dataset you're working in, as well as the table name. With that information, we can create our beautiful SQLAlchemy URI, which looks something like this:

bigquery://[GOOGLE_CLOUD_PROJECT_NAME]/[BIGQUERY_DATASET_NAME]
SQLAlchemy URI pattern for BigQuery
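For example, with a hypothetical Google Cloud project named my-gcp-project and a BigQuery dataset named analytics (both names made up for illustration), the URI would come out to:

bigquery://my-gcp-project/analytics
Example URI (hypothetical project & dataset names)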

I recommend creating a configuration file to build this dynamically. We're going to need all the variables below at some point anyway:

"""BigQuery Configuration."""
from os import environ


# Google Cloud config
gcp_credentials = environ.get('GCP_CREDENTIALS')
gcp_project = environ.get('GCP_PROJECT')

# Google BigQuery config
bigquery_dataset = environ.get('GCP_BIGQUERY_DATASET')
bigquery_table = environ.get('GCP_BIGQUERY_TABLE')
bigquery_uri = f'bigquery://{gcp_project}/{bigquery_dataset}'

config.py

In case any of this terminology is confusing, here's how to find your project name, BigQuery dataset name, and BigQuery table name via the BigQuery console:

BigQuery Terminology

Connecting to BigQuery

Creating an SQLAlchemy engine for BigQuery should look the same as always, with the only notable difference being that the credentials_path parameter now points to our gcloud credentials JSON:

from sqlalchemy.engine import create_engine
from config import bigquery_uri


engine = create_engine(bigquery_uri,
                       credentials_path='/path/to/credentials.json')
main.py

We can now hit BigQuery as we would with any SQLAlchemy engine:

from sqlalchemy.engine import create_engine
from config import (bigquery_dataset,
                    bigquery_table,
                    bigquery_uri)


engine = create_engine(bigquery_uri,
                       credentials_path='/path/to/credentials.json')

query = f'SELECT title, url, referrer FROM {bigquery_dataset}.{bigquery_table} \
          WHERE referrer IS NOT NULL \
          AND title IS NOT NULL \
          ORDER BY RAND() LIMIT 20;'
rows = engine.execute(query).fetchall()
main.py

The rows variable returned by our query is a typical SQLAlchemy ResultProxy, which makes it easy to pass this data to other destinations. We can preview the data we received just for fun:

import pprint
...

rows = [dict(row) for row in rows]
pp = pprint.PrettyPrinter(indent=2)
pp.pprint(rows)
main.py

And here it is:

[
  {
    "referrer": "https://www.google.com/",
    "title": "The Art of Routing in Flask | Hackers and Slackers",
    "url": "https://hackersandslackers.com/the-art-of-building-flask-routes/"
  },
  {
    "referrer": "https://www.facebook.com/",
    "title": "Demystifying Flask\"s Application Factory And the Application "
    "Context",
    "url": "https://hackersandslackers.com/demystifying-flask-application-factory/?fbclid=IwAR30VBM2hFq-49FCwY9YumyBYxhhWkMsyDb5FE_8zsFngybMTFHpnirihdA"
  },
  {
    "referrer": "https://www.google.com/",
    "title": "Connect Flask to a Database with Flask-SQLAlchemy - Hackers and "

    "Slackers",
    "url": "https://hackersandslackers.com/manage-database-models-with-flask-sqlalchemy/"
  },
  {
    "referrer": "https://www.google.com/",
    "title": "Constructing Database Queries with SQLAlchemy | Hackers and "
    "Slackers",
    "url": "https://hackersandslackers.com/constructuing-database-queries-with-the-sqlalchemy-orm/"
  },
  {
    "referrer": "https://www.google.com/",
    "title": "Constructing Database Queries with SQLAlchemy | Hackers and "
    "Slackers",
    "url": "https://hackersandslackers.com/constructuing-database-queries-with-the-sqlalchemy-orm/"
  }
]
Our query results

ETL with BigQuery & SQL Databases

If pulling rows from BigQuery was all you hoped to accomplish, feel free to skip away happily. For those ambitious few who remain, I'll let you in on a secret: this is not a 5-liner "trick" tutorial. Oh, no... this tutorial is about building the groundwork for an unstoppable data machine. This is a mission to liberate tabular data to move freely from data warehouse to database, unaware of the borders that once stood between them. We're building the opposite of Brexit.

As the SQLAlchemy community grows, a new landscape begins to unfold where every data source and destination in our stack is compatible with SQLAlchemy. It might not sound astounding, but the idea of data sources sharing a single dialect is somewhat unprecedented. Forget about Spark clusters or messaging queues: we're going to standardize the way we work with every data source. We'll do this by abstracting databases via Python classes, such that fetching rows from MySQL uses the same method as fetching rows from Postgres or BigQuery, without any idiosyncrasies.

Classes are in Session

Our first step to data globalization is to create a Python class containing logic common to all databases and data warehouses. No matter what we're working with, we're going to want ways to:

  1. Connect to our data
  2. Fetch our data
  3. Insert new data

Here's what a class like that might look like:

"""Base Data Client."""
import logging
from sqlalchemy import Table


logging.basicConfig(filename='logs/queries.log',
                    format='%(asctime)s %(message)s')
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)


class BaseClient:

    def __init__(self, engine=None, metadata=None, table=None):
        self.table = table
        self.engine = engine
        self.metadata = metadata

    def insert_rows(self, rows, replace=None):
        """Insert rows into table."""
        if replace:
            self.engine.execute(f'TRUNCATE TABLE {self.table}')
        table = Table(self.table, self.metadata, autoload=True)
        self.engine.execute(table.insert(), rows)
        return self.__construct_response(rows)

    def fetch_rows(self, query):
        """Fetch all rows via query."""
        rows = self.engine.execute(query).fetchall()
        return rows

    def __construct_response(self, rows):
        """Summarize what was accomplished by a query."""
        columns = rows[0].keys()
        column_names = ", ".join(columns)
        num_rows = len(rows)
        return f'Inserted {num_rows} rows into `{self.table}` with {len(columns)} columns: {column_names}'
client.py

By using a class like BaseClient we can avoid repeating the same patterns (like table insertions) over and over. Instead, we can create an instance of BaseClient for each of our data sources.

The obvious benefit is that repetitive chunks of logic get reduced to one-liners: replacing an existing table with new data becomes a single call to insert_rows(), just as fetch_rows() makes life a bit easier. The real benefit we're after isn't just saving time with functions, though. Take a closer look and you'll see that insert_rows() expects an input of rows, which happens to be the same data type that fetch_rows() outputs. That means we could "fetch" rows from source A and "insert" those rows into destination B in only two lines of code. Moving data in the opposite direction is just as easy, since each of our sources shares the same methods.
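To make that concrete, here's a rough sketch of the round trip, where source and destination stand in for any two client instances built on BaseClient (the names and the query here are made up for illustration):

# `source` and `destination` are hypothetical BaseClient instances
rows = source.fetch_rows('SELECT * FROM some_table LIMIT 100;')  # pull rows out of source A
print(destination.insert_rows(rows, replace=True))               # push them into destination B
A hypothetical two-line transfer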

Of course, a SQL URI is different from a BigQuery URI, and demands different information. This is where we can extend BaseClient.

SQL Database Client

I'm guessing none of us expected to get hit with OOP in a post about ETL, but hey. Shit happens. Below we have a class called Database, which expects the things needed to connect to a traditional database (username, password, host, etc). For added laziness, instantiating this class will build your connection string for you depending on whether you specify a MySQL or PostgreSQL database:

"""SQL Database Client."""
from sqlalchemy.engine import create_engine
from sqlalchemy import MetaData
from src.sources.client import BaseClient


class Database(BaseClient):

    def __init__(self,
                 type,
                 username=None,
                 password=None,
                 host=None,
                 port=None,
                 table=None,
                 db_name=None):
        self.table = table
        self.type = self.__connection_type(type)
        self.engine = create_engine(f'{self.type}://{username}:{password}@{host}:{port}/{db_name}')
        super().__init__(engine=self.engine,
                         metadata=MetaData(bind=self.engine),
                         table=table)

    @staticmethod
    def __connection_type(type):
        if type.lower() == 'mysql':
            return 'mysql+pymysql'
        if type.lower() == 'postgres':
            return 'postgresql+psycopg2'
        return None
database_client.py

The point of extending BaseClient with this class is simply to create the SQLAlchemy engine that our client will use; that's really all this class does. Here's how we might instantiate a MySQL client using this class:

from config.sql_config import (database_username,
                               database_password,
                               database_host,
                               database_port,
                               database_table,
                               database_name)
from .database_client import Database

dbc = Database('mysql',
               username=database_username,
               password=database_password,
               host=database_host,
               port=database_port,
               db_name=database_name,
               table=database_table)

Now we have an object named dbc representing our MySQL database, giving us access to our two methods: dbc.fetch_rows() and dbc.insert_rows().
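As a quick smoke test, we could fetch a few rows from MySQL the same way we did from BigQuery earlier (the table and column names below are hypothetical):

# Hypothetical MySQL table & columns, purely for illustration
rows = dbc.fetch_rows('SELECT id, title FROM posts LIMIT 5;')
print([dict(row) for row in rows])
A hypothetical fetch via dbc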

BigQuery Client

We'll need to do something similar on the BigQuery side. Again, we extend BaseClient for the purposes of creating an engine:

"""BigQuery Client."""
from sqlalchemy.engine import create_engine
from sqlalchemy import MetaData
from src.sources.client import BaseClient


class BigQueryClient(BaseClient):

    def __init__(self, project=None, dataset=None, creds=None, table=None):
        self.uri = f'bigquery://{project}/{dataset}'
        self.engine = create_engine(self.uri, credentials_path=creds)
        super().__init__(engine=self.engine,
                         metadata=MetaData(bind=self.engine),
                         table=table)
bigquery_client.py

This time around we'll have a variable named bqc, which matches the functionality of dbc (except for BigQuery):

from config.bigquery_config import (gcp_credentials,
                                    gcp_project,
                                    bigquery_dataset,
                                    bigquery_table)
from .bigquery_client import BigQueryClient

bqc = BigQueryClient(project=gcp_project,
                     dataset=bigquery_dataset,
                     table=bigquery_table,
                     creds=gcp_credentials)

Pay-per-view: dbc vs bqc

Let's get ready to rumble. In the red corner, we've got dbc: a DigitalOcean managed MySQL database. In the blue corner, bqc: my BigQuery data warehouse, filling up with new data by the second. I want to grab a subset of analytics-related data from BigQuery and dump it into my application’s MySQL database. Let’s see how that would go down.

First, here's the source:

...

fetched_rows = bqc.fetch_rows(analytics_query)
inserted_rows = dbc.insert_rows(fetched_rows, replace=True)
print(inserted_rows)
main.py

This is the query I ran:

SELECT
  REPLACE(REPLACE(REPLACE(title, ' | Hackers and Slackers', ''), ' | Hackers And Slackers', ''), '- Hackers and Slackers', '') as title,
  url,
  REPLACE(REPLACE(url, 'https://hackersandslackers.com/', ''), '/' , '') as slug,
  COUNT(title) AS views
FROM
  hackersgatsbyprod.pages
WHERE
  timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
GROUP BY
  title,
  url
ORDER BY
  COUNT(title) DESC
LIMIT
  100;

And finally, here's the result that was printed to our console:

Inserted 100 rows into `hackers` with 4 columns: title, url, slug, views

As promised, we moved data from one source to another in two lines of code, where one of those sources was a data warehouse.
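And because both clients share the same interface, reversing the flow is just a matter of swapping the objects. A sketch of the return trip might look something like this (assuming the `hackers` table we just populated):

# Hypothetical reverse trip: MySQL -> BigQuery
mysql_rows = dbc.fetch_rows('SELECT title, url, slug, views FROM hackers;')
print(bqc.insert_rows(mysql_rows))
A hypothetical trip in the opposite direction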

What Can We Take From This?

We have a fairly strong proof-of-concept, which demonstrates that SQLAlchemy could take abstraction a level further, which is impressive considering how much abstraction SQLAlchemy currently handles for us out-of-the-box. That said, I’m regrettably unsure whether the code we’ve written is either:

  • The beginning of a paradigm where database “flavors” are even less significant when writing code, or
  • A bunch of bloated classes which don't add enough value to justify anybody using them.

The latter may very well be true as things currently stand, but I’m willing to bet that our hunger for database-related abstraction at the code level depends entirely on how the landscape of data changes in the near future.

Capitalism tends to consolidate things over time (businesses, products, choices), but it seems we have more database & data warehouse options than ever. The last decade has brought us Redshift, BigQuery, CockroachDB, Cassandra, Snowflake... the list goes on. The only trend more ridiculous than this abundance of choice is a trend within enterprises to incorporate all of them. I'm willing to bet your mid-sized company has a data stack which includes:

  • Multiple types of RDBs
  • A data warehouse
  • An inexplicably misused DynamoDB instance
  • A forgotten MongoDB database that some frontend guy set up years ago and is actively getting hacked into as we speak.

Making decisions is how non-contributors force themselves into office space relevance. More options mean more opportunities for individuals to make self-serving choices. In software teams, this usually translates into nonsensical architecture where each service, database, or data warehouse is a relic of an unseen political play. I don't see this changing anytime soon, so maybe we are on to something after all.

The source code for this tutorial can be found here:

hackersandslackers/bigquery-sqlalchemy-tutorial