Posts Tagged ‘’

[python] Simple Watch Dog

Thursday, May 15th, 2008


为了给 spider 进程提供简单的保护，我写了一个简单的 watch dog。原理很简单：被守护的进程定期给 dog 喂食；如果 dog 发现没有及时喂食，就 kill 要守护的 process，然后 restart 这个 process。

主要class 有两个:

  • watchdog 负责守护进程
  • feeder 负责喂食

用法也很简单

  • 启动watchdog
  • 在需要守护的进程中加入 feeder。

几点说明:

  • 由于时间仓促，有些地方做得还比较简单，没有异常处理，请见谅 :)
  • 我的 feeder 是为我自己的 Python spider 写的，其他语言请自行 port 一下吧。

具体代码下载（watchdog）

import os
import sys
import time
import copy
import socket
import threading

class Watch_Dog (object):
    """Watchdog that kills and restarts registered processes that stop feeding it.

    Protocol: newline-separated text datagrams on a UDP socket --
      ``register\\n<pid>\\n<path>``  start watching <pid>; <path> is what to
                                     launch again if it has to be restarted.
      ``feed\\n<pid>``               refresh the last-fed time of <pid>.

    ``open()`` starts a receiver thread; ``process()`` runs the periodic check
    loop in the calling thread until ``close()`` sets the exit flag.

    SECURITY NOTE(review): whoever can send datagrams to the bound address can
    make this process kill an arbitrary (positive) pid and launch an arbitrary
    path. Bind only to a loopback address on a trusted host.
    """
    def __init__(self):
        self._exit = 0
        self._sock = None
        self._thread = None
        # pid string -> [pid string, restart path, last feed time]
        self._processes = {}
        self._processes_lock = threading.Lock()
        # seconds without a feed before a process is considered hung
        self._feed_interval = 120
        # seconds between two scans of the process table
        self._check_interval = 30
        # seconds recvfrom() blocks before the receiver re-checks the exit flag
        self._recv_timeout = 0.5
        self._addr = None
        return

    def open(self, addr):
        """Bind a UDP socket to *addr* and start the receiver thread.

        Returns 1 on success, 0 if the socket could not be created or bound.
        """
        self._addr = addr
        self._exit = 0
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.bind(self._addr)
            # A receive timeout (instead of non-blocking polling + sleep) lets
            # datagrams be handled as soon as they arrive, while still waking
            # up periodically to notice close().
            sock.settimeout(self._recv_timeout)
        except Exception:
            # Best effort: failure is reported through the return value, but
            # the descriptor is released instead of leaked.
            if sock is not None:
                sock.close()
            print('creat or bind socket failed')
            return 0
        self._sock = sock
        # Daemon thread: an exception in the main thread must not leave the
        # interpreter hanging on this receiver.
        self._thread = threading.Thread(target=self._worker, args=())
        self._thread.daemon = True
        self._thread.start()
        return 1

    def close(self):
        """Stop the receiver thread and close the socket; safe to call twice."""
        self._exit = 1
        # Wait for the receiver to leave recvfrom() before closing the socket
        # so it never operates on a closed descriptor.
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        if self._sock is not None:
            try:
                self._sock.close()
            except socket.error:
                pass
            self._sock = None
        return

    def process(self):
        """Run the check loop in the calling thread until close() is called."""
        last_check_time = time.time()
        while not self._exit:
            if time.time() - last_check_time > self._check_interval:
                last_check_time = time.time()
                self._check()
            time.sleep(1)
        return

    def _worker(self):
        """Receiver thread: read datagrams and dispatch them until exit."""
        while not self._exit:
            try:
                recved, addr = self._sock.recvfrom(1024)
            except socket.timeout:
                # No datagram within the timeout; re-check the exit flag.
                continue
            except socket.error:
                # Transient/persistent receive error: back off briefly so a
                # persistent error cannot turn this loop into a busy spin.
                time.sleep(0.1)
                continue
            if recved:
                self._handle_message(recved)
        return

    def _handle_message(self, msg):
        """Parse one datagram and dispatch a register/feed command.

        Datagrams that match neither command are ignored.
        """
        if not isinstance(msg, str):
            # Python 3 sockets deliver bytes; Python 2 already delivers str.
            msg = msg.decode('utf-8', 'replace')
        ss = msg.split('\n')
        if len(ss) == 3 and ss[0] == 'register':
            self._register(ss[1], ss[2])
        elif len(ss) == 2 and ss[0] == 'feed':
            self._feed(ss[1])

    def _register(self, id, path):
        """Start watching process *id* (a pid string), restartable via *path*.

        Re-registering an id that is already watched leaves it unchanged.
        Ids that are not positive integers are rejected: _check() turns the id
        into a pid for `kill`, and `kill -9 -1` would kill every process of
        the user.
        """
        print('regiester %s %s' % (id, path))
        try:
            pid = int(id)
        except ValueError:
            pid = 0
        if pid <= 0:
            print('invalid pid %r, ignored' % (id,))
            return
        with self._processes_lock:
            if id not in self._processes:
                self._processes[id] = [id, path, time.time()]
        return

    def _feed(self, id):
        """Refresh the last-fed time of a registered process; unknown ids are ignored."""
        print('feed %s' % id)
        with self._processes_lock:
            entry = self._processes.get(id)
            if entry is not None:
                entry[2] = time.time()
        return

    def _check(self):
        """Kill and restart every process that has not fed within _feed_interval."""
        print('%s check process' % time.time())
        for pid, path in self._pop_expired(time.time()):
            # Restart in a separate thread so a slow kill/launch does not
            # stall the check loop.
            h = threading.Thread(target=self._restart, args=(pid, path))
            h.start()
        return

    def _pop_expired(self, now):
        """Remove and return [(pid, path), ...] for entries not fed since now - _feed_interval.

        Selection and removal happen under one lock acquisition, so a feed
        cannot slip in between deciding an entry is stale and removing it.
        """
        expired = []
        with self._processes_lock:
            for key, (pid_str, path, tm) in list(self._processes.items()):
                if now - tm > self._feed_interval:
                    del self._processes[key]
                    expired.append((int(pid_str), path))
        return expired

    def _remove(self, id):
        """Stop watching *id*; a no-op if it is not registered."""
        with self._processes_lock:
            self._processes.pop(id, None)
        return

    def _restart(self, pid, path):
        """Kill process *pid*, wait for the kill to finish, then relaunch *path*.

        NOTE(review): pids are reused by the OS; if the watched process already
        died and its pid was handed to another process, that process is killed.
        """
        import subprocess
        print('thread >> start kill %s restart %s' % (pid, path))
        devnull = open(os.devnull, 'w')
        try:
            if os.name == 'nt':
                # subprocess.call waits (unlike os.popen) so the old instance is
                # gone before the new one starts; stdout is discarded as popen's was.
                subprocess.call(['taskkill', '/F', '/PID', str(pid)], stdout=devnull)
                os.startfile(path)
            elif os.name == 'posix':
                subprocess.call(['kill', '-9', str(pid)], stdout=devnull)
                os.spawnvp(os.P_NOWAIT, 'python', ['python', path])
            else:
                print('not suppot this os now')
        finally:
            devnull.close()
        print('thread >> finish')
        return

