Régis Behmo
regisb
CentraleSupelec, 6 novembre 2017
| Hadoop | Spark |
|---|---|
| Java Virtual Machine (JVM) | Java Virtual Machine (JVM) |
| Write to disk (HDFS) | In-memory |
| Native data structures | Resilient Distributed Datasets (RDD) |
| Java (+ Hadoop streaming) | Java + Scala + Python + R |
| - | Python + Scala shell |
| Pluggable SQL (Hive) | Spark SQL (native) |
| Pluggable ML | Spark ML (native) |
$ sudo apt-get install default-jre
$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-8u131-b11-2ubuntu1.16.04.3-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)
$ sudo apt-get install python3
$ wget http://apache.crihan.fr/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
$ tar xzf spark-2.2.0-bin-hadoop2.7.tgz
$ cd ~/work/
$ wget http://classics.mit.edu/Homer/iliad.mb.txt
$ wget http://classics.mit.edu/Homer/odyssey.mb.txt
$ cd ~/work/spark-2.2.0-bin-hadoop2.7/
$ ./bin/pyspark
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
>>>
$ PYSPARK_PYTHON=python3 ./bin/pyspark
Python 3.5.2 (default, Sep 14 2017, 22:51:06)
>>>
$ pip install --user ipython==5.5.0
$ PYSPARK_PYTHON=ipython ./bin/pyspark
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
In [1]:
$ pip3 install --user ipython
$ PYSPARK_PYTHON=ipython3 ./bin/pyspark
Python 3.5.2 (default, Sep 14 2017, 22:51:06)
In [1]:
>>> rdd = sc.parallelize(range(0, 10))
>>> rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> rdd.count()
10
>>> rdd.first()
0
>>> rdd.map(lambda x: x*x).collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> rdd = sc.textFile("../iliad.mb.txt")
>>> rdd.flatMap(lambda sentence: sentence.split())\
.map(lambda word: (word, 1))\
.reduceByKey(lambda v1, v2: v1 + v2)\
.sortBy(lambda wc: -wc[1])\
.take(10)
[('the', 9573), ('and', 6481), ('of', 5584), ('to', 3291), ('his', 2487), ('he', 2448), ('in', 2184), ('a', 1789), ('with', 1593), ('that', 1434)]
| Transformations | Actions |
|---|---|
| map, distinct, filter, reduceByKey, sortByKey, join... | reduce, collect, count, first, take... |
| Arguments: one or more RDDs | Arguments: one RDD |
| Returns: RDD | Returns: not an RDD |
| Lazy evaluation | Immediate evaluation |
| Sometimes shuffle | Shuffle necessary |
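The lazy/eager split in the table can be sketched without Spark at all: Python generators behave like transformations (they build a recipe and compute nothing until consumed), while `list()` behaves like an action (it forces evaluation). This is only an analogy, not Spark code.

```python
data = range(0, 10)

# "Transformation": builds a recipe, computes nothing yet.
squares = (x * x for x in data)

# "Action": consuming the generator triggers the actual computation.
result = list(squares)
print(result)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```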
$ vim wordcount.py
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.textFile("iliad.mb.txt")
result = rdd.flatMap(lambda sentence: sentence.split())\
.map(lambda word: (word, 1))\
.reduceByKey(lambda v1, v2: v1 + v2)\
.sortBy(lambda wc: -wc[1])\
.take(10)
print(result)
$ ./spark-2.2.0-bin-hadoop2.7/bin/spark-submit ./wordcount.py
Pro tip: append this to your script
input("Access http://localhost:4040 to debug. Then press ctrl+c to exit")
$ cd spark-2.2.0-bin-hadoop2.7/conf/
$ cp log4j.properties.template log4j.properties
$ vim log4j.properties
...
log4j.rootCategory=ERROR, console
...
from nltk.corpus import stopwords
english_stop_words = stopwords.words("english")
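A sketch of the stop-word filtering itself, using a tiny hard-coded stop list instead of nltk's (which requires `nltk.download("stopwords")` first). In the Spark job, the same membership test would go in a `.filter()` before `.reduceByKey()`; the sample sentence here is illustrative only.

```python
# Tiny stand-in for stopwords.words("english")
english_stop_words = {"the", "and", "of", "to", "his", "he", "in", "a", "with", "that"}

words = "sing o goddess the anger of achilles son of peleus".split()

# Count only non-stop words, mirroring .filter(...).map(...).reduceByKey(...)
counts = {}
for word in words:
    if word not in english_stop_words:
        counts[word] = counts.get(word, 0) + 1

print(counts)  # "the" and "of" no longer dominate the counts
```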
Régis Behmo
regisb
CentraleSupelec, 20 novembre 2017
https://github.com/regisb/centralesupelec-2017/blob/master/homework/iliad_odyssey.py
Launch one master:
./sbin/start-master.sh --host 192.168.1.M --port 7077
Launch multiple slaves:
./sbin/start-slave.sh --host 0.0.0.0 spark://192.168.1.M:7077
(prefix with SPARK_NO_DAEMONIZE=1 to launch workers in the foreground)
./bin/pyspark --master spark://192.168.1.M:7077
./bin/spark-submit --master spark://192.168.1.M:7077 myscript.py
Slave:
./sbin/start-slave.sh --host 0.0.0.0 --cores 2 --memory 512m spark://192.168.1.M:7077
Spark shell:
./bin/pyspark --master spark://192.168.1.M:7077 \
--total-executor-cores 2 --executor-memory 512m
Spark script:
./bin/spark-submit --master spark://192.168.1.M:7077 \
--total-executor-cores 2 --executor-memory 512m myscript.py
Application (optional):
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.cores.max", 2)
conf.set("spark.executor.memory", "512m")
sc = SparkContext(conf=conf)
sc.textFile("/home/student/work/iliad.mb.txt")
sc.textFile("hdfs://192.168.1.101:9000/data/iliad.mb.txt")
sc.textFile("hdfs://192.168.1.101:9000/data/blogs/raw.txt")
sc.textFile("hdfs://192.168.1.101:9000/data/blogs/raw1000.txt")
$ ./bin/hdfs dfs -fs hdfs://192.168.1.101:9000 -ls /data
drwxr-xr-x - regis supergroup 0 2017-11-18 20:44 /data/blogs
-rw-r--r-- 3 regis supergroup 691239883 2017-11-19 09:11 /data/enwik9-text
-rw-r--r-- 3 regis supergroup 808298 2017-11-18 22:28 /data/iliad.mb.txt
$ ./bin/hdfs dfs -fs hdfs://192.168.1.101:9000 -copyToLocal /data/enwik9-text .
Launch one namenode:
./bin/hdfs namenode -fs hdfs://192.168.1.N:9000
Launch multiple datanodes:
./bin/hdfs datanode -fs hdfs://192.168.1.N:9000
Wifi SSID: "Spark CentraleSupelec" (No password)
Obtain your IP address: ifconfig (I'm at 192.168.1.101)
Virtualbox:
hdfs://192.168.1.101:9000/data/iliad.mb.txt
hdfs://192.168.1.101:9000/data/blogs/raw.txt
hdfs://192.168.1.101:9000/data/iliad.oneline.txt
Régis Behmo
regisb
CentraleSupelec, 27 novembre 2017
https://github.com/regisb/centralesupelec-2017/blob/master/homework/word2vec.py
from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec
sc = SparkContext()
rdd = sc.textFile("hdfs://192.168.1.101:9000/data/iliad.oneline.txt")\
.map(lambda line: line.strip().split())\
.map(lambda words: [w.strip(",.:;'\"-?!") for w in words])\
.map(lambda words: [w for w in words if w]).filter(lambda s: s)
model = Word2Vec().setSeed(1).setVectorSize(200).fit(rdd)
vectors = model.getVectors()
def minus(vec1, vec2):
return [v1 - v2 for v1, v2 in zip(vec1, vec2)]
def plus(vec1, vec2):
return [v1 + v2 for v1, v2 in zip(vec1, vec2)]
synonyms = model.findSynonyms(plus(
minus(vectors["Priam"], vectors["Hector"]),
vectors["Achilles"]), 10)
print(list(synonyms))
(Peleus is to Achilles what Priam is to Hector)
[('Priam', 0.94304829835891724),
('Atreus', 0.88183891773223877),
('Saturn', 0.87939071655273438),
('Peleus', 0.87464809417724609),
('Telamon', 0.86552971601486206),
('Laertes', 0.8487703800201416),
('Nestor', 0.83410191535949707),
('noble', 0.81868565082550049),
('Agamemnon', 0.81820404529571533),
('Gerene', 0.81664806604385376)]
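`findSynonyms` ranks candidate words by cosine similarity to the query vector, which is where the scores above come from. A minimal sketch of that ranking, with toy 3-dimensional vectors standing in for the model's 200-dimensional ones (the words and values are made up for illustration):

```python
import math

def cosine(v1, v2):
    # Cosine similarity: dot product divided by the product of norms.
    dot = sum(a * b for a, b in zip(v1, v2))
    norm1 = math.sqrt(sum(a * a for a in v1))
    norm2 = math.sqrt(sum(b * b for b in v2))
    return dot / (norm1 * norm2)

query = [1.0, 2.0, 0.5]
vectors = {"Priam": [1.1, 1.9, 0.4], "noble": [0.2, -1.0, 2.0]}

# Rank candidate words by decreasing similarity to the query vector.
ranked = sorted(vectors, key=lambda w: -cosine(query, vectors[w]))
print(ranked)  # "Priam" ranks first: its vector points close to the query
```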
Every 1s, print most frequent words
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext()
ssc = StreamingContext(sc, 1)
ssc.checkpoint("./checkpoint")
def count_words(counts, current_count):
if current_count is None:
current_count = 0
return current_count + sum(counts)
ssc.socketTextStream("localhost", 9999)\
.flatMap(lambda line: line.split()).map(lambda word: (word, 1))\
.updateStateByKey(count_words)\
.transform(lambda rdd: rdd.sortBy(lambda wc: -wc[1]))\
.foreachRDD(lambda rdd: print(rdd.take(10)))
ssc.start()
ssc.awaitTermination()
DStream = "Discretized stream" = Sequence of RDDs
Make sure checkpoint directory exists
Launch convenient TCP server: nc -lk 9999
"Every 2s, aggregate data that arrived during the past 3s"
.reduceByKeyAndWindow(func, func_inv, 3, 2)
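The window semantics can be sketched in plain Python: with a window length of 3 batch intervals and a slide of 2, every 2 intervals we aggregate the last 3 batches. The batch values below are made up for illustration.

```python
batches = [[1], [2], [3], [4], [5], [6]]  # one list of values per batch interval

window_length, slide = 3, 2
windows = []
for end in range(slide, len(batches) + 1, slide):
    # Each window covers the last `window_length` batches up to `end`.
    start = max(0, end - window_length)
    window = [x for batch in batches[start:end] for x in batch]
    windows.append(sum(window))

print(windows)  # [3, 9, 15]: sums over batches [1,2], [2,3,4], [4,5,6]
```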
Every 1s, print words that were most frequent during the last 5s
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext()
ssc = StreamingContext(sc, 1)
ssc.checkpoint("./checkpoint")
ssc.socketTextStream("localhost", 9999)\
.flatMap(lambda line: line.split())\
.map(lambda word: (word, 1))\
.reduceByKeyAndWindow(lambda c1, c2: c1+c2, None, 5, 1)\
.transform(lambda rdd: rdd.sortBy(lambda wc: -wc[1]))\
.foreachRDD(lambda rdd: print(rdd.take(10)))
ssc.start()
ssc.awaitTermination()
http://localhost:4040/streaming/
(data courtesy of https://developer.jcdecaux.com/)
{
"number":10120,
"name":"10120 - SALENGRO / DESCARTES",
"address":"41 AV. ROGER SALENGRO (VILLEURBANNE)",
"position":{"lat":45.7759505002626,"lng":4.87143421497628},
"banking":true,
"bonus":false,
"status":"OPEN",
"contract_name":"Lyon",
"bike_stands":15,
"available_bike_stands":6,
"available_bikes":9,
"last_update":1511735128000
}
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext()
ssc = StreamingContext(sc, 5)
stream = ssc.socketTextStream("velib.behmo.com", 9999)
stream.map(lambda station: json.loads(station))\
.map(lambda station: (
station['contract_name'] + ' ' + station['name'],
station['available_bikes']
))\
.pprint()
ssc.checkpoint("./checkpoint")
ssc.start()
ssc.awaitTermination()
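The per-record mapping in that stream can be tried outside Spark on the sample station record shown earlier (abbreviated here to the fields the mapping uses):

```python
import json

record = '''{"number": 10120, "name": "10120 - SALENGRO / DESCARTES",
             "contract_name": "Lyon", "available_bikes": 9,
             "available_bike_stands": 6, "status": "OPEN"}'''

# Same transformation as the DStream .map() steps, applied to one record.
station = json.loads(record)
pair = (station["contract_name"] + " " + station["name"],
        station["available_bikes"])
print(pair)  # ('Lyon 10120 - SALENGRO / DESCARTES', 9)
```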
Slides: https://regisb.github.io/centralesupelec-2017
OpenClassrooms course: "Réalisez des calculs distribués sur des données massives" (Behmo & Hudelot)