Python自定义进程池实例分析【生产者、消费者模型问题】

本文实例分析了Python自定义进程池。分享给大家供大家参考,具体如下:代码说明一切:

#encoding=utf-8#author: walker#date: 2014-05-21#function: 自定义进程池遍历目录下文件from multiprocessing import Process, Queue, Lockimport time, os#消费者class Consumer(Process):  def __init__(self, queue, ioLock):    super(Consumer, self).__init__()    self.queue = queue    self.ioLock = ioLock  def run(self):    while True:      task = self.queue.get()  #队列中无任务时,会阻塞进程      if isinstance(task, str) and task == 'quit':        break;      time.sleep(1)  #假定任务处理需要1秒钟      self.ioLock.acquire()      print( str(os.getpid()) + ' ' + task)      self.ioLock.release()    self.ioLock.acquire()    print 'Bye-bye'    self.ioLock.release()#生产者def Producer():  queue = Queue()  #这个队列是进程/线程安全的  ioLock = Lock()  subNum = 4  #子进程数量  workers = build_worker_pool(queue, ioLock, subNum)  start_time = time.time()  for parent, dirnames, filenames in os.walk(r'D:\test'):    for filename in filenames:      queue.put(filename)      ioLock.acquire()      print('qsize:' + str(queue.qsize()))      ioLock.release()      while queue.qsize() > subNum * 10: #控制队列中任务数量        time.sleep(1)  for worker in workers:    queue.put('quit')  for worker in workers:    worker.join()  ioLock.acquire()  print('Done! Time taken: {}'.format(time.time() - start_time))  ioLock.release()#创建进程池def build_worker_pool(queue, ioLock, size):  workers = []  for _ in range(size):    worker = Consumer(queue, ioLock)    worker.start()    workers.append(worker)  return workersif __name__ == '__main__':  Producer()

ps:

self.ioLock.acquire()...self.ioLock.release()

可用:

with self.ioLock:  ...

替代。再来一个好玩的例子:

#encoding=utf-8#author: walker#date: 2016-01-06#function: 一个多进程的好玩例子import os, sys, timefrom multiprocessing import Poolcur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))g_List = ['a']#修改全局变量g_Listdef ModifyDict_1():  global g_List  g_List.append('b')#修改全局变量g_Listdef ModifyDict_2():  global g_List  g_List.append('c')#处理一个def ProcOne(num):  print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))#处理所有def ProcAll():  pool = Pool(processes = 4)  for i in range(1, 20):    #ProcOne(i)    #pool.apply(ProcOne, (i,))    pool.apply_async(ProcOne, (i,))  pool.close()  pool.join()ModifyDict_1() #修改全局变量g_Listif __name__ == '__main__':  ModifyDict_2() #修改全局变量g_List  print('In main g_List :' + repr(g_List))  ProcAll()

Windows7 下运行的结果:

λ python3 demo.pyIn main g_List :['a', 'b', 'c']ProcOne 1, g_List:['a', 'b']ProcOne 2, g_List:['a', 'b']ProcOne 3, g_List:['a', 'b']ProcOne 4, g_List:['a', 'b']ProcOne 5, g_List:['a', 'b']ProcOne 6, g_List:['a', 'b']ProcOne 7, g_List:['a', 'b']ProcOne 8, g_List:['a', 'b']ProcOne 9, g_List:['a', 'b']ProcOne 10, g_List:['a', 'b']ProcOne 11, g_List:['a', 'b']ProcOne 12, g_List:['a', 'b']ProcOne 13, g_List:['a', 'b']ProcOne 14, g_List:['a', 'b']ProcOne 15, g_List:['a', 'b']ProcOne 16, g_List:['a', 'b']ProcOne 17, g_List:['a', 'b']ProcOne 18, g_List:['a', 'b']ProcOne 19, g_List:['a', 'b']

Ubuntu 14.04下运行的结果:

In main g_List :['a', 'b', 'c']ProcOne 1, g_List:['a', 'b', 'c']ProcOne 2, g_List:['a', 'b', 'c']ProcOne 3, g_List:['a', 'b', 'c']ProcOne 5, g_List:['a', 'b', 'c']ProcOne 4, g_List:['a', 'b', 'c']ProcOne 8, g_List:['a', 'b', 'c']ProcOne 9, g_List:['a', 'b', 'c']ProcOne 7, g_List:['a', 'b', 'c']ProcOne 11, g_List:['a', 'b', 'c']ProcOne 6, g_List:['a', 'b', 'c']ProcOne 12, g_List:['a', 'b', 'c']ProcOne 13, g_List:['a', 'b', 'c']ProcOne 10, g_List:['a', 'b', 'c']ProcOne 14, g_List:['a', 'b', 'c']ProcOne 15, g_List:['a', 'b', 'c']ProcOne 16, g_List:['a', 'b', 'c']ProcOne 17, g_List:['a', 'b', 'c']ProcOne 18, g_List:['a', 'b', 'c']ProcOne 19, g_List:['a', 'b', 'c']

可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是充重启实现的子进程;Linux下是fork实现的。更多关于Python相关内容感兴趣的读者可查看本站专题:《Python URL操作技巧总结》、《Python图片操作技巧总结》、《Python数据结构与算法教程》、《Python Socket编程技巧总结》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》希望本文所述对大家Python程序设计有所帮助。命运如同手中的掌纹,无论多曲折,终掌握在自己手中。

Python自定义进程池实例分析【生产者、消费者模型问题】

相关文章:

你感兴趣的文章:

标签云: