Problem
In my last job I was tasked with recreating the data models coming out of a few social & subscription data APIs (YouTube is a good example), and I was frankly hugely disappointed with Python's performance when downloading the data with requests and writing the files with pd.to_csv().
Processing roughly 450 MB of YouTube data took 10-15 minutes. Since the process was pretty lightweight in production this wasn't a real problem, but in test, where I needed to repeatedly download the files, it became infuriating.
This is a very common problem with APIs and Python. I looked into the externally available solutions, including Dask, Ray, Modin, and some more task-specific options like AsyncIO.
Each of these had some frustrating issues: requiring big rewrites, not supporting certain environments, and carrying huge dependency lists & overhead.
Solution
A golden rule of multithreading/multiprocessing is that you really shouldn't implement it unless you absolutely need to, because of the number of headaches it can cause for a small gain in some functions. All of the above solutions confirmed this, with some test implementations failing for unknown reasons and swallowing my NAS's CPU power for an entire night.
The problem I was having really didn't seem to merit such extreme solutions, so I sought to simplify. I looked back into the concurrency options provided by Python's built-in concurrent.futures module, and I have been hugely impressed.
While the actual concurrency engine will never perform as well as the external options I mentioned earlier, the simplicity of implementation is pretty much unbeatable, and it brings day-to-day concurrency back into the realm of normality in Python.
After reading up on the built-in classes I implemented the below in 30 minutes over a lunch break (and have since rolled it out to another 10 APIs just to be sure), and I would like to share/discuss the code, possible improvements, and next steps.
Basically, it's a static utility function that accepts an instantiated class (as an object), the name of the function to process the data with (as a string), and the data you would like to parse. I named the function fast_parse(), as it should theoretically work with absolutely any function you would like to process data with.
I added **kwargs as an optional param (in case there are arguments that need to be passed through to every call), and a number_of_workers param so that when deploying across environments the function can be tuned to the server it's running on.
I'm really surprised there aren't more concurrency functions structured in this way. After implementing this, the download run times went from 10-15 minutes to a consistent 2-3 minutes, and the only change required in the processing class was the import and then:
processed_data = get_youtube_data(unprocessed_data)
Becomes...
processed_data = utils.fast_parse(
    self,
    "get_youtube_data",
    unprocessed_data
)
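For completeness, the optional params go straight through on the same call. Here number_of_workers is the real parameter from the function below, while api_key is purely a hypothetical kwarg that the parse function would have to accept:

processed_data = utils.fast_parse(
    self,
    "get_youtube_data",
    unprocessed_data,
    number_of_workers=8,
    api_key=self.api_key  # hypothetical extra kwarg forwarded to every call
)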
(Loguru is the one external library included. If you don't use it already I suggest you do, especially for multithreading: there is a great wrapper from them that lets you catch errors across all the threads while only needing to change a single line of code.)
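I assume the wrapper in question is the logger.catch decorator; a minimal sketch of what that one-line change looks like (the class and method names here are just placeholders):

from loguru import logger

class YouTubeClient:
    @logger.catch(reraise=True)  # logs the full traceback even when the call runs in a worker thread
    def get_youtube_data(self, data):
        # ... actual downloading/parsing logic would go here ...
        return data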
TL;DR
Below is a function that you can copy-paste as is and it should make your functions faster. See example usage above :P
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
from loguru import logger
def fast_parse(python_class, parse_function, data_to_parse, number_of_workers=4, **kwargs):
    """
    Util function to split any data set across the given number of workers,
    then return the results using any given parsing function.
    Note that when passing a dict (or pd.Series) the key/index is passed to the
    parse function as well, so it needs to be handled there.
    :param python_class: Instantiated class object which contains the parse function.
    :param parse_function: Name of the parse function, passed as a string.
    :param data_to_parse: Data to be parsed; can be a list, dict, or pd.Series.
    :param number_of_workers: Number of workers to split the parsing across.
    :param kwargs: Optional, extra params which the parse function may need.
    """
    try:
        function_object = getattr(python_class, parse_function)
    except AttributeError:
        logger.error(f"{python_class} doesn't have {parse_function}")
        return
    else:
        results = []
        with ThreadPoolExecutor(max_workers=number_of_workers) as executor:
            if isinstance(data_to_parse, list):
                # Map each future back to the item it was submitted with,
                # so failures can be logged against the offending item.
                future_to_result = {executor.submit(
                    function_object, data, **kwargs): data for data in data_to_parse}
            elif isinstance(data_to_parse, (dict, pd.Series)):
                # The key/index is passed as the first argument to the parse function.
                future_to_result = {executor.submit(
                    function_object, index, data, **kwargs): index
                    for index, data in data_to_parse.items()}
            else:
                logger.error("Unsupported data type")
                return
            for future in as_completed(future_to_result):
                try:
                    data = future.result()
                except Exception as exc:
                    logger.error(
                        f"{future_to_result[future]} generated an exception: {exc}")
                else:
                    results.append(data)
        return results
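One caveat from the docstring is worth showing: when you pass a dict (or a pd.Series), the key/index is handed to the parse function as its first argument, so the function needs an extra parameter for it. A hypothetical example (ChannelParser and parse_row are made up purely for illustration):

class ChannelParser:
    def parse_row(self, channel_id, views, scale=1):
        # channel_id is the dict key, views is the value, scale arrives via **kwargs
        return channel_id, views * scale

parser = ChannelParser()
results = fast_parse(parser, "parse_row", {"abc": 10, "xyz": 20}, scale=2)
# -> [("abc", 20), ("xyz", 40)], though the order can vary since futures complete as they finish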