GVKun编程网logo

多进程或Python中的线程?(python中多进程和多线程的区别)

16

如果您对多进程或Python中的线程?感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于多进程或Python中的线程?的详细内容,我们还将为您解答python中多进程和多线程的

如果您对多进程或Python中的线程?感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于多进程或Python中的线程?的详细内容,我们还将为您解答python中多进程和多线程的区别的相关问题,并且为您提供关于PyQt应用程序中的线程:使用Qt线程还是Python线程?、python ftp文件线程或多进程、Python 中的线程和进程、Python 多线程、多进程 (三)之 线程进程对比、多进程的有价值信息。

本文目录一览:

多进程或Python中的线程?(python中多进程和多线程的区别)

多进程或Python中的线程?(python中多进程和多线程的区别)

我有一个python应用程序,它可以捕获数据集合,并针对该集合中的每个数据执行任务。由于涉及到延迟,因此该任务需要一些时间才能完成。由于这种延迟,我不希望每个数据都随后执行任务,而是希望它们全部并行发生。我应该使用多进程吗?或执行此操作的线程?

我尝试使用线程,但遇到了一些麻烦,通常某些任务实际上不会执行。

PyQt应用程序中的线程:使用Qt线程还是Python线程?

PyQt应用程序中的线程:使用Qt线程还是Python线程?

我正在编写一个GUI应用程序,该应用程序通过Web连接定期检索数据。由于此检索需要一段时间,因此这会导致UI在检索过程中无响应(无法拆分成较小的部分)。这就是为什么我想将Web连接外包给一个单独的工作线程。

[是的,我知道,现在我有两个问题。]

无论如何,该应用程序使用PyQt4,所以我想知道更好的选择是:使用Qt的线程还是使用Python threading模块?各自的优点/缺点是什么?还是您有完全不同的建议?

尽管在我的特定情况下,解决方案可能会使用非阻塞网络请求,例如Jeff Ober和LukášLalinský建议的(所以基本上将并发性问题留给了网络实现),但我仍然希望深入回答一般问题:

与本地Python线程(来自threading模块)相比,使用PyQt4(即Qt)线程有什么优缺点?

编辑2:谢谢大家的回答。尽管没有达成100%的协议,但似乎普遍的共识是答案是“使用Qt”,因为这样做的优点是与库的其余部分集成在一起,而没有造成真正的不利。

对于希望在这两种线程实现之间进行选择的任何人,我强烈建议他们阅读此处提供的所有答案,包括方丈链接到的PyQt邮件列表线程。

我考虑了一些悬赏的答案;最后,我选择了方丈作为非常相关的外部参考。然而,这是一个密切的电话。

再次感谢。

答案1

小编典典

大致相同。主要区别在于QThreads与Qt(异步信号/插槽,事件循环等)更好地集成在一起。另外,您不能在Python线程中使用Qt(例如,不能通过QApplication.postEvent将事件发布到主线程):您需要一个QThread才能工作。

一般的经验法则是,如果您要以某种方式与Qt进行交互,则可以使用QThreads;否则,请使用Python线程。

PyQt的作者对此问题也发表了一些较早的评论:“它们都是相同的本机线程实现的包装器”。两种实现都以相同的方式使用GIL。

python ftp文件线程或多进程

python ftp文件线程或多进程

我们不断从数据库中生成数据文件,然后通过FTP’D将数据文件传输到其他位置。我们有一个极化器,它不断监视此文件夹,并在它看到文件后立即将其作为FTP的文件

当前,此过程是顺序的,当文件变大时,会有备份,这会导致严重的延迟。我想并行处理事情,即一次处理多个文件。我不确定线程​​/多处理以及它如何为我工作。

这是我的代码

import ftplibftp = ftplib.FTP(''domainname.com'')ftp.login("username","password")infiles = [file1,file2,file3.....filen]for x in infiles:    f = open(x,''rb'')    ftp.storbinary(''STOR %s'' %x, f)    f.close()ftp.quit()

我在想,因为这是I / O密集型工作,所以多处理是必经之路,有关如何进行的任何想法。

答案1

小编典典

multiprocessing.Pool作为更高级别的界面,您可能会发现有用。

from multiprocessing import Pooldef upload(file):     ftp = ftplib.FTP(''domainname.com'')    ftp.login("username","password")    f = open(x,''rb'')    ftp.storbinary(''STOR %s'' %x, f)    f.close()    ftp.quit()infiles = [file1,file2,file3.....filen]pool = Pool(10) # submit 10 at oncepool.map(upload,infiles)

很好,因为map其行为类似于内置函数。对于调试,只需替换pool.map->即可map

Python 中的线程和进程

Python 中的线程和进程

引入进程和线程的概念及区别

threading 模块提供的类:  
  Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。

1. 什么是进程

计算机程序只不过是磁盘中可执行的二进制(或其他类型)的数据。它们只有在被读取到内存中,被操作系统调用的时候才开始它们的生命期。

进程(有时被称为重量级进程)是程序的一次执行。每个进程都有自己的地址空间、内存、数据栈及其它记录其运行轨迹的辅助数据。

操作系统管理在其上运行的所有进程,并为这些进程公平的分配时间,进程也可以通过 fork 和 spawn 操作来完成其它的任务。

不过各个进程有自己的内存空间、数据栈等,所以只能使用进程间通讯,而不能直接共享信息

2. 线程的基本概念

线程是进程中执行运算的最小单位,是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。

 

3、线程和进程的关系以及区别?

** 进程和线程的关系:**

  • (1)一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。

  • (2)资源分配给进程,同一进程的所有线程共享该进程的所有资源。

  • (3)处理机分给线程,即真正在处理机上运行的是线程

  • (4)线程在执行过程中,需要协作同步。不同进程的线程间要利用消息通信的办法实现同步。线程是指进程内的一个执行单元,也是进程内的可调度实体.

