hadoop簡單使用,Hadoop教程(一)

 2023-12-06 阅读 16 评论 0

摘要:英文原文:cloudera,編譯:ImportNew?–?Royce Wong Hadoop從這里開始!和我一起學習下使用Hadoop的基本知識,下文將以Hadoop Tutorial為主體帶大家走一遍如何使用Hadoop分析數據! hadoop簡單使用?這個專題將描述用戶在使用Hadoop MapReduce(下文縮

英文原文:cloudera,編譯:ImportNew?–?Royce Wong

Hadoop從這里開始!和我一起學習下使用Hadoop的基本知識,下文將以Hadoop Tutorial為主體帶大家走一遍如何使用Hadoop分析數據!

hadoop簡單使用?這個專題將描述用戶在使用Hadoop MapReduce(下文縮寫成MR)框架過程中面對的最重要的東西。Mapreduce由client APIs和運行時(runtime)環境組成。其中client APIs用來編寫MR程序,運行時環境提供MR運行的環境。API有2個版本,也就是我們通常說的老api和新api。運行時有兩個版本:MRv1和MRv2。該教程將會基于老api和MRv1。

其中:老api在org.apache.hadoop.mapred包中,新api在 org.apache.hadoop.mapreduce中。

前提

首先請確認已經正確安裝、配置了CDH,并且正常運行。

MR概覽

hadoop菜鳥入門,Hadoop MapReduce 是一個開源的計算框架,運行在其上的應用通常可在擁有幾千個節點的集群上并行處理海量數據(可以使P級的數據集)。

MR作業通常將數據集切分為獨立的chunk,這些chunk以并行的方式被map tasks處理。MR框架對map的輸出進行排序,然后將這些輸出作為輸入給reduce tasks處理。典型的方式是作業的輸入和最終輸出都存儲在分布式文件系統(HDFS)上。

通常部署時計算節點也是存儲節點,MR框架和HDFS運行在同一個集群上。這樣的配置允許框架在集群的節點上有效的調度任務,當然待分析的數據已經在集群上存在,這也導致了集群內部會產生高聚合帶寬現象(通常我們在集群規劃部署時就需要注意這樣一個特點)。

hadoop技術,MapReduce框架由一個Jobracker(通常簡稱JT)和數個TaskTracker(TT)組成(在cdh4中如果使用了Jobtracker HA特性,則會有2個Jobtracer,其中只有一個為active,另一個作為standby處于inactive狀態)。JobTracker負責在所有tasktracker上調度任務,監控任務并重新執行失敗的任務。所有的tasktracker執行jobtracker分配過來的任務。

應用至少需要制定輸入、輸出路徑,并提供實現了適當接口和(或)抽象類的map和reduce函數。這些路徑和函數以及其他的任務參數組成了任務配置對象(job configuration)。Hadoop 任務客戶端提交任務(jar包或者可執行程序等)和配置對象到JT。JT將任務實現和配置對象分發到數個TT(由JT分配),調度、監控任務,并向客戶端返回狀態和檢測信息。

Hadoop由JavaTM實現,用戶可以使用java、基于JVM的其他語言或者以下的方式開發MR應用:

  • Hadoop Streaming- 允許用戶以任何一種可執行程序(如shell腳本)實現為mapper和(或)reducer來創建和運行MR任務。
  • Hadoop Pigs – 一種兼容SWIG(不基于JNITM)的C++ API,用來實現MapReduce應用。

輸入和輸出

MapReuce框架內部處理的是kv對(key-value pair),因為MR將任務的輸入當做一個kv對的集合,將輸出看做一個kv對的集合。輸出kv對的類型可以不同于輸入對。

key和vaue的類型必須在框架內可序列化(serializable),所以key value 必須實現Writable接口。同時,key 類必須實現WritableComparable以便框架對key進行排序。

典型的MR任務輸入和輸出類型轉換圖為:
(input) k1-v1 -> map -> k2-v2 -> combine -> k2-v2 -> reduce -> k3-v3 (output)

經典的WordCount1.0

玩Hadoop不得不提WordCount,CDH原文里也以這為例,當然這里也以它為例:)

簡單說下WordCount,它是計算輸入數據中每個word的出現次數。因為足夠簡單所以經典,和Hello World有的一拼!

