

Nutch 1.0 Source Code Analysis [11]: DeleteDuplicates

2010-03-24 18:47:26 | Category: Nutch


         The comment at the top of DeleteDuplicates reads:

Delete duplicate documents in a set of Lucene indexes. Duplicates have either the same contents (via MD5 hash) or the same URL.

This tool uses the following algorithm:

Phase 1 - remove URL duplicates:

In this phase documents with the same URL are compared, and only the most recent document is retained - all other URL duplicates are scheduled for deletion.

Phase 2 - remove content duplicates:

In this phase documents with the same content hash are compared. If property "dedup.keep.highest.score" is set to true (default) then only the document with the highest score is retained. If this property is set to false, only the document with the shortest URL is retained - all other content duplicates are scheduled for deletion.

Phase 3 - delete documents:

In this phase documents scheduled for deletion are marked as deleted in Lucene index(es).

         In other words: the tool deletes duplicate documents from a set of Lucene indexes, where documents count as duplicates if they have the same content (via MD5 hash) or the same URL.

         It removes duplicates in the following steps:

Step 1 - remove URL duplicates:

         Among documents with the same URL, only the most recent one is kept; all other URL duplicates are marked for deletion.

Step 2 - remove content duplicates:

         Documents with the same content hash are compared. If the property "dedup.keep.highest.score" is set to true (the default), only the document with the highest score is kept; if it is set to false, only the document with the shortest URL is kept. All other content duplicates are marked for deletion.

         The relevant line in Crawl is:

dedup.dedup(new Path[] { indexes });
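
         For context, this call presumably comes from Crawl.java, where the tool is pointed at the directory holding the per-part Lucene indexes; a rough sketch (conf and crawlDir are placeholder names used for illustration only):

// Assumed usage, modeled on Crawl: run dedup over the "indexes" directory
// produced by the indexer.
Configuration conf = NutchConfiguration.create();
Path indexes = new Path(crawlDir, "indexes");   // crawlDir: hypothetical crawl output dir
DeleteDuplicates dedup = new DeleteDuplicates(conf);
dedup.dedup(new Path[] { indexes });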

         First, the first part of dedup:

Path outDir1 = new Path("dedup-urls-"

       + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

 

JobConf job = new NutchJob(getConf());

 

for (int i = 0; i < indexDirs.length; i++) {

    FileInputFormat.addInputPath(job, indexDirs[i]);

}

job.setJobName("dedup 1: urls by time");

 

job.setInputFormat(InputFormat.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IndexDoc.class);

 

job.setReducerClass(UrlsReducer.class);

FileOutputFormat.setOutputPath(job, outDir1);

 

job.setOutputKeyClass(MD5Hash.class);

job.setOutputValueClass(IndexDoc.class);

job.setOutputFormat(SequenceFileOutputFormat.class);

 

JobClient.runJob(job);
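
         The map output value class, IndexDoc, is an inner Writable of DeleteDuplicates; every field the reducers below touch appears in it. A rough field summary (inferred from the code shown in this post; the exact declaration in the real source may differ):

// IndexDoc (sketch): one record per indexed document.
// url    - the document URL (reduce key of phase 1)
// time   - fetch time; phase 1 keeps the most recent document per URL
// hash   - MD5 hash of the content (reduce key of phase 2)
// score  - indexing score, compared when dedup.keep.highest.score is true
// urlLen - URL length, compared when deduping by shortest URL
// index  - path of the Lucene index containing the document (reduce key of phase 3)
// doc    - the Lucene document number inside that index
// keep   - set to false once the document has been scheduled for deletion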

         The reduce function of UrlsReducer:

public void reduce(Text key, Iterator<IndexDoc> values,

       OutputCollector<MD5Hash, IndexDoc> output, Reporter reporter)

       throws IOException {

    WritableUtils.cloneInto(latest, values.next());

    while (values.hasNext()) {

       IndexDoc value = values.next();

       if (value.time > latest.time) {

           // discard current and use more recent

           latest.keep = false;

           output.collect(latest.hash, latest);

           WritableUtils.cloneInto(latest, value);

       } else {

           // discard

           value.keep = false;

           output.collect(value.hash, value);

       }

    }

    // keep the latest

    latest.keep = true;

    output.collect(latest.hash, latest);

}

         The first value is copied into latest. If the next value's time is later than latest's, latest.keep is set to false (marking it for deletion) and the newer value becomes latest; if the next value is older than latest, that value's keep flag is set to false instead. In other words, only the most recent document for each URL is kept.
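
         As a hypothetical example: if three documents share the URL http://example.com/ with fetch times 5, 9 and 7, the reducer emits the time-5 and time-7 entries with keep = false, and finally emits the time-9 entry with keep = true, all keyed by their content hash for the next phase.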

         The second part of dedup:

Path outDir2 = new Path("dedup-hash-"

       + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

job = new NutchJob(getConf());

job.setJobName("dedup 2: content by hash");

 

FileInputFormat.addInputPath(job, outDir1);

job.setInputFormat(SequenceFileInputFormat.class);

job.setMapOutputKeyClass(MD5Hash.class);

job.setMapOutputValueClass(IndexDoc.class);

job.setPartitionerClass(HashPartitioner.class);

job.setSpeculativeExecution(false);

 

job.setReducerClass(HashReducer.class);

FileOutputFormat.setOutputPath(job, outDir2);

 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IndexDoc.class);

job.setOutputFormat(SequenceFileOutputFormat.class);

 

JobClient.runJob(job);
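
         The byScore flag used in the reduce below is presumably initialized in HashReducer's configure method from the "dedup.keep.highest.score" property mentioned in the class comment; a minimal sketch:

// Sketch (assumed): configure reads the dedup.keep.highest.score property.
public void configure(JobConf job) {
    byScore = job.getBoolean("dedup.keep.highest.score", true);
}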

         The reduce function of the HashReducer class:

public void reduce(MD5Hash key, Iterator<IndexDoc> values,

       OutputCollector<Text, IndexDoc> output, Reporter reporter)

       throws IOException {

    boolean highestSet = false;

    while (values.hasNext()) {

       IndexDoc value = values.next();

       // skip already deleted

       if (!value.keep) {

           output.collect(value.url, value);

           continue;

       }

       if (!highestSet) {

           WritableUtils.cloneInto(highest, value);

           highestSet = true;

           continue;

       }

       IndexDoc toDelete = null, toKeep = null;

       boolean metric = byScore ? (value.score > highest.score)

              : (value.urlLen < highest.urlLen);

       if (metric) {

           toDelete = highest;

           toKeep = value;

       } else {

           toDelete = value;

           toKeep = highest;

       }

 

       toDelete.keep = false;

       output.collect(toDelete.url, toDelete);

       WritableUtils.cloneInto(highest, toKeep);

    }

    // no need to add this -in phase 2 we only process docs to delete them

    // highest.keep = true;

    // output.collect(key, highest);

}

         !value.keep means the document has already been marked for deletion, which at this point can only mean it was found to be a URL duplicate in phase 1, so it is passed straight through. Otherwise, if comparison is by document score (byScore is true), the highest-scoring document is kept; if not, the document with the shorter URL is kept. The commented-out lines at the end explain that since phase 2 only marks documents for deletion, there is no need to also emit the winning (kept) document.
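
         As a hypothetical example: two documents share a content hash, byScore is true, and their scores are 1.2 and 0.8. The 0.8 document gets keep = false and is emitted keyed by its URL; the 1.2 document is not emitted at all, because phase 3 only needs to see the documents that must be deleted.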

         The last part of dedup:

fs.delete(outDir1, true);

 

job = new NutchJob(getConf());

job.setJobName("dedup 3: delete from index(es)");

 

FileInputFormat.addInputPath(job, outDir2);

job.setInputFormat(SequenceFileInputFormat.class);

 

job.setInt("io.file.buffer.size", 4096);

job.setMapperClass(DeleteDuplicates.class);

job.setReducerClass(DeleteDuplicates.class);

 

job.setOutputFormat(DeleteDuplicates.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

 

JobClient.runJob(job);

 

fs.delete(outDir2, true);

         Now the key part: the map and reduce functions of DeleteDuplicates itself:

/** Map [*,IndexDoc] pairs to [index,doc] pairs. */

public void map(WritableComparable key, Writable value,

       OutputCollector<Text, IntWritable> output, Reporter reporter)

       throws IOException {

    IndexDoc indexDoc = (IndexDoc) value;

    // don't delete these

    if (indexDoc.keep)

       return;

    // delete all others

    output.collect(indexDoc.index, new IntWritable(indexDoc.doc));

}

         If indexDoc.keep is true, the document is to be kept and nothing is emitted. Otherwise the output key is the index the document belongs to, and the value is its Lucene document number.
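
         For instance (with a made-up layout), a document scheduled for deletion that sits in the index crawl/indexes/part-00000 with Lucene document number 42 is emitted as the pair (crawl/indexes/part-00000, 42), so that all deletions for one index arrive at the same reduce call.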

/** Delete docs named in values from index named in key. */

public void reduce(Text key, Iterator<IntWritable> values,

       OutputCollector<WritableComparable, Writable> output,

       Reporter reporter) throws IOException {

    Path index = new Path(key.toString());

    IndexReader reader = IndexReader.open(new FsDirectory(fs, index,

           false, getConf()));

    try {

       while (values.hasNext()) {

           IntWritable value = values.next();

           reader.deleteDocument(value.get());

       }

    } finally {

       reader.close();

    }

}

         The key emitted by map is the path of a segment's index, so here that index is opened and every document scheduled for deletion in that segment is deleted.
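
         Note that Lucene only marks these documents as deleted here; they are physically removed when the index is later optimized or merged (which Crawl's subsequent index-merge step effectively does). A hypothetical check, not part of Nutch, that counts the marked deletions in one index could look like this:

// Hypothetical verification: count documents marked as deleted in one index.
IndexReader reader = IndexReader.open(new FsDirectory(fs, index, false, getConf()));
int deleted = reader.maxDoc() - reader.numDocs();   // marked-deleted documents
System.out.println(index + ": " + deleted + " documents marked as deleted");
reader.close();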

 

 

 

 

 

 
