Problem

In my last job I was tasked with recreating the data models coming out of a few social and subscription data APIs (YouTube being a good example), and I was frankly hugely disappointed with Python's performance when downloading these files using requests and writing them out with pd.to_csv().

YouTube data took 10-15 minutes to download when processing 450 MB of data. As the process was pretty lightweight in production this wasn't a real problem, but in test, where I needed to repeatedly download the files, it became infuriating.


This is a very common problem with APIs and Python. I looked into the externally available solutions, including Dask, Ray, Modin, and some more task-specific options like asyncio.

Each of these had some pretty frustrating issues: requiring big rewrites, not supporting certain environments, and carrying huge dependency lists and overhead.

Solution

A golden rule of multithreading/multiprocessing is that you shouldn't implement it unless you absolutely need to, because of the headaches it can cause for a small performance gain in some functions. All of the above solutions confirmed this, with some test implementations failing for unknown reasons and swallowing my NAS's CPU power for an entire night.

The problem I was having really didn't seem to merit such extreme solutions, so I sought to simplify. I looked back into the concurrency options provided by Python's built-in concurrent.futures module, and I have been hugely impressed.

While the built-in concurrency engine will never perform as well as the external libraries mentioned above, the simplicity of implementation is pretty much unbeatable, and it seems to bring day-to-day concurrency back into the realm of normality in Python.
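To show just how little code the built-in module needs, here is a minimal sketch (the square function is a stand-in for any I/O-bound processing call):

```python
import concurrent.futures

def square(n):
    # Stand-in for an I/O-bound parse function (e.g. one API call per item)
    return n * n

# ThreadPoolExecutor handles thread creation, scheduling, and teardown;
# executor.map preserves the input order of the results.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(square, range(10)))
```

That's the entire setup: no extra dependencies, no cluster configuration, no rewrite of the processing function itself.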

After reading up on the built-in classes I implemented the function below in 30 minutes over a lunch break (and have since applied it to another 10 APIs just to be sure). I would like to share/discuss the code, possible improvements, and next steps.

Basically, it's a static utility function which accepts an instantiated class (as an object), the name of a method to process data with (as a string), and a list of the data you would like to parse. I named the function fast_parse(), as it should theoretically work with any function you would like to process data with.

I added **kwargs as an optional parameter (in case there are any which need to be passed through to all functions), and a number_of_workers parameter so that, when deploying across environments, the function can be configured to suit each server.
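To illustrate how those two parameters are used, here is a sketch with a cut-down stand-in for the utility (the YouTubeClient class, its get_video_title method, and the prefix kwarg are all hypothetical examples, not part of the real code):

```python
import concurrent.futures

def fast_parse(python_class, parse_function, data_to_parse, number_of_workers=4, **kwargs):
    # Cut-down stand-in mirroring the full utility below, for demonstration only
    function_object = getattr(python_class, parse_function)
    with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
        futures = [executor.submit(function_object, d, **kwargs) for d in data_to_parse]
        # Results arrive in completion order, not submission order
        return [f.result() for f in concurrent.futures.as_completed(futures)]

class YouTubeClient:
    # Hypothetical per-item processor; prefix arrives via **kwargs
    def get_video_title(self, video, prefix=""):
        return prefix + video.upper()

client = YouTubeClient()
titles = fast_parse(client, "get_video_title", ["a", "b"],
                    number_of_workers=2,  # tune per server
                    prefix="yt:")         # forwarded to every call via **kwargs
```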

I'm really surprised there aren't more concurrency helpers structured this way. After implementing this, the download run times went from 10-15 minutes to a consistent 2-3 minutes. The only change required in the processing class was the import, and then:

processed_data = get_youtube_data(unprocessed_data)

Becomes...

processed_data = utils.fast_parse(self, "get_youtube_data", unprocessed_data)

(Loguru is the one external library included. If you don't use it already, I suggest you do (especially for multithreading): it provides a great wrapper which catches errors across all threads while only needing one changed line of code.)


TL;DR

Below is a function which you can copy-paste as-is, and it should make your functions faster. See example usage above :P

import concurrent.futures
import pandas as pd
from loguru import logger


def fast_parse(python_class, parse_function, data_to_parse, number_of_workers=4, **kwargs):
    """
    Util function to split any data set across a number of workers,
    then return the results using any given parsing function.

    Note that when using dicts (or Series), the key/index is passed to the
    parse function along with the value, so that needs to be handled.
        :param python_class: Instantiated class object which contains the parse function.
        :param parse_function: Name of the method used to parse the data.
        :param data_to_parse: Data to be parsed; can be a list, dict, or pd.Series.
        :param number_of_workers: Number of workers to split the parsing across.
        :param kwargs: Optional extra params which the parse function may need.
        :return: List of results, or None on error.
    """
    try:
        function_object = getattr(python_class, parse_function)
    except AttributeError:
        logger.error(f"{python_class} doesn't have {parse_function}")
        return
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
        if isinstance(data_to_parse, list):
            future_to_result = {executor.submit(
                function_object, data, **kwargs): data for data in data_to_parse}
        elif isinstance(data_to_parse, (dict, pd.Series)):
            # Build one dict of all futures; the key/index is passed
            # to the parse function alongside the value.
            future_to_result = {executor.submit(
                function_object, index, data, **kwargs): index
                for index, data in data_to_parse.items()}
        else:
            logger.error("Unsupported data type")
            return
        for future in concurrent.futures.as_completed(future_to_result):
            try:
                results.append(future.result())
            except Exception as exc:
                logger.error(
                    f"{future_to_result[future]} generated an exception: {exc}")
        return results
Full code on GitHub: paul-armstrong-dev/faster-parser