15. Juni 2015

Apache Spark auf einem Amazon Elastic MapReduce Cluster ausführen

Share

Spark ist ein Apache Projekt, um große Datenmengen in einem Cluster performant verarbeiten zu können und erfreut sich momentan großer Beliebtheit. In diesem Beitrag geht es darum wie man Spark auf einem Elastic MapReduce Cluster in der Amazon Cloud zum Laufen bekommt und was man dabei beachten muss. Es gibt hier eine offizielle Anleitung, an die ich mich hauptsächlich gehalten habe.
Reicht ein einzelner (leistungsstarker) Rechner nicht mehr aus, da die Menge an Daten so groß ist, dass man nicht in akzeptabler Zeit ein Ergebnis erhält, so muss man mehrere Rechner in einem Cluster zusammen schließen um die Daten zu verarbeiten. Hierfür ist Spark gemacht: es unterstützt die Verarbeitung von Daten in einem Cluster und dies möglichst performant, indem so viele Daten wie möglich im Arbeitsspeicher gehalten werden. Ein Cluster kann aus einigen wenigen, bis zu mehreren tausend Rechnern bestehen.
Möchte ein Unternehmen ein Cluster aufsetzen um seine Daten zu verarbeiten, so ist es einerseits kostenspielig die Hardware für ein Cluster im eigenen Rechenzentrum bereitzustellen und zu betreiben und andererseits braucht man Mitarbeiter, die die nötigen Erfahrungen für den Betrieb mitbringen. Von daher liegt der Gedanke nahe sich ein Cluster zu mieten. Amazon Web Services (AWS) bieten die Möglichkeit sich problemlos ein Cluster mit dem Service Elastic MapReduce (EMR) aufzusetzen. Dies hat den weiteren Vorteil, dass man nur für die tatsächlich Benutzungszeit zahlt. Wird das Cluster nicht mehr verwendet so kann es wieder heruntergefahren werden um Kosten zu sparen und zu einem anderen Zeitpunkt wieder gestartet werden.

Vorbereitungen

