Multiprocessing of large datasets using pandas and dask
I wrote a post on multiprocessing with pandas a little over 2 years back. A lot has changed, and I have started to use dask and distributed for distributed computation using pandas. Here I will show how to implement the multiprocessing with pandas blog using dask.
For this example, I will download and use the NYC Taxi & Limousine data, specifically the January 2009 Yellow tripdata file (2GB in size), and run everything on my laptop. Extending to multiple data files and much larger sizes is possible too.
We start by importing dask.dataframe:
import dask.dataframe as dd
Any large CSV file (or a file in another supported format) can be read using the pandas-like dd.read_csv function:
df = dd.read_csv(r"C:\temp\yellow_tripdata_2009-01.csv")
It is important to understand that, unlike pandas' read_csv, the above command does not actually load the data. It reads a sample of the file to infer the columns and datatypes, and leaves the rest for later.
Using the npartitions attribute (df.npartitions), we can see how many partitions the data will be split into for loading. Viewing the raw df object gives you a shell of the dataframe with the column names and datatypes inferred; the actual data is not loaded yet.
You can inspect the inferred columns (df.columns) and datatypes (df.dtypes):
Index(['vendor_name', 'Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime', 'Passenger_Count', 'Trip_Distance', 'Start_Lon', 'Start_Lat', 'Rate_Code', 'store_and_forward', 'End_Lon', 'End_Lat', 'Payment_Type', 'Fare_Amt', 'surcharge', 'mta_tax', 'Tip_Amt', 'Tolls_Amt', 'Total_Amt'], dtype='object')
vendor_name               object
Trip_Pickup_DateTime      object
Trip_Dropoff_DateTime     object
Passenger_Count           int64
Trip_Distance             float64
Start_Lon                 float64
Start_Lat                 float64
Rate_Code                 float64
store_and_forward         float64
End_Lon                   float64
End_Lat                   float64
Payment_Type              object
Fare_Amt                  float64
surcharge                 float64
mta_tax                   float64
Tip_Amt                   float64
Tolls_Amt                 float64
Total_Amt                 float64
dtype: object
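Computing the size of the dataset can be done using the size attribute. As in pandas, size counts elements (rows multiplied by columns); len(df) gives the row count.
size = df.size
size, type(size)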
(dd.Scalar<size-ag..., dtype=int32>, dask.dataframe.core.Scalar)
As you can see above, size does not return a value yet. The computation is actually deferred until we call compute() on it:
%time size.compute()
Wall time: 48 s
This computation comes back with a value of 25MM, and it actually took a while. That is because when we compute size, dask is not only calculating the size of the data but also actually loading the dataset from disk. You might think that is not very efficient. There are a couple of approaches you can take:
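1. Load the data into memory and work with it as you would in pandas, but in a distributed paradigm.
2. Build up all your computations lazily and compute them together; dask will intelligently load the data and process all the computations at once by figuring out the various dependencies. This is a great approach if you don't have a lot of RAM available.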
Now, the way to load data into memory is to call the persist method on the dask dataframe:
df = df.persist()
With the distributed scheduler, the persist call is non-blocking, so you need to wait a bit for the data to load in the background. Once it is loaded, you can compute the size as above.
Wall time: 35 ms
This time the result came back almost instantly, because the data is already in memory. Now you can scale to much larger data sizes and compute in parallel.