A tutorial on training a Logistic Regression classifier on Spark using Clojure.
In this short tutorial I’m going to show how to train a logistic regression classifier in a scalable manner with Apache Spark and Clojure using Flambo.
Assumptions: you have a recent JDK and Leiningen installed, and you are comfortable with basic Clojure.
The goal of the tutorial is to help you familiarize yourself with Flambo – a Clojure DSL for Apache Spark. Even though Flambo is far from being complete, it already does a decent job of wrapping basic Spark APIs into idiomatic Clojure.
During the course of the tutorial, we are going to train a classifier capable of predicting whether a wine would taste good given certain objective chemical characteristics.
Run these commands:
$ lein new app t01spark
$ cd t01spark
Here, t01spark is the name of the project. You can give it any name you’d like. Don’t forget to change the current directory to the project you’ve just created.
Open project.clj in a text editor and update the :dependencies section so it looks like this:
:dependencies [[org.clojure/clojure "1.6.0"]
               [yieldbot/flambo "0.6.0"]
               [org.apache.spark/spark-mllib_2.10 "1.3.0"]]
Please note that although listing Spark jars in this manner is perfectly fine for exploratory projects, it is not suitable for production use. For that you will need to list them as “provided” dependencies in the profiles section, but let’s keep things simple for now.
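For reference only, a production-style setup would look something like the following sketch in the :profiles section of project.clj (we won’t need it in this tutorial):

;; Sketch of a production-style setup: the cluster supplies the Spark jars,
;; so they live in the :provided profile instead of :dependencies.
:profiles {:provided
           {:dependencies [[org.apache.spark/spark-mllib_2.10 "1.3.0"]]}}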
Make sure that AOT is enabled, otherwise you will see strange ClassNotFound errors. Add this to the project file:
:aot :all
It could also make sense to give Spark some extra memory:
:jvm-opts ^:replace ["-server" "-Xmx2g"]
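Putting the pieces together, project.clj should now contain roughly the following (the defproject header and any other keys generated by lein new can stay as they are):

(defproject t01spark "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [yieldbot/flambo "0.6.0"]
                 [org.apache.spark/spark-mllib_2.10 "1.3.0"]]
  :aot :all
  :jvm-opts ^:replace ["-server" "-Xmx2g"])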
In this tutorial we are going to use the Wine Quality Dataset. Download it and save it next to the project.clj file:
$ wget http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv
The simplest way of running the Clojure REPL is Leiningen’s `repl` command:
$ lein repl
Clojure 1.6.0
Java HotSpot(TM) 64-Bit Server VM 1.8.0_xxx
...
user=>
Of course, nothing prevents you from running the REPL in Emacs with CIDER, in IntelliJ IDEA, or in any other Clojure-aware IDE.
user=> (require '[flambo.api :as f]
                '[flambo.conf :as cf]
                '[flambo.tuple :as ft]
                '[clojure.string :as s])
user=> (import '[org.apache.spark.mllib.linalg Vectors]
               '[org.apache.spark.mllib.regression LabeledPoint]
               '[org.apache.spark.mllib.classification LogisticRegressionWithLBFGS]
               '[org.apache.spark.mllib.evaluation BinaryClassificationMetrics])
user=> (def spark
         (let [cfg (-> (cf/spark-conf)
                       (cf/master "local[2]")
                       (cf/app-name "t01spark")
                       (cf/set "spark.akka.timeout" "300"))]
           (f/spark-context cfg)))
We’ve just created a Spark context bound to a local, in-process Spark server. You should see lots of INFO log messages in the terminal. That’s normal. Again, creating a Spark context like this will work for tutorial purposes, although in real life you’d probably want to wrap this expression into a memoizing function and call it whenever you need a context.
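As a sketch of that idea (the configuration values are the same as above; get-spark-context is just an illustrative name):

;; Sketch: create the context lazily and reuse it on subsequent calls.
(def get-spark-context
  (memoize
    (fn []
      (-> (cf/spark-conf)
          (cf/master "local[2]")
          (cf/app-name "t01spark")
          (cf/set "spark.akka.timeout" "300")
          (f/spark-context)))))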
The data is stored in a CSV file with a header. We don’t need that header. To get rid of it, let’s enumerate rows and retain only those with indexes greater than zero. Then we split each row by the semicolon character and convert each element to float:
user=> (def data
         ;; Read lines from the file
         (-> (f/text-file spark "winequality-red.csv")
             ;; Enumerate lines.
             ;; This function is missing from Flambo,
             ;; so we call the method directly
             (.zipWithIndex)
             ;; This is here purely for convenience:
             ;; it transforms Spark tuples into Clojure vectors
             (f/map f/untuple)
             ;; Get rid of the header
             (f/filter (f/fn [[line idx]] (< 0 idx)))
             ;; Split lines and transform values
             (f/map (f/fn [[line _]]
                      (->> (s/split line #";")
                           (map #(Float/parseFloat %)))))))
Let’s verify what’s in the RDD:
user=> (f/take data 3)
[(7.4 0.7 0.0 1.9 0.076 11.0 34.0 0.9978 3.51 0.56 9.4 5.0)
 (7.8 0.88 0.0 2.6 0.098 25.0 67.0 0.9968 3.2 0.68 9.8 5.0)
 (7.8 0.76 0.04 2.3 0.092 15.0 54.0 0.997 3.26 0.65 9.8 5.0)]
Looks legit.
The subjective wine quality information is contained in the Quality variable. It takes values in the [0..10] range. Let’s transform it into a binary variable by splitting it at the median: wines with Quality below 6 will be considered “not good”, and wines with Quality of 6 and above “good”.
I explored this dataset in R and found that the most interesting variables are Citric Acid, Total Sulfur Dioxide and Alcohol. I encourage you to experiment with adding other variables to the model. Also, using logarithms of those variables instead of raw values might be a good idea. Please refer to the Wine Quality Dataset documentation for a full variable list.
user=> (def dataset
         (f/map data
                (f/fn [[_ _ citric-acid _ _ _
                        total-sulfur-dioxide _ _ _
                        alcohol quality]]
                  ;; A wine is "good" if the quality is above the median
                  (let [good (if (<= 6 quality) 0.0 1.0)
                        ;; these will be our predictors
                        pred (double-array [citric-acid
                                            total-sulfur-dioxide
                                            alcohol])]
                    ;; Spark requires samples to be packed into LabeledPoints
                    (LabeledPoint. good (Vectors/dense pred))))))
user=> (f/take dataset 3)
[#<LabeledPoint (1.0,[0.0,34.0,9.399999618530273])>
 #<LabeledPoint (1.0,[0.0,67.0,9.800000190734863])>
 #<LabeledPoint (1.0,[0.03999999910593033,54.0,9.800000190734863])>]
There is no order guarantee in derived RDDs, so you might get a different result.
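If you want to experiment with the log-transform idea mentioned earlier, only the predictor construction changes. A sketch (log-dataset is just an illustrative name; the (+ 1.0 ...) guards against the zero citric acid values):

;; Sketch: same as `dataset`, but with log-transformed predictors.
(def log-dataset
  (f/map data
         (f/fn [[_ _ citric-acid _ _ _
                 total-sulfur-dioxide _ _ _
                 alcohol quality]]
           (let [good (if (<= 6 quality) 0.0 1.0)
                 pred (double-array [(Math/log (+ 1.0 citric-acid))
                                     (Math/log total-sulfur-dioxide)
                                     (Math/log alcohol)])]
             (LabeledPoint. good (Vectors/dense pred))))))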
; Temporarily cache the source dataset
user=> (f/cache dataset) ; BTW, caching is a side effect

user=> (def training
         (-> (f/sample dataset false 0.8 1234)
             (f/cache)))

user=> (def validation
         (-> (.subtract dataset training)
             (f/cache)))

; Check the counts
user=> (map f/count [training validation])
(1291 235)

; No need to cache it anymore
user=> (.unpersist dataset)
Caching is crucial for MLlib performance. In fact, Spark MLlib algorithms will complain if you feed them uncached datasets.
MLlib-related parts are completely missing from Flambo, but that’s hopefully coming soon. For now, let’s use the Java API directly.
user=> (def classifier
         (doto (LogisticRegressionWithLBFGS.)
           ;; Otherwise we'll need to provide it
           (.setIntercept true)))
user=> (def model
         (doto (.run classifier (.rdd training))
           ;; We need the "raw" probability predictions
           (.clearThreshold)))

user=> [(.intercept model) (.weights model)]
[9.805476268219566 #<DenseVector [-1.6766504448212323,0.011619041367225583,-0.9683045663615859]>]
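Since we cleared the threshold, .predict returns the logistic function applied to the linear combination of the intercept and weights, p = 1 / (1 + e^-(b + w·x)). You can check this by hand on the first sample; raw-probability below is a hypothetical helper, just a sketch of the formula:

;; Sketch: manually compute the logistic-regression probability
;; for a feature vector x, given the trained model.
(defn raw-probability [model x]
  (let [z (+ (.intercept model)
             (reduce + (map * (.toArray (.weights model)) x)))]
    (/ 1.0 (+ 1.0 (Math/exp (- z))))))

;; Should match (.predict model (Vectors/dense (double-array x)))
(raw-probability model [0.0 34.0 9.4])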
First, let’s create a function to compute the area under the precision-recall curve and the area under the receiver operating characteristic curve. These are the most important indicators of the predictive power of a trained classification model.
user=> (defn metrics [ds model]
         ;; Here we construct an RDD containing [prediction, label]
         ;; tuples and compute classification metrics.
         (let [pl (f/map ds (f/fn [point]
                              (let [y (.label point)
                                    x (.features point)]
                                (ft/tuple (.predict model x) y))))
               metrics (BinaryClassificationMetrics. (.rdd pl))]
           [(.areaUnderROC metrics) (.areaUnderPR metrics)]))
Obtain metrics for the training dataset:
user=> (metrics training model)
[0.7800174890996763 0.7471259498290513]

And then for the validation dataset:

user=> (metrics validation model)
[0.7785138248847928 0.7160113864756078]
Slightly overfitting. OK, let’s turn L2 regularization on and rebuild the model:
user=> (doto (.optimizer classifier)
         (.setRegParam 0.0001))

user=> (def model
         (doto (.run classifier (.rdd training))
           (.clearThreshold)))
user=> (metrics training model)
[0.7794660966515655 0.748073583460006]

user=> (metrics validation model)
[0.7807459677419355 0.7200550175610565]
Looks good? I’m sure you can do better.
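One simple experiment is to sweep over a few regularization strengths and compare validation metrics. A sketch, reusing the classifier, datasets and metrics function defined above:

;; Sketch: try a few regularization values and print
;; [AUC-ROC AUC-PR] on the validation set for each.
(doseq [rp [0.0 1e-4 1e-3 1e-2]]
  (.setRegParam (.optimizer classifier) rp)
  (let [m (doto (.run classifier (.rdd training))
            (.clearThreshold))]
    (println rp (metrics validation m))))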
As a final step, let’s define a function that we could use for predicting wine quality:
user=> (defn is-good? [model citric-acid total-sulfur-dioxide alcohol]
         (let [point (-> (double-array [citric-acid
                                        total-sulfur-dioxide
                                        alcohol])
                         (Vectors/dense))
               prob (.predict model point)]
           (< 0.5 prob)))

user=> (is-good? model 0.0 34.0 9.399999618530273)
true
We have built a simple logistic regression classifier in Clojure on Apache Spark using Flambo. Some parts of the Flambo API are still missing, but it’s definitely usable. It was not terribly difficult to get it working and I hope you had fun.