【澳门葡京备用网址】进度与线程,python全栈开发从入门到放弃之socket并发编程多进程

进度同步(multiprocessing.Lock(锁机制)、multiprocessing.Semaphore(信号量机制)、multiprocessing.伊夫nt(事件机制))

  在计算机中,有一对硬件和软件,例如处理器、打印机等,都属于竞争类能源,当有要求时,很多经过都要争抢这么些能源,而对此那类能源,就属于临界财富。当多进程共同处理某2个数量时,这几个数目也就属于一个逼近能源。操作系统对总计机内各个能源都使其在竞争中有序化,可是对于数据的话,尤其是用户动态暴发的数目,当处理时就成为了临界财富,所以大家作为程序猿来说,须要对临界能源加以敬爱,否则就会并发数量错乱情况。那是在拉长度序效能的优势下,带来的1个隐患。 

翻阅目录

一. cpython并发编程之多进度
1.1 multiprocessing模块介绍
1.2 Process类的介绍
1.3 Process类的行使
1.4 进度间通讯(IPC)形式一:队列
1.5 进度间通讯(IPC)形式二:管道(领会一些)
1.6 进度间通讯格局三:共享数据
1.7 进度同步(锁),信号量,事件…
1.8 进程池
二. python并发编程之二十二十四线程
2.1 threading模块
2.2 Python GIL(Global Interpreter Lock)
2.3 同步锁
2.4 死锁与递归锁
2.5 信号量Semahpore
2.6 事件Event
2.7 条件Condition(了解)

2.8 定时器Timer
2.9 线程queue
2.10 Python标准模块–concurrent.futures
三.  协程

四. 协程模块greenlet

五. gevent模块(单线程并发)

六. 综合应用

 

1.1 multiprocessing模块介绍

   
python中的八线程不可以利用多核优势,借使想要充足地动用多核CPU的能源(os.cpu_count()查看),在python中多数情景须求动用多进度。Python提供了尤其好用的多进度包multiprocessing。
   
multiprocessing模块用来开启子进度,并在子进度中实施我们定制的任务(比如函数),该模块与二十二十四线程模块threading的编程接口类似。

  multiprocessing模块的效能多多:协理子进度、通讯和共享数据、执行不一款型的一只,提供了Process、Queue、Pipe、Lock等零件。

   
必要重新强调的一点是:与线程分化,进程没有任何共享状态,进度修改的多少,改动仅限于该进程内。

何以是经过(process)?

程序并不能够独立运维,唯有将次第装载到内存中,系统为它分配能源才能运作,而那种实践的程序就叫做进度。程序和进度的界别就在于,程序是命令的汇集,它是进度运转的静态描述文本;进程是先后的三遍执行活动,属于动态概念。

multiprocessing.Lock(锁机制)

  当多少个经过使用同一份数据能源的时候,就会掀起多少安全或相继混乱难题。

澳门葡京备用网址 1澳门葡京备用网址 2

from multiprocessing import Process
import random
import time

def func(addr):
    print('我是%s'%addr)
    time.sleep(random.random())
    print('谢谢!')

if __name__ == '__main__':
    l = ['四川的','湖南的','河南的','江苏的']
    for addr in l:
        p = Process(target=func,args=(addr,))
        p.start()
    time.sleep(2)
    print('\n\n我选%s'%random.choice(l))
# 关于抢占输出资源的事情,是指多进程并发执行时,并不是一个进程执行完任务后其他进程再执行。
# 比如 此程序会输出:我是四川的  我是河南的 我是江苏的 谢谢!谢谢!我是湖南的 谢谢! 谢谢!
# 而不是 : 我是四川的 谢谢! 我是河南的 谢谢! ...

多进程关于抢占输出资源的事情

多进程有关抢占输出能源的事务

澳门葡京备用网址 3澳门葡京备用网址 4

from multiprocessing import Process
import random
import time
from multiprocessing import Lock
def func(addr,lock):
    lock.acquire()
    print('我是%s'%addr)
    time.sleep(random.random())
    print('谢谢!')
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    l = ['四川的','湖南的','河南的','江苏的']
    for addr in l:
        p = Process(target=func,args=(addr,lock))
        p.start()
    time.sleep(4)
    print('\n\n我选%s'%random.choice(l))

使用锁维护输出资源

拔取锁维护输出财富

  上边那种状态,使用了加锁的花样保险了先后的逐一执行,可是举办又成为了串行,下跌了效用,可是只可以说,它确保了数量的安全性。

     
下边举例来说锁的根本:模拟12306抢票难题。模拟银行账户的存取款难点。

澳门葡京备用网址 5澳门葡京备用网址 6

# 注意,文件中存储需要以{'c':1}这种形式,c的引号一定要带
# 否则json识别不出来
# 此代码的效果,并发执行,但是多进程同时读写同一个文件数据,造成数据混乱

from multiprocessing import Process,Lock
import json
import time

def check(i,l):
    with open('a.txt','r',encoding='utf-8') as f:
        dic = json.load(f)
    print('第%s个人在查票,余票为%s' % (i, dic['c']))
    pay(i,l)

def pay(i,l):
    with open('a.txt','r',encoding='utf-8') as f:
        dic = json.load(f)
    time.sleep(0.5)# 模拟网络延迟,当购买过程中也会有网络延迟
    if dic['c']:
        print('第%s个人买到票了 '%i)
        dic['c'] -= 1
    else:
        print('第%s个人没买到票'%i)
    with open('a.txt','w') as f:
        json.dump(dic,f)

if __name__ == '__main__':
    l = Lock()
    for i in range(10):
        p = Process(target=check,args=(i+1,l))
        p.start()

多个人同时抢票

多少人同时抢票

很明朗,上述例子中,因为多进度同时对一个逼近财富(a.txt文件)进行了读写操作,使文件内数据错乱,也致使了余票为1张,不过不少人都抢到票的假象。那就加锁来缓解它吧

澳门葡京备用网址 7澳门葡京备用网址 8

from multiprocessing import Process,Lock
import json
import time

def check(i,l):
    with open('a.txt','r',encoding='utf-8') as f:
        dic = json.load(f)
    print('第%s个人在查票,余票为%s' % (i, dic['c']))
    l.acquire()
    pay(i,l)# 为什么在这里加锁? 因为每个人都可以查票,读取数据,不会造成数据混乱,但是当买票的时候,就需要对临界资源的写入,所以对写操作加锁,使某一个进程在写文件时候,其他进程不能碰此文件。
    l.release()

def pay(i,l):
    with open('a.txt','r',encoding='utf-8') as f:
        dic = json.load(f)
    time.sleep(0.5)# 模拟网络延迟,当购买过程中也会有网络延迟
    if dic['c']:
        print('第%s个人买到票了 '%i)
        dic['c'] -= 1
    else:
        print('第%s个人没买到票'%i)
    with open('a.txt','w') as f:
        json.dump(dic,f)

if __name__ == '__main__':
    l = Lock()
    for i in range(10):
        p = Process(target=check,args=(i+1,l))
        p.start()

加锁消除领票难题

有关银行存取款的标题。同一个账户,有些人一向存,某些人在同目前间一向取,如果不对数码举办爱慕起来,就会招致的一种多少错乱难点。

from multiprocessing import Process, Lock,Value

def save_money(num):
    for i in range(100):
        time.sleep(0.05)
        num.value += 1

def draw_money(num):
    for i in range(100):
        time.sleep(0.05)
        num.value -= 1

if __name__ == '__main__':
    num = Value('i',1000)# 多进程中共享数据,一个int类型的数据,1000
    man = Process(target=save_money,args=(num,))
    woman = Process(target=draw_money,args=(num,))
    man.start()
    woman.start()
    time.sleep(6)
    print(num.value)

from multiprocessing import Process, Lock,Value

def save_money(num,l):
    for i in range(100):
        time.sleep(0.05)
        l.acquire()
        num.value += 1
        l.release()

def draw_money(num,l):
    for i in range(100):
        time.sleep(0.05)
        l.acquire()# 在操作存取款的数据时,先将数据锁住,不允许其他人更改此数据
        num.value -= 1
        l.release()

if __name__ == '__main__':
    l = Lock()
    num = Value('i',1000)# 多进程中共享数据,一个int类型的数据,1000
    man = Process(target=save_money,args=(num,l))
    woman = Process(target=draw_money,args=(num,l))
    man.start()
    woman.start()
    time.sleep(6)
    print(num.value)

这样才对!!!

一. cpython并发编程之多进度

1.2 Process类的介绍

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

    参数介绍:

1 group参数未使用,值始终为None
2 
3 target表示调用对象,即子进程要执行的任务
4 
5 args表示调用对象的位置参数元组,args=(1,2,'egon',)
6 
7 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
8 
9 name为子进程的名称

 主意介绍:

1 p.start():启动进程,并调用该子进程中的p.run() 
 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
 3 
 4 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
 5 p.is_alive():如果p仍然运行,返回True
 6 
 7 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  

   属性介绍:

1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 
3 p.name:进程的名称
4 
5 p.pid:进程的pid
6 
7 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
8 
9 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可) 

什么样是线程(thread)?

线程是操作系统能够举办演算调度的小小单位。它被含有在经过中,是经过中的实际运作单位。一条线程指的是进度中三个单纯顺序的控制流,八个历程中得以并发多个线程,每条线程并行执行不同的义务。

multiprocessing.Semaphore(信号量机制)

上述讲的Lock,属于互斥锁,也等于一把钥匙配备一把锁,同时只允许锁住某一个数据。而信号量则是多把钥匙配备多把锁,相当于说同时允许锁住多个数据。

诸如在一个清水蓝发廊,里边有四人服务人员,那么那个发廊最多就同时同意进入7个人客人,当又有第十人客人来的时候,就需求在门外等候;当服务人士服务完某位客人后,才允许继续的人再进入二个,换句话说,这一个发廊最多而且接待6人客人,多的客人必须等待。

信号量同步基于内部计数器,用户开始化一个计数器初值(比如上述例子中就初步化为5),每调用一次acquire(),计数器减1;每调用一回release(),计数器加1。当计数器为0时,acquire()调用被打断。那是迪科斯彻(Dijkstra)信号量概念P()和V()的Python完成。信号量同步机制适用于访问像服务器那样的蝇头能源。信号量与进度池的概念很像,但是要不相同开,信号量涉及到加锁的定义

举个栗子:

from multiprocessing import Semaphore
from multiprocessing import Process
import time
import random

def sing(i,se):
    se.acquire()# 每次进来一位客人,信号量内部计数器减1
    print('%s进入小黑屋'%i)
    time.sleep(random.randint(1,3))
    print('%s交钱走人'%i)
    se.release()# 每次离开一位客人,信号量内部计数器加1


if __name__ == '__main__':
    se = Semaphore(5)# 初始化5把钥匙配备5把锁
    for i in range(10): # 模拟10个人要进入小黑屋子
        p = Process(target=sing,args=(i,se))
        p.start()

1.1 multiprocessing模块介绍

python中的十六线程不能够运用多核优势,倘若想要丰盛地使用多核CPU的财富(os.cpu_count()查看),在python中一大半气象须要拔取多进度。Python提供了拾分好用的多进度包multiprocessing。
 multiprocessing模块用来开启子进程,并在子进度中施行大家定制的天职(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的职能多多:帮助子过程、通讯和共享数据、执行不一款型的联手,提供了Process、Queue、Pipe、Lock等零件。

强调:
与线程差别,进度没有别的共享状态,进度修改的数码,改动仅限于该进度内。

1.3 Process类的施用

**=====================part1:创造并开启子进度的三种方法**

留意:在windows中Process()必须置于# if __name__ ==
‘__main__’:下

Since Windows has no fork, the multiprocessing module starts a new
Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite
succession of new processes (or until your machine runs out of
resources). 
This is the reason for hiding calls to Process() inside

if __name__ == “__main__”
since statements inside this if-statement will not get called upon
import.

由于Windows没有fork,多处理模块运维1个新的Python进度并导入调用模块。 
假使在导入时调用Process(),那么那将开行无限继承的新进度(或直到机器耗尽能源)。 
那是藏匿对Process()内部调用的原,使用if __name__ == “__main
__”,那些if语句中的语句将不会在导入时被调用。

出现开启格局一

from multiprocessing import Process     #导入子进程
import time,random
import os
def piao(name):         #定义一个函数传入name参数
    print(os.getppid(),os.getpid())      #os.getppid查看父类的进程id,os.getpid查看子进程的进程id
    print('%s is piaoing' %name)
    # time.sleep(random.randint(1,3))
    print('%s is piao end' %name)
if __name__ == '__main__':                #调用自身模块时
    p1=Process(target=piao,kwargs={'name':'alex',})    #定义子进程
    p2=Process(target=piao,args=('wupeiqi',))
    p3=Process(target=piao,kwargs={'name':'yuanhao',})
    p1.start()  #开启子进程
    p2.start()   #开启子进程是无序的
    p3.start()
    print('主进程',os.getpid())    会先启动主进程在开始子进程,

并发方式二

from multiprocessing import Process
import time,random
import os
class Piao(Process):                  #定义一个类,父类是Process
    def __init__(self,name):
        super().__init__()            #在子类用父类的
        self.name=name
    def run(self):
        print(os.getppid(),os.getpid())
        print('%s is piaoing' %self.name)
        # time.sleep(random.randint(1,3))
        print('%s is piao end' %self.name)
if __name__ == '__main__':
    p1=Piao('alex')                 #然后直接在类传入参数
    p2=Piao('wupeiqi')
    p3=Piao('yuanhao')

    p1.start()                      #在启动
    p2.start()
    p3.start()
    print('主进程',os.getpid(),os.getppid())

socket完结多并发实例

澳门葡京备用网址 9澳门葡京备用网址 10

from socket import *
from multiprocessing import Process
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加
s.bind(('127.0.0.1',8088))
s.listen(5)
def talk(conn,addr):
    while True: #通信循环
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()
if __name__ == '__main__':
    while True:#链接循环
        conn,addr=s.accept()
        p=Process(target=talk,args=(conn,addr))
        p.start()
    s.close()

服务端

澳门葡京备用网址 11澳门葡京备用网址 12

from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
c.close()

客户端

 

过程与线程的界别?

线程共享内存空间,进度的内存是单独的。

同多少个历程的线程之间可以直接沟通,但七个进程并行通讯必须透过七个当中代理。

创立三个新的线程很粗略,成立壹个新的历程须求对其父进度展开一次克隆。

3个线程可以决定和操作同一进度里的其余线程,不过经过只好操作子进度。

multiprocessing.伊芙nt(事件机制)

python中的事件机制,紧要用于主进度控制其余进度的履行,事件首要提供了多个主意
set、wait、clear。

    e = Event()
    e.set()
#将is_set()设为True
    e.clear() #
将is_set()设为False
    e.wait()
#判断is_set的bool值,如果bool为True,则非阻塞,bool值为False,则阻塞
    e.is_set() # 标识
    事件是经过is_set()的bool值,去标识e.wait()
的阻塞状态
    当is_set()的bool值为False时,e.wait()是阻塞状态
    当is_set()的bool值为True时,e.wait()是非阻塞状态
    当使用set()时,是把is_set的bool变为True
    当使用clear()时,是把is_set的bool变为False

举个栗子:

from multiprocessing import Process, Event
import time


def tra(e):
    while 1: #红绿灯需要一直亮着,要么红灯,要么绿灯
        if e.is_set(): #True代表绿灯了,表示可以过车
            time.sleep(5)#睡5秒,让车在这5秒的时间内通过
            print('\033[31m红灯亮\033[0m')#绿灯亮5秒后提示红灯亮
            e.clear()#把is_set设置为False
        else:
            time.sleep(5)#此时代表红灯亮,应该红灯亮5秒.在此等5秒
            print('\033[32m绿灯亮\033[0m')#红灯亮够5秒该绿灯亮了
            e.set()#将is_set设置为True

def Car(i,e):
    e.wait()#车等在红绿灯,此时要看是红灯还剩绿灯,如果is_set = True 就可以过车
    print('第%s辆车过去了' % i)


if __name__ == '__main__':
    e = Event()
    triff_light = Process(target=tra,args=(e,))#信号灯的进程
    triff_light.start()
    for i in range(50):#描述50辆车的进程
        if i % 3 == 0:
            time.sleep(2)
        car = Process(target=Car,args=(i+1,e,))
        car.start()

1.2 Process类的牵线

创立进度的类:

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:

group参数未使用,值始终为None

target表示调用对象,即子进程要执行的任务

args表示调用对象的位置参数元组,args=(1,2,'egon',)

kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}

