RDD using Spark: The Building Block of Apache Spark



Tento blog o RDD používající Spark vám poskytne podrobné a komplexní znalosti o RDD, který je základní jednotkou Spark & ​​Jak užitečné je.

, Samotné slovo stačí k vytvoření jiskry v mysli každého inženýra Hadoop. NA n v paměti nástroj pro zpracování což je v klastrových výpočtech bleskové. Ve srovnání s MapReduce je sdílení dat v paměti RDD 10-100x rychlejší než sdílení v síti a na disku a to vše je možné díky RDD (Resilient Distributed Data sets). Klíčové body, kterým se dnes v tomto článku věnovaném RDD pomocí Sparku věnujeme, jsou:

Potřebujete RDD?

Proč potřebujeme RDD? -RDD pomocí Sparku





Svět se vyvíjí s a Data Science z důvodu postupu v . Algoritmy na základě Regrese , , a který běží dál Distribuováno Iterativní výpočet ation móda, která zahrnuje opětovné použití a sdílení dat mezi více výpočetními jednotkami.

Tradiční techniky vyžadovaly stabilní mezilehlé a distribuované úložiště HDFS skládající se z opakujících se výpočtů s replikací dat a jejich serializací, díky nimž byl proces mnohem pomalejší. Najít řešení nebylo nikdy snadné.



datové struktury a algoritmy v java tutoriálu

To je kde RDD (Resilient Distributed Datasets) comes to a celkově.

RDD Jsou snadno použitelné a snadno se vytvářejí, protože data jsou importována ze zdrojů dat a ukládána do RDD. Dále jsou operace použity k jejich zpracování. Jsou to distribuovaná sbírka paměti s oprávněním jako Pouze ke čtení a co je nejdůležitější, jsou Tolerantní k chybám .



Jestli nějaký datový oddíl z RDD je ztracený , lze jej regenerovat použitím stejného proměna operace se ztraceným oddílem v systému Windows linie , spíše než zpracovávat všechna data od nuly. Tento druh přístupu ve scénářích v reálném čase může dělat zázraky v situacích ztráty dat nebo při výpadku systému.

Co jsou RDD?

RDD nebo ( Odolná distribuovaná sada dat ) je zásadní datová struktura ve Sparku. Termín Pružný definuje schopnost, která generuje data automaticky nebo data vrácení zpět do původní stav když dojde k neočekávané kalamitě s pravděpodobností ztráty dat.

Data zapsaná do RDD jsou rozdělené a uloženy do více spustitelných uzlů . Pokud provádějící uzel selže v době běhu, pak okamžitě získá zálohu z další spustitelný uzel . To je důvod, proč jsou RDD považovány za pokročilý typ datových struktur ve srovnání s jinými tradičními datovými strukturami. RDD mohou ukládat strukturovaná, nestrukturovaná a polostrukturovaná data.

Pojďme posunout kupředu s naším RDD pomocí blogu Spark a dozvíme se o jedinečných vlastnostech RDD, které mu dávají náskok před jinými typy datových struktur.

Vlastnosti RDD

  • V paměti (RAM) Výpočty : Koncept výpočtu v paměti posouvá zpracování dat do rychlejší a efektivnější fáze, kde je celkově výkon systému je upgradováno.
  • L jeho hodnocení : Termín Líné hodnocení říká transformace jsou aplikovány na data v RDD, ale výstup není generován. Místo toho jsou použité transformace přihlášen.
  • Vytrvalost : Výsledné RDD jsou vždy znovu použitelný.
  • Hrubozrnné operace : Uživatel může prostřednictvím použít transformace na všechny prvky v souborech dat mapa, filtr nebo skupina vytvořená operace.
  • Tolerantní k chybám : Pokud dojde ke ztrátě dat, systém může vrátit zpět k jeho původní stav pomocí přihlášeného transformace .
  • Nezměnitelnost : Data definovaná, načtená nebo vytvořená nemohou být změněno jakmile je přihlášen do systému. V případě, že potřebujete získat přístup a upravit existující RDD, musíte vytvořit nový RDD použitím sady Proměna funkce na aktuální nebo předchozí RDD.
  • Rozdělení na oddíly : Je to rozhodující jednotka paralelismu ve Sparku RDD. Ve výchozím nastavení je počet vytvořených oddílů založen na vašem zdroji dat. Můžete dokonce rozhodnout o počtu oddílů, které chcete použít vlastní oddíl funkce.