Als erstes muss man sich Spark lokal installieren, ich habe die Version 1.3.1 mit Unterstützung für Hadoop ab Version 2.6 genommen. Alle Versionen sind hier verfügbar: http://spark.apache.org/downloads.html . Während des Schreibens dieses Blogbeitrags ist die Version 1.4.0 neu erschienen.
Damit alles problemlos funktioniert, benötigt man lokal Scala in der Version 2.10.* . Möchte man Scala Version 2.11.* verwenden, so muss man sich Spark selber bauen und auf einige Features verzichten. Weiterhin benötigt man Python, in diesem Tutorial habe ich die Version 2.7.6 verwendet.
Ist Spark entpackt, so befindet sich im Quellordner ein Verzeichnis ec2 und dort liegt ein Python-Skript mit dem Namen spark-ec2. Als erstes sollte man probieren ob das Skript ausführbar ist:
Tipp: Bei der ersten Ausführung legt das Skript ein Verzeichnis lib an, man sollte also Schreibrechte in dem Ordner besitzen.
~/bin/spark/ec2$ ./spark-ec2 --help
Um sich mit Amazon Web Services (AWS) zu verbinden benötigt man selbstverständlich ein Konto das man sich hier anlegen kann: https://aws-portal.amazon.com/gp/aws/developer/registration/index.html
Auf der AWS Console (erreichbar unter http://aws.amazon.com/console/) muss man sich erstmal ein Key Pair erstellen. Dazu geht man auf den Service EC2 und wählt dort links unten Key Pairs aus. Nun kann man sich ein neues Key-Pair erzeugen, im folgenden habe ich den Namen aws genommen. Danach sollte man sich die Datei aws.pem runterladen und lokal ablegen, z.B. unter ~/.ssh/
Die Rechte für die Datei sollten angepasst werden damit ssh zufrieden ist:
~$ chmod 600 .ssh/aws.pem
Weiterhin benötigt man seinen AWS Access Key, dies ist der Schlüssel um auf die Dienste von AWS von Programmen aus zuzugreifen. Diesen findet man rechts oben in dem Menü auf der AWS Console unter Security Credentials. Es wird zwar von Amazon empfohlen Identity and Access Management (IAM) zu verwenden, der Einfachheit halber habe ich aber im Folgenden den Root Access Key meines Kontos genommen. Den Root Access Key zu verwenden ist in der Regel unsicher, da er uneingeschränkten Zugriff auf alle Dienste von AWS bietet. Unter dem Punkt Access Keys lässt sich dieser erstellen.
Diese müssen lokal verfügbar gemacht werden:
~$ export AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXX
~$ export AWS_SECRET_ACCESS_KEY=YYYYYYYYYYYYYYYY

Nun kann es losgehen

Um ein Cluster zu starten, führt man nun folgenden Befehl aus:
~/bin/spark/ec2$ ./spark-ec2 --key-pair=aws --identity-file=~/.ssh/aws.pem --slaves=3 --region=eu-west-1 --instance-type=m3.large --spark-version=1.3.1 --no-ganglia --copy-aws-credentials launch SparkCluster
Im Folgenden erkläre ich die Kommandozeilen-Parameter:

  • --key-pair : Name des Key-Pairs, das wir vorher erzeugt haben
  • --identity-file : Pfad zu der Datei wo der Schlüssel liegt
  • --slaves : Anzahl der Slaves, die in dem Cluster enthalten sein sollen. Zusätzlich zu den Slaves wird auch immer ein Master gestartet. Die maximale Anzahl von EC2 Instanzen ist per Grundeinstellung limitiert, dieses Limit kann aber auf Anfrage heraufgesetzt werden.
  • --region : die Region, wo das Cluster laufen soll. Es ist sinnvoll die selbe Region zu nehmen, wo auch die Daten liegen, die man verarbeiten möchte, damit diese nicht zwischen den AWS Regionen (also z.B. zwischen Dublin und Frankfurt) hin- und herübertragen werden müssen. Dies wäre langsam und würde außerdem zusätzlich Kosten verursachen.
  • --instance-type : definiert den Typ der EC2 Instanz (siehe http://aws.amazon.com/de/ec2/instance-types/). Spark benötigt eine Instanz mit 64-Bit Prozessor.
  • --spark-version : die Version von Spark. Mit --spark-git-repo kann sogar auch eine Referenz auf ein Git Repository angegeben werden von dem Spark geladen wird.
  • --no-ganglia : gibt an, dass wir kein Ganglia Monitoring Server brauchen (bei mir gab es auch Probleme diesen zu starten)
  • --copy-aws-credentials : kopiert AWS_ACCESS_KEY_ID und AWS_SECRET_ACCESS_KEY auf den Server, so dass diese nicht erneut eingegeben werden müssen, wenn man auf S3 zugreift.

Das Starten eines Clusters kann einige Minuten dauern, aber danach sollte man eine Nachricht bekommen, dass das Starten erfolgreich war und man bekommt die URL des Spark Master Knoten:
Spark standalone cluster started at http://ec2-54-77-180-252.eu-west-1.compute.amazonaws.com:8080
Done!
Ruft man die URL im Browser auf, sieht man das Spark-Dashboard des Master Knoten. Es sollten ebenfalls die 3 Worker laufen, welche (in meinem Beispiel) 6 Cores und 18.2 GB an Arbeitsspeicher zusammen bereitstellen. Worker sind das selbe wie Slaves und diese besitzen ebenfalls ein Dashboard auf das man über die angegebene URL zugreifen kann. Um die Architektur eines Spark Clusters zu verstehen lohnt ein Blick auf http://spark.apache.org/docs/1.3.1/cluster-overview.html
Als nächstes sollte man sich per ssh mit dem Master Knoten verbinden:
~/bin/spark/ec2$ ./spark-ec2 -k EMR -i ~/.ssh/EMR.pem --region=eu-west-1 login SparkCluster
War dies erfolgreich, so erhält man den Kommandozeilenprompt des Masters:
root@ip-172-31-21-234 ~]$
Wenn man von der Spark-Shell nicht mit Log-Nachrichten überschüttet werden möchte, sollte man log4j entsprechend konfigurieren:
root@ip-172-31-21-234 ~]$ mv spark/conf/log4j.properties.template spark/conf/log4j.properties
root@ip-172-31-21-234 ~]$ vim spark/conf/log4j.properties

In der Datei sollte man das Log-Level von log4j.rootCategory auf WARN setzen.
Jetzt kann man die Spark-Shell starten und es erscheint ein scala-Prompt:
root@ip-172-31-21-234 ~]$ spark/bin/spark-shell --driver-memory 4G
Spark selber ist in Scala geschrieben und dementsprechend funktioniert die Spark-Shell mit Scala-Ausdrücken.
Geht man auf das Spark-Dashboard sollte man nun unter Running Applications die Spark-Shell sehen. Klickt man auf den Link kommt man auf das Dashboard für die Spark-Shell. Dort ist ein weiterer Link mit dem Namen Application Details UI . Diese Seite ist für die Überwachung der Ausführung von Jobs interessant, da man hier z.B. sieht wie weit der Fortschritt der Ausführung eines Jobs ist.

Verarbeiten von Daten in der Spark-Shell

Nun wo wir alles aufgesetzt haben können wir loslegen. Um einige Testdaten zu haben, habe ich im Vorraus bereits Daten in einen S3 Bucket abgelegt. Als Daten habe ich alle Events von GitHub für den Monat Mai 2015 genommen die auf der Seite https://www.githubarchive.org/ frei verfügbar sind. Diese liegen als 745 gezippte JSON-Dateien vor und enthalten pro Zeile ein Event (ca. 5,6 GB). Als Beispiel für diesen Artikel möchte ich gerne herausfinden welche Programmiersprache im Monat Mai am beliebtesten war.

