MapReduce编程案例(上)

  • 单词计数WordCount
  • 数组排序并加序号
  • 共同好友

MapRedeuce相关概念

MapReduce是一个分布式运算程序的编程框架,是用户开发”基于Hadoop的数据分析应用”的核心框架

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上

FileInputFormat中默认的切片机制:切片大小默认等于block的大小

用户编写MR程序分为三个部分:Mapper、Reducer、Driver(提交运行MR程序的客户端)

Mapper的输入数据和输出数据是K-V对的形式,Reducer的输入数据类型对应Mapper的输出类型

ReduceTask进程对每一组相同K的K-V组调用一次reduce()方法

单词计数WordCount

思路:逐行读取文本内容–>把读取到的一行文本内容切割为一个一个的单词–>把每个单词出现一次的信息记录为一个key-value,key=word,value=1–>收集所有相同的单词,然后统计value值的总和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
*
* @author pross shawn
*
* create time:2018年3月17日
*
* content:例子程序 wordcount
*/
public class WordCountMR {

public static void main(String[] args) throws Exception {
//指定hdfs相关的参数,没有涉及集群文件可以把set信息注释
Configuration conf = new Configuration();
// conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
// System.setProperty("HADOOP_USER_NAME", "hadoop");
FileSystem fs = FileSystem.get(conf);

//通过Configuration对象获取job对象,job对象会组织所有的该mapreduce程序所有的各种组件
Job job = Job.getInstance(conf);
//设置jar包所在路径,一般即为类名的反射
job.setJarByClass(WordCountMR.class);

//指定mapper和reducer类
job.setMapperClass(WordCountMRMapper.class);
job.setReducerClass(WordCountMRReducer.class);
//指定mapper和reduce的输入输出类型,如果reducer的输入输出类型和mapper一致可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//设置程序的输入路径和输出路径
Path inputPath = new Path("E:\\BigData\\7_Hadoop\\hadoop-mapreduce-day2\\data\\score\\input");
Path outputPath = new Path("E:\\BigData\\7_Hadoop\\hadoop-mapreduce-day2\\data\\score\\output_wordcount");
FileInputFormat.setInputPaths(job, inputPath);
//判断输出路径是否存在,存在则删除
if(fs.exists(outputPath)){
fs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);

//提交任务,布尔值决定要不要将运行进度信息输出给用户
boolean isDone = job.waitForCompletion(true);
//主线程根据mapreduce程序的运行结果成功与否决定是否退出
System.exit(isDone ? 0 : 1);
}

/**
* mapper组件
* LongWritable key : 该key就是value该行文本的在文件当中的起始偏移量
* Text value : 就是MapReduce框架默认的数据读取组件TextInputFormat读取文件当中的一行文本
*/
public static class WordCountMRMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//定义输出类型,避免重复实例化其对象
Text outKey=new Text();
IntWritable outValue=new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//切分单词
String[] words=value.toString().split(" ");

for(String word:words){
//每个单词计数一次,也就是把单词组织成<hello,1>这样的K-V
outKey.set(word);
outValue.set(1);
context.write(outKey, outValue);
}
}
}

/**
* reduce组件
* 输入类型就是Map阶段的处理结果,输出类型就是Reduce最后的输出
* reducetask将这些收到K-V数据拿来处理时,是这样调用我们的reduce方法的:
* 先将自己收到的所有的K-V对按照K分组(根据K是否相同) 将某一组K-V中的第一个K-V中的K传给reduce方法的key变量
* 把这一组kv中所有的v用一个迭代器传给reduce方法的变量values
*/
public static class WordCountMRReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//结果汇总
int sum=0;
for(IntWritable v:values){
sum+=v.get();
}
//输出
context.write(key, new IntWritable(sum));
}
}
}

数组排序并加序号

思路:利用shuffle阶段会把K-V对中的key值自动排序功能,先把数组元素放到key中进行排序,然后根据排序的顺序在reduce阶段为元素编上序号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* @author pross shawn
*
* create time:2018年3月17日
*
* content:数字排序并加序号
*
*/
public class ArraySort {
static int number=0;
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(ArraySort.class);

job.setMapperClass(ArraySortMapper.class);
job.setReducerClass(ArraySortReducer.class);

job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, new Path("E:/BigData/7_Hadoop/hadoop-mapreduce-day2/data/array/input"));
FileOutputFormat.setOutputPath(job, new Path("E:/BigData/7_Hadoop/hadoop-mapreduce-day2/data/array/output2"));

boolean isDone = job.waitForCompletion(true);
System.exit(isDone ? 0 : 1);

}
/*
* Mapper组件
*/
public static class ArraySortMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable>{
//定义输出类型,避免重复实例化其对象
IntWritable outKey=new IntWritable();
IntWritable outValue=new IntWritable();

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//去除到元素的前后空格
String tempNumber = value.toString().trim();
int outKeyTemp=Integer.parseInt(tempNumber);
outKey.set(outKeyTemp);
outValue.set(0);
context.write(outKey,outValue);
}
}

/*
* Ruduce组件
*/
public static class ArraySortReducer extends Reducer<IntWritable,IntWritable, IntWritable, IntWritable>{

@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
for(IntWritable t:values){
number++;
context.write(new IntWritable(number),key);
}
}
}
}

