注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Koala++'s blog

计算广告学 RTB

 
 
 

日志

 
 

Larbin[2]global源代码分析  

2010-04-19 12:07:04|  分类: Larbin |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

       main.ccmain函数中刚开始调用了global的构造函数,global函数中有一行是parseFile,它是用于解析配置文件larbin.conf文件的。

“UserAgent”: UserAgent

“From”: 使用者的邮箱

“startUrl”: 开始爬取的url

“waitduration”: 访问同一服务器的时间间隔

“proxy”: 代理服务器信息

“pageConnexions”: 最大并行连接数

“dnsConnexions”: DNS最大并行连接数

“httpPort”: 用于使用者查看抓取信息的端口

“inputPort”: 用于向labin添加url等输入信息的telnet端口

“depthInSite”: 指定爬虫爬取深度

“limitToDomain”: 限定爬取的域名

“forbiddenExtensions”: 禁止爬取的扩展名

“noExternalLinks”: 不爬取和页面不在同一站点的URL

       其中对startUrl的解析具体如下:

else if (!strcasecmp(tok, "startUrl")) {

    tok = nextToken(&posParse);

    url *u = new url(tok, global::depthInSite, (url *) NULL);

    if (u->isValid()) {

       check(u);

    }

}

       isValid函数通过判断hostfile是否为NULL,然后是判断URL长度是否有不超过url限度的问题。Check函数已经看过了,但是其中的global::URLsDish->put(u)并没有看过。URLsDiskglobal构造函数中初始化:

URLsDisk = new PersistentFifo(reload, fifoFile);

       Reload是通过参数传进来的,它说明是不是接着上次没爬完的爬,而fifoFiletypes.h中定义,它是文件名,为”fifo”。简单起见,下面是删除了一部分代码的PersistentFifo

PersistentFifo::PersistentFifo(bool reload, char *baseName) {

    fileNameLength = strlen(baseName) + 5;

    fileName = new char[fileNameLength + 2];

    strcpy(fileName, baseName);

    mypthread_mutex_init (&lock, NULL);

else {

       // Delete old fifos

       DIR *dir = opendir(".");

       struct dirent *name;

       name = readdir(dir);

       while (name != NULL) {

           if (startWith(fileName, name->d_name)) {

               unlink(name->d_name);

           }

           name = readdir(dir);

       }

       closedir(dir);

 

makeName(0);

       wfds = creat(fileName, S_IRUSR | S_IWUSR);

       rfds = open(fileName, O_RDONLY);

    }

}

       如果不是reload,它将当前目录下所有以fifo开头的文件全部删除。makeName是取得所要写入URL文件的名字fileName,它是以数字为名字的。

       下面是put函数:

/** Put something in the fifo

 * The objet is then deleted

 */

void PersistentFifo::put(url *obj) {

    mypthread_mutex_lock(&lock);

    char *s = obj->serialize(); // statically allocated string

    writeUrl(s);

    in++;

    updateWrite();

    mypthread_mutex_unlock(&lock);

    delete obj;

}

       Pthread的东西,先将url序列化,调用writeUrl写入或缓存,updateWrite是判断是不是写入了一定量的URL

// write an url in the out file (buffered write)

void PersistentFifo::writeUrl(char *s) {

    size_t len = strlen(s);

    assert(len < maxUrlSize + 40 + maxCookieSize);

    if (outbufPos + len < BUF_SIZE) {

       memcpy(outbuf + outbufPos, s, len);

       outbufPos += len;

    } else {

       // The buffer is full

       flushOut();

       memcpy(outbuf + outbufPos, s, len);

       outbufPos = len;

    }

}

       这里判断写入这个url后会不会超过Buffer大小,BUF_SIZE,如果超过,就先把缓存中的内容flush

void PersistentFifo::updateWrite() {

    if ((in % urlByFile) == 0) {

       flushOut();

       close(wfds);

       makeName(++fin);

       wfds = creat(fileName, S_IRUSR | S_IWUSR);

    }

}

       urlByFile的大小为10,000,如果写入了urlByFileURL则重新将URL写入到另一个新的文件中。

       global的构造函数中调用了input.cc中的initInput函数:

