Distributed Computing with Apache Spark

Régis Behmo
regisb

Big Data Platforms, ESSEC/CentraleSupelec, Nov 28 2017

Slides: https://regisb.github.io/bigdataplatforms-2017

Apache Spark

Slides: https://regisb.github.io/bigdataplatforms-2017

Hadoop                     | Spark
Java Virtual Machine (JVM) | Java Virtual Machine (JVM)
Write to disk (HDFS)       | In-memory
Native data structures     | Resilient Distributed Datasets (RDD)
Java (+ Hadoop streaming)  | Java + Scala + Python + R
-                          | Python + Scala shell
Pluggable SQL (Hive)       | Spark SQL (native)
Pluggable ML               | Spark ML (native)

Installation

  • Java Runtime Environment (JRE)
    $ sudo apt-get install default-jre
    $ java -version
    openjdk version "1.8.0_131"
    OpenJDK Runtime Environment (build 1.8.0_131-8u131-b11-2ubuntu1.16.04.3-b11)
    OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)
  • Python 3
    $ sudo apt-get install python3
  • Spark download (2.2.0 pre-built for Hadoop 2.7)
    $ wget http://apache.crihan.fr/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
    $ tar xzf spark-2.2.0-bin-hadoop2.7.tgz

Installation (from VM)

  • Get bigbox.7z (7.1 GB)
  • Uncompress it with 7-Zip
  • Import the Bigbox.ovf file into VirtualBox

Data download

$ cd ~/work/
$ wget http://classics.mit.edu/Homer/iliad.mb.txt
$ wget http://classics.mit.edu/Homer/odyssey.mb.txt

Python Shell

$ cd ~/work/spark-2.2.0-bin-hadoop2.7/
$ ./bin/pyspark
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
>>>

Python 3 Shell

$ PYSPARK_PYTHON=python3 ./bin/pyspark
Python 3.5.2 (default, Sep 14 2017, 22:51:06)
>>>

IPython Shell

$ pip install --user ipython==5.5.0
$ PYSPARK_PYTHON=ipython ./bin/pyspark
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
In [1]: 

IPython 3 Shell

$ pip3 install --user ipython
$ PYSPARK_PYTHON=ipython3 ./bin/pyspark
Python 3.5.2 (default, Sep 14 2017, 22:51:06)
In [1]: 

Your first resilient distributed dataset (RDD)

>>> rdd = sc.parallelize(range(0, 10))
>>> rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> rdd.count()
10
>>> rdd.first()
0
>>> rdd.map(lambda x: x*x).collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Wordcount! \o/

>>> rdd = sc.textFile("../iliad.mb.txt")
>>> rdd.flatMap(lambda sentence: sentence.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda v1, v2: v1 + v2)\
    .sortBy(lambda wc: -wc[1])\
    .take(10)
[('the', 9573), ('and', 6481), ('of', 5584), ('to', 3291), ('his', 2487), ('he', 2448), ('in', 2184), ('a', 1789), ('with', 1593), ('that', 1434)]

Documentation

https://spark.apache.org/docs/latest/rdd-programming-guide.html

RDD operations

Transformations                                        | Actions
map, distinct, filter, reduceByKey, sortByKey, join... | reduce, collect, count, first, take...
Arguments: 1 or more RDDs                              | Arguments: 1 or more RDDs
Returns: an RDD                                        | Returns: not an RDD
Lazy evaluation                                        | Immediate evaluation
Sometimes shuffle                                      | Shuffle necessary
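
A minimal shell sketch of the difference on a toy RDD (the outputs shown are what these particular calls return):

>>> rdd = sc.parallelize(range(0, 10))
>>> squares = rdd.map(lambda x: x * x)            # transformation: returns an RDD, nothing runs yet
>>> evens = squares.filter(lambda x: x % 2 == 0)  # transformation: still lazy
>>> evens.count()                                 # action: the whole chain is evaluated now
5
>>> evens.collect()                               # action: the chain is evaluated again
[0, 4, 16, 36, 64]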

Directed Acyclic Graph (DAG)

[Figure: execution DAG of the RDD transformations]
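
Spark builds this DAG from the lineage of transformations and only runs it when an action is called. A sketch of how to inspect the lineage from the shell (in this Spark version toDebugString() returns a UTF-8 byte string, hence the decode):

>>> words = sc.textFile("../iliad.mb.txt").flatMap(lambda s: s.split())
>>> counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
>>> print(counts.toDebugString().decode("utf-8"))  # stages and dependencies of the DAG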

Running a script

$ vim ~/work/wordcount.py
from pyspark import SparkContext

sc = SparkContext()
rdd = sc.textFile("iliad.mb.txt")
result = rdd.flatMap(lambda sentence: sentence.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda v1, v2: v1 + v2)\
    .sortBy(lambda wc: -wc[1])\
    .take(10)

print(result)
$ ./spark-2.2.0-bin-hadoop2.7/bin/spark-submit ./wordcount.py

Advanced: Debugging with Spark UI

http://localhost:4040

Spark UI

Pro tip: append this to your script

input("Access http://localhost:4040 to debug. Then press ctrl+c to exit")

Pro tip: reduce Spark logging level

$ cd ~/work/spark-2.2.0-bin-hadoop2.7/conf/
$ cp log4j.properties.template log4j.properties
$ vim log4j.properties
...
log4j.rootCategory=ERROR, console
...
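
Alternatively, the log level can be changed from inside the shell or a script; this only affects the current SparkContext:

sc.setLogLevel("ERROR")  # valid levels include DEBUG, INFO, WARN, ERROR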

TODO (starter)

  1. Print the top 10 most frequent words with their probability of appearance
  2. Get rid of special characters (.,:!?')
  3. Identify the transformations and the actions in your script
  4. How many times are the transformations evaluated? (Hint: it depends)
  5. Can you reduce this number? (Hint: check out "persist"; see the sketch right after this list)
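
A minimal persist sketch on a toy RDD (not the exercise solution; the names are illustrative):

squares = sc.parallelize(range(0, 10)).map(lambda x: x * x)
squares.persist()   # keep the evaluated partitions in memory after the first action
squares.count()     # first action: computes and caches the squares -> 10
squares.collect()   # second action: served from the cache, no recomputation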

TODO (intermediate)

  1. Print the top 10 words from the Iliad that have "most disappeared" in The Odyssey (Hint: you need to understand "join"; see the toy join sketch after this list)
  2. Do the same by swapping the Iliad and The Odyssey
  3. Improve your script by getting rid of stopwords:
    from nltk.corpus import stopwords
    english_stop_words = stopwords.words("english")
  4. Use the Spark UI (http://localhost:4040) to make your script faster
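
A toy join sketch on invented (word, count) pairs, just to show what join returns (output order may vary):

>>> iliad = sc.parallelize([("achilles", 100), ("hector", 80), ("ship", 40)])
>>> odyssey = sc.parallelize([("achilles", 10), ("ship", 60), ("ithaca", 50)])
>>> iliad.join(odyssey).collect()   # only keys present in both RDDs are kept
[('achilles', (100, 10)), ('ship', (40, 60))]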

Distributed Computing with Apache Spark

Distributed Architectures & Machine Learning

Régis Behmo
regisb

Big Data Platforms, ESSEC/CentraleSupelec, Nov 28 2017

Slides: https://regisb.github.io/bigdataplatforms-2017

Part 1 homework

https://github.com/regisb/bigdataplatforms-2017/blob/master/homework/iliad_odyssey.py

Configuration: launch Spark cluster

Launch one master:

./sbin/start-master.sh --host 192.168.1.M --port 7077

Launch multiple slaves:

./sbin/start-slave.sh --host 0.0.0.0 spark://192.168.1.M:7077

(prefix with SPARK_NO_DAEMONIZE=1 to launch workers in the foreground)

Spark shell

./bin/pyspark --master spark://192.168.1.M:7077

Launching jobs

./bin/spark-submit --master spark://192.168.1.M:7077 myscript.py

Configuration: resource allocation

Slave:

./sbin/start-slave.sh --host 0.0.0.0 --cores 2 --memory 512m spark://192.168.1.M:7077

Configuration: resource allocation

Spark shell:

./bin/pyspark --master spark://192.168.1.M:7077 \
    --total-executor-cores 2 --executor-memory 512m

Spark script:

./bin/spark-submit --master spark://192.168.1.M:7077 \
    --total-executor-cores 2 --executor-memory 512m myscript.py

Configuration: resource allocation

Application (optional):

from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.set("spark.cores.max", 2)
conf.set("spark.executor.memory", "512m")
sc = SparkContext(conf=conf)

Data loading

sc.textFile("/home/student/work/iliad.mb.txt")
sc.textFile("hdfs://192.168.1.101:9000/data/iliad.mb.txt")
sc.textFile("hdfs://192.168.1.101:9000/data/blogs/raw.txt")
sc.textFile("hdfs://192.168.1.101:9000/data/blogs/raw1000.txt")

Data loading

$ ./bin/hdfs dfs -fs hdfs://192.168.1.101:9000 -ls /data
drwxr-xr-x   - regis supergroup          0 2017-11-18 20:44 /data/blogs
-rw-r--r--   3 regis supergroup  691239883 2017-11-19 09:11 /data/enwik9-text
-rw-r--r--   3 regis supergroup     808298 2017-11-18 22:28 /data/iliad.mb.txt
-rw-r--r--   3 regis supergroup     808298 2017-11-19 14:47 /data/iliad.oneline.txt
$ ./bin/hdfs dfs -fs hdfs://192.168.1.101:9000 -copyToLocal /data/enwik9-text .

Spark cluster

HDFS cluster

Configuration: HDFS

Launch one namenode:

./bin/hdfs namenode -fs hdfs://192.168.1.N:9000

Launch multiple datanodes:

./bin/hdfs datanode -fs hdfs://192.168.1.N:9000

Local configuration

Wifi SSID: "Big Data Platforms" (No password)

Obtain your IP address: ifconfig (I'm at 192.168.1.101)

VirtualBox:

  • Adjust your RAM/CPU
  • Check network settings: "NAT" → "Advanced" → "Cable connected"

DEBUG

TODO (warmup)

  1. Create a cluster of 4-6 Spark nodes
  2. If possible, launch a couple of HDFS datanodes
  3. Launch wordcount.py on hdfs://192.168.1.101:9000/data/iliad.mb.txt
  4. Launch wordcount.py on hdfs://192.168.1.101:9000/data/blogs/raw.txt
  5. Launch two jobs at the same time and make sure they actually run concurrently (Hint: check resource allocation)
  6. What happens when a Spark node is brutally shut down?

TODO (intermediate)

  1. What is Word2Vec?
  2. Create a Word2Vec model of the Iliad (Hint: it's better to have each paragraph on a single line, see hdfs://192.168.1.101:9000/data/iliad.oneline.txt; a minimal API sketch follows this list)
  3. Who is Achilles + (Priam - Hector)?
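
A minimal sketch with the MLlib Word2Vec API, assuming the one-paragraph-per-line file; the parameters and query word are illustrative, not the full solution:

from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec

sc = SparkContext()
# Word2Vec.fit() expects an RDD of token lists (one list per line/paragraph)
corpus = sc.textFile("iliad.oneline.txt").map(lambda line: line.lower().split())
model = Word2Vec().setVectorSize(100).setMinCount(5).fit(corpus)
for word, similarity in model.findSynonyms("achilles", 5):
    print(word, similarity)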

Solution

https://github.com/regisb/bigdataplatforms-2017/blob/master/homework/word2vec.py

Questions?

Slides: https://regisb.github.io/bigdataplatforms-2017

Going further (in French)

OpenClassrooms course: Réalisez des calculs distribués sur des données massives (Behmo & Hudelot)

Videos: https://dataarchitect.minutebutterfly.com/