DBInputFormat pro přenos dat z SQL do NoSQL databáze

Cílem tohoto blogu je naučit se, jak přenášet data z databází SQL do HDFS, jak přenášet data z databází SQL do databází NoSQL.

V tomto blogu prozkoumáme možnosti a možnosti jedné z nejdůležitějších součástí technologie Hadoop, tj. MapReduce.

Společnosti dnes přijímají rámec Hadoop jako svou první volbu pro ukládání dat kvůli jeho schopnostem efektivně zpracovávat velká data. Ale víme také, že data jsou univerzální a existují v různých strukturách a formátech. Aby bylo možné ovládat tak obrovskou škálu dat a jejich různých formátů, měl by existovat mechanismus, který by vyhověl všem odrůdám, a přesto přinesl efektivní a konzistentní výsledek.





Nejvýkonnější komponentou v rámci Hadoop je MapReduce, který může poskytnout kontrolu nad daty a jejich strukturou lépe než ostatní protějšky. I když to vyžaduje režii křivky učení a složitost programování, pokud zvládnete tyto složitosti, určitě zvládnete jakýkoli druh dat s Hadoop.

Rámec MapReduce rozděluje všechny své zpracovatelské úkoly na v zásadě dvě fáze: Map a Reduce.



Příprava nezpracovaných dat pro tyto fáze vyžaduje pochopení některých základních tříd a rozhraní. Super třída pro tyto přepracování je InputFormat.

The InputFormat class je jednou z hlavních tříd v rozhraní Hadoop MapReduce API. Tato třída je zodpovědná za definování dvou hlavních věcí:

  • Rozdělení dat
  • Čtečka záznamů

Rozdělení dat je základní koncept v rámci Hadoop MapReduce, který definuje jak velikost jednotlivých mapových úkolů, tak jeho potenciální exekuční server. The Čtečka záznamů je zodpovědný za skutečné čtení záznamů ze vstupního souboru a jejich odeslání (jako páry klíč / hodnota) do mapovače.



O počtu mapovačů se rozhoduje na základě počtu rozdělení. Úkolem InputFormatu je vytvořit rozdělení. Většina z časového rozdělení je ekvivalentní velikosti bloku, ale ne vždy se rozdělení vytvoří na základě velikosti bloku HDFS. Zcela záleží na tom, jak byla přepsána metoda getSplits () vašeho InputFormat.

Mezi MR splitem a HDFS blokem je zásadní rozdíl. Blok je fyzický blok dat, zatímco rozdělení je jen logický blok, který čte mapovač. Rozdělení neobsahuje vstupní data, obsahuje pouze odkaz nebo adresu dat. Rozdělení má v zásadě dvě věci: délku v bajtech a sadu umístění úložiště, což jsou pouze řetězce.

Abychom tomu lépe porozuměli, vezměme si jeden příklad: Zpracování dat uložených ve vaší MySQL pomocí MR. Protože v tomto případě neexistuje koncept bloků, teorie: „rozdělení se vždy vytváří na základě bloku HDFS“,selže. Jednou z možností je vytvořit rozdělení na základě rozsahů řádků v tabulce MySQL (a to dělá DBInputFormat, vstupní formát pro čtení dat z relačních databází). Můžeme mít k počet dělení skládajících se z n řádků.

Pouze pro InputFormats založené na FileInputFormat (InputFormat pro zpracování dat uložených v souborech) se rozdělení vytvoří na základě celkové velikosti vstupních souborů v bajtech. S velikostí bloků FileSystem vstupních souborů se však zachází jako s horní mezí pro vstupní rozdělení. Pokud máte soubor menší než velikost bloku HDFS, získáte pro tento soubor pouze 1 mapovač. Pokud chcete mít jiné chování, můžete použít mapred.min.split.size. Opět však záleží výhradně na getSplits () vašeho InputFormat.

příklad java pole objektů

V balíčku org.apache.hadoop.mapreduce.lib.input máme k dispozici tolik již existujících vstupních formátů.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Výchozí hodnota je TextInputFormat.

Podobně máme tolik výstupních formátů, které čtou data z reduktorů a ukládají je do HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Výchozí hodnota je TextOutputFormat.

Než přečtete tento blog, dozvěděli byste se:

  • Jak napsat program zmenšení mapy
  • O různých typech InputFormats dostupných v Mapreduce
  • Jaká je potřeba InputFormats
  • Jak psát vlastní InputFormats
  • Jak přenášet data z databází SQL na HDFS
  • Jak přenášet data z databází SQL (zde MySQL) do databází NoSQL (zde Hbase)
  • Jak přenášet data z jedné databáze SQL do jiné tabulky v databázích SQL (Možná to nemusí být tak důležité, pokud to děláme ve stejné databázi SQL. Není však nic špatného mít stejnou znalost. Nikdy nevíte jak se může začít používat)

