博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python全栈开发 * 进程之间的通信,进程之间数据共享 * 180726
阅读量:6978 次
发布时间:2019-06-27

本文共 7922 字,大约阅读时间需要 26 分钟。

进程之间的通信(IPC)队列和管道 一.队列   基于管道实现     管道 + 锁      数据安全 (一).队列     队列遵循先进先出原则(FIFO)     多用于维护秩序,买票,秒杀     队列的所有方法:         put()(给队列里添加数据),put_nowait(),         get()(从队列中获取数据),get_nowait(),         相同点:有值的时候取值         区别:get()没有值时会阻塞             get_nowait() 没有值时会报错         full()(返回布尔值),empty()(返回bool值),         qsize()(队列大小)
示例:        from queue import Queue        q=Queue(5)        print(q.qsize())   #0        q.put(1)        q.put(2)        q.put(3)        q.put(4)        # q.put_nowait()   #已知队列长度,队列未放满数据执行此方法会报错        # 报错信息:  TypeError: put_nowait() missing 1 required positional argument: 'item'        q.put(5)        print(q.qsize())   #5        print(q.full())   #True  #队列满了返回True.        print(q.get())        print(q.get())        print(q.get())        print(q.get())        print(q.get())        print(q.empty())   #True    队列里无值返回True        print(q.qsize())   #1 以上五步取出队列中的数据,此时队列长度是0.        # q.get()    #队列里无值,用此方法程序阻塞不返回任何东西        # q.get_nowait()    #队列里无值,用此方法程序会报错,报错信息:queue.Empt
补充知识:         栈 :先进后出           应用:三级菜单  计算文件夹的总大小 (二).进程中的队列     进程中的队列方法:     empty(), full()在多进程中是不可靠     put(),put_nowait()     get(),get_nowait()
示例:    from multiprocessing import Queue    q=Queue(3)    q.put(2)    q.put(3)    q.put(5)    print(q.qsize())     #3    # q.put_nowait()  #  未给队列传参数或报错TypeError: put_nowait() missing 1 required positional argument: 'obj'    print(q.get())   #2    print(q.get())   #3    print(q.get())   #5    print(q.qsize())    #0    q.get()            #无值不返回    q.get_nowait()     #无值会报错queue.Empty
主进程放 子进程取
from multiprocessing import Queue,Process    def con(q):  #定义一个con函数    传一个参数 q  是创建的队列对象        print(q.get())  #获取队列数据,如果没有数据则一直阻塞直到有数据为止    if __name__=="__main__":   #在这一判断条件下面创建子进程        q=Queue()          #创建一个队列对象        p=Process(target=con,args=(q,)) #创建一个子进程子进程对象为con,并传一个参数q.        p.start()    #申请开启子进程        q.put(123)   #给队列中添加数据
子进程放,另一个子进程取
from multiprocessing import Queue,Process    def con(q):  #定义一个con函数    传一个参数 q  是创建的队列对象        print(q.get())  #获取队列数据,如果没有数据则一直阻塞直到有数据为止    if __name__=="__main__":   #在这一判断条件下面创建子进程        q=Queue()          #创建一个队列对象        p=Process(target=con,args=(q,)) #创建一个子进程子进程对象为con,并传一个参数q.        p.start()    #申请开启子进程        q.put(123)   #给队列中添加数据
生产者消费者模型                     --解决创造(生产)数据和处理(消费)数据的效率不平衡问题 把创造数据和处理数据放在不同的进程中 根据他们的效率来调整进程的个数 生产数据快  消费数据慢   内存空间的浪费 生产数据慢  消费数据快   效率低下
import time    import random    from multiprocessing import Queue,Process    def consumer(q,name):#创建一个函数消费者        while True:     #无限循环            food=q.get()            #1 获取队列中数据            if food=="stop":break   #2.判断获取的数据是否是结束符,如果是则停止获取数据(消费食物)            print("%s吃了%s"% (name,food))   #3.提示消费者吃了生产者生产的食物            time.sleep(random.random())      #4.消费者消费食物需要一定的时间  5_8 重复一次(消费者进程进行了两次)    def producer(q,name,food,n=10):#创建一个函数生产者        for i in range(n): #9.生产者生产食物的数量计数器            time.sleep(random.random())  #10.生产食物需要消耗一定的时间            fd=food+str(i)                 #11给食物标号            print("%s生产了%s"% (name,fd)) #12提示生产者生产的食物            q.put(fd)      #13把食物放进队列    14-18重复一遍 ,生产者进程进行了两次    if __name__=="__main__":#在此判断条件下创建一个子进程        q=Queue()   #创建一个队列        c=Process(target=consumer,args=(q,"alex"))#创建一个消费者子进程,,传两个参数        c.start() #申请开启子进程        c1=Process(target=consumer,args=(q,"alex")) #再创建一个消费者子进程,传两个参数        c1.start()#申请开启子进程        p=Process(target=producer,args=(q,"太白","食物"))  #创建一个生产者子进程,传三个参数        p.start()#申请开启子进程        p1 = Process(target=producer, args=(q, "egon", "甜点"))#再创建一个生产者子进程,传三个参数        p1.start()#申请开启子进程        p.join()        #第一个生产者生产结束再执行下一步        p1.join()       #第二个生产者生产结束再执行下一步        q.put("stop")    #全部生产者生产结束以后 再发送结束符        q.put("stop")    #有几个消费者就发送几个结束符
让consumer 停下来的方法         在所有生产者结束生产之后,向队列中放入一个结束符         有几个消费者就向队列中放入几个结束符         在消费者消费的过程中,接收到结束符,就结束消费的进程 生产者消费者模型  ---JoinableQueue版
import time    import random    from multiprocessing import JoinableQueue,Process    def consumer(q,name):#创建消费者函数        while True:         #6.无限循环            food=q.get()     #7.获取队列中的食物            print("%s吃了%s" %(name,food))#8提示消费者吃了食物            time.sleep(random.random())    #9.消费者消费食物需要时间            q.task_done()                  #10.每消费完一个食物就给生产者发送一次信息   16-20 以此类推    def producer(q,name,food,n=10):#创建生产者函数        for i in range(10):             #1.无限循环     11-15 以此类推            time.sleep(random.random())  #2.生产食物需要耗费时间            fd=food+str(i)               #3.生产食物名+序号            print("%s生产了%s"%(name,fd))#4.提示生产者生产了食物            q.put(fd)                     #5.将生产的食物添加到队列中        q.join()             #等消费者全部消费结束之后才结束阻塞#    if __name__=="__main__":#在此条件下创建一个子进程        q=JoinableQueue()                                  #创建一个队列(JoinableQueue)        c=Process(target=consumer,args=(q,"alex"))        #创建一个消费者进程两个参数        c.daemon=True   #设置守护进程        c.start()       #申请开启消费者进程        c1=Process(target=consumer,args=(q,"alex"))        #在创建一个消费者进程,传两个参数        c1.daemon=True   #设置守护进程        c1.start()        #申请开启另一个消费者进程        p=Process(target=producer,args=(q,"太白","食物"))   #创建一个生产者进程 传三个参数        p.start()         #申请开启一个消费者进程        p1=Process(target=producer,args=(q,"egon",'甜点'))  #再创建一个生产者进程,传三个参数        p1.start()        #申请开启另一个生产者进程        p.join()          #结束第一个生产者进程后再执行后面的代码        p1.join()         #结束第二个生产者进程后结束主程序    (守护程序直接结束)(消费者程序结束)
总结 :
只有multiprocessing中的队列才能帮助你实现IPC(进程之间的通信)    永远不可能出现数据不安全的情况,多个进程不会同时取走同一个数据    由于先进先出的特点 + 进程通信的功能 + 数据进程安全,经常用它来完成进程之间的通
生产者消费者模型
生产者和消费者的效率平衡的问题    内存的控制--队列的长度限制    让消费者自动停下来
JoinableQueue
在消费数据的时候   task_done    在生产端\主进程    join    流程:    消费者消费完毕    生产者结束阻塞    主进程代码结束    守护进程结束=>消费者结束
二.管道       数据不安全
示例:    from multiprocessing import Pipe    left,right=Pipe()    left.send("888")    print(right.recv())

 管道第一版:

