Posts Tagged ‘code’

[python] 共享一下关于python资料及source code

Monday, May 26th, 2008

书籍资料以及书籍中的code

1. ebook-chm-python-oreilly-programming-python-2nd-edition

2. python-programming-an-introduction-to-computer-science

3. ebook-thinking-in-python-html-code

4. 2000-python-programming-on-win32-code-oreilly practical-python-source-code

5. python-cookbookoreilly2005code

6. python_programming_on_win32_sourcecode

7. ebook-program-core-python-programming

8. oreillypythoninanutshell2003-chm

9. oreilly-programming-python-2nd-edition-with-source-code

[python] 一个简单的spider

Thursday, May 22nd, 2008

今天刚装一个插件 Google Syntax Highlighter for WordPress,用来高亮显示各种code的.于是用python写了一个非常简单的spider.试验一下.:)

# #############################
# Spider
#        get the page content of the URI
#

import random
import time

try:
    import urllib2  # Python 2
except ImportError:
    # Python 3 exposes the same Request/urlopen API under urllib.request.
    import urllib.request as urllib2


class Spider:
    """Tiny HTTP fetcher that sends a randomly chosen User-Agent and keeps
    running totals of what it has done.

    Statistics are exposed through count(), failed(), cost() and data();
    dump() prints them all on one line.
    """
    def __init__(self):
        """Start with zeroed statistics and the built-in User-Agent pool."""
        self._count = 0    # number of get() calls, successful or not
        self._data = 0     # total length of all bodies returned by get()
        self._cost = 0     # total wall-clock seconds spent inside get()
        self._agents = [
            'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.14) Gecko/20080404 Firefox/2.0.0.14',
            'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)',
            'Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.8.1.13) Gecko/20080311 Firefox/2.0.0.13',
            'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)',
            'Mozilla/5.0 (compatible; YodaoBot/1.0; http://www.yodao.com/help/webmaster/spider/; )',
            'Mozilla/5.0 (compatible; Yahoo! Slurp; http://help.yahoo.com/help/us/ysearch/slurp)',
        ]
        self._failed = 0   # number of get() calls that raised
        self._name = "G1029"
    def get(self, u):
        """Fetch URL `u` and return the response body.

        On any fetch error the failure counter is incremented, an
        "ERROR: <url>" line is printed and an empty string is returned.
        The call is always counted and timed, whether it succeeded or not.
        """
        s = ""
        c = time.time()
        try:
            request = urllib2.Request(url=u)
            # Pick one agent without reordering the shared list.
            request.add_header('User-Agent', random.choice(self._agents))
            f = urllib2.urlopen(request)
            try:
                s = f.read()
            finally:
                # Release the connection even when read() fails.
                f.close()
        except Exception:
            # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
            self._failed += 1
            print("ERROR: " + u)
        self._count += 1
        self._cost += (time.time() - c)
        self._data += len(s)
        return s
    def set(self, item, value):
        """Set a named option; only 'name' is supported.

        Returns 1 when the option was applied, None for an unknown item.
        """
        if item == 'name' :
            self._name = value
        else :
            return None
        return 1
    def count(self):
        """Return how many times get() has been called."""
        return self._count
    def failed(self):
        """Return how many get() calls failed."""
        return self._failed
    def cost(self):
        """Return the accumulated seconds spent in get()."""
        return self._cost
    def data(self):
        """Return the accumulated length of all fetched bodies."""
        return self._data
    def dump(self):
        """Print name and all statistics on a single line."""
        print('Spider Name: %s Count: %d Failed: %d Cost: %d Data: %d' % (self._name, self._count, self._failed, self._cost, self._data))
        return



似乎不错哦, 呵呵.先用这个吧

[python] Simple Watch Dog

Thursday, May 15th, 2008


为了让spider的进程可以做到简单的保护,做了一个简单的watch dog. 原理很简单, 就是给dog喂食, 如果dog发现没有及时喂食, 就kill要守护的process,然后restart这个process.

主要class 有两个:

  • watchdog 负责守护进程
  • feeder 负责喂食

用法也很简单

  • 启动watchdog
  • 在你想要守护的进程中加入feeder,

几点说明:

  • 由于是时间仓促, 其中,有些地方做的还比较简单, 没有异常处理, 请见谅 :)
  • 我做的feeder是为了在我的python spider使用, 其他语言自己port一下吧

具体代码下载( watchdog)

import os
import sys
import time
import copy
import socket
import threading

