Multiprocessing of large datasets using pandas and dask
I wrote a post on multiprocessing with pandas a little over 2 years back. A lot has changed since then, and I have started to use dask and distributed for distributed computation using pandas. Here I will show how to implement the example from that multiprocessing with pandas post using dask.
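The examples below run fine with dask's default local scheduler, but if you also want to use distributed, a minimal sketch of starting a local client looks like the following. The worker, thread, and memory settings are illustrative assumptions, not recommendations.
# Sketch: start a local distributed client on this machine.
# n_workers, threads_per_worker and memory_limit are illustrative assumptions.
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=2, memory_limit="2GB")
print(client)
Creating the Client spins up a local cluster, and the dask computations in the rest of the post would then run on its workers.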
For this example, I will download and use the NYC Taxi & Limousine data, specifically the January 2009 Yellow tripdata file (about 2 GB in size), and run everything on my laptop. Extending to multiple data files and much larger sizes is possible too, as sketched below.
We start by importing dask.dataframe below.
import dask.dataframe as dd
Any large CSV file (and other formats) can be read using a pandas-like read_csv command.
df = dd.read_csv(r"C:\temp\yellow_tripdata_2009-01.csv")
It is important to understand that, unlike the pandas read_csv, the above command does not actually load the data. It does some data inference and leaves the other aspects for later.
Using the npartitions attribute, we can see how many partitions the data will be broken into for loading. Viewing the raw df object would give you a shell of the dataframe, with columns and datatypes inferred. The actual data is not loaded yet.
df.npartitions
40
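The number of partitions is determined by the file size and read_csv's default block size. If you want to control the partitioning yourself, read_csv accepts a blocksize argument; the value below is just an illustration.
# Sketch: split the CSV into roughly 64MB partitions (value is illustrative).
df = dd.read_csv(r"C:\temp\yellow_tripdata_2009-01.csv", blocksize="64MB")
df.npartitions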
You can infer the columns and datatypes.
df.columns
Index(['vendor_name', 'Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime',
       'Passenger_Count', 'Trip_Distance', 'Start_Lon', 'Start_Lat',
       'Rate_Code', 'store_and_forward', 'End_Lon', 'End_Lat',
       'Payment_Type', 'Fare_Amt', 'surcharge', 'mta_tax', 'Tip_Amt',
       'Tolls_Amt', 'Total_Amt'],
      dtype='object')
df.dtypes
vendor_name               object
Trip_Pickup_DateTime      object
Trip_Dropoff_DateTime     object
Passenger_Count            int64
Trip_Distance            float64
Start_Lon                float64
Start_Lat                float64
Rate_Code                float64
store_and_forward        float64
End_Lon                  float64
End_Lat                  float64
Payment_Type              object
Fare_Amt                 float64
surcharge                float64
mta_tax                  float64
Tip_Amt                  float64
Tolls_Amt                float64
Total_Amt                float64
dtype: object
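Because no data has been loaded, these dtypes are inferred from a sample at the start of the file. If the inference is wrong for a column, dtypes can be passed explicitly; the overrides below are purely illustrative.
# Sketch: override inferred dtypes for specific columns (choices are illustrative).
df = dd.read_csv(
    r"C:\temp\yellow_tripdata_2009-01.csv",
    dtype={"Rate_Code": "float64", "store_and_forward": "object"},
)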
The total number of elements in the dataset (rows × columns) can be computed using the size attribute.
size = df.size
size, type(size)
(dd.Scalar<size-ag..., dtype=int32>, dask.dataframe.core.Scalar)
As you can see above, size does not return a value yet. The computation is actually deferred until we compute it.
%%time
size.compute()
253663434
The size comes back as about 254MM elements, which is roughly 14MM rows across the 18 columns. The computation also took a while. This is because when we compute size, we are not only calculating the size of the data, we are also actually loading the dataset. Now you might think that is not very efficient. There are a couple of approaches you can take:
- Leave the data on disk and express the operations lazily, just as you would in pandas but in a distributed paradigm. dask will intelligently load data and process all the computations once by figuring out the various dependencies. This is a great approach if you don't have a lot of RAM available (see the dask.compute sketch at the end of the post).
- Load the data into memory, which makes repeated computations much faster as long as the data fits in RAM.
The way to load data into memory is by using the persist method on the df object.
df = df.persist()
The above persist call is non-blocking when using the distributed scheduler, and you need to wait a bit for the data to load. Once it is loaded, you can compute the size as above.
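If you want to block explicitly until the data is fully in memory, the distributed package provides a wait helper; this is a sketch and assumes a distributed Client was created earlier in the session.
# Sketch: block until the persisted partitions are in memory.
# Assumes a dask.distributed Client is active.
from dask.distributed import wait
wait(df)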
%%time
df.size.compute()
253663434
That computed instantly. Now you can scale to much larger data sizes and compute in parallel.
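As a closing sketch, several pandas-style aggregations can be handed to dask.compute in a single call, and dask figures out the shared dependencies and evaluates them together across the partitions. The particular statistics below are just examples built on columns from this dataset.
import dask
# Sketch: build several lazy aggregations, then evaluate them in one pass.
mean_distance = df["Trip_Distance"].mean()
total_fares = df["Fare_Amt"].sum()
trips_by_vendor = df.groupby("vendor_name")["Passenger_Count"].count()
mean_distance, total_fares, trips_by_vendor = dask.compute(
    mean_distance, total_fares, trips_by_vendor
)
The same pattern works whether the dataframe has been persisted in memory or is still backed by the CSV on disk; in the latter case dask only reads each partition once for all three results.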