Předpoklad:

c ++ volání odkazem
  • Předinstalovaný Hadoop
  • Předinstalovaný SQL
  • Předinstalovaný Hbase
  • Základní porozumění Java
  • MapReduce knowledge
  • Základní znalosti rámce Hadoop

Pojďme pochopit prohlášení o problému, které zde budeme řešit:

V naší relační databázi Edureka máme tabulku zaměstnanců v MySQL DB. Nyní podle obchodního požadavku musíme přesunout všechna data dostupná v relační DB do souborového systému Hadoop, tj. HDFS, NoSQL DB známého jako Hbase.

K provedení tohoto úkolu máme mnoho možností:

  • Sqoop
  • Žlab
  • MapReduce

Nyní pro tuto operaci nechcete instalovat a konfigurovat žádný další nástroj. Zbývá vám jen jedna možnost, kterou je zpracovatelský rámec Hadoop MapReduce. Rámec MapReduce vám při přenosu poskytne plnou kontrolu nad daty. Se sloupci můžete manipulovat a umístit přímo na kterékoli ze dvou cílových umístění.

Poznámka:

  • Abychom mohli načíst tabulky z tabulky MySQL, musíme stáhnout a umístit konektor MySQL do cesty třídy Hadoop. Chcete-li to provést, stáhněte si konektor com.mysql.jdbc_5.1.5.jar a uložte jej pod adresář Hadoop_home / share / Hadoop / MaPreduce / lib.
Stahování cp / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Také umístěte všechny sklenice Hbase pod Hadoop classpath, aby váš program MR získal přístup k Hbase. Chcete-li to provést, proveďte následující příkaz :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Verze softwaru, které jsem použil při provádění tohoto úkolu, jsou:

  • Hadooop - 2.3.0
  • HBase 0.98.9-Hadoop2
  • Zatmění Měsíce

Aby se program vyhnul jakýmkoli problémům s kompatibilitou, předepisuji svým čtenářům spuštění příkazu s podobným prostředím.

Vlastní DBInputWritable:

balíček com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable veřejná třída DBInputWritable implementuje Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) vrhá SQLException // Objekt sady výsledků představuje data vrácená z příkazu SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) hodí IOException { } public void write (PreparedStatement ps) hodí SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, odd)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Vlastní DBOutputWritable:

balíček com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implements Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException {} public void write (DataOutput out) throws IOException {} public void write (PreparedStatement ps) hodí SQLException {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, odd)}}

Vstupní tabulka:

vytvořit databázi edureka
vytvořit tabulku emp (empid int není null, název varchar (30), odd varchar (20), primární klíč (empid))
vložit do emp hodnot (1, 'abhay', 'vývoj'), (2, 'brundesh', 'test')
vyberte * z emp

Případ 1: Přenos z MySQL na HDFS

balíček com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // driver class' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // user name' root ') // password Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) File.utput, new Path (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // název vstupní tabulky null, null, nový řetězec [] {'empid', 'name', 'odd'} / / sloupce tabulky) Cesta p = nová Cesta (args [0]) FileSystem fs = FileSystem.get (nový URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Tato část kódu nám umožňuje připravit nebo nakonfigurovat vstupní formát pro přístup k našemu zdrojovému SQL DB. Parametr zahrnuje třídu ovladače, adresa URL má adresu databáze SQL, jeho uživatelské jméno a heslo.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // třída ovladače 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // uživatelské jméno 'root') //Heslo

Tato část kódu nám umožňuje předat podrobnosti o tabulkách v databázi a nastavit je v objektu úlohy. Parametry samozřejmě zahrnují instanci úlohy, vlastní zapisovatelnou třídu, která musí implementovat rozhraní DBWritable, název zdrojové tabulky, podmínku, pokud existuje else null, všechny parametry řazení else null, seznam sloupců tabulky.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // název vstupní tabulky null, null, nový řetězec [] {'empid', 'name', 'odd'} // sloupce tabulky)

Mapovač