Vytvoření RDD pomocí Sparku

RDD lze vytvořit v třemi způsoby:

  1. Čtení dat z paralelní sbírky
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Út', 'St', 'Čt', 'Pá', 'So'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ). foreach (println)
  1. Přihlašování proměna na předchozích RDD
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'powerful', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Čtení dat z externí úložiště nebo cesty souborů jako HDFS nebo HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operace prováděné na RDD:

Na RDD se provádějí hlavně dva typy operací, a to:

  • Transformace
  • Akce

Transformace : The operace aplikujeme na RDD filtr, přístup a upravit data v nadřazeném RDD ke generování a po sobě jdoucí RDD je nazýván proměna . Nový RDD vrací ukazatel na předchozí RDD zajišťující závislost mezi nimi.

Transformace jsou Líné hodnocení, jinými slovy, operace použité na RDD, které pracujete, budou zaznamenány, ale ne popraven. Systém vyvolá výsledek nebo výjimku po spuštění Akce .

Transformace můžeme rozdělit na dva typy, jak je uvedeno níže:

  • Úzké transformace
  • Široké transformace

Úzké transformace Aplikujeme úzké transformace na a jeden oddíl rodičovského RDD ke generování nového RDD, protože data potřebná ke zpracování RDD jsou k dispozici na jednom oddílu disku mateřský ASD . Příklady úzkých transformací jsou:

  • mapa()
  • filtr()
  • flatMap ()
  • rozdělit()
  • mapPartitions ()

Široké transformace: Aplikujeme širokou transformaci více oddílů vygenerovat nový RDD. Data potřebná ke zpracování RDD jsou k dispozici na více oddílech mateřský ASD . Příklady širokých transformací jsou:

  • redukovat ()
  • svaz()

Akce : Akce pověří aplikaci Apache Spark výpočet a předat výsledek nebo výjimku zpět do RDD ovladače. Několik akcí zahrnuje:

  • sbírat()
  • počet()
  • vzít()
  • za prvé()

Prakticky aplikujme operace na RDD:

IPL (indická Premier League) je kriketový turnaj, který je na špičkové úrovni. Pojďme se tedy dnes dostat k datové sadě IPL a provést naši RDD pomocí Sparku.

  • Za prvé, stáhneme si údaje o shodě CSV s IPL. Po stažení začne vypadat jako soubor EXCEL s řádky a sloupci.

V dalším kroku zapálíme jiskru a načteme soubor match.csv z jeho umístění, v mém případě méhoCSVumístění souboru je „/User/edureka_566977/test/matches.csv“

Nyní pojďme začít s Proměna část první:

  • mapa():

Používáme Transformace mapy použít konkrétní transformační operaci na každý prvek RDD. Zde vytvoříme RDD podle názvu CKfile, kam ukládáme našeCSVsoubor. Vytvoříme další RDD zvané státy uložit podrobnosti o městě .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val uvádí = CKfile.map (_. split (',') (2)) States.collect (). foreach (println)

  • filtr():

Transformace filtru, samotný název popisuje jeho použití. Tuto transformační operaci používáme k odfiltrování selektivních dat ze souboru poskytnutých dat. Podáváme žádost operace filtru zde získáte záznamy o zápasech IPL roku 2017 a uložte jej do souboru RDD.

java, kdy to použít
val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Aplikujeme flatMap je transformační operace na každý z prvků RDD k vytvoření nového RDD. Je to podobné jako u transformace mapy. zde aplikujemeFlatmapna vyplivnout zápasy města Hyderabad a ukládat data dofilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • rozdělit():

