Régis Behmo
regisb
Big Data Platforms, ESSEC/CentraleSupelec, Nov 28 2017
| Hadoop | Spark |
|---|---|
| Java Virtual Machine (JVM) | Java Virtual Machine (JVM) |
| Write to disk (HDFS) | In-memory |
| Native data structures | Resilient Distributed Datasets (RDD) |
| Java (+ Hadoop streaming) | Java + Scala + Python + R |
| - | Python + Scala shell |
| Pluggable SQL (Hive) | Spark SQL (native) |
| Pluggable ML | Spark ML (native) |
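The "in-memory" row is where the two differ most in practice: a Spark RDD can be kept in RAM and reused by several computations, instead of being written to and re-read from disk between steps. A minimal sketch (it assumes the PySpark shell introduced below, where sc is the already-created SparkContext, and the Iliad text downloaded later in this section):
rdd = sc.textFile("iliad.mb.txt").cache()  # mark the RDD to be kept in memory
rdd.count()   # first action: reads the file and fills the cache
rdd.count()   # second action: served from memory, no second read from disk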
$ sudo apt-get install default-jre
$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-8u131-b11-2ubuntu1.16.04.3-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)
$ sudo apt-get install python3
$ wget http://apache.crihan.fr/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
$ tar xzf spark-2.2.0-bin-hadoop2.7.tgz
$ cd ~/work/
$ wget http://classics.mit.edu/Homer/iliad.mb.txt
$ wget http://classics.mit.edu/Homer/odyssey.mb.txt
$ cd ~/work/spark-2.2.0-bin-hadoop2.7/
$ ./bin/pyspark
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
>>>
$ PYSPARK_PYTHON=python3 ./bin/pyspark
Python 3.5.2 (default, Sep 14 2017, 22:51:06)
>>>
$ pip install --user ipython==5.5.0
$ PYSPARK_PYTHON=ipython ./bin/pyspark
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
In [1]:
$ pip3 install --user ipython
$ PYSPARK_PYTHON=ipython3 ./bin/pyspark
Python 3.5.2 (default, Sep 14 2017, 22:51:06)
In [1]:
>>> rdd = sc.parallelize(range(0, 10))
>>> rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> rdd.count()
10
>>> rdd.first()
0
>>> rdd.map(lambda x: x*x).collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
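Other transformations and actions follow the same pattern; for instance, still with the same rdd of integers:
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[0, 2, 4, 6, 8]
>>> rdd.reduce(lambda x, y: x + y)
45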
>>> rdd = sc.textFile("../iliad.mb.txt")
>>> rdd.flatMap(lambda sentence: sentence.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda v1, v2: v1 + v2)\
    .sortBy(lambda wc: -wc[1])\
    .take(10)
[('the', 9573), ('and', 6481), ('of', 5584), ('to', 3291), ('his', 2487), ('he', 2448), ('in', 2184), ('a', 1789), ('with', 1593), ('that', 1434)]
| Transformations | Actions |
|---|---|
| map, distinct, filter, reduceByKey, sortByKey, join... | reduce, collect, count, first, take... |
| Arguments: one or more RDDs | |
| Returns: RDD | Returns: not an RDD |
| Lazy evaluation | Immediate evaluation |
| Sometimes shuffle | Shuffle necessary |
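The "lazy evaluation" column is easy to observe in the shell: chaining transformations returns immediately without touching the data, and the whole chain only runs once an action is called. A quick sketch:
>>> words = sc.textFile("../iliad.mb.txt").flatMap(lambda s: s.split())  # returns instantly, nothing is read yet
>>> pairs = words.map(lambda word: (word, 1))  # still no computation, Spark only records the lineage
>>> pairs.count()  # action: the file is read and the whole chain executes now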
$ vim ~/work/wordcount.py
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.textFile("iliad.mb.txt")
result = rdd.flatMap(lambda sentence: sentence.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda v1, v2: v1 + v2)\
    .sortBy(lambda wc: -wc[1])\
    .take(10)
print(result)
$ ./spark-2.2.0-bin-hadoop2.7/bin/spark-submit ./wordcount.py
Pro tip: append this to your script
input("Access http://localhost:4040 to debug. Then press ctrl+c to exit")
$ cd ~/work/spark-2.2.0-bin-hadoop2.7/conf/
$ cp log4j.properties.template log4j.properties
$ vim log4j.properties
...
log4j.rootCategory=ERROR, console
...
from nltk.corpus import stopwords
english_stop_words = stopwords.words("english")
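These stop words can then be filtered out of the word count chain. A sketch, reusing the rdd and english_stop_words defined above (it assumes the NLTK data was fetched once with nltk.download("stopwords")):
stop_words = set(english_stop_words)  # set membership tests are O(1)
result = rdd.flatMap(lambda sentence: sentence.split())\
    .map(lambda word: word.lower())\
    .filter(lambda word: word not in stop_words)\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda v1, v2: v1 + v2)\
    .sortBy(lambda wc: -wc[1])\
    .take(10)
print(result)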
https://github.com/regisb/bigdataplatforms-2017/blob/master/homework/iliad_odyssey.py
Launch one master:
./sbin/start-master.sh --host 192.168.1.M --port 7077
Launch multiple slaves:
./sbin/start-slave.sh --host 0.0.0.0 spark://192.168.1.M:7077
(prefix with SPARK_NO_DAEMONIZE=1 to launch workers in the foreground)
./bin/pyspark --master spark://192.168.1.M:7077
./bin/spark-submit --master spark://192.168.1.M:7077 myscript.py
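Once connected, the shell's SparkContext points at the standalone master instead of a local one; a quick check from the PySpark shell (the URL is the one passed with --master):
>>> sc.master
'spark://192.168.1.M:7077'
The standalone master also serves a web UI, by default on port 8080, which lists registered workers and running applications.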
Slave:
./sbin/start-slave.sh --host 0.0.0.0 --cores 2 --memory 512m spark://192.168.1.M:7077
Spark shell:
./bin/pyspark --master spark://192.168.1.M:7077 \
--total-executor-cores 2 --executor-memory 512m
Spark script:
./bin/spark-submit --master spark://192.168.1.M:7077 \
--total-executor-cores 2 --executor-memory 512m myscript.py
Application (optional):
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.cores.max", 2)             # equivalent of --total-executor-cores
conf.set("spark.executor.memory", "512m")  # equivalent of --executor-memory
sc = SparkContext(conf=conf)
sc.textFile("/home/student/work/iliad.mb.txt")
sc.textFile("hdfs://192.168.1.101:9000/data/iliad.mb.txt")
sc.textFile("hdfs://192.168.1.101:9000/data/blogs/raw.txt")
sc.textFile("hdfs://192.168.1.101:9000/data/blogs/raw1000.txt")
$ ./bin/hdfs dfs -fs hdfs://192.168.1.101:9000 -ls /data
drwxr-xr-x - regis supergroup 0 2017-11-18 20:44 /data/blogs
-rw-r--r-- 3 regis supergroup 691239883 2017-11-19 09:11 /data/enwik9-text
-rw-r--r-- 3 regis supergroup 808298 2017-11-18 22:28 /data/iliad.mb.txt
-rw-r--r-- 3 regis supergroup 808298 2017-11-19 14:47 /data/iliad.oneline.txt
$ ./bin/hdfs dfs -fs hdfs://192.168.1.101:9000 -copyToLocal /data/enwik9-text .
Launch one namenode:
./bin/hdfs namenode -fs hdfs://192.168.1.N:9000
Launch multiple datanodes:
./bin/hdfs datanode -fs hdfs://192.168.1.N:9000
Wifi SSID: "Big Data Platforms" (No password)
Obtain your IP address: ifconfig (I'm at 192.168.1.101)
Virtualbox:
hdfs://192.168.1.101:9000/data/iliad.mb.txt
hdfs://192.168.1.101:9000/data/blogs/raw.txt
hdfs://192.168.1.101:9000/data/iliad.oneline.txt
https://github.com/regisb/bigdataplatforms-2017/blob/master/homework/word2vec.py
Slides: https://regisb.github.io/bigdataplatforms-2017
OpenClassrooms course: Réalisez des calculs distribués sur des données massives (Behmo & Hudelot)