Benchmarking multi core capabilities for Python using MultiProcessingBenchmark

Introduction


A lot of folks who code in Python have a big boy toy, and those in the field of Data Engineering must be using Pandas (or Dask). Unlike Scala, which can be used to build concurrent applications because of concurrency primitives and which allows for better memory management, Python natively does not support that. But, Python does come with a module multiprocessing which allows it to use all the cores of a system, and which can be useful especially if you have a machine with hefty resources. 

But it is always better to use multiProcessing? This post walks you through application of multiprocessing module on common pandas functions on a time series dataset using a library MultiProcessingBenchmark (pypi package can be found here and github link is here). You can install it by

pip install MultiProcessingBenchmark

Disclaimer: The results in the blog are from my system which has i7 quad core processor, 8 gigs of ram. Results on a different system with plentiful resources would be different. I encourage you to post the results here for comparison.

Parallellism on Simple Statistical Functions


Python achieves multiprocessing through Pool, which creates multiple processes and distributes data to be processed at each process for a particular function. For example, take a simple function, sum. For single core execution, it looks like the following.

def sum_func(df):
    df = df.sum()
    return df

This function takes a dataframe, calculates the mean, and sends it back.

For multiprocessing, it would look something like this


def sum_func_multiproc(df, func, n_cores=4):
  df_split = np.array_split(df, n_cores)
  pool = Pool(n_cores)
df_res = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
  res = df_res.sum()

In this functions, first, the dataframe is split into 4 parts. Then, a pool of processes is created which is equal to the number of cores specified, and then map function is used to map each part of the dataframe to an instance of the function, which process each function independently and concurrently. The results of all the executions are then concatenated into a single dataframe.

The resulting dataframe contains 4 individual sums of 4 individual dataframes (which derived from the master dataset). So, to compute the sum of master dataset, we have to calculate sum of sums, which is the last line of the code (and which incidentally increases the execution time).

This is how parallellism is achieved in pandas through multiprocessing.

Now, this library can be used to benchmark several statistical functions, namely sum, count, mean, rolling mean and standard deviation. You can do so using the following code.


from MultiProcessingBenchmark import EntryPoint
import multiprocessing

bench = EntryPoint.Benchmark()
n_cores = multiprocessing.cpu_count()
rows = 375000
first_df_start = '01-02-2020'
bench.SimpleStatistics(n_cores, rows, first_df_start)
Here, first_df_start is used to specify the start of a dataset, because the dataset is a time series dataset. rows is used to specify number of 

All the functions in the SimpleStatistics class look similar to the functions mentioned above.

The result looks something like this



We can see that for almost all the functions, time taken for multi-core execution is more than time taken for single-core execution. This is mostly due to the fact that there are two passes of said functions on multi-core execution, first on each individual dataset (of 4 datasets), and one on the concatenated dataset, not to mention the concat operation of all 4 datasets. 

Utility Functions 

Utility functions in the library are Sort, Search, Merge, Merge_asof, Join and concat. They are different in execution but similar in the fact that except for sort, they all take multiple arguments.

For example, the function to search for a value is 

def searching(self, df, val):
    df = df.loc[df['A'] == val]
    return df

This function takes multiple arguments, df and val.

The same function in multiprocessing is 

    def searching(self, df, func, val, n_cores):
        df_split = np.array_split(df, n_cores)
        pool = Pool(n_cores)
        df_res = pd.concat(self.pool.starmap(func, [(i, val) for i in df_split]))
        pool.close()
        pool.join()

In this function, for multiprocessing, a dataframe is split in 4 parts, each of which is assigned to an instance of the search function, the rows that match the values are returned for each process, the results are then concatenated. 

For merge, merge_asof, join and concat, two different datasets are provided, and the operation performed on them can be distributed across multiple processes, similar to how it is in search function. 

To perform the benchmark for utlity functions, the library can be used as follows.

from MultiProcessingBenchmark import EntryPoint
import multiprocessing

