百木园-与人分享,
就是让自己快乐。

Python内置库:concurrent.confutures(简单异步执行)

Python进行异步执行的库有threading(多线程)和multiprocessing(多进程),这两个库为程序提供了丰富的异步操作,但是如果只是进行一些简单的异步执行,并不需要用到多复杂的场景,可以考虑使用concurrent.confutures。它提供一些简单常用的异步执行操作,比如submit和map方法,并且在异步执行完后还可以获取执行对象的返回结果。

一、抽象基类:Executor

这是一个抽象基类,后面会介绍的用于线程异步执行的ThreadPoolExecutor类和用于进程异步执行的ProcessPoolExecutor类都是Executor的子类,Executor抽象类定义了一些常用的异步操作,在进行多线程操作和多进程操作的时候都可以使用这些方法。

1. submit方法

submit(fn, *args, **kwargs):调用可执行对象fn并以fn(*args, **kwargs)的方式执行,返回一个Future对象。
示例:

# max_workers用于指定pool中最大工作对象数量(线程或进程)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result()) # result方法可以获取fn的执行结果

2. map方法

map(func, *iterables, timeout=None, chunksize=1):这个函数和Python内置的map函数非常相似,但是有以下两点区别:

  • func的执行是异步的,多个func的调用是并发调用。
  • iterables参数是立即执行,而不是延迟执行。

参数说明:

  • 返回值:是一个代表结果的迭代器,可以遍历这个迭代器以获得结果。
  • timeout:指定超时时间,单位为秒。如果map的__next__()已经被调用,并且Executor.map()在timeout指定秒数后返回的结果不可用,则会抛出concurrent.futures.TimeoutError。
  • func:如果func在调用过程中抛出了异常,那么在返回的迭代器中获取对应结果时这个异常就会被抛出。
  • chunksize:可以用在ProcessPoolExecutor,在ThreadPoolExecutor中这个参数则是无效的。chunksize用来指定pool中任务块的数量,默认只有一个任务块被用来执行(一个任务块也可以并发执行多个func),对于较大的iterables,多指定几个任务块,可以显著提升性能(这个参数在Python3.5才加入)。

示例:

from concurrent.futures import ThreadPoolExecutor

def func(a):
return 6 / a

with ThreadPoolExecutor() as executor:
result = executor.map(func, [3, 2])
for res in result:
print(res)

\'\'\'
打印结果:
2.0
3.0
\'\'\'

4. shutdown

shutdown(wait=True, *, cancel_futures=False):当future对象执行完成后向executor发送一个信号,释放正在使用的所有资源。但是这个函数之后再调用Executor.submit()和Executor.map()的话,则会抛出RuntimeError。
参数说明:

  • wait:当wait设置为True时,会一直到所有future对象执行完毕并且executor的所有资源都释放之后此方法才会返回。当wait设置为False时,此方法会立即返回,executor对应的资源则会在所有future对象都执行完毕后才会释放。但是无论wait的值是什么,executor程序会等到所有future对象都执行完毕后才会退出。
  • cancel_futures:当这个参数被设置为True时,所有还未开始执行的future都会被取消,但是无论这个参数的值是什么,正在执行或者已执行完毕的future则不会被取消。(cancel_futures在Python3.9中才加入)
  • 注:为避免每次都显式调用shutdown方法,可以使用with语法,它会在结束时自动调用shutdown方法。

二、线程间的异步:ThreadPoolExecutor

ThreadPoolExecutor是抽象类Executor的子类,使用一个线程池来执行所有的异步调用。注意,当一个future中调用了另一个future时会导致死锁。
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=\'\', initializer=None, initargs=())

  • max_workers:指定线程池中的最大线程数。(Python3.5更新:如果没有指定或者为None,则默认与机器上的处理器个数相同。如果线程中侧重于I/O而不是CPU计算,则可以在此基础上乘以5来进行设置。)(Python3.8更新:默认会使用min(32, os.cpu_count() + 4)个线程。)
  • thread_name_prefix:参数用于设置线程名称的前缀。(Python3.6新增)
  • initializer和initargs:指定一个在每个线程开始前执行的可调用对象initializer,initargs则是传入此对象的参数。(Python3.7新增)

示例:(来自官方文档)

import concurrent.futures
import urllib.request

