Running PySpark with Jupyter Notebooks

Many users would like to run Jupyter notebooks with PySpark. Unfortunately, it’s not as simple as starting “jupyter notebook”. That will open a Jupyter notebook server, but PySpark code will not run in it.

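What we need to do is run Jupyter inside of PySpark. To do that, we can set two environment variables before starting PySpark: PYSPARK_DRIVER_PYTHON (set to jupyter) and PYSPARK_DRIVER_PYTHON_OPTS (set to notebook).

One way to run this is to set both variables and then launch pyspark as usual.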

This will open up a Jupyter notebook window, and PySpark code you run in that notebook will execute in your PySpark session.

If you’re running on a cluster, you can pass the usual cluster arguments (such as the master URL) to pyspark in the same way.
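The setup described above can be sketched as a small Python launcher. This is only a sketch: the function name pyspark_jupyter_launch, the pyspark_bin default and the example master URL are placeholders I made up for illustration, and the launch itself is left commented out so you can inspect the settings first.

```python
import os


def pyspark_jupyter_launch(pyspark_bin="pyspark", master=None, base_env=None):
    """Build the command and environment that start PySpark with Jupyter as its driver.

    pyspark_bin and master are placeholders; adjust them for your installation.
    """
    env = dict(os.environ if base_env is None else base_env)
    # Tell the pyspark launcher to start Jupyter instead of the plain Python REPL.
    env["PYSPARK_DRIVER_PYTHON"] = "jupyter"
    env["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"

    command = [pyspark_bin]
    if master is not None:
        # On a cluster, pass the master URL as you would to a normal pyspark shell.
        command += ["--master", master]
    return command, env


if __name__ == "__main__":
    # "spark://your-master-host:7077" is a made-up example address.
    command, env = pyspark_jupyter_launch(master="spark://your-master-host:7077")
    print(" ".join(command))
    print("PYSPARK_DRIVER_PYTHON=" + env["PYSPARK_DRIVER_PYTHON"])
    print("PYSPARK_DRIVER_PYTHON_OPTS=" + env["PYSPARK_DRIVER_PYTHON_OPTS"])
    # To actually launch: import subprocess; subprocess.call(command, env=env)
```

Leaving master as None gives the plain local case; the same two variables can just as well be exported in your shell before running pyspark by hand.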



Using the Spark Shell

Please see my previous post for using Spark.

We are going to go over using the Spark shell.

Step 1:  Running the Spark shell

Start the Spark shell by running

$ ~/spark/bin/spark-shell



You should see something like this:


Step 2: Check out the Spark UI

Now that your shell is started, you should be able to browse to http://localhost:4040 and check out the Spark UI.  Note that this is a different port (4040 vs 8080) from the previous example.

Step 3: Spark context

Within the Spark shell, the variable sc is the SparkContext. Type sc at the Scala prompt and see what happens. Your output might look like this:

To see all the methods on the sc variable, type sc. and press TAB twice. This will show all the available methods on sc. (This only works in the Scala shell for now.)

Try the following:

==> Print the name of the application: sc.appName

==> Find the ‘Spark master’ for the shell: sc.master

Step 4: Load a file

Let’s use an example file with the following contents:

twinkle twinkle little star
how I wonder what you are
up above the world so high
like a diamond in the sky
twinkle twinkle little star

Let’s load the file into a variable named f using sc.textFile.

==> What is the ‘type’ of f ?
hint : type f on the console

==> Inspect Spark Shell UI on port 4040, do you see any processing done? Why (not)?

==> Print the first line / record from RDD
hint : f.first()

==> Again, inspect Spark Shell UI on port 4040, do you see any processing done? Why (not)?

==> Print first 3 lines of RDD
hint : f.take(???) (provide the correct argument to take function)

==> Again, inspect Spark Shell UI on port 4040, do you see any processing done? Why (not)?

==> Print all the content from the file
hint : f.collect()

==> How many lines are in the file?
hint : f.count()

==> Inspect the ‘Jobs’ section in Shell UI (in browser)
Also inspect the event timeline
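The questions above all turn on lazy evaluation: loading a file only records what to do, and work happens when an action such as first(), take(), collect() or count() runs. Here is a toy model of that behaviour in plain Python. It is not how Spark is implemented; LazyLines and its jobs_run counter are made-up names standing in for the RDD and for the job list in the UI.

```python
class LazyLines:
    """Toy stand-in for an RDD of text lines: nothing is read until an action runs."""

    def __init__(self, text):
        self._text = text   # stands in for a file path that has not been opened yet
        self.jobs_run = 0   # stands in for the job list in the Spark UI

    def _scan(self):
        # Each action triggers one "job" that actually walks the data.
        self.jobs_run += 1
        return iter(self._text.splitlines())

    def first(self):
        return next(self._scan())

    def take(self, n):
        lines = self._scan()
        return [line for _, line in zip(range(n), lines)]

    def collect(self):
        return list(self._scan())

    def count(self):
        return sum(1 for _ in self._scan())


RHYME = """twinkle twinkle little star
how I wonder what you are
up above the world so high
like a diamond in the sky
twinkle twinkle little star"""

f = LazyLines(RHYME)
print(f.jobs_run)   # 0: "loading" did no work
print(f.first())    # twinkle twinkle little star
print(f.jobs_run)   # 1: the action triggered a job
```

In the real shell the same pattern shows up in the UI: no job appears after the load, and one appears after each action.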

Getting Started With Spark

Many of you may be interested in how to get going with Spark.   Let’s look at a walkthrough.

Step 0: Get your system Ready.

You need to download a working JDK, if you don’t already have it.  We recommend at least Java 7.

Now, you need to download Scala and sbt as well, if you don’t have them already.

Use the following link:

Once ready, open a shell window (or Windows command prompt) and make sure scala and sbt are both installed and in your path.

$ scala

$ sbt
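If you would rather check the path without starting each tool, a few lines of Python will do it. This is a minimal sketch: find_tools is a helper name I made up, and it relies only on shutil.which from the standard library.

```python
import shutil


def find_tools(names=("java", "scala", "sbt")):
    """Return a mapping of tool name to its full path, or None if it is not on PATH."""
    return {name: shutil.which(name) for name in names}


if __name__ == "__main__":
    for name, path in find_tools().items():
        print(f"{name}: {path if path else 'NOT FOUND on PATH'}")
```

Anything reported as not found needs to be installed or added to your path before going further.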

Windows users have a few extra steps.

First, download winutils.exe from

Put this in a directory and add it to your path (for example, C:\winutils\bin\).  This provides support for running Hadoop libraries on Windows.  For more info, please see this link:

Run the following command (again, Windows users only):

C:\winutils\bin\winutils.exe chmod 777 C:\tmp\hive


Ok, now we should be ready to run Spark.

Step 1:  Download Spark

You can download the latest spark from

Here is a link you can use:

wget spark-2.1.0-bin-hadoop2.7.tgz

Once the download finishes, extract the archive. I like to put the extracted Spark directory, renamed to spark, in my home directory.  Mac and Linux users can do that as follows:

$ tar xzf spark-2.1.0-bin-hadoop2.7.tgz
$ mv spark-2.1.0-bin-hadoop2.7 ~/spark

Windows users can do something similar from a command prompt:

> move spark-2.1.0-bin-hadoop2.7 c:\spark

Step 2: Run Spark

You can run Spark as follows:

$ ~/spark/bin/  #Mac/Linux

c:\spark\bin\start-all.exe  (Windows)

Spark is now running on your machine!

Step 3: Check out the Spark UI

Go to localhost:8080. This will be your Spark master.  It should look something like this.

Check the following things out:

  1. How many masters are running?  How many workers?
  2. What nodes are they running on?
  3. What is the memory availability?


Why Spark’s MLlib Clustering does not have a named vector (NamedVector) class

I wondered where the equivalent of NamedVector (from Mahout) was in Spark’s MLlib. It’s certainly useful to have some way of identifying a vector other than by the data itself. When running Mahout I always use the NamedVector class when clustering (or the -nv option from the command line), and I always wondered why anyone would do otherwise, because without names it is difficult to locate the vectors afterwards.

R users will recall a nifty feature that elegantly solves the named
vector problem: the rowname, which can be used to identify rows in a
matrix (or dataframe) that we send to our machine learning
algorithms. This is better than spending a column in our dataframe
on identifying data.

In the case of Spark, there is no such class or capability. There is
Vectors.dense and Vectors.sparse, each of which takes only items of
type double. No identifiers are allowed, unless one wants to use up
one item of the vector as a numeric key. This is a bad idea, as that
key will then be treated as a variable, and it’s not a variable.

The reasoning behind this, I think, is that a NamedVector is
essentially a key-value pair, with the name as the key and the Vector
as the value. Spark does have key-value pairs, in the form of a 2-item
Tuple with the first item as the key and the second item as the value.
So, in theory, there’s no need for a named vector.

To avoid confusion, there is also something totally different called a
labeled point, LabeledPoint to be exact. This does include a label,
but this label is used as the outcome variable of a classification or
regression training (or any other supervised algorithm). For example,
for a spam classifier, the outcome variable would be 0.0 = “not spam”
and 1.0 = “spam”. It’s not used to identify the vector. And, like
everything else about the vector, it is a double type, despite the
term “label.”

At first glance, it seems like being able to name or key our vectors
would be useful. This is especially true in clustering, where we want
to keep track of which of our points ended up in which cluster.
For classification and regression problems it’s not so critical, as we’re
really more concerned with training the model to predict future data
than with getting the values back from our training set.

It seems that Spark’s creators decided to make clustering run a bit
like classification by training a “model” that can then run predict(),
which to those with prior experience in machine learning is a little
odd. After all, isn’t clustering an unsupervised algorithm? We don’t
usually train a model in unsupervised learning; the word “train”, after
all, implies supervised learning.

The advantage of this approach is that clustering ends up working just
like the prediction models. In fact, clustering “models” can then
double as prediction models once trained. And being able to do the
prediction removes the need for storing a name with the vector, or
even storing vectors at all along with the model.

So with kmeans we can do something like this.

Note that we’re calling predict() at the end.

This is fine for kmeans, as the final work of performing the predict() is
pretty simple: it just has to find the closest centroid. Still, it seems
like we’re doing work twice, once to do the clustering, and again to
get the cluster assignment, which the original output of kmeans should
have given us.

In fact, KMeansModel doesn’t give us a list of cluster membership from
the trained model. It has exactly two members:

k : Int // Number of cluster centroids
val clusterCenters : Array[Array[Double]]
// An array of the cluster centers (centroids)

And two methods:

predict(point: Array[Double]) : Int
// Gives us an integer for cluster membership
computeCost(data: RDD[Array[Double]]) : Double
// Calculate WSSSE (cost) of data

That’s it. So despite the fact that the vectors must have gone into
making the model, they aren’t saved with the model. The only way to
get their cluster assignments is to call predict(). Note how this is
totally different from the way Mahout works.
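To make the shape of such a model concrete, here is a toy version in plain Python. It is a sketch of the idea only, not MLlib’s code: TinyKMeansModel, its method names and the sample points are all made up for illustration.

```python
class TinyKMeansModel:
    """Toy model that, like the description above, stores only the centroids."""

    def __init__(self, cluster_centers):
        self.cluster_centers = [tuple(c) for c in cluster_centers]

    @property
    def k(self):
        return len(self.cluster_centers)

    @staticmethod
    def _squared_distance(p, c):
        return sum((a - b) ** 2 for a, b in zip(p, c))

    def predict(self, point):
        """Return the index of the nearest centroid."""
        return min(range(self.k),
                   key=lambda i: self._squared_distance(point, self.cluster_centers[i]))

    def compute_cost(self, data):
        """Sum of squared distances from each point to its nearest centroid."""
        return sum(self._squared_distance(p, self.cluster_centers[self.predict(p)])
                   for p in data)


model = TinyKMeansModel([(1.0, 1.0), (9.0, 9.0)])
training_points = [(0.0, 1.0), (2.0, 1.0), (9.0, 8.0)]

# The model kept no record of the training points, so membership means predicting again.
print(model.k)                                      # 2
print([model.predict(p) for p in training_points])  # [0, 0, 1]
print(model.compute_cost(training_points))          # 3.0
```

Nothing in the object remembers which points it was built from, which is exactly why the names have to travel outside the model.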

So that’s the reason. Hope this clears up how clustering vectors works in MLlib.
It was a bit confusing for me too. I think my past knowledge of Mahout hurt me.


R: error installing R package devtools on Linux

As I attempted to install the R package “devtools” on my Ubuntu 14.04 laptop I encountered an error as follows:

> install.packages("devtools")
Installing package into ‘/home/tfox/R/x86_64-pc-linux-gnu-library/3.2’
(as ‘lib’ is unspecified)
also installing the dependencies ‘mime’, ‘bitops’, ‘brew’, ‘httr’, ‘RCurl’, ‘memoise’, ‘whisker’, ‘evaluate’, ‘rstudioapi’, ‘jsonlite’, ‘roxygen2’

... SNIP ...

* installing *source* package ‘RCurl’ ...
** package ‘RCurl’ successfully unpacked and MD5 sums checked
checking for curl-config... no
Cannot find curl-config
ERROR: configuration failed for package ‘RCurl’
* removing ‘/home/tfox/R/x86_64-pc-linux-gnu-library/3.2/RCurl’
Warning in install.packages :
installation of package ‘RCurl’ had non-zero exit status

ERROR: dependency ‘RCurl’ is not available for package ‘httr’
* removing ‘/home/tfox/R/x86_64-pc-linux-gnu-library/3.2/httr’
Warning in install.packages :
installation of package ‘httr’ had non-zero exit status
ERROR: dependencies ‘httr’, ‘RCurl’ are not available for package ‘devtools’
* removing ‘/home/tfox/R/x86_64-pc-linux-gnu-library/3.2/devtools’
Warning in install.packages :
installation of package ‘devtools’ had non-zero exit status

Oops, the error indicates we have an OS dependency on a shared library
which we don’t have. The missing curl-config tells us we need the
development version of libcurl. Let’s install it and try again.

For Ubuntu / Mint / Debian:

$ sudo apt-get install libcurl4-gnutls-dev

For CentOS / Fedora / RHEL:

$ sudo yum -y install libcurl libcurl-devel

Once we install this, we can re-install devtools and it
installs fine.

Starting a Hadoop Cluster in EC2 — Starting the Cluster

For those experimenting with Hadoop, the quickest way to get going is to spin up a cluster in Amazon AWS’s Elastic Compute Cloud, better known as EC2. Of course, this is not just for beginners, as many companies, particularly smaller ones, rely heavily on AWS.

I am going to walk through the step by step process of setting up a 4-node Hadoop cluster on EC2. The first part of this is just to set up an image. Of course, the absolute easiest way to go would be to use a pre-built Hortonworks or Cloudera image, but we’ll focus here on using a generic CentOS Linux image and going from there.

There’s really nothing at all here in this part intrinsic to Hadoop. This is just setting up four identical CentOS nodes.

First, sign up for Amazon AWS, and provide your billing information, etc.

Once you’ve done that, go to the AWS Management Console and log in.

Once there, navigate to the EC2 page as shown.


Once there, click “Launch Instance”


Click on “AMI Marketplace”. Select CentOS.


On the next screen, select xlarge size or larger. This is required to run Hadoop.


Select the number of instances (in this case, 4).  Press “Review and Launch.”


Select the amount of storage needed. If you are simply testing Hadoop, the default should be fine.


Select a tag for the instance, as shown. This will apply to all the instances, appended by a number.


Confirm the use of SSD.


Review the parameters set for the new instances.


Following this, you’ll need to create a key for AWS, unless you already have one. I’m assuming here that you don’t already have one. Once you create a new key, it will be sent to you.


Open up a shell window (if you run Linux), and save the key to your computer. Then chmod the key to 400 and move it somewhere safe. Your .ssh directory is a good place.

$ cd ~/Downloads
$ chmod 400 my-aws-key-pair.pem
$ mv my-aws-key-pair.pem ~/.ssh/


Upon running the instances, you may see this warning message. However, it shouldn’t impact your instances. Wait for a minute, and then select EC2 once again to see the running instances.


As you can see, your instances are running, but their status is “Initializing.” It will take some time for them to be available. Five minutes or so should be enough.


Scroll over to locate the public IP addresses for each instance. You will also need the domain names as well as the internal domain name and internal IP address. All EC2 instances have two IP addresses and domain names: internal, and external. Internal IP addresses should be used from inside EC2 instances, as you will not need to pay bandwidth charges. From the outside world (like your PC/Mac), you will need to use the external addresses. You will use these when you SSH, and when you view web pages hosted by the machines from your local machine.


After selecting an instance, details will be shown at the bottom. It is here that you retrieve the internal IP addresses, external IP addresses and the public domain name.


Now you can use your newly generated ssh key to ssh to one of your new instances. In this case, the username would be centos, although for some other images the default username may be root or ec2-user. From the command line, it looks like this:

$ ssh -i ~/.ssh/my-aws-key-pair.pem centos@your.public.ip.address

Windows users can use PuTTY or Bitvise as an SSH client. Mac users can use the command line as shown, or other Mac ssh clients.

You can see the prompt once you are in. Now that you are, you can proceed with the installation of Hadoop, or whatever other tools you prefer.


Eventually, you’ll probably terminate your instances, by right clicking and selecting terminate. Once you do, you’ll see this. Don’t do this before it’s necessary, however.


Proceed onward to our next tutorial on installing Hadoop. (Once it becomes available).

Building Spark with Maven: the PermGen space error and Java heap space error

Spark doesn’t build properly with Maven out of the box. While this is clearly stated on the Building Spark page in the documentation, it’s easy to miss.

You’ll get an error like the following:

$ mvn clean package

[INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-2.10/classes...
[ERROR] PermGen space -> [Help 1]

[INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-2.10/classes...
[ERROR] Java heap space -> [Help 1]

Basically, this says that the PermGen space and/or Java heap space have been exceeded. While Java heap space issues are so common that I instinctively bump that up when seeing issues (i.e., -Xmx2g), the Perm space has to be set separately.

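That said, the solution is pretty straightforward: raise both limits through the MAVEN_OPTS environment variable before running Maven, for example with -Xmx2g and -XX:MaxPermSize=512M.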
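Here is a minimal sketch of those settings as a small Python helper. The values are the ones I recall the Building Spark page suggesting for Spark 1.x, so treat them as an assumption and adjust for your machine. maven_build_env is a made-up helper name, and the build call itself is left commented out.

```python
import os


def maven_build_env(base_env=None):
    """Return an environment with larger heap and PermGen limits for the Maven JVM."""
    env = dict(os.environ if base_env is None else base_env)
    # Assumed values, along the lines of the Building Spark page for older JDKs.
    env["MAVEN_OPTS"] = "-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
    return env


if __name__ == "__main__":
    env = maven_build_env()
    print("MAVEN_OPTS=" + env["MAVEN_OPTS"])
    # To run the build with these settings (from the Spark source directory):
    # import subprocess
    # subprocess.check_call(["mvn", "-DskipTests", "clean", "package"], env=env)
```

Exporting the same MAVEN_OPTS value in your shell before calling mvn has the same effect.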

That should fix the problem and allow Spark to be built properly. I also had issues building Spark in my home directory, which was encrypted. This was a second issue, and I posted a workaround for it in an earlier post.

Clustering with K-Means with Spark and MLlib

MLlib provides a k-means implementation that includes a parallelized variant of the k-means++ initialization called kmeans||, which allows us to run clustering efficiently in parallel on Spark. Clustering is an unsupervised machine learning technique that helps us discover natural patterns in data.

What is k-means? It’s about the simplest clustering algorithm out there. It starts with a given number (k) of centroids, then repeatedly assigns each point to its nearest centroid and moves each centroid to the mean of its points, until the centroids converge. The points assigned to a centroid make up a cluster. It works great for data which is clusterable by circles/spheres (actually hyperspheres). For data which has more convoluted patterns, such as rings and other shapes, we can use other methods such as hierarchical clustering.
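The basic loop is short enough to write out in plain Python. This is a sketch of textbook k-means with random starting centroids, not MLlib’s implementation (which uses the kmeans|| initialization and runs distributed); the kmeans function and the sample points are made up for illustration.

```python
import math
import random


def kmeans(points, k, iterations=20, seed=0):
    """Plain k-means (Lloyd's algorithm); returns (centroids, assignments)."""
    rng = random.Random(seed)
    centroids = rng.sample(points, k)  # start from k distinct input points
    assignments = []
    for _ in range(iterations):
        # Assignment step: each point goes to its nearest centroid.
        assignments = [
            min(range(k), key=lambda c: math.dist(p, centroids[c])) for p in points
        ]
        # Update step: each centroid moves to the mean of its points.
        new_centroids = []
        for c in range(k):
            members = [p for p, a in zip(points, assignments) if a == c]
            if members:
                new_centroids.append(tuple(sum(xs) / len(members) for xs in zip(*members)))
            else:
                new_centroids.append(centroids[c])  # keep an empty cluster's centroid
        if new_centroids == centroids:
            break  # converged: no centroid moved
        centroids = new_centroids
    return centroids, assignments


points = [(1.0, 1.0), (1.5, 2.0), (1.0, 0.5), (8.0, 8.0), (9.0, 9.5), (8.5, 9.0)]
centroids, assignments = kmeans(points, k=2)
print(centroids)
print(assignments)
```

On well separated data like this, the first three points end up in one cluster and the last three in the other, whichever starting centroids are drawn.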

The other limitation is that we have to know k in advance to make the algorithm work.  Sometimes we know what we’re looking for, but usually, we don’t.  That means we often end up having to run clustering many times with different values of k and comparing the results.

If we have a dataset we want to cluster, the first step is to convert it to a vector class.   MLlib offers two: Vectors.dense and Vectors.sparse.  The latter is very good for one-hot encoding (is_red, is_blue, is_green, etc.), and especially for encoding text vectors, such as TF-IDF.  I’ll talk in another post about how to vectorize text.

Instead, let’s just use Vectors.dense and we can use a dataset near and dear to R users: mtcars.  It’s one of R’s standard example datasets which gives some statistics on a few different models of cars.  We can extract mtcars from R by using write.csv, and we can use the file as mtcars.csv, but we’ll remove the header row for simplicity.  Of course, it’d be silly to use Spark for such a tiny dataset (as we could easily just use R), but it serves the purpose of an example.
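To show the shape of that conversion without Spark, here is a plain-Python sketch that parses a few rows in the layout write.csv produces for mtcars (header removed) into names and numeric vectors. The parse_rows helper is a made-up name, and in Spark each list of floats would be passed to Vectors.dense.

```python
import csv
import io

# A few rows in the layout write.csv produces for mtcars, header row removed.
MTCARS_SAMPLE = '''"Mazda RX4",21,6,160,110,3.9,2.62,16.46,0,1,4,4
"Mazda RX4 Wag",21,6,160,110,3.9,2.875,17.02,0,1,4,4
"Datsun 710",22.8,4,108,93,3.85,2.32,18.61,1,1,4,1
'''


def parse_rows(lines):
    """Yield (name, vector) pairs; the vector holds every numeric column as a float."""
    for row in csv.reader(lines):
        name, values = row[0], row[1:]
        yield name, [float(v) for v in values]


rows = list(parse_rows(io.StringIO(MTCARS_SAMPLE)))
vectors = [vector for _, vector in rows]  # nameless vectors, as Vectors.dense would hold
print(rows[0][0], vectors[0])
```

The first column is the car name, so it is split off before the remaining eleven columns are converted to floats.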

Great. Now we have some vectors.   We had to drop the name associated with each car and so our vectors are nameless — more on that later.

Now we need to make a KMeansModel object.   This may seem strange at first glance since in R and Mahout, there’s no model associated with k-means since there’s no training involved in an unsupervised ML algorithm.  Probably for the sake of consistency, MLlib treats k-means as a model that has to be “trained” with data, and then can be applied to new data using predict(), as if it were performing classification. While odd, this actually is a bonus because it easily allows us to use our clusters as a classification model for unseen data.

So clusters in this case is the KMeansModel object.  We chose a “K” value of 2, which probably isn’t going to get good results with this dataset.  How do we check that? We can use computeCost().

The Spark documentation calls the cost WSSSE (Within Set Sum of Squared Errors).   Typically this decreases as k gets higher, but higher values of k may not produce very useful clusters (lots of clusters-of-one, for instance).

Intuitively, we should set k to be just before a point of inflection wherein the law of diminishing returns sets in, sometimes called the “elbow method.” But we should also look at where we start getting lots of small and meaningless clusters.
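To see what the cost measures, here is a small plain-Python sketch that computes it for hand-picked centroids. The points and centroids are made-up illustrative values, and wssse is my own helper, not a Spark call.

```python
def squared_distance(p, c):
    return sum((a - b) ** 2 for a, b in zip(p, c))


def wssse(points, centroids):
    """Sum, over all points, of the squared distance to the nearest centroid."""
    return sum(min(squared_distance(p, c) for c in centroids) for p in points)


points = [(1.0, 1.0), (1.0, 2.0), (9.0, 9.0), (9.0, 10.0)]

# Hypothetical centroids for k = 1 and k = 2 (each is the mean of its cluster).
cost_k1 = wssse(points, [(5.0, 5.5)])
cost_k2 = wssse(points, [(1.0, 1.5), (9.0, 9.5)])
print(cost_k1, cost_k2)  # 129.0 1.0
```

Going from k = 1 to k = 2 cuts the cost enormously here, while further increases could only shave off the last 1.0. That sharp bend is the “elbow” we are looking for.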

So now we have a KMeansModel set with our value of k. What does that give us? It assigns a number to each cluster (in the case of k=2, then just 0,1), but remember that we dropped the name for each vector. So we know which vector is in each cluster, but how do we relate this to the original data?  As I’ve done this exercise in Mahout, I was looking for the NamedVector class, which unfortunately doesn’t exist in Spark. The Spark team apparently doesn’t feel one is needed.

In Spark, the right way to do this is to join back the vector to the original data. To do that, we need to create a pair of names and vectors.
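The idea can be sketched in plain Python: keep the name as the key of each pair, predict on the vector only, and group the names by cluster. The names, vectors and centroids here are made up for illustration, and predict is a stand-in for KMeansModel.predict(), not the real thing.

```python
from collections import defaultdict


def squared_distance(p, c):
    return sum((a - b) ** 2 for a, b in zip(p, c))


def predict(centroids, vector):
    """Index of the nearest centroid, standing in for KMeansModel.predict()."""
    return min(range(len(centroids)), key=lambda i: squared_distance(vector, centroids[i]))


# Hypothetical (name, vector) pairs and centroids, for illustration only.
names_and_vectors = [
    ("car A", (21.0, 6.0)),
    ("car B", (22.8, 4.0)),
    ("car C", (14.3, 8.0)),
]
centroids = [(22.0, 5.0), (14.0, 8.0)]

# Keep the name as the key and replace the vector with its cluster id.
members = defaultdict(list)
for name, vector in names_and_vectors:
    members[predict(centroids, vector)].append(name)

print(dict(members))  # {0: ['car A', 'car B'], 1: ['car C']}
```

In Spark the same thing would be done on an RDD of pairs, mapping over the values and then grouping by the predicted cluster.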

So that gives us our clustering results. As we said before, we can call predict() on new data that we might have, to see which cluster it would correspond to.

The new data doesn’t actually change the model, however. That’s frozen in time forever until we train a new one. There is, however, another class called StreamingKMeans, which will actually adjust clusters to new data, so we can use it in a streaming fashion. We’ll talk about that another time.

Installing Spark on encrypted file systems: Resolving the “File Name Too Long” error.

I mostly run Linux on the systems I work on. Though I prefer CentOS for server and cloud instances, due to better compatibility with Hadoop, I’m more partial to Ubuntu or especially Linux Mint for my workstation Linux, especially on my laptop.

Ubuntu and Mint both use ecryptfs for home folder encryption, which is optional. I use it because I don’t like the idea of my files being accessible in case I lose track of my laptop.

While ecryptfs is great, it creates some complications while building Spark, because there are apparently some limitations with very long filenames.

When I try to build Spark (for example, with Maven), I get an error after much Maven spam:

$ mvn -DskipTests clean package
[error] uncaught exception during compilation:
[error] File name too long

One easy workaround is not to build Spark in the home folder. It doesn’t need to be there. Putting it in /usr/local, /opt, or somewhere else is probably ok. But it is true that for most, the home folder is the first place it’s going to go, and so it’s nice to have another workaround.

There is a jira on the subject here:

The solution is to edit pom.xml and add the -Xmax-classfile-name option, with a value of 128, to the Scala compiler arguments in the compile options.

For me on Spark 1.3.0, that worked out to line 1130 in the pom.xml.

If you used sbt instead of maven to build, the solution per the jira is similar:

scalacOptions in Compile ++= Seq("-Xmax-classfile-name", "128"),

Welcome to the wonderful world of Machine Learning and Big Data!

I’m Timothy Fox, consultant, trainer, and enthusiast. I’m passionate about machine learning, and big data, and I happen to be so fortunate that I’m able to pursue my passions on a daily basis.

I’ve long been a fan of R and Python, which I’ve used in addition to my background in Java in solving challenging problems. Not having a stats or math background (other than that of an engineering student), I’ve come to learn how to use these tools to accomplish these tasks.

Four years ago, I discovered Hadoop, an amazing framework combining distributed data with distributed processing, neatly solving many of the most vexing problems that we learned about in university about distributed systems.

Hadoop isn’t going anywhere, but much has been built on top of that platform; so now Hadoop is more of a diverse ecosystem with many components. One of the most exciting is Spark, whose MLlib component has really excited me. Many of my blog posts in the near term are going to be about MLlib.

So that’s this blog, in a nutshell.