ZBLOG

2023-06-06 20:50:19,098 INFO [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(353)) - Finishing task: attempt_local1206977891_0001_r_000000_0 2023-06-06 20:50:19,098 INFO [Thread-23] mapred.LocalJobRunner (LocalJobRunner.java:run...

根据日志信息,发现Map阶段的输出记录数为0,Reduce阶段的输入记录数也为0,可能是Mapper1类中的逻辑有问题。我对代码进行了修改,并添加了一些注释来帮助你理解:

  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. public class Task1 {
  13. public static void main(String[] args) throws Exception {
  14. Configuration conf = new Configuration();
  15. Job job = Job.getInstance(conf, "task1");
  16. job.setJarByClass(Task1.class);
  17. job.setMapperClass(Mapper1.class);
  18. job.setReducerClass(Reducer1.class);
  19. // 设置Mapper和Reducer的输出类型
  20. job.setMapOutputKeyClass(Text.class);
  21. job.setMapOutputValueClass(IntWritable.class);
  22. job.setOutputKeyClass(Text.class);
  23. job.setOutputValueClass(IntWritable.class);
  24. // 修改输入路径和输出路径
  25. FileInputFormat.addInputPath(job, new Path("/exam/20560640321/train.csv"));
  26. FileOutputFormat.setOutputPath(job, new Path("/output/task1"));
  27. System.exit(job.waitForCompletion(true) ? 0 : 1);
  28. }
  29. public static class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
  30. @Override
  31. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  32. String line = value.toString();
  33. // 如果读入的是文件头,则直接跳过
  34. if (line.startsWith("instant")) {
  35. return;
  36. }
  37. String[] fields = line.split(",");
  38. if (fields.length < 2) {
  39. return;
  40. }
  41. String datetimeStr = fields[1];
  42. int count = Integer.parseInt(fields[10]);
  43. // 提取月份
  44. String[] datetimeParts = datetimeStr.split(" ");
  45. String[] dateParts = datetimeParts[0].split("/");
  46. String month = dateParts[1];
  47. // 输出<month, count>
  48. context.write(new Text(month), new IntWritable(count));
  49. }
  50. }
  51. public static class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
  52. @Override
  53. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  54. int sum = 0;
  55. // 计算同一月份的总计数值
  56. for (IntWritable value : values) {
  57. sum += value.get();
  58. }
  59. // 输出<month, total_count>
  60. context.write(key, new IntWritable(sum));
  61. }
  62. }
  63. }

修改后的代码中,我使用了LongWritable类型作为Mapper类的输入键类型,同时在map方法中进行了一些处理,避免输出不合法的记录。我还修改了输入路径和输出路径,并添加了更多注释帮助你理解程序运行逻辑。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=4641

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?