ZBLOG

2023-06-06 20:50:19,098 INFO [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(353)) - Finishing task: attempt_local1206977891_0001_r_000000_0 2023-06-06 20:50:19,098 INFO [Thread-23] mapred.LocalJobRunner (LocalJobRunner.java:run...

根据日志信息，Map 阶段的输出记录数为 0，Reduce 阶段的输入记录数也为 0，说明问题很可能出在 Mapper1 类的逻辑中。我对代码进行了修改，并添加了一些注释来帮助你理解：

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that sums a per-line count column of a comma-separated file, grouped by month.
 *
 * <p>Each data line is split on ",". Column 1 holds a date-time string whose date part (the text
 * before the first space) is "/"-separated; its second component is used as the month key.
 * Column 10 holds an integer count. The job emits one {@code <month, total>} pair per distinct
 * month value.
 */
public class Task1 {

    /** Input file used when no input path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH = "/exam/20560640321/train.csv";

    /** Output directory used when no output path is given on the command line. */
    private static final String DEFAULT_OUTPUT_PATH = "/output/task1";

    /**
     * Configures and runs the job, exiting with status 0 on success and 1 on failure.
     *
     * @param args optional {@code [inputPath [outputPath]]}; missing entries fall back to
     *     {@link #DEFAULT_INPUT_PATH} and {@link #DEFAULT_OUTPUT_PATH}
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "task1");
        job.setJarByClass(Task1.class);
        job.setMapperClass(Mapper1.class);
        // Summation is associative and commutative and Reducer1's input/output types match,
        // so it can also pre-aggregate map output locally and shrink the shuffle.
        job.setCombinerClass(Reducer1.class);
        job.setReducerClass(Reducer1.class);

        // Map output and final output are both <Text month, IntWritable count>.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Emits {@code <month, count>} for every well-formed data line.
     *
     * <p>The header line is skipped. Lines with too few columns, an unparsable date, or a
     * non-integer count are also skipped and tallied in the {@code Task1/MALFORMED_RECORDS}
     * counter, instead of throwing and failing the whole task.
     */
    public static class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

        /** Column holding the date-time string. */
        private static final int DATETIME_COLUMN = 1;

        /** Column holding the integer count; the highest column index this mapper reads. */
        private static final int COUNT_COLUMN = 10;

        // Reused across map() calls (the usual Hadoop pattern) to avoid per-record allocation.
        private final Text monthKey = new Text();
        private final IntWritable countValue = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Skip the CSV header row.
            if (line.startsWith("instant")) {
                return;
            }
            String[] fields = line.split(",");
            // fields[COUNT_COLUMN] is read below, so every column up to it must be present;
            // a shorter row would otherwise throw ArrayIndexOutOfBoundsException.
            if (fields.length <= COUNT_COLUMN) {
                markMalformed(context);
                return;
            }

            String month = extractMonth(fields[DATETIME_COLUMN]);
            if (month == null) {
                markMalformed(context);
                return;
            }

            int count;
            try {
                // trim() drops stray whitespace such as the '\r' left by CRLF line endings.
                count = Integer.parseInt(fields[COUNT_COLUMN].trim());
            } catch (NumberFormatException e) {
                // Not an integer (e.g. a header variant the prefix check missed): skip the line.
                markMalformed(context);
                return;
            }

            // Output <month, count>.
            monthKey.set(month);
            countValue.set(count);
            context.write(monthKey, countValue);
        }

        /**
         * Returns the second "/"-separated component of the date part (the text before the
         * first space) of {@code datetime}, or {@code null} if that component is missing or empty.
         */
        private static String extractMonth(String datetime) {
            String datePart = datetime.trim().split(" ")[0];
            String[] dateParts = datePart.split("/");
            if (dateParts.length < 2 || dateParts[1].isEmpty()) {
                return null;
            }
            return dateParts[1];
        }

        /** Records one skipped input line in the {@code Task1/MALFORMED_RECORDS} counter. */
        private void markMalformed(Context context) {
            context.getCounter("Task1", "MALFORMED_RECORDS").increment(1);
        }
    }

    /**
     * Sums all counts received for one month key and emits {@code <month, total>}.
     * Input and output types are identical, so this class is also registered as the combiner.
     */
    public static class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reused across reduce() calls to avoid per-key allocation.
        private final IntWritable total = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // Add up every count for this month.
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Output <month, total_count>.
            total.set(sum);
            context.write(key, total);
        }
    }
}

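修改后的代码中，我使用了 LongWritable 类型作为 Mapper 类的输入键类型，同时在 map 方法中对输入行进行了过滤，避免输出不合法的记录。需要注意：map 方法会读取 fields[10]，因此字段数检查至少要求 11 个字段（即 fields.length < 11 时跳过该行），否则字段不足的行会触发 ArrayIndexOutOfBoundsException。我还修改了输入路径和输出路径，并添加了更多注释，帮助你理解程序的运行逻辑。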

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=4641

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?