Something we haven't done just yet on this site is walking through the humble process of creating data pipelines: the art of taking a bunch of data, changing said data, and putting it somewhere else. It's kind of a weird thing to be into, hence why the MoMA has been rejecting my submissions of Github repositories. Don't worry; I'll keep at it.

Something you don't see every day are people sharing their pipelines, which is understandable. Presumably, the other people who do this kind of stuff do it for work; nobody is happily building stupid pipelines in their free time begging to be open sourced. Except me.

We've recently revamped our projects page to include a public-facing Kanban board using GraphQL. To achieve this, we need to extract JIRA data from the JIRA Cloud REST API and place it securely in our database.

Structuring our Pipeline

An ETL pipeline which is considered 'well-structured' is in the eyes of the beholder. There are a million different ways to pull and mess with data, so there isn't a "template" for building these things out. In my case, the structure of my script just so happened to end up as three modules: one for extracting, one for loading, and one for transforming. This was unplanned, but it's a good sign when our app matches our mindset. Here's the breakdown:

jira-database-etl
├── __main__.py
├── jira_etl
│   ├── __init__.py
│   ├── fetch.py
│   ├── data.py
│   └── db.py
├── LICENSE
├── MANIFEST.in
├── Pipfile
├── Pipfile.lock
├── README.md
├── requirements.txt
├── setup.cfg
└── setup.py

