ZBLOG

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hado...

import java.io.IOException;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Task1 {

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "task1");
    job.setJarByClass(Task1.class);
    job.setMapperClass(Mapper1.class);
    job.setReducerClass(Reducer1.class);
    // 设置Mapper和Reducer的输出类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // 修改输入路径和输出路径
    FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/exam/20560640321"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/exam/20560640321/output"));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

public static class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text time = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        // 检查使用数量是否为数字
        try {
            int count = Integer.parseInt(fields[7]);
            // 获取时间
            String timeStr = fields[2];
            if (timeStr.compareTo("12:00:00") >= 0 && timeStr.compareTo("15:00:00") <= 0) {
                time.set(fields[1]);
                context.write(time, new IntWritable(count));
            }
        } catch (NumberFormatException e) {
            return;
        }
    }
}

public static class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        // 计算同一天内12点到3点的使用总数
        for (IntWritable value : values) {
            sum += value.get();
        }

        // 输出<day, total_count>
        context.write(key, new IntWritable(sum));
    }
}

}

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=5090

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?