Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Un spotify à la maison avec Spark et Cassandra

Un spotify à la maison avec Spark et Cassandra

Hands-on Lab présenté à Devoxx 2015 par Duy Hai Doan (doanduyhai) et Alexis Seigneurin (@aseigneurin)

Alexis Seigneurin

April 10, 2015
Tweet

More Decks by Alexis Seigneurin

Other Decks in Technology

Transcript

  1. @doanduyhai et @aseigneurin #SparkCassandra Contenu  de  la  clé  USB  

    •  Machine virtuelle •  Contient Cassandra + une appli à importer dans VirtualBox •  Un repository Maven pour Java 8 (backup du votre avant!) •  m2repo.(zip|tgz) à décompresser pour remplacer ~/.m2 •  Un repo Ivy2 pour Scala (backup du votre avant!) •  ivy2cache.(zip|tgz) à décompresser pour remplacer ~/.ivy2 •  Un projet HomeSpotifiy-Scala/HomeSpotifiy-Java •  À importer dans Eclipse ou IntelliJ IDEA
  2. @doanduyhai et @aseigneurin #SparkCassandra Le  plan   •  L'essentiel sur

    Cassandra •  L'essentiel sur Spark •  Le connecteur Spark – Cassandra •  Déroulement du Hand's On
  3. @doanduyhai et @aseigneurin #SparkCassandra Cassandra   •  Base de données

    NoSQL distribuée •  Cluster sur commodity hardware •  Abstraction de table distribuée
  4. @doanduyhai et @aseigneurin #SparkCassandra Distribu'on   •  Topologie : Ring,

    pas de Single Point Of Failure •  Répartition par hash de la partition (clé primaire) •  Hashage : Murmur3 •  Données distribuées et répliquées n1 n2 n3 n4 n5 n6 n7 n8
  5. @doanduyhai et @aseigneurin #SparkCassandra Répar''on  des  données  par  token  

    •  Pour chaque requête, fournir une #partition •  Cassandra applique le hash Murmur3 à
  6. @doanduyhai et @aseigneurin #SparkCassandra Cassandra  Query  Language  (CQL)   INSERT

    INTO users(login, name, age) VALUES(‘jdoe’, ‘John DOE’, 33); UPDATE users SET age = 34 WHERE login = ‘jdoe’; DELETE age FROM users WHERE login = ‘jdoe’; SELECT age FROM users WHERE login = ‘jdoe’;
  7. @doanduyhai et @aseigneurin #SparkCassandra Spark   •  Traitement distribué de

    données •  Cluster sur commodity hardware •  Écrit en Scala, bindings Java et Python
  8. @doanduyhai et @aseigneurin #SparkCassandra No'on  de  RDD   •  Resilient

    Distributed Dataset •  Collection d'objets (types primitifs ou personnalisés) •  Partitions → traitement parallèle •  Tolérant à la panne
  9. @doanduyhai et @aseigneurin #SparkCassandra Manipula'on  de  RDD   •  Création

    depuis : •  Fichier (local ou HDFS) •  Source NoSQL (Cassandra…) •  … •  Transformations : •  Retournent un nouveau RDD •  Actions finales : •  foreach(), collect(), … •  Envoi vers base NoSQL
  10. @doanduyhai et @aseigneurin #SparkCassandra Spark  en  pra'que   •  Création

    d'une configuration •  SparkConf •  Création d'un contexte Spark •  SparkContext / JavaSparkContext •  Création d'un RDD •  RDD / JavaRDD/JavaPairRDD
  11. @doanduyhai et @aseigneurin #SparkCassandra Spark  en  pra'que   val conf

    = new SparkConf() .setAppName("myapp") .setMaster("local[4]") val sc = new SparkContext(conf) val data = Array(1, 2, 3, 4, 5) val rdd = sc.parallelize(data) SparkConf conf = new SparkConf() .setAppName("myapp") .setMaster("local[4]"); JavaSparkContext sc = new JavaSparkContext(conf); List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> rdd = sc.parallelize(data); Scala Java
  12. @doanduyhai et @aseigneurin #SparkCassandra Transforma'ons  u'les   map(x -> …)

    •  Applique une fonction sur chaque élément mapToPair(x -> new Tuple2<T1,T2>(…,…)) •  Applique une fonction, retourne un tuple filter(x -> true/false) •  Sélectionne des éléments reduceByKey((x,y) -> …) •  Combine les valeurs de tuples ayant les mêmes clés, 2 à 2
  13. @doanduyhai et @aseigneurin #SparkCassandra Transforma'ons  u'les   sortBy(MyClass::getValue, true/false, 1)

    •  Trie les éléments take(n) •  Conserve n éléments foreach(x -> System.out.println(x)) •  Itère sur les élements (pratique pour le debug) join(unAutreRDD) •  Jointure entre deux RDD : •  Sur les clés : nécessite des RDD de tuples clé/valeur •  Valeur obtenue : tuple contenant la valeur de "gauche" et la valeur de "droite"
  14. @doanduyhai et @aseigneurin #SparkCassandra Spark  et  Cassandra   •  Stockage

    + traitement distribués •  Haute disponibilité des clusters •  Multi data-centers •  Permet des opérations "complexes" •  JOIN, GROUP BY… •  Traitements batch
  15. @doanduyhai et @aseigneurin #SparkCassandra Connecteur  Spark-­‐Cassandra   •  Développé par

    DataStax, Open Source •  https://github.com/datastax/spark-cassandra-connector •  Utilisable en Java ou Scala •  Lecture de données Cassandra → RDD Spark •  Lecture d'une table entière ou requête CQL •  RDD Spark → Écriture dans Cassandra
  16. @doanduyhai et @aseigneurin #SparkCassandra Connecteur  Spark-­‐Cassandra   •  Data locality

    •  En lecture •  En écriture : répartition dans Spark avant écriture •  Jointure: répartition dans Spark avant une jointure •  Push predicates •  Projections (select) •  Restrictions (where) •  Mapping de records en objets
  17. @doanduyhai et @aseigneurin #SparkCassandra C* SparkM SparkW C* SparkW  

    C* SparkW   C* SparkW   C* SparkW   Data  locality   Spark partition RDD Cassandra tokens ranges
  18. @doanduyhai et @aseigneurin #SparkCassandra Ini'alisa'on   val conf = new

    SparkConf(true) .set("spark.cassandra.connection.host", "192.168.51.10") SparkConf conf = new SparkConf(true) .set("spark.cassandra.connection.host", "192.168.51.10") Scala Java
  19. @doanduyhai et @aseigneurin #SparkCassandra Lecture  d'une  table   import com.datastax.spark.connector._

    import com.datastax.spark.connector.rdd.CassandraRDD val rows:CassandraRDD[CassandraRow] = sc.cassandraTable(keyspace, table) rows.map(row => row.getString("fieldname")) import com.datastax.spark.connector.japi.CassandraRow; import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; JavaRDD<CassandraRow> rows = javaFunctions(sc).cassandraTable(keyspace, table); rows.map(row -> row.getString("fieldname")) Scala Java
  20. @doanduyhai et @aseigneurin #SparkCassandra Projec'ons  /  Restric'ons   val rows:CassandraRDD[CassandraRow]

    = sc.cassandraTable("test", "cars") .select("id", "model") .where("color = ?", "black") JavaRDD<CassandraRow> rows = javaFunctions(sc).cassandraTable("test", "cars") .select("id", "model") .where("color = ?", "black") Scala Java
  21. @doanduyhai et @aseigneurin #SparkCassandra Requête  CQL   import org.apache.spark.sql.cassandra.CassandraSQLContext val

    sc: SparkContext = ... val cc = new CassandraSQLContext(sc) val rdd: SchemaRDD = cc.sql("SELECT * from keyspace.table WHERE ...") import org.apache.spark.sql.cassandra.CassandraSQLContext SparkContext sc = ... CassandraSQLContext cc = new CassandraSQLContext(sc) SchemaRDD rdd = cc.sql("SELECT * from keyspace.table WHERE ...") Scala Java
  22. @doanduyhai et @aseigneurin #SparkCassandra Mapping  records-­‐objets   case class WordCount(word:

    String, count: Int) sc.cassandraTable[WordCount]("test", "words") import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo; class WordCount {…} sc.cassandraTable("test", "words", mapRowTo(WordCount.class)) Scala Java
  23. @doanduyhai et @aseigneurin #SparkCassandra Écriture  dans  Cassandra   rdd.saveToCassandra(keyspace, table)

    rdd.saveToCassandra("test", "words", SomeColumns("word", "count" as "num")) import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; javaFunctions(rdd).writerBuilder(keyspace, table, mapToRow(MyType.class)) .saveToCassandra(); Scala Java
  24. @doanduyhai et @aseigneurin #SparkCassandra La  machine  virtuelle   •  Lancer

    la VM •  192.168.51.10 •  Se logguer dans la VM (terminal ou SSH) •  vagrant/vagrant •  Cassandra •  192.168.51.10:9042 •  > cqlsh
  25. @doanduyhai et @aseigneurin #SparkCassandra Les  données   •  Données pré-chargées

    dans Cassandra •  Données de musique •  Artistes •  Albums
  26. @doanduyhai et @aseigneurin #SparkCassandra L'applica'on   •  Accessible depuis votre

    machine •  http://192.168.51.10:9000/ •  5 exercices •  Instructions : données à lire… •  Visualisation des résultats •  À coder en Java ou Scala
  27. @doanduyhai et @aseigneurin #SparkCassandra Pour  coder   •  Projet HomeSpotify-Scala

    ou HomeSpotify-Java •  À importer dans Eclipse ou IntelliJ IDEA •  Contient les stubs des exercices •  Branches •  > git checkout <branch> •  scala •  scala_solution •  java •  java_solution