class Watch_Dog (object):
    """UDP-driven watchdog that restarts processes which stop sending heartbeats.

    Datagram protocol (fields separated by '\\n'):
        register\\n<pid>\\n<path>   start guarding process <pid>, restartable via <path>
        feed\\n<pid>               heartbeat for an already registered <pid>

    open() binds the socket and starts a receiver thread; process() runs the
    periodic check loop in the calling thread until close() is called.

    NOTE(review): datagrams are not authenticated, so anything able to send to
    the bound address can make the watchdog launch an arbitrary <path>. Bind
    to a loopback address only.
    """
    def __init__(self):
        self._exit = 0
        self._sock = None
        # id (pid string exactly as received) -> [id, path, time of last feed]
        self._processes = {}
        self._processes_lock = threading.Lock()
        self._feed_interval = 120   # seconds without a feed before a restart
        self._check_interval = 30   # seconds between two _check() runs
        self._addr = None
        return
    def open(self, addr):
        """Bind a non-blocking UDP socket to `addr` and start the receiver thread.

        Returns 1 on success, 0 if the socket could not be created or bound.
        """
        # record address
        self._addr = addr
        # create sock and bind to addr
        try:
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._sock.bind(self._addr)
            self._sock.setblocking(0)
        except Exception:
            print('creat or bind socket failed')
            # Do not keep a half-initialised socket around.
            if self._sock is not None:
                self._sock.close()
                self._sock = None
            return 0
        # create worker thread
        h = threading.Thread(target=self._worker, args=())
        h.start()
        return 1
    def close(self):
        """Ask both loops to stop and close the socket (safe if never opened)."""
        # close thread
        self._exit = 1
        #close socket
        if self._sock is not None:
            try:
                self._sock.close()
            except socket.error:
                pass
        return
    def process(self):
        """Run _check() every `_check_interval` seconds until close() is called."""
        last_check_time = time.time()
        while not self._exit:
            if time.time() - last_check_time > self._check_interval :
                last_check_time = time.time()
                self._check()
            time.sleep(1)
        return
    def _worker(self):
        """Receiver thread: read datagrams and dispatch them until close()."""
        while not self._exit:
            try:
                recved, addr = self._sock.recvfrom(1024)
            except socket.error:
                # Nothing pending on the non-blocking socket (or it was just
                # closed): back off briefly, then re-check the exit flag.
                time.sleep(0.1)
                continue
            try:
                self._dispatch(recved)
            except Exception:
                # One bad datagram must not kill the receiver: without it no
                # feed would arrive and every guarded process would be restarted.
                print('ignore bad message: %s' % (sys.exc_info()[1],))
        return
    def _dispatch(self, recved):
        """Decode one datagram and call _register() or _feed(); others are ignored."""
        if len(recved) == 0:
            return
        ss = recved.split('\n')
        if len(ss) == 3 and ss[0] == 'register' :
            self._register(ss[1], ss[2])
        elif len(ss) == 2 and ss[0] == 'feed' :
            self._feed(ss[1])
        return
    def _register(self, id, path):
        """Start guarding process `id` (a pid string), restartable via `path`.

        Ids that are not a positive integer are rejected: the id is later
        passed to the kill command, where 0 or a negative value would address
        a whole process group (or every process) instead of one process.
        An id that is already registered is left untouched.
        """
        print('regiester %s %s' % (id, path))
        try:
            pid = int(id)
        except ValueError:
            pid = -1
        if pid <= 0:
            print('ignore register with invalid pid %r' % (id,))
            return
        self._processes_lock.acquire()
        try:
            if id not in self._processes:
                self._processes[id] = [id, path, time.time()]
        finally:
            self._processes_lock.release()
        return
    def _feed(self, id):
        """Record a heartbeat for `id`; unknown ids are ignored."""
        print('feed %s' % (id,))
        self._processes_lock.acquire()
        try:
            if id in self._processes:
                self._processes[id][2] = time.time()
        finally:
            self._processes_lock.release()
        return
    def _check(self):
        """Restart every registered process whose last feed is too old.

        Expired entries are removed and handed to a separate thread, so a
        slow kill/restart does not delay the check loop.
        """
        print('%s check process' % (time.time(),))
        # Take a snapshot so the lock is not held while threads are started.
        self._processes_lock.acquire()
        try:
            snapshot = [tuple(entry) for entry in self._processes.values()]
        finally:
            self._processes_lock.release()
        # check each robot
        for id, pt, tm in snapshot:
            if time.time() - tm > self._feed_interval :
                self._remove(id)
                # int(id) cannot fail here: _register() only accepts numeric ids.
                h = threading.Thread(target=self._restart, args=(int(id), pt))
                h.start()
        return
    def _remove(self, id):
        """Forget `id`; a no-op if it is not registered."""
        self._processes_lock.acquire()
        try:
            self._processes.pop(id, None)
        finally:
            self._processes_lock.release()
        return
    def _restart(self, pid, path):
        """Kill process `pid` and start `path` again (Windows and POSIX only)."""
        print('thread >> start kill %s restart %s' % (pid, path))
        if os.name == 'nt' :
            # read()/close() wait for taskkill, so the old process is gone
            # before the new one is started.
            killer = os.popen('taskkill /F /PID '+str(pid))
            killer.read()
            killer.close()
            os.startfile(path)
        elif os.name == 'posix' :
            import signal
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                # The process has already gone; it still has to be restarted.
                pass
            os.spawnvp(os.P_NOWAIT, 'python', ['python', path])
        else :
            print('not suppot this os now')
        print('thread >> finish')
        return