main.py is our application entry point. The logic of our pipeline is stored in three parts under the jira_etl directory:

  • fetch.py grabs the data from the source (JIRA Cloud's REST API) and handles fetching all JIRA issues.
  • data.py transforms the data we've fetched and constructs a neat DataFrame containing only the information we're after.
  • db.py finally loads the data into a SQL database.

Don't look into it too much, but here's our entry point:

from jira_etl import fetch
from jira_etl import data
from jira_etl import db


def main():
    """Application Entry Point.

    1. Fetch all desired JIRA issues from an instance's REST API.
    2. Sanitize the data and add secondary metadata.
    3. Upload resulting DataFrame to database.
    """
    jira_issues_json = fetch.FetchJiraIssues.fetch_all_results()
    jira_issues_df = data.TransformData.construct_dataframe(jira_issues_json)
    upload_status = db.DatabaseImport.upload_dataframe(jira_issues_df)
    return upload_status

Without further adieu, let's dig into the logic!

Extracting Our Data

Before doing anything, it's essential we become familiar with the data we're about to pull. Firstly, JIRA's REST API returns paginated results which max out at 100 results per page. This means we'll have to loop through the pages recursively until all results are loaded.

Next, let's look at an example of a single JIRA issue JSON object returned by the API:

{
    "expand": "names,schema",
    "startAt": 0,
    "maxResults": 1,
    "total": 888,
    "issues": [
        {
            "expand": "operations,versionedRepresentations,editmeta,changelog,renderedFields",
            "id": "11718",
            "self": "https://hackersandslackers.atlassian.net/rest/api/3/issue/11718",
            "key": "HACK-756",
            "fields": {
                "issuetype": {
                    "self": "https://hackersandslackers.atlassian.net/rest/api/3/issuetype/10014",
                    "id": "10014",
                    "description": "Placeholder item for \"holy shit this is going to be a lot of work\"",
                    "iconUrl": "https://hackersandslackers.atlassian.net/secure/viewavatar?size=xsmall&avatarId=10311&avatarType=issuetype",
                    "name": "Major Functionality",
                    "subtask": false,
                    "avatarId": 10311
                },
                "customfield_10070": null,
                "customfield_10071": null,
                "customfield_10073": null,
                "customfield_10074": null,
                "customfield_10075": null,
                "project": {
                    "self": "https://hackersandslackers.atlassian.net/rest/api/3/project/10015",
                    "id": "10015",
                    "key": "HACK",
                    "name": "Hackers and Slackers",
                    "projectTypeKey": "software",
                    "avatarUrls": {
                        "48x48": "https://hackersandslackers.atlassian.net/secure/projectavatar?pid=10015&avatarId=10535",
                        "24x24": "https://hackersandslackers.atlassian.net/secure/projectavatar?size=small&pid=10015&avatarId=10535",
                        "16x16": "https://hackersandslackers.atlassian.net/secure/projectavatar?size=xsmall&pid=10015&avatarId=10535",
                        "32x32": "https://hackersandslackers.atlassian.net/secure/projectavatar?size=medium&pid=10015&avatarId=10535"
                    }
                },
                "fixVersions": [],
                "resolution": null,
                "resolutiondate": null,
                "workratio": -1,
                "lastViewed": "2019-03-24T02:01:31.355-0400",
                "watches": {
                    "self": "https://hackersandslackers.atlassian.net/rest/api/3/issue/HACK-756/watchers",
                    "watchCount": 1,
                    "isWatching": true
                },
                "created": "2019-02-03T00:47:36.677-0500",
                "customfield_10062": null,
                "customfield_10063": null,
                "customfield_10064": null,
                "customfield_10065": null,
                "customfield_10066": null,
                "priority": {
                    "self": "https://hackersandslackers.atlassian.net/rest/api/3/priority/2",
                    "iconUrl": "https://hackersandslackers.atlassian.net/images/icons/priorities/high.svg",
                    "name": "High",
                    "id": "2"
                },
                "customfield_10067": null,
                "customfield_10068": null,
                "customfield_10069": [],
                "labels": [],
                "versions": [],
                "issuelinks": [],
                "assignee": {
                    "self": "https://hackersandslackers.atlassian.net/rest/api/3/user?accountId=557058%3A713aac6d-44ef-416d-9a1d-3e524a5c4dc8",
                    "name": "bro",
                    "key": "admin",
                    "accountId": "557058:713aac6d-44ef-416d-9a1d-3e524a5c4dc8",
                    "emailAddress": "[email protected]",
                    "avatarUrls": {
                        "48x48": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=48&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D48%26noRedirect%3Dtrue",
                        "24x24": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=24&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D24%26noRedirect%3Dtrue",
                        "16x16": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=16&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D16%26noRedirect%3Dtrue",
                        "32x32": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=32&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D32%26noRedirect%3Dtrue"
                    },
                    "displayName": "Todd Birchard",
                    "active": true,
                    "timeZone": "America/New_York",
                    "accountType": "atlassian"
                },
                "updated": "2019-03-24T02:01:30.724-0400",
                "status": {
                    "self": "https://hackersandslackers.atlassian.net/rest/api/3/status/10004",
                    "description": "",
                    "iconUrl": "https://hackersandslackers.atlassian.net/",
                    "name": "To Do",
                    "id": "10004",
                    "statusCategory": {
                        "self": "https://hackersandslackers.atlassian.net/rest/api/3/statuscategory/2",
                        "id": 2,
                        "key": "new",
                        "colorName": "blue-gray",
                        "name": "To Do"
                    }
                },
                "components": [],
                "description": {
                    "version": 1,
                    "type": "doc",
                    "content": [
                        {
                            "type": "paragraph",
                            "content": [
                                {
                                    "type": "text",
                                    "text": "https://mailchimp.com/help/share-your-blog-posts-with-mailchimp/",
                                    "marks": [
                                        {
                                            "type": "link",
                                            "attrs": {
                                                "href": "https://mailchimp.com/help/share-your-blog-posts-with-mailchimp/"
                                            }
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
                },
                "customfield_10010": null,
                "customfield_10011": "0|i0064j:i",
                "customfield_10012": null,
                "customfield_10013": null,
                "security": null,
                "customfield_10008": "HACK-143",
                "customfield_10009": {
                    "hasEpicLinkFieldDependency": false,
                    "showField": false,
                    "nonEditableReason": {
                        "reason": "PLUGIN_LICENSE_ERROR",
                        "message": "Portfolio for Jira must be licensed for the Parent Link to be available."
                    }
                },
                "summary": "Automate newsletter",
                "creator": {
                    "self": "https://hackersandslackers.atlassian.net/rest/api/3/user?accountId=557058%3A713aac6d-44ef-416d-9a1d-3e524a5c4dc8",
                    "name": "bro",
                    "key": "admin",
                    "accountId": "557058:713aac6d-44ef-416d-9a1d-3e524a5c4dc8",
                    "emailAddress": "[email protected]",
                    "avatarUrls": {
                        "48x48": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=48&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D48%26noRedirect%3Dtrue",
                        "24x24": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=24&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D24%26noRedirect%3Dtrue",
                        "16x16": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=16&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D16%26noRedirect%3Dtrue",
                        "32x32": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=32&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D32%26noRedirect%3Dtrue"
                    },
                    "displayName": "Todd Birchard",
                    "active": true,
                    "timeZone": "America/New_York",
                    "accountType": "atlassian"
                },
                "subtasks": [],
                "reporter": {
                    "self": "https://hackersandslackers.atlassian.net/rest/api/3/user?accountId=557058%3A713aac6d-44ef-416d-9a1d-3e524a5c4dc8",
                    "name": "bro",
                    "key": "admin",
                    "accountId": "557058:713aac6d-44ef-416d-9a1d-3e524a5c4dc8",
                    "emailAddress": "[email protected]",
                    "avatarUrls": {
                        "48x48": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=48&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D48%26noRedirect%3Dtrue",
                        "24x24": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=24&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D24%26noRedirect%3Dtrue",
                        "16x16": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=16&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D16%26noRedirect%3Dtrue",
                        "32x32": "https://avatar-cdn.atlassian.com/9eb3868db428fb602e03b3059608199b?s=32&d=https%3A%2F%2Fsecure.gravatar.com%2Favatar%2F9eb3868db428fb602e03b3059608199b%3Fd%3Dmm%26s%3D32%26noRedirect%3Dtrue"
                    },
                    "displayName": "Todd Birchard",
                    "active": true,
                    "timeZone": "America/New_York",
                    "accountType": "atlassian"
                },
                "customfield_10000": "{}",
                "customfield_10001": null,
                "customfield_10004": null,
                "environment": null,
                "duedate": null,
                "votes": {
                    "self": "https://hackersandslackers.atlassian.net/rest/api/3/issue/HACK-756/votes",
                    "votes": 0,
                    "hasVoted": false
                }
            }
        }
    ]
}

Whoa, mama! That's a ton of BS for a single issue. You can see now why we'd want to transform this data before importing ten million fields into any database. Make note of these important fields:

  • startAt: An integer which tells us which issue number the paginated results start at.
  • maxResults: Denotes the maximum number of results page - maxes out at 100 issues.
  • total: The total number of issues across all pages.
  • issues: A list of objects which contain the information for exactly one JIRA issue per object

Great. So the purpose of fetch.py will essentially consist of creating a list of all 888 issues (in my case), and passing that off for transformation. Check it the source I came up with:

import os
import math
import requests


class FetchJiraIssues:
    """Fetch all public-facing issues from JIRA instance.

    1. Retrieve all values from env vars.
    2. Construct request against JIRA REST API.
    3. Fetch paginated issues via recursion.
    4. Pass final JSON to be transformed into a DataFrame.
     """
    results_per_page = 100
    username = os.environ.get('JIRA_USERNAME')
    password = os.environ.get('JIRA_PASSWORD')
    endpoint = os.environ.get('JIRA_ENDPOINT')
    jql = os.environ.get('JIRA_QUERY')
    headers = {
        "Accept": "application/json"
    }

    @classmethod
    def get_total_number_of_issues(cls):
        """Gets the total number of results."""
        params = {
            "jql": cls.jql,
            "maxResults": 0,
            "startAt": 0
        }
        req = requests.get(cls.endpoint,
                           headers=cls.headers,
                           params=params,
                           auth=(cls.username, cls.password)
                           )
        response = req.json()
        try:
            total_results = response['total']
            return total_results
        except KeyError:
            print('Could not find any issues!')

    @classmethod
    def fetch_all_results(cls):
        """Recursively retrieve all pages of JIRA issues."""
        total_results = cls.get_total_number_of_issues()
        issue_arr = []

        def fetch_single_page():
            """Fetch one page of results, and determine if another page exists."""
            params = {
                "jql": cls.jql,
                "maxResults": cls.results_per_page,
                "startAt": len(issue_arr)
            }
            req = requests.get(cls.endpoint,
                               headers=cls.headers,
                               params=params,
                               auth=(cls.username, cls.password)
                               )
            response = req.json()
            issues = response['issues']
            issues_so_far = len(issue_arr) + cls.results_per_page
            print(issues_so_far, ' out of', total_results)
            issue_arr.extend(issues)
            # Check if additional pages of results exist.
        count = math.ceil(total_results/cls.results_per_page)
        for x in range(0, count):
            fetch_single_page()
        return issue_arr

Yep, I'm using classes. This class has two methods:

  • get_total_number_of_issues: All this does is essentially pull the number of issues (888) from the REST API. We'll use this number in our next function to check if additional pages exist.
  • fetch_all_results: This is where things start getting fun. fetch_all_results is a @classmethod which contains a function within itself. fetch_all_results gets the total number of JIRA issues and then calls upon child function fetch_single_page to pull JIRA issue JSON objects and dump them into a list called issue_arr until all issues are accounted for.

Because we have 888 issues and can retrieve 100 issues at a time, our function  fetch_single_page should run 9 times. And it does!

Transforming Our Data

So now we have a list of 888 messy JIRA issues. The scope of data.py should be to pull out only the data we want, and make sure that data is clean:

import os
import json
from pandas.io.json import json_normalize
from datetime import datetime


class TransformData:
    """Build JIRA issue DataFrame.

    1. Loop through JIRA issues and create a dictionary of desired data.
    2. Convert each issue dictionary into a JSON object.
    3. Load all issues into a Pandas DataFrame.
    """

    issue_count = 0

    @classmethod
    def construct_dataframe(cls, issue_list_chunk):
        """Make DataFrame out of data received from JIRA API."""
        issue_list = [cls.make_issue_body(issue) for issue in issue_list_chunk]
        issue_json_list = [cls.dict_to_json_string(issue) for issue in issue_list]
        jira_issues_df = json_normalize(issue_json_list)
        return jira_issues_df

    @staticmethod
    def dict_to_json_string(issue_dict):
        """Convert dict to JSON to string."""
        issue_json_string = json.dumps(issue_dict)
        issue_json = json.loads(issue_json_string)
        return issue_json

    @classmethod
    def make_issue_body(cls, issue):
        """Create a JSON body for each ticket."""
        updated_date = datetime.strptime(issue['fields']['updated'], "%Y-%m-%dT%H:%M:%S.%f%z")
        body = {
            'id': str(cls.issue_count),
            'key': str(issue['key']),
            'assignee_name': str(issue['fields']['assignee']['displayName']),
            'assignee_url': str(issue['fields']['assignee']['avatarUrls']['48x48']),
            'summary': str(issue['fields']['summary']),
            'status': str(issue['fields']['status']['name']),
            'priority_url': str(issue['fields']['priority']['iconUrl']),
            'priority_rank': int(issue['fields']['priority']['id']),
            'issuetype_name': str(issue['fields']['issuetype']['name']),
            'issuetype_icon': str(issue['fields']['issuetype']['iconUrl']),
            'epic_link': str(issue['fields']['customfield_10008']),
            'project': str(issue['fields']['project']['name']),
            'updated': int(datetime.timestamp(updated_date)),
            'updatedAt': str(updated_date)
        }
        cls.issue_count += 1
        return body

Again, let's see the methods at work:

  • construct_dataframe: The main function we invoke to build our DataFrame (mostly just calls other methods). Once all transformations are completed, creates a DataFrame called jira_df by using the Pandas json_normalize() method.
  • make_issue_body: Creates a new dictionary per singular JIRA issue. Extracts only the fields we want to be imported into our database. Converts each field into either a string or an int as a lazy way of avoiding null values (for example, if issue['fields']['priority']['name'] contained a null value, the script would error out. Wrapping this in str() is a dirty way of converting null to an empty string).
  • dict_to_json_string Takes each issue dictionary and converts it to a JSON object, which is then turned into a string (this is done for Pandas).

Loading Our Data

And now for the final step! Thanks to the joyful marriage of Pandas and SQLAlchemy, turning DataFrames into SQL tables is super simple. We never make things simple, though.

import os
import logging
from sqlalchemy import create_engine, text, MetaData
from sqlalchemy.types import Integer, Text, TIMESTAMP, String
import pandas as pd

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)


class DatabaseImport:
    """Merge Epic metadata and upload JIRA issues.

    1. Merge Epic metadata by fetching an existing table.
    2. Explicitly set data types for all columns found in jira_issues_df.
    2. Create a new table from the final jira_issues_df.
    """

    URI = os.environ.get('SQLALCHEMY_DATABASE_URI')
    db_epic_table = os.environ.get('SQLALCHEMY_EPIC_TABLE')
    db_jira_table = os.environ.get('SQLALCHEMY_JIRA_TABLE')
    db_schema = os.environ.get('SQLALCHEMY_DB_SCHEMA')

    # Create Engine
    meta = MetaData(schema="hackers$prod")
    engine = create_engine(URI, echo=True)

    @staticmethod
    def truncate_table(engine):
        """Clear table of data."""
        sql = text('TRUNCATE TABLE "hackers$prod"."JiraIssue"')
        engine.execute(sql)

    @classmethod
    def merge_epic_metadata(cls, jira_issues_df):
        """Merge epic metadata from existing SQL table."""
        cls.truncate_table(cls.engine)
        epics_df = pd.read_sql_table(cls.db_epic_table,
                                     cls.engine,
                                     schema=cls.db_schema)
        jira_issues_df = pd.merge(jira_issues_df,
                                  epics_df[['epic_link', 'epic_name', 'epic_color']],
                                  how='left',
                                  on='epic_link',
                                  copy=False)
        return jira_issues_df

    @classmethod
    def upload_dataframe(cls, jira_issues_df):
        """Upload JIRA DataFrame to PostgreSQL database."""
        jira_issues_df = cls.merge_epic_metadata(jira_issues_df)
        jira_issues_df.to_sql(cls.db_jira_table,
                              cls.engine,
                              if_exists='append',
                              schema=cls.db_schema,
                              index=False,
                              dtype={"assignee": String(30),
                                     "assignee_url": Text,
                                     "epic_link": String(50),
                                     "issuetype_name": String(50),
                                     "issuetype_icon": Text,
                                     "key": String(10),
                                     "priority_name": String(30),
                                     "priority_rank": Integer,
                                     "priority_url": Text,
                                     "project": String(50),
                                     "status": String(30),
                                     "summary": Text,
                                     "updated": Integer,
                                     "updatedAt": TIMESTAMP,
                                     "createdAt": TIMESTAMP,
                                     "epic_color": String(20),
                                     "epic_name": String(50)
                                     })
        success_message = 'Successfully uploaded' \
                          + str(len(jira_issues_df.index)) \
                          + ' rows to ' + cls.db_jira_table
        return success_message
  • merge_epic_metadata: Due to the nature of the JIRA REST API, some metadata is missing per issue. If you're interested, the data missing revolves around Epics: JIRA's REST API does not include the Epic Name or Epic Color fields of linked epics.
  • upload_dataframe: Uses Panda's to_sql() method to upload our DataFrame into a SQL table (our target happens to be PostgreSQL, so we pass schema here). To make things explicit, we set the data type of every column on upload.

Well, let's see how we made out!

A look at our resulting database table.

Whoaaa nelly, we did it! With our data clean, we can now build something useful! Here's what I built:

Fruits of our labor!

There we have it: a pipeline that takes a bunch of messy data, cleans it, and puts it somewhere else for proper use.

If you're interested in how we created the frontend for our Kanban board, check out our series on building features with GraphQL. For the source code, check out the Github repository.