It's almost Friday night, and the squad at H+S is ready to get cooking. Dim down the lights and slip into something more comfortable as we take you on this 100% organic flavor extravaganza. Tonight's menu? A Python MySQL library: PyMySQL.

PyMySQL is a lightweight library perfect for simple scripts. If you're looking to connect to a MySQL database and perform some simple queries, look no further. PyMySQL won't give us the sort of bells and whistles that come with libraries like SQLAlchemy, which is fine if we're looking to build a quick endpoint. A great use case for PyMySQL is for usage in AWS Lambda (or other serverless functions). We'll get to that, but for now, let's cook up something good.

Heat the Stove

Turn on the gas and prep the table to set with your favorite collection of plates! That's right; we're talking boilerplate. You knew it was coming: every time you want to do something tangibly cool, we need to get into the business of managing connections and whatnot. To ease the pain, I'll share with you a preferred method of handling opening connections with PyMySQL.

First things first, we need to create a config file. Unlike SQLAlchemy, PyMySQL doesn't support database URI strings out of the box. Because of this, we need to set variables for each part of a database connection such as username, password, etc.:

"""Config values."""
from os import environ


class Config:

    # Database config
    db_user = environ.get('DATABASE_USERNAME')
    db_password = environ.get('DATABASE_PASSWORD')
    db_host = environ.get('DATABASE_HOST')
    db_port = environ.get('DATABASE_PORT')
    db_name = environ.get('DATABASE_NAME')

We're pulling the values for each of these from a .env file: a practice I highly recommend for security purposes.

Meat and Potatoes

I'll be containing all our app's database logic in a class called Database to contain all variables and functions related to database connections:

class Database:
    """Database connection class."""

    def __init__(self, config):
        self.host = config.db_host
        self.username = config.db_user
        self.password = config.db_password
        self.port = config.db_port
        self.dbname = config.db_name
        self.conn = None
        
    ...

Initializing this class saves our database connection variables to the instance of the class, as well as creates a self.conn variable for managing connections. We create an instance of this class by passing our config object to Database:

from config import Config

db = Database(config)

With our class initialized, we're ready to get cookin'.

Set The Table

The first method we'll add to our class will be called open_connection() to manage opening connections to our database. Here we set a function to separate basic connection logic and error messaging from our app:

import sys
import pymysql
import logging


class Database:
    """Database connection class."""

    ...

    def open_connection(self):
        """Connect to MySQL Database."""
        try:
            if self.conn is None:
                self.conn = pymysql.connect(self.host,
                                            user=self.username,
                                            passwd=self.password,
                                            db=self.dbname,
                                            connect_timeout=5)
        except pymysql.MySQLError as e:
            logging.error(e)
            sys.exit()
        finally:
            logging.info('Connection opened successfully.')

We look at our class's variable self.conn to serve as our connection object. By setting conn on our class's self, we can allow all methods of this class to work against the same databases connection instead of opening & closing connections for every query.

openConnection() checks to see if self.conn already exists. If a connection doesn't exist, our function will attempt to connect to our MySQL database with the given credentials. We also have some logic in here to throw an error if something goes wrong by leveraging PyMySQL's built-in pymysql.MySQLError exception type. Get used to seeing a lot of try/except/finally statements when working with databases.

We can now share openConnection() between all the methods of our class and open connections only when needed.

The Entree

With the boring stuff out of the way, let's dig into some goodness. We'll start off with a basic use case: selecting all rows from a table:

...

class Database:
    """Database connection class."""

    ...

    def run_query(self, query):
        """Execute SQL query."""
        try:
            self.open_connection()
            with self.conn.cursor() as cur:
                records = []
                cur.execute(query)
                result = cur.fetchall()
                for row in result:
                    records.append(row)
                cur.close()
                return records
        except pymysql.MySQLError as e:
            print(e)
        finally:
            if self.conn:
                self.conn.close()
                self.conn = None
                print('Database connection closed.')

run_query() tries to open a connection using the open_connection() function we created earlier. With that connection open, we're free to run whichever queries we want.

Our function is passed a parameter called query, which represents whichever SQL query we want to run (AKA: SELECT * FROM [table]). To execute such a query, we need to open a cursor, which we do with this line:

with self.conn.cursor() as cur:
    ...

We're able to run all the queries we want while cur is open. Let's kick it up a notch.

