Nutch 1.0 Source Code Analysis [4] Generator (1)

2010-03-24 18:37:16 | Category: Nutch


In Crawl, the second important class is Generator. The line that calls generate is:

Path segment = generator.generate(crawlDb, segments, -1, topN,

       System.currentTimeMillis());

generate mainly delegates to an overloaded version of itself:

JobConf job = new NutchJob(getConf());

boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);

return generate(dbDir, segments, numLists, topN, curTime, filter, false);

The first half of this overloaded function is:

Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")

       + "/generate-temp-" + System.currentTimeMillis());

 

Path segment = new Path(segments, generateSegmentName());

Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);

 

Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);

FileSystem fs = FileSystem.get(getConf());

LockUtil.createLockFile(fs, lock, force);

As in Injector, a temporary directory is created here; its name looks like /tmp/hadoop-daowu/mapred/temp/generate-temp-1268187730024. The segment path looks like crawl/segments/20100310102408, and output is the output directory inside it, e.g. crawl/segments/20100310102408/crawl_generate. Finally, a lock file is created on the CrawlDb directory.
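The timestamp-like segment name comes from generateSegmentName(). As a minimal standalone sketch (the yyyyMMddHHmmss format and the wrapper class are assumptions for illustration), such a name can be produced like this:

import java.text.SimpleDateFormat;
import java.util.Date;

public class SegmentNameSketch {
    // assumption: the segment name is the current time formatted as yyyyMMddHHmmss
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyyMMddHHmmss");

    public static synchronized String generateSegmentName() {
        return SDF.format(new Date(System.currentTimeMillis()));
    }

    public static void main(String[] args) {
        System.out.println(generateSegmentName()); // e.g. 20100310102408
    }
}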

// map to inverted subset due for fetch, sort by score

JobConf job = new NutchJob(getConf());

job.setJobName("generate: select " + segment);

 

if (numLists == -1) { // for politeness make

    numLists = job.getNumMapTasks(); // a partition per fetch task

}

if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {

    numLists = 1;

}

job.setLong(CRAWL_GEN_CUR_TIME, curTime);

// record real generation time

long generateTime = System.currentTimeMillis();

job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);

job.setLong(CRAWL_TOP_N, topN);

job.setBoolean(CRAWL_GENERATE_FILTER, filter);

 

FileInputFormat

       .addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));

job.setInputFormat(SequenceFileInputFormat.class);

 

job.setMapperClass(Selector.class);

job.setPartitionerClass(Selector.class);

job.setReducerClass(Selector.class);

 

FileOutputFormat.setOutputPath(job, tempDir);

job.setOutputFormat(SequenceFileOutputFormat.class);

job.setOutputKeyClass(FloatWritable.class);

job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);

job.setOutputValueClass(SelectorEntry.class);

try {

    JobClient.runJob(job);

} catch (IOException e) {

    LockUtil.removeLockFile(fs, lock);

    throw e;

}

The input path is Injector's output path, and the input format is Injector's output format (of course!). The Mapper, Partitioner, and Reducer are all implemented by Selector. The output path is the temporary directory; the output key is a FloatWritable and the value is a SelectorEntry.

The important lines of the mapper implemented in Selector are:

float sort = 1.0f;

sort = scfilters.generatorSortValue((Text) key, crawlDatum, sort);

 

// sort by decreasing score, using DecreasingFloatComparator

sortValue.set(sort);

// record generation time

crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY,

       genTime);

entry.datum = crawlDatum;

entry.url = (Text) key;

output.collect(sortValue, entry); // invert for sort by score

Here ScoringFilters computes the sort score. The value is a SelectorEntry object with two members: the CrawlDatum (the value read from the input, plus one extra metadata entry, _ngt_, the generate time) and the url.
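To make the value type concrete, here is a minimal sketch of a Writable pair holding a url and a CrawlDatum; the field layout mirrors what the mapper fills in, but the exact class name and serialization details of Nutch's SelectorEntry are assumed here:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.nutch.crawl.CrawlDatum;

// sketch of a url + CrawlDatum pair, analogous to Selector's SelectorEntry
public class EntrySketch implements Writable {
    Text url = new Text();
    CrawlDatum datum = new CrawlDatum();

    public void readFields(DataInput in) throws IOException {
        url.readFields(in);      // read the url first ...
        datum.readFields(in);    // ... then the datum, in the same order as write()
    }

    public void write(DataOutput out) throws IOException {
        url.write(out);
        datum.write(out);
    }

    public String toString() {
        return "url=" + url + ", datum=" + datum;
    }
}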

The getPartition implemented in Selector is:

private Partitioner<Text, Writable> hostPartitioner = new PartitionUrlByHost();

public int getPartition(FloatWritable key, Writable value,

       int numReduceTasks) {

    return hostPartitioner.getPartition(((SelectorEntry) value).url,

           key, numReduceTasks);

}

The code of getPartition in PartitionUrlByHost is as follows:

public int getPartition(Text key, Writable value, int numReduceTasks) {

    String urlString = key.toString();

    try {
       urlString = normalizers.normalize(urlString,
              URLNormalizers.SCOPE_PARTITION);
    } catch (Exception e) {
       // normalization failed; keep the un-normalized URL string
    }
    URL url = null;
    try {
       url = new URL(urlString);
    } catch (MalformedURLException e) {
       // leave url as null; the whole URL string is hashed below instead
    }

    int hashCode = (url == null ? urlString : url.getHost()).hashCode();

 

    // make hosts wind up in different partitions on different runs

    hashCode ^= seed;

 

    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;

}

So partitioning is done by url.getHost().hashCode().
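A small standalone check of this formula (the URLs, seed value, and reducer count below are made up for illustration) shows that URLs on the same host always land in the same partition:

import java.net.MalformedURLException;
import java.net.URL;

public class HostPartitionDemo {
    // same arithmetic as the partitioner: non-negative host hash modulo reducer count
    static int partition(String urlString, int seed, int numReduceTasks)
            throws MalformedURLException {
        int hashCode = new URL(urlString).getHost().hashCode();
        hashCode ^= seed;   // vary the placement between runs
        return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) throws MalformedURLException {
        int seed = 12345, reducers = 4;
        // the first two share a host, so they print the same partition number
        System.out.println(partition("http://example.com/a.html", seed, reducers));
        System.out.println(partition("http://example.com/b/c.html", seed, reducers));
        System.out.println(partition("http://another.org/", seed, reducers));
    }
}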

Let's look at the reducer in pieces:

while (values.hasNext() && count < limit) {

    SelectorEntry entry = values.next();

    Text url = entry.url;

    String urlString = url.toString();

    URL u = null;

 

    // skip bad urls, including empty and null urls

    try {
       u = new URL(url.toString());
    } catch (MalformedURLException e) {
       // skip URLs that cannot be parsed
       continue;
    }

 

    String host = u.getHost();

    host = host.toLowerCase();

    String hostname = host;

 

    if (byIP) {
        if (maxedHosts.contains(host)) {
           continue;
        }
        if (dnsFailureHosts.contains(host)) {
           continue;
        }
        try {
           InetAddress ia = InetAddress.getByName(host);
           host = ia.getHostAddress();
           urlString = new URL(u.getProtocol(), host, u.getPort(),
                  u.getFile()).toString();
        } catch (UnknownHostException uhe) {
           // remember hostnames that could not be looked up
           dnsFailureHosts.add(hostname);
           dnsFailure++;
           continue;
        }
    }

 

limit is generate's topN parameter, i.e. how many top-scoring URLs to fetch. After the URL's host is obtained, the reducer checks whether this host has already hit its per-host quota or has already failed DNS resolution; a host seen in either set is skipped without further work, which is safe because the partitioner grouped entries by host in the first place. When resolving by IP, a host that cannot be resolved is added to dnsFailureHosts so that later URLs on the same host are skipped immediately.
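The same skip-on-DNS-failure bookkeeping, written as a small standalone helper (the class and method names are invented; only the HashSet-plus-InetAddress pattern is taken from the reducer):

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;

public class DnsSkipSketch {
    // hosts whose DNS lookup already failed once; later URLs on them are skipped
    private final Set<String> dnsFailureHosts = new HashSet<String>();

    /** Returns the resolved IP address, or null if the host is (or becomes) known-bad. */
    public String resolveOrSkip(String host) {
        if (dnsFailureHosts.contains(host)) {
            return null;                      // skip without a second lookup
        }
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            dnsFailureHosts.add(host);        // remember the failure for next time
            return null;
        }
    }
}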

try {
    urlString = normalizers.normalize(urlString,
           URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
    host = new URL(urlString).getHost();
} catch (Exception e) {
    // malformed URL after normalization: skip it
    continue;
}

 

// only filter if we are counting hosts

if (maxPerHost > 0) {

    IntWritable hostCount = hostCounts.get(host);

    if (hostCount == null) {

       hostCount = new IntWritable();

       hostCounts.put(host, hostCount);

    }

 

    // increment hostCount

    hostCount.set(hostCount.get() + 1);

 

    // skip URL if above the limit per host.

    if (hostCount.get() > maxPerHost) {

       if (hostCount.get() == maxPerHost + 1) {

           // remember the raw hostname that is maxed out

           maxedHosts.add(hostname);

       }

       continue;

    }

}

 

output.collect(key, entry);

The number of URLs seen for each host is counted; once a host exceeds maxPerHost, it is added to maxedHosts and further URLs for it are skipped.
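The same per-host cap, pulled out into a small standalone helper (the names are invented for illustration; Nutch itself keeps the count in an IntWritable inside a HashMap, as shown above):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PerHostCapSketch {
    private final Map<String, Integer> hostCounts = new HashMap<String, Integer>();
    private final Set<String> maxedHosts = new HashSet<String>();

    /** Returns true if a URL on this host may still be emitted, false once the cap is hit. */
    public boolean admit(String host, int maxPerHost) {
        Integer previous = hostCounts.get(host);
        int count = (previous == null ? 0 : previous) + 1;
        hostCounts.put(host, count);
        if (count > maxPerHost) {
            if (count == maxPerHost + 1) {
                maxedHosts.add(host);   // remember the host the first time it overflows
            }
            return false;
        }
        return true;
    }
}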

The reducer's output is written to the file in decreasing order of score.

public static class DecreasingFloatComparator extends
       FloatWritable.Comparator {
    /** Compares two FloatWritables decreasing. */
    public int compare(byte[] b1, int s1, int l1,
           byte[] b2, int s2, int l2) {
       return super.compare(b2, s2, l2, b1, s1, l1);
    }
}

In other words, it simply reverses the order of the two arguments to compare, turning an ascending sort into a descending one.
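The same trick in miniature, on plain Float objects rather than raw bytes (the values below are arbitrary):

import java.util.Arrays;
import java.util.Comparator;

public class DecreasingSortDemo {
    public static void main(String[] args) {
        Float[] scores = { 0.3f, 1.2f, 0.7f };
        // swapping the two arguments inside compare() flips ascending into descending,
        // which is exactly what DecreasingFloatComparator does on the serialized bytes
        Arrays.sort(scores, new Comparator<Float>() {
            public int compare(Float a, Float b) {
                return b.compareTo(a);
            }
        });
        System.out.println(Arrays.toString(scores)); // [1.2, 0.7, 0.3]
    }
}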

 

 
