跳转至

Python-multiprocessing:进程

进程

  • 系统中最大可以有65535个进程 pid不允许有相同的
  • 进程中的资源每个都是独立的 跟全局变量互不相通
  • process 创建的子进程都结束完主进程才会结束,但是不影响主进程的执行
  • 主进程被终止或者kill时 子进程也跟随结束
  • obj.join(timeout)的特点是阻塞主进程 只有join()的对象执行完毕 主进程代码才会继续往下走 或者设置参数等待多少秒后向下执行 但join()的进程不会终止
  • 我们使用multiprocessing.process创建子类来实现多进程
  • 用子类创建的进程 子类里面需要重新父类run 方法

进程池

  • 如果添加的任务大于进程池,没被执行的任务会等待进程池中的进程完成一个任务之后,会自动的用刚才的进程进完成当前的新任务
  • 进程池 pool 一般用来缓冲 主进程结束后会kill子进程: p1 = pool(参数) 参数可以指定进程池中最大的进程数
  • Pool的实例对象 p1.closs() 关闭进程池 关闭之后不会再接收新的任务
  • p1.join() 必须放p1.closs()语句后面 阻塞主进程 必须等待p1 结束之后 主进程才会继续执行
  • 进程越多的情况下 消耗的资源切换的资源会增多
  • p1.apply_async(func1name,(参数,)) 这种方式添加的任务会并发执行 异步执行
  • p1.apply 阻塞同步执行,这种方式添加的任务,会等待上一次的任务结束,才会继续往执行进程池中间的任务

进程通讯Queue

进程中的通讯

  • 只用作用于multiprocessing.process创建的进程(队列: 先进先出)
# from multiprocessing import Queue     这种是用multiprocessing创建的多进程进行通讯时用到的
# q = Queue(3)  初始化一个Queue对象 最多可接收3条put消息  如没有参数 则可以添加到内存的尽头
# q.put()   往队列里面添加一个消息
# q.full() 判断队列是否满载 如果队列满了返回True  反之False         案例  Queue_01
# q.empty() 判断代码是否为空如果为空则返回True 反之False            案例  Queue_01
# q.qsize  打印出来目前队列中的队列数量 ,如果队列中没有可以取出来的消息 则堵塞进程,等待传入消息    案例  Queue_01
# q.put_nowait   不阻塞进程直接往里面传数据 如果数据已经满了 则会报错
# q.get_nowait

from multiprocessing import Queue
q = Queue(3)
q.put('消息2')
q.put('消息')
print(q.full(),'full 这里因为队列中只有一条消息 没有满载 所以返回False')
q.put('消息3')
print(q.full(),'full 这里队列因为前面的三条消息 所以队列满了 返回True')
print(q.empty(),'empty 队列里面还有消息 所以返回 False')
print(q.qsize(),'这里打印队列里面消息的数量')
q.get()    #取出来一条
print(q.qsize(),'这里因为以上面取出来了一条  所以打印2')
q.get_nowait()
q.get_nowait()
print(q.qsize(),'这里因为上面两个get_nowait取完了  所以为0')
# q.get()  #这里会等待有数据传输进来   如果没有则堵塞进程 下面的代码就无法执行
# q.get_nowait()  #这一句代码不等待数据传输进来  如果取不到数据 直接异常
q.put('消息1')
q.put('消息2')
q.put('消息3')
# q.put('消息4')
print(q.qsize(),'上面的消息如果不注释掉 就会造成堵塞 因为队列中的消息只能容纳一条   所以打印输出3')


# q.put_nowait('已经满了 这条命令不会阻塞 会报异常消息')
q.put('消息5')# 添加一下消息
print('这里不会执行是因为以上面的命令让这里堵塞了')

q.get()
print(q.qsize(),'测试1')

进程池中的通讯

  • 需要用到multiprocessing.Manager.Queue来接收消息
from multiprocessing import Manager,Pool
import os,time,sys
def reader(q):
    print('reader启动{0},父进程为{1}'.format(os.getpgid(),os.getppid()))
    for i in range(q.qsize()):
        print('reader从queue获取到消息:{0}'.format(q.get(True)))


def writer(q):
    print('writer启动{0},父进程为{1}'.format(os.getpid(),os.getppid()))
    for i in 'fongge':
        q.put(i)


if __name__ == '__main__':
    print('{0}启动'.format(os.getpid))
    q = Manager().Queue()
    po = Pool()
    po.apply_async(writer,(q,))
    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print('')