balíček com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable veřejná třída Map rozšiřuje Mapper {
chráněná neplatná mapa (klíč LongWritable, hodnota DBInputWritable, kontext ctx) {try {název řetězce = value.getName () IntWritable id = nový IntWritable (value.getId ()) řetězec string = value.getDept ()
ctx.write (nový text (jméno + '' + id + '' + odd.), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reducer: Použitý Reduktor identity

Příkaz ke spuštění:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Výstup: Tabulka MySQL převedená na HDFS

hadoop dfs -ls / dbtohdfs / *

Případ 2: Přenos z jedné tabulky v MySQL do druhé v MySQL

vytváření výstupní tabulky v MySQL

vytvořit tabulku zaměstnanec1 (název varchar (20), id int, odd varchar (20))

balíček com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable veřejná třída Mainonetable_to_other_table {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // uživatelské jméno' root ') // heslo Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // název vstupní tabulky null, null, nový řetězec [] {'empid ',' name ',' dept '} // sloupce tabulky) DBOutputFormat.setOutput (job,' employee1 ', // název výstupní tabulky nový String [] {' name ',' id ',' odd '} // tabulka sloupce) System.exit (job.waitForCompletion (true)? 0: 1)}}

Tato část kódu nám umožňuje nakonfigurovat název výstupní tabulky v SQL DB. Parametry jsou instance úlohy, název výstupní tabulky a názvy výstupních sloupců.

DBOutputFormat.setOutput (úloha, 'employee1', // název výstupní tabulky nový řetězec [] {'name', 'id', 'odd'} // sloupce tabulky)

Mapovač: Stejné jako v případě 1

Redukce:

balíček com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {protected void reduce (Text key, Iterable values, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (řádek [0] .toString (), Integer.parseInt (řádek [1] .toString ()), řádek [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Příkaz ke spuštění:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Výstup: Přenesená data z tabulky EMP v MySQL jinému zaměstnanci tabulky 1 v MySQL

Případ 3: Přenos z tabulky v MySQL do tabulky NoSQL (Hbase)

Vytváření tabulky Hbase pro přizpůsobení výstupu z tabulky SQL:

vytvořit 'zaměstnanec', 'Official_info'

Třída řidiče:

balíček Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text veřejná třída MainDbToHbase {public static void main (String [] args) vyvolá výjimku {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // uživatelské jméno 'root') // heslo Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('zaměstnanec', Reduce.class, job) job.setInputForm. třída) DBInputFormat.setInput (úloha, DBInputWritable.class, 'emp', // název vstupní tabulky null, null, nový řetězec [] {'empid', 'name', 'odd'} // sloupce tabulky) System.exit (job.waitForCompletion (true)? 0: 1)}}

Tato část kódu umožňuje nakonfigurovat třídu výstupního klíče, která je v případě hbase ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Zde předáváme název tabulky hbase a reduktor, který působí na stůl.

TableMapReduceUtil.initTableReducerJob ('employee', Reduce.class, job)

Mapovač:

balíček Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io . LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map extends Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ') '+ odd.))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

V této části kódu bereme hodnoty z getrů třídy DBinputwritable a pak je předáváme
ImmutableBytesWritable tak, aby dosáhli reduktoru ve formě bytewriatble, které Hbase rozumí.

Řetězec line = value.getName () Řetězec cd = value.getId () + '' Řetězec dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), nový Text (line + '' + odd. ))

Redukce:

balíček Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text veřejná třída Reduce rozšiřuje TableReducer {public void redukovat (klíč ImmutableBytesWritable, hodnoty Iterable, kontext kontext) vyvolá IOException, InterruptedException {String [] příčina = hodnoty smyčky for (Text val: values) {cause = val.toString (). split ('')} // Put to HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('Official_info') ), Bytes.toBytes ('name'), Bytes.toBytes (příčina [0])) put.add (Bytes.toBytes ('Official_info'), Bytes.toBytes ('departement)), Bytes.toBytes (příčina [1 ])) context.write (key, put)}}

Tato část kódu nám umožňuje rozhodnout se o přesném řádku a sloupci, do kterého bychom ukládali hodnoty z redukce. Zde ukládáme každý empid do samostatného řádku, protože jsme udělali empid jako klíč řádku, který by byl jedinečný. V každém řádku ukládáme oficiální informace zaměstnanců ve skupině sloupců „official_info“ ve sloupcích „jméno“ a „oddělení“.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (příčina [0])) put.add (bajty). toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (příčina [1])) context.write (key, put)

Přenesená data v Hbase:

skenování zaměstnance

Jak vidíme, mohli jsme úspěšně dokončit úkol migrace našich obchodních dat z relační databáze SQL do databáze NoSQL.

V příštím blogu se naučíme psát a spouštět kódy pro další vstupní a výstupní formáty.

jak pořídit snímek obrazovky v selenu

Stále zveřejňujte své komentáře, dotazy nebo zpětnou vazbu. Rád bych od vás slyšel.

Máte na nás dotaz? Uveďte to prosím v sekci komentáře a my se vám ozveme.

Související příspěvky: