Python进行异步执行的库有threading(多线程)和multiprocessing(多进程),这两个库为程序提供了丰富的异步操作,但是如果只是进行一些简单的异步执行,并不需要用到多复杂的场景,可以考虑使用concurrent.confutures。它提供一些简单常用的异步执行操作,比如submit和map方法,并且在异步执行完后还可以获取执行对象的返回结果。
一、抽象基类:Executor
这是一个抽象基类,后面会介绍的用于线程异步执行的ThreadPoolExecutor类和用于进程异步执行的ProcessPoolExecutor类都是Executor的子类,Executor抽象类定义了一些常用的异步操作,在进行多线程操作和多进程操作的时候都可以使用这些方法。
1. submit方法
submit(fn, *args, **kwargs):调用可执行对象fn并以fn(*args, **kwargs)的方式执行,返回一个Future对象。
示例:
# max_workers用于指定pool中最大工作对象数量(线程或进程)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result()) # result方法可以获取fn的执行结果
2. map方法
map(func, *iterables, timeout=None, chunksize=1):这个函数和Python内置的map函数非常相似,但是有以下两点区别:
- func的执行是异步的,多个func的调用是并发调用。
- iterables参数是立即执行,而不是延迟执行。
参数说明:
- 返回值:是一个代表结果的迭代器,可以遍历这个迭代器以获得结果。
- timeout:指定超时时间,单位为秒。如果map的__next__()已经被调用,并且Executor.map()在timeout指定秒数后返回的结果不可用,则会抛出concurrent.futures.TimeoutError。
- func:如果func在调用过程中抛出了异常,那么在返回的迭代器中获取对应结果时这个异常就会被抛出。
- chunksize:可以用在ProcessPoolExecutor,在ThreadPoolExecutor中这个参数则是无效的。chunksize用来指定pool中任务块的数量,默认只有一个任务块被用来执行(一个任务块也可以并发执行多个func),对于较大的iterables,多指定几个任务块,可以显著提升性能(这个参数在Python3.5才加入)。
示例:
from concurrent.futures import ThreadPoolExecutor
def func(a):
return 6 / a
with ThreadPoolExecutor() as executor:
result = executor.map(func, [3, 2])
for res in result:
print(res)
\'\'\'
打印结果:
2.0
3.0
\'\'\'
4. shutdown
shutdown(wait=True, *, cancel_futures=False):当future对象执行完成后向executor发送一个信号,释放正在使用的所有资源。但是这个函数之后再调用Executor.submit()和Executor.map()的话,则会抛出RuntimeError。
参数说明:
- wait:当wait设置为True时,会一直到所有future对象执行完毕并且executor的所有资源都释放之后此方法才会返回。当wait设置为False时,此方法会立即返回,executor对应的资源则会在所有future对象都执行完毕后才会释放。但是无论wait的值是什么,executor程序会等到所有future对象都执行完毕后才会退出。
- cancel_futures:当这个参数被设置为True时,所有还未开始执行的future都会被取消,但是无论这个参数的值是什么,正在执行或者已执行完毕的future则不会被取消。(cancel_futures在Python3.9中才加入)
- 注:为避免每次都显式调用shutdown方法,可以使用with语法,它会在结束时自动调用shutdown方法。
二、线程间的异步:ThreadPoolExecutor
ThreadPoolExecutor是抽象类Executor的子类,使用一个线程池来执行所有的异步调用。注意,当一个future中调用了另一个future时会导致死锁。
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=\'\', initializer=None, initargs=())
- max_workers:指定线程池中的最大线程数。(Python3.5更新:如果没有指定或者为None,则默认与机器上的处理器个数相同。如果线程中侧重于I/O而不是CPU计算,则可以在此基础上乘以5来进行设置。)(Python3.8更新:默认会使用min(32, os.cpu_count() + 4)个线程。)
- thread_name_prefix:参数用于设置线程名称的前缀。(Python3.6新增)
- initializer和initargs:指定一个在每个线程开始前执行的可调用对象initializer,initargs则是传入此对象的参数。(Python3.7新增)
示例:(来自官方文档)
import concurrent.futures
import urllib.request
URLS = [\'http://www.foxnews.com/\',
\'http://www.cnn.com/\',
\'http://europe.wsj.com/\',
\'http://www.bbc.co.uk/\',
\'http://some-made-up-domain.com/\']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
# as_completed返回一个包含future对象的迭代器,但future对象只有执行完毕或被取消才能返回
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print(\'%r generated an exception: %s\' % (url, exc))
else:
print(\'%r page is %d bytes\' % (url, len(data)))
三、进程间的异步:ProcessPoolExecutor
ProcessPoolExecutor是抽象Executor的一个子类,使用一个进程池来执行所有的异步调用。它会使用multiprocessing模块,并且会绕过GIL(全局解释锁),但这也意味着只能执行和返回可picklable的对象。注意:在子进程中再次调用另外的Executor对象或Future对象将会导致死锁。
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
- max_workers:该参数用于指定进程池中的最大进程数,如果没有指定或者设置为None,则默认会使用与机器的处理器数量相同的数。如果参数小于或等于0,则会抛出ValueError。在Windows系统上,最大线程数必须小于等于61,否则抛出ValueError,如果没有指定,则默认为61(哪怕机器的处理数量大于61)。
- mp_context:此参数可以为None或者multiprocessing模块中的context对象,它将会用于启动进程,如果没有指定或者为None,则默认使用multiprocessing模块中的context对象。(Python3.7更新:mp_context参数允许控制进程池中进程的start_method方法)
- initializer和initargs:指定一个在每个进程开始前执行的可调用对象initializer,initargs则是传入此对象的参数。
- Python3.3更新:如果一个进程被突然终止,则会立即抛出BrokenProcessPool。
示例:(来自官方文档)
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print(\'%d is prime: %s\' % (number, prime))
if __name__ == \'__main__\':
main()
四、Future对象
concurrent.futures.Future:Future对象是对异步执行的封装,由Executor.submit()创建。
方法说明:
- cancel():会尝试取消调用。如果调用正在执行或者已执行完毕,则该调用不会被取消,并会返回False。否则会取消该调用,并返回True。
- cancelled():如果调用成功取消则返回True。
- running():如果调用已经开始执行,并且无法被取消,则返回True。
- done():如果指调用被成功取消,或者已执行完毕,则返回True。
- result(timeout=None):返回调用的执行结果。如果调用还没有执行完毕,则会等待指定timeout秒数,如果在timeout时间内没有执行完毕,则会抛出concurrent.futures.TimeoutError。如果timeout没有指定或者为None,则会一直等待。如果Future执行完成之前被取消了,则会抛出CancelledError。如果调用抛出了异常,则此方法也会抛出同样的异常。
- exception(timeout=None):返回调用抛出的异常,如果没有发生异常,则返回None。
- add_done_callback(fn):当future被取消或者执行完毕时fn会被立即调用,并且该future会作为fn的唯一参数被传入。当调用中加入多个fn时,则这些fn会以调用add_done_callback方法的调用顺序执行。
以下的一些方法只是用于单元测试和Executor的实现:
- set_running_or_notify_cancel():这个方法只能由Executor的实现或者单元测试在Future对象执行之前调用。如果这个方法返回False,则表示Future对象已被取消,即Future.cancel()已被调用并返回True,即“notify_cancel”生效。如果这个方法返回True,则Future对象不会被取消并且会将它变成正在运行的状态,也就是说调用Future.running()时将返回True,即“running”生效。这个方法只能被调用一次,并且不能在调用Future.set_result()或Future.set_exception()之后进行调用。
- set_result(result):设置Future调用的返回结果。这个方法只能由Executor的实现和单元测试使用。(Python3.8更新:如果Future已经执行完毕,则会抛出concurrent.futures.InvalidStateError)
- set_exception(exception):将exception异常设置为Future调用的返回结果。这个方法只能由Executor的实现和单元测试使用。(Python3.8更新:如果Future已经执行完毕,则会抛出concurrent.futures.InvalidStateError)
五、模块中的其他函数
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待由fs指定的所有future对象(可能是由不同的Executor创建)执行完毕。返回一个由集合构成的二元组,第一个集合为done,包含了所有完成或者已被取消的future对象,第二个集合为not_done,包含了所有的待定或者正在运行的future对象。
- timeout:用来指定最大等待时间(秒),如果没有指定或者为None,则表示会无限制等待下去。
- return_when:表示这个函数返回的时机,且只能是以下常量之一:
- FIRST_COMPLETED:当有任何一个future对象执行完毕或者被取消则返回。
- FIRST_EXCEPTION:当有任何一个future对象执行过程中发生了异常则返回,如果没有future发生异常,则相当于ALL_COMPLETED。
- ALL_COMPLETED:当所有future对象都执行完毕或者被取消则返回。
concurrent.futures.as_completed(fs, timeout=None)
返回一个由fs指定的future实例的迭代器,当这些future实例执行完毕或者被取消时就会生成对应的future对象。fs指定的future实例如果存在重复的,则这些重复的实例只会返回一次。并且在调用该方法前就执行完成的future实例会被优先返回。如果在timeout指定的时间内还有future没有开始执行(调用__next__()方法),则会抛出concurrent.futures.TimeoutError,如果timeout没有指定,则会一直等待下去。
六、Exception类
concurrent.futures.CancelledError:当一个future对象被取消的时候抛出。
concurrent.futures.TimeoutError:当一个future对象的执行时间超出了timeout指定的时间时抛出。
concurrent.futures.BrokenExecutor:派生自RuntimeError,当executor因某些原因被中断,导致不能submit或者不能执行一个新的任务时会抛出。
concurrent.futures.InvalidStateError:当某个操作在一个当前状态不允许的future上执行时抛出。
concurrent.futures.thread.BrokenThreadPool:派生自BrokenExecutor,当ThreadPoolExecutor中的一个executor初始化失败时抛出。
concurrent.futures.process.BrokenProcessPool:派生自BrokenExecutor(以前的名称为RuntimeError),当ProcessPoolExecutor中一个executor被非正常终止时(如被外部操作杀死时)抛出。
本文大多是从官方文档直接翻过来的:https://docs.python.org/3/library/concurrent.futures.html
来源:https://www.cnblogs.com/guyuyun/p/14489123.html
图文来源于网络,如有侵权请联系删除。