注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Koala++'s blog

计算广告学 RTB

 
 
 

日志

 
 

Nutch 1.0 源代码分析[6] Fetcher(1)   

2010-03-24 18:40:16|  分类: Nutch |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

         Crawl中调用fetch的代码为:

fetcher.fetch(segment, threads, org.apache.nutch.fetcher.Fetcher

       .isParsing(conf)); // fetch it

         Fetch的代码为:

JobConf job = new NutchJob(getConf());

job.setJobName("fetch " + segment);

 

job.setInt("fetcher.threads.fetch", threads);

job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());

job.setBoolean("fetcher.parse", parsing);

 

// for politeness, don't permit parallel execution of a single task

job.setSpeculativeExecution(false);

 

FileInputFormat.addInputPath(job, new Path(segment,

       CrawlDatum.GENERATE_DIR_NAME));

job.setInputFormat(InputFormat.class);

 

job.setMapRunnerClass(Fetcher.class);

 

FileOutputFormat.setOutputPath(job, segment);

job.setOutputFormat(FetcherOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NutchWritable.class);

 

JobClient.runJob(job);

         Job输入目录是generator产生的crawl/segments/20100310102408/crawl_generate目录,输出目录为crawl/segments/20100310102408

         Fetchrun函数中

public void run(RecordReader<Text, CrawlDatum> input,

       OutputCollector<Text, NutchWritable> output, Reporter reporter)

       throws IOException {

    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);

 

    feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);

    feeder.start();

 

    for (int i = 0; i < threadCount; i++) { // spawn threads

       new FetcherThread(getConf()).start();

    }

 

    do { // wait for threads to exit

       try {

           Thread.sleep(1000);

       }

       reportStatus();

 

    } while (activeThreads.get() > 0);

}

         这里默认启动10个线程,并在所有线程结束后退出。而在run里面删掉的几行代码中:

this.fetchQueues = new FetchItemQueues(getConf());

 

feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);

feeder.start();

         QueueFeeder继承自Thread,它的run函数为:

public void run() {

    boolean hasMore = true;

    int cnt = 0;

 

    while (hasMore) {

       int feed = size - queues.getTotalSize();

       if (feed <= 0) {

           try {

              Thread.sleep(1000);

           }

           continue;

       } else {

           while (feed > 0 && hasMore) {

              try {

                  Text url = new Text();

                  CrawlDatum datum = new CrawlDatum();

                  hasMore = reader.next(url, datum);

                  if (hasMore) {

                     queues.addFetchItem(url, datum);

                     cnt++;

                     feed--;

                  }

              }

           }

       }

    }

}

         这里的size值是threadCount * 50,如果size-queues.getTotalSize(),表示队列还是满的,不需要处理。否则从reader中读取一个新的urldatum,加入到queues里,这个reader就是读取generator产生的数据。

         addFetchItem的代码如下:

public void addFetchItem(Text url, CrawlDatum datum) {

    FetchItem it = FetchItem.create(url, datum, byIP);

    if (it != null)

       addFetchItem(it);

}

         Create的代码如下:

public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {

    String queueID;

    URL u = null;

    try {

       u = new URL(url.toString());

    }

    String proto = u.getProtocol().toLowerCase();

    String host;

    if (byIP) {

       try {

           InetAddress addr = InetAddress.getByName(u.getHost());

           host = addr.getHostAddress();

       }

    } else {

       host = u.getHost();

       if (host == null) {

           return null;

       }

       host = host.toLowerCase();

    }

    queueID = proto + "://" + host;

    return new FetchItem(url, u, datum, queueID);

}

         得到url的协议proto,如果使用ip来做,就得到hostIP地址,否则就是host的名称。queueIDproto://host

public void addFetchItem(FetchItem it) {

    FetchItemQueue fiq = getFetchItemQueue(it.queueID);

    fiq.addFetchItem(it);

    totalSize.incrementAndGet();

}

         getFetchItemQueue的代码如下:

public synchronized FetchItemQueue getFetchItemQueue(String id) {

    FetchItemQueue fiq = queues.get(id);

    if (fiq == null) {

       fiq = new FetchItemQueue(conf, maxThreads, crawlDelay,

              minCrawlDelay);

       queues.put(id, fiq);

    }

    return fiq;

}

         可以看出FetchItemQueue是:每一个protocal://host一个FetchItemQueue,这个FetchItemQueue的构造函数中设置最大线程数,爬的时候的时间间隔和最小爬的时间间隔。

         总结一下。

FetchItem是描述一个要被抓取的项,成员变量:

String queueID;

Text url;

URL u;

CrawlDatum datum;

         FetchItemQueue是处理来自由同一主机ID(或是proto/hostname,或是proto/IP),它记录正在进行的请求和请求之间的时间。成员变量为:

List<FetchItem> queue = Collections

       .synchronizedList(new LinkedList<FetchItem>());

Set<FetchItem> inProgress = Collections

       .synchronizedSet(new HashSet<FetchItem>());

AtomicLong nextFetchTime = new AtomicLong();

long crawlDelay;

long minCrawlDelay;

int maxThreads;

         FetchItemQueues是一个提供简单处理的类,它记录着所有项的,并可以更方便被函数调用,成员变量为:

Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();

AtomicInteger totalSize = new AtomicInteger(0);

int maxThreads;

boolean byIP;

long crawlDelay;

long minCrawlDelay;

         QueueFeeder是一个填充queue的类,它会自动填补那些被FetcherThread消耗的项。成员变量:

private RecordReader<Text, CrawlDatum> reader;

private FetchItemQueues queues;

private int size;

         FetcherThreadrun函数中,fetchQueues.getFetchItem()就是从队列中得到一个项:

public synchronized FetchItem getFetchItem() {

    Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()

           .iterator();

    while (it.hasNext()) {

       FetchItemQueue fiq = it.next().getValue();

       // reap empty queues

       if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {

           it.remove();

           continue;

       }

       FetchItem fit = fiq.getFetchItem();

       if (fit != null) {

           totalSize.decrementAndGet();

           return fit;

       }

    }

    return null;

}

         它从queues中得到key的集合,再从这里面取得一个FetchItemQueue,如果取完,就删除这个队列。

 

 

 

 

 

 

 

 

 

  评论这张
 
阅读(2603)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017