MapReduce编程案例(下)

  • Version变动版本记录
  • 数值累加

Version变动版本记录

在所有有版本变动的记录后面追加一条字段信息:该信息就是上一个版本的版本号,只限同用户

1
2
3
4
5
6
7
8
数据格式:
20170308,黄渤,光环斗地主,8,360手机助手,0.1版本,北京
20170308,黄渤,光环斗地主,22,360手机助手,0.2版本,北京
20170308,黄渤,光环斗地主,14,360手机助手,0.3版本,北京
...
字段信息:
用户ID,用户名,游戏名,小时,数据来源,游戏版本,用户所在地
id, name, game, hour, source, version, city

思路:把id、name、game、hour、source、version、city封装成Version对象,实现WritableComparable接口,并依次按id、name、version排序。排序后比较相邻两条记录:如果属于同一用户且版本号不一致,就在当前记录后面追加上一条记录的版本号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package org.pross.version;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite MapReduce key that carries one complete game-usage record.
 *
 * <p>Fields, in input order: id, name, game, hour, source, version, city.
 * Ordering (and therefore reduce-side grouping) uses only id, name and
 * version, in that order; game, hour, source and city travel with the key
 * but do not influence sorting.
 *
 * @author pross shawn
 * @since 2018-03-19
 */
public class Version implements WritableComparable<Version> {

    private String id;
    private String name;
    private String game;
    private int hour;
    private String source;
    private String version;
    private String city;

    /** No-arg constructor; instances are then populated through {@link #readFields(DataInput)} or the setters. */
    public Version() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param id      user id
     * @param name    user name
     * @param game    game name
     * @param hour    hour value of the record
     * @param source  data source
     * @param version game version
     * @param city    user's city
     */
    public Version(String id, String name, String game, int hour, String source, String version, String city) {
        this.id = id;
        this.name = name;
        this.game = game;
        this.hour = hour;
        this.source = source;
        this.version = version;
        this.city = city;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGame() {
        return game;
    }

    public void setGame(String game) {
        this.game = game;
    }

    public int getHour() {
        return hour;
    }

    public void setHour(int hour) {
        this.hour = hour;
    }

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    /**
     * Renders the record as a comma-separated line in the same field order as the input.
     *
     * @return {@code id,name,game,hour,source,version,city}
     */
    @Override
    public String toString() {
        return id + "," + name + "," + game + "," + hour + "," + source + "," + version + "," + city;
    }

    /**
     * Serializes all seven fields in declaration order.
     *
     * <p>Every String field must be non-null: {@link DataOutput#writeUTF(String)}
     * throws {@link NullPointerException} for a null argument.
     *
     * @param out sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(name);
        out.writeUTF(game);
        out.writeInt(hour);
        out.writeUTF(source);
        out.writeUTF(version);
        out.writeUTF(city);
    }

    /**
     * Deserializes the fields in exactly the order {@link #write(DataOutput)} emitted them.
     *
     * @param in source to read from
     * @throws IOException if the underlying stream fails or ends early
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.name = in.readUTF();
        this.game = in.readUTF();
        this.hour = in.readInt();
        this.source = in.readUTF();
        this.version = in.readUTF();
        this.city = in.readUTF();
    }

    /**
     * Orders records by id, then name, then version (plain String comparison for each).
     *
     * @param other the record to compare against
     * @return negative, zero or positive per the {@link Comparable} contract
     */
    @Override
    public int compareTo(Version other) {
        int byId = this.id.compareTo(other.getId());
        if (byId != 0) {
            return byId;
        }
        int byName = this.name.compareTo(other.getName());
        if (byName != 0) {
            return byName;
        }
        return this.version.compareTo(other.getVersion());
    }

    /**
     * Equality consistent with {@link #compareTo(Version)}: two records are equal
     * when id, name and version all match.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Version)) {
            return false;
        }
        Version other = (Version) obj;
        return java.util.Objects.equals(id, other.id)
                && java.util.Objects.equals(name, other.name)
                && java.util.Objects.equals(version, other.version);
    }

    /**
     * Hash derived from id and name only.
     *
     * <p>Hadoop's default HashPartitioner picks the reduce task from
     * {@code hashCode()}. The inherited identity hash differs between JVMs, so
     * equal keys could be sent to different reduce tasks. Hashing on the user
     * (id + name) keeps all versions of one user in the same partition while
     * still satisfying the equals/hashCode contract.
     */
    @Override
    public int hashCode() {
        return java.util.Objects.hash(id, name);
    }
}

在MR程序中,map阶段拆分数据后把数据封装到Version对象中;在reduce阶段迭代数据,先判断当前记录的id和name是否与上一条记录一致:只要有一个不一致,就证明是两个不同的用户,直接输出;如果一致且版本号与上一条不同,则把上一个版本号拼接到当前记录后面,放入Key中输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package org.pross.version;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that appends the previous version number to every record
 * whose game version differs from the preceding record of the same user.
 *
 * <p>Usage: {@code VersionMR [inputPath [outputPath]]}. When a path is
 * omitted, the original hard-coded location is used
 * (input: E:\BigData\7_Hadoop\hadoop-mapreduce-day5\data\input_version).
 *
 * @author pross shawn
 * @since 2018-03-19
 */
public class VersionMR extends Configured implements Tool {

