Python-multiprocessing:进程
进程
- 系统中最大可以有65535个进程 pid不允许有相同的
- 进程中的资源每个都是独立的 跟全局变量互不相通
- process 创建的子进程都结束完主进程才会结束,但是不影响主进程的执行
- 主进程被终止或者kill时 子进程也跟随结束
- obj.join(timeout)的特点是阻塞主进程 只有join()的对象执行完毕 主进程代码才会继续往下走 或者设置参数等待多少秒后向下执行 但join()的进程不会终止
- 我们使用multiprocessing.process创建子类来实现多进程
- 用子类创建的进程 子类里面需要重新父类run 方法
进程池
- 如果添加的任务大于进程池,没被执行的任务会等待进程池中的进程完成一个任务之后,会自动的用刚才的进程进完成当前的新任务
- 进程池 pool 一般用来缓冲 主进程结束后会kill子进程: p1 = pool(参数) 参数可以指定进程池中最大的进程数
- Pool的实例对象 p1.closs() 关闭进程池 关闭之后不会再接收新的任务
- p1.join() 必须放p1.closs()语句后面 阻塞主进程 必须等待p1 结束之后 主进程才会继续执行
- 进程越多的情况下 消耗的资源切换的资源会增多
- p1.apply_async(func1name,(参数,)) 这种方式添加的任务会并发执行 异步执行
- p1.apply 阻塞同步执行,这种方式添加的任务,会等待上一次的任务结束,才会继续往执行进程池中间的任务
进程通讯Queue
进程中的通讯
- 只用作用于multiprocessing.process创建的进程(队列: 先进先出)
# from multiprocessing import Queue 这种是用multiprocessing创建的多进程进行通讯时用到的
# q = Queue(3) 初始化一个Queue对象 最多可接收3条put消息 如没有参数 则可以添加到内存的尽头
# q.put() 往队列里面添加一个消息
# q.full() 判断队列是否满载 如果队列满了返回True 反之False 案例 Queue_01
# q.empty() 判断代码是否为空如果为空则返回True 反之False 案例 Queue_01
# q.qsize 打印出来目前队列中的队列数量 ,如果队列中没有可以取出来的消息 则堵塞进程,等待传入消息 案例 Queue_01
# q.put_nowait 不阻塞进程直接往里面传数据 如果数据已经满了 则会报错
# q.get_nowait
from multiprocessing import Queue
q = Queue(3)
q.put('消息2')
q.put('消息')
print(q.full(),'full 这里因为队列中只有一条消息 没有满载 所以返回False')
q.put('消息3')
print(q.full(),'full 这里队列因为前面的三条消息 所以队列满了 返回True')
print(q.empty(),'empty 队列里面还有消息 所以返回 False')
print(q.qsize(),'这里打印队列里面消息的数量')
q.get() #取出来一条
print(q.qsize(),'这里因为以上面取出来了一条 所以打印2')
q.get_nowait()
q.get_nowait()
print(q.qsize(),'这里因为上面两个get_nowait取完了 所以为0')
# q.get() #这里会等待有数据传输进来 如果没有则堵塞进程 下面的代码就无法执行
# q.get_nowait() #这一句代码不等待数据传输进来 如果取不到数据 直接异常
q.put('消息1')
q.put('消息2')
q.put('消息3')
# q.put('消息4')
print(q.qsize(),'上面的消息如果不注释掉 就会造成堵塞 因为队列中的消息只能容纳一条 所以打印输出3')
# q.put_nowait('已经满了 这条命令不会阻塞 会报异常消息')
q.put('消息5')# 添加一下消息
print('这里不会执行是因为以上面的命令让这里堵塞了')
q.get()
print(q.qsize(),'测试1')
进程池中的通讯
- 需要用到multiprocessing.Manager.Queue来接收消息
from multiprocessing import Manager,Pool
import os,time,sys
def reader(q):
print('reader启动{0},父进程为{1}'.format(os.getpgid(),os.getppid()))
for i in range(q.qsize()):
print('reader从queue获取到消息:{0}'.format(q.get(True)))
def writer(q):
print('writer启动{0},父进程为{1}'.format(os.getpid(),os.getppid()))
for i in 'fongge':
q.put(i)
if __name__ == '__main__':
print('{0}启动'.format(os.getpid))
q = Manager().Queue()
po = Pool()
po.apply_async(writer,(q,))
po.apply_async(reader, (q,))
po.close()
po.join()
print('')