URLS = [\'http://www.foxnews.com/\',
\'http://www.cnn.com/\',
\'http://europe.wsj.com/\',
\'http://www.bbc.co.uk/\',
\'http://some-made-up-domain.com/\']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
# as_completed返回一个包含future对象的迭代器,但future对象只有执行完毕或被取消才能返回
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print(\'%r generated an exception: %s\' % (url, exc))
else:
print(\'%r page is %d bytes\' % (url, len(data)))

三、进程间的异步:ProcessPoolExecutor

ProcessPoolExecutor是抽象Executor的一个子类,使用一个进程池来执行所有的异步调用。它会使用multiprocessing模块,并且会绕过GIL(全局解释锁),但这也意味着只能执行和返回可picklable的对象。注意:在子进程中再次调用另外的Executor对象或Future对象将会导致死锁。
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

  • max_workers:该参数用于指定进程池中的最大进程数,如果没有指定或者设置为None,则默认会使用与机器的处理器数量相同的数。如果参数小于或等于0,则会抛出ValueError。在Windows系统上,最大线程数必须小于等于61,否则抛出ValueError,如果没有指定,则默认为61(哪怕机器的处理数量大于61)。
  • mp_context:此参数可以为None或者multiprocessing模块中的context对象,它将会用于启动进程,如果没有指定或者为None,则默认使用multiprocessing模块中的context对象。(Python3.7更新:mp_context参数允许控制进程池中进程的start_method方法)
  • initializer和initargs:指定一个在每个进程开始前执行的可调用对象initializer,initargs则是传入此对象的参数。
  • Python3.3更新:如果一个进程被突然终止,则会立即抛出BrokenProcessPool。

示例:(来自官方文档)

import concurrent.futures
import math

PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]

def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print(\'%d is prime: %s\' % (number, prime))

if __name__ == \'__main__\':
main()

四、Future对象

concurrent.futures.Future:Future对象是对异步执行的封装,由Executor.submit()创建。
方法说明:

  • cancel():会尝试取消调用。如果调用正在执行或者已执行完毕,则该调用不会被取消,并会返回False。否则会取消该调用,并返回True。
  • cancelled():如果调用成功取消则返回True。
  • running():如果调用已经开始执行,并且无法被取消,则返回True。
  • done():如果指调用被成功取消,或者已执行完毕,则返回True。
  • result(timeout=None):返回调用的执行结果。如果调用还没有执行完毕,则会等待指定timeout秒数,如果在timeout时间内没有执行完毕,则会抛出concurrent.futures.TimeoutError。如果timeout没有指定或者为None,则会一直等待。如果Future执行完成之前被取消了,则会抛出CancelledError。如果调用抛出了异常,则此方法也会抛出同样的异常。
  • exception(timeout=None):返回调用抛出的异常,如果没有发生异常,则返回None。
  • add_done_callback(fn):当future被取消或者执行完毕时fn会被立即调用,并且该future会作为fn的唯一参数被传入。当调用中加入多个fn时,则这些fn会以调用add_done_callback方法的调用顺序执行。

以下的一些方法只是用于单元测试和Executor的实现:

  • set_running_or_notify_cancel():这个方法只能由Executor的实现或者单元测试在Future对象执行之前调用。如果这个方法返回False,则表示Future对象已被取消,即Future.cancel()已被调用并返回True,即“notify_cancel”生效。如果这个方法返回True,则Future对象不会被取消并且会将它变成正在运行的状态,也就是说调用Future.running()时将返回True,即“running”生效。这个方法只能被调用一次,并且不能在调用Future.set_result()或Future.set_exception()之后进行调用。
  • set_result(result):设置Future调用的返回结果。这个方法只能由Executor的实现和单元测试使用。(Python3.8更新:如果Future已经执行完毕,则会抛出concurrent.futures.InvalidStateError)
  • set_exception(exception):将exception异常设置为Future调用的返回结果。这个方法只能由Executor的实现和单元测试使用。(Python3.8更新:如果Future已经执行完毕,则会抛出concurrent.futures.InvalidStateError)

五、模块中的其他函数

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待由fs指定的所有future对象(可能是由不同的Executor创建)执行完毕。返回一个由集合构成的二元组,第一个集合为done,包含了所有完成或者已被取消的future对象,第二个集合为not_done,包含了所有的待定或者正在运行的future对象。

  • timeout:用来指定最大等待时间(秒),如果没有指定或者为None,则表示会无限制等待下去。
  • return_when:表示这个函数返回的时机,且只能是以下常量之一:
    • FIRST_COMPLETED:当有任何一个future对象执行完毕或者被取消则返回。
    • FIRST_EXCEPTION:当有任何一个future对象执行过程中发生了异常则返回,如果没有future发生异常,则相当于ALL_COMPLETED。
    • ALL_COMPLETED:当所有future对象都执行完毕或者被取消则返回。

concurrent.futures.as_completed(fs, timeout=None)
返回一个由fs指定的future实例的迭代器,当这些future实例执行完毕或者被取消时就会生成对应的future对象。fs指定的future实例如果存在重复的,则这些重复的实例只会返回一次。并且在调用该方法前就执行完成的future实例会被优先返回。如果在timeout指定的时间内还有future没有开始执行(调用__next__()方法),则会抛出concurrent.futures.TimeoutError,如果timeout没有指定,则会一直等待下去。

六、Exception类

concurrent.futures.CancelledError:当一个future对象被取消的时候抛出。
concurrent.futures.TimeoutError:当一个future对象的执行时间超出了timeout指定的时间时抛出。
concurrent.futures.BrokenExecutor:派生自RuntimeError,当executor因某些原因被中断,导致不能submit或者不能执行一个新的任务时会抛出。
concurrent.futures.InvalidStateError:当某个操作在一个当前状态不允许的future上执行时抛出。
concurrent.futures.thread.BrokenThreadPool:派生自BrokenExecutor,当ThreadPoolExecutor中的一个executor初始化失败时抛出。
concurrent.futures.process.BrokenProcessPool:派生自BrokenExecutor(以前的名称为RuntimeError),当ProcessPoolExecutor中一个executor被非正常终止时(如被外部操作杀死时)抛出。

本文大多是从官方文档直接翻过来的:https://docs.python.org/3/library/concurrent.futures.html

来源:https://www.cnblogs.com/guyuyun/p/14489123.html
图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » Python内置库:concurrent.confutures(简单异步执行)

相关推荐

  • 暂无文章