Kumulativní stavová transformace v Apache Spark Streaming



Tento blogový příspěvek pojednává o stavových transformacích ve Spark Streaming. Naučte se vše o kumulativním sledování a dovednostech pro kariéru Hadoop Spark.

Přispěl Prithviraj Bose

V mém předchozím blogu jsem diskutoval o stavových transformacích pomocí konceptu oken Apache Spark Streaming. Můžete si to přečíst tady .





V tomto příspěvku budu diskutovat o kumulativních stavových operacích v Apache Spark Streaming. Pokud jste ve Spark Streamingu noví, důrazně vám doporučuji přečíst si můj předchozí blog, abyste pochopili, jak funguje okno.

Druhy stavové transformace ve streamování jisker (pokračování ...)

> Kumulativní sledování

Použili jsme reduceByKeyAndWindow (…) API pro sledování stavů klíčů, ale okna představují omezení pro určité případy použití. Co když chceme hromadit stavy klíčů, místo abychom je omezovali na časové okno? V takovém případě bychom museli použít updateStateByKey (…) OHEŇ.



aplikace analýzy velkých dat

Toto API bylo představeno ve verzi Spark 1.3.0 a je velmi populární. Toto API má však určitou režii výkonu, jeho výkon se snižuje s tím, jak se velikost stavů časem zvyšuje. Napsal jsem ukázku, abych ukázal použití tohoto API. Kód najdete tady .

Spark 1.6.0 představil nové API mapWithState (…) který řeší režijní náklady spojené s updateStateByKey (…) . V tomto blogu budu diskutovat o tomto konkrétním API pomocí ukázkového programu, který jsem napsal. Kód najdete tady .

Než se ponořím do průchodu kódem, ušetřím pár slov o kontrolním bodu. U každé stavové transformace je kontrolní bod povinný. Kontrolní bodování je mechanismus k obnovení stavu klíčů v případě selhání ovladače. Když se ovladač restartuje, stav klíčů se obnoví ze souborů kontrolních bodů. Umístění kontrolních bodů jsou obvykle HDFS nebo Amazon S3 nebo jakékoli spolehlivé úložiště. Při testování kódu lze také ukládat v místním systému souborů.



V ukázkovém programu posloucháme soketový textový stream na hostiteli = localhost a portu = 9999. Tokenizuje příchozí stream do (slova, počet výskytů) a sleduje počet slov pomocí rozhraní 1.6.0 API mapWithState (…) . Kromě toho jsou klíče bez aktualizací odstraněny pomocí StateSpec.timeout API. Kontrolujeme body v HDFS a frekvence kontrolních bodů je každých 20 sekund.

Nejprve vytvořme relaci Spark Streaming,

Spark-streaming-session

Vytvoříme a checkpointDir v HDFS a poté zavolejte metodu objektu getOrCreate (…) . The getOrCreate API kontroluje checkpointDir abyste zjistili, zda existují nějaké předchozí stavy k obnovení, pokud existují, pak znovu vytvoří relaci Spark Streaming a aktualizuje stavy klíčů z dat uložených v souborech před přechodem na nová data. Jinak vytvoří novou relaci Spark Streaming.

The getOrCreate vezme název adresáře kontrolního bodu a funkci (kterou jsme pojmenovali createFunc ) jehož podpis by měl být () => StreamingContext .

Podívejme se na kód uvnitř createFunc .

Řádek 2: Vytvoříme streamovací kontext s názvem úlohy na „TestMapWithStateJob“ a dávkovým intervalem = 5 sekund.

Řádek č. 5: Nastavte adresář kontrolního bodu.

tabulka, jak vytvořit sadu

Řádek 8: Nastavte specifikaci stavu pomocí třídy org.apache.streaming.StateSpec objekt. Nejprve nastavíme funkci, která bude sledovat stav, poté nastavíme počet oddílů pro výsledné DStreams, které mají být generovány během následných transformací. Nakonec jsme nastavili časový limit (na 30 sekund), kde pokud není přijata žádná aktualizace klíče do 30 sekund, bude stav klíče odstraněn.

Řádek 12 #: Nastavení proudu soketu, zploštění příchozích dávkových dat, vytvoření páru klíč – hodnota, volání mapWithState , nastavte interval kontrolního bodu na 20 s a nakonec vytiskněte výsledky.

Rámec Spark volá th e createFunc pro každý klíč s předchozí hodnotou a aktuálním stavem. Vypočítáme součet a aktualizujeme stav kumulativním součtem a nakonec vrátíme součet pro klíč.

jak používat spyder python

Zdroje Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Máte na nás dotaz? Uveďte to prosím v sekci komentáře a my se vám ozveme.

Související příspěvky:

Začínáme s Apache Spark & ​​Scala

Stavové transformace s vytvářením oken ve streamování jisker