上源碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package org.myorg;
????import java.io.IOException;
????import java.util.*;
????import org.apache.hadoop.fs.Path;
????import org.apache.hadoop.conf.*;
????import org.apache.hadoop.io.*;
????import org.apache.hadoop.mapred.*;
????import org.apache.hadoop.util.*;
????public class WordCount {
??????public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
????????private final static IntWritable one = new IntWritable(1);
????????private Text word = new Text();
????????public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
??????????String line = value.toString();
??????????StringTokenizer tokenizer = new StringTokenizer(line);
??????????while (tokenizer.hasMoreTokens()) {
????????????word.set(tokenizer.nextToken());
????????????output.collect(word, one);
??????????}
????????}
??????}
??????public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
????????public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
??????????int sum = 0;
??????????while (values.hasNext()) {
????????????sum += values.next().get();
??????????}
??????????output.collect(key, new IntWritable(sum));
????????}
??????}
??????public static void main(String[] args) throws Exception {
????????JobConf conf = new JobConf(WordCount.class);
????????conf.setJobName("wordcount");
????????conf.setOutputKeyClass(Text.class);
????????conf.setOutputValueClass(IntWritable.class);
????????conf.setMapperClass(Map.class);
????????conf.setCombinerClass(Reduce.class);
????????conf.setReducerClass(Reduce.class);
????????conf.setInputFormat(TextInputFormat.class);
????????conf.setOutputFormat(TextOutputFormat.class);
????????FileInputFormat.setInputPaths(conf, new Path(args[0]));
????????FileOutputFormat.setOutputPath(conf, new Path(args[1]));
????????JobClient.runJob(conf);
??????}
????}

首先編譯WordCount.java

$ mkdir wordcount_classes $ javac -cp classpath -d wordcount_classes WordCount.java

其中classpath為:

  • CDH4 /usr/lib/hadoop/:/usr/lib/hadoop/client-0.20/
  • CDH3 /usr/lib/hadoop-0.20/hadoop-0.20.2-cdh3u4-core.jar

打成jar包:

1
$ jar -cvf wordcount.jar -C wordcount_classes/ .

假定:

  • /user/cloudera/wordcount/input 輸入HDFS路徑
  • /user/cloudera/wordcount/output 輸出HDFS路徑

創建文本格式的數據并移動到HDFS:

1
2
3
4
$ echo "Hello World Bye World" > file0
$ echo "Hello Hadoop Goodbye Hadoop" > file1
$ hadoop fs -mkdir /user/cloudera /user/cloudera/wordcount /user/cloudera/wordcount/input
$ hadoop fs -put file* /user/cloudera/wordcount/input

運行wordcount:

1
$ hadoop jar wordcount.jar org.myorg.WordCount /user/cloudera/wordcount/input /user/cloudera/wordcount/output

運行完畢后查看輸出:

1
2
3
4
5
6
$ hadoop fs -cat /user/cloudera/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

MR應用可以用-files參數指定在當前工作目錄下存在的多個文件,多個文件使用英文逗號分隔。-libjars參數可以將多個jar包添加到map和reduce的classpath。-archive參數可以將工作路徑下的zip或jar包當做參數傳遞,而zip或jar包名字被當做一個鏈接(link)。更加詳細的命令信息可以參考Hadoop Command Guide.

使用-libjars和-files參數運行wordcount的命令:

1
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output

詳細看下wordcount應用

14-26行實現了Mapper,通過map方法(18-25行)一次處理一行記錄,記錄格式為指定的TextInputFormat(行49)。然后將一條記錄行根據空格分隔成一個個單詞。分隔使用的是類StringTokenizer,然后以<word,1>形式發布kv對。

在前面給定的輸入中,第一個map將會輸出:< Hello, 1> < World, 1> < Bye, 1> < World, 1>

第二個map輸出: < Hello, 1> < Hadoop, 1> < Goodbye, 1> < Hadoop, 1>

我們會在本篇文章中深入學習該任務中map的大量輸出的數目,并研究如何在更細粒度控制輸出。

WordCount 在46行指定了combiner。因此每個map的輸出在根據key排序后會通過本地的combiner(實現和reducer一致)進行本地聚合。

第一個map的最終輸出:< Bye, 1> < Hello, 1> < World, 2>

第二個map輸出:< Goodbye, 1> < Hadoop, 2> < Hello, 1>

Reducer實現(28-36行)通過reduce方法(29-35)僅對值進行疊加,計算每個單詞的出現次數。

所以wordcount的最終輸出為: < Bye, 1> < Goodbye, 1> < Hadoop, 2> < Hello, 2> < World, 2>

run方法通過JobConf對象指定了該任務的各種參數,例如輸入/輸出路徑,kv類型,輸入輸出格式等等。程序通過調用 JobClient.runJob (55行)提交并開始監測任務的執行進度。

后面我們將對JobConf、JobClient、Tool做進一步學習。

英文原文:cloudera,編譯:ImportNew?–?Royce Wong

本文鏈接:http://www.importnew.com/4248.html

?

相關文章

  • 為集群配置Impala和Mapreduce
  • Hadoop入門教程(四):MR作業的提交監控、輸入輸出控制及特性使用
  • Hadoop教程(三): MR重要運行參數
  • Hadoop教程(二)
  • 如何提高hadoop中Short-Circuit Local Reads時的性能及安全性
  • 你不需要 Hadoop做數據分析的10個理由 —— 使用之前必須測試其他替代品
  • 關于Hadoop和Cassandra性能問題的討論
  • 使用Hadoop和BIRT對海量數據進行可視化處理
  • 專為Mac本跟蹤Hadoop任務的應用
  • 為Hbase建立高可用性多主節點

轉載于:https://www.cnblogs.com/tangkai/p/3830782.html

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/2/192724.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息