ZBLOG

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hado...

import java.io.IOException;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Task1 {

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "task1");
    job.setJarByClass(Task1.class);
    job.setMapperClass(Mapper1.class);
    job.setReducerClass(Reducer1.class);
    // 设置Mapper和Reducer的输出类型
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);

    // 修改输入路径和输出路径
    FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/exam/20560640321"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/exam/20560640321/output"));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

public static class Mapper1 extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

   private final static IntWritable one = new IntWritable(1); 

   public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");

        // 判断是否为文件头
        if (fields[0].equals("id")) {
            return;
        }

        // 检查数据格式
        if (fields.length != 11) {
            return;
        }

         // 检查租赁数量是否为数字
        try {
            int count = Integer.parseInt(fields[10]);
        } catch (NumberFormatException e) {
            return;
        }

       // 取整温度
       int temp = (int) Math.floor(Double.parseDouble(fields[6]));

       // 输出<temp, count>
       context.write(new IntWritable(temp), new IntWritable(Integer.parseInt(fields[10])));
    }
}

public static class Reducer1 extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        // 计算同一温度下的共享单车总计数值
        for (IntWritable value : values) {
            sum += value.get();
        }

        // 输出<temperature, total_count>
        context.write(key, new IntWritable(sum));
    }
}

}

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=5097

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?