Každá data, která zapíšeme do RDD, jsou rozdělena do určitého počtu oddílů. Tuto transformaci používáme k nalezení počet oddílů data jsou ve skutečnosti rozdělena na.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

MapPatitions považujeme za alternativu Map () apro každého() spolu. K vyhledání používáme zde mapPartitions počet řádků máme v našem souboru RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • removeBy ():

PoužívámeSnížitBy() zapnuto Páry klíč – hodnota . Tuto transformaci jsme použili na našiCSVsoubor najít přehrávač s nejvyšší muž zápasů .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • svaz():

Název vysvětluje vše, používáme unijní transformaci je spojte dva RDD dohromady . Zde vytváříme dva RDD, a to fil a fil2. fil RDD obsahuje záznamy o shodách IPL z roku 2017 a fil2 RDD obsahuje záznam o shodě IPL z roku 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Začněme s Akce část, kde zobrazujeme skutečný výstup:

  • sbírat():

Collect je akce, kterou používáme zobrazit obsah v RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • počet():

Početje akce, kterou používáme k počítání počet záznamů přítomný v RDD.Tadypomocí této operace spočítáme celkový počet záznamů v našem souboru match.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • vzít():

Take je akční operace podobná sbírání, ale jediným rozdílem je, že může tisknout libovolné selektivní počet řádků podle požadavku uživatele. Zde použijeme následující kód pro tisk prvních deset předních zpráv.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • za prvé():

First () je akční operace podobná sbírání () a take ()toslouží k tisku nejvyšší sestavy s výstupem Tady použijeme první operaci () k vyhledání maximální počet odehraných zápasů v konkrétním městě a jako výstup dostaneme Bombaj.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') stavy val = CKfile.map (_. split (',') (2)) val Scount = stavy.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Aby byl náš proces našeho učení RDD pomocí Sparku ještě zajímavější, přišel jsem se zajímavým případem použití.

RDD pomocí Spark: Pokémon Use Case

  • Za prvé, Pojďme si stáhnout soubor Pokemon.csv a načíst jej do jiskřiště stejně jako do souboru Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

co je rozhraní značek v Javě

Pokémoni jsou ve skutečnosti k dispozici v široké škále. Najdeme několik odrůd.

  • Odebrání schématu ze souboru Pokemon.csv

Možná nebudeme potřebovat Schéma souboru Pokemon.csv. Proto jej odstraníme.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Nalezení počtu oddíly náš soubor pokemon.csv je distribuován do.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Vodní Pokémon

Nalezení počet vodních pokémonů

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Požární Pokémon

Nalezení počet ohnivých pokémonů

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Můžeme také detekovat populace jiného typu pokémona pomocí funkce count
WaterRDD.count () FireRDD.count ()

  • Protože se mi hra o obranná strategie pojďme najít pokémona s maximální obrana.
val defenceList = NoHeader.map {x => x.split (',')}. mapa {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Víme maximum hodnota obranné síly ale nevíme, o kterého pokémona jde. Pojďme tedy zjistit, co to je pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. mapa {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Objednávání [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Nyní roztřídíme pokémona pomocí nejméně obrany
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Nyní se podívejme na Pokémona s méně obranná strategie.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemoner2 = .map {x => x.split (',')}. mapa {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (objednávání [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (tisk)

Takže s tímto se dostáváme ke konci tohoto RDD pomocí článku Spark. Doufám, že jsme zažehli trochu světla na vaše znalosti o RDD, jejich vlastnostech a různých typech operací, které s nimi lze provádět.

Tento článek vychází z je navržen tak, aby vás připravil na certifikační zkoušku Cloudera Hadoop a Spark Developer (CCA175). Získáte důkladné znalosti o Apache Spark a Spark Ecosystem, které zahrnují Spark RDD, Spark SQL, Spark MLlib a Spark Streaming. Získáte komplexní znalosti o programovacím jazyce Scala, HDFS, Sqoop, Flume, Spark GraphX ​​a Messaging System, jako je Kafka.