python多进程
多进程使用使用场景
计算密集型(多进程)
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,
计算密集型任务同时进行的数量应当等于CPU的核心数。(多进程)
IO密集型(多线程)
涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。
什么是进程?
进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
进程的生命周期:创建(New)、就绪(Runnable)、运行(Running)、阻塞(Block)、销毁(Destroy)
进程的状态(分类):(Actived)活动进程、可见进程(Visiable)、后台进程(Background)、服务进程(Service)、空进程
python的三种进程创建方式
-
fork: os.fork() 函数在 Windows 系统上无效,只在 UNIX 及类 UNIX 系统上(包括UNIX、Linux 和 Mac OS X)效。
-
import os print(\'父进程 ID =\', os.getpid()) # 创建一个子进程,下面代码会被两个进程执行 pid = os.fork() print(\'当前进程 ID =\',os.getpid(),\" pid=\",pid) #根据 pid 值,分别为子进程和父进程布置任务 if pid == 0: print(\'子进程, ID=\',os.getpid(),\" 父进程 ID=\",os.getppid()) else: print(\'父进程, ID=\',os.getpid(),\" pid=\",pid) #pid为0的代表子进程。 \"\"\" 其中,pid 作为函数的返回值,主进程和子进程都会执行该语句,但主进程执行 fork() 函数得到的 pid 值为非 0 值(其实是子进程的进程 ID),而子进程执行该语句得到的 pid 值为 0。因此,pid 常常作为区分父进程和子进程的标志。 在大多数操作系统中,都会为执行的进程配备唯一的 ID 号,os 模块提供了 getpid() 和 getppid() 函数,可分别用来获取当前进程的 ID 号和父进程的 ID 号。 \"\"\"
-
缺点:
1.兼容性差,只能在类linux系统下使用,windows系统不可使用;
2.扩展性差,当需要多条进程的时候,进程管理变得很复杂;
3.会产生“孤儿”进程和“僵尸”进程,需要手动回收资源。 -
优点:
是系统自带的接近低层的创建方式,运行效率高。
-
-
Process进程
-
# -*- coding: utf-8 -*- import os from multiprocessing import Process import time def fun(name): print(\"2 子进程信息: pid=%s, ppid=%s\" % (os.getpid(), os.getppid())) print(\"hello \" + name) def test(): print(\'ssss\') if __name__ == \"__main__\": print(\"1 主进程信息: pid=%s, ppid=%s\" % (os.getpid(), os.getppid())) ps = Process(target=fun, args=(\'jingsanpang\', )) print(\"111 ##### ps pid: \" + str(ps.pid) + \", ident:\" + str(ps.ident)) print(\"3 进程信息: pid=%s, ppid=%s\" % (os.getpid(), os.getppid())) print(ps.is_alive()) # 启动之前 is_alive为False(系统未创建) ps.start() print(ps.is_alive()) # 启动之后,is_alive为True(系统已创建) print(\"222 #### ps pid: \" + str(ps.pid) + \", ident:\" + str(ps.ident)) print(\"4 进程信息: pid=%s, ppid=%s\" % (os.getpid(), os.getppid())) ps.join() # 等待子进程完成任务 类似于os.wait() print(ps.is_alive()) print(\"5 进程信息: pid=%s, ppid=%s\" % (os.getpid(), os.getppid())) ps.terminate() #终断进程 print(\"6 进程信息: pid=%s, ppid=%s\" % (os.getpid(), os.getppid()))
-
特点:
-
主进程执行完后会默认等待子进程结束后回收资源,不需要手动回收资源;join()函数用来控制子进程结束的顺序,其内部也有一个清除僵尸进程的函数,可以回收资源;
-
Process进程创建时,子进程会将主进程的Process对象完全复制一份,这样在主进程和子进程各有一个 Process对象,但是p.start()启动的是子进程,主进程中的Process对象作为一个静态对象存在,不执行。
-
当子进程执行完毕后,会产生一个僵尸进程,其会被join函数回收,或者再有一条进程开启,start函数也会回收僵尸进程,所以不一定需要写join函数。
-
windows系统在子进程结束后会立即自动清除子进程的Process对象,而linux系统子进程的Process对象如果没有join函数和start函数的话会在主进程结束后统一清除。
-
注意:Process对象可以创建进程,但Process对象不是进程,其删除与否与系统资源是否被回收没有直接的关系。
另外还可以通过继承Process对象来重写run方法创建进程
-
-
-
进程池POOL(多个进程)
-
apply 阻塞式
-
import time from multiprocessing import Pool def run(msg): print(\'msg:%s\' %msg) 程序随眠3秒, time.sleep(3) print(\'end\') if __name__ == \"__main__\": print(\"开始执行主程序\") start_time=time.time() # 使用进程池创建子进程 size=3 pool=Pool(size) print(\"开始执行子进程\") for i in range(size): pool.apply(run,(i,)) print(\"主进程结束耗时%s\"%(time.time()-start_time)) \"\"\" 开始执行主程序 开始执行子进程 msg:0 end msg:1 end msg:2 end 主进程结束耗时9.223527431488037 \"\"\"
-
进程开始运行,碰到子进程,操作系统切换到子进程,等待子进程运行结束后,再切换到另外一个子进程,直到所有子进程运行完毕。然后在切换到主进程,运行剩余的部分。
-
-
apply_async 异步非阻塞
-
import time from multiprocessing import Pool def run(msg): print(\'msg:%s\' %msg) # 程序随眠3秒, time.sleep(3) print(\'end\') if __name__ == \"__main__\": print(\"开始执行主程序\") start_time=time.time() # 使用进程池创建子进程 size=3 pool=Pool(size) print(\"开始执行子进程\") for i in range(size): pool.apply_async(run,(i,)) print(\"主进程结束耗时%s\"%(time.time()-start_time)) \"\"\" 开始执行主程序 开始执行子进程 主进程结束耗时0.06100344657897949 \"\"\"
-
主进程开始运行,碰到子进程后,主进程说:让我先运行个够,等到操作系统进行进程切换的时候,再交给子进程运行。因为我们的程序太短,还没等到操作系统进行进程切换,主进程就运行完毕了。
-
-
python多线程
python主要是通过thread和threading这两个模块来实现多线程支持。python的thread模块是比较底层的模块,python的threading模块是对thread做了一些封装,可以更加方便的被使用。但是python(cpython)由于GIL的存在无法使用threading充分利用CPU资源,如果想充分发挥多核CPU的计算能力需要使用multiprocessing模块(Windows下使用会有诸多问题)。
GIL 限制了同一时刻只能有一个线程运行,无法发挥多核 CPU 的优势。首先需要明确的一点是 GIL 并不是 Python 的特性,它是在实现 Python 解析器(CPython)时所引入的一个概念。就好比 C++ 是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++ ,Visual C++等。Python 也一样,同样一段代码可以通过 CPython,PyPy,Psyco 等不同的 Python 执行环境来执行。像其中的 JPython 就没有GIL。然而因为 CPython 是大部分环境下默认的 Python 执行环境。所以在很多人的概念里 CPython 就是 Python,也就想当然的把 GIL 归结为 Python 语言的缺陷。所以这里要先明确一点:GIL 并不是 Python 的特性,Python 完全可以不依赖于 GIL。
GIL 本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全.在一个 Python 的进程内,不仅有主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程内,所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的,多个线程先访问到解释器的代码,即拿到执行权限,然后将 target 的代码交给解释器的代码去执行,
解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据 100,可能线程 1 执行 x=100 的同时,而垃圾回收执行的是回收 100 的操作,解决这种问题没有什么高明的方法,就是加锁处理,即 GIL。
什么是线程
线程是指进程内的一个执行单元,也是进程内的可调度实体.
进程和线程的联系
- 进程就是一个应用程序在处理机上的一次执行过程,它是一个动态的概念,**而线程是进程中的一部分,进程包含多个线程在运行。
- 多线程可以共享全局变量,多进程不能。多线程中,所有子线程的进程号相同;多进程中,不同的子进程进程号不同。
- 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位.
- 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源
- 一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行
进程和线程的区别
- 地址空间:进程内的一个执行单元;进程至少有一个线程;它们共享进程的地址空间;而进程有自己独立的地址空间;
- 资源拥有:进程是资源分配和拥有的单位,同一个进程内的线程共享进程的资源
- 线程是处理器调度的基本单位,但进程是系统进行资源分配和调度的基本单位.
- 二者均可并发执行.
- 简而言之,一个程序至少有一个进程,一个进程至少有一个线程.
- 线程的划分尺度小于进程,使得多线程程序的并发性高。另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
python多线程方法
threading 进行多线程
import time
import threading
def task_thread(counter):
print(f\'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\')
num = counter
while num:
time.sleep(3)
num -= 1
print(f\'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\')
if __name__ == \'__main__\':
print(f\'主线程开始时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\')
#初始化3个线程,传递不同的参数
t1 = threading.Thread(target=task_thread, args=(3,))
t2 = threading.Thread(target=task_thread, args=(2,))
t3 = threading.Thread(target=task_thread, args=(1,))
#开启三个线程
t1.start()
t2.start()
t3.start()
#等待运行结束
t1.join()
t2.join()
t3.join()
print(f\'主线程结束时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\')
\"\"\"
主线程开始时间:2022-03-02 12:49:20
线程名称:Thread-1 参数:3 开始时间:2022-03-02 12:49:20
线程名称:Thread-2 参数:2 开始时间:2022-03-02 12:49:20
线程名称:Thread-3 参数:1 开始时间:2022-03-02 12:49:20
线程名称:Thread-3 参数:1 结束时间:2022-03-02 12:49:23 3
线程名称:Thread-2 参数:2 结束时间:2022-03-02 12:49:26 6
线程名称:Thread-1 参数:3 结束时间:2022-03-02 12:49:29 9
体现了GIL锁机制
主线程结束时间:2022-03-02 12:49:29
\"\"\"
1创建 threading.Thread 实例,调用其 start() 方法
-
import time import threading class MyThread(threading.Thread): def __init__(self, counter): super().__init__() self.counter = counter def run(self): print( f\'线程名称:{threading.current_thread().name} 参数:{self.counter} 开始时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\' ) counter = self.counter while counter: time.sleep(3) counter -= 1 print( f\'线程名称:{threading.current_thread().name} 参数:{self.counter} 结束时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\' ) if __name__ == \"__main__\": print(f\'主线程开始时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\') # 初始化3个线程,传递不同的参数 t1 = MyThread(3) t2 = MyThread(2) t3 = MyThread(1) # 开启三个线程 t1.start() t2.start() t3.start() # 等待运行结束 t1.join() t2.join() t3.join() print(f\'主线程结束时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\')
2继承 Thread 类,在子类中重写 run() 和 init() 方法
-
# 传参 import time import threading def task_thread(counter): print(f\'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\') num = counter while num: time.sleep(3) num -= 1 print(f\'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\') class MyThread(threading.Thread): def __init__(self, target, args): super().__init__() self.target = target self.args = args def run(self): self.target(*self.args) if __name__ == \"__main__\": print(f\'主线程开始时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\') # 初始化3个线程,传递不同的参数 t1 = MyThread(target=task_thread,args=(3,)) t2 = MyThread(target=task_thread,args=(2,)) t3 = MyThread(target=task_thread,args=(1,)) # 开启三个线程 t1.start() t2.start() t3.start() # 等待运行结束 t1.join() t2.join() t3.join() print(f\'主线程结束时间:{time.strftime(\"%Y-%m-%d %H:%M:%S\")}\')
3线程同步-lock(互斥锁-保证数据的正确性)
import time, threading
num = 0
lock = threading.Lock()
def task_thread(n):
global num
# 获取锁,用于线程同步
\"\"\"
为了保证数据的正确性,需要使用互斥锁对多个线程进行同步,限制当一个线程正在访问数据时,其他只能等待,直到前一线程释放锁。使用 threading.Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。
\"\"\"
lock.acquire()
for i in range(1000000):
num = num + n
num = num - n
#释放锁,开启下一个线程
lock.release()
t1 = threading.Thread(target=task_thread, args=(6,))
t2 = threading.Thread(target=task_thread, args=(17,))
t3 = threading.Thread(target=task_thread, args=(11,))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()
print(num)
# 结果为0
4线程同步之 Semaphore(信号量-同时允许一定数量的线程访问共享数据)
import threading
import time
# 同时只有5个人办理业务
semaphore = threading.BoundedSemaphore(5)
# 模拟银行业务办理
def yewubanli(name):
semaphore.acquire()
time.sleep(3)
print(f\"{time.strftime(\'%Y-%m-%d %H:%M:%S\')} {name} 正在办理业务\")
semaphore.release()
thread_list = []
for i in range(12):
t = threading.Thread(target=yewubanli, args=(i,))
thread_list.append(t)
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
# while threading.active_count() != 1:
# time.sleep(1)
\"\"\"
2018-07-08 12:33:57 4 正在办理业务
2018-07-08 12:33:57 1 正在办理业务
2018-07-08 12:33:57 3 正在办理业务
2018-07-08 12:33:57 0 正在办理业务
2018-07-08 12:33:57 2 正在办理业务
-------------------------- 5
2018-07-08 12:34:00 7 正在办理业务
2018-07-08 12:34:00 5 正在办理业务
2018-07-08 12:34:00 6 正在办理业务
2018-07-08 12:34:00 9 正在办理业务
2018-07-08 12:34:00 8 正在办理业务
-------------------------- 5
2018-07-08 12:34:03 11 正在办理业务
2018-07-08 12:34:03 10 正在办理业务
\"\"\"
5线程同步之 Condition
条件对象能让一个线程 A 停下来,等待其他线程 B ,线程 B 满足了某个条件后通知(notify)线程 A 继续运行。线程首先获取一个条件变量锁,如果条件不足,则该线程等待(wait)并释放条件变量锁,如果满足就执行线程,也可以通知其他状态为 wait 的线程。其他处于 wait 状态的线程接到通知后会重新判断条件。
import threading
class Boy(threading.Thread):
def __init__(self, cond, name):
super(Boy, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
print(self.name + \": 嫁给我吧!?\")
self.cond.notify() # 唤醒一个挂起的线程,让hanmeimei表态
self.cond.wait() # 释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时,等待hanmeimei回答
print(self.name + \": 我单下跪,送上戒指!\")
self.cond.notify()
self.cond.wait()
print(self.name + \": Li太太,你的选择太明治了。\")
self.cond.release()
class Girl(threading.Thread):
def __init__(self, cond, name):
super(Girl, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
self.cond.wait() # 等待Lilei求婚
print(self.name + \": 没有情调,不够浪漫,不答应\")
self.cond.notify()
self.cond.wait()
print(self.name + \": 好吧,答应你了\")
self.cond.notify()
self.cond.release()
cond = threading.Condition()
boy = Boy(cond, \"LiLei\")
girl = Girl(cond, \"HanMeiMei\")
girl.start()
boy.start()
6线程同步之 Event
事件用于线程间通信。一个线程发出一个信号,其他一个或多个线程等待,调用 event 对象的 wait 方法,线程则会阻塞等待,直到别的线程 set 之后,才会被唤醒。上面求婚哥的例子使用 Event 代码如下:
import threading, time
class Boy(threading.Thread):
def __init__(self, cond, name):
super(Boy, self).__init__()
self.cond = cond
self.name = name
def run(self):
print(self.name + \": 嫁给我吧!?\")
self.cond.set() # 唤醒一个挂起的线程,让hanmeimei表态
time.sleep(0.5)
self.cond.wait()
print(self.name + \": 我单下跪,送上戒指!\")
self.cond.set()
time.sleep(0.5)
self.cond.wait()
self.cond.clear()
print(self.name + \": Li太太,你的选择太明治了。\")
class Girl(threading.Thread):
def __init__(self, cond, name):
super(Girl, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.wait() # 等待Lilei求婚
self.cond.clear()
print(self.name + \": 没有情调,不够浪漫,不答应\")
self.cond.set()
time.sleep(0.5)
self.cond.wait()
print(self.name + \": 好吧,答应你了\")
self.cond.set()
cond = threading.Event()
boy = Boy(cond, \"LiLei\")
girl = Girl(cond, \"HanMeiMei\")
boy.start()
girl.start()
7multiprocessing
Python中线程与进程使用的同一模块 multiprocessing。使用方法也基本相同,唯一不同的是,from multiprocessing import Pool 这样导入的 Pool 表示的是进程池,from multiprocessing.dummy import Pool这样导入的 Pool表示的是线程池。这样就可以实现线程里面的并发了。
线程池实例:
from multiprocessing.dummy import Pool as ThreadPool
import time
def fun(n):
time.sleep(2)
start = time.time()
for i in range(5):
fun(i)
print(\"单线程顺序执行耗时:\", time.time() - start)
start2 = time.time()
# 开8个 worker,没有参数时默认是 cpu 的核心数
pool = ThreadPool(processes=2)
# 在线程中执行 urllib2.urlopen(url) 并返回执行结果
results2 = pool.map(fun, range(5))
pool.close()
pool.join()
print(\"线程池(5)并发执行耗时:\", time.time() - start2)
来源:https://blog.csdn.net/csdn17806812300/article/details/123236956
本站部分图文来源于网络,如有侵权请联系删除。