from multiprocessing import Pipe,Process    def consumer(left,right):        left.close()   # 一个进程中,如果接收时其中一端关闭, 另一端开启在无限循环接收中会报错,        while True:   #左右两端都开启则可以无限循环接收,不会报错            try:                print(right.recv())   #接收信息            except EOFError:                break    if __name__=="__main__":        left,right=Pipe()    #创建一个管道        p=Process(target=consumer,args=(left,right))  #创建一个consumer子进程,传两个参数        p.start()  #申请开启一个子进程        right.close()   #关闭右管道口        for i in range(10):  #计数器            left.send("hello")  #左管道口发送信息(发送十次)        left.close()  #所有信息发完关闭右管道口    EOF异常的触发:        在这一个进程中,如果不再用这个端点了,应该close.        这一在recv的时候,如果其他端点都被关闭了,就能够知道不会再有新的消息传进来        此时就不会在这里阻塞等待,而是抛出一个EOFError        close并不是关闭了整个管道,而是修改了操作系统对管道端点的引用
一个进程发另一个进程收
from multiprocessing import Process,Pipe    def consumer(p,name):        produce,consume=p        produce.close()        while True:            try:                food=consume.recv()                print("%s收到%s"%(name,food))            except EOFError:                break    def producer(p,sep=10):        produce,consume=p        consume.close()        for i in range(sep):            produce.send(i)    if __name__=="__main__":        produce,consume=Pipe()        for i in range(5):            c=Process(target=consumer,args=((produce,consume),"c1"))            c.start()        for i in range(5):            p=Process(target=producer,args=((produce,consume),))            p.start()        producer((produce,consume))        produce.close()        consume.close()
进程之间的数据共享
from  multiprocessing import Manager,Process,Lockdef work(d,lock):#定义一个work函数 传两个参数    with lock:        d["count"]-=1#字典的count对应的值减一if __name__=="__main__":    lock=Lock()    m=Manager()    dic=m.dict({
"count":100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) #创建一个work子进程 传两个参数dic,lock,一共创建一百个 p_l.append(p) #每创建一个进程就添加进列表中 p.start() #每创建一个就申请开启一个 for p in p_l: #循环遍历列表中每一个进程 p.join() #直到所有进程都结束再执行后面的代码 print(dic) #0 最后打印字典

 

转载于:https://www.cnblogs.com/J-7-H-2-F-7/p/9385187.html

你可能感兴趣的文章
从零开始学习Sencha Touch MVC应用之十五
查看>>
tornado源码分析系列一
查看>>
[C#]函数式监听器 TListener 简单例子及文档
查看>>
魔法变量*args 和 **kwargs
查看>>
实验十二—swing编程
查看>>
洛谷3778 [APIO2017]商旅
查看>>
const char **
查看>>
一个面试官眼里的校园招聘——写给求职心切的应届毕业生
查看>>
开发中常用的插件
查看>>
dev repositoryItem 手工定义
查看>>
SQL Server 2012中HADR读写均衡功能需要注意的一些小细节
查看>>
ajax无线级刷新
查看>>
RC上电复位时间计算
查看>>
Fluently NHibernate映射多个实体程序集
查看>>
穿墙效果
查看>>
如何理解Spring对缓存的支持
查看>>
第十五课:奇葩的元素节点iframe
查看>>
Problem A. Speaking in Tongues
查看>>
Linux driver 板级文件跟踪一般方法
查看>>
Unity反射探针用法教程
查看>>