bench = EntryPoint.Benchmark()
n_cores = multiprocessing.cpu_count()
val = 96.50
rows = 375000
other_df_rows = 375000
first_df_start = '01-02-2020'
second_df_start = '02-15-2020'
bench.utilFunctions(val, n_cores, rows, other_df_rows, first_df_start, second_df_start)
val is used to search for a particular value, rows and other_df_rows are rows for the first and second dataset (to be used in merge, merge_asof, join, concat) and start dates are starting dates for dataframes. 

The result for the benchmark look something like this



We can see that single core execution is faster than multi-core execution, mainly due to the fact that there is an extra concat required, and my system does not have a lot of memory. If the dataset was larger and my system could handle it, multi-core execution would be faster. 

Aggregation without/ with loops

Without loops

The main area in which pandas (and multi-core execution) shines is aggregations. The aggregation function without loops looks like the following.

    def groupByAggregateNoLoop(self, df):

        group_df = df.reset_index().groupby(['index']).aggregate({'A':'mean','B':'count','C':'sum','D':'prod'}).reset_index()

The functions takes dates column (named index) and aggregates other columns based on this column. 

The same function in multiprocessing looks like

   def groupByAggregateNoLoop(self, df, func, n_cores):
        df_split = np.array_split(df, n_cores)
        pool = Pool(n_cores)
        df_res = pd.concat(self.pool.map(func, self.df_split))
        pool.close()
        pool.join()
 The dataframe is split into 4 parts, each dataframe is sent into an intance of the function, the results of the aggregation are sent back and then the results are concatenated. 

With loops

The aggregation function with loops is as following,

      def groupByAggregateLoop( df):
    group_df = pd.DataFrame()
    for chunk in np.array_split(df, 4):
        group_df = group_df.append(chunk.reset_index().groupby(['index']).aggregate(
                {'A': 'mean', 'B': 'count', 'C': 'sum', 'D': 'prod'}).reset_index(),ignore_index=True)

The dataframe is split in 4 parts, and each part is processed one after the other sequentially, in a linear fashion. Thus there are 4 loops.
The aggregation function with loops for multiprocessing is as follows.

    def groupByAggregateLoop(self, df, func, n_cores):
          df_split = np.array_split(df, n_cores)
        pool = Pool(n_cores)
        df_res = pd.concat(self.pool.map(func, self.df_split))
        pool.close()
        pool.join()

The dataframe is split in 4 parts, each of the dataset is sent to an instance of a function. There, the split dataset is again split in 4 parts, then it is aggregated sequentially. The results are then concatenated into a single dataframe, for each dataset out of 4. The results of those 4 datasets are then combined in a single dataframe. 

To perform the benchmark using the library, following code can be used. 

from MultiProcessingBenchmark import EntryPoint
import multiprocessing

bench = EntryPoint.Benchmark()
n_cores = multiprocessing.cpu_count()
rows = 375000
first_df_start = '01-02-2020'
bench.agg_without_loop(n_cores, rows, first_df_start)
bench.agg_with_loops(n_cores, rows, first_df_start)

following code can be used. The results look like the following.

Without Loops





With Loops



In this instance when i have a quad core cpu and 8 gigs of ram, the results favor single core execution. But if i had a system with say, 24 cores CPU and 64 gigs of ram, multi-core execution would be faster. The results are faster on single core cpu on my system because there are not enough cores to perform processing concurrently and also, the ram is limited so even if i had enough number of cores, there just wouldn't be enough memory to hold data for the parallel processes. 

Conclusion

From this post, it is clear that at least on my system, multi-core execution is not faster than single core execution in any case. But, i can say with certain confidence that given a good enough system multi-core execution would give single core-execution a run for it's money in case of utility functions. Also, i have already tested aggregation functions in a system with 24 cores and 64 gigs of memory, and it was at least 4 times as fast as single core execution (if i get my hands on a big boy toy i'll update the results). 
I encourage you to post your benchmarks for comparison. Hope you enjoyed this post. 

Comments