Map/Reduce中Join查询实现皇牌天下投注网

来源:http://www.prospettivedarte.com 作者:计算机教程 人气:148 发布时间:2019-11-04
摘要:3. 代码分析 Example 8-9. Application to find the maximum temperature by sorting temperatures in the key         public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {          

3. 代码分析

  1. Example 8-9. Application to find the maximum temperature by sorting temperatures in the key  
  2.   
  3.   
  4. public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {  
  5.   
  6.   
  7.   // Map任务  
  8.   static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> {  
  9.   private NcdcRecordParser parser = new NcdcRecordParser();  
  10.   public void map(LongWritable key, Text value,  
  11.       OutputCollector<IntPair, NullWritable> output, Reporter reporter)  
  12.       throws IOException {  
  13.     parser.parse(value);   // 解析输入的文本  
  14.     if (parser.isValidTemperature()) {  
  15.     // 这里把年份与温度组合成一个key,value为空  
  16.       output.collect(new IntPair(parser.getYearInt(),  parser.getAirTemperature()), NullWritable.get());  
  17.     }  
  18.   }  
  19. }  
  20. 皇牌天下投注网,  
  21.   
  22. // Reduce任务  
  23. static class MaxTemperatureReducer extends MapReduceBase  
  24.   implements Reducer<IntPair, NullWritable, IntPair, NullWritable> {  
  25.   public void reduce(IntPair key, Iterator<NullWritable> values,  
  26.       OutputCollector<IntPair, NullWritable> output, Reporter reporter)  
  27.       throws IOException {  
  28.     // 输出聚合的key值,这里的key是先按年份进行聚合,所我们会看到相同所有年份相同的key会聚合在一起,而这些聚合后的key按温度进行降序按列  
  29.     // 所以聚合中第一个key为温度最高的,所以这里输出的key为这一年中温度最高的值  
  30.     output.collect(key, NullWritable.get());  
  31.   }  
  32. }  
  33.   
  34.   
  35. // 切分器,这里是按年份* 127 % reduceNum来进行切分的  
  36. public static class FirstPartitioner  
  37.   implements Partitioner<IntPair, NullWritable> {  
  38.   @Override  
  39.   public void configure(JobConf job) {}  
  40.   @Override  
  41.   public int getPartition(IntPair key, NullWritable value, int numPartitions) {  
  42.     return Math.abs(key.getFirst() * 127) % numPartitions;  
  43.   }  
  44. }  
  45.   
  46.   
  47. // 聚合key的一个比较器  
  48. public static class KeyComparator extends WritableComparator {  
  49.   protected KeyComparator() {  
  50.     super(IntPair.class, true);  
  51.   }  
  52.   @Override  
  53.   public int compare(WritableComparable w1, WritableComparable w2) {  
  54.     IntPair ip1 = (IntPair) w1;  
  55.     IntPair ip2 = (IntPair) w2;  
  56.     // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数  
  57.     // 这里是先比较年份,再比较温度,按温度降序排序  
  58.     int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());  
  59.     if (cmp != 0) {  
  60.       return cmp;  
  61.     }  
  62.     return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse  
  63.   }  
  64. }  
  65.   // 设置聚合比较器  
  66.   public static class GroupComparator extends WritableComparator {  
  67.     protected GroupComparator() {  
  68.       super(IntPair.class, true);  
  69.     }  
  70.     @Override  
  71.     public int compare(WritableComparable w1, WritableComparable w2) {  
  72.       IntPair ip1 = (IntPair) w1;  
  73.       IntPair ip2 = (IntPair) w2;  
  74.     // 这里是按key的第一个参数来聚合,就是年份  
  75.       return IntPair.compare(ip1.getFirst(), ip2.getFirst());  
  76.     }  
  77.   }  
  78.   @Override  
  79.   public int run(String[] args) throws IOException {  
  80.     JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);  
  81.     if (conf == null) {  
  82.       return -1;  
  83.     }  
  84.     conf.setMapperClass(MaxTemperatureMapper.class);  
  85.     conf.setPartitionerClass(FirstPartitioner.class);     // 设置切分器  
  86.     conf.setOutputKeyComparatorClass(KeyComparator.class); // 设置key的比较器  
  87.     conf.setOutputValueGroupingComparator(GroupComparator.class); // 设置聚合比较器  
  88.     conf.setReducerClass(MaxTemperatureReducer.class);  
  89.     conf.setOutputKeyClass(IntPair.class);  // 设置key的一个组合类型,如里这个类型实现了WritableComparable<T>的话,那就不要设置setOutputKeyComparatorClass了.  
  90.     conf.setOutputValueClass(NullWritable.class);  // 输出的value为NULL,因为这里的实际value已经组合到了key中  
  91.     JobClient.runJob(conf);  
  92.     return 0;  
  93.   }  
  94.   
  95.   
  96.   public static void main(String[] args) throws Exception {  
  97.     int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);  
  98.     System.exit(exitCode);  
  99.   }  
  100. }  