class Feeder (object) :
    """Client side of the watchdog: registers the current process and then
    sends a 'feed' datagram every `_feed_interval` seconds from a background
    thread until close() is called.
    """
    def __init__(self):
        self._sock = None
        self._pid = None
        self._exit = 0
        self._feed_interval = 10   # seconds between two feed datagrams
        self._addr = None
        self._path = None
        return
    def open(self, addr, path):
        """Register this process with the watchdog at `addr` and start feeding.

        `path` is sent along with the pid in the register datagram.
        """
        self._addr = addr
        self._path = path
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._pid = os.getpid()
        self._register()
        h = threading.Thread(target=self._worker, args=())
        h.start()
        return
    def close(self):
        """Stop the feeding thread and close the socket (safe if never opened)."""
        self._exit = 1
        if self._sock is not None:
            self._sock.close()
        return
    def _worker(self):
        """Background loop: send one feed per interval until close()."""
        last_feed_time = time.time()
        while not self._exit :
            if time.time() - last_feed_time > self._feed_interval :
                last_feed_time = time.time()
                try:
                    self._feed()
                except socket.error:
                    # close() may have closed the socket between the loop
                    # test and the send; otherwise retry on the next interval.
                    if self._exit:
                        break
                    print('feed failed, will retry')
            time.sleep(1)
        return
    def _register(self):
        """Send 'register\\n<pid>\\n<path>' to the watchdog."""
        self._sock.sendto('register\n'+str(self._pid)+'\n'+self._path, self._addr)
        return
    def _feed(self):
        """Send 'feed\\n<pid>' to the watchdog."""
        self._sock.sendto('feed\n'+str(self._pid), self._addr)
        return

if __name__ == '__main__':
    # Stand-alone watchdog: bind to the local port and run the check loop.

    print('Current OS : %s' % os.name)

    addr = ('localhost', 15028)

    wd = Watch_Dog()

    if not wd.open(addr) :
        sys.exit(-1)

    try:
        wd.process()
    finally:
        # Also reached on Ctrl-C: without close() the receiver thread would
        # keep running and the interpreter could not exit.
        wd.close()

Zend Framework — 中文分词 Analyzer

Tuesday, April 29th, 2008

最近在看Zend_Lucene_Search.查阅了一些资料. 这篇文章写的不错, 推荐一下.估计已经有很多人看过了.但是,其中的例子有点小问题, 需要修改一下才能跑起来, 我这里简单修改了一下, 放到附件中, 供以后需要的朋友下载 analyzer code.

附录代码(请不要直接使用, 请下载上面的zip中的code使用):

require_once 'Zend/Search/Lucene/Analysis/Analyzer.php'; 

/**
 * Simple analyzer for mixed ASCII / Chinese UTF-8 text.
 *
 * ASCII alphanumeric runs become one token each. Runs of non-ASCII
 * characters are emitted as overlapping two-character tokens (bigrams);
 * a non-ASCII character with no non-ASCII neighbour after it is skipped.
 */
class Simple_Chinese_Analyzer extends Zend_Search_Lucene_Analysis_Analyzer_Common
{
    /** @var int Byte offset in $_input of the next unread byte. */
    private $_position;

    /** @var array Chinese stop words that reset() blanks out of the input. */
    private $_cnStopWords = array();

    /**
     * Set the Chinese stop words removed from the input by reset().
     *
     * @param array $cnStopWords
     */
    public function setCnStopWords($cnStopWords)
    {
        $this->_cnStopWords = $cnStopWords;
    }

