Overview

You can use sparklyr to fit a wide variety of machine learning algorithms in Apache Spark. This analysis compares the performance of six classification models in Apache Spark on the Titanic data set.

For the Titanic data, the tree-based models (decision tree and random forest) performed best and had comparatively fast run times. The table below summarizes the rankings; see Compare results for a detailed comparison.

| ID | Model                               | Function                    | AUC rank | Run time rank |
|----|-------------------------------------|-----------------------------|----------|---------------|
| 1  | Random forest                       | ml_random_forest            | 1        | 3             |
| 2  | Decision tree                       | ml_decision_tree            | 2        | 2             |
| 3  | Gradient boosted trees              | ml_gradient_boosted_trees   | 3        | 6             |
| 4  | Logistic regression                 | ml_logistic_regression      | 4        | 4             |
| 5  | Multilayer perceptron (neural net)  | ml_multilayer_perceptron    | 5        | 5             |
| 6  | Naive Bayes                         | ml_naive_bayes              | 6        | 1             |

Load the data

Load the popular Titanic data set into a local Spark cluster.

First, install the titanic package from CRAN. These data can be used to predict survival from factors including passenger class, gender, age, and family size. A thorough background is available on Kaggle.

Second, create a local Spark cluster. This example uses Apache Spark 2.0.0, which includes additional feature transforms and modeling methods that are used later in the analysis.

Third, load the data into the local Spark context. The data can be loaded from CSV or Parquet format.

library(sparklyr)
library(dplyr)
library(tidyr)
library(titanic)
library(ggplot2)
library(purrr)

# Connect to local spark cluster and load data
sc <- spark_connect(master = "local", version = "2.0.0")
spark_read_parquet(sc, name = "titanic", path = "titanic-parquet")
## Source:   query [?? x 12]
## Database: spark connection master=local[4] app=sparklyr local=TRUE
## 
##    PassengerId Survived Pclass
##          <int>    <int>  <int>
## 1            1        0      3
## 2            2        1      1
## 3            3        1      3
## 4            4        1      1
## 5            5        0      3
## 6            6        0      3
## 7            7        0      1
## 8            8        0      3
## 9            9        1      3
## 10          10        1      2
## # ... with more rows, and 9 more variables: Name <chr>, Sex <chr>,
## #   Age <dbl>, SibSp <int>, Parch <int>, Ticket <chr>, Fare <dbl>,
## #   Cabin <chr>, Embarked <chr>
titanic_tbl <- tbl(sc, "titanic")

Tidy the data

Tidy the data in preparation for model fitting. sparklyr uses dplyr syntax when connecting to the Spark SQL API and dedicated ML functions when connecting to the Spark ML API.

Spark SQL transforms

Use feature transforms with Spark SQL. Create new features and modify existing features with dplyr syntax.

| ID | Feature     | Action                                                                    |
|----|-------------|---------------------------------------------------------------------------|
| 1  | Family_Size | Create: total family size from siblings/spouses (SibSp) and parents/children (Parch) |
| 2  | Pclass      | Format passenger class as character, not numeric                          |
| 3  | Embarked    | Remove a small number of records with missing values                      |
| 4  | Age         | Impute missing ages with the average age                                  |
# Transform features with Spark SQL API
titanic2_tbl <- titanic_tbl %>% 
  mutate(Family_Size = SibSp + Parch + 1L) %>% 
  mutate(Pclass = as.character(Pclass)) %>%
  filter(!is.na(Embarked)) %>%
  mutate(Age = if_else(is.na(Age), mean(Age), Age)) %>%
  sdf_register("titanic2")

Tip: sdf_register registers the Spark DataFrame as a table so it can be referenced in later steps of the analysis.
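
As a quick check (a sketch using standard dplyr verbs on the Spark connection), the registered table shows up in the connection's table list and can be re-referenced at any time:

# The registered table is visible on the connection and can be re-used later
src_tbls(sc)                          # should include "titanic2"
titanic2_tbl <- tbl(sc, "titanic2")   # rebuild the reference if needed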

Spark ML transforms

Use feature transforms with Spark ML. Use ft_bucketizer to bucket family sizes into groups.

# Transform family size with Spark ML API
titanic_final_tbl <- titanic2_tbl %>%
  mutate(Family_Size = as.numeric(Family_Size)) %>%
  sdf_mutate(
    Family_Sizes = ft_bucketizer(Family_Size, splits = c(1,2,5,12))
    ) %>%
  mutate(Family_Sizes = as.character(as.integer(Family_Sizes))) %>%
  sdf_register("titanic_final")

Tip: You can use magrittr pipes to chain dplyr commands with sparklyr commands. For example, mutate is a dplyr command that accesses the Spark SQL API whereas sdf_mutate is a sparklyr command that accesses the Spark ML API.
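
As a sanity check on the bucketizer (a sketch; with splits c(1, 2, 5, 12), bucket 0 holds passengers travelling alone, bucket 1 holds families of 2 to 4, and bucket 2 holds families of 5 or more), count passengers per bucket with dplyr:

# Count passengers in each family-size bucket
titanic_final_tbl %>%
  group_by(Family_Sizes) %>%
  summarize(n = n()) %>%
  arrange(Family_Sizes)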

Train-validation split

Randomly partition the data into train and test sets.

# Partition the data
partition <- titanic_final_tbl %>% 
  mutate(Survived = as.numeric(Survived), SibSp = as.numeric(SibSp), Parch = as.numeric(Parch)) %>%
  select(Survived, Pclass, Sex, Age, SibSp, Parch, Fare, Embarked, Family_Sizes) %>%
  sdf_partition(train = 0.75, test = 0.25, seed = 8585)

# Create table references
train_tbl <- partition$train
test_tbl <- partition$test

Tip: Use sdf_partition to create training and testing splits.
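
To confirm the partition is roughly 75/25, count the rows in each split (a simple sketch using dplyr's tally, which Spark translates to COUNT(*)):

# Check the size of each split
train_tbl %>% tally()
test_tbl %>% tally()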


Train the models

Train multiple machine learning algorithms on the training data. Score the test data with the fitted models.

Logistic regression

Logistic regression is one of the most common classifiers. Train the logistic regression and examine the predictors.

# Model survival as a function of several predictors
ml_formula <- formula(Survived ~ Pclass + Sex + Age + SibSp + Parch + Fare + Embarked + Family_Sizes)

# Train a logistic regression model
(ml_log <- ml_logistic_regression(train_tbl, ml_formula))
## Call: Survived ~ Pclass_2 + Pclass_3 + Sex_male + Age + SibSp + Parch + Fare + Embarked_Q + Embarked_S + Family_Sizes_1 + Family_Sizes_2
## 
## Coefficients:
##    (Intercept)       Pclass_2       Pclass_3       Sex_male            Age 
##    3.770024202   -1.001174014   -2.077589828   -2.674074995   -0.041217932 
##          SibSp          Parch           Fare     Embarked_Q     Embarked_S 
##   -0.056016163    0.162832732    0.000293634    0.363901651   -0.101122063 
## Family_Sizes_1 Family_Sizes_2 
##    0.141765426   -1.826757360

Other ML algorithms

Run the same formula using the other machine learning algorithms. Notice that training times vary greatly between methods.

## Decision Tree
ml_dt <- ml_decision_tree(train_tbl, ml_formula)

## Random Forest
ml_rf <- ml_random_forest(train_tbl, ml_formula)

## Gradient Boosted Tree
ml_gbt <- ml_gradient_boosted_trees(train_tbl, ml_formula)

## Naive Bayes
ml_nb <- ml_naive_bayes(train_tbl, ml_formula)

## Neural Network
ml_nn <- ml_multilayer_perceptron(train_tbl, ml_formula, layers = c(11,15,2))
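
The overview table ranks run times; one rough way to measure them yourself is to wrap each fit in system.time (a sketch; ml_rf_timed is just an illustrative name, and the elapsed time is the relevant figure because the work runs in Spark rather than in R):

# Rough wall-clock timing for a single fit
system.time(ml_rf_timed <- ml_random_forest(train_tbl, ml_formula))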

Validation data

Score the test data with the trained models.

# Bundle the models into a single list object
ml_models <- list(
  "Logistic" = ml_log,
  "Decision Tree" = ml_dt,
  "Random Forest" = ml_rf,
  "Gradient Boosted Trees" = ml_gbt,
  "Naive Bayes" = ml_nb,
  "Neural Net" = ml_nn
)

# Create a function for scoring
score_test_data <- function(model, data=test_tbl){
  pred <- sdf_predict(model, data)
  select(pred, Survived, prediction)
}

# Score all the models
ml_score <- lapply(ml_models, score_test_data)
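
Each element of ml_score is a Spark DataFrame holding the actual and predicted survival values; a quick peek at one of them (a sketch) confirms the shape before computing lift:

# Preview the scored output for one model
ml_score[["Logistic"]] %>% head(10)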

Compare results

Compare the model results. Examine performance metrics: lift, AUC, and accuracy. Also examine feature importance to see what features are most predictive of survival.

Model lift

Lift measures how much better the model predicts survival than random guessing. Use the function below to estimate model lift for each scored decile in the test data. The lift chart suggests that the tree-based models (random forest, gradient boosted trees, and decision tree) will provide the best predictions.

# Lift function
calculate_lift <- function(scored_data) {
  scored_data %>%
    mutate(bin = ntile(desc(prediction), 10)) %>% 
    group_by(bin) %>% 
    summarize(count = sum(Survived)) %>% 
    mutate(prop = count / sum(count)) %>% 
    arrange(bin) %>% 
    mutate(prop = cumsum(prop)) %>% 
    select(-count) %>% 
    collect() %>% 
    as.data.frame()
}

# Initialize results
ml_gains <- data.frame(bin = 1:10, prop = seq(0, 1, len = 10), model = "Base")

# Calculate lift
for(i in names(ml_score)){
  ml_gains <- ml_score[[i]] %>%
    calculate_lift %>%
    mutate(model = i) %>%
    rbind(ml_gains, .)
}

# Plot results
ggplot(ml_gains, aes(x = bin, y = prop, colour = model)) +
  geom_point() + geom_line() +
  ggtitle("Lift Chart for Predicting Survival - Test Data Set") + 
  xlab("") + ylab("")