irpas技术客

大数据从入门到实战 - 第3章 MapReduce基础实战_发芽ing的小啊呜_头歌大数据从入门到实战

未知 4407

大数据从入门到实战 - 第3章 MapReduce基础实战 一、关于此次实践1、实战简介2、全部任务 二、实践详解1、第 1 关:成绩统计2、第 2 关:文件内容合并去重3、第 3 关:信息挖掘 - 挖掘父子关系 叮嘟!这里是小啊呜的学习课程资料整理。好记性不如烂笔头,今天也是努力进步的一天。一起加油进阶吧!

一、关于此次实践 1、实战简介

MapReduce是Hadoop的核心功能之一,掌握它对学习Hadoop至关重要。

Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。

本章我们来通过几个示例来学习MapReduce的用法。

2、全部任务

二、实践详解 1、第 1 关:成绩统计 import java.io.IOException; import java.util.StringTokenizer; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { /********** Begin **********/ //Mapper函数 public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private int maxValue = 0; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString(),"\n"); while (itr.hasMoreTokens()) { String[] str = itr.nextToken().split(" "); String name = str[0]; one.set(Integer.parseInt(str[1])); word.set(name); context.write(word,one); } //context.write(word,one); } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxAge = 0; int age = 0; for (IntWritable intWritable : values) { maxAge = Math.max(maxAge, intWritable.get()); } result.set(maxAge); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); String inputfile = "/user/test/input"; String outputFile = "/user/test/output/"; FileInputFormat.addInputPath(job, new Path(inputfile)); FileOutputFormat.setOutputPath(job, new Path(outputFile)); job.waitForCompletion(true); /********** End **********/ } } 命令行 touch file01 echo Hello World Bye World cat file01 echo Hello World Bye World >file01 cat file01 touch file02 echo Hello Hadoop Goodbye Hadoop >file02 cat file02 start-dfs.sh hadoop fs -mkdir /usr hadoop fs -mkdir /usr/input hadoop fs -ls /usr/output hadoop fs -ls / hadoop fs -ls /usr hadoop fs -put file01 /usr/input hadoop fs -put file02 /usr/input hadoop fs -ls /usr/input

测评

2、第 2 关:文件内容合并去重 import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Merge { /** * @param args * 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C */ //在这重载map函数,直接将输入中的value复制到输出数据的key上 注意在map方法中要抛出异常:throws IOException,InterruptedException public static class Map extends Mapper<Object, Text, Text, Text>{ /********** Begin **********/ public void map(Object key, Text value, Context content) throws IOException, InterruptedException { Text text1 = new Text(); Text text2 = new Text(); StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { text1.set(itr.nextToken()); text2.set(itr.nextToken()); content.write(text1, text2); } } /********** End **********/ } //在这重载reduce函数,直接将输入中的key复制到输出数据的key上 注意在reduce方法上要抛出异常:throws IOException,InterruptedException public static class Reduce extends Reducer<Text, Text, Text, Text> { /********** Begin **********/ public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Set<String> set = new TreeSet<String>(); for(Text tex : values){ set.add(tex.toString()); } for(String tex : set){ context.write(key, new Text(tex)); } } /********** End **********/ } public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub Configuration conf = new Configuration(); conf.set("fs.default.name","hdfs://localhost:9000"); Job job = Job.getInstance(conf,"Merge and duplicate removal"); job.setJarByClass(Merge.class); job.setMapperClass(Map.class); job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String inputPath = "/user/tmp/input/"; //在这里设置输入路径 String outputPath = "/user/tmp/output/"; //在这里设置输出路径 FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

测评

3、第 3 关:信息挖掘 - 挖掘父子关系 import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class simple_data_mining { public static int time = 0; /** * @param args * 输入一个child-parent的表格 * 输出一个体现grandchild-grandparent关系的表格 */ //Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志 public static class Map extends Mapper<Object, Text, Text, Text>{ public void map(Object key, Text value, Context context) throws IOException,InterruptedException{ /********** Begin **********/ String line = value.toString(); String[] childAndParent = line.split(" "); List<String> list = new ArrayList<>(2); for (String childOrParent : childAndParent) { if (!"".equals(childOrParent)) { list.add(childOrParent); } } if (!"child".equals(list.get(0))) { String childName = list.get(0); String parentName = list.get(1); String relationType = "1"; context.write(new Text(parentName), new Text(relationType + "+" + childName + "+" + parentName)); relationType = "2"; context.write(new Text(childName), new Text(relationType + "+" + childName + "+" + parentName)); } /********** End **********/ } } public static class Reduce extends Reducer<Text, Text, Text, Text>{ public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{ /********** Begin **********/ //输出表头 if (time == 0) { context.write(new Text("grand_child"), new Text("grand_parent")); time++; } //获取value-list中value的child List<String> grandChild = new ArrayList<>(); //获取value-list中value的parent List<String> grandParent = new ArrayList<>(); //左表,取出child放入grand_child for (Text text : values) { String s = text.toString(); String[] relation = s.split("\\+"); String relationType = relation[0]; String childName = relation[1]; String parentName = relation[2]; if ("1".equals(relationType)) { grandChild.add(childName); } else { grandParent.add(parentName); } } //右表,取出parent放入grand_parent int grandParentNum = grandParent.size(); int grandChildNum = grandChild.size(); if (grandParentNum != 0 && grandChildNum != 0) { for (int m = 0; m < grandChildNum; m++) { for (int n = 0; n < grandParentNum; n++) { //输出结果 context.write(new Text(grandChild.get(m)), new Text( grandParent.get(n))); } } } /********** End **********/ } } public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"Single table join"); job.setJarByClass(simple_data_mining.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String inputPath = "/user/reduce/input"; //设置输入路径 String outputPath = "/user/reduce/output"; //设置输出路径 FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

测评

Ending! 更多课程知识学习记录随后再来吧!

就酱,嘎啦!

注: 人生在勤,不索何获。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #头歌大数据从入门到实战 #大数据从入门到实战 #第3章 #1 #关成绩统计2第 #2