输出结果如下:

 

  1. % hadoop jar job.jar MaxTemperatureUsingSecondarySort input/ncdc/all   
  2. > output-secondarysort  
  3. % hadoop fs -cat output-secondarysort/part-* | sort | head  
  4. 1901    317  
  5. 1902    244  
  6. 1903    289  
  7. 1904    256  
  8. 1905    283  
  9. 1906    294  
  10. 1907    283  
  11. 1908    289  

要注意的是有时候用户会存储这些value,所以你要在Map的时候输出这些value,而不是上面的NULL,而输出的这些value是经过排序的。皇牌天下投注网 1

C、Map执行完成之后,输出的中间结果如下:

1. 问题的提出

对于如下数据,我们要计算出每一年的最高温度值:

  1. (1900,34)  
  2. (1900,32)  
  3. ....  
  4. (1950, 0)  
  5. (1950, 22)  
  6. (1950, −11)  
  7. (1949, 111)  
  8. (1949, 78)  

计算结果可能如下:

  1. 1901 317  
  2. 1902 244  
  3. 1903 289  
  4. 1904 256  
  5. ...  
  6. 1949 111  

我们一般的方法是把key设置成年份,把value设置成温度,在reduce的时候去遍历所有相同key的value值,找出最大的那个值,在Reduce返回的时候,只collect这个最大的值,这是一种办法,但是这种办法在效率上相对比较差,也不够灵活,下面我们来看看怎么使用Secondary Sort来解决这个问题

            System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");

我们知道,在reduce之前,MP框架会对收到的<K,V>对按K进行排序,而对于一个特定的K来说,它的List<V>是没有被排过序的,就是说这些V是无序的,因为它们来自不同的Map端,而且很多应用也不依赖于K所对应的list<V>的顺序,但是有一些应用就要就要依赖于相同K的V的顺序,而且还要把他们聚合在一起,下面会提出这样一个问题,是参考Hadoop The Defiinitive Guide的第八章 (下载见 http://www.linuxidc.com/Linux/2012-01/51182.htm )。

        second.write(out);

2. Secondary Sort

Secondary Sort 实际上就是一种对Value进行二次排序,然后按key的特定部分进行聚合的方法,这里用到了一个组合Key的概念,就是把K与要排序的Value组合在一起,生成一个新的Key值,上面的例子中,新的组合key为<1900,32>,也就是<年份,温度>的组合,(1900, 35°C),(1900, 34°C),这样组合以后,生成一个新的key,但是这样组合以后,它们会被切分到不同的Reduce上,所以我们这里要写一个根据新组合的key的第一个参数(年份)来进行相应的partitioner,在JobConf中可以使用setPartitionerClass来进行设置,这样可以解决相同年份的key会被聚合在同一个Reduce上,但是还没有解析在同一个Reduce上,把部分key相同的记录聚合(group)在一起,所以这里我们要设置一个group的比较器,这样就可以把相同年份的记录聚合在一起,但对于相同key(这里是指key中第一个参数相同)的排序问题,我们要使用一个KeyComparator比较器来做,就是在group中对key进行二次排序,在上面例子中就是按key中第二个参数温度来降序排序,这里要注意的是这里的输入是key,而不是value,这就是我们为什么把value组合在key一起的原因,而写这个比较方法的时候,还要注意一定要符合Group方法的原因,如果group是按key的第一个参数来得,那这里key的比较就要在第一个参数相同的情况下,才能会第二个参数(value)进行比较,我想这里解释了为什么这种排序叫Secondary Sort的原因吧,在上面的例子中,key的比较是先比较第一个参数(年份),如果第一个参数相同,再比较第二个参数(温度),按第二个参数降序排列。

所以一般要使用Secondary Sort,在JobConf要配置这三个参数

  • setPartitionerClass                // 这个是用来设置key的切分,上面例子中是按key中的第一个参数来切分
  • setOutputValueGroupingComparator   // 这里设置group,就是按key的哪一个参数进行聚合,上面的例子中也是按第一个参数年份进行聚合
  • setOutputKeyComparatorClass        // 这个是设置key的比较器,设置聚合的key的一个二次排序方法

            return t1.getFirst().compareTo(t2.getFirst());

        if (cmp != 0) {

            }

1003,0    201001    abc

        // 设置输入和输出的目录

 

1005,0    jue

    }

 

                    return;

        first.readFields(in);

 