进程与线程的区别:

  • (1)调度:线程作为调度和分配的基本单位,进程作为拥有资源的基本单位

  • (2)并发性:不仅进程之间可以并发执行,同一个进程的多个线程之间也可并发执行

  • (3)拥有资源:进程是拥有资源的一个独立单位,线程不拥有系统资源,但可以访问隶属于进程的资源.

  • (4)系统开销:在创建或撤消进程时,由于系统都要为之分配和回收资源,导致系统的开销明显大于创建或撤消线程时的开销。

1 多进程创建方式

可以归纳为三种:fork,multiprocessing 以及进程池 Pool。

(1) fork 方式

1 import os
2 
3 # 注意,fork函数,只在Unix/Linux/Mac上运行,windows不可以
4 pid = os.fork()
5 
6 if pid == 0:
7     print(''哈哈1'')
8 else:
9     print(''哈哈2'')

注意:fork () 函数只能在 Unix/Linux/Mac 上面运行,不可以在 Windows 上面运行。

说明:

  • 程序执行到 os.fork () 时,操作系统会创建一个新的进程(子进程),然后复制父进程的所有信息到子进程中
  • 然后父进程和子进程都会从 fork () 函数中得到一个返回值,在子进程中这个值一定是 0,而父进程中是子进程的 id 号

在 Unix/Linux 操作系统中,提供了一个 fork () 系统函数,它非常特殊。

普通的函数调用,调用一次,返回一次,但是 fork () 调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回 0,而父进程返回子进程的 ID。

这样做的理由是,一个父进程可以 fork 出很多子进程,所以,父进程要记下每个子进程的 ID,而子进程只需要调用 getppid () 就可以拿到父进程的 ID。我们可以通过 os.getpid () 获取当前进程 ID,通过 os.getppid () 获取父进程 ID。

那么,父子进程之间的执行有顺序吗?

答案是没有!这完全取决于操作系统的调度算法。

而多次 fork () 就会产生一个树的结构:

(2)multiprocessing 方式

如果你打算编写多进程的服务程序,Unix/Linux 无疑是正确的选择。由于 Windows 没有 fork 调用,难道在 Windows 上无法用 Python 编写多进程的程序?当然可以!由于 Python 是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing 模块就是跨平台版本的多进程模块。

 1 import  os
 2 import time
 3 
 4 from multiprocessing import  Process
 5 
 6 def run_proc(name):
 7     print(''子进程运行中,name%s,pin=%d...''%(name,os.getpid()))
 8 
 9     time.sleep(10)
10     print(''子进程已经结束'')
11 
12 if __name__==''__main__'':
13     print(''父进程%d.''%os.getpid())
14     p=Process(target=run_proc,args=(''test'',))
15     print(''子进程将要执行'')
16     p.start()

从结果我们看出,只要通过 start () 开启了子进程之后,主进程会等待子进程执行完才结束!

Process 的语法结构如下:

import  os
import time
from multiprocessing import Process
class MyProcess(Process):
    def __init__(self,name):
        Process.__init__(self)
        self.name=name

    def run(self):
        print(''子进程运行中,name= %s ,pid=%d...'' % (self.name, os.getpid()))
        import time
        time.sleep(10)
        print(''子进程已结束'')
        

if __name__=="__main__":
    my=MyProcess(''test'')
    my.start()

(3)Pool 方式

#coding=utf-8
from multiprocessing import Pool
import os, time, random

def worker(msg):
    print("%s开始执行,进程号为%d"%(msg, os.getpid()))
    time.sleep(1)
    print "%s执行完毕"%(msg)

if __name__ == ''__main__'':
    po = Pool(3)  # 定义一个进程池,最大进程数3
    for i in range(10):
        # Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
        # 每次循环将会用空闲出来的子进程去调用目标
        po.apply_async(worker, (i,))

    print("----start----")
    po.close()  # 关闭进程池,关闭后po不再接收新的请求
    po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    print("-----end-----")

实现一个多进程下的文件夹复制功能:

 1 #coding=utf-8
 2 
 3 import os
 4 from multiprocessing import Pool
 5 
 6 
 7 def copyFileTask(name, oldFolderName, newFolderName):
 8     # 完成copy一个文件的功能
 9     fr = open(oldFolderName+"/"+name, ''rb+'')
