Koala++'s blog

Nutch 1.0 Source Code Analysis [5]: Generator (2)

2010-03-24 18:39:10 | Category: Nutch


The middle part of generate() is:

job = new NutchJob(getConf());
job.setJobName("generate: partition " + segment);
job.setInt("partition.url.by.host.seed", new Random().nextInt());

FileInputFormat.addInputPath(job, tempDir);
job.setInputFormat(SequenceFileInputFormat.class);

job.setMapperClass(SelectorInverseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SelectorEntry.class);
job.setPartitionerClass(PartitionUrlByHost.class);
job.setReducerClass(PartitionReducer.class);
job.setNumReduceTasks(numLists);

FileOutputFormat.setOutputPath(job, output);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
job.setOutputKeyComparatorClass(HashComparator.class);
try {
    JobClient.runJob(job);
} catch (IOException e) {
    LockUtil.removeLockFile(fs, lock);
    fs.delete(tempDir, true);
    throw e;
}

The input directory is the temporary directory produced by the earlier select job, and the output directory is the one we saw in the previous post.
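As a reminder (quoted from memory of the earlier part of generate(), so treat it as approximate rather than verbatim), the two paths were set up roughly like this:

// approximate setup from the earlier part of generate():
Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
        + "/generate-temp-" + System.currentTimeMillis());
Path segment = new Path(segments, generateSegmentName());
Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);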

The map function is implemented in SelectorInverseMapper:

public static class SelectorInverseMapper extends MapReduceBase implements
        Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {

    public void map(FloatWritable key, SelectorEntry value,
            OutputCollector<Text, SelectorEntry> output,
            Reporter reporter) throws IOException {
        SelectorEntry entry = (SelectorEntry) value;
        output.collect(entry.url, entry);
    }
}

The map step inverts the key: each entry is emitted with entry.url as the key instead of the score. The partitioner is PartitionUrlByHost, which we have already seen; a sketch follows below.
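For reference, here is a simplified sketch of what PartitionUrlByHost does (reconstructed from memory, not verbatim Nutch source; the seed is the partition.url.by.host.seed value set above):

import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Simplified sketch, not verbatim Nutch source: all URLs of one host land in
// the same reduce partition, i.e. the same fetch list.
public class PartitionUrlByHost implements Partitioner<Text, Writable> {
    private int seed;

    public void configure(JobConf job) {
        seed = job.getInt("partition.url.by.host.seed", 0);
    }

    public int getPartition(Text key, Writable value, int numReduceTasks) {
        String urlString = key.toString();
        String host = urlString; // fall back to the whole URL if unparsable
        try {
            host = new URL(urlString).getHost();
        } catch (MalformedURLException e) {
            // keep the full URL string as the hash input
        }
        int hashCode = host.hashCode() ^ seed;
        return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Hashing the host name rather than the full URL is what keeps all of a host's URLs in one fetch list, and the per-run random seed varies which list each host is assigned to.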

The reducer is implemented by PartitionReducer:

public static class PartitionReducer extends MapReduceBase implements
        Reducer<Text, SelectorEntry, Text, CrawlDatum> {

    public void reduce(Text key, Iterator<SelectorEntry> values,
            OutputCollector<Text, CrawlDatum> output, Reporter reporter)
            throws IOException {
        // if using HashComparator, we get only one input key in case of
        // hash collision so use only URLs from values
        while (values.hasNext()) {
            SelectorEntry entry = values.next();
            output.collect(entry.url, entry.datum);
        }
    }
}

This brings all entries of a given host together in one reduce task. As the source comment explains, when HashComparator is used, two different URLs whose hashes collide compare as equal and arrive under a single input key; to avoid losing URLs, the output key is taken from entry.url in the values rather than from the input key.

/** Sort fetch lists by hash of URL. */
public static class HashComparator extends WritableComparator {

    public int compare(WritableComparable a, WritableComparable b) {
        Text url1 = (Text) a;
        Text url2 = (Text) b;
        int hash1 = hash(url1.getBytes(), 0, url1.getLength());
        int hash2 = hash(url2.getBytes(), 0, url2.getLength());
        return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
    }

    private static int hash(byte[] bytes, int start, int length) {
        int hash = 1;
        // make later bytes more significant in hash code, so that sorting
        // by hashcode correlates less with by-host ordering.
        for (int i = length - 1; i >= 0; i--)
            hash = (31 * hash) + (int) bytes[start + i];
        return hash;
    }
}

As the comment on the hash computation says, later bytes are made more significant in the hash code, so that sorting by hash code correlates less with by-host ordering. Take http://163.com/special/ and http://163.com/sports/ as an example: if the loop ran from the start of the string toward the end, the shared prefix http://163.com/ would contribute identically to both hashes and the two values would come out close together. URLs of the same host would then stay clustered in the sorted fetch list, which is exactly what we want to avoid; in other words, the forward version would be a poor hash function for this purpose.
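To make this concrete, here is a small standalone demo (a hypothetical illustration, not Nutch code) that hashes the two example URLs in both directions:

// Hypothetical demo, not Nutch code: compare a forward polynomial hash with
// the reverse-order hash used by HashComparator on two same-host URLs.
public class HashDirectionDemo {

    // forward: earlier bytes are multiplied by higher powers of 31, so the
    // shared prefix "http://163.com/" is the most significant part
    static int forwardHash(byte[] b) {
        int h = 1;
        for (int i = 0; i < b.length; i++)
            h = (31 * h) + (int) b[i];
        return h;
    }

    // reverse (as in HashComparator): later bytes are most significant, so
    // sorting by this hash spreads a host's URLs apart
    static int reverseHash(byte[] b) {
        int h = 1;
        for (int i = b.length - 1; i >= 0; i--)
            h = (31 * h) + (int) b[i];
        return h;
    }

    public static void main(String[] args) {
        String u1 = "http://163.com/special/";
        String u2 = "http://163.com/sports/";
        System.out.println("forward: " + forwardHash(u1.getBytes())
                + " / " + forwardHash(u2.getBytes()));
        System.out.println("reverse: " + reverseHash(u1.getBytes())
                + " / " + reverseHash(u2.getBytes()));
    }
}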

The final part of generate():

// update the db from tempDir
Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".")
        + "/generate-temp-" + System.currentTimeMillis());

job = new NutchJob(getConf());
job.setJobName("generate: updatedb " + dbDir);
job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
FileInputFormat.addInputPath(job, tempDir);
FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(CrawlDbUpdater.class);
job.setReducerClass(CrawlDbUpdater.class);
job.setOutputFormat(MapFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
FileOutputFormat.setOutputPath(job, tempDir2);
try {
    JobClient.runJob(job);
    CrawlDb.install(job, dbDir);
} catch (IOException e) {
    LockUtil.removeLockFile(fs, lock);
    fs.delete(tempDir, true);
    fs.delete(tempDir2, true);
    throw e;
}
fs.delete(tempDir2, true);

Here the input directories are the output of the select job just shown (tempDir) and the output of inject (the crawldb's current directory). Both the map and reduce functions are implemented in CrawlDbUpdater:

public void map(WritableComparable key, Writable value,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {
    if (key instanceof FloatWritable) { // tempDir source
        SelectorEntry se = (SelectorEntry) value;
        output.collect(se.url, se.datum);
    } else {
        output.collect((Text) key, (CrawlDatum) value);
    }
}

The map function checks which input file a record came from: records from tempDir are keyed by a FloatWritable (the sort score), while records from current are keyed by URL, so the two cases must be handled separately.

public void reduce(Text key, Iterator<CrawlDatum> values,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {
    while (values.hasNext()) {
        CrawlDatum val = values.next();
        if (val.getMetaData().containsKey(
                Nutch.WRITABLE_GENERATE_TIME_KEY)) {
            LongWritable gt = (LongWritable) val.getMetaData().get(
                    Nutch.WRITABLE_GENERATE_TIME_KEY);
            genTime.set(gt.get());
            if (genTime.get() != generateTime) {
                orig.set(val);
                genTime.set(0L);
                continue;
            }
        } else {
            orig.set(val);
        }
    }
    if (genTime.get() != 0L) {
        orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY,
                genTime);
    }
    output.collect(key, orig);
}

The reduce function checks the WRITABLE_GENERATE_TIME_KEY metadata: if its value shows the datum was not produced by this generate run, genTime is reset to 0, so the stale generate-time marker is not written back into the crawldb.
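For a single URL the reducer typically sees the old crawldb datum (unstamped) plus the freshly selected copy (stamped with Nutch.WRITABLE_GENERATE_TIME_KEY). The decision can be summarized by a small helper (a hypothetical plain-Java sketch; resolveGenTime is illustrative, not a Nutch method):

import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Nutch code: the per-URL decision that
// CrawlDbUpdater.reduce makes about the generate-time marker.
public class GenerateTimeDecision {

    /** stamp == null models a datum without the generate-time metadata. */
    static long resolveGenTime(List<Long> stamps, long generateTime) {
        long genTime = 0L;
        for (Long stamp : stamps) {
            if (stamp == null) continue;  // crawldb copy: no marker
            genTime = stamp;
            if (genTime != generateTime)  // stamped by an older run
                genTime = 0L;             // drop the stale marker
        }
        return genTime; // non-zero => marker is written back with the datum
    }

    public static void main(String[] args) {
        long thisRun = 1000L;
        // datum selected in this run keeps its marker: prints 1000
        System.out.println(resolveGenTime(Arrays.asList(null, 1000L), thisRun));
        // datum stamped by an earlier run loses it: prints 0
        System.out.println(resolveGenTime(Arrays.asList(null, 500L), thisRun));
    }
}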