    /** Input directory used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "E:\\BigData\\7_Hadoop\\hadoop-mapreduce-day5\\data\\input_version";

    /** Output directory used when no path is given on the command line. */
    private static final String DEFAULT_OUTPUT_PATH =
            "E:\\BigData\\7_Hadoop\\hadoop-mapreduce-day5\\data\\output_version";

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new VersionMR(), args);
        System.exit(exitCode);
    }

    /**
     * Configures and submits the job, then blocks until it finishes.
     *
     * @param args optional {@code [inputPath, outputPath]}; missing entries fall back to the defaults
     * @return 0 when the job succeeds, 1 otherwise
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner so generic options are honored;
        // fall back to a fresh one only when run() is invoked without ToolRunner.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(VersionMR.class);

        // Mapper and reducer classes
        job.setMapperClass(VersionMRMapper.class);
        job.setReducerClass(VersionMRReduce.class);

        // Intermediate and final output types
        job.setMapOutputKeyClass(Version.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output locations
        boolean hasInputArg = args != null && args.length > 0;
        boolean hasOutputArg = args != null && args.length > 1;
        Path inputPath = new Path(hasInputArg ? args[0] : DEFAULT_INPUT_PATH);
        Path outputPath = new Path(hasOutputArg ? args[1] : DEFAULT_OUTPUT_PATH);

        // Remove a stale output directory; the filesystem is resolved from the path itself.
        FileSystem fs = outputPath.getFileSystem(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit and wait
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    /**
     * Mapper: parses one comma-separated line into a {@link Version} key.
     *
     * <p>Line format:
     * <pre>
     * 20170308,黄渤,光环斗地主,8,360手机助手,0.1版本,北京
     * 20170308,黄渤,光环斗地主,5,360手机助手,0.1版本,北京
     * </pre>
     * Fields: id, name, game, hour, source, version, city.
     */
    public static class VersionMRMapper extends Mapper<LongWritable, Text, Version, NullWritable> {

        /** Number of comma-separated fields a well-formed line must have. */
        private static final int FIELD_COUNT = 7;

        private static final String COUNTER_GROUP = "VersionMR";
        private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");

            // Blank or truncated lines are counted and skipped instead of failing the task.
            if (fields.length < FIELD_COUNT) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            int hour;
            try {
                hour = Integer.parseInt(fields[3]);
            } catch (NumberFormatException ignored) {
                // A non-numeric hour makes the record unusable; count it and move on.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            Version record = new Version(fields[0], fields[1], fields[2], hour, fields[4], fields[5], fields[6]);
            context.write(record, NullWritable.get());
        }
    }

    /**
     * Reducer: writes every record, appending {@code "-" + previousVersion} when
     * the record belongs to the same user as the previous one but has a different version.
     *
     * <p>The previous record's id, name and version are remembered across
     * {@code reduce()} calls, so the result is only meaningful when all records of
     * one user reach the same reduce task in sorted order.
     */
    public static class VersionMRReduce extends Reducer<Version, NullWritable, Text, NullWritable> {

        private String lastID = null;
        private String lastName = null;
        private String lastVersion = null;

        @Override
        protected void reduce(Version key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Hadoop refreshes `key` as the value iterator advances, so inside the loop
            // `key` always describes the record currently being visited.
            for (NullWritable ignored : values) {
                // lastVersion == null means no record has been seen yet.
                boolean sameUserAsPrevious = lastVersion != null
                        && lastID.equals(key.getId())
                        && lastName.equals(key.getName());

                if (sameUserAsPrevious && !lastVersion.equals(key.getVersion())) {
                    // Version changed for the same user: append the previous version.
                    context.write(new Text(key.toString() + "-" + lastVersion), NullWritable.get());
                } else {
                    // First record, a different user, or an unchanged version: emit as-is.
                    context.write(new Text(key.toString()), NullWritable.get());
                }

                // Remember this record for comparison with the next one.
                lastID = key.getId();
                lastName = key.getName();
                lastVersion = key.getVersion();
            }
        }
    }
}

数值累加

求所有数对应位置的叠加和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
举例:
0001.txt文件有数据:
1
2
3
4
...
10
0002.txt文件有数据:
10
10
10
10
...
返回结果是:
1 1
2 3
3 6
4 10
...
10 55
也就是,每一行数字后面都追加一个累加到该数字的和值

思路:先求出每个文件的总和,然后按照文件的顺序进行叠加