name为子进程的名称

措施介绍:

p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  

p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

品质介绍:

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

p.name:进程的名称

p.pid:进程的pid

p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)

p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

 

1.4 进度同步(锁)

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
part1:共享同一打印终端,发现会有多行内容打印到一行的现象(多个进程共享并抢占同一个打印终端,乱了)
以上厕所为例

from multiprocessing import Process,Lock
import os
import time
def work():

    print('task[%s] 上厕所' %os.getpid())
    time.sleep(3)
    print('task[%s] 上完厕所' %os.getpid())
if __name__ == '__main__':
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.start()
    p2.start()
    p3.start()

    print('主')
输出结果:


task[5580] 上厕所
task[11164] 上厕所
task[12828] 上厕所
task[5580] 上完厕所
task[11164] 上完厕所
task[12828] 上完厕所

可以了然的看看,多少个子进程在抢一个打印终端,二个程序还没上完厕所,操作系统有个机制不会让下三个子进度不会再等待中。

 

用Lock模块,

from multiprocessing import Process,Lock     #Lock添加锁模块
import os
import time
def work(mutex):
    mutex.acquire()
    print('task[%s] 上厕所' %os.getpid())
    time.sleep(3)
    print('task[%s] 上完厕所' %os.getpid())
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()                      #定义模块的值
    p1=Process(target=work,args=(mutex,))       #把模块的命名传给函数
    p2=Process(target=work,args=(mutex,))
    p3=Process(target=work,args=(mutex,))
    p1.start()
    p2.start()
    p3.start()

    print('主')
输出结果:


task[12404] 上厕所
task[12404] 上完厕所
task[5312] 上厕所
task[5312] 上完厕所
task[10544] 上厕所
task[10544] 上完厕所

近日就是一个子进度进入其他进度都在守候那多少个经过甘休才能跻身

需知:加锁的目标是为着保障多少个经过修改同一块数据时,同暂时间只好有一个修改,即串行的修改,没错,速度是慢了,就义了速度而保证了数据安全。

进程之间数据隔离,可是共享一套文件系统,因此可以因而文件来促成进度一贯的通讯,但难题是必须团结加锁处理

从而,就让大家帮文件作为数据库,模拟抢票(Lock互斥锁)

json读文件一定要充裕双引号,不然无法辨识

澳门葡京备用网址 13澳门葡京备用网址 14

模拟抢票
#db.txt文件内容
{"count": 1}        #假如票数只有一个了

#导入模块
import os,time
from multiprocessing import Process,Lock  
import json,random

#剩余票数
def func():
    f=json.load(open('db.txt'))     #用json序列化
    print('剩余票数%s'%f['count'])   # 直接取根据字典取值
#买票
def woak():
    f=json.load(open('db.txt'))   
    if f['count'] >0:              #判断字典的值是否大于0,大于0说明有票
        f['count']-=1             #有票就购买,然后就-1
        time.sleep(random.randint(1,3))     #模拟在网络上的延迟
        json.dump(f,open('db.txt','w'))        
        print('%s 购票成功'%os.getpid())   
def func1(mutex):
    func()
    time.sleep(random.randint(1,3))   #模拟网络上的延迟
    mutex.acquire()                          #加上锁
    woak()
    mutex.release()                          #当一个子进程结束就释放锁,不然永远这个子进程不会结束
if __name__ == '__main__':
    mutex=Lock()                                 
    for i in range(50):
        p=Process(target=func1,args=(mutex,))
        p.start()
