Multiprocessing of large datasets using pandas and dask

I wrote a post on multiprocessing with pandas a little over two years ago. A lot has changed since then, and I now use dask and distributed for distributed computation with pandas. Here I will show how to implement the example from that post using dask.

For this example, I will download and use the NYC Taxi & Limousine data. I will use the January 2009 Yellow tripdata file (2GB in size) and run everything on my laptop. The same approach extends to multiple data files and much larger sizes.

We start by importing dask.dataframe below.

In [1]:
import dask.dataframe as dd


Any large CSV file can be read using a pandas-like read_csv command, and other formats have similar readers.

In [2]:
df = dd.read_csv(r"C:\temp\yellow_tripdata_2009-01.csv")


It is important to understand that, unlike the pandas read_csv, the above command does not actually load the data. It infers the column names and datatypes from a sample of the file and leaves everything else for later.

Using the npartitions attribute, we can see how many partitions the data will be broken into for loading. Viewing the raw df object would give you a shell of the dataframe with the column names and datatypes inferred. The actual data is not loaded yet.

In [3]:
df.npartitions

Out[3]:
40

You can inspect the inferred columns and datatypes.

In [4]:
df.columns

Out[4]:
Index(['vendor_name', 'Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime',
       'Passenger_Count', 'Trip_Distance', 'Start_Lon', 'Start_Lat',
       'Rate_Code', 'store_and_forward', 'End_Lon', 'End_Lat', 'Payment_Type',
       'Fare_Amt', 'surcharge', 'mta_tax', 'Tip_Amt', 'Tolls_Amt',
       'Total_Amt'],
      dtype='object')

In [5]:
df.dtypes

Out[5]:
vendor_name               object
Trip_Pickup_DateTime      object
Trip_Dropoff_DateTime     object
Passenger_Count            int64
Trip_Distance            float64
Start_Lon                float64
Start_Lat                float64
Rate_Code                float64
store_and_forward        float64
End_Lon                  float64
End_Lat                  float64
Payment_Type              object
Fare_Amt                 float64
surcharge                float64
mta_tax                  float64
Tip_Amt                  float64
Tolls_Amt                float64
Total_Amt                float64
dtype: object

The size attribute gives the total number of elements in the dataset (rows times columns).

In [6]:
size = df.size
size, type(size)

Out[6]:
(dd.Scalar<size-ag..., dtype=int32>, dask.dataframe.core.Scalar)

As you can see above, size does not return a value yet. The computation is deferred until we explicitly call compute.

In [7]:
%%time
size.compute()

Wall time: 48 s

Out[7]:
253663434

The size computation on the taxi data comes back with about 254 million elements, which across the 18 columns works out to 14,092,413 rows. It took a while because computing size does not just count the data; it also has to load the whole dataset. That may not seem very efficient. There are a couple of approaches you can take:

• If you have access to a computer (or a cluster of computers) with enough RAM, you can load and persist the data in memory. Subsequent computations then run in memory and are a lot faster. This also lets you do many computations much as you would with pandas, but in a distributed paradigm.
• Another approach is to set up a whole bunch of deferred computations and compute them out of core. Dask will then work out the dependencies, load the data intelligently, and process all the computations in one pass. This is a great approach if you don't have a lot of RAM available.

To load the data into memory, use the persist method on the df object.

In [8]:
df = df.persist()


With the distributed scheduler, the above persist call is non-blocking, so you need to wait a bit for the data to load. Once it is loaded, you can compute the size as above.

In [9]:
%%time
df.size.compute()

Wall time: 35 ms

Out[9]:
253663434

That computed almost instantly. Now you can scale to much larger data sizes and compute in parallel.