Getting Started With Spark

Many of you may be interested in how to get going with Spark.   Let’s look at a walkthrough.

Step 0: Get Your System Ready

You need to install a working JDK, if you don’t already have one.  We recommend at least Java 7.

Now you need to install Scala (and sbt) as well, if you don’t have them already.

Use the following link: https://www.scala-lang.org/download/

Once ready, open a shell window (or Windows command prompt) and make sure scala and sbt are both installed and on your path:

$ scala

$ sbt

Windows users have a few extra steps.

First, download winutils.exe from https://github.com/steveloughran/winutils/raw/master/hadoop-2.6.0/bin/winutils.exe

Put it in a bin directory under a folder of your choice (for example, C:\Winutils\bin) and add that directory to your PATH.  This provides support for running Hadoop libraries on Windows.  For more info, please see this link:  https://wiki.apache.org/hadoop/WindowsProblems

Run the following commands (again, Windows users only) to fix the permissions on the Hive scratch directory (create C:\tmp\hive first if it doesn’t exist) and to point HADOOP_HOME at the winutils directory:

C:\winutils\bin\winutils.exe chmod 777 C:\tmp\hive

set HADOOP_HOME=c:\winutils\

Ok, now we should be ready to run Spark.

Step 1:  Download Spark

You can download the latest Spark release from http://spark.apache.org/downloads.html

Here is a command you can use to pull the 2.1.0 build directly from the Apache archive:

wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz

Once downloaded, extract the archive; I like to put the resulting directory in my home directory under the name spark.  Mac and Linux users can do that as follows:

$ tar xzf spark-2.1.0-bin-hadoop2.7.tgz
$ mv spark-2.1.0-bin-hadoop2.7 ~/spark

Windows users can do something similar from a command prompt, after extracting the .tgz (for example, with 7-Zip):

> move spark-2.1.0-bin-hadoop2.7 c:\spark

Step 2: Run Spark

You can start a standalone Spark master and worker as follows:

$ ~/spark/sbin/start-all.sh  # Mac/Linux

Windows users: Spark doesn’t ship Windows versions of the standalone start scripts, so the simplest option is to launch a local Spark shell instead:

> c:\spark\bin\spark-shell.cmd

Spark is now running on your machine!
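
To sanity-check the install, you can open a Spark shell (~/spark/bin/spark-shell on Mac/Linux, or the spark-shell.cmd above on Windows) and run a tiny job. This is just a quick sketch; the shell provides a SparkContext for you as sc:

val nums = sc.parallelize(1 to 100)
println(nums.sum())   // should print 5050.0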

Step 3: Check out the Spark UI

Go to localhost:8080 in a browser. This is your Spark master UI.  (If you launched a local spark-shell on Windows instead of the standalone scripts, the application UI is at localhost:4040.)

Check the following things out:

  1. How many masters are running?  How many workers?
  2. What nodes are they running on?
  3. How much memory is available?

 

Why Spark’s MLlib Clustering does not have a named vector (NamedVector) class

I wondered where the equivalent of NamedVector (from Mahout) was in Spark’s MLlib. It’s certainly useful to have some way of identifying a vector other than by the data itself. When running Mahout I always use the NamedVector class when clustering (or the -nv option from the command line), and always wondered why anyone would do otherwise, because without it, locating the vectors afterwards is difficult.

R users will recall a nifty feature that elegantly solves the named
vector problem: the rowname, that can be used to identify rows in a
matrix (or dataframe) that we use to send to our machine learning
algorithms. This is better than spending a column in our dataframe
for identifying data.

In the case of Spark, there is no such class or capability. There is
Vectors.dense, and Vectors.sparse, each of which takes only items of
type double. No identifiers are allowed, unless one wants to use up
one item of the vector as a numeric key. This is a bad idea, as that key will then be treated as a variable, and it’s not a variable.
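
For reference, using the two vector constructors looks something like this (a minimal sketch; the values are made up):

import org.apache.spark.mllib.linalg.Vectors

val dense = Vectors.dense(1.0, 2.0, 3.0)
// a 3-element sparse vector with nonzero values at indices 0 and 2
val sparse = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))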

The reasoning behind this, I think, is that a NamedVector is
essentially a key-value pair, with the name as a key and the Vector as
a value. Spark does have key-value pairs: a 2-item tuple with the first item as the key and the second item as the value. So, in theory, there’s no need for a named vector.
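
In other words, a “named vector” in Spark is just a pair in an RDD; a sketch (assuming a SparkContext named sc, with made-up names and values):

import org.apache.spark.mllib.linalg.Vectors

// a "named vector" is simply a (name, vector) pair
val named = sc.parallelize(Seq(
  ("row-1", Vectors.dense(1.0, 2.0, 3.0)),
  ("row-2", Vectors.dense(4.0, 5.0, 6.0))
))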

To avoid confusion, there is also something totally different called a
labeled point. LabeledPoint to be exact. This does include a label,
but this label is used as the outcome variable of a classification or
regression training (or any other supervised algorithm). For example,
for a spam classifier, the outcome variable would be 0.0 = “not spam”
and 1.0 = “spam”. It’s not used to identify the vector. And, like everything else in the vector, it is stored as a double, despite the term “label.”
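
For illustration, constructing a LabeledPoint looks something like this (the values are made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// label 1.0 ("spam") plus the feature vector; the label is the outcome, not an identifier
val example = LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1))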

At first glance, it seems like being able to name or key our vectors would be useful. This is especially true in clustering, where we want to keep track of which of our points ended up in which cluster. For classification and regression problems it’s not so critical, as we’re really more concerned with training the model to predict future data than with recovering values from our training set.

It seems that Spark’s creators decided to make clustering run a bit like classification, by training a “model” that can then run predict(), which is a little odd to those with prior experience in machine learning. After all, isn’t clustering an unsupervised algorithm? We don’t usually speak of training a model in unsupervised learning; the word “train” implies supervised learning.

The advantage of this approach is that clustering ends up working just like the prediction models; in fact, clustering “models” can double as prediction models once trained. And being able to do the prediction removes the need to store a name with each vector, or even to store the vectors along with the model at all.

So with kmeans we can do something like this.
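
Here is a sketch using the RDD-based MLlib API (assuming a SparkContext named sc; the input file name is a placeholder):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// parse a CSV of numbers into dense vectors
val data = sc.textFile("points.csv")
val parsed = data.map(line => Vectors.dense(line.split(',').map(_.toDouble))).cache()

// "train" the clustering model: 2 clusters, at most 20 iterations
val model = KMeans.train(parsed, 2, 20)

// get cluster membership by calling predict() on each vector
val assignments = parsed.map(v => model.predict(v))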

Note that we’re calling predict() at the end.

This is fine for kmeans, as the final work done by predict() is pretty simple: it just has to find the closest centroid. Still, it seems like we’re doing the work twice, once to do the clustering and again to get the cluster assignment, which the original output of kmeans should have given us.

In fact, KMeansModel doesn’t give us a list of cluster membership from
the trained model. It has exactly two members:


val k: Int // Number of clusters (centroids)
val clusterCenters: Array[Array[Double]]
// An array of the cluster centers (centroids)

And two methods:

def predict(point: Array[Double]): Int
// Gives us an integer for cluster membership
def computeCost(data: RDD[Array[Double]]): Double
// Calculates the WSSSE (cost) of the data

That’s it. So despite the fact that the vectors must have gone into making the model, they aren’t saved with the model, and neither are their cluster assignments. The only way to get an assignment is to call predict(). Note how this is totally different from the way Mahout works.

So that’s the reason. Hope this clears up how clustering vectors works in MLlib.
It was a bit confusing for me too; I think my past knowledge of Mahout hurt me here.

Cheers!

Building Spark with Maven: the PermGen space error and Java heap space error

Spark doesn’t build properly with Maven out of the box. While this is clearly stated on the Building Spark page of the documentation, it’s easy to miss.

You’ll get an error like the following:

$ mvn clean package
.....

[INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-2.10/classes...
[ERROR] PermGen space -> [Help 1]

[INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-2.10/classes...
[ERROR] Java heap space -> [Help 1]

Basically, this says that the PermGen space and/or Java heap space have been exceeded. While Java heap space issues are so common that I instinctively bump the heap up when I see them (e.g., -Xmx2g), the PermGen space has to be set separately.

That said, the solution is pretty straightforward.
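
The Building Spark page of the docs recommends giving Maven more memory before building; the exact values vary a little from version to version, but something along these lines:

$ export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
$ mvn clean package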

That should fix the problem and allow Spark to be built properly. I also had issues building Spark in my home directory, which was encrypted. That was a separate issue, and I posted a workaround for it in an earlier post.

Clustering with K-Means with Spark and MLlib

MLlib provides a parallelized k-means implementation (using the k-means|| initialization scheme), which gives us an efficient clustering algorithm on Spark.  Clustering is an unsupervised machine learning technique that helps us discover natural groupings in data.

What is k-means?  It’s about the simplest clustering algorithm out there.  It starts with a given number (k) of randomly placed centroids, assigns each point to its nearest centroid, recomputes each centroid from the points assigned to it, and repeats until the centroids converge.  The points assigned to a centroid form a cluster.  It works great for data that can be separated by circles/spheres (actually hyperspheres).  For data with more convoluted patterns, such as rings and other shapes, we can use hierarchical clustering.

The other limitation is that we have to know k in advance to make the algorithm work.  Sometimes we know what we’re looking for, but usually we don’t.  That means we often end up running the clustering many times with different values of k, measuring the cost each time.

If we have a dataset we want to cluster, the first step is to convert it to a vector class.   MLlib offers two: Vectors.dense and Vectors.sparse.  The latter is very good for one-hot encodings (is_red, is_blue, is_green, etc.), and especially for encoded text vectors, such as tf-idf or word2vec.  I’ll talk in another post about how to vectorize text.

Instead, let’s just use Vectors.dense with a dataset near and dear to R users: mtcars.  It’s one of R’s standard example datasets, giving some statistics on a few different models of cars.  We can extract mtcars from R using write.csv and save it as mtcars.csv; we’ll remove the header row for simplicity.  Of course, it’d be silly to use Spark for such a tiny dataset (we could easily just use R), but it serves the purpose of an example.
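
Loading the file and building the vectors might look something like this (a sketch using the RDD-based MLlib API, assuming a SparkContext named sc):

import org.apache.spark.mllib.linalg.Vectors

// mtcars.csv (header removed): the first column is the car name, the rest are numeric
val lines = sc.textFile("mtcars.csv")
val vectors = lines.map { line =>
  val fields = line.split(',')
  Vectors.dense(fields.tail.map(_.toDouble))   // drop the name column
}.cache()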

Great. Now we have some vectors.   We had to drop the name associated with each car and so our vectors are nameless — more on that later.

Now we need to make a KMeansModel object.   This may seem strange at first glance, since in R and Mahout there’s no model associated with k-means; there’s no training involved in an unsupervised ML algorithm.  Probably for the sake of consistency, MLlib treats k-means as a model that has to be “trained” with data and then applied to new data using predict(), as if it were performing classification. While odd, this is actually a bonus, because it easily allows us to use our clusters as a classification model for unseen data.
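
Training it might look like this, continuing from the vectors RDD above (the iteration count is arbitrary):

import org.apache.spark.mllib.clustering.KMeans

val numIterations = 20
val clusters = KMeans.train(vectors, 2, numIterations)   // k = 2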

So clusters in this case is the KMeansModel object.  We chose a k value of 2, which probably isn’t going to get good results with this dataset.  How do we check that? We can use computeCost().
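
For example, continuing with the clusters model from above:

val wssse = clusters.computeCost(vectors)
println(s"Within Set Sum of Squared Errors = $wssse")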

The Spark documentation calls this cost WSSSE (Within Set Sum of Squared Errors).   Typically the cost goes down as k gets higher, but higher values of k may not produce very useful clusters (lots of clusters-of-one, for instance).

Intuitively, we should set k to be just before a point of inflection wherein the law of diminishing returns sets in, sometimes called the “elbow method.” But we should also look at where we start getting lots of small and meaningless clusters.
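
One quick way to eyeball that is to sweep over a range of k values and print the cost for each (a sketch reusing vectors and numIterations from above):

// try a range of k values and print the cost for each
for (k <- 2 to 10) {
  val cost = KMeans.train(vectors, k, numIterations).computeCost(vectors)
  println(s"k = $k, WSSSE = $cost")
}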

So now we have a KMeansModel set with our value of k. What does that give us? It assigns a number to each cluster (in the case of k=2, just 0 and 1), but remember that we dropped the name of each vector. So we know which cluster each vector landed in, but how do we relate that back to the original data?  Having done this exercise in Mahout, I went looking for the NamedVector class, which unfortunately doesn’t exist in Spark; the Spark team apparently doesn’t feel one is needed.

In Spark, the right way to do this is to join the vectors back to the original data. To do that, we need to create pairs of names and vectors.
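
Something like the following sketch, reusing lines and clusters from the earlier snippets:

import org.apache.spark.mllib.linalg.Vectors

// pair each car name with its vector, then look up its cluster with predict()
val namedVectors = lines.map { line =>
  val fields = line.split(',')
  val name = fields.head.replaceAll("\"", "")   // strip the quotes write.csv puts around the name
  (name, Vectors.dense(fields.tail.map(_.toDouble)))
}
val assignments = namedVectors.mapValues(v => clusters.predict(v))
assignments.collect().foreach { case (name, cluster) =>
  println(s"$name is in cluster $cluster")
}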

So that gives us our clustering results. As we said before, we can call predict() on new data that we might have, to see which cluster it would correspond to.

The new data doesn’t actually change the model, however. That’s frozen in time until we train a new one. There is, however, another class called StreamingKMeans, which will actually adjust the clusters to new data, so we can use it in a streaming fashion. We’ll talk about that another time.

Installing Spark on encrypted file systems: Resolving the “File Name Too Long” error.

I mostly run Linux on the systems I work on. Though I prefer CentOS for server and cloud instances, due to better compatibility with Hadoop, I’m more partial to Ubuntu or especially Linux Mint for my workstation Linux, especially on my laptop.

Ubuntu and Mint both use ecryptfs (http://ecryptfs.org) for home folder encryption, which is optional. I use it because I don’t like the idea of my files being accessible in case I lose track of my laptop.

While ecryptfs is great, it creates some complications when building Spark, because ecryptfs limits how long file names can be, and the Scala compiler can generate some very long class-file names.

When I try to build Spark (for example, with Maven), I get an error after much Maven output:


$ mvn -DskipTests clean package
....
....
[error] uncaught exception during compilation: java.io.IOException
[error] File name too long

One easy workaround is not to build Spark in the home folder; it doesn’t need to be there. Putting it in /usr/local, /opt, or somewhere else is probably fine. But for most people the home folder is the first place it’s going to go, so it’s nice to have another workaround.

There is a JIRA issue on the subject here:
https://issues.apache.org/jira/browse/SPARK-4820

The solution is to edit pom.xml and add the following lines to the Scala compiler options:

<arg>-Xmax-classfile-name</arg>
<arg>128</arg>

For me, on Spark 1.3.0, that worked out to line 1130 in the pom.xml.

If you use sbt instead of Maven to build, the solution per the JIRA is similar:

scalacOptions in Compile ++= Seq("-Xmax-classfile-name", "128"),