1003,0    kaka

七、其他的支撑代码

        }

B、资料表,也就是这里的info.txt需要放在前面,也就是标识号是0.否则无法输出理想结果。

        this.first = first;

                } else {

1003,0    kaka

        // 设置reduce

 

        job.setMapperClass(Example_Join_01_Mapper.class);

1006,0    zhao

1005,1    201006    pqr

                    // 数据格式规范,区分标识为1

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

                if (values.length < 2) {

    public TextPair(String first, String second) {

3、遍历余下的结果,输出。

 

1004,0    da

 

            // 获取输入文件的全路径和名称

 

        GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);

1005,0    jue

 

    public TextPair(Text first, Text second) {

public static class Example_Join_01_Comparator extends WritableComparator {

1005,1    201002    def

public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {

        public int compare(WritableComparable a, WritableComparable b) {

public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {

1、首先是TextPair代码,没有什么可以细说的,贴出来:

 

 

        @Override

 

1、代码比较简单,首先获取关键的ID值,就是key的第一个字段。

201006    1005    pqr

2、分组操作,代码如下

                InterruptedException {

同一区:

                    // data数据格式不规范,字段小于2,抛弃数据

                    context.write(tp, new Text(values[1]));

在做这个Join查询的时候,必然涉及数据,我这里设计了2张表,分别较data.txt和info.txt,字段之间以/t划分。

1005    jue

1006    201003    ghi    zhao

        //conf.set("hadoop.job.ugi", "root,hadoop");

1005    201006    pqr    jue

        }

        // 设置运行的job

        if (agrs.length < 3) {

201002    1005    def

}

 

1003    201004    jkl    kaka

分区我在以前的文档中写过,这里不做描述了,就说是按照map输出的符合key的第一个字段做分区关键字。分区之后,相同key会划分到一个reduce中去处理(如果reduce设置是1,那么就是分区有多个,但是还是在一个reduce中处理。但是结果会按照分区的原则排序)。分区后结果大致如下:

 

 

201001    1003    abc

首先是map的代码,我贴上,然后简要说说

        set(new Text(), new Text());

    public void readFields(DataInput in) throws IOException {

                    TextPair tp = new TextPair(new Text(values[0]), new Text("0"));

        set(new Text(first), new Text(second));

 

 

    }

八、总结

1003,1    201001    abc

                }

        }

1、map之后的输出会进行一些分区的操作,代码贴出来:

                    // data数据格式不规范,字段小于3,抛弃数据

                    // 数据格式规范,区分标识为0

1006    zhao

    }

1003,1    201004    jkl

 

同一组:

        job.setGroupingComparatorClass(Example_Join_01_Comparator.class);

        @SuppressWarnings("unchecked")

贴上代码如下:

1003    kaka

2、对于资源表,如果我们采用0和1这样的模式来区分,资源表是需要放在前的。例如本例中info.txt就是资源表,所以标识位就是0.如果写为1的话,可以试下,在分组之后,资源表对应的值放在了迭代器最后一位,无法追加在最后所有的结果集合中。

            TextPair t1 = (TextPair) a;

    private Text second;

A、pathName是文件在HDFS中的全路径(例如:hdfs://M1:9000/MengYan/join/data/info.txt),可以以endsWith()的方法来判断。

1003    201001    abc    kaka

            super(TextPair.class, true);

        }

        public int getPartition(TextPair key, Text value, int numParititon) {

        job.setJarByClass(Example_Join_01.class);

 

        job.setMapOutputKeyClass(TextPair.class);

 

        System.exit(job.waitForCompletion(true) ? 0 : 1);

分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看,是按照复合key的第一个字段做分组原则,达到忽略复合key的第二个字段值的目的,从而让数据能够迭代在一个reduce中。输出后结果如下:

1004,1    201005    mon

        this.second = second;

            }

1003,1    201004    jkl

1005,1    201002    def

1004    da

    }

                } else {

1003,0    kaka

1004,0    201005    mon

 

    }

 

本文由皇牌天下投注网发布于计算机教程,转载请注明出处:Map/Reduce中Join查询实现皇牌天下投注网

关键词:

上一篇:类的加载和双亲委派模型,双亲委派模型

下一篇:没有了

最火资讯