10     fw = open(newFolderName+"/"+name, ''wb+'')
11 
12     str = fr.read(1024 * 5)
13     while (str != ''''):
14         fw.write(str)
15         str = fr.read(1024 * 5)
16 
17     fr.close()
18     fw.close()
19 
20 
21 def main():
22     # 获取要copy的文件夹名字
23     oldFolderName = raw_input(''请输入文件夹名字:'')
24     # 创建一个文件夹
25     newFolderName = oldFolderName+''-复件''.decode(''utf-8'').encode(''gbk'')
26     os.mkdir(newFolderName)
27 
28     #获取old文件夹里面所有文件的名字
29     fileNames = os.listdir(oldFolderName)
30 
31     #使用多进程的方式copy原文件夹所有内容到新的文件夹中
32     pool = Pool(5)
33     for name in fileNames:
34         pool.apply_async(copyFileTask, (name, oldFolderName, newFolderName))
35 
36     pool.close()
37     pool.join()
38 
39 if __name__ == ''__main__'':
40     main()
View Code

 
(4) python 进程间通信 - Queue

 

from multiprocessing import  Queue,Process
import  time

def write(q):
    for i in ["A","B","C","D","E"]:
        print(''向队列中添加%s''%i)
        q.put(i)
        #print(id(q))
        time.sleep(1)

def read(q):
    #print(q.empty())
    while not q.empty():
        print("从队列中取出来的值是%s"%q.get())
        time.sleep(1)


if __name__ == ''__main__'':
    q=Queue()#创建一个queue队列,当做参数输入子进程中,这样子进程能拿到数据资源
    qw=Process(target=write,args=(q,))
    qw.start()
    qw.join(0.1)

    qr=Process(target=read,args=(q,))
    qr.start()
    qr.join()
    print("通信完毕")
Queue-1

 

(1) 在实例化 Queue 类,可以传递最大消息数,如 q = Queue (5),这段代码是指只允许消息队列中最大有 5 个消息数据。如果不加最大消息数或数量为负值,则表达不限制数量直到占满内存; 
(2) Queue.qsize ():返回当前队列包含的消息数量; 
(3) Queue.empty ():如果队列为空,返回 True,反之 False ; 
(4) Queue.full ():如果队列满了,返回 True, 反之 False; 
(5) Queue.get ([block [, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block 默认值为 True; 
1)如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了 timeout,则会等待 timeout 秒,若还没读取到任何消息,则抛出”Queue.Empty” 异常; 
2)如果 block 值为 False,消息列队如果为空,则会立刻抛出”Queue.Empty” 异常; 
(6) Queue.get_nowait ():相当 Queue.get (False); 
(7) Queue.put (item,[block [, timeout]]):将 item 消息写入队列,block 默认值为 True; 
1)如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了 timeout,则会等待 timeout 秒,若还没空间,则抛出”Queue.Full” 异常; 
2)如果 block 值为 False,消息列队如果没有空间可写入,则会立刻抛出”Queue.Full” 异常; 
(8) Queue.put_nowait (item):相当 Queue.put (item, False);

二:进程池 (pool) 中的 Queue

如果要使用 Pool 创建进程,就需要使用 multiprocessing.Manager () 中的 Queue (),而不是 multiprocessing.Queue (),否则会得到一条如下的错误信息: 
RuntimeError: Queue objects should only be shared between processes through inheritance.

from multiprocessing import Manager,Pool
import time

def write(q):
    for i in ["A","B","C","D","E"]:
        print("向队列中添加%s"%i)
        q.put(i)
        time.sleep(1)

def read(q):
    while not q.empty():
        print("从队列中取出的值是%s"%q.get())
        time.sleep(1)

if __name__ == ''__main__'':
    q = Manager().Queue()
    pool = Pool()
    pool.apply_async(write,args=(q,))
    pool.apply_async(read,args=(q,))
    # pool.apply(write,args=(q,))
    # pool.apply(read,args=(q,))

    pool.close()
    pool.join()

    print("数据通信完毕")
Queue-pool

(5) 管道 pipe

 

pipe() 返回两个连接对象代表 pipe 的两端。每个连接对象都有 send() 方法和 recv() 方法。

 

但是如果两个进程或线程对象同时读取或写入管道两端的数据时,管道中的数据有可能会损坏。

 

当进程使用的是管道两端的不同的数据则不会有数据损坏的风险。

 

使用方法:

from multiprocessing import  Process,Pipe
def f(conn):
    conn.send(''约吗'')
    conn.send(''约吗.....'')
    print(conn.recv())#接收数据
    conn.close()

if __name__=="__main__" :
    parent_conn,chid_conn=Pipe()
    p=Process(target=f,args=(chid_conn,))
    p.start()
    print(parent_conn.recv())#接收数据
    print(parent_conn.recv())
    parent_conn.send(''约啊'')#发送数据
View Code

(6)Manager

 进程间的通信 Queue () 和 Pipe (), 可以实现进程间的数据传递。但是要使 python 进程间共享数据,我们就要使用 multiprocessing.Manager。

  Manager () 返回的 manager 对象控制了一个 server 进程,此进程包含的 python 对象可以被其他的进程通过 proxies 来访问。从而达到多进程间数据通信且安全。

  Manager 支持 list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value 和 Array。

from multiprocessing import Process, Manager
import os
 
 
def f(d, l):
    d[os.getpid()] = os.getpid()   # 获取进程号
    l.append(os.getpid())
    print(l)
 
 
if __name__ == ''__main__'':
    with Manager() as manager:
        d = manager.dict()   # 生成一个字典,可以在多个进程间共享和传递
        l = manager.list(range(5))  # 生成一个列表,可以在多个进程间共享和传递
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
 
        print(d)
        print(l)
View Code
[0, 1, 2, 3, 4, 16296]
[0, 1, 2, 3, 4, 16296, 20360]
[0, 1, 2, 3, 4, 16296, 20360, 20772]
[0, 1, 2, 3, 4, 16296, 20360, 20772, 21260]
[0, 1, 2, 3, 4, 16296, 20360, 20772, 21260, 20936]
[0, 1, 2, 3, 4, 16296, 20360, 20772, 21260, 20936, 18136]
[0, 1, 2, 3, 4, 16296, 20360, 20772, 21260, 20936, 18136, 21184]
[0, 1, 2, 3, 4, 16296, 20360, 20772, 21260, 20936, 18136, 21184, 20552]
[0, 1, 2, 3, 4, 16296, 20360, 20772, 21260, 20936, 18136, 21184, 20552, 21076]
[0, 1, 2, 3, 4, 16296, 20360, 20772, 21260, 20936, 18136, 21184, 20552, 21076, 21204]
{16296: 16296, 20360: 20360, 20772: 20772, 21260: 21260, 20936: 20936, 18136: 18136, 21184: 21184, 20552: 20552, 21076: 21076, 21204: 21204}
[0, 1, 2, 3, 4, 16296, 20360, 20772, 21260, 20936, 18136, 21184, 20552, 21076, 21204]
输出结果

进程间的数据默认不共享,那 Manager () 如何来同步数据?manager 对象控制了一个 server 进程,这个对象有共享数据和让其他进程能访问数据的代理。一个进程修改了数据后,其实并不会通过 manager 传送数据,作为代理而言,它不知道数据被改变了。为了在共享数据中同步修改的内容,需要在这些代理的容器中,重新声明这个修改过的内容。

 

 

2. Python 中的线程

Python 的标准库提供了两个模块:thread 和 threading,thread 是低级模块,threading 是高级模块,对 thread 进行了封装。绝大多数情况下,我们只需要使用 threading 这个高级模块。

from time import  ctime,sleep
import threading

def music(func):
    print(threading.current_thread())
    for i in range(2):
        print(''I was listening to %s. %s''%(func,ctime()))
        sleep(2)
        print(''end listing %s''%ctime())

t1=threading.Thread(target=music,args=(''星晴'',))#创建线程,traget执行的函数,args函数参数
t1.start()#开启线程
更多方法:
  • start 线程准备就绪,等待 CPU 调度
  • setName 为线程设置名称
  • getName 获取线程名称
  • setDaemon 设置为后台线程或前台线程(默认); 如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
  • join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
  • run 线程被 cpu 调度后自动执行线程对象的 run 方法
  • Lock 线程锁 (互斥锁 Mutex)
  • Event

线程交互执行,Join (),Daemon :

1、join () 方法:主线程 A 中,创建了子线程 B,并且在主线程 A 中调用了 B.join (),那么,主线程 A 会在调用的地方等待,直到子线程 B 完成操作后,才可以接着往下执行,那么在调用这个线程时可以使用被调用线程的 join 方法。

2、setDaemon () 方法。主线程 A 中,创建了子线程 B,并且在主线程 A 中调用了 B.setDaemon (), 这个的意思是,把主线程 A 设置为守护线程,这时候,要是主线程 A 执行结束了,就不管子线程 B 是否完成,一并和主线程 A 退出。这就是 setDaemon 方法的含义,这基本和 join 是相反的。此外,还有个要特别注意的:必须在 start () 方法调用之前设置,如果不设置为守护线程,程序会被无限挂起。

from time import  ctime,sleep
import threading

def music(func):
    print(threading.current_thread())
    for i in range(2):
        print(''I was listening to %s. %s''%(func,ctime()))
        sleep(2)
        print(''end listing %s''%ctime())
def move(func):
    print(threading.current_thread())
    for i in range(2):
        print(''I was at the %s ! %s''%(func,ctime()))
        sleep(3)
        print(''end moving %s'' % ctime())

threads=[]
t1=threading.Thread(target=music,args=(''星晴'',))
threads.append(t1)
t2=threading.Thread(target=move,args=(''正义联盟'',))
threads.append(t2)

#join()
if __name__=="__main__":
    # music(u''七里香'')
    # move(u''功夫'')
    for t in threads:
        t.start()
    t.join()#等价于t2.join,目的是为了最后执行最后一条打印信息,join 逐个执行每个线程,执行完毕后继续往下执行,
    #只有当所有线程执行完毕后才会执行下一个
    print(''all over %s''%ctime())

 

from time import  ctime,sleep
import threading

def music(func):
    #print(threading.current_thread())#线程对象
    for i in range(2):
        print(''I was listening to %s. %s''%(func,ctime()))
        sleep(2)
        print(''end listing %s''%ctime())
def move(func):
    #print(threading.current_thread())#线程对象
    for i in range(2):
        print(''I was at the %s ! %s''%(func,ctime()))
        sleep(3)
        print(''end moving %s'' % ctime())

threads=[]
t1=threading.Thread(target=music,args=(''星晴'',))
threads.append(t1)
t2=threading.Thread(target=move,args=(''正义联盟'',))
threads.append(t2)


#Daemon(守护进程)将主线程设置为Daemon线程,它退出时,其它子线程会同时退出,不管是否执行完任务。
if __name__=="__main__":
    t2.setDaemon(True)#只守护t2,但不守护t1,t1会全部执行完,同时也会引发t2执行,但t2不会执行完,t1执行完,就退出
    for t in threads:
        #t.setDaemon(True)#(执行一次后就退出)
        t.start()


    #print(threading.current_thread())#线程对象
    #print(threading.active_count())#线程对象的数量(默认会有一条主线程)
    print(''all over %s''%ctime())

线程继承实例化:

 

import threading
import time
#t1=threading.Thread(target=music,args=(''星晴'',))
#继承式调用
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num=num
    def run(self):#定义每个线程要运行的函数
        print(''running on number:%s''%self.num)
        time.sleep(3)


if __name__==''__main__'':
    t1=MyThread(1)
    t2=MyThread(2)
    t1.start()
    t2.start()

 3. 线程锁 (互斥锁 Mutex)

1. 创建锁:mutex = threading.Lock ()  2. 锁定:mutex.acquire ([timeout]) 3. 释放:mutex.release ()

import time
import threading

def addNum():
    global num #在每个线程中都获取这个全局变量
    print(''--get num:'',num )
    time.sleep(1)
    num  -=1 #对此公共变量进行-1操作

num = 100  #设定一个共享变量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()


print(''final num:'', num )

正常来讲,这个 num 结果应该是 0, 但在 python 2.7 上多运行几次,会发现,最后打印出来的 num 结果不总是 0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有 A,B 两个线程,此时都 要对 num 进行减 1 操作, 由于 2 个线程是并发同时运行的,所以 2 个线程很有可能同时拿走了 num=100 这个初始变量交给 cpu 去运算,当 A 线程去处完的结果是 99,但此时 B 线程运算完的结果也是 99,两个线程同时 CPU 运算的结果再赋值给 num 变量后,结果就都是 99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

注:不要在 3.x 上运行,不知为什么,3.x 上的结果总是正确的,可能是自动加了锁

加锁版本:

import time
import threading

def addNum():
    global num #在每个线程中都获取这个全局变量
    print(''--get num:'',num )
    time.sleep(1)
    lock.acquire() #修改数据前加锁
    num  -=1 #对此公共变量进行-1操作
    lock.release() #修改后释放

num = 100  #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print(''final num:'', num )

 4. 死锁和递归锁

 

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,

 

它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,

 

如下就是死锁:

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print(''\033[41m%s 拿到A锁\033[0m'' %self.name)

        mutexB.acquire()
        print(''\033[42m%s 拿到B锁\033[0m'' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print(''\033[43m%s 拿到B锁\033[0m'' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print(''\033[44m%s 拿到A锁\033[0m'' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == ''__main__'':
    for i in range(5):
        t=MyThread()
        t.start()
死锁现象

输出结果如下:

Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁

分析如上代码是如何产生死锁的: 
启动 5 个线程,执行 run 方法,假如 thread1 首先抢到了 A 锁,此时 thread1 没有释放 A 锁,紧接着执行代码 mutexB.acquire (),抢到了 B 锁,在抢 B 锁时候,没有其他线程与 thread1 争抢,因为 A 锁没有释放,其他线程只能等待,然后 A 锁就执行完 func1 代码,然后继续执行 func2 代码,与之同时,在 func2 中,执行代码 mutexB.acquire (),抢到了 B 锁,然后进入睡眠状态,在 thread1 执行完 func1 函数,释放 AB 锁时候,其他剩余的线程也开始抢 A 锁,执行 func1 代码,如果 thread2 抢到了 A 锁,接下来 thread2 要抢 B 锁,ok,在这个时间段,thread1 已经执行 func2 抢到了 B 锁,然后在 sleep (2), 持有 B 锁没有释放,为什么没有释放,因为没有其他的线程与之争抢,他只能睡着,然后 thread1 握着 B 锁,thread2 要抢 B 锁,ok,这样就形成了死锁

递归锁

我们分析了死锁,那么 python 里面是如何解决这样的递归锁呢? 
在 Python 中为了支持在同一线程中多次请求同一资源,python 提供了可重入锁 RLock。

这个 RLock 内部维护着一个 Lock 和一个 counter 变量,counter 记录了 acquire 的次数,从而使得资源可以被多次 require。直到一个线程所有的 acquire 都被 release,其他的线程才能获得资源。上面的例子如果使用 RLock 代替 Lock,则不会发生死锁:

from threading import Thread,Lock,RLock
import time

mutexA=mutexB=RLock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(''%s 拿到A锁'' %self.name)

        mutexB.acquire()
        print(''%s 拿到B锁'' %self.name)
        mutexB.release()

        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(''%s 拿到B锁'' % self.name)
        time.sleep(0.1)
        mutexA.acquire()
        print(''%s 拿到A锁'' % self.name)
        mutexA.release()

        mutexB.release()


if __name__ == ''__main__'':
    for i in range(5):
        t=MyThread()
        t.start()
递归锁

输出结果:

Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-1 拿到A锁
Thread-2 拿到A锁
Thread-2 拿到B锁
Thread-2 拿到B锁
Thread-2 拿到A锁
Thread-4 拿到A锁
Thread-4 拿到B锁
Thread-4 拿到B锁
Thread-4 拿到A锁
Thread-3 拿到A锁
Thread-3 拿到B锁
Thread-3 拿到B锁
Thread-3 拿到A锁
Thread-5 拿到A锁
Thread-5 拿到B锁
Thread-5 拿到B锁
Thread-5 拿到A锁
View Code

来解释下递归锁的代码: 
由于锁 A,B 是同一个递归锁,thread1 拿到 A,B 锁,counter 记录了 acquire 的次数 2 次,然后在 func1 执行完毕,就释放递归锁,在 thread1 释放完递归锁,执行完 func1 代码,接下来会有 2 种可能,1、thread1 在次抢到递归锁,执行 func2 代码 2、其他的线程抢到递归锁,去执行 func1 的任务代码

5. 信号量 Semaphore(其实也是一把锁)

Semaphore 管理一个内置的计数器

Semaphore 与进程池看起来类似,但是是完全不同的概念。

进程池:Pool (4),最大只能产生四个进程,而且从头到尾都只是这四个进程,不会产生新的。

信号量:信号量是产生的一堆进程 / 线程,即产生了多个任务都去抢那一把锁

 1 from threading import Thread,Semaphore,currentThread
 2 import threading
 3 import time,random
 4 sm=Semaphore(5)#运行的时候有5个人
 5 def task():
 6     sm.acquire()
 7     print(''\033[42m %s上厕所'' % currentThread().getName())
 8     time.sleep(random.randint(1,3))
 9     print(''\033[31m %s上完厕所走了'' % currentThread().getName())
10     sm.release()
11 
12 if __name__=="__main__":
13     for i in range(20):#开了10个线程 ,这20人都要上厕所
14         t=Thread(target=task)
15         t.start()
16 
17 
18 /*********************
19 import threading
20 from threading import Thread,Semaphore,currentThread
21 #继承实例化
22 class MyThread(threading.Thread):
23     def run(self):
24         if semaphore.acquire():
25             print(self.name)
26             time.sleep(3)
27             semaphore.release()
28 
29 if __name__=="__main__":
30     semaphore=threading.Semaphore(5)
31     thrs=[]
32     for i in range(23):
33         thrs.append(MyThread())
34     for t in thrs:
35         t.start()
信号量实例

输出结果:

D:\ProgramData\Anaconda3\python.exe C:/Users/Administrator/PycharmProjects/untitled1/week4/进程与线程/锁的使用.py
 Thread-1上厕所
 Thread-2上厕所
 Thread-3上厕所
 Thread-4上厕所
 Thread-5上厕所
 Thread-1上完厕所走了
 Thread-4上完厕所走了
 Thread-7上厕所
 Thread-6上厕所
 Thread-5上完厕所走了
 Thread-8上厕所
 Thread-3上完厕所走了
 Thread-2上完厕所走了
 Thread-6上完厕所走了
 Thread-9上厕所
 Thread-10上厕所
 Thread-11上厕所
 Thread-8上完厕所走了
 Thread-12上厕所
 Thread-9上完厕所走了
 Thread-7上完厕所走了
 Thread-14上厕所
 Thread-13上厕所
 Thread-14上完厕所走了
 Thread-13上完厕所走了
 Thread-16上厕所
 Thread-15上厕所
 Thread-12上完厕所走了
 Thread-17上厕所
 Thread-11上完厕所走了
 Thread-16上完厕所走了
 Thread-10上完厕所走了
 Thread-18上厕所
 Thread-19上厕所
 Thread-20上厕所
 Thread-18上完厕所走了
 Thread-20上完厕所走了
 Thread-15上完厕所走了
 Thread-19上完厕所走了
 Thread-17上完厕所走了

Process finished with exit code 0
View Code

 

6.Event

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用 threading 库中的 Event 对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event 对象中的信号标志被设置为假。如果有线程等待一个 Event 对象,而这个 Event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 Event 对象的信号标志设置为真,它将唤醒所有等待这个 Event 对象的线程。如果一个线程等待一个已经被设置为真的 Event 对象,那么它将忽略这个事件,继续执行

from threading import Event
Event.isSet() #返回event的状态值
Event.wait() #如果 event.isSet()==False将阻塞线程;
Event.set() #设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
Event.clear() #恢复

例如 1.,有多个工作线程尝试链接 MySQL,我们想要在链接前确保 MySQL 服务正常才让那些工作线程去连接 MySQL 服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用 threading.Event 机制来协调各个工作线程的连接操作

from threading import Thread,Event,currentThread
import  time
e=Event()
def conn_mysql():
    #链接数据库
    count=1
    while not e.is_set(): #当没有检测到时候
        if count>3:
            raise ConnectionError(''尝试链接的次数过多'')
        print(''\033[45m%s 第%s次尝试'' % (currentThread(), count))
        e.wait(timeout=1)#等待检测(里面的参数是超时1秒)
        count += 1
    print(''\033[44m%s 开始链接...'' % (currentThread().getName()))

def  chek_mysql():
  #检测数据库
   print( print(''\033[42m%s 检测mysql...'' % (currentThread().getName())))
   time.sleep(5)
   e.set()

if __name__=="__main__":
    for i in range(3):
        t=Thread(target=conn_mysql)
        t.start()
    #e.set()
    t=Thread(target=chek_mysql)
    t.start()
View Code
def traffic_ligths():
    #红绿灯
    time.sleep(5)
    e.set()

def car():
    #
    print(''\033[42m %s 等绿灯\033[0m'' % currentThread().getName())
    e.wait()
    print(''\033[44m %s 车开始通行'' % currentThread().getName())

if __name__==''__main__'':
    for i in range(10):
        t=Thread(target=car) #10辆车
        t.start()
    traffic_thread=Thread(target=traffic_ligths)
    traffic_thread.start()
红绿灯例子

7. 定时器(Timer)

from threading import Timer
def func(n):
    print(''hello world'',n)
t=Timer(3,func,args=(123,))#等待三秒后执行func函数,因为func函数有参数,那就再传一个参数进去
t.start()

8. 线程 queue

queue 队列 :使用 import queue,用法与进程 Queue 一样

queue.Queue(maxsize=0) #先进先出

import queue
q=queue.Queue(3)#先进先出
q.put(''first'')
q.put(''second'')
q.put(''third'')
print(q.get())
print(q.get())
print(q.get())
View Code

queue.LifoQueue(maxsize=0)# 先进后出

q=queue.LifoQueue()#先进后出
q.put(''first'')
q.put(''second'')
q.put(''third'')
print(q.get())
print(q.get())
print(q.get())
View Code

queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

# put进入一个元组,元组的第一个元素是优先级
# (通常也可以是数字,或者也可以是非数字之间的比较)
# 数字越小,优先级越高
import queue
q=queue.PriorityQueue()
q.put((20,''a''))
q.put((10,''b''))
q.put((30,''c''))
print(q.get())
print(q.get())
print(q.get())
View Code

 

Python 多线程、多进程 (三)之 线程进程对比、多进程

Python 多线程、多进程 (三)之 线程进程对比、多进程

Python 多线程、多进程 (一)之 源码执行流程、GIL Python 多线程、多进程 (二)之 多线程、同步、通信 Python 多线程、多进程 (三)之 线程进程对比、多线程

一、多线程与多进程的对比

在之前简单的提过,CPython 中的 GIL 使得同一时刻只能有一个线程运行,即并发执行。并且即使是多核 CPU,GIL 使得同一个进程中的多个线程也无法映射到多个 CPU 上运行,这么做最初是为了安全着想,慢慢的也成为了限制 CPython 性能的问题。 一个线程想要执行,就必须得到 GIL,否则就不能拿到 CPU 资源。但是也不是说一个线程在拿到 CPU 资源后就一劳永逸,在执行的过程中 GIL 可能会释放并被其他线程获取,所以说其它的线程会与本线程竞争 CPU 资源,线程是抢占式执行的。具体可在 understand GIL 中看到,<a href="http://www.dabeaz.com/python/UnderstandingGIL.pdf" target="blank">[传送门]</a>。 多线程在 python2 中:当一个线程进行 I/O 的时候会释放锁,另外当 ticks 计数达到 100(ticks 可以看作是 Python 自身的一个计数器,也可对比着字节码指令理解,专门做用于 GIL,每次释放后归零,这个计数可以通过 sys.setcheckinterval 来调整)。锁释放之后,就涉及到线程的调度,线程的锁进行,线程的切换。这是会消耗 CPU 资源,因此会造成程序性能问题和等待时延。另外由于线程共享内存的问题,没有进程安全性高。 但是对于多进程,GIL 就无法限制,多个进程可以再多个 CPU 上运行,充分利用多核优势。事情往往是相对的,虽然可以充分利用多核优势,但是进程之的创建和调度却比线程的代价更高。 所以选择多线程还是多进程,主要还是看怎样权衡代价,什么样的情况。

1、CPU 密集代码

下面来利用斐波那契数列模拟 CPU 密集运算。

def fib(n):
    # 求斐波那契数列的第n个值
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)

<1>、多进程

打印第 25 到 35 个斐波那契数,并计算程序运行时间

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)

if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:  # 使用进程池控制  每次执行3个进程
        all_task = [executor.submit(fib, (num)) for num in range(25,35)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

# 输出
exe result: 75025
exe result: 121393
exe result: 196418
exe result: 317811
exe result: 514229
exe result: 832040
exe result: 1346269
exe result: 2178309
exe result: 3524578
exe result: 5702887
last time is: 4.457437038421631

输出结果,每次打印三个 exe result,总重打印十个结果,多进程运行时间为 4.45 秒

<2>、多线程

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)

if __name__ == "__main__":
    with ThreadPoolExecutor(3) as executor:  # 使用线程池控制  每次执行3个线程
        all_task = [executor.submit(fib, (num)) for num in range(25,35)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

# 输出
exe result: 121393
exe result: 75025
exe result: 196418
exe result: 317811
exe result: 514229
exe result: 832040
exe result: 1346269
exe result: 2178309
exe result: 3524578
exe result: 5702887
last time is: 7.3467772006988525

最终程序运行时间为 7.34 秒

程序的执行之间与计算机的性能有关,每天计算机的执行时间都会有差异。从上述结果中看显然多线程比多进程要耗费时间。这就是因为对于密集代码 (密集运算,循环语句等),tick 计数很快达到 100,GIL 来回的释放竞争,线程之间频繁切换,所以对于密集代码的执行中,多线程性能不如对进程。

2、I/O 密集代码

一个线程在 I/O 阻塞的时候,会释放 GIL,挂起,然后其他的线程会竞争 CPU 资源,涉及到线程的切换,但是这种代价与较高时延的 I/O 来说是不足为道的。 下面用 sleep 函数模拟密集 I/O

def random_sleep(n):
    time.sleep(n)
    return n

<1>、 多进程

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with ProcessPoolExecutor(5) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))
#  输出
exe result: 2
exe result: 2
......(30个)
exe result: 2
exe result: 2
last time is: 12.412866353988647

每次打印 5 个结果,总共二十个打印结果,多进程运行时间为 12.41 秒

<2>、多线程

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with ThreadPoolExecutor(5) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

#  输出
exe result: 2
exe result: 2
......(30个)
exe result: 2
exe result: 2
last time is: 12.004231214523315

I/O 密集多线程情况下,程序的性能较多进程有了略微的提高。IO 密集型代码 (文件处理、网络爬虫等),多线程能够有效提升效率 (单线程下有 IO 操作会进行 IO 等待,造成不必要的时间浪费,而开启多线程能在线程 A 等待时,自动切换到线程 B,可以不浪费 CPU 的资源,从而能提升程序执行效率)。所以 python 的多线程对 IO 密集型代码比较友好

3、线程进程对比

  • CPU 密集型代码 (各种循环处理、计数等等),多线程性能不如多进程。
  • I/O 密集型代码 (文件处理、网络爬虫等),多进程不如多线程。

二、多进程

在 python 进程、线程 (一) 已经有简单的进程介绍。 不过与多线程编程相比,最需要注意的是这里多进程由并发执行变成了真正意义上的并行执行。

1、fork () 调用

Unix/Linux 操作系统提供了一个 fork () 系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是 fork () 调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。子进程永远返回 0,而父进程返回子进程的 ID。这样做的理由是,一个父进程可以 fork 出很多子进程,所以,父进程要记下每个子进程的 ID,而子进程只需要调用 getppid () 就可以拿到父进程的 ID。Python 的 os 模块封装了常见的系统调用,其中就包括 fork,可以在 Python 程序中轻松创建子进程,但是还是要有 Unix/Linux 系统支持,windows 没有系统调用 fork (),可以在本地虚拟机或者云服务器尝试,默认 liunx 发行版中是有 python2.X 的。 情况一

import os

print("Lanyu")  # 只打印一次
pid = os.fork()

if pid == 0:
  print(''子进程 {} ,父进程是: {}.'' .format(os.getpid(), os.getppid()))
else:
  print(''我是父进程:{}.''.format(os.getpid())

# 输出
Lanyu
我是父进程:2993
子进程2994,父进程2993

fork () 调用复制了一个进程,然后程序中就有两个进程,父进程的 pid 不为 0,所以先打印子进程 2994,父进程 2993。然后子进程 pid=0,打印我是父进程:2993。这里的 Lanyu 打印一次 情况二

import os


pid = os.fork()
print("Lanyu")  # 这里打印两次
if pid == 0:
  print(''子进程 {} ,父进程是: {}.'' .format(os.getpid(), os.getppid()))
else:
  print(''我是父进程:{}.''.format(os.getpid())

# 输出
Lanyu
我是父进程:2993
Lanyu
子进程2994,父进程2993

这里的 Lanyu 打印两次是因为,由于 fork () 函数调用之后,程序立即成成一个子进程,主进程打印一次,子进程再打印一次。因此这里的 Lanyu 打印两次

情况三: 还记得操作系统专业课的时候,老师讲的一道考研题

int main{
    fork();
    fork();
    fork():
    printf(''process'')
    return 0;
}

三次 fork (),问此程序最终打印几个次 process,关键在于 fork () 函数的用途,每一次都会复制一次进程,则最终,一个父进程被复制成 8 个进程,打印 8 次。

2、python 多进程

虽然 python 中没有提供直接的进程调用函数,但是标准库中的模块提供能更多更方便的选择。 ProcessPoolExecutor 进程池,与 multiprocessing 标准的多进程模块。其实 ProcessPoolExecutor 也是对 multiprocessing 的封装调用,并且与 ThreadPoolExecutor 线程池提供的接口类似。而 multiprocessing 则更加底层。

<1>、进程编程

import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n

if __name__ == "__main__":
    progress = multiprocessing.Process(target=get_html, args=(2,))
    print(progress.pid)  # 打印结果为None,因为这个时候进程还未开启
    progress.start()  # 进程开启
    print(progress.pid)
    progress.join()
    print("main progress end")

# 输出
None
5056
sub_progress success
main progress end

<2>、使用进程池

import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    #使用进程池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())  # 可以指明进程数,默认等于CPU数
    result = pool.apply_async(get_html, args=(3,))

    #等待所有任务完成
    pool.close()
    pool.join()

    print(result.get())

# 输出
sub_progress success
3

<3>、imap 接口

实例一

import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":

    # imap
    for result in pool.imap(get_html, [1,5,3]):
        print("{} sleep success".format(result))

# 输出
sub_progress success
1 sleep success
sub_progress success
sub_progress success
5 sleep success
3 sleep success

imap 有点像 python 提供的内置函数 map,讲 [1,5,3] 这个列表中的值一个一个传递给 get_html 函数对象,并按照传值的先后顺序,一一执行输出进程结果。

实例二:

import multiprocessing  

import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":

    pool = multiprocessing.Pool(multiprocessing.cpu_count())  # 可以进程数,不过最好是等于CPU数,这里也是进程数

    for result in pool.imap_unordered(get_html, [1,5,3]):
        print("{} sleep success".format(result))
# 输出
sub_progress success
1 sleep success
sub_progress success
3 sleep success
sub_progress success
5 sleep success

与 imap 方法不同的是 imap_unordered 方法,imap_unordered 是按照进程的执行完成的先后顺序,打印进程执行结果,而不是依照列表中的先后顺序。可以依照需要调用。

划重点 <b> 多进程编程中,需要在__name__ == "main" 下编写 </b>

更多 API 参考:<a href="https://yiyibooks.cn/xx/python_352/library/multiprocessing.html" target="blank">[传送门]</a>

3、进程通信

<1>、共享变量通信

类比线程之间的通信,首先想到的就是共享变量通信。但是在多进程中,一个进程都有自的隔离区,导致变量不能共享。 情况一

def producer(a):
    a += 100
    time.sleep(2)

def consumer(a):
    time.sleep(2)
    print(a)

if __name__ == "__main__":
    a = 1
    my_producer = Process(target=producer, args=(a,))
    my_consumer = Process(target=consumer, args=(a,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

# 输出
1

结果进程没有共享变量。

但是 Python 的标准模块提供了 Manager () 在内存中划出一块单独的内存区,供所有的进程使用,共享变量。 情况二

from multiprocessing import Process, Manager

def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == "__main__":
    progress_dict = Manager().dict()

    first_progress = Process(target=add_data, args=(progress_dict, "666", 666))  # 更新progress_dict
    second_progress = Process(target=add_data, args=(progress_dict, "999", 999))  # 更新progress_dict

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)

# 打印结果
{''666'': 666, ''999'': 999}  # 实现了变量的共享

在 Manager 中还可以有其它的数据结构,例如列表数组等可共享使用。

因此,在使用多进程编程的时候,如果像情况二共享全局变量,就仍旧需要加锁实现进程同步。

<2>、Queue 队列通信

在 multiprocessing 模块中有 Queue 类安全的队列,也可以实现通信,不过在这种情况下无法联通线程池。

import time
from multiprocessing import Process, Queue, Pool, Manager

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Queue(10)  # 使用普通的Queue
    pool = Pool(2)

    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))

    pool.close()
    pool.join()

# 无输出

想要使用进程池又实现消息队列通信就需要用到 Manager 管理者

import time
from multiprocessing import Process, Queue, Pool, Manager

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Manager().Queue(10)  # 在使用Manger的时候需要先将Manager实例化在调用Queue
    pool = Pool(2)

    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))

    pool.close()
    pool.join()

# 输出
正常打印字符a

<3>、pipe 管道通信

pipe 也用于进程通信,从功能上说,提供的接口应该是 queue 的子集。但是 queue 为了更好的控制,所以内部加了很多的锁,而 pipe 在两个进程通信的时候性能会比 queue 更好一些。

def producer(pipe):
    pipe.send("Lanyu")

def consumer(pipe):
    print(pipe.recv())

if __name__ == "__main__":
    recevie_pipe, send_pipe = Pipe()
    #pipe只能适用于两个进程
    my_producer= Process(target=producer, args=(send_pipe, ))
    my_consumer = Process(target=consumer, args=(recevie_pipe,))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

# 输出
Lanyu

三、总结

最开始为了引出 GIL,简单输了 python 源码的执行流程,也是先编译成字节码再执行。在 CPython 中,为了数据完整性和状态同步才有 GIL,GIL 同样使得多线程不能利用 CPU 多核优势,所以性能低部分是因为 GIL。

线程需要加上 GIL 才能获取 CPU 资源,才能执行。线程通信的时候,可以用消息队列 Queue 和全局变量,但是对于全局变量这种通信方式,在执行字节码一定数量之后,会释放 GIL,线程抢占式执行同样导致变量的混乱,所以我们加上了用户级别的互斥锁 Lock,或者迭代锁 Rlock 保证了线程的状态同步。condition 帮我们实现了线程的复杂通信,而 semaphore 信号量,使得我们在多个线程的情况下,控制并发线程的数量。线程池进一步的封装,提供了对线程的状态,异步控制等操作。

对于多进程,可以利用多核 CPU 优势,但是使用多线程和多进程还需要进一步根据密集 I/O 和密集运算型代码等具体情况。多进程标准模块中提供的接口与多线程类似,可相互参照。

陆陆续续总结关于这篇博文也有一个多星期了,但是还是感觉有说不清楚的地方逻辑不通,希望读者能在评论区指出。期间参阅了很多的文档,博客,教程。 印象最深刻的还是 Understand GIL: <a href="http://www.dabeaz.com/python/UnderstandingGIL.pdf" target="blank">[传送门]</a > 这篇关于 GIL 的解释,虽然是英文文档,但是作者总是能以最精炼的句子表达最清晰的观点。

上一篇:Python 多线程、多进程 (二)之 多线程、同步、通信

关于多进程或Python中的线程?python中多进程和多线程的区别的介绍已经告一段落,感谢您的耐心阅读,如果想了解更多关于PyQt应用程序中的线程:使用Qt线程还是Python线程?、python ftp文件线程或多进程、Python 中的线程和进程、Python 多线程、多进程 (三)之 线程进程对比、多进程的相关信息,请在本站寻找。

本文标签: