ZBLOG

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop...

根据日志信息,错误在于Reducer没有成功运行,同时Mapper的输出记录数为0。

问题可能出现在Reducer的输入类型上。Reducer期望接收的是IntWritable和Iterable类型,但是由于Mapper中使用了相同的键和值类型,导致在Shuffle阶段无法正确进行分组操作。因此需要更改Mapper的输出类型为Text和IntWritable,并修改对应的代码逻辑。

以下是修改后的代码:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Task1 {

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "task1");
    job.setJarByClass(Task1.class);
    job.setMapperClass(Mapper1.class);
    job.setReducerClass(Reducer1.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/exam/20560640321/train.csv"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output/task1"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

public static class Mapper1 extends Mapper<Object, Text, Text, IntWritable> {

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // 解析csv文件中的一行记录
        String[] fields = value.toString().split(",");
        if (fields.length < 2) {
            // 处理字段数目不足的情况
            return;
        }
        String datetimeStr = fields[1];
        int count = Integer.parseInt(fields[10]);
        // 提取月份
        String[] datetimeParts = datetimeStr.split(" ");
        String[] dateParts = datetimeParts[0].split("/");
        String month = dateParts[1];
        // 输出月份和计数值
        context.write(new Text(month), new IntWritable(count));
    }
}

public static class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 统计同一月份的总共享单车使用量
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // 输出月份和总共享单车使用量
        context.write(key, new IntWritable(sum));
    }
}

}

希望以上修改能够解决你的问题。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=4639

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?