    /**
     * Rewind to the start of the input and replace punctuation and the
     * configured Chinese stop words with spaces.
     */
    public function reset()
    {
        $this->_position = 0;
        // "\t" and "\n" had lost their backslashes, which blanked out every
        // letter "t" and "n" of the input.
        $search = array(
            ",", "/", "\\", ".", ";", ":", "\"", "!", "~", "`", "^",
            "(", ")", "?", "-", "\t", "\n", "'", "<", ">", "\r", "\r\n",
            "$", "&", "%", "#", "@", "+", "=", "{", "}", "[", "]", ":",
            ")", "(", ".", "。", ",", "!", ";", "“", "”", "‘", "’",
            "[", "]", "、", "—", " ", "《", "》", "-", "…", "【", "】");
        $this->_input = str_replace($search, ' ', $this->_input);
        $this->_input = str_replace($this->_cnStopWords, ' ', $this->_input);
    }

    /**
     * Return the next token, or null when the input is exhausted.
     *
     * @return Zend_Search_Lucene_Analysis_Token|null
     */
    public function nextToken()
    {
        if ($this->_input === null) {
            return null;
        }
        $length = strlen($this->_input);

        while ($this->_position < $length) {
            // Skip blanks and any other byte that cannot start a token, so a
            // stray symbol no longer ends tokenizing for the rest of the input.
            while ($this->_position < $length
                && !$this->_isTokenByte($this->_input[$this->_position])) {
                $this->_position++;
            }
            if ($this->_position >= $length) {
                // Only separators were left; do not read past the end.
                break;
            }

            $termStartPosition = $this->_position;
            $isCnWord = false;
            $lastCharLength = 0;

            if (ord($this->_input[$this->_position]) > 127) {
                // Non-ASCII run: consume at most two characters (a bigram).
                $i = 0;
                while ($this->_position < $length
                    && ord($this->_input[$this->_position]) > 127) {
                    $lastCharLength = $this->_utf8CharLength($this->_input[$this->_position]);
                    // A truncated trailing character must not push the
                    // offset beyond the end of the input.
                    $this->_position = min($this->_position + $lastCharLength, $length);
                    $i++;
                    if ($i == 2) {
                        $isCnWord = true;
                        break;
                    }
                }
                if ($i == 1) {
                    // A lone non-ASCII character yields no token.
                    continue;
                }
            } else {
                while ($this->_position < $length
                    && ctype_alnum($this->_input[$this->_position])) {
                    $this->_position++;
                }
            }

            $token = new Zend_Search_Lucene_Analysis_Token(
                substr($this->_input,
                    $termStartPosition,
                    $this->_position - $termStartPosition),
                $termStartPosition,
                $this->_position);
            $token = $this->normalize($token);
            if ($isCnWord) {
                // Step back over the second character so the next bigram
                // starts with it (overlapping bigrams).
                $this->_position -= $lastCharLength;
            }
            // normalize() may return null; in that case keep scanning.
            if ($token !== null) {
                return $token;
            }
        }
        return null;
    }

    /**
     * Tell whether a byte can start a token: ASCII alphanumeric or non-ASCII.
     *
     * @param string $byte single byte
     * @return bool
     */
    private function _isTokenByte($byte)
    {
        return ord($byte) > 127 || ctype_alnum($byte);
    }

    /**
     * Length in bytes of the UTF-8 character introduced by a lead byte.
     * Any byte that is not a multi-byte lead counts as length 1.
     *
     * @param string $byte single byte
     * @return int
     */
    private function _utf8CharLength($byte)
    {
        $code = ord($byte);
        if ($code >= 0xF0) {
            return 4;
        }
        if ($code >= 0xE0) {
            return 3;
        }
        if ($code >= 0xC0) {
            return 2;
        }
        return 1;
    }

}

// Demo: tokenize a mixed English/Chinese sentence and print the tokens.
$stopWords = array('a', 'an', 'at', 'the', 'and', 'or', 'is', 'am');

$stopWordsFilter = new Zend_Search_Lucene_Analysis_TokenFilter_StopWords($stopWords);
$analyzer = new Simple_Chinese_Analyzer();
$cnStopWords = array('的');
$analyzer->setCnStopWords($cnStopWords);
$analyzer->addFilter($stopWordsFilter);
$value = 'this is  (a test)【中文】的测试, 长春市长春药店';
$analyzer->setInput($value, 'utf-8');

