Multiprocessing of large datasets using pandas and dask

I wrote a post on multiprocessing with pandas a little over 2 years back. A lot has changed, and I have started to use `dask` and `distributed` for distributed computation using `pandas`. Here I will show how to implement what that multiprocessing with pandas post did, using `dask`.

For this example, I will download and use the NYC Taxi & Limousine Commission trip data, specifically the January 2009 Yellow tripdata file (2GB in size), and run everything on my laptop. Extending to multiple data files and much larger sizes is possible too.

We start by importing `dask.dataframe`.

In [1]:

```
import dask.dataframe as dd
```

Any large CSV (or other format) file can be read using a `pandas`-like `read_csv` command.

In [2]:

```
df = dd.read_csv(r"C:\temp\yellow_tripdata_2009-01.csv")
```

It is important to understand that, unlike the `pandas` `read_csv`, the above command does not actually load the data. It reads only a small sample from the start of the file to infer column names and datatypes, and defers everything else until later.

Using the `npartitions` attribute, we can see how many partitions the data will be broken into for loading. Viewing the raw `df` object gives you a shell of the dataframe with the inferred columns and datatypes; the actual data is not loaded yet.

In [3]:

```
df.npartitions
```

Out[3]:

40

You can inspect the inferred columns and datatypes.

In [4]:

```
df.columns
```

Out[4]:

Index(['vendor_name', 'Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime', 'Passenger_Count', 'Trip_Distance', 'Start_Lon', 'Start_Lat', 'Rate_Code', 'store_and_forward', 'End_Lon', 'End_Lat', 'Payment_Type', 'Fare_Amt', 'surcharge', 'mta_tax', 'Tip_Amt', 'Tolls_Amt', 'Total_Amt'], dtype='object')

In [5]:

```
df.dtypes
```

Out[5]:

vendor_name               object
Trip_Pickup_DateTime      object
Trip_Dropoff_DateTime     object
Passenger_Count            int64
Trip_Distance            float64
Start_Lon                float64
Start_Lat                float64
Rate_Code                float64
store_and_forward        float64
End_Lon                  float64
End_Lat                  float64
Payment_Type              object
Fare_Amt                 float64
surcharge                float64
mta_tax                  float64
Tip_Amt                  float64
Tolls_Amt                float64
Total_Amt                float64
dtype: object

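The size of the dataset can be computed using the `size` attribute. Note that, as in `pandas`, `size` counts elements (rows × columns), not rows; `len(df)` gives the number of rows.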

In [6]:

```
size = df.size
size, type(size)
```

Out[6]:

(dd.Scalar<size-ag..., dtype=int32>, dask.dataframe.core.Scalar)

`size` does not return a value yet. The computation is deferred until we `compute` it.

In [7]:

```
%%time
size.compute()
```

Out[7]:

253663434

This computation comes back with about 254 million elements, which with 18 columns works out to 14,092,413 rows. It also took a while. This is because when we `compute` the size, we are not only calculating it, we are also actually loading the dataset. You might think that is not very efficient. There are a couple of approaches you can take:

- If you have access to a computer (or a cluster of computers) with enough RAM, you can load and persist the data in memory. Subsequent computations then run in memory and are a lot faster. This also lets you do many computations much as you would with `pandas`, but in a distributed paradigm.
- Another approach is to set up a whole series of deferred computations and compute them out of core. `dask` then loads the data and processes all the computations in one pass, working out the dependencies between them. This is a great approach if you don't have a lot of RAM available.

Now the way to load the data into memory is by using the `persist` method on the `df` object.

In [8]:

```
df = df.persist()
```

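When a `distributed` `Client` is running, the `persist` call is non-blocking: it returns immediately and the data loads in the background, so you need to wait a bit (or call `wait` from `dask.distributed`) until the data is in memory. With dask's default local scheduler, `persist` instead blocks until loading finishes. Once the data is loaded, you can compute the size as above.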

In [9]:

```
%%time
df.size.compute()
```

Out[9]:

253663434

That computed instantly, because the data was already in memory. The same approach lets you scale to much larger data sizes and compute on them in parallel.
