Big Data | MapReduce Programming Examples
This article is based on the fifth dataset of the national competition (国赛).
Worked problems:


Data source: clean_month.csv
Problem 1-1:
Requirement: remove the records whose month field is empty.
Approach: the Mapper reads the input line by line (LongWritable: byte offset of the line, Text: line content).
After reading a line, split it on "," to parse the CSV record and extract the month field into a new String month.
Use an if statement to check that month is neither the header value ("month") nor empty; if both hold, write the record to the context: context.write(original line, empty value).
Declare the checked exceptions with throws so that I/O and interruption errors propagate to the framework instead of being swallowed.
Code example:
package com.sub1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class mMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split(",") drops trailing empty fields, so a missing month can
        // also show up as a short array; guard the index before using it.
        String[] fields = value.toString().split(",");
        if (fields.length < 2) {
            return;
        }
        String month = fields[1];
        // Skip the header row and any record whose month is empty.
        if (!month.equals("month") && !month.isEmpty()) {
            context.write(value, NullWritable.get());
        }
    }
}

Reducer class:
package com.sub1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class mReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text text, Iterable<NullWritable> nulls, Context context)
            throws IOException, InterruptedException {
        // The shuffle groups identical lines under one key, so writing
        // each key exactly once also deduplicates the output.
        context.write(text, NullWritable.get());
    }
}
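As discussed under "Other notes" below, each package needs its own Driver class, but the article only lists the one for Problem 2. For completeness, here is a minimal sketch of what com.sub1's Driver might look like, assuming the mMapper and mReducer classes above (the job name "1" is an arbitrary choice):
package com.sub1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class mDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] rargs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = Job.getInstance(conf, "1");
        job.setJarByClass(mDriver.class);
        job.setMapperClass(mMapper.class);
        job.setReducerClass(mReducer.class);
        // Map and final outputs are both <Text, NullWritable> here, so the
        // final-output declarations cover the map side as well.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(rargs[0]));
        FileOutputFormat.setOutputPath(job, new Path(rargs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}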
Problem 2-1:
Requirement: output the data after sorting.
Approach: the Mapper emits the city name as the key and the temperature as a Float-typed value; the Reducer then sorts and processes the results.
The Mapper must skip the header row and empty values; the Reducer uses a HashMap to store the data and an ArrayList to sort it.
Code example (Mapper):
package com.sub2;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class mMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        // Skip short records up front so fields[4] cannot go out of bounds.
        if (fields.length < 5) {
            return;
        }
        String city = fields[0].trim();
        String temp = fields[4].trim();
        // Skip the header row and empty temperature values, which would
        // otherwise make parseFloat throw NumberFormatException.
        if (!city.equals("city") && !temp.isEmpty()) {
            float tem = Float.parseFloat(temp);
            context.write(new Text(city), new FloatWritable(tem));
        }
    }
}

Reducer:
package com.sub2;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;

public class mReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
    // Buffers each city's maximum temperature until cleanup() runs.
    private final Map<String, Float> map = new HashMap<>();

    @Override
    protected void reduce(Text text, Iterable<FloatWritable> value, Context context) {
        // Start below any real reading so negative temperatures survive.
        float hiTemp = Float.NEGATIVE_INFINITY;
        for (FloatWritable i : value) {
            if (i.get() > hiTemp) {
                hiTemp = i.get();
            }
        }
        map.put(text.toString(), hiTemp);
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        List<Map.Entry<String, Float>> list = new ArrayList<>(map.entrySet());
        list.sort(new Comparator<Map.Entry<String, Float>>() {
            @Override
            public int compare(Map.Entry<String, Float> o1, Map.Entry<String, Float> o2) {
                // Sort by temperature, highest first.
                int cmp = o2.getValue().compareTo(o1.getValue());
                if (cmp != 0) {
                    return cmp;
                }
                // Break ties by city name, also in descending order.
                return o2.getKey().compareTo(o1.getKey());
            }
        });
        for (Map.Entry<String, Float> entry : list) {
            context.write(new Text(entry.getKey()), new FloatWritable(entry.getValue()));
        }
    }
}
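One design note on this cleanup()-based sort: it yields a single, globally sorted output only when the job runs with one reduce task (the default). With multiple reducers, each task buffers and sorts only its own partition, so every output file is sorted internally but the files are not ordered relative to each other.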
Other notes:
Driver class: each package needs its own Driver class as the main entry point of the MapReduce job; the Driver declares the input and output locations and the input/output types of the Mapper and Reducer stages.
In general, a MapReduce program passes <Key, Value> pairs, so each stage declares four type parameters:
Mapper class: Mapper<LongWritable (line offset), Text (line content), Text ([output key]), NullWritable ([output value, empty])>
Reducer class: Reducer<Text ([key from the Mapper]), NullWritable ([value from the Mapper]), Text, NullWritable>
Driver class code (Problem 2):
package com.sub2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class mDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Strip generic Hadoop options (-D and friends) and keep the paths.
        String[] rargs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = Job.getInstance(conf, "2");
        job.setJarByClass(mDriver.class);
        job.setMapperClass(mMapper.class);
        job.setReducerClass(mReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.setInputPaths(job, new Path(rargs[0]));
        FileOutputFormat.setOutputPath(job, new Path(rargs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
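For reference, a job like this is typically packaged into a jar and launched with hadoop jar <jar file> com.sub2.mDriver <input path> <output path>. GenericOptionsParser strips generic Hadoop options such as -D settings first, so rargs[0] and rargs[1] end up holding the two paths. Also note that the output path must not already exist, otherwise FileOutputFormat fails the job at submission time.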
This is an original article, licensed under CC BY-NC-ND 4.0. Please credit Summer when republishing it in full.