// Initialise the result list so print_r() also works when no token is produced.
$tokens       = array();
$tokenCounter = 0;
while (($token = $analyzer->nextToken()) !== null) {
    $tokenCounter++;
    $tokens[] = $token;
}
print_r($tokens);

?>

Search Engine 学习笔记 —— Store Server

Tuesday, January 29th, 2008

最近读了介绍google架构的paper : The Anatomy of a Large-Scale Hypertextual Web Search Engine. 还是学习到了很多东西, 尤其对我这个刚刚入门搜索引擎的人.:). 其中架构中, 有这样一个角色就是 Store Server 和 Repository Database. 他们是全部抓取的page的保存, 而架构中的其他角色都是围绕这个page来进行, 而且通过Repository可以将其他所有数据恢复. 因此, 这个部分应该是整个search engine的基础. 所以, 今天我们来设计并完成一个Simple Store Server. 这里只是一个simple,具体完善的实现, 等我测试稳定, 会放到开源目录里.:)

在开始之前需要一些热身准备活动. :). 如果你有以下知识,可以更好地理解架构,以及代码部分的实现和编译.

  • 需要了解ACE架构, ACE Reactor, ACE Task等.
  • 需要了解mysql++.
  • 需要了解zipstream.
  • 需要知道多线程的基础知识.
  • 需要一些基本的web知识.

OK.有了一些基础知识, 我开始第一步, 架构的设计.由于之前的一些测试表明 Dreamhost无法接受较大的HTTP RAW DATA, 因此需要自己实现一个简单的web功能,并将crawler收集的数据通过mysql++发送给已经在Dreamhost建立好的数据库存储. 以后indexer可以通过Repository来获取page去建立index, links, barrels等. 这些都是以后会逐步设计实现的.

之所以使用WEB是希望这个Store server以后可以方便的移植到fcgi,或者在linux起独立的进程,更重要的是http协议是相对来说在internet上限制小的协议, 有些公司只允许http访问, 而TCP/UDP的协议是不允许的. 考虑到crawler是分布式在不同的peer上,因此使用http post的方法上传数据, 还是更加容易满足大多数peer的情况. :). 对于store server也可以作为public peer的一种职能. 而Repository也将是分布式的存储.

由于我们是设计simple的server,所以太多feature不能一一实现, 只要在架构允许, 那么使用者以后可以自己扩展并且实现自己想要的功能.

这里还要说明一点,为什么模型设计成多线程, 主要由于mysql++实现是使用阻塞的socket. 这样没有办法和非阻塞的reactor模型一起使用. 因为server的设计应该是高效的, 自然就想到了多线程模型. web thread负责接收http post过来的数据, 然后扔到一个队列中, 而负责database操作的thread从queue中拿取数据, 并insert到数据库里. 这个想法很自然. 多个线程共享的资源只有一个queue, 因此对queue的操作需要保护.

store server

几点说明,

  1. 从上面的架构来看不难发现, 主要效率问题会体现在Queue的读写操作, 尤其是在Store Server面对大量crawler返回的数据时. 不过如果store server集成在peer的功能中,那么peer既是crawler也是storer, 这样压力主要体现在分布式数据上. 至于这里, 我还没有收集更多的资料.
  2. 在Web Thread目前使用reactor模型, 这里是为了使用以往已经实现的稳定代码, 实际上, 这个simple的设计完全可以使用proactor模型,和ACE中Message Queue来实现, 不过 首先你要熟悉proactor的架构.:)
  3. 我的实现为了简便,并且作为轻量级的simple server,在实现中, 有错误或者不好的地方,请见谅. :)
以下是部分.h文件,

#ifndef WEB_HANDLER_H
#define WEB_HANDLER_H

#include <ace/Thread_Mutex.h>
#include <ace/Task.h>
#include <ace/OS.h>
#include <ace/Log_Msg.h>
#include <ace/Message_Block.h>

#include "../../common/HTTP_Handler/HTTP_Server.h"

#include <map>

class Queue_Handler;

/**
 * HTTP front end of the store server.
 *
 * Runs as an ACE task (svc()) and receives HTTP events through the
 * HTTP_Server_Callback interface. According to the accompanying design
 * notes, the web thread accepts pages uploaded via HTTP POST and hands them
 * to the Queue_Handler passed to open().
 *
 * NOTE(review): std::vector and std::string are used below but only <map> is
 * included directly; presumably HTTP_Server.h provides them - confirm.
 */
class Web_Handler : public ACE_Task_Base, public HTTP_Server_Callback
{
public:
    Web_Handler();
    ~Web_Handler();