Selecting Rows of Data

Now with cur open, we can call a few methods against cur.

  • cur.execute([QUERY]): Calling execute() on cur will run a query inside our cursor object.
  • cur.fetchall(): After running a query which results in rows, we can see all the rows returned by our query by calling fetchall() on cur. As you can see, we should only execute one query per cursor. Otherwise, we'd be writing over the results of our previous query over and over. Attempting to print the result of .fetchall() will result in a single integer, which represents the number of rows fetched.
  • cur.fetchone(): Unlike fetchall(), fetchone() only fetches the first row returned by our query. If there are instances where we know only one record should be returned, .fetchone() should be used.
  • cur.close(): When we're finally done with our query, we should close the cursor with close().

run_query() assumes that the query we're passing is a SELECT query, which is why we're saving the results to an array. If we're running an UPDATE or DELETE query, this wouldn't make much sense since nothing would be returned.

Updating Rows of Data

What if we're not selecting data, but rather modifying it? Check our how our function changes:

def run_query(self, query):
        """Execute SQL query."""
        try:
            self.open_connection()
            with self.conn.cursor() as cur:
				result = cur.execute(query)
				self.conn.commit()
				affected = f"{cur.rowcount} rows affected."
                cur.close()
                return affected
        ...

There are a few new things here:

  • conn.commit(): Running commit() actually commits the changes in our query. If you forget this, changes to your data won't actually be saved (this goes for DELETE and INSERT statements as well).
  • cur.rowcount: Returns the number of rows affected by a query which changes data.

Combining the Above

So now we have a version of run_query() which expects SELECT statements, and another which expects mutations. How could we make this function smart enough to handle either case and return the result which makes the most sense? How about checking to see if the word SELECT exists in the query before executing?:

...

class Database:
    """Database connection class."""

    ...

    def run_query(self, query):
        """Execute SQL query."""
        try:
            self.open_connection()
            with self.conn.cursor() as cur:
                if 'SELECT' in query:
                    records = []
                    cur.execute(query)
                    result = cur.fetchall()
                    for row in result:
                        records.append(row)
                    cur.close()
                    return records
                else:
                    result = cur.execute(query)
                    self.conn.commit()
                    affected = f"{cur.rowcount} rows affected."
                    cur.close()
                    return affected
        except pymysql.MySQLError as e:
            print(e)
        finally:
            if self.conn:
                self.conn.close()
                self.conn = None
                logging.info('Database connection closed.')

Now we have a function that handles both scenarios! Of course, this implementation isn't exactly foolproof: if one of your columns happens to named SELECT (or something), you might run into some problems. Don't do that, I suppose.

For Dessert

Hopefully you've found our little dinner date to be useful. If you're looking for some copy & paste source to get started with PyMySQL on our own, feel free to check out the source code for this up on Github here. If that's too much, feel free to copy + paste the below:

import sys
import pymysql
import logging


class Database:
    """Database connection class."""

    def __init__(self, config):
        self.host = config.db_host
        self.username = config.db_user
        self.password = config.db_password
        self.port = config.db_port
        self.dbname = config.db_name
        self.conn = None

    def open_connection(self):
        """Connect to MySQL Database."""
        try:
            if self.conn is None:
                self.conn = pymysql.connect(self.host,
                                            user=self.username,
                                            passwd=self.password,
                                            db=self.dbname,
                                            connect_timeout=5)
        except pymysql.MySQLError as e:
            logging.error(e)
            sys.exit()
        finally:
            logging.info('Connection opened successfully.')

    def run_query(self, query):
        """Execute SQL query."""
        try:
            self.open_connection()
            with self.conn.cursor() as cur:
                if 'SELECT' in query:
                    records = []
                    cur.execute(query)
                    result = cur.fetchall()
                    for row in result:
                        records.append(row)
                    cur.close()
                    return records
                else:
                    result = cur.execute(query)
                    self.conn.commit()
                    affected = f"{cur.rowcount} rows affected."
                    cur.close()
                    return affected
        except pymysql.MySQLError as e:
            print(e)
        finally:
            if self.conn:
                self.conn.close()
                self.conn = None
                logging.info('Database connection closed.')

We thank you all for joining us in this adventure of tantalizing treats. May your data be clean and your stomachs full.

Bon appétit.