void initInput() {

    if (global::inputPort != 0) {

       int allowReuse = 1;

       struct sockaddr_in addr;

       memset((void *) &addr, 0, sizeof(addr));

       addr.sin_addr.s_addr = INADDR_ANY;

       addr.sin_family = AF_INET;

       addr.sin_port = htons(global::inputPort);

       if ((inputFds = socket(AF_INET, SOCK_STREAM, 0)) == -1 ||

setsockopt( inputFds, SOL_SOCKET, SO_REUSEADDR,

(char*) &allowReuse, sizeof(allowReuse)) || bind(inputFds,

          (struct sockaddr *) &addr, sizeof(addr)) != 0 || listen(

              inputFds, 4) != 0) {

           cerr << "unable to get input socket (port " << global::inputPort

                  << ") : " << strerror(errno) << "\n";

           exit(1);

       }

       fcntl(inputFds, F_SETFL, O_NONBLOCK);

       for (int i = 0; i < maxInput; i++) {

           inputConns[i] = new Input;

       }

       nbInput = 0;

    } else {

       nbInput = -1;

    }

}

       这里判断是否可以从配置中的inputProt端口读,下面是初始化inputConns

回转看input.cc中的几个静态变量:

/** socket used for input */

static int inputFds;

/** number of opened input connections */

static int nbInput;

/** array for the opened input connections */

static Input *inputConns[maxInput];

       inputFds是指示用于输入的socketnbInputopened输入连接数,inputConnsopened输入连接数组。

if (nbInput < maxInput - 1 && global::ansPoll[inputFds]) {

    // test if there is a new connection

    struct sockaddr_in addr;

    int fdc;

    socklen_t len = sizeof(addr);

    fdc = accept(inputFds, (struct sockaddr *) &addr, &len);

    if (fdc != -1) {

       global::verifMax(fdc);

       fcntl(fdc, F_SETFL, O_NONBLOCK);

       inputConns[nbInput]->fds = fdc;

       inputConns[nbInput]->pos = 0;

       inputConns[nbInput]->end_pos = 0;

       inputConns[nbInput]->end_posp = 0;

       inputConns[nbInput]->priority = INIT;

       nbInput++;

    }

}

if (nbInput < maxInput - 1) {

    n = inputFds;

    global::setPoll(inputFds, POLLIN);

}

       这里使用非阻塞方式,priorityINIT。并设置POLLEN,数据可读。

// read open sockets

int i = 0;

while (i < nbInput) {

    Input *in = inputConns[i];

    if (global::ansPoll[in->fds] && readMore(in)) {

       char *line = readline(in);

       readMore函数是通过read函数读数据到in->buff,而readline是将buff中的数据得到。

if (in->priority == INIT) {

    // first line

    if (sscanf(line, "priority:%d depth:%u test:%u",

           &in->priority, &in->depth, &in->test) == 3) {

       line = readline(in);

    } else {

       ecrire(in->fds, "Incorrect input\n");

       line = NULL;

       in->priority = END;

    }

}

       得到in的优先级,爬取深度,是否判断它已经爬取过了。

else {

    // this is an url

    url *u = new url(line, in->depth);

    if (u->isValid()) {

       if (in->test) {

           if (global::seen->testSet(u)) {

              hashUrls(); // stats

              if (in->priority) {

                  global::URLsPriority->put(u);

              } else {

                  global::URLsDisk->put(u);

              }

           } else {

              delete u;

           }

       } else {

           hashUrls(); // stats

           global::seen->set(u);

           if (in->priority) {

              global::URLsPriority->put(u);

           } else {

              global::URLsDisk->put(u);

           }

       }

    } else {

       delete u;

    }

    line = readline(in);

}

       这里用global::seen判断是否这个URL已经见过了,如果in->priority不为NULL,则记录到一个SyncFifo对队列中。

if (in->priority == END) {

    // forget this connection

    ecrire(in->fds, "Bye bye...\n");

    close(in->fds);

    nbInput--;

    Input *tmp = inputConns[i];

    inputConns[i] = inputConns[nbInput];

    inputConns[nbInput] = tmp;

} else { // go to next connection

    if (in->fds > n)

       n = in->fds;

    global::setPoll(in->fds, POLLIN);

    i++;

}

       如果priorityEND,表明无法读,所以就把最后一个Input复制到当前这个,也就是注释中写的forget this connection。如果正常的读取完了,则读取下一个。

 

 

  评论这张
 
阅读(2857)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017