    /// Start the handler with the queue to deliver to and the addresses to serve.
    bool open(Queue_Handler *queue_handler,
              const std::vector<ACE_INET_Addr> &addrs);

    /// Counterpart of open().
    bool close();

    // HTTP event callbacks (presumably declared by HTTP_Server_Callback -
    // the base class is not visible here).
    virtual bool on_http_request(const HTTP_HANDLER handler,
                                 const char *uri,
                                 const size_t content_length,
                                 const std::map<std::string, std::string> &params);

    virtual bool on_http_content(const HTTP_HANDLER handler,
                                 const char *content_recv,
                                 const size_t content_recv_length);

    virtual bool on_http_hunger(const HTTP_HANDLER handler);

    virtual void on_http_close(const HTTP_HANDLER handler);

    /// ACE task entry point.
    virtual int svc (void);

private:
    /// Per-connection state, stored in sessions_ keyed by HTTP_HANDLER.
    struct HANDLER_INFO
    {
        size_t content_size;
        size_t recved_size;
        std::string page_url;
        ACE_Message_Block *page_content;
    };

    bool find_http_handler(const HTTP_HANDLER http_handler);
    bool delete_http_handler(const HTTP_HANDLER http_handler);

    bool send_response(const HTTP_HANDLER handler, std::string err_code);

    ACE_Reactor *reactor_;

    Queue_Handler *queue_handler_;

    HTTP_Server *http_server_;

    std::map<HTTP_HANDLER, HANDLER_INFO> sessions_;

};

#endif // WEB_HANDLER_H
#ifndef QUEUE_HANDLER_H
#define QUEUE_HANDLER_H

#include <ace/Thread_Mutex.h>
#include <ace/Task.h>
#include <ace/OS.h>
#include <ace/Log_Msg.h>
#include <ace/Message_Block.h>

#include <deque>
#include <string>
#include <utility>

// Mutex type used for Queue_Handler's mutex_ member.
typedef ACE_Thread_Mutex MUTEX;

// One queued page: a string (passed as `link` to Queue_Handler::push) paired with its message block.
typedef std::pair<std::string, ACE_Message_Block*> PAGE_PAIR;

/**
 * Page queue between the web side and the database side of the store server.
 *
 * Holds PAGE_PAIR entries in a deque. According to the accompanying design
 * notes, this queue is the only resource shared between threads and access
 * to it has to be protected; mutex_ is presumably used for that - the
 * implementation is not visible here.
 */
class Queue_Handler : public ACE_Task_Base
{
public:
    Queue_Handler();
    ~Queue_Handler();

    bool open();
    bool close();

    /// Add a page, identified by `link`, to the queue.
    void push(std::string link, const ACE_Message_Block *page);

    /// Take a page from the queue into `page`.
    /// NOTE(review): behaviour on an empty queue is not visible here - confirm.
    void pop(PAGE_PAIR &page);

    /// ACE task entry point.
    virtual int svc(void);

private:
    MUTEX mutex_;

    std::deque< PAGE_PAIR > queued_pages_;

};

#endif // QUEUE_HANDLER_H

#ifndef DB_HANDLER_H
#define DB_HANDLER_H

#include <ace/Thread_Mutex.h>
#include <ace/Task.h>
#include <ace/OS.h>
#include <ace/Log_Msg.h>
#include <ace/Message_Block.h>

#include "mysql++.h"

#include <string>

#include "Queue_Handler.h"

/**
 * Database side of the store server.
 *
 * Runs as an ACE task (svc()). According to the accompanying design notes,
 * the database thread takes pages from the queue and inserts them into the
 * database through mysql++.
 */
class DB_Handler : public ACE_Task_Base
{
public:
    DB_Handler();
    ~DB_Handler();

    /// Start the handler with the database settings and the queue to read from.
    bool open(std::string db_name,
              std::string db_host,
              std::string db_user,
              std::string db_pass,
              Queue_Handler *queue_handler);

    /// Counterpart of open().
    bool close();

    /// ACE task entry point.
    virtual int svc (void);

private:
    // Per-page database operations; what each one does is defined in the
    // implementation file, which is not visible here.
    bool update(const PAGE_PAIR &page);
    bool query(const PAGE_PAIR &page);

    mysqlpp::Connection *con_;

    std::string db_name_;
    std::string db_host_;
    std::string db_user_;
    std::string db_pass_;

    Queue_Handler *queue_handler_;
};

#endif // DB_HANDLER_H