1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| package org.pross.version;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class VersionMR extends Configured implements Tool{ public static void main(String[] args) throws Exception { int run=ToolRunner.run(new VersionMR(), args); System.exit(run); }
@Override public int run(String[] args) throws Exception { Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(conf); Job job=Job.getInstance(conf); job.setJarByClass(VersionMR.class); job.setMapperClass(VersionMRMapper.class); job.setReducerClass(VersionMRReduce.class); job.setMapOutputKeyClass(Version.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); Path inputPath = new Path("E:\\BigData\\7_Hadoop\\hadoop-mapreduce-day5\\data\\input_version"); Path outputPath = new Path("E:\\BigData\\7_Hadoop\\hadoop-mapreduce-day5\\data\\output_version"); if(fs.exists(outputPath)){ fs.delete(outputPath,true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); boolean bool = job.waitForCompletion(true); return bool? 0 : 1 ; }
public static class VersionMRMapper extends Mapper<LongWritable, Text, Version, NullWritable>{
@Override protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException { String[] split = value.toString().split(","); Version version = new Version(split[0], split[1],split[2],Integer.parseInt(split[3]), split[4], split[5], split[6]); context.write(version, NullWritable.get()); } }
public static class VersionMRReduce extends Reducer<Version, NullWritable, Text, NullWritable>{ String lastID=null; String lastName=null; String lastVersion=null; @Override protected void reduce(Version key, Iterable<NullWritable> values,Context context)throws IOException, InterruptedException { for(NullWritable nvl:values){ if(lastVersion==null){ context.write(new Text(key.toString()), NullWritable.get()); }else{ if(lastID.equals(key.getId()) && lastName.equals(key.getName())){ if(!lastVersion.equals(key.getVersion())){ context.write(new Text(key.toString()+"-"+lastVersion), NullWritable.get()); } }else{ context.write(new Text(key.toString()), NullWritable.get()); } } lastID=key.getId(); lastName=key.getName(); lastVersion=key.getVersion(); } } } }
|