'''
输出结果:
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
剩余票数1
11976 购票成功                   #只有一个购票成功

模仿买票

 

学习了通过使用共享的文书的主意,达成进度一直的共享,即共享数据的法门,那种格局必须考虑周到同步、锁等难题。而且文件是操作系统提供的抽象,可以看作进程平素通讯的介质,与mutiprocess模块无关。

 

但其实mutiprocessing模块为大家提供了依照新闻的IPC通讯机制:队列和管道。

IPC机制中的队列又是按照(管道+锁)完成的,可以让大家从长短不一的锁难点中脱身出来,

大家应有尽量幸免使用共享数据,尽大概使用新闻传递和队列,避免处理千丝万缕的一块和锁难点,而且在经过数目扩展时,往往可以赢得更好的可获展性。

Python GIL(Global Interpreter Lock)

不论是开启多少个线程,有稍许个CPU,python在实践的时候在一如既往时刻只同意2个线程允许。

生产者消费者模型

第一种:

from multiprocessing import Queue,Process



def producer(q,product):
    for i in range(20):
        info = product + '的娃娃%s号' % i
        q.put(info)
    q.put(None)


def consumer(q,name):
    while 1:
        info = q.get()
        if info:
            print('%s拿走了%s' % (name,info))
        else:
            break

if __name__ == '__main__':
    q = Queue(20)
    p_pro = Process(target=producer,args=(q,'炫彩'))
    p_con = Process(target=consumer,args=(q,'corn'))
    p_pro.start()
    p_con.start()

第二种:

from multiprocessing import Queue,Process

def producer(q,product):
    for i in range(20):
        info = product + '的娃娃%s号' % str(i)
        q.put(info)


def consumer(q,name,color):
    while 1:
        info = q.get()
        if info:
            print('%s,%s拿走了%s\033[0m' % (color,name,info))
        else:# 当消费者获得队列中数据时,如果获得的是None,就是获得到了生产者不再生产数据的标识
            break# 此时消费者结束即可

if __name__ == '__main__':
    q = Queue()
    p_pro1 = Process(target=producer,args=(q,'炫彩'))
    p_pro2 = Process(target=producer,args=(q,'苍井井'))
    p_pro3 = Process(target=producer,args=(q,'波多多'))
    p_con1 = Process(target=consumer,args=(q,'alex','\033[31m'))
    p_con2 = Process(target=consumer,args=(q,'wusir','\033[32m'))
    p_l = [p_con1,p_con2,p_pro1,p_pro2,p_pro3]
    [i.start() for i in p_l]
    p_pro1.join()
    p_pro2.join()
    p_pro3.join()
    q.put(None)# 几个消费者就要接受几个结束标识
    q.put(None)

1.3 Process类的运用

1.5 进度间通讯(IPC)形式一:队列(推荐使用)

   
进程互相之间相互隔离,要贯彻进程间通讯(IPC),multiprocessing模块帮助三种样式:队列和管道,那三种办法都以采纳音信传递的

    始建队列的类(底层就是以管道和锁定的方法贯彻)

1 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 

  参数介绍:

1 maxsize是队列中允许最大项数,省略则无大小限制。  

格局介绍:

    主要措施:

1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

    其他形式(驾驭):

1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

   应用:

rom multiprocessing import Process,Queue
# 1:可以往队列里放任意类型的数据 2 队列:先进先出
q=Queue(3)
q.put('first')
q.put('second')
q.put('third')
# q.put('fourht')

print(q.get())
print(q.get())
print(q.get())
# print(q.get())


q=Queue(3)
q.put('first',block=False)
q.put('second',block=False)
q.put('third',block=False)
# q.put('fourth',block=False)
q.put('fourth',block=True,timeout=3)


q.get(block=False)
q.get(block=True,timeout=3)

q.get_nowait() #q.get(block=False)

  生产者消费者模型

在产出编程中采取生产者和消费者情势可以缓解大部分产出难题。该格局通过平衡生产线程和消费线程的劳作能力来狠抓程序的一体化处理数据的进度。

干什么要利用生产者和买主格局

在线程世界里,生产者就是生产数据的线程,消费者就是花费数量的线程。在二十四线程开发当中,固然劳动者处理速度很快,而消费者处理速度很慢,那么生产者就不大概不等待顾客处理完,才能持续生产数据。同样的道理,尽管顾客的拍卖能力超过生产者,那么消费者就务须等待生产者。为了化解那个难点于是引入了劳动者和顾客情势。

怎样是劳动者消费者方式

劳动者消费者情势是由此多少个容器来缓解劳动者和顾客的强耦合难点。生产者和顾客互相之间不直接通信,而经过阻塞队列来展开报导,所以生产者生产完数据将来并非等待买主处理,直接扔给卡住队列,消费者不找生产者要多少,而是径直从绿灯队列里取,阻塞队列就一定于八个缓冲区,平衡了劳动者和顾客的拍卖能力。

依据队列达成生产者消费者模型

澳门葡京备用网址 15澳门葡京备用网址 16

# 生产者消费者模型1
from multiprocessing import Process,Queue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        if res is None:
            break
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃了 %s\033[0m' % (os.getpid(), res))
def producer(q):
    for i in range(5):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 制造了 %s\033[0m' %(os.getpid(),res))
    q.put(None)
if __name__ == '__main__':
    q=Queue()
    #生产者们:厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:吃货们
    p2=Process(target=consumer,args=(q,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('主')

from multiprocessing import Process,Queue
import time,random,os


def consumer(q):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        if res is None:break
        print('\033[45m消费者拿到了:%s\033[0m' %res)

def producer(seq,q):
    for item in seq:
        time.sleep(random.randint(1,3))
        print('\033[46m生产者生产了:%s\033[0m' %item)

        q.put(item)

if __name__ == '__main__':
    q=Queue()

    c=Process(target=consumer,args=(q,))
    c.start()

    producer(('包子%s' %i for i in range(10)),q)
    q.put(None)
    c.join()
    print('主线程')

澳门葡京备用网址 17澳门葡京备用网址 18

#生产者消费者模型3
from multiprocessing import Process,JoinableQueue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃了 %s\033[0m' % (os.getpid(), res))
        q.task_done()

def product_baozi(q):
    for i in range(5):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 制造了 %s\033[0m' %(os.getpid(),res))
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    #生产者们:厨师们
    p1=Process(target=product_baozi,args=(q,))

    #消费者们:吃货们
    p4=Process(target=consumer,args=(q,))
    p4.daemon=True

    p1.start()
    p4.start()

    p1.join()
    print('主')
    #p2结束了

澳门葡京备用网址 19澳门葡京备用网址 20

from multiprocessing import Process,JoinableQueue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃了 %s\033[0m' % (os.getpid(), res))
        q.task_done()

def product_baozi(q):
    for i in range(3):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 制造了 %s\033[0m' %(os.getpid(),res))
    q.join()

def product_gutou(q):
    for i in range(3):
        time.sleep(2)
        res='骨头%s' %i
        q.put(res)
        print('\033[44m%s 制造了 %s\033[0m' %(os.getpid(),res))
    q.join()

def product_ganshui(q):
    for i in range(3):
        time.sleep(2)
        res='泔水%s' %i
        q.put(res)
        print('\033[44m%s 制造了 %s\033[0m' %(os.getpid(),res))
    q.join()
if __name__ == '__main__':
    q=JoinableQueue()
    #生产者们:厨师们
    p1=Process(target=product_baozi,args=(q,))
    p2=Process(target=product_gutou,args=(q,))
    p3=Process(target=product_ganshui,args=(q,))

    #消费者们:吃货们
    p4=Process(target=consumer,args=(q,))
    p5=Process(target=consumer,args=(q,))
    p4.daemon=True
    p5.daemon=True

    p_l=[p1,p2,p3,p4,p5]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()

    print('主')

 

开创队列的其余三个类:

   
JoinableQueue([maxsize]):那如同一个Queue对象,但队列允许项目的使用者公告生成者项目已经被成功拍卖。文告进度是使用共享的信号和规则变量来完成的。

    参数介绍:

    maxsize是队列中允许最大项数,省略则无大小限制。    

  办法介绍:

    JoinableQueue的实例p除了与Queue对象相同的方法之外还怀有:

   
q.task_done():使用者利用此格局发出信号,表示q.get()的回到项目已经被拍卖。假诺调用此办法的次数超越从队列中剔除项目标数额,将掀起ValueError很是

   
q.join():生产者调用此措施进行围堵,直到队列中保有的类型均被拍卖。阻塞将不断到行列中的逐个门类均调用q.task_done()方法截至

 

Python threading模块

进程间通讯——队列和管道(multiprocess.Queue、multiprocess.Pipe)

经过间通讯–IPC(Inter-Process
Communication)

1.创设并开启子进程的二种艺术

注:
在windows中Process()必须置于# if __name__ == ‘__main__’:下

Since Windows has no fork, the multiprocessing module starts a new
Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite
succession of new processes (or until your machine runs out of
resources). 
This is the reason for hiding calls to Process() inside

if __name__ == “__main__”
since statements inside this if-statement will not get called upon
import.

出于Windows没有fork,多处理模块运行三个新的Python进度并导入调用模块。 
一旦在导入时调用Process(),那么那将开行无限继承的新进度(或直到机器耗尽财富)。 
那是藏匿对Process()内部调用的法则,使用if __name__ == “__main
__”,这么些if语句中的语句将不会在导入时被调用。

澳门葡京备用网址 21澳门葡京备用网址 22

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2017/6/26 0026

import time
import random
from multiprocessing import Process


def talk(name):
    print("%s is say 'Hello'" % name)
    time.sleep(3)
    print("talking end")

if __name__ == '__main__':
    p1=Process(target=talk,args=('Shuke',))         # args是元组的形式,必须加逗号
    p2=Process(target=talk,args=('Tom',))
    p3=Process(target=talk,args=('Eric',))
    p4=Process(target=talk,args=('Lucy',))
    p1.start()
    p2.start()
    p3.start()
    p4.start()

打开进度(格局一)

澳门葡京备用网址 23澳门葡京备用网址 24

import time
import random
from multiprocessing import Process


class Talk(Process):    # 继承Process类

    def __init__(self,name):
        super(Talk, self).__init__()    # 继承父类__init__方法
        self.name=name

    def run(self):          # 必须实现一个run方法,规定
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')
    p2=Talk('Eric')
    p3=Talk('Tome')
    p4=Talk('Lucy')

    p1.start()          # start方法会自动调用run方法运行
    p2.start()
    p3.start()
    p4.start()
    print("主线程")

'''
执行结果:
主线程
Shuke is say 'Hello'
Lucy is say 'Hello'
Tome is say 'Hello'
Eric is say 'Hello'
Tome talking end
Eric talking end
Lucy talking end
Shuke talking end
'''

拉开进度(格局二)

并发落成socket通讯示例

澳门葡京备用网址 25澳门葡京备用网址 26

from socket import *
from multiprocessing import Process

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8081))
server.listen(5)


def talk(conn, client_addr):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':  # windows下start进程一定要写到这下面
    while True:
        conn, addr = server.accept()
        p = Process(target=talk, args=(conn, addr))
        p.start()

server端

澳门葡京备用网址 27澳门葡京备用网址 28

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))

while True:
    msg=input('>>:').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

client端

存在的难题:

每来二个客户端,都在服务端开启3个经过,如若并发来三个万个客户端,要敞开10000个经过吗,你本人尝试着在你协调的机器上打开二万个,10万个经过试一试。

缓解格局:进度池

管道

进度间通信(IPC)格局二:管道(不推荐使用,精通即可)

澳门葡京备用网址 29澳门葡京备用网址 30

#创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
#参数介绍:
dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
    conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
 #其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    

conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

介绍

澳门葡京备用网址 31澳门葡京备用网址 32

from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主进程')

依据管道达成的通讯

专注:生产者和消费者都不曾采纳管道的某部端点,就应有将其倒闭,如在劳动者中关闭管道的右端,在消费者中关闭管道的左端。若是忘记执行这一个步骤,程序大概再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在全部进度中关闭管道后才能生产EOFError很是。由此在劳动者中关闭管道不会有其余作用,付费消费者中也关闭了同等的管道端点。

一直调用

  1. import threading,time

  2.  

  3. def run_num(num):

  4.     “””

  5.     定义线程要运转的函数

  6.     :param num:

  7.     :return:

  8.     “””

  9.     print(“running on number:%s”%num)

  10.     time.sleep(3)

  11.  

  12. if
    __name__ == ‘__main__’:

  13.     # 生成一个线程实例t1

  14.     t1 =
    threading.Thread(target=run_num,args=(1,))

  15.     # 生成几个线程实例t2

  16.     t2 =
    threading.Thread(target=run_num,args=(2,))

  17.     # 运转线程t1

  18.     t1.start()

  19.     # 运转线程t2

  20.     t2.start()

  21.     # 获取线程名

  22.     print(t1.getName())

  23.     print(t2.getName())

  24. 输出:

  25. running on number:1

  26. running on number:2

  27. Thread-1

  28. Thread-2

队列(multiprocess.Queue) 

    import queue  #
无法开展多进度之间的数码传输

(1)from multiprocessing import Queue
借助Queue消除劳动者消费者模型,队列是平安的。
  q = Queue(num)
  num : 队列的最大尺寸
  q.get()#
阻塞等待获取数据,假使有数据直接获取,如果没有数量,阻塞等待
  q.put()#
阻塞,假若得以连续往队列中放数据,就直接放,不可以放就短路等待

  q.get_nowait()#
不阻塞,倘若有数量直接得到,没有多少就报错
  q.put_nowait()#
不阻塞,如若得以屡次三番往队列中放数据,就径直放,无法放就报错

(2)from multiprocessing import
JoinableQueue#可总是的行列

  JoinableQueue是屡次三番Queue,所以可以行使Queue中的方法

  并且JoinableQueue又多了五个方法
  q.join()# 用于生产者。等待
q.task_done的回来结果,通过重回结果,生产者就能获撤销费者当前消费了稍稍个数据
  q.task_done() #
用于消费者,是指每消费队列中2个数额,就给join重回三个标识。

2. Process对象的其余艺术和质量

经过对象的其他方法一:terminate,is_alive

澳门葡京备用网址 33澳门葡京备用网址 34

import time
import random
from multiprocessing import Process


class Talk(Process):    # 继承Process类

    def __init__(self,name):
        super(Talk, self).__init__()    # 继承父类__init__方法
        self.name=name

    def run(self):          # 必须实现一个run方法,规定
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')

    p1.start()          # start方法会自动调用run方法运行
    p1.terminate()      # 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    print(p1.is_alive())# True
    time.sleep(1)       # 模拟CPU调度的延时
    print("====分割线====")
    print(p1.is_alive())# False

'''
执行结果:
True
====分割线====
False
'''

terminate,is_alive

进度对象的此外艺术二:p1.daemon=True,p1.join

澳门葡京备用网址 35澳门葡京备用网址 36

import time
import random
from multiprocessing import Process


class Talk(Process):

    def __init__(self,name):
        super(Talk, self).__init__()
        self.name=name

    def run(self):
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')
    p1.daemon = True    # 一定要在p1.start()前设置,设置p1为守护进程,禁止p1创建子进程,并且父进程结束,p1跟着一起结束
    p1.start()          # start方法会自动调用run方法运行
    p1.join(0.0001)     # 等待p1停止,等0.0001秒就不再等了

p1.daemon=True,p1.join

剖析p1.join

澳门葡京备用网址 37澳门葡京备用网址 38

from multiprocessing import Process

import time
import random
def piao(name):
    print('%s is piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' %name)

p1=Process(target=piao,args=('egon',))
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('yuanhao',))
p4=Process(target=piao,args=('wupeiqi',))

p1.start()
p2.start()
p3.start()
p4.start()

p1.join()
p2.join()
p3.join()
p4.join()

print('主线程')

#疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
#当然不是了
#注意:进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
#而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
#join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过
# 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间


#上述启动进程与join进程可以简写为
p_l=[p1,p2,p3,p4]

for p in p_l:
    p.start()

for p in p_l:
    p.join()

有了join,程序不就是串行了吗???

【澳门葡京备用网址】进度与线程,python全栈开发从入门到放弃之socket并发编程多进程。进度对象的此外属性:name,pid

澳门葡京备用网址 39澳门葡京备用网址 40

import time
import random
from multiprocessing import Process


class Talk(Process):

    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
        #                    #所以加到这里,会覆盖我们的self.name=name

        # 为我们开启的进程设置名字的做法
        super().__init__()
        self.name=name

    def run(self):
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')
    p1.start()          # start方法会自动调用run方法运行
    print("====")
    print(p1.pid)       # 查看pid

'''
执行结果:
====
20484
Shuke is say 'Hello'
Shuke talking end
'''

属性:name,pid

八 共享数据

展望将来,基于音信传递的产出编程是任其自然

尽管是应用线程,推荐做法也是将先后设计为大气独自的线程集合

通过新闻队列互换数据。那样石破天惊地减小了对运用锁定和其余一同手段的须要,

还可以扩大到分布式系统中

进度间通讯应该尽量防止使用本节所讲的共享数据的点子

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的

虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

澳门葡京备用网址 41澳门葡京备用网址 42

from multiprocessing import Manager,Process,Lock
import os
def work(d,lock):
    # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
        #{'count': 94}

View Code

澳门葡京备用网址 43澳门葡京备用网址 44

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁

    信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

from multiprocessing import Process,Semaphore
import time,random

def go_wc(sem,user):
    sem.acquire()
    print('%s 占到一个茅坑' %user)
    time.sleep(random.randint(0,3)) #模拟每个人拉屎速度不一样,0代表有的人蹲下就起来了
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(5)
    p_l=[]
    for i in range(13):
        p=Process(target=go_wc,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

信号量(了解)

继承式调用

  1. import threading,time

  2.  

  3. class
    MyThread(threading.Thread):

  4.     def __init__(self,num):

  5.         threading.Thread.__init__(self)

  1.         self.num = num

  2.     #
    定义逐个线程要运营的函数,函数名必须是run

  3.     def run(self):

  4.         print(“running on number:%s”%self.num)

  1.         time.sleep(3)

  2.  

  3. if
    __name__ == ‘__main__’:

  4.     t1 = MyThread(1)

  5.     t2 = MyThread(2)

  6.     t1.start()

  7.     t2.start()

  8. 输出:

  9. running on number:1

  10. running on number:2

管道(multiprocess.Pipe)

   from multiprocessing import
Pipe

   con1,con2 = Pipe()

 管道是不安全的.

 管道是用以多进度之间通信的一种格局.

 假使在单进度中运用管道,那么就是con1收多少,就是con2发数据.

            如果是con1发数据,就是con2收数据

 倘诺在多进程中选用管道,那么就务须是父进度使用con1收,子进度就不可以不运用con2发

                  父进度使用con1发,子进程就非得拔取con2收

                  父进度使用con2收,子进度就务须利用con1发

                  父进度使用con2发,子进度就亟须使用con1收

在管道中有一个老牌的失实叫做EOFError.是指,父进度若是关闭了发送端,子进度还持续接收数据,就会爆发EOFError错误

 

进程间的共享内存(Value,Manager)

 from multiprocessing import
Manager
 m = Manager()
 num = m.dict({键 : 值})
 num = m.list([1,2,3])

3. 进度同步(锁)

进度之间数据不共享,不过共享同一套文件系统,所以访问同三个文件,或同三个打印终端,是没极度的

#多进程共享一个打印终端(用python2测试看两个进程同时往一个终端打印,出现打印到一行的错误)
from multiprocessing import Process
import time
class Logger(Process):
    def __init__(self):
        super(Logger,self).__init__()
    def run(self):
        print(self.name)


for i in range(1000000):
    l=Logger()
    l.start()

#多进程共享一套文件系统
from multiprocessing import Process
import time,random

def work(f,msg):
    f.write(msg)
    f.flush()


f=open('a.txt','w') #在windows上无法把f当做参数传入,可以传入一个文件名,然后在work内用a+的方式打开文件,进行写入测试
for i in range(5):
    p=Process(target=work,args=(f,str(i)))
    p.start()

注:
既然可以用文件共享数据,那么进度间通讯用文件作为数据传输介质就可以了呀,可以,不过有失常态:

1.效率

2.内需本身加锁处理

需知:加锁的目标是为了有限帮忙三个进程修改同一块数据时,同近来间只能够有贰个改动,即串行的改动,没错,速度是慢了,捐躯了速度而有限支持了数据安全。

经过之间数据隔离,然则共享一套文件系统,因此可以经过文件来完结进程平素的通讯,但难点是必须自个儿加锁处理。所以,就让我们用文件作为数据库,模拟抢票,(Lock互斥锁),见下文抢票示例。

读书了经过行使共享的文本的主意,完结进程一贯的共享,即共享数据的法门,那种方法必须考虑周密同步、锁等题材。而且文件是操作系统提供的肤浅,可以当作进度一直通讯的介质,与mutiprocess模块非亲非故。

但实质上mutiprocessing模块为大家提供了依据音信的IPC通讯机制:队列和管道。

IPC机制中的队列又是依据(管道+锁)已毕的,可以让我们从长短不一的锁难题中摆脱出来,大家应该尽量防止使用共享数据,尽只怕采纳音讯传递和队列,防止处理盘根错节的同台和锁难题,而且在进程数目增添时,往往能够得到更好的可扩张性。

 

进程池

在行使Python举办系统管理的时候,尤其是同时操作多个文件目录,只怕远程控制多台主机,并行操作可以节省巨量的岁月。多进度是兑现产出的手法之1、需求小心的难题是:

  1. 很备受关注必要出现执行的任务平常要远大于核数
  2. 三个操作系统不能极端开启进程,平时有多少个核就开多少个进度
  3. 进度开启过多,效能反而会骤降(开启进度是内需占用系统财富的,而且打开多余核数目标进度也不知所可成功相互)

譬如当被操作对象数目不大时,可以平素运用multiprocessing中的Process动态成生多个进度,1捌个幸好,但假若是多多益善个,上千个。。。手动的去界定过程数量却又太过繁琐,此时可以表达进度池的效果。

我们就足以透过爱戴二个经过池来支配进度数目,比如httpd的进度形式,规定最小进度数和最大进度数… 
ps:对于远程进度调用的尖端应用程序而言,应该接纳进程池,Pool可以提供指定数量的长河,供用户调用,当有新的请求提交到pool中时,假设池还未曾满,那么就会创立3个新的经过用来执行该请求;但一旦池中的进度数一度高达规定最大值,那么该请求就会等待,直到池中有经过甘休,就收录进程池中的进度。

 
  成立进度池的类:借使指定numprocess为3,则经过池会从无到有开创多个进度,然后自始至终使用那多个经过去执行全部职责,不会敞开其余进程

1 Pool([numprocess  [,initializer [, initargs]]]):创建进程池 

    参数介绍:

1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组

 方法介绍:

    主要方法:

澳门葡京备用网址 45澳门葡京备用网址 46

1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
2 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
3    
4 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

要害形式

其他方法:

澳门葡京备用网址 47澳门葡京备用网址 48

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready():如果调用完成,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函

此外艺术

进度池的应用

澳门葡京备用网址 49澳门葡京备用网址 50

from multiprocessing import Pool
import os,time,random
def task(n):
    print('<%s> is running' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2
if __name__ == '__main__':
    print(os.cpu_count())         #查看CPU是几核的
    p=Pool(4)                   #只能四个进程才能运行,如果后面还有连接则只会在这四个先前的进程号使用
    for i in range(1,7):
        res=p.apply(task,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res
        print('本次任务的结果:%s' %res)

'''
输出结果:
4
<3688> is running
本次任务的结果:1
<7212> is running
本次任务的结果:4
<3576> is running
本次任务的结果:9
<1608> is running
本次任务的结果:16
#可以看到前面创建了4个进程,如果有新的进程只是从原先的进程中使用
<3688> is running
本次任务的结果:25
<7212> is running
本次任务的结果:36
'''

同步apply

澳门葡京备用网址 51澳门葡京备用网址 52

import os,time,random
def task(n):
    print('<%s> is running' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2
if __name__ == '__main__':
    print(os.cpu_count())         #查看CPU是几核的
    p=Pool(4)                   #只能四个进程才能运行,如果后面还有连接则只会在这四个先前的进程号使用
    obj_l=[]
    for i in range(1,21):
        obj=p.apply_async(task,args=(i,))    #同步运行,阻塞、直到本次任务执行完毕拿到obj
        obj_l.append(obj)
    p.close() #禁止往进程池内再添加任务
    p.join()
    print('主')
    for obj in obj_l:
        print('本次任务的结果:%s'%obj.get())      #需要注意如果不.get()的话是拿不到返回值的

#异步 先把进程都发过去,之后不管了,然后在给进程计算
'''
4
<2532> is running
<7088> is running
<5304> is running
<1052> is running
<5304> is running
<1052> is running
<2532> is running
<1052> is running
<2532> is running
<7088> is running
<1052> is running
<2532> is running
<5304> is running
<7088> is running
<5304> is running
<7088> is running
<1052> is running
<2532> is running
<2532> is running
<7088> is running
主
本次任务的结果:1
本次任务的结果:4
本次任务的结果:9
本次任务的结果:16
本次任务的结果:25
本次任务的结果:36
本次任务的结果:49
本次任务的结果:64
本次任务的结果:81
本次任务的结果:100
本次任务的结果:121
本次任务的结果:144
本次任务的结果:169
本次任务的结果:196
本次任务的结果:225
本次任务的结果:256
本次任务的结果:289
本次任务的结果:324
本次任务的结果:361
本次任务的结果:400
'''

apply_async

澳门葡京备用网址 53澳门葡京备用网址 54

#一:使用进程池(非阻塞,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res)
    print("==============================>") #没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了

    pool.close() #关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
    for i in res_l:
        print(i.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

#二:使用进程池(阻塞,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个
    print("==============================>")
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

    print(res_l) #看到的就是最终的结果组成的列表
    for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法
        print(i)

apply和apply_rsync的区别

磨炼:tcp套接字连接

澳门葡京备用网址 55澳门葡京备用网址 56

#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
#开启6个客户端,会发现2个客户端处于等待状态
#在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    print('进程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool()
    while True:
        conn,client_addr=server.accept()
        p.apply_async(talk,args=(conn,client_addr))
        # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

server

澳门葡京备用网址 57澳门葡京备用网址 58

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

Client

发觉:并发开启几个客户端,服务端同一时半刻间只有三个不等的pid,干掉1个客户端,其它一个客户端才会进入,被1个进程之一处理

回调函数

亟待回调函数的面貌:进程池中其余二个职务一旦处理完了,就马上告诉主进度:作者好了额,你可以处理小编的结果了。主进度则调用一个函数去处理该结果,该函数即回调函数

作者们得以把耗时间(阻塞)的职务放到进度池中,然后指定回调函数(主进度负责实施),那样主进度在举行回调函数时就节约了I/O的进度,直接得到的是任务的结果。

澳门葡京备用网址 59澳门葡京备用网址 60

#pip3 install requests
from multiprocessing import Pool
import requests
import os
import time
def get_page(url):
    print('<%s> is getting [%s]' %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(2)
    print('<%s> is done [%s]' % (os.getpid(), url))
    return {'url':url,'text':response.text}
def parse_page(res):
    print('<%s> parse [%s]' %(os.getpid(),res['url']))
    with open('db.txt','a') as f:
        parse_res='url:%s size:%s\n' %(res['url'],len(res['text']))
        f.write(parse_res)
if __name__ == '__main__':
    p=Pool(4)
    urls = [
        'https://www.baidu.com',
        'http://www.openstack.org',
        'https://www.python.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]
    for url in urls:
        p.apply_async(get_page,args=(url,),callback=parse_page)
    p.close()
    p.join()
    print('主',os.getpid())

'''
输出结果:
<5664> is getting [https://www.baidu.com]
<8912> is getting [http://www.openstack.org]
<7712> is getting [https://www.python.org]
<2608> is getting [https://help.github.com/]
<5664> is done [https://www.baidu.com]
<5664> is getting [http://www.sina.com.cn/]
<2044> parse [https://www.baidu.com]
<7712> is done [https://www.python.org]
<2044> parse [https://www.python.org]
<2608> is done [https://help.github.com/]
<2044> parse [https://help.github.com/]
<8912> is done [http://www.openstack.org]
<2044> parse [http://www.openstack.org]
<5664> is done [http://www.sina.com.cn/]
<2044> parse [http://www.sina.com.cn/]
主 2044
'''

View Code

澳门葡京备用网址 61澳门葡京备用网址 62

from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))

爬虫

固然在主进度中等待进程池中存有义务都履行达成后,再统一处理结果,则无需回调函数

澳门葡京备用网址 63澳门葡京备用网址 64

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待进程池中所有进程执行完毕

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到所有结果
    print(nums) #主进程拿到所有的处理结果,可以在主进程中进行统一进行处理

View Code

 

Join and Daemon

 进程池

1.4 进程间通讯(IPC)格局一:队列

 进度互相之间相互隔离,要兑现进度间通讯,即IPC,multiprocessing模块支持二种样式:队列和管道,那三种方式都以使用新闻传递的,广泛应用在分布式系统中。

Queue模块有三种队列及构造函数:
  1. Python Queue模块的FIFO队列先进先出。 class Queue.Queue(maxsize)
  2. LIFO类似于堆,即先进后出。 class Queue.LifoQueue(maxsize)
  3. 还有一种是先行级队列级别越低越先出来。 class
Queue.PriorityQueue(maxsize)

Join

Join的听从是阻塞主进程,无法推行join前边的次第。

多线程多join的动静下,依次执行各线程的join方法,前边3个线程执行完结才能履行后边1个线程。

无参数时,则等待该线程截至,才实施后续的顺序。

设置参数后,则等待该线程设定的年月后就推行后边的主进度,而不管该线程是还是不是得了。

  1. import threading,time

  2.  

  3. class
    MyThread(threading.Thread):

  4.     def __init__(self,num):

  5.         threading.Thread.__init__(self)

  1.         self.num = num

  2.     #
    定义每一种线程要运维的函数,函数名必须是run

  3.     def run(self):

  4.         print(“running on number:%s”%self.num)

  1.         time.sleep(3)

  2.         print(“thread:%s”%self.num)

  3.  

  4. if
    __name__ == ‘__main__’:

  5.     t1 = MyThread(1)

  6.     t2 = MyThread(2)

  7.     t1.start()

  8.     t1.join()

  9.     t2.start()

  10.     t2.join()

  11. 输出:

  12. running on number:1

  13. thread:1

  14. running on number:2

  15. thread:2

安装参数效果如下:

  1. if
    __name__ == ‘__main__’:

  2.     t1 = MyThread(1)

  3.     t2 = MyThread(2)

  4.     t1.start()

  5.     t1.join(2)

  6.     t2.start()

  7.     t2.join()

  8. 输出:

  9. running on number:1

  10. running on number:2

  11. thread:1

  12. thread:2

 含义:

   进度池:三个池塘,里边有一定数量的经过。那些进程一直处在待命状态,一旦有职务来,马上就有进程去处理。

 因为在实质上业务中,职责量是有多有少的,假若职责量尤其的多,无法要开对应那么多的长河数

 开启那么多进度首先就需要消耗大批量的小运让操作系统来为你管理它。其次还索要费用大批量时日让
 cpu帮您调度它。

 进度池还会帮程序员去管理池中的进程。

Queue类(成立队列)

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递,底层是以管道和锁的方式实现的。

参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。    

办法介绍:

一言九鼎方法:

q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

其余措施:

q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

应用:

'''
multiprocessing 模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,都是队列接口
'''

from multiprocessing import Process,Queue
import time

q=Queue(5)
q.put([1,2,3])
q.put(('a','b','c'))
q.put(100)
q.put("Hello World")
q.put({'name':'shuke'})
# q.put('队列满了')           # 如果队列元素满了,后续put进入队列的数据将会处于等待状态,直到队列的元素被消费,才可以加入
print(q.qsize())            # 5; 返回队列的大小
print(q.full())             # True

print(q.get())              # [1, 2, 3]
print(q.get())              # ('a', 'b', 'c')
print(q.get())              # 100
print(q.get())              # Hello World
print(q.get())              # {'name': 'shuke'}
# print(q.get())            # 如果队列元素全部被消费完成,会一直卡住,直到队列中被放入新的元素
print(q.empty())            # True

Daemon

私自认同情形下,主线程在剥离时会等待全部子线程的终止。若是期望主线程不等待子线程,而是在退出时自动终止全数的子线程,就要求安装子线程为后台线程(daemon)。方法是透过调用线程类的setDaemon()方法。

  1. import time,threading

  2.  

  3. def run(n):

  4.     print(“%s”.center(20,”*”)%n)

  5.     time.sleep(2)

  6.     print(“done”.center(20,”*”))

  7.  

  8. def main():

  9.     for i in
    range(5):

  10.         t =
    threading.Thread(target=run,args=(i,))

  11.         t.start()

  12.         t.join(1)

  13.         print(“starting thread”,t.getName())

  14.  

  15. m =
    threading.Thread(target=main,args=())

  16. #
    将main线程设置位Daemon线程,它看做程序主线程的护理线程,当主线程退出时,m线程也会退出,由m运行的此外子线程会同时退出,不管是否履行到位

  1. m.setDaemon(True)

  2. m.start()

  3. m.join(3)

  4. print(“main thread done”.center(20,”*”))

  5. 输出:

  6. *********0*********

  1. starting thread Thread-2

  2. *********1*********

  1. ********done********
  1. starting thread Thread-3

  2. *********2*********

  1. **main thread done**

 方法:

劳动者消费者模型

在产出编程中采纳生产者和消费者形式可以缓解半数以上面世问题。该形式通过平衡生产线程和消费线程的劳作能力来增强程序的完整处理数据的进程。

干什么要拔取生产者和消费者格局

在线程世界里,生产者就是生产数量的线程,消费者就是费用数量的线程。在十二线程开发当中,如若劳动者处理速度很快,而顾客处理速度很慢,那么生产者就非得等待买主处理完,才能继承生产数据。同样的道理,如若顾客的拍卖能力超出生产者,那么消费者就必须等待生产者。为了缓解这几个难题于是引入了劳动者和顾客方式。

怎样是劳动者消费者形式

劳动者消费者方式是由此1个器皿来消除劳动者和顾客的强耦合难点。生产者和买主相互之间不间接通讯,而经过阻塞队列来展开杂志公布,所以生产者生产完数据之后不要等待买主处理,直接扔给卡住队列,消费者不找生产者要多少,而是平昔从绿灯队列里取,阻塞队列就相当于贰个缓冲区,平衡了劳动者和买主的处理能力。

基于队列完毕生产者消费者模型

澳门葡京备用网址 65澳门葡京备用网址 66

from multiprocessing import Process,Queue
import time
import random

def producer(seq,q,name):
    for item in seq:
        time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))


def consumer(q,name):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        print("%s 消费者消费了: %s"%(name,res))


if __name__ == '__main__':
    q=Queue()
    seq=("苹果%s"% i for i in range(5))

    p=Process(target=consumer,args=(q,'Tom'))       # 以元组的方式传参
    p.start()
    producer(seq,q,'shuke')
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
Tom 消费者消费了: 苹果0
shuke 生产者生产了: 苹果1
Tom 消费者消费了: 苹果1
shuke 生产者生产了: 苹果2
shuke 生产者生产了: 苹果3
Tom 消费者消费了: 苹果2
shuke 生产者生产了: 苹果4
=====主线程=====
Tom 消费者消费了: 苹果3
Tom 消费者消费了: 苹果4
'''

劳动者消费者模型示例(基于队列)

澳门葡京备用网址 67澳门葡京备用网址 68

# 生产者发送结束标志给消费者
from multiprocessing import Process,Queue
import time
import random

def producer(seq,q,name):
    for item in seq:
        time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))


def consumer(q,name):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        if res is None:break
        print("%s 消费者消费了: %s"%(name,res))


if __name__ == '__main__':
    q=Queue()
    seq=("苹果%s"% i for i in range(5))

    c=Process(target=consumer,args=(q,'Tom'))       # 以元组的方式传参
    c.start()

    producer(seq,q,'shuke')
    q.put(None)
    c.join()    # 主线程等待直到c消费者进程运行结束再继续往下运行
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
Tom 消费者消费了: 苹果0
shuke 生产者生产了: 苹果1
Tom 消费者消费了: 苹果1
shuke 生产者生产了: 苹果2
Tom 消费者消费了: 苹果2
shuke 生产者生产了: 苹果3
Tom 消费者消费了: 苹果3
shuke 生产者生产了: 苹果4
Tom 消费者消费了: 苹果4
=====主线程=====
'''

主线程等到买主截至

线程锁(互斥锁Mutex)

一个进度下可以运转两个线程,七个线程共享父进度的内存空间,也就意味着逐个线程可以访问同一份数据,此时,如果三个线程同时要修改同一份数据就须求线程锁。

  1. import time,threading

  2.  

  3. def addNum():

  4.     # 在各种线程中都取得那些全局变量

  1.     global num

  2.     print(“–get num:”,num)

  3.     time.sleep(1)

  4.     # 对此国有变量进行-1操作

  5.     num -= 1

  6. # 设置五个共享变量

  7. num = 100

  8. thread_list = []

  9. for i
    in range(100):

  10.     t = threading.Thread(target=addNum)

  1.     t.start()

  2.     thread_list.append(t)

  3. # 等待全部线程执行已毕

  4. for t
    in thread_list:

  5.     t.join()

  6.  

  7. print(“final num:”,num)

加锁版本

Lock时阻塞其余线程对共享能源的访问,且同一线程只可以acquire一回,如多于三回就应运而生了死锁,程序不可以继续执行。

  1. import time,threading

  2.  

  3. def addNum():

  4.     # 在各种线程中都取得那个全局变量

  1.     global num

  2.     print(“–get num:”,num)

  3.     time.sleep(1)

  4.     # 修改数据前加锁

  5.     lock.acquire()

  6.     # 对此国有变量举办-1操作

  7.     num -= 1

  8.     # 修改后释放

  9.     lock.release()

  10. # 设置2个共享变量

  11. num = 100

  12. thread_list = []

  13. # 生成全局锁

  14. lock =
    threading.Lock()

  15. for i
    in range(100):

  16.     t = threading.Thread(target=addNum)

  1.     t.start()

  2.     thread_list.append(t)

  3. # 等待全体线程执行已毕

  4. for t
    in thread_list:

  5.     t.join()

  6.  

  7. print(“final num:”,num)

 1).map(func,iterable)

    func:进程池中的进度执行的职分函数
    iterable:
可迭代对象,是把可迭代对象中的各个成分依次传给职务函数当参数

JoinableQueue类 (创制队列的其它二个类)

JoinableQueue([maxsize]):那似乎三个Queue对象,但队列允许项目标顾客通知劳动者队列已经被成功拍卖,文告进程是选拔共享的信号和规格变量来已毕的。

参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。

格局介绍:

JoinableQueue的实例p除了与Queue对象相同的方法之外还装有:

  • q.task_done():
    使用者利用此办法发出信号,表示q.get()的归来项目早就被拍卖。若是调用此格局的次数当先从队列中删除项目标数量,将吸引ValueError至极。
  • q.join():
    生产者调用此格局进行围堵,直到队列中享有的类型均被处理。阻塞将不止到行列中的每一种项目均调用q.task_done()方法停止。

澳门葡京备用网址 69澳门葡京备用网址 70

from multiprocessing import Process,JoinableQueue
import time
import random

def producer(seq,q,name):
    for item in seq:
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))
    q.join()            # 生产者调用此方法进行阻塞


def consumer(q,name):
    while True:
        res=q.get()
        if res is None:break
        print("%s 消费者消费了: %s"%(name,res))
        q.task_done()       # 使用者使用此方法发出信号,表示q.get()的返回元素已经被消费处理。

if __name__ == '__main__':
    q=JoinableQueue()
    seq=("苹果%s"% i for i in range(5))

    c=Process(target=consumer,args=(q,'Tom'))       # 以元组的方式传参
    c.daemon=True     # 在start之前进行设置为守护进程,在主线程停止时c也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    c.start()

    producer(seq,q,'shuke')
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
Tom 消费者消费了: 苹果0
shuke 生产者生产了: 苹果1
Tom 消费者消费了: 苹果1
shuke 生产者生产了: 苹果2
Tom 消费者消费了: 苹果2
shuke 生产者生产了: 苹果3
Tom 消费者消费了: 苹果3
shuke 生产者生产了: 苹果4
Tom 消费者消费了: 苹果4
=====主线程=====
'''

q.join与q.task_done示例

澳门葡京备用网址 71澳门葡京备用网址 72

from multiprocessing import Process,JoinableQueue
import time
import random

def producer(seq,q,name):
    for item in seq:
        time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))
    q.join()


def consumer(q,name):
    while True:
        time.sleep(random.randint(1, 3))
        res=q.get()
        if res is None:break
        print("%s 消费者消费了: %s"%(name,res))
        q.task_done()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=("苹果%s"% i for i in range(5))

    c1=Process(target=consumer,args=(q,'消费者1'))       # 以元组的方式传参
    c2=Process(target=consumer,args=(q,'消费者2'))
    c3=Process(target=consumer,args=(q,'消费者3'))
    c1.daemon=True     # 在start之前进行设置为守护进程,在主线程停止时c也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    c2.daemon=True
    c3.daemon=True
    c1.start()
    c2.start()
    c3.start()

    producer(seq,q,'shuke')
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
消费者3 消费者消费了: 苹果0
shuke 生产者生产了: 苹果1
消费者1 消费者消费了: 苹果1
shuke 生产者生产了: 苹果2
消费者2 消费者消费了: 苹果2
shuke 生产者生产了: 苹果3
消费者1 消费者消费了: 苹果3
shuke 生产者生产了: 苹果4
消费者3 消费者消费了: 苹果4
=====主线程=====
'''

1个劳动者+两个顾客

澳门葡京备用网址 73澳门葡京备用网址 74

from multiprocessing import Process,JoinableQueue
import time
import random

def producer(seq,q,name):
    for item in seq:
        # time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))
    q.join()


def consumer(q,name):
    while True:
        # time.sleep(random.randint(1, 3))
        res=q.get()
        if res is None:break
        print("%s 消费者消费了: %s"%(name,res))
        q.task_done()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=["苹果%s"% i for i in range(5)]

    c1=Process(target=consumer,args=(q,'消费者1'))       # 以元组的方式传参
    c2=Process(target=consumer,args=(q,'消费者2'))
    c3=Process(target=consumer,args=(q,'消费者3'))
    c1.daemon=True     # 在start之前进行设置为守护进程,在主线程停止时c也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    c2.daemon=True
    c3.daemon=True
    c1.start()
    c2.start()
    c3.start()

    # producer(seq,q,'shuke')     # 也可以是下面三行的形式,开启一个新的子进程当生产者,不用主线程当生产者
    p=Process(target=producer,args=(seq,q,'shuke'))     # 注意此处参数seq为列表
    p.start()
    p.join()
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
shuke 生产者生产了: 苹果1
消费者3 消费者消费了: 苹果0
shuke 生产者生产了: 苹果2
消费者2 消费者消费了: 苹果1
消费者3 消费者消费了: 苹果2
shuke 生产者生产了: 苹果3
消费者2 消费者消费了: 苹果3
shuke 生产者生产了: 苹果4
消费者3 消费者消费了: 苹果4
=====主线程=====
'''

拉开三个子过程当作生产者而不是主线程

 

GIL VS Lock

GIL保证同暂且间只好有多少个线程来举办。lock是用户级的lock,与GIL没有关联。

澳门葡京备用网址 75

 2).apply(func,args=())同步实施任务

    func:进度池中的进度执行的天职函数
    args:
可迭代对象型的参数,是传给任务函数的参数

    同步处理义务时,不需要close和join

    同步处理职责时,进度池中的全体进度是一般进度(主进度需求等待其推行完结)

1.5 进程间通讯(IPC)方式二:管道(明白一些)

管道也得以说是队列的其它一种样式,上边我们就从头介绍基于管道完毕进度之间的消息传递

RLock(递归锁)

奥迪Q3lock允许在同一线程中被多次acquire,线程对共享财富的释放内需把装有锁都release。即n次acquire,要求n次release。

  1. def run1():

  2.     print(“grab the first part data”)

  3.     lock.acquire()

  4.     global num

  5.     num += 1

  6.     lock.release()

  7.     return num

  8.  

  9. def run2():

  10.     print(“grab the second part data”)

  11.     lock.acquire()

  12.     global num2

  13.     num2 += 1

  14.     lock.release()

  15.     return num2

  16.  

  17. def run3():

  18.     lock.acquire()

  19.     res = run1()

  20.     print(“between run1 and
    run2″.center(50,”*”))

  1.     res2 = run2()

  2.     lock.release()

  3.     print(res,res2)

  4.  

  5. if
    __name__ == ‘__main__’:

  6.     num,num2 = 0,0

  7.     lock = threading.RLock()

  8.     for i in
    range(10):

  9.         t =
    threading.Thread(target=run3)

  10.         t.start()

  11.  

  12. while
    threading.active_count() != 1:

  13.     print(threading.active_count())

  1. else:
  1.     print(“all threads done”.center(50,”*”))

  2.     print(num,num2)

那二种锁的要害不一致是,福特ExplorerLock允许在同一线程中被数13次acquire。而Lock却不容许那种景色。注意,假如应用中华VLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正自由所占有的锁。

 3).apply_async(func,args=(),callback=None)异步执行职责

    func:进程池中的进度执行的任务函数
    args:
可迭代对象型的参数,是传给职分函数的参数
    callback:
回调函数,就是说每当进度池中有进度处理完任务了,重返的结果可以交到回调函数,由回调函数举行尤其的拍卖,回调函数唯有异步才有,同步是未曾的

    异步处理职务时,进度池中的全体进度是医护进度(主进度代码执行已毕守护进度就终止)

    异步处理义务时,必要求添加close和join

Pipe类(制造管道)

Pipe([duplex]): 在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

参数介绍:

dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

澳门葡京备用网址 ,办法介绍:

关键方法:

  • conn1.recv():
    接收conn2.send(obj)发送的对象。要是没有音信可收取,recv方法会向来不通。假设连接的此外一端已经倒闭,那么recv方法会抛出EOFError。
  • conn1.send(obj): 通过连接发送对象。obj是与系列化包容的人身自由对象。

其他措施:

conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法。
conn1.fileno():返回连接使用的整数文件描述符。
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

根据管道落成进程间通讯(与队列的办法是接近的,队列就是管道加锁已毕的)

from multiprocessing import Process,Pipe
import time

def consumer(p,name):
    left,right = p
    left.close()
    while True:
        try:
            fruit = right.recv()
            print("%s 收到水果: %s" % (name,fruit))
        except EOFError:
            right.close()
            break

def producer(seq,p):
    left,right = p
    right.close()
    for item in seq:
        left.send(item)
    else:
        left.close()

if __name__ == '__main__':
    left,right = Pipe()
    c1=Process(target=consumer,args=((left,right),'Tom'))
    c1.start()

    seq=(i for i in range(5))
    producer(seq,(left,right))
    right.close()
    left.close()

    c1.join()
    print("===主线程===")

'''
执行结果:
Tom 收到水果: 0
Tom 收到水果: 1
Tom 收到水果: 2
Tom 收到水果: 3
Tom 收到水果: 4
===主线程===
'''

 注:
生产者和顾客都不曾行使管道的某部端点,就相应将其倒闭,如在劳动者中关闭管道的右端,在消费者中关闭管道的左端。即便忘记执行那几个步骤,程序或许再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在有着进程中关闭管道后才能生产EOFError格外。因此在劳动者中关闭管道不会有其余功用,除非消费者中也关闭了一致的管道端点。

管道可以用来双向通讯,平时接纳在客户端/服务器中动用的请求/响应模型或远程进程调用,就足以使用管道编写与经过并行的顺序,如下:

澳门葡京备用网址 76澳门葡京备用网址 77

from multiprocessing import Process,Pipe

import time,os
def adder(p,name):
    server,client=p
    client.close()
    while True:
        try:
            x,y=server.recv()
        except EOFError:
            server.close()
            break
        res=x+y
        server.send(res)
    print('server done')
if __name__ == '__main__':
    server,client=Pipe()

    c1=Process(target=adder,args=((server,client),'c1'))
    c1.start()

    server.close()

    client.send((10,20))
    print(client.recv())
    client.close()

    c1.join()
    print('主进程')

示例

注:
send()和recv()方法应用pickle模块对目的开展连串化。

 

Semaphore(信号量)

互斥锁同时只同意三个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如领票处有二个窗口,那最七只允许三个人同时售票,前边的人只可以等前边任意窗口的人相差才能领票。

  1. import threading,time

  2.  

  3. def run(n):

  4.     semaphore.acquire()

  5.     time.sleep(1)

  6.     print(“run the thread:%s”%n)

  7.     semaphore.release()

  8.  

  9. if
    __name__ == ‘__main__’:

  10.     # 最多允许六个线程同时运维

  11.     semaphore =
    threading.BoundedSemaphore(5)

  12.     for i in
    range(20):

  13.         t =
    threading.Thread(target=run,args=(i,))

  14.         t.start()

  15.  

  16. while
    threading.active_count() != 1:

  17.     # print(threading.active_count())

  1.     pass

  2. else:

  1.     print(“all threads done”.center(50,”*”))

回调函数

  进程的天职函数的重回值,被当成回调函数的形参接收到,以此进行进一步的拍卖操作

回调函数是由主进度调用的,而不是子进度,子进度只担负把结果传递给回调函数

 

1.6 进度间通讯格局三:共享数据

展望将来,基于音信传递的产出编程是毫无疑问,即便是拔取线程,推荐做法也是将先后设计为大气独自的线程集合通过消息队列交流数据。这样翻天覆地地缩减了对应用锁定和其他一起手段的必要,还足以扩大到分布式系统中。

注:
进度间通讯应该尽量幸免使用本节所讲的共享数据的点子

进程间数据是独自的,可以依靠队列或管道完结通讯,二者都以基于音信传递的,固然进度间数据独立,但可以透过Manager完结多少共享,事实上Manager的机能远不止于此。

澳门葡京备用网址 78澳门葡京备用网址 79

from multiprocessing import Process,Manager
import os

def foo(name,d,l):
    l.append(os.getpid())
    d[name]=os.getpid()
if __name__ == '__main__':
    with Manager() as manager:
        d=manager.dict({'name':'shuke'})
        l=manager.list(['init',])

        p_l=[]
        for i in range(5):
            p=Process(target=foo,args=('p%s' %i,d,l))
            p.start()
            p_l.append(p)

        for p in p_l:
            p.join() #必须有join不然会报错

        print(d)
        print(l)
'''
执行结果:
{'p0': 62792, 'p4': 63472, 'name': 'shuke', 'p1': 60336, 'p3': 62704, 'p2': 63196}
['init', 60336, 62704, 62792, 63196, 63472]
'''

示例

 

Timer(定时器)

Timer隔一定时间调用2个函数,倘使想已毕每隔一段时间就调用多少个函数,就要在Timer调用的函数中,再度设置Timer。提姆er是Thread的三个派生类。

  1. import threading

  2.  

  3. def hello():

  4.     print(“hello,world!”)

  5. # delay 5秒未来执行hello函数

  6. t = threading.Timer(5,hello)

  7. t.start()

1.7 进度同步(锁),信号量,事件…

仿照抢票(Lock–>互斥锁)

# 文件db的内容为:{"count":1}
# 注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import json
import time
import random
import os

def work(filename,lock): #买票
    # lock.acquire()
    with lock:      # with语法下面的代码块执行完毕会自动释放锁
        with open(filename,encoding='utf-8') as f:
            dic=json.loads(f.read())
            # print('剩余票数: %s' % dic['count'])
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模拟网络延迟
            with open(filename,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print('%s 购票成功' %os.getpid())
        else:
            print('%s 购票失败' %os.getpid())
    # lock.release()

if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(5):
        p=Process(target=work,args=('db',lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()

    print('主线程')

'''
执行结果:
63448 购票成功
13676 购票失败
61668 购票失败
63544 购票失败
17816 购票失败
主线程
'''

澳门葡京备用网址 80澳门葡京备用网址 81

#互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁

#信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

from multiprocessing import Process,Semaphore
import time,random

def go_wc(sem,user):
    sem.acquire()
    print('%s 占到一个茅坑' %user)
    time.sleep(random.randint(0,3)) #模拟每个人拉屎速度不一样,0代表有的人蹲下就起来了
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(5)
    p_l=[]
    for i in range(13):
        p=Process(target=go_wc,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

信号量Semahpore(同线程一样)

澳门葡京备用网址 82澳门葡京备用网址 83

# python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
# 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
clear:将“Flag”设置为False
set:将“Flag”设置为True

#_*_coding:utf-8_*_
#!/usr/bin/env python

from multiprocessing import Process,Event
import time,random

def car(e,n):
    while True:
        if not e.is_set(): #Flase
            print('\033[31m红灯亮\033[0m,car%s等着' %n)
            e.wait()
            print('\033[32m车%s 看见绿灯亮了\033[0m' %n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print('走你,car', n)
            break

def police_car(e,n):
    while True:
        if not e.is_set():
            print('\033[31m红灯亮\033[0m,car%s等着' % n)
            e.wait(1)
            print('灯的是%s,警车走了,car %s' %(e.is_set(),n))
            break

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear() #e.is_set() ---->False
        else:
            e.set()

if __name__ == '__main__':
    e=Event()
    # for i in range(10):
    #     p=Process(target=car,args=(e,i,))
    #     p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()
    t=Process(target=traffic_lights,args=(e,10))
    t.start()

    print('============》')

伊夫nt(同线程一样)

 

Event

Python提供了伊芙nt对象用于线程间通信,它是无线程设置的信号标志,倘使信号标志位为假,则线程等待指引信号被其余线程设置为真。伊夫nt对象完结了简约的线程通讯机制,它提供了安装信号、清除信号、等待等用于落到实处线程间的通讯。

1.8 进程池 星级: *****

1. 安装信号

采纳伊芙nt的set()方法可以安装伊夫nt对象内部的信号标志为真。伊夫nt对象提供了isSet()方法来判定其中间信号标志的转态,当使用event对象的set()方法后,isSet()方法重返真。

哪些时候使用进度池?

开多进度的目标是为着并发,如若有多核,平时有多少个核就开多少个进度,进度开启过多,功用反而会稳中有降(开启进度是索要占用系统财富的,而且打开多余核数目标经过也无能为力形成彼此),但很明显须求出现执行的天职要远大于核数,这时大家就足以由此维护一个历程池来支配进程数目,比如httpd的经过情势,规定最小进度数和最大进度数…
   

当被操作对象数目不大时,可以直接拔取multiprocessing中的Process动态成生七个经过,二十一个好在,但固然是成百上千个,上千个对象,手动的去界定进度数量却又太过繁琐,此时可以表明进度池的功能。

对于远程进程调用的高等应用程序而言,应该拔取进度池,Pool可以提供指定数量的进度,供用户调用,当有新的哀求提交到pool中时,假使池还不曾满,那么就会创立一个新的长河用来实施该请求;但只要池中的进度数已经达到规定最大值,那么该请求就会等待,直到池中有进程为止,就录取进程池中的进度。

注:
在利用Python举办系统管理的时候,越发是还要操作八个文件目录,或然远程控制多台主机,并行操作可以省去大批量的大运。

1. 清除信号

应用伊夫nt的clear()方法可以祛除伊夫nt对象内部的信号标志,即将其设为假,当使用伊芙nt的clear()方法后,isSet()方法再次回到假。

Pool类(创建进度池)

Pool([numprocess  [,initializer [, initargs]]]):创建进程池

参数介绍:

numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
initializer:是每个工作进程启动时要执行的可调用对象,默认为None
initargs:是要传给initializer的参数组

措施介绍:

主要格局:

p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。

p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
p.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

任何格局:

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready():如果调用完成,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]):等待结果变为可用。

 应用

 
 提交义务,并在主进程中得到结果(在此之前的Process是实施义务,结果放到队列里,以后得以在主进度中一直得到结果)

from multiprocessing import Pool
import time
def work(n):
    print('开工啦...')
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    q=Pool()

    #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    res=q.apply_async(work,args=(2,))
    q.close()
    q.join() #join在close之后调用
    print(res.get())

    #同步apply用法:主进程一直等apply提交的任务结束后才继续执行后续代码
    # res=q.apply(work,args=(2,))
    # print(res)

1. 等待

伊芙nt的wait()方法只有在里面信号为确实时候才会飞快的实践并做到再次来到。当伊夫nt对象的其中信号标志为假时,则wait()方法平昔守候其为真时才回来。

经过伊芙nt来兑现八个或多少个线程间的并行,上边以红绿灯为例,即起步3个线程做交通指挥灯,生成多少个线程做车辆,车辆行驶按红停绿行的规则。

  1. import threading,time,random

  2.  

  3. def light():

  4.     if
    not event.isSet():

  5.         event.set()

  6.     count = 0

  7.     while True:

  8.         if count < 5:

  9.             print(“\033[42;1m–green light
    on–\033[0m”.center(50,”*”))

  10.         elif count < 8:

  11.             print(“\033[43;1m–yellow light
    on–\033[0m”.center(50,”*”))

  12.         elif count < 13:

  13.             if event.isSet():

  14.                 event.clear()

  15.             print(“\033[41;1m–red light
    on–\033[0m”.center(50,”*”))

  16.         else:

  17.             count = 0

  18.             event.set()

  19.         time.sleep(1)

  20.         count += 1

  21.  

  22.  

  23. def car(n):

  24.     while 1:

  25.         time.sleep(random.randrange(10))

  1.         if event.isSet():

  2.             print(“car %s is running…”%n)

  3.         else:

  4.             print(“car %s is waiting for the red
    light…”%n)

  5.  

  6. if
    __name__ == “__main__”:

  1.     event = threading.Event()

  2.     Light =
    threading.Thread(target=light,)

  3.     Light.start()

  4.  

  5.     for i in
    range(3):

  6.         t =
    threading.Thread(target=car,args=(i,))

  7.         t.start()

拔取进程池维护稳定数目标历程

澳门葡京备用网址 84澳门葡京备用网址 85

'''
Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
开启6个客户端,会发现2个客户端处于等待状态
在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
'''

from socket import *
from multiprocessing import Pool
import os

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8081))
server.listen(5)


def talk(conn, client_addr):
    print("进程PID: %s"%(os.getpid()))
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':  # windows下start进程一定要写到这下面
    p = Pool()      # 默认使用CPU的核数
    while True:
        conn,client_addr=server.accept()
        p.apply_async(talk,args=(conn,client_addr))
        # p.apply(talk,args=(conn,client_addr))   # #同步的话,则同一时间只有一个客户端能访问

server端

澳门葡京备用网址 86澳门葡京备用网址 87

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))

while True:
    msg=input('>>:').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

client端

澳门葡京备用网址 88澳门葡京备用网址 89

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))

while True:
    msg=input('>>:').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

client1端

 

queue队列

Python中队列是线程间最常用的互换数据的情势。Queue模块是提供队列操作的模块。

回调函数(callback)  星级: *****

成立1个队列对象

  1. import queue

  2.  

  3. q = queue.Queue(maxsize = 10)

queue.Queue类是一个队列的联合已毕。队列长度可以无限或许个别。可以通过Queue的构造函数的可选参数maxsize来设定队列长度。如若maxsize小于1表示队列长度无限。

1. 不须求回调函数的现象

只要在主过程中等待历程池中具有任务都履行完成后,再统一处理结果,则无需回调函数。

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待进程池中所有进程执行完毕

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到所有结果
    print(nums) #主进程拿到所有的处理结果,可以在主进程中进行统一进行处理

将一个值放入队列中

  1. q.put(“a”)

调用队列对象的put()方法在队尾插入2个序列。put()有多个参数,第2个item为须求的,为插入项目标值;第1个block为可选参数,暗中认同为1。固然队列当前为空且block为1,put()方法就使调用线程暂停,直到空出贰个多少单元。即便block为0,put()方法将引发Full万分。

2.  回调函数的选取场景

进程池中别的3个职务一旦处理完了,就立时告诉主进度:小编好了额,你可以处理作者的结果了。主进度则调用3个函数去处理该结果,该函数即回调函数。

笔者们可以把耗时间(阻塞)的天职放到进度池中,然后指定回调函数(主进程负责履行),那样主进程在实施回调函数时就省去了I/O的经过,直接得到的是任务的结果。

from multiprocessing import Pool
import time,random,os

def get_page(url):
    print('(进程 %s) 正在下载页面 %s' %(os.getpid(),url))
    time.sleep(random.randint(1,3))
    return url #用url充当下载后的结果

def parse_page(page_content):
    print('<进程 %s> 正在解析页面: %s' %(os.getpid(),page_content))
    time.sleep(1)
    return '{%s 回调函数处理结果:%s}' %(os.getpid(),page_content)


if __name__ == '__main__':
    urls=[
        'http://maoyan.com/board/1',
        'http://maoyan.com/board/2',
        'http://maoyan.com/board/3',
        'http://maoyan.com/board/4',
        'http://maoyan.com/board/5',
        'http://maoyan.com/board/7',

    ]
    # 要创建进程池中的进程数,如果省略,将默认使用cpu_count()的值
    p=Pool()            
    res_l=[]

    #异步的方式提交任务,然后把任务的结果交给callback处理
    #注意:会专门开启一个进程来处理callback指定的任务(单独的一个进程,而且只有一个)
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=parse_page)
        res_l.append(res)

    #异步提交完任务后,主进程先关闭p(必须先关闭),然后再用p.join()等待所有任务结束(包括callback)
    p.close()
    p.join()
    print('{主进程 %s}' %os.getpid())

    #收集结果,发现收集的是get_page的结果
    #所以需要注意了:
    #1. 当我们想要在将get_page的结果传给parse_page处理,那么就不需要i.get(),通过指定callback,就可以将i.get()的结果传给callback执行的任务
    #2. 当我们想要在主进程中处理get_page的结果,那就需要使用i.get()获取后,再进一步处理
    for i in res_l: #本例中,下面这两步是多余的
        callback_res=i.get()
        print(callback_res)

'''
打印结果:
(进程 52346) 正在下载页面 http://maoyan.com/board/1
(进程 52347) 正在下载页面 http://maoyan.com/board/2
(进程 52348) 正在下载页面 http://maoyan.com/board/3
(进程 52349) 正在下载页面 http://maoyan.com/board/4
(进程 52348) 正在下载页面 http://maoyan.com/board/5
<进程 52345> 正在解析页面: http://maoyan.com/board/3
(进程 52346) 正在下载页面 http://maoyan.com/board/7
<进程 52345> 正在解析页面: http://maoyan.com/board/1
<进程 52345> 正在解析页面: http://maoyan.com/board/2
<进程 52345> 正在解析页面: http://maoyan.com/board/4
<进程 52345> 正在解析页面: http://maoyan.com/board/5
<进程 52345> 正在解析页面: http://maoyan.com/board/7
{主进程 52345}
http://maoyan.com/board/1
http://maoyan.com/board/2
http://maoyan.com/board/3
http://maoyan.com/board/4
http://maoyan.com/board/5
http://maoyan.com/board/7
'''

澳门葡京备用网址 90澳门葡京备用网址 91

from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))
'''
执行结果:
{'actor': '阿米尔·汗,萨卡诗·泰瓦,法缇玛·萨那·纱卡', 'index': '1', 'score': '9.8', 'title': '摔跤吧!爸爸', 'time': '2017-05-05'}
{'actor': '李微漪,亦风', 'index': '2', 'score': '9.3', 'title': '重返·狼群', 'time': '2017-06-16'}
{'actor': '高强,于月仙,李玉峰', 'index': '3', 'score': '9.2', 'title': '忠爱无言', 'time': '2017-06-09'}
{'actor': '杨培,尼玛扎堆,斯朗卓嘎', 'index': '4', 'score': '9.1', 'title': '冈仁波齐', 'time': '2017-06-20'}
{'actor': '约翰尼·德普,哈维尔·巴登,布兰顿·思怀兹', 'index': '5', 'score': '8.9', 'title': '加勒比海盗5:死无对证', 'time': '2017-05-26'}
{'actor': '戴夫·帕特尔,鲁妮·玛拉,大卫·文翰', 'index': '6', 'score': '8.8', 'title': '雄狮', 'time': '2017-06-22'}
{'actor': '蔡卓妍,周柏豪,钟欣潼', 'index': '7', 'score': '8.6', 'title': '原谅他77次', 'time': '2017-06-23'}
{'actor': '水田山葵,山新,大原惠美', 'index': '8', 'score': '8.6', 'title': '哆啦A梦:大雄的南极冰冰凉大冒险', 'time': '2017-05-30'}
{'actor': '盖尔·加朵,克里斯·派恩,罗宾·怀特', 'index': '9', 'score': '8.6', 'title': '神奇女侠', 'time': '2017-06-02'}
{'actor': '范楚绒,洪海天,谢元真', 'index': '10', 'score': '8.5', 'title': '潜艇总动员之时光宝盒', 'time': '2015-05-29'}
'''

爬虫应用

将1个值从队列中取出

  1. q.get()

调用队列对象的get()方法从队头删除并回到三个类型。可选参数为block,暗中同意为True。如若队列为空且block为True,get()就使调用线程暂停,直到有品种可用。如若队列为空且block为False,队列将引发Empty十分。

**apply_async(非阻塞**)和apply(**阻塞**)的分裂示例:**

**利用进度池(非阻塞,apply_async**

澳门葡京备用网址 92澳门葡京备用网址 93

from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    # time.sleep(1)
    return 'Bye Bye!'

if __name__ == "__main__":
    processes=4                 # 进程池的进程总数
    pool = Pool(processes)      # 实例化
    res_l=[]
    for i in range(5):
        msg = "hello 同学%s" % str(i)
        res=pool.apply_async(func, args=(msg,))   # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res)

    print("============= 我是分割线 =================")
    pool.close()        # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    pool.join()         # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool进程池,join函数等待所有子进程结束
    print("Sub-process(es) done.")
    for i in res_l:
        print(res.get())

'''
执行结果:
============= 我是分割线 =================
msg: hello 同学0
msg: hello 同学1
msg: hello 同学2
msg: hello 同学3
msg: hello 同学4
Sub-process(es) done.
Bye Bye!
Bye Bye!
Bye Bye!
Bye Bye!
Bye Bye!
'''

apply_async

运用进度池(阻塞,apply

澳门葡京备用网址 94澳门葡京备用网址 95

from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    # time.sleep(1)
    return 'Bye Bye!'

if __name__ == "__main__":
    processes=4                 # 进程池的进程总数
    pool = Pool(processes)      # 实例化
    res_l=[]
    for i in range(5):
        msg = "hello 同学%s" % str(i)
        res=pool.apply(func, args=(msg,))   # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res)                   # 同步执行,即执行完一个拿到结果,再去执行另外一个

    print("============= 我是分割线 =================")
    pool.close()        # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    pool.join()         # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool进程池,join函数等待所有子进程结束
    print("Sub-process(es) done.")
    print(res_l)
    for i in res_l:     # apply是同步的,所以直接得到结果,没有get()方法
        print(res)

'''
执行结果:
msg: hello 同学0
msg: hello 同学1
msg: hello 同学2
msg: hello 同学3
msg: hello 同学4
============= 我是分割线 =================
Sub-process(es) done.
['Bye Bye!', 'Bye Bye!', 'Bye Bye!', 'Bye Bye!', 'Bye Bye!']
Bye Bye!
Bye Bye!
Bye Bye!
Bye Bye!
Bye Bye!
'''

apply

澳门葡京备用网址 96澳门葡京备用网址 97

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print("\nRun task Lee-%s" %(os.getpid())) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print('Task Lee, runs %0.2f seconds.' %(end - start))

def Marlon():
    print("\nRun task Marlon-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print('Task Marlon runs %0.2f seconds.' %(end - start))

def Allen():
    print("\nRun task Allen-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' %(end - start))

def Frank():
    print("\nRun task Frank-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' %(end - start))

def Egon():
    print("\nRun task Egon-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Egon runs %0.2f seconds.' %(end - start))

def Lily():
    print("\nRun task Lily-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Lily runs %0.2f seconds.' %(end - start))

if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank, Egon, Lily]
    print("parent process %s" %(os.getpid()))

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print('All subprocesses done.')

多个进程池

多个进程池

 

Python Queue模块有两种队列及构造函数

  1. # 先进先出

  2. class
    queue.Queue(maxsize=0)

  3. # 先进后出

  4. class
    queue.LifoQueue(maxsize=0)

  5. # 优先级队列级别越低越先出

  6. class
    queue.PriorityQueue(maxsize=0)

二. python并发编程之十二线程

 

常用方法

  1. q = queue.Queue()

  2. # 重回队列的大小

  3. q.qsize()

  4. # 若是队列为空,再次来到True,反之False

  1. q.empty()

  2. # 尽管队列满了,重临True,反之False

  1. q.full()

  2. # 获取队列,timeout等待时间

  3. q.get([block[,timeout]])

  4. # 相当于q.get(False)

  5. q.get_nowait()

  6. # 等到队列为空再执行其余操作

  7. q.join()

2.1 threading模块

multiprocess模块的接口完全模拟了threading模块的接口,二者在接纳范围,有很大的相似性,因此不再详细介绍。

劳动者消费者模型

在开发编程中行使生产者和顾客格局可以化解大部分冒出难题。该方式通过平衡生产线程和消费线程的行事能力来增进程序的全部处理数量的速度。

2.1.1打开线程的三种方法(同Process)

# 方法一
from threading import Thread
import time


def sayhi(name):
    time.sleep(2)
    print('%s say hello!'% name)

if __name__ == '__main__':
    t = Thread(target=sayhi,args=('shuke',))
    t.start()
    print("=====我是分割线=====")
    print("主线程")

# 方法二
from threading import Thread
import time


class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print("%s say hello!"%self.name)

if __name__ == '__main__':
    t=Sayhi('shuke')
    t.start()
    print("=====我是分割线=====")
    print("主线程")

为啥要采用生产者和顾客形式

在线程世界里,生产者就是生产数据的线程,消费者就是开销数量的线程。在四线程开发当中,如若劳动者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待买主处理完,才能一连生产数量。同样的道理,即便消费者的拍卖能力超乎生产者,那么消费者就不大概不等待生产者。为了化解这几个标题于是引入了劳动者和消费者形式。

2.1.2 子线程与子进度的不相同 

什么是生产者消费者情势

劳动者消费者方式是通过1个容器来缓解劳动者和消费者的强耦合问题。生产者和消费相互之间不直接通信,而因而阻塞队列来拓展报导,所以生产者生产完数据今后不再等待顾客处理,间接扔给卡住队列,消费者不找生产者要多少,而是一直从绿灯队列里取,阻塞队列就约等于1个缓冲区,平衡了劳动者和买主的处理能力。

最基本的生产者消费者模型的事例。

  1. import queue,threading,time

  2.  

  3. q = queue.Queue(maxsize=10)

  4.  

  5. def Producer():

  6.     count = 1

  7.     while True:

  8.         q.put(“骨头%s”%count)

  9.         print(“生产了骨头”,count)

  10.         count += 1

  11.  

  12. def Consumer(name):

  13.     while q.qsize() > 0:

  14.         print(“[%s]
    取到[%s]并且吃了它…”%(name,q.get()))

  15.         time.sleep(1)

  16.  

  17. p = threading.Thread(target=Producer,)

  1. c1 =
    threading.Thread(target=Consumer,args=(“旺财”,))

  2. c2 =
    threading.Thread(target=Consumer,args=(“来福”,))

  3. p.start()

  4. c1.start()

  5. c2.start()

线程与经过的履行进程比较

1. 依照输出结果比较

澳门葡京备用网址 98澳门葡京备用网址 99

# 线程方式
from threading import Thread

def work():
    print("Hello python!")

if __name__ == '__main__':
    t = Thread(target=work)
    t.start()
    print('主线程/主进程')
'''
执行结果:
Hello python!
主线程/主进程
'''

线程

澳门葡京备用网址 100澳门葡京备用网址 101

# 进程方式
from multiprocessing import Process

def work():
    print("Hello python!")

if __name__ == '__main__':
    t = Process(target=work)
    t.start()
    print('主线程/主进程')
'''
执行结果:
主线程/主进程
Hello python!
'''

进程

注:
相比执行结果,能够观望线程的举办进程>进程的推行进程

2. 依照pid来展开比较

澳门葡京备用网址 102澳门葡京备用网址 103

# 线程方式
# 在主进程下开启多个线程,每个线程都跟主进程的pid一样
from threading import Thread
import os

def work():
    print("Pid: %s" % os.getpid())

if __name__ == '__main__':
    t1 = Thread(target=work)
    t2 = Thread(target=work)
    t3 = Thread(target=work)
    t1.start()
    t2.start()
    t3.start()
    print("主线程/主进程pid: %s" % os.getpid())
'''
执行结果:
Pid: 65652
Pid: 65652
Pid: 65652
主线程/主进程pid: 65652
'''

线程

澳门葡京备用网址 104澳门葡京备用网址 105

# 进程方式
# 开多个进程,每个进程都有不同的pid
from multiprocessing import Process
import os

def work():
    print("Pid: %s" % os.getpid())

if __name__ == '__main__':
    t1 = Process(target=work)
    t2 = Process(target=work)
    t3 = Process(target=work)
    t1.start()
    t2.start()
    t3.start()
    print('主线程/主进程pid: %s' % os.getpid())
'''
主线程/主进程pid: 20484
Pid: 5800
Pid: 67076
Pid: 62244
'''

进程

多线程multiprocessing

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以使用multiprocessing.Process对象来创立二个进程。该进度可以运作在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start()、run()、join()的措施。其余multiprocessing包中也有Lock、伊夫nt、Semaphore、Condition类(那个目标足以像二十多线程那样,通过参数传递各样进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一些与threading使用同一套API,只不过换成了多进度的境况。

注意:

在UNIX平台上,当某些进度甘休之后,该进度须要被父进度调用wait,否则进度成为僵尸进度(Zombie)。所以,有必不可少对各个Process对象调用join()方法(实际上等同wait)。对于四线程来说,由于唯有壹个进度,所以不存在此要求性。

multiprocessing提供了threading包中尚无的IPC(比如Pipe和Queue),功效上更高。应事先考虑Pipe和Queue,幸免拔取Lock、伊夫nt、Semaphore、Condition等共同方式(因为它们占有的不是用户进度的能源)。

多进度应该防止共享能源。在八线程中,大家可以相比便于地共享能源,比如动用全局变量或传递参数。在多进度意况下,由于种种进度有和好单独的内存空间,以上措施并不适宜。此时大家可以透过共享内存和Manager的法子来共享能源。但诸如此类做增加了先后的复杂度,并因为共同的急需而低沉了程序的功能。

Process.PID中保留有PID,假设经过还未曾start(),则PID为None。

下边种种线程和进度都做相同件事——打印PID。但是,全数的天职在打印的时候都会向同二个专业输出(stdout)输出,那样输出的字符会混合在一齐不能阅读。使用Lock同步,在三个职责输出落成之后,在同意另一个任务输出,可防止止多个任务同时向终点输出。

  1. import os,threading,multiprocessing
  1.  

  2. def info(sign,lock):

  3.     lock.acquire()

  4.     print(sign,os.getpid())

  5.     lock.release()

  6.  

  7. print(“Main:”,os.getpid())

  8.  

  9. if
    __name__ == “__main__”:

  1.     record = []

  2.     lock = threading.Lock()

  3.     for i in
    range(5):

  4.         thread =
    threading.Thread(target=info,args=(“thread”,lock))

  5.         thread.start()

  6.         record.append(thread)

  7.  

  8.     for i in
    record:

  9.         i.join()

  10.     print(“—————“)

  11.     record2 = []

  12.     lock2 = multiprocessing.Lock()

  13.     for i in
    range(5):

  14.         process =
    multiprocessing.Process(target=info,args=(“process”,lock2))

  15.         process.start()

  16.         record.append(process)

  17.  

  18.     for i in
    record2:

  19.         i.join()

具有的Thread的PID都与主程序相同,而各样Process都有3个分歧的PID。

2.1.3 小小的磨炼

勤学苦练一: 八线程并发的socket服务端

澳门葡京备用网址 106澳门葡京备用网址 107

#_*_coding:utf-8_*_
#!/usr/bin/env python
import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()


        p=threading.Thread(target=action,args=(conn,))
        p.start()

多线程并发的socket服务端

服务端

澳门葡京备用网址 108澳门葡京备用网址 109

#_*_coding:utf-8_*_
#!/usr/bin/env python


import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)

客户端

客户端

勤学苦练二:
七个职务,3个收取用户输入,一个将用户输入的情节格式化成大写,三个将格式化后的结果存入文件

澳门葡京备用网址 110澳门葡京备用网址 111

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()

示例

进程间通信

差距进程间内存是不共享的,要想已毕五个进度间的数据互换,可以应用Queue和
Pipe。

2.1.4 线程的join与setdaemon

与经过的不二法门类似,其实是multiprocessing模仿threading的接口

from threading import Thread
import time

def work(name):
    time.sleep(2)
    print("%s say hello" % name)

if __name__ == '__main__':
    t = Thread(target=work,args=('shuke',))
    t.setDaemon(True)
    t.start()
    t.join()
    print("主线程")
    print(t.is_alive())
'''
执行结果:
shuke say hello
主线程
False
'''

Pipe

Pipe能够是单向(half-duplex),也足以是双向(duplex)。通过mutiprocessing.Pipe(duplex=False)成立单向管道(专擅认同为双向)。三个进度从pipe一端输入对象,然后被pipe另一端的进度接收,单向管道只同意管道一端的进度输入,而双向管道则允许从两端输入。

  1. from multiprocessing import
    Process,Pipe

  2.  

  3. def f(conn):

  4.     conn.send([42,None,”Hello”])

  5.     conn.close()

  6.  

  7. if
    __name__ == “__main__”:

  1.     parent_conn,child_conn = Pipe()
  1.     p =
    Process(target=f,args=(child_conn,))

  2.     p.start()

  3.     print(parent_conn.recv())

  4.     p.join()

Pipe对象建立的时候,再次回到3个蕴涵八个因素的表,每种成分代表Pipe的一端(Connection对象)。对Pipe的某一端调用send()方法来传送对象,在另一端采纳recv()来接受。

2.1.5 线程的其余措施补充

Thread实例对象的法子

  • isAlive(): 重返线程是还是不是活动的。
  • getName(): 重返线程名。
  • setName(): 设置线程名。

threading模块提供的局地措施

  • threading.currentThread(): 重返当前的线程变量。
  • threading.enumerate():
    重返2个富含正在运维的线程的list。正在运营指线程运转后、截至前,不包涵运维前和平息后的线程。
  • threading.activeCount():
    重临正在运作的线程数量,与len(threading.enumerate())有平等的结果。

    from threading import Thread
    import threading
    import time

    def work():

    time.sleep(2)
    print(threading.current_thread().getName())
    

    # 在主进程下开启线程
    if name == ‘main‘:

    t = Thread(target=work)
    t.start()
    print(threading.current_thread().getName())
    print(threading.current_thread())     # 主线程
    print(threading.enumerate())            # 连同主线程在内有两个运行的线程
    print(threading.active_count())
    print("主线程/主进程")
    

    ”’
    执行结果:
    MainThread
    <_MainThread(MainThread, started 67280)>
    [<_MainThread(MainThread, started 67280)>, ]
    2
    主线程/主进程
    Thread-1
    ”’

Queue

运用情势跟threading里的queue大致。

Queue与Pipe相近似,都以先进先出的结构。但Queue允许多进度放入,两个经过从队列取出对象。Queue使用multiprocessing.Queue(maxsize)创立,maxsize表示队列中得以存放对象的最大数据。

部分历程使用put()在Queue中放入字符串,那个字符串中包括PID和岁月。另2个进度从Queue中取出,并打印自身的PID以及get()的字符串。

  1. from multiprocessing import
    Process,Queue

  2.  

  3. def f(q):

  4.     q.put([44,None,”Hello”])

  5.  

  6. if
    __name__ == “__main__”:

  1.     q = Queue()

  2.     p = Process(target=f,args=(q,))

  1.     p.start()

  2.     print(q.get())

  3.     p.join()

 2.1.6 线程池

参照小说:  

 

共享变量Managers

Python中经过间共享数据,处理主题的Queue和Pipe外,还提供了更高层次的卷入。使用multiprocessing.Managers可以简单地利用这个高级接口。

Manager()重临的manager对象说了算了2个server进度,此进度包涵的python对象可以被其余的进度经过proxies来拜会,从而完成多进度数据通讯且安全。

Manager接济的项目有list、dict、Namespace、Lock、ENCORELock、Semaphore、BoundedSemaphore、Condition、伊芙nt、Queue、Value和Array。

  1. from multiprocessing import
    Manager,Process

  2. import os

  3.  

  4. def f(d,l):

  5.     d[os.getpid()] = os.getpid()

  1.     l.append(os.getpid())

  2.     print(l)

  3.  

  4. if
    __name__ == “__main__”:

  1.     with Manager() as manager:

  2.         d = manager.dict()

  3.         l = manager.list(range(5))

  1.         p_list = []

  2.         for i in
    range(10):

  3.             p =
    Process(target=f,args=(d,l,))

  4.             p.start()

  5.             p_list.append(p)

  6.         for res in p_list:

  7.             res.join()

  8.  

  9.         print(d)

  10.         print(l)

2.2  Python GIL(Global Interpreter Lock)

'''
定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
'''
结论: 在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

 

 那篇文章透彻的辨析了GIL对python三二十四线程的熏陶,强烈推荐看浏览:http://www.dabeaz.com/python/UnderstandingGIL.pdf 

 澳门葡京备用网址 112

此间只需清楚: 有了GIL的留存,同目前刻统一进度中只有七个线程被实践。

需求:

我们有两个职责急需处理,处理方式肯定是要玩出并发的效应,消除方案得以是:

方案一:开启多少个经过

方案二:三个历程下,开启多少个线程 

单核情状下,分析结果: 

  假若五个职责是计量密集型,没有多核来并行计算,方案一徒增了创立进度的支付,方案二胜

  假如八个任务是I/O密集型,方案一创设进度的开支大,且经过的切换速度远不如线程,方案二胜

多核情状下,分析结果:

  如果五个职责是精打细算密集型,多核意味着并行总括,在python中1个历程中平等时刻唯有3个线程执行用不上多核,方案一胜

  如若多个职责是I/O密集型,再多的核也化解不了I/O问题,方案二胜

多进度适用于计算密集型任务,可以打开多进度来丰硕利用多核优势,同时出现处理任务。

四线程适用于IO密集型职务,线程之间的开发小,切换速度快,处理速度进步,此时,多核的优势不可以被采纳。

结论:
今后的处理器基本上都以多核,python对于总结密集型的天职开三十二线程的频率并不恐怕拉动多大品质上的升官,甚至不如串行(没有大气切换),不过,对于IO密集型的天职作用依旧有拨云见日升级的。

澳门葡京备用网址 113澳门葡京备用网址 114

#计算密集型
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
    res=0
    for i in range(1000000):
        res+=i

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    # for i in range(300): #串行
    #     work()

    for i in range(300):
        t=Thread(target=work) #在我的机器上,4核cpu,多线程大概15秒
        # t=Process(target=work) #在我的机器上,4核cpu,多进程大概10秒
        t_l.append(t)
        t.start()

    for i in t_l:
        i.join()

    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

    print('主线程')

测算密集型

澳门葡京备用网址 115澳门葡京备用网址 116

#I/O密集型
from threading import Thread
from multiprocessing import Process
import time
import os
def work():
    time.sleep(2) #模拟I/O操作,可以打开一个文件来测试I/O,与sleep是一个效果
    print(os.getpid())

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(1000):
        t=Thread(target=work) #耗时大概为2秒
        # t=Process(target=work) #耗时大概为25秒,创建进程的开销远高于线程,而且对于I/O密集型,多cpu根本不管用
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

I/O密集型 

应用:

八线程用于IO密集型,如socket,爬虫,web
多进度用于计算密集型,如金融分析

 

进程同步

当多少个进程须要访问共享能源的时候,Lock可以用来防止访问的争论。

  1. from multiprocessing import
    Process,Lock

  2.  

  3. def f(l,i):

  4.     l.acquire()

  5.     try:

  6.         print(“hello word”,i)

  7.     finally:

  8.         l.release()

  9.  

  10. if
    __name__ == “__main__”:

  1.     lock = Lock()

  2.     for i in
    range(10):

  3.         Process(target=f,args=(lock,i)).start()

 2.3 同步锁

利用办法与经过锁一样

import time
import threading

num = 100  #设定一个共享变量
# R=threading.Lock()

def addNum():
    global num #在每个线程中都获取这个全局变量
    #num-=1
    # R.acquire()
    temp=num
    time.sleep(0.1)
    num =temp-1  # 对此公共变量进行-1操作
    # R.release()

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('Result: ', num)

 锁日常被用来落实对共享能源的一起访问。为每3个共享财富创制三个Lock对象,当你须要拜访该能源时,调用acquire方法来得到锁对象(倘使其余线程已经取得了该锁,则当前线程需等待其被假释),待能源访问完后,再调用release方法释放锁,如下所示:

import threading

R=threading.Lock()

R.acquire()
'''
对公共数据的操作
'''
R.release()

进程池

进程池内部维护二个进度体系,当使用时,则去进度池中赢得二个历程,假设经过池种类中尚无可供使用的进度,那么程序就会等待,直到进程池中有可用的经过截至。

经过池中有五个章程:apply(阻塞)和apply_async(非阻塞)。

apply(func[,args[,kwds]])

apply用于传递不定参数,主进度会阻塞与函数。主进度的实践流程同单进度一致。

apply_async(func[,args[,kwds[,callback]]])

与apply用法一致,但它是非阻塞的且协助结果重回后展开回调。主进度循环运行进度中不等待apply_async的归来结果,在主进度甘休后,即便子进度还未回来整个程序也会脱离。固然apply_async是非阻塞的,但其回到结果的get方法却是阻塞的,如运用result.get()会堵塞主进度。倘诺对回到结果不感兴趣,那么可以在主进度中动用pool.close与pool.join来防护主进度退出。注意join方法肯定要close或terminate之后调用。

  1. from multiprocessing import
    Process,Pool,freeze_support

  2. import time,os

  3.  

  4. def Foo(i):

  5.     time.sleep(2)

  6.     print(“in process”,os.getpid())

  7.     return i + 100

  8.  

  9. def Bar(arg):

  10.     print(“–>exec done:”,arg,os.getpid())

  1.  

  2. if
    __name__ == “__main__”:

  1.     # freeze_support()

  2.     # 允许进度池同时放入二个经过

  1.     pool = Pool(processes=3)

  2.     print(“主进程”,os.getpid())

  3.     for i in
    range(10):

  4.         #
    callback回调函数,各种进程截至的时候调用

  5.         pool.apply_async(func=Foo,args=(i,),callback=Bar)

  1.         # 串行

  2.         #
    pool.apply(func=Foo,args=(i,))

  3.     print(‘end’)

  4.     pool.close()

  5.     #
    进度池中经过执行达成后再关闭,倘诺注释程序直接关门

  6.     pool.join()

close()

闭馆pool,使其不再接受新的职务。

terminate()

截至工作历程,不再处理未处理的义务。

join()

主进程阻塞等待子进度的退出,join方法要在close或terminate之后拔取。

GIL VS Lock

Python已经有一个GIL来保险同临时间只好有多少个线程来实施了,为何那里还索要lock? 

高达共识:锁的目标是为着保证共享的数量,同暂且间只好有一个线程来修改共享的数目

得出结论:体贴不同的多少就活该加分化的锁。

最后,问题就很晴朗了,GIL
与Lock是两把锁,爱惜的多寡差距,前者是解释器级其余(当然维护的就是解释器级其余数据,比如垃圾回收的数量),后者是珍重用户本身付出的应用程序的数目,很强烈GIL不担负那件事,只能用户自定义加锁处理,即Lock。GIL是解释器级其余锁,LOCK是应用程序级别(用户级别)的锁。

详解:

因为Python解释器会自动定期举办内存回收,可以了解为python解释器里有一个单身的线程,每过一段时间它起wake
up做一回全局轮询看看如何内存数据是足以被清空的,此时自身的顺序
里的线程和
py解释器自个儿的线程是并发运营的,如果线程删除了1个变量,py解释器的废品回收线程在清空这些变量的历程中的clearing时刻,大概2个其它线程正好又再一次给那些还没来及得清空的内存空间赋值了,结果就有只怕新赋值的数码被删去了,为了化解类似的标题,python解释器简单粗暴的加了锁,即当二个线程运维时,此外人都不能够动,那样就化解了上述的标题,
 那足以说是Python早期版本的遗留难题。 

 

2.4 死锁与递归锁

死锁:
 是指五个或多个以上的进度或线程在举行进度中,因争夺能源而造成的一种互动等待的光景,若无外力效率,它们都将不大概推进下去。此时称系统处于死锁状态或系列暴发了死锁,那个永恒在互动等待的进度称为死锁进程,如下就是死锁。

澳门葡京备用网址 117澳门葡京备用网址 118

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A锁\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B锁\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B锁\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A锁\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
然后就卡住,死锁了
'''

死锁示例

焚林而猎办法,递归锁,在Python中为了扶助在同一线程中再三请求同一财富,python提供了可重入锁途睿欧Lock。

这么些LX570Lock内部维护着多个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被反复require。直到3个线程全数的acquire都被release,其余的线程才能取得财富。上边的例子倘诺采用揽胜Lock代替Lock,则不会时有爆发死锁:

mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

 

2.5 信号量Semahpore

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器无法小于0;当计数器为0时,acquire()将封堵线程直到其余线程调用release()。

实例:(同时唯有七个线程能够博得semaphore,即可以限制最亚松森接数为5)

 

import threading
import time

semaphore = threading.Semaphore(5)  # 设置为5,表示同一时刻可以通过5个线程进行操作

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() + ' get semaphore')
        time.sleep(2)
        semaphore.release()

for i in range(20):
  t1 = threading.Thread(target=func)
  t1.start()

与进度池是截然两样的概念,进度池Pool(4),最大不得不发出5个进程,而且从始至终同一时半刻刻唯有两个经过存在,不会发生新的,而信号量是发出一堆线程/进度,同一时刻可以经过四个线程/进度展开数量操作。

 

2.6 事件Event

     
线程的多少个要害本性是各类线程都以独立运作且状态不行预测。如果程序中的其他线程需求通过判断有个别线程的情状来鲜明本人下一步的操作,那时线程同步难点就
会变得至极讨厌。为了消除那几个题材,大家需求动用threading库中的伊夫nt对象。
对象涵盖一个可由线程设置的信号标志,它同意线程等待有些事件的爆发。在
开始情状下,伊芙nt对象中的信号标志被设置为假。假使有线程等待三个伊夫nt对象,
而那个伊芙nt对象的注明为假,那么那个线程将会被直接不通直至该标志为真。三个线程假诺将三个伊芙nt对象的信号标志设置为真,它将唤起全体等待那一个伊夫nt对象的线程。假如2个线程等待3个业已被设置为实在伊夫nt对象,那么它将忽略这几个事件,
继续执行。(可以结合实际生活中的红绿灯举办驾驭)

event.isSet():  #返回event的状态值;
event.wait():   #如果 event.isSet()==False将阻塞线程;
event.set():    #设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():  #恢复event的状态值为False。

 可以设想一种采取场景(仅仅作为阐明),例如,大家有三个线程从Redis队列中读取数据来拍卖,那些线程都要品尝去连接Redis的劳动,一般情状下,假诺Redis连接不成事,在挨家挨户线程的代码中,都会去品尝再一次连接。若是大家想要在起步时确保Redis服务日常,才让这个工作线程去连接Redis服务器,那么我们就足以选择threading.伊芙nt机制来协调各样工作线程的连日操作:主线程中会去品味连接Redis服务,要是常常的话,触发事件,各工作线程会尝试连接Redis服务。

澳门葡京备用网址 119澳门葡京备用网址 120

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()

不了解redis可以参考mysql的例子(一样的道理)

redis示例

澳门葡京备用网址 121澳门葡京备用网址 122

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    print('\033[41m%s 等待连接mysql。。。\033[0m' %threading.current_thread().getName())
    event.wait()
    print('\033[41mMysql初始化成功,%s开始连接。。。\033[0m' %threading.current_thread().getName())


def check_mysql():
    print('\033[43m正在检查mysql。。。\033[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql) #等待连接mysql
    t2=Thread(target=conn_mysql) #等待连接myqsl
    t3=Thread(target=check_mysql) #检查mysql

    t1.start()
    t2.start()
    t3.start()
'''
执行结果:
Thread-1 等待连接mysql。。。
Thread-2 等待连接mysql。。。
正在检查mysql。。。
Mysql初始化成功,Thread-1开始连接。。。
Mysql初始化成功,Thread-2开始连接。。。
'''

mysql示例

threading.伊芙nt的wait方法还收受1个逾期参数,专断认同情形下如若事件相同没有爆发,wait方法会一直不通下去,而参与这些超时参数之后,如果打断时间领先这么些参数设定的值之后,wait方法会重回。对应于下面的运用场景,假诺Redis服务器一致没有运维,大家愿意子线程可以打印一些日志来不断地提醒大家眼下未曾1个可以接连的Redis服务,大家就足以经过设置那几个超时参数来达到那样的目标:

def conn_mysql():
    count=0
    while not e.is_set():
        print('%s 第 <%s> 次尝试' %(threading.current_thread().getName(),count))
        count+=1
        e.wait(0.5)
    print('%s ready to conn mysql' %threading.current_thread().getName())
    time.sleep(1)

澳门葡京备用网址 123澳门葡京备用网址 124

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    while not event.is_set():
        print('\033[42m%s 等待连接mysql。。。\033[0m' %threading.current_thread().getName())
        event.wait(0.1)
    print('\033[42mMysql初始化成功,%s开始连接。。。\033[0m' %threading.current_thread().getName())

def check_mysql():
    print('\033[41m正在检查mysql。。。\033[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql)
    t2=Thread(target=conn_mysql)
    t3=Thread(target=check_mysql)

    t1.start()
    t2.start()
    t3.start()

修订上述mysql版本

完善mysql示例

那样,大家就足以在等待Redis服务运营的同时,看到工作线程太守在等候的意况。

应用: 连接池

 

2.7 条件Condition(了解)

使得线程等待,只有满意某条件时,才假释n个线程

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()

澳门葡京备用网址 125澳门葡京备用网址 126

def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

示例

 

2.8 定时器Timer

定时器,指定n秒后举办某操作

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

 

2.9 线程queue

queue队列 :使用import queue,用法与经过Queue一样

queue is especially useful in threaded programming when information must
be exchanged safely between multiple threads.

1. class queue.``Queue(maxsize=0)
#先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

2. class queue.``LifoQueue(maxsize=0) #last in fisrt
out 

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

3. class queue.``PriorityQueue(maxsize=0)
#储存数据时可设置优先级的行列

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

 

2.10 Python标准模块–concurrent.futures

concurrent.futures模块是在Python3.2中增进的。依照Python的官方文档,concurrent.futures模块提要求开发者一个实践异步调用的高档接口。concurrent.futures基本上就是在Python的threading和multiprocessing模块之上打造的抽象层,更易于使用。尽管那么些抽象层简化了那些模块的应用,不过也下落了许多世故,所以只要您需求处理局地定制化的职务,concurrent.futures恐怕并不符合你。

concurrent.futures包含抽象类Executor,它并不可以平素被接纳,所以您要求动用它的三个子类:ThreadPoolExecutor恐怕ProcessPoolExecutor。正如您所猜的,那两个子类分别对应着Python的threading和multiprocessing接口。那两个子类都提供了池,你可以将线程可能经过放入其中。

 

三. 协程

协程:是单线程下的现身,又称微线程,纤程。英文名Coroutine。一句话表达怎么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序本身决定调度的。

内需强调的是:

  1. python的线程属于基本级其余,即由操作系统控制调度(如单线程一旦遇见io就被迫交出cpu执行权限,切换其余线程运维)
  2. 单线程内打开协程,一旦境遇io,从应用程序级别(而非操作系统)控制切换

对照操作系统控制线程的切换,用户在单线程内决定协程的切换,优点如下:

  1. 协程的切换费用更小,属于程序级其他切换,操作系统完全感知不到,由此特别轻量级
  2. 单线程内就足以兑现产出的效应,最大限度地拔取cpu

要促成协程,关键在于用户程序本身支配程序切换,切换在此之前必须由用户程序本人保留协程上五次调用时的情状,如此,每一遍重复调用时,可以从上次的义务继续执行

(详细的:协程拥有和谐的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地点,在切回到的时候,復苏原先保留的寄存器上下文和栈)

为此,大家前边曾经学习过一种在单线程下可以保存程序运营状态的不二法门,即yield,大家来简单复习一下:

  1. yiled可以保留处境,yield的情景保存与操作系统的保存线程状态很像,不过yield是代码级别决定的,更轻量级
  2. send可以把一个函数的结果传给此外1个函数,以此落成单线程内程序之间的切换 

澳门葡京备用网址 127澳门葡京备用网址 128

#不用yield:每次函数调用,都需要重复开辟内存空间,即重复创建名称空间,因而开销很大
import time
def consumer(item):
    # print('拿到包子%s' %item)
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333
    pass

def producer(target,seq):
    for item in seq:
        target(item)    # 每次调用函数,会临时产生名称空间,调用结束则释放,循环100000000次,则重复这么多次的创建和释放,开销非常大

start_time=time.time()
producer(consumer,range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time))     # 13.474999904632568


#使用yield:无需重复开辟内存空间,即重复创建名称空间,因而开销小
import time
def init(func):
    def wrapper(*args,**kwargs):
        g=func(*args,**kwargs)
        next(g)
        return g
    return wrapper

@init
def consumer():
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333
    while True:
        item=yield
        # print('拿到包子%s' %item)
        pass
def producer(target,seq):
    for item in seq:
        target.send(item) #无需重新创建名称空间,从上一次暂停的位置继续,相比上例,开销小

start_time=time.time()
producer(consumer(),range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time))     # 10.674000024795532

以身作则表达

协程的定义(满足1,2,3就可称之为协程):

  1. 必须在唯有二个单线程里一帆风顺产出
  2. 修改共享数据不需加锁
  3. 用户程序里同心协力保留两个控制流的内外文栈
  4. 叠加:3个协程蒙受IO操作自动切换来其余协程(如何促成检测IO,yield、greenlet都心有余而力不足兑现,就用到了gevent模块(select机制)

缺点:

协程的实质是单线程下,不可以运用多核,可以是一个主次开启三个进度,各个进度内打开三个线程,每一种线程内打开协程。

协程指的是单个线程,由此一旦协程出现堵塞,将会卡住整个线程。

 

四. 协程模块greenlet

 greenlet是三个用C完结的协程模块,相比与python自带的yield,它可以使你在任意函数之间自由切换,而不需把那么些函数先申明为generator。

from greenlet import greenlet

def test1():
    print('test1,first')
    gr2.switch()
    print('test1,sencod')
    gr2.switch()
def test2():
    print('test2,first')
    gr1.switch()
    print('test2,sencod')


gr1=greenlet(test1)
gr2=greenlet(test2)
gr1.switch()
'''
执行结果:
test1,first
test2,first
test1,sencod
test2,sencod
'''

澳门葡京备用网址 129澳门葡京备用网址 130

from greenlet import greenlet
def eat(name):
    print('%s eat fruit apple' %name)
    gr2.switch('shuke')
    print('%s eat fruit banana' %name)
    gr2.switch()
def play_phone(name):
    print('%s play basketbal' %name)
    gr1.switch()
    print('%s play football' %name)

gr1=greenlet(eat)
gr2=greenlet(play_phone)
gr1.switch(name='jack')  #可以在第一次switch时传入参数,以后都不需要

'''
执行结果:
jack eat fruit apple
shuke play basketbal
jack eat fruit banana
shuke play football
'''

switch传参

唯有的切换(在没有io的情况下或许没有重新开发内存空间的操作),反而会稳中有降程序的履行进程。

澳门葡京备用网址 131澳门葡京备用网址 132

#顺序执行
import time
def f1():
    res=0
    for i in range(10000000):
        res+=i

def f2():
    res=0
    for i in range(10000000):
        res*=i


start_time=time.time()
f1()
f2()
stop_time=time.time()
print('run time is: %s' %(stop_time-start_time)) #1.7395639419555664


#切换
from greenlet import greenlet
import time
def f1():
    res=0
    for i in range(10000000):
        res+=i
        gr2.switch()


def f2():
    res=0
    for i in range(10000000):
        res*=i
        gr1.switch()

gr1=greenlet(f1)
gr2=greenlet(f2)

start_time=time.time()
gr1.switch()
stop_time=time.time()
print('run time is: %s' %(stop_time-start_time)) #7.789067983627319

示例

greenlet只是提供了一种比generator尤其方便的切换方式,依旧是尚未缓解碰着IO自动切换的标题。

 

五. gevent模块(单线程并发)

Gevent
是3个第1方库,可以轻松通过gevent完成产出同步或异步编程,在gevent中用到的要害情势是Greenlet,
它是以C增加模块方式接入Python的轻量级协程。
格林let全体周转在主程序操作系统进度的内部,但它们被合作式地调度。

g1=gevent.spawn()成立二个协程对象g1,spawn括号内首先个参数是函数名,如eat,后边可以有八个参数,能够是岗位实参或紧要字实参,都是传给函数eat的。

遇上IO阻塞时会自动切换职分

import gevent

def eat():
    print('eat food 1')
    gevent.sleep(2)     # 等饭来
    print('eat food 2')

def play_phone():
    print('play phone 1')
    gevent.sleep(1)     # 网卡了
    print('play phone 2')

# gevent.spawn(eat)
# gevent.spawn(play_phone)
# print('主')  # 直接结束

# 因而也需要join方法,进程或线程的jion方法只能join一个,而gevent的join方法可以join多个
g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('主')

'''
执行结果:
eat food 1
play phone 1
play phone 2
eat food 2
主
'''

注:
上例中gevent.sleep(2)模拟的是gevent能够识其他io阻塞,而time.sleep(2)或其余的梗塞,gevent是不大概直接识其余,此时就必要展开打补丁,将卡住设置为gevent可以识其他IO阻塞。

平日的写法为,在文件的起来,如下

from gevent import monkey;monkey.patch_all()
import gevent
import time

澳门葡京备用网址 133澳门葡京备用网址 134

from gevent import monkey;monkey.patch_all()

import gevent
import time


def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play_phone():
    print('play phone 1')
    time.sleep(1)
    print('play phone 2')



g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('主')

示例

联合与异步

概念:

共同和异步的概念对于许两人的话是二个模糊的定义,是一种似乎只好意会不可以言传的东西。其实大家的生活中存在着众多联名异步的例子。比如:你叫本人去吃饭,作者听见了就立刻和您去用餐,借使大家有视听,你就会直接叫本人,直到作者听到和您一只去用餐,那几个过程叫联合;异步进程指你叫小编去就餐,然后你就去吃饭了,而任由作者是还是不是和您一起去就餐。而自我获取音信后或然立马就走,也说不定过段时间再走。假设本身请你吃饭,就是一道,假使您请俺吃饭就用异步,那样您比较省钱。哈哈哈。。。

import gevent

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1, 10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()       # 同步

print('Asynchronous:')
asynchronous()      # 异步

地点程序的基本点片段是将task函数封装到格林let内部线程的gevent.spawn
开端化的greenlet列表存放在数组threads中,此列表被传给gevent.joinall 函数,后者阻塞当前流程,并施行全部给定的greenlet。执行流程只会在
全数greenlet执行完后才会三番三次向下走。

#gevent线程的一些用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)
g2=gevent.spawn(func2)
g1.join() #等待g1结束
g2.join() #等待g2结束
#或者上述两步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值

澳门葡京备用网址 135澳门葡京备用网址 136

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))

协程应用:爬虫

协程应用(爬虫)

透过gevent已毕单线程下的socket并发(from gevent import
monkey;monkey.patch_all()一定要松开导入socket模块从前,否则gevent无法甄别socket的短路)

澳门葡京备用网址 137澳门葡京备用网址 138

from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)

服务端

澳门葡京备用网址 139澳门葡京备用网址 140

#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

客户端

澳门葡京备用网址 141澳门葡京备用网址 142

from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM)
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()

二十四线程并发多个客户端

 

六. 综合使用

粗略主机批量管理工具

需求:

  1. 长机分组
  2. 主机音讯配置文件用configparser解析
  3. 可批量执行命令、发送文书,结果实时再次回到,执行格式如下 
    1. batch_run  -h h1,h2,h3   -g web_clusters,db_servers    -cmd
       “df -h” 
    2. batch_scp   -h h1,h2,h3   -g web_clusters,db_servers  -action
      put  -local test.py  -remote /tmp/ 
  4. 主机用户名密码、端口可以不一样
  5. 履行远程命令使用paramiko模块
  6. 批量命令需拔取multiprocessing并发

code: https://github.com/shuke163/learnpy/tree/master/homework/day09/managetool 

 

 

 

 

 

 

 

比较学习参考:

  1.   https://tracholar.github.io/wiki/python/python-multiprocessing-tutorial.html

2.
  

 

 

 

 

 

 

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*
*
Website