This article uses the fifth data set from the national competition (国赛).

Problem statement:

(screenshots of the original problem description omitted)

Data source: clean_month.csv (download link available in the original post)
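
Judging from the field indices used in the code below (an assumption inferred from the code, not a published schema), the relevant columns of clean_month.csv are: column 0 = city, column 1 = month, column 4 = the temperature being compared, and the first line is a header row containing the column names ("city", "month", ...).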

Question 1 - 1:

    Requirement analysis: remove the records whose month field is empty.

    Approach: the Mapper reads the input line by line (LongWritable: byte offset of the line, Text: the line content).

        After reading a line, split it on "," to parse the CSV record and extract the month field into a String variable named month.

        Use an if statement to check that month is neither the header value ("month") nor empty; if both conditions hold, write to the context: context.write(original line, NullWritable).

        Declare throws IOException, InterruptedException on map() so that checked exceptions are propagated rather than silently swallowed.

Code example (Mapper):

package com.sub1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class mMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException
    {
        // split(",", -1) keeps trailing empty fields, so a blank month column is not silently dropped
        String[] fields = value.toString().split(",", -1);

        // skip malformed lines, the header row ("month"), and records with an empty month
        if (fields.length > 1) {
            String month = fields[1].trim();
            if (!month.equals("month") && !month.isEmpty()) {
                // key = the original CSV line; the value carries nothing, we only keep the line
                context.write(value, NullWritable.get());
            }
        }
    }
}

Reducer class:

package com.sub1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// identical lines arrive under the same key, so duplicate records are collapsed automatically
public class mReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text text, Iterable<NullWritable> nulls, Context context)
        throws IOException, InterruptedException {
        // emit each surviving line exactly once
        context.write(text, NullWritable.get());
    }
}

Question 2 - 1:

    Requirement analysis: output the data in sorted order.

    Approach: the Mapper emits the city name and the temperature as a Float; the Reducer performs the sorting and final processing.

        The Mapper must skip empty values and the header row; the Reducer collects each city's result in a HashMap and then sorts it through an ArrayList in cleanup().
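
A caveat worth noting: sorting inside the Reducer's in-memory map only produces one globally sorted output file if the job runs with a single reduce task (one reducer is the MapReduce default; it can be made explicit with job.setNumReduceTasks(1) in the Driver).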

Code example (Mapper):

package com.sub2;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class mMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        // split(",", -1) keeps empty trailing fields so the length check below is reliable
        String[] fields = value.toString().split(",", -1);
        String city = fields[0].trim();
        // skip the header row and any record whose temperature field is missing or empty
        if (!city.equals("city") && fields.length > 4 && !fields[4].trim().isEmpty()) {
            float tem = Float.parseFloat(fields[4].trim());
            context.write(new Text(city), new FloatWritable(tem));
        }
    }
}

Reducer:

package com.sub2;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.*;

public class mReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
    // buffer: city -> highest temperature, filled in reduce() and sorted in cleanup()
    protected final Map<String, Float> map = new HashMap<>();

    @Override
    protected void reduce(Text text, Iterable<FloatWritable> value, Context context) {
        // find the highest temperature for this city; start from negative infinity
        // so that sub-zero temperatures are handled correctly
        float hiTemp = Float.NEGATIVE_INFINITY;
        for (FloatWritable i : value) {
            if (i.get() > hiTemp) {
                hiTemp = i.get();
            }
        }
        map.put(text.toString(), hiTemp);
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
        // cleanup() runs once after all reduce() calls, so every city is available for sorting here
        List<Map.Entry<String, Float>> list = new ArrayList<>(map.entrySet());
        // sort by temperature in descending order
        Collections.sort(list, new Comparator<Map.Entry<String, Float>>() {
            @Override
            public int compare(Map.Entry<String, Float> o1, Map.Entry<String, Float> o2) {
                int byValue = o2.getValue().compareTo(o1.getValue());
                if (byValue != 0) {
                    return byValue;
                }
                // break ties by comparing the full city names
                return o2.getKey().compareTo(o1.getKey());
            }
        });
        for(Map.Entry<String, Float> stringFloatEntry: list) {
            context.write(new Text(stringFloatEntry.getKey()), new FloatWritable(stringFloatEntry.getValue()));
        }
    }
}
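
As a side note, on Java 8 or newer the anonymous Comparator in cleanup() can be expressed with the built-in Map.Entry comparators; this is only a readability variant of the same ordering (temperature descending, ties broken on the city name in the same descending orientation):

        // same ordering as the anonymous Comparator above, Java 8 style
        list.sort(
            Map.Entry.<String, Float>comparingByValue(Comparator.reverseOrder())
                     .thenComparing(Map.Entry.<String, Float>comparingByKey(Comparator.reverseOrder())));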

Other notes:

Driver class: each package needs its own Driver class as the main entry point of the MapReduce job; the Driver is where the input/output paths and the key/value types of the Mapper and Reducer stages are declared.

In general, a MapReduce program works on <Key, Value> pairs, so each stage declares four type parameters:

Mapper class: Mapper<LongWritable (byte offset of the input line), Text (input line), Text (output key), NullWritable (empty output value)>

Reducer class: Reducer<Text (key from the Mapper), NullWritable (value from the Mapper), Text (output key), NullWritable (output value)>
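
The post only shows the Driver for question 2; for question 1 (package com.sub1) a minimal sketch would mirror it, with NullWritable as the value type (the job name "1" is arbitrary):

package com.sub1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class mDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] rargs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = Job.getInstance(conf, "1");   // job name is arbitrary
        job.setJarByClass(mDriver.class);
        job.setMapperClass(mMapper.class);
        job.setReducerClass(mReducer.class);

        // both the Mapper and the Reducer emit <Text, NullWritable>
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(rargs[0]));
        FileOutputFormat.setOutputPath(job, new Path(rargs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}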

Driver class code (question 2):

package com.sub2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class mDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // GenericOptionsParser strips standard Hadoop options (e.g. -D ...) and returns the remaining arguments
        String[] rargs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = Job.getInstance(conf, "2");
        job.setJarByClass(mDriver.class);
        job.setMapperClass(mMapper.class);
        job.setReducerClass(mReducer.class);

        // output types of the Mapper (must match mMapper's declaration)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);

        // final output types written by the Reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // input and output paths come from the remaining command-line arguments
        FileInputFormat.setInputPaths(job, new Path(rargs[0]));
        FileOutputFormat.setOutputPath(job, new Path(rargs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
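
Once packaged into a jar, the job can be submitted with something like hadoop jar <your-jar> com.sub2.mDriver <HDFS input path> <HDFS output path> (the jar name and paths are placeholders); because GenericOptionsParser is used, standard -D options placed before the paths are also honored.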