思考:当本地文件中的数据过大时,这样的排序并加编号的方案是否可行 ?

求共同好友

数据格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
A:B,C,D,F,E,O (表示B,C,D,E,F,O是A用户的好友)
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J,K
...

输出格式:

1
2
A-B:C E  (A和B的共同还有是E,C)
A-C:D F

逆推法:结果为A-B:C E,那么key:A-B;value:C E,得知A-B:C;A-B:E,意思是E是A和B的共同好友;继续可以转换A:E,B:E;那么单看A的好友可以拆分为,A:B,A:C,A:D。

我们整理来一下,从源数据格式推出结果:A:B,C,D,F,E,O / B:A,C,E,K –> A:C,E / B:C,E –>A-B:E / A-B:C –> A-B:C E,这样推理过来就需要两个MR来解决,拆分来写,只需要后一个MR程序调用前一个MR程序的结果即可;多job串联,可以使用JobControl这个类把具有依赖关系的多个job串联起来,然后调度先后执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package org.pross.friend;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* @author pross shawn
*
* create time:2018年3月17日
*
* content:找出共同好友
*/
public class CommonFriend{

public static void main(String[] args) throws Exception {
Configuration conf1 = new Configuration();
Configuration conf2 = new Configuration();
FileSystem fs = FileSystem.get(conf1);

// 第一个job任务
Job job1 = Job.getInstance(conf1);
job1.setJarByClass(CommonFriend.class);
job1.setMapperClass(CommonFriend1Mapper.class);
job1.setReducerClass(CommonFriend1Reduce.class);

job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(Text.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);

Path inputPath1 = new Path("E:/BigData/7_Hadoop/hadoop-mapreduce-day2/data/friend/input");
Path outputPath1 = new Path("E:/BigData/7_Hadoop/hadoop-mapreduce-day2/data/friend/output_merge");
FileInputFormat.setInputPaths(job1, inputPath1);
if (fs.exists(outputPath1)) {
fs.delete(outputPath1, true);
}
FileOutputFormat.setOutputPath(job1, outputPath1);

//第二个job任务
Job job2 =Job.getInstance(conf2);
job2.setMapperClass(CommonFriend2Mapper.class);
job2.setReducerClass(CommonFriend2Reducer.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
Path inputPath2 = new Path("E:/BigData/7_Hadoop/hadoop-mapreduce-day2/data/friend/output_merge");
Path outputPath2 = new Path("E:/BigData/7_Hadoop/hadoop-mapreduce-day2/data/friend/output_lastMerge");
FileInputFormat.setInputPaths(job2, inputPath2);
if(fs.exists(outputPath2)){
fs.delete(outputPath2, true);
}
FileOutputFormat.setOutputPath(job2, outputPath2);

/*
* 多个job串联
* 使用JobControl把具有依赖的多个job串联起来
*/
JobControl control=new JobControl("CommonFriend");
ControlledJob ajob=new ControlledJob(job1.getConfiguration());
ControlledJob bjob=new ControlledJob(job2.getConfiguration());
// bjob的执行依赖ajob
bjob.addDependingJob(ajob);

control.addJob(ajob);
control.addJob(bjob);

Thread t=new Thread(control);
t.start();

//等待0.5秒执行下一个
while(!control.allFinished()){
Thread.sleep(500);
}

System.exit(0);
}

/*
* 第一个MR程序
*/
public static class CommonFriend1Mapper extends Mapper<LongWritable, Text, Text, Text> {
/*
* 数据源格式
* A:B,C,D,F,E,O B:A,C,E,K
* ...
*/
Text outkey = new Text();
Text outValue = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 拆分
String[] user_friends = value.toString().split(":");
String user = user_friends[0];
String[] friends = user_friends[1].split(",");
for (String friend : friends) {
outkey.set(friend);
outValue.set(user);
context.write(outkey, outValue);
}
}
}
public static class CommonFriend1Reduce extends Reducer<Text, Text, Text, Text> {
Text keyOut = new Text();

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
ArrayList<String> userList = new ArrayList<String>();
for (Text t : values) {
userList.add(t.toString());
}
Collections.sort(userList);
int size = userList.size();
for (int i = 0; i < size - 1; i++) {
for (int j = i + 1; j < size; j++) {
String outKey = userList.get(i) + "-" + userList.get(j);
keyOut.set(outKey);
context.write(keyOut, key);
}
}
}
}


/*
* 第二个MR程序
* 数据格式:
* B-C A
* B-D A
* B-F A
* B-G A
* ...
*/
public static class CommonFriend2Mapper extends Mapper<LongWritable, Text, Text, Text> {
Text keyOut = new Text();
Text valueOut = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
keyOut.set(split[0]);
valueOut.set(split[1]);
context.write(keyOut, valueOut);
}
}

public static class CommonFriend2Reducer extends Reducer<Text, Text, Text, Text> {
Text valueOut = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text t : values) {
sb.append(t.toString()).append(" ");
}
String outValueStr = sb.toString();
valueOut.set(outValueStr);
context.write(key, valueOut);
}
/*
* 输出数据:
* A-B E C
* A-C D F
* ...
*/
}
}