Als erstes lade ich die Dateien aus dem S3 Bucket, dies ist genauso einfach wie das Einlesen von lokalen Dateien, vorausgesetzt man hat die Rechte auf den S3 Bucket zuzugreifen:
scala> val events = sc.textFile("s3n://de.leanovate.github/2015-05/*.gz")
events ist ein Resilient Distributed Dataset (RDD). Spark verwendet diesen Datentyp für verteilte Berechnungen. Diesen kann man sich in etwa wie eine Collection vorstellen, die auf mehrere Rechner verteilt ist.
Einige Events enthalten die verwendete Programmiersprache des Repositories für das das Event erzeugt wurde. Leider wird die Programmiersprache nicht bei allen Events wirklich gesetzt. Von daher verwende ich einen Regulären Ausdruck um alle Stellen herauszufinden wo der Schlüssel language gesetzt ist:
scala> val regex = """\"language\"\s*:\s*\"(.*?)\"""".r
Danach transformiere ich das RDD eventsin ein neues RDD, das jede Angabe der Programmiersprache als String enthält:
scala> val langs = events.flatMap(line => regex.findAllMatchIn(line)).map(_.group(1))
Es gibt noch andere Möglichkeiten JSON mit Spark einzulesen. Es gibt z.B. die Erweiterung Spark SQL. Diese unterstützt das Parsen von JSON indem es alle angegebenen JSON-Dateien einliest und daraus ein Schema generiert. Danach kann man mit SQL-Befehlen auf die Dateien zugreifen und die gewünschten Informationen extrahieren. Dies erwies sich aber sehr langsam. Das Generieren des Schemas alleine dauerte in etwa 20 Minuten. Eine Abfrage dauerte auch in etwa 20 Minuten.
Nun muss man nur noch zählen wie häufig welche Programmiersprache in dem RDD vorkommt. Dazu benutze ich die hilfreiche Funktion countByValue und sortiere das Ergebnis danach absteigend:
scala> val result = langs.countByValue().toSeq.sortBy(-_._2)
Und hier die 20 beliebtesten Sprachen für den Monat Mai 2015. Man sieht dass es nicht nur Programmiersprachen sind sondern auch andere Sprachen wie z.B. HTML und CSS:
scala> result.take(20).foreach(println)
(JavaScript,431782)
(Python,293238)
(Java,287992)
(Ruby,262020)
(PHP,174248)
(HTML,138719)
(C++,131767)
(CSS,84700)
(Go,84615)
(C#,79710)
(C,77004)
(Shell,42803)
(Scala,39546)
(Objective-C,31055)
(CoffeeScript,20651)
(Rust,17801)
(Perl,14384)
(TypeScript,12212)
(DM,10978)
(Swift,10012)

JavaScript ist der deutliche Sieger und Scala hat es immerhin auf Platz 13 geschafft. Die Ausführung des Jobs dauerte 2,3 Minuten.
Man sollte natürlich nicht vergessen das Cluster nachdem man es nicht mehr benötigt herunterzufahren um nicht unnötig Gebühren zu zahlen:
~/bin/spark/ec2$ ./spark-ec2 -k EMR -i ~/.ssh/EMR.pem --region=eu-west-1 destroy SparkCluster
Um wirklich sicher zu gehen, dass keine EC2-Instanzen mehr am laufen sind, lohnt ein Blick auf die AWS Console.

Weitere Möglichkeiten

Natürlich ist man nicht nur auf die Spark-Shell begrenzt. Es lassen sich auch vorpaketierte Spark jar-Dateien unkompliziert hochladen und ausführen. Es existieren einige Tools auf dem Master-Knoten um Standardaufgaben zu erledigen, wie z.B. das Kopieren der Spark-Konfiguration auf jeden Slave.
Weiterhin hat man die Möglichkeite das Hadoop Distributed File System (HDFS) zu verwenden. HDFS wird schon automatisch beim Start des Clusters mit aufgesetzt. Es gibt die Auswahl zwischen ephemeral-hdfs (kurzlebig) und persistent-hdfs. Wählt man peristent, so hat man die Möglichkeit die Daten zwischen zwei Cluster Neustarts auf Amazon EBS Volumes zu sichern.

Fazit

Möchte man Spark auf einem Cluster laufen lassen und hat keine eigenen Rechnerresourcen zur Verfügung oder die nötige Erfahrung ein Cluster selbst aufzusetzen, so kann es sinnvoll sein, sich bei AWS ein Cluster zu mieten. Das Aufsetzen ist wesentlich unkomplizierter als, der Aufbau und Betrieb eines eigenen Clusters. Die Kosten lassen sich zudem gut abschätzen.

Share

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

Hiermit akzeptiere ich die Datenschutzbedingungen.

Rufen Sie uns an: 030 – 555 74 70 0

Made with 
in Berlin. 
© leanovate 2024