class Feeder (object) :
    """Client side of the watchdog: registers this process, then feeds periodically.

    Embed in the process to be guarded and call ``open(addr, path)``, where
    *addr* is the watchdog's UDP address and *path* is what the watchdog
    should launch to restart this process. Messages sent:
    ``register\\n<pid>\\n<path>`` once, then ``feed\\n<pid>`` every
    ``_feed_interval`` seconds.
    """
    def __init__(self):
        self._sock = None
        self._pid = None
        self._exit = 0
        # seconds between feeds; should stay well below the watchdog's timeout
        self._feed_interval = 10
        self._addr = None
        self._path = None
        self._thread = None
        return

    def open(self, addr, path):
        """Register with the watchdog at *addr* and start the feeding thread."""
        self._addr = addr
        self._path = path
        self._exit = 0
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._pid = os.getpid()
        self._register()
        # Daemon thread: if the guarded program's main thread exits, the
        # interpreter must be allowed to exit too, instead of being kept
        # alive (and kept "fed") by this thread forever.
        self._thread = threading.Thread(target=self._worker, args=())
        self._thread.daemon = True
        self._thread.start()
        return

    def close(self):
        """Stop feeding and close the socket; safe to call before open() or twice."""
        self._exit = 1
        # Let the worker finish any in-flight sendto() before the socket goes away.
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        if self._sock is not None:
            self._sock.close()
            self._sock = None
        return

    def _worker(self):
        """Feeding thread: send a feed every _feed_interval seconds until close()."""
        last_feed_time = time.time()
        while not self._exit :
            if time.time() - last_feed_time > self._feed_interval :
                last_feed_time = time.time()
                try:
                    self._feed()
                except socket.error:
                    # Best effort: a transient send failure must not end this
                    # thread, or the watchdog would later kill a healthy process.
                    pass
            time.sleep(1)
        return

    def _register(self):
        """Send the one-time register message (pid and restart path)."""
        self._sock.sendto('register\n'+str(self._pid)+'\n'+self._path, self._addr)
        return

    def _feed(self):
        """Send one feed message for this pid."""
        self._sock.sendto('feed\n'+str(self._pid), self._addr)
        return

if __name__ == '__main__':

    print('Current OS : %s' % os.name)

    addr = ('localhost', 15028)

    wd = Watch_Dog()

    # sys.exit rather than the site-provided exit(), which is missing under
    # `python -S` and in frozen executables.
    if not wd.open(addr) : sys.exit(-1)

    try:
        # Runs the check loop in this thread until interrupted (e.g. Ctrl-C).
        wd.process()
    finally:
        # Always stop the receiver thread and release the socket; otherwise a
        # still-running receiver thread can keep the interpreter alive.
        wd.close()