Random Forest on GPUs: 2000x Faster than Apache Spark

This blog post compares using RAPIDS and Dask vs Apache Spark for model training

If you prefer to watch a video demo, click here.

Random forest is a machine learning algorithm trusted by many data scientists for its robustness, accuracy, and scalability. The algorithm trains many decision trees through bootstrap aggregation, then makes predictions by aggregating the outputs of the trees in the forest. Because the trees in the ensemble are trained independently of one another, random forest lends itself to distributed computing: trees can be trained in parallel across processes and machines in a cluster, resulting in significantly faster training than with a single process.
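To make bootstrap aggregation concrete, here is a minimal, standard-library-only sketch. It is not how Spark or RAPIDS implement random forests: the "trees" are one-split stumps on a single feature, and every function name is ours for illustration. It only shows the two steps described above: fit each tree on a resampled copy of the data, then average the trees' predictions.

```python
import random
from statistics import mean

def fit_stump(xs, ys):
    """Fit a one-split regression tree (a 'stump') on 1-D inputs."""
    best = None
    for threshold in sorted(set(xs)):
        left = [y for x, y in zip(xs, ys) if x <= threshold]
        right = [y for x, y in zip(xs, ys) if x > threshold]
        if not left or not right:
            continue
        left_mean, right_mean = mean(left), mean(right)
        # Sum of squared errors when predicting the mean on each side.
        sse = (sum((y - left_mean) ** 2 for y in left)
               + sum((y - right_mean) ** 2 for y in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    if best is None:  # every x identical: fall back to a constant prediction
        overall = mean(ys)
        return (float("inf"), overall, overall)
    return best[1:]

def predict_stump(stump, x):
    threshold, left_mean, right_mean = stump
    return left_mean if x <= threshold else right_mean

def fit_forest(xs, ys, n_trees, seed=42):
    """Bagging: each stump sees a bootstrap sample (drawn with replacement)."""
    rng = random.Random(seed)
    n = len(xs)
    forest = []
    for _ in range(n_trees):
        idx = [rng.randrange(n) for _ in range(n)]
        forest.append(fit_stump([xs[i] for i in idx], [ys[i] for i in idx]))
    return forest

def predict_forest(forest, x):
    """Aggregate by averaging the individual predictions."""
    return mean(predict_stump(stump, x) for stump in forest)

# Toy data: a step function with a jump at x = 10.
xs = [float(i) for i in range(20)]
ys = [5.0 if x < 10 else 15.0 for x in xs]
forest = fit_forest(xs, ys, n_trees=25)
low, high = predict_forest(forest, 2.0), predict_forest(forest, 17.0)
```

Each call to `fit_stump` inside the loop is independent of the others, which is the property that lets a real implementation hand different trees to different workers.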

In this article, we explore implementations of distributed random forest training on clusters of CPU machines using Apache Spark and compare that to the performance of training on clusters of GPU machines using RAPIDS and Dask. While GPU computing in the ML world has traditionally been reserved for deep learning applications, RAPIDS is a library that executes data processing and non-deep learning ML workloads on GPUs, leading to immense performance speedups when compared to executing on CPUs. We trained a random forest model using 300 million instances: Spark took 37 minutes on a 20-node CPU cluster, whereas RAPIDS took 1 second on a 20-node GPU cluster. That’s over 2000x faster with GPUs!

Warp speed random forest with GPUs and RAPIDS!

Experiment overview

We use the publicly available NYC Taxi dataset and train a random forest regressor that can predict the fare amount of a taxi ride using attributes related to rider pickup. Taxi rides from 2017, 2018, and 2019 were used as the training set, amounting to 300,700,143 instances.

The Spark and RAPIDS code is available in Jupyter notebooks here.

Hardware

Spark clusters are managed using Amazon EMR, while Dask/RAPIDS clusters are managed using Saturn Cloud.

Both clusters have 20 worker nodes with these AWS instance types:

  • Spark: r5.2xlarge
    • 8 CPU, 64 GB RAM
    • On-demand price: $0.504/hour
  • RAPIDS: g4dn.xlarge
    • 4 CPU, 16 GB RAM
    • 1 GPU , 16 GB GPU RAM (NVIDIA T4)
    • On-demand price: $0.526/hour

Saturn Cloud can also launch Dask clusters with NVIDIA Tesla V100 GPUs, but we chose g4dn.xlarge for this exercise to keep the hourly cost close to that of the Spark cluster.
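As a quick check on the cost comparison, the worker-node prices listed above can be multiplied out. This is a rough sketch: it counts only the 20 workers and ignores driver/scheduler nodes and any EMR or platform fees, which are assumptions on our part.

```python
WORKERS = 20

# On-demand prices per worker, in USD per hour, from the list above.
PRICE_PER_HOUR = {
    "spark_r5.2xlarge": 0.504,
    "rapids_g4dn.xlarge": 0.526,
}

# Hourly cost of the worker nodes in each cluster.
cluster_cost_per_hour = {
    name: round(WORKERS * price, 2) for name, price in PRICE_PER_HOUR.items()
}

# Relative price difference of the GPU cluster over the CPU cluster, in percent.
gpu_premium_pct = round(
    (PRICE_PER_HOUR["rapids_g4dn.xlarge"] / PRICE_PER_HOUR["spark_r5.2xlarge"] - 1) * 100,
    1,
)
```

Under these assumptions the workers cost about $10.08/hour for Spark and $10.52/hour for RAPIDS, a difference of roughly 4%.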

Spark

Apache Spark is an open-source big data processing engine built in Scala, with a Python interface that calls down to the Scala/JVM code. It’s a staple in the Hadoop processing ecosystem, built around the MapReduce paradigm, and has interfaces for DataFrames as well as machine learning.

Setting up a Spark cluster is outside of the scope of this article, but once you have a cluster ready, you can run the following inside a Jupyter notebook to initialize Spark:

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .config('spark.executor.memory', '36g')
    .getOrCreate())

The findspark package detects the location of the Spark install on your system; this may not be required if the Spark packages are already discoverable. Several configuration settings need to be tuned to get performant Spark code, and the right values depend on your cluster setup and workflow. In this case, we set spark.executor.memory to ensure we don’t encounter any memory overflow or Java heap errors.

RAPIDS AI

RAPIDS is an open-source Python framework that executes data science code on GPUs instead of CPUs. This results in huge performance gains for data science work, similar to those seen for training deep learning models. RAPIDS has interfaces for DataFrames, ML, graph analysis, and more. RAPIDS uses Dask to handle parallelizing to machines with multiple GPUs, as well as a cluster of machines each with one or more GPUs.

Setting up GPU machines can be a bit tricky, but Saturn Cloud has pre-built images for launching GPU clusters, so you can get up and running in just a few minutes! To initialize a Dask client pointing to your cluster, you can run the following:

from dask.distributed import Client
from dask_saturn import SaturnCluster

cluster = SaturnCluster()
client = Client(cluster)

To set up a Dask cluster yourself, refer to this docs page.

Data loading

The data files are hosted on a public S3 bucket, so we can read the CSVs directly from there. The S3 bucket has all files in the same directory, so we use s3fs to select the files we want:

import s3fs
fs = s3fs.S3FileSystem(anon=True)
files = [f"s3://{x}" for x in fs.ls('s3://nyc-tlc/trip data/')
         if 'yellow' in x and ('2019' in x or '2018' in x or '2017' in x)]
         
cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
        'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
        'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
        'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
        'total_amount']
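The file selection can be tried out without touching S3. The sketch below applies the same substring filter to a hand-written listing; the file names are hypothetical stand-ins, and we assume the listing returns keys without the `s3://` prefix, which is why the prefix is added back.

```python
# Hypothetical listing, standing in for the result of fs.ls(...).
listing = [
    "nyc-tlc/trip data/yellow_tripdata_2016-12.csv",
    "nyc-tlc/trip data/yellow_tripdata_2017-01.csv",
    "nyc-tlc/trip data/green_tripdata_2018-06.csv",
    "nyc-tlc/trip data/yellow_tripdata_2019-12.csv",
    "nyc-tlc/trip data/fhv_tripdata_2019-03.csv",
]

YEARS = ("2017", "2018", "2019")

# Keep yellow-taxi files from the three training years and restore the prefix.
selected = [
    f"s3://{key}"
    for key in listing
    if "yellow" in key and any(year in key for year in YEARS)
]
```

Only the two yellow-taxi files from 2017 and 2019 survive the filter; the 2016 file and the green and FHV files are dropped.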

With Spark, we need to read in each CSV file individually, then combine them together:

import functools
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# manually specify schema because inferSchema in read.csv is quite slow
schema = StructType([
    StructField('VendorID', DoubleType()),
    StructField('tpep_pickup_datetime', TimestampType()),
    ...
    # refer to notebook for full schema object
]) 

def read_csv(path):
    df = spark.read.csv(path,
                        header=True,
                        schema=schema,
                        timestampFormat='yyyy-MM-dd HH:mm:ss',
                       )

    df = df.select(cols)

    return df

dfs = []
for tf in files:
    df = read_csv(tf)
    dfs.append(df)
taxi = functools.reduce(DataFrame.unionAll, dfs)
taxi.count()

With Dask+RAPIDS, we can read in all the CSV files in one shot:

import dask_cudf
taxi = dask_cudf.read_csv(files, 
                          assume_missing=True,
                          parse_dates=[1,2], 
                          usecols=cols, 
                          storage_options={'anon': True})
len(taxi)

Feature engineering

We’ll generate a few features based on the pickup time and then cache/persist the DataFrame. In both frameworks, this executes all the CSV loading and preprocessing, and stores the results in RAM (in the RAPIDS case, GPU RAM). The features we will use for training are:

features = ['pickup_weekday', 'pickup_hour', 'pickup_minute',
            'pickup_week_hour', 'passenger_count', 'VendorID', 
            'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 
            'DOLocationID']

For Spark, we need to collect the features into a Vector class:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline

taxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(DoubleType()))
taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))
taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))
taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))
taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))
taxi = taxi.withColumn('label', taxi.total_amount)  
taxi = taxi.fillna(-1)

assembler = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

pipeline = Pipeline(stages=[assembler])

assembler_fitted = pipeline.fit(taxi)
X = assembler_fitted.transform(taxi)

X.cache()
X.count()
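One detail worth keeping in mind when comparing the two pipelines: Spark's `F.dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), while the pandas-style `dt.weekday` accessor used in the RAPIDS version of the feature code numbers them from 0 (Monday) to 6 (Sunday). The sketch below reproduces both conventions with the standard library only; the helper names are ours for illustration.

```python
from datetime import datetime

def weekday_monday0(ts):
    """pandas-style weekday: Monday=0 ... Sunday=6 (same as datetime.weekday)."""
    return ts.weekday()

def weekday_sunday1(ts):
    """Spark dayofweek-style numbering: Sunday=1 ... Saturday=7."""
    return (ts.weekday() + 1) % 7 + 1

def week_hour(weekday, hour):
    """Same formula as pickup_week_hour: hours since the start of the week."""
    return weekday * 24 + hour

pickup = datetime(2019, 1, 7, 8, 30)  # a Monday morning

pandas_style = week_hour(weekday_monday0(pickup), pickup.hour)  # 0 * 24 + 8
spark_style = week_hour(weekday_sunday1(pickup), pickup.hour)   # 2 * 24 + 8
```

The same pickup time therefore maps to different values of pickup_weekday and pickup_week_hour in the two frameworks. The ordering of hours within a day is unchanged, so this probably matters little for a tree-based model, but the feature values are not directly comparable across the two notebooks.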

For RAPIDS, we convert all float values to float32 precision for GPU computing:

from dask import persist
from dask.distributed import wait

taxi['pickup_weekday'] = taxi.tpep_pickup_datetime.dt.weekday
taxi['pickup_hour'] = taxi.tpep_pickup_datetime.dt.hour
taxi['pickup_minute'] = taxi.tpep_pickup_datetime.dt.minute
taxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hour
taxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)
taxi = taxi.fillna(-1)

X = taxi[features].astype('float32')
y = taxi['total_amount']

X, y = persist(X, y)
_ = wait([X, y])
len(X)
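The float32 cast also halves the memory the feature matrix needs. A back-of-the-envelope estimate, assuming the rows are spread evenly across the 20 GPUs and ignoring the label column and any framework overhead:

```python
N_ROWS = 300_700_143   # training instances
N_FEATURES = 10        # columns in the features list
N_GPUS = 20            # one T4 per worker
GPU_RAM_GB = 16        # GPU memory per T4

def matrix_gb(n_rows, n_cols, bytes_per_value):
    """Size of a dense numeric matrix in gigabytes (1 GB = 1e9 bytes)."""
    return n_rows * n_cols * bytes_per_value / 1e9

float32_gb = matrix_gb(N_ROWS, N_FEATURES, 4)
float64_gb = matrix_gb(N_ROWS, N_FEATURES, 8)

# Share of the float32 matrix held by each GPU under an even split.
per_gpu_float32_gb = float32_gb / N_GPUS
```

That works out to roughly 12 GB for the whole matrix in float32 (about 24 GB in float64), or around 0.6 GB per GPU, comfortably within the 16 GB available on each T4.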

Train random forest!

We initialize and train the random forest in a couple of lines for both packages.

Spark:


from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(numTrees=100, maxDepth=10, seed=42)
fitted = rf.fit(X)

RAPIDS:

from cuml.dask.ensemble import RandomForestRegressor
rf = RandomForestRegressor(n_estimators=100, max_depth=10, seed=42)
_ = rf.fit(X, y)

Results

We trained a random forest model on 300,700,143 instances of NYC taxi data on Spark (CPU) and RAPIDS (GPU) clusters. Both clusters had 20 worker nodes and approximately the same hourly price. Here are the results for each portion of the workflow:

| Task                | Spark        | RAPIDS       |
|---------------------|--------------|--------------|
| Load/row count      | 20.6 seconds | 25.5 seconds |
| Feature engineering | 54.3 seconds | 23.1 seconds |
| Random forest       | 36.9 minutes | 1.02 seconds |

That’s 37 minutes with Spark vs. 1 second for RAPIDS

GPUs for the win! Think about how much faster you can iterate and improve your model when you don’t have to wait over 30 minutes for a single fit. Once you add in hyperparameter tuning or testing different models, each iteration can easily add up to hours or days.
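For readers who want to check the arithmetic, the sketch below derives the speedups and an approximate cost per fit from the timings in the table and the on-demand prices in the Hardware section. The cost figures are rough: they count only the 20 worker nodes and assume billing exactly by the second.

```python
WORKERS = 20
SPARK_PRICE = 0.504    # USD per worker-hour (r5.2xlarge)
RAPIDS_PRICE = 0.526   # USD per worker-hour (g4dn.xlarge)

# Timings from the results table, converted to seconds.
spark = {"load": 20.6, "features": 54.3, "fit": 36.9 * 60}
rapids = {"load": 25.5, "features": 23.1, "fit": 1.02}

# Speedup of the random forest fit alone, and of the whole workflow.
fit_speedup = spark["fit"] / rapids["fit"]
end_to_end_speedup = sum(spark.values()) / sum(rapids.values())

def fit_cost(seconds, price_per_hour, workers=WORKERS):
    """Approximate worker cost in USD for a job of the given duration."""
    return workers * price_per_hour * seconds / 3600

spark_fit_cost = fit_cost(spark["fit"], SPARK_PRICE)
rapids_fit_cost = fit_cost(rapids["fit"], RAPIDS_PRICE)
```

The fit alone comes out at roughly 2170x faster on RAPIDS. Including data loading and feature engineering, the whole workflow is about 46x faster, since those steps take similar time on both clusters. Under the stated assumptions, a single Spark fit costs about $6.20 in worker time, while the RAPIDS fit costs well under one cent.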

Need to see it to believe it? You can find the notebooks here and run them yourself!

Do you need faster Random Forest?

Yes! You can get going on a Dask/RAPIDS cluster in seconds with Saturn Cloud. Saturn handles all the tooling infrastructure, security, and deployment headaches to get you up and running with RAPIDS right away. Click here to use it for free.

