Python多任务:多线程和多进程

python的多任务其实用了很久了,因为刚开始写代码的时候总是看网上说高并发、异步之类的,就觉得很高大上,所以刻意地去学过,后来在实际开发工作有过为了使用而使用,也有过真正因为性能问题而必须要使用。今天想把目前掌握的一些内容记录下来。

其实应该介绍一下网上流传甚广的“Python速度慢”和GIL,但是这两个话题在网上有非常多的文章讨论过,就不想再多写了。

Python多任务其实有多线程、多进程和协程三种实现方法,但是协程一般只在性能要求特别高的情况下使用,并且在实现上相对于多线程和多进程要复杂一些,所以不在这里写,以后单独为协程写一篇笔记。

多线程和多进程的适用场景

一句话总结就是:多线程适用于IO密集型的代码,多进程适用于CPU密集型的代码。

所谓IO密集型,就是代码中涉及的大量的磁盘、网络、数据库等数据交互。例如爬虫,涉及到大量的网络请求和磁盘读写操作,再比如远程数据库读写,也涉及到网络请求和磁盘读写, 还有就是内存中的内容写入到本地磁盘。

所谓CPU密集型,是指代码会进行大量的计算而导致占用大量CPU,比如像AI的算法(计算量大到CPU都不够,必须使用GPU了),或者是计算一个很大的数是不是素数(这是后面的一个例子)。

多线程

目标函数

在实际编写多线程之前,要先编写一个函数作为多任务的目标函数。在这里我以一个爬虫函数作为目标函数。

我在代码中使用cnblog作为爬取对象,cnblog是一个很不错的博客站点,代码仅用于展示功能,如果有读者想要尝试运行代码的话,希望不要过于频繁爬取他们的网站,以免给他们带来过大的请求负担。

以下代码写在blog_spider.py中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
"""
爬取cnblog首页的的信息
"""
import requests
from bs4 import BeautifulSoup


# 定义需要爬取的url
urls = [f"https://www.cnblogs.com/sitehome/p/{page}" for page in range(1, 51)]


def craw(url):
"""爬取指定url的信息"""
content = requests.get(url).text

return content


def parse(html):
"""对给定的html进行解析"""
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a", class_="post-item-title")
return [(link["href"], link.get_text()) for link in links]


多线程的简单实现

下面的代码中用单线程和多线程分别进行爬虫,对比耗时以观测性能上的差距
以下代码写在multi_thread.py中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
对比多线程和单线程在爬虫上的效率
"""
import time
import threading
from blog_spider import urls, craw


def single_thread():
"""
单线程爬虫
"""
for url in urls:
craw(url)


def multi_thread():
"""
多线程爬虫
"""
threads = []
for url in urls:
# target是目标函数,args是目标函数的参数所组成的一个元组,
threads.append(
threading.Thread(target=craw, args=(url,))
)

# 开始线程任务
for thread in threads:
thread.start()

# 阻塞主线程,直到所有的线程多执行完成
for thread in threads:
thread.join()


if __name__ == '__main__':
start = time.time()
single_thread()
end = time.time()
print("单线程耗时:%s s" % (end - start))

start = time.time()
multi_thread()
end = time.time()
print("多线程耗时:%s s" % (end - start))
```

上面的代码中,thread.join()的作用是阻塞主线程,这样可以使用所有的子线程都运行完成后才结束主线程,避免当子线程还在执行的时候但是由于主线程的结束而被迫终止。

## 多线程中的资源竞争和线程锁
在使用多线程的时候经常会遇到资源竞争的问题,比如当多个子线程同时对一个变量进行计算,如果不加控制,最终的结果很可能就不是预期的。

下面以对一个数字进行大数量级次数的累加为例讲解这个问题。

其实逻辑很简单,将number初始化为0,然后for循环一百万次,每次对number执行+1操作,然后用两个子线程同时进行一样的操作,我们所预期的结果最终number应该等于2000000,但是由于资源竞争问题,所以不用线程锁(也叫互斥锁)加以控制,几乎不会得到正确的答案。

下面的样例代码中同时写了有互斥锁和没有互斥锁两种函数

以下代码写在multi_thread_lock.py中
```python
"""
对一个数字进行多次累加,可以观察到在多线程情况下,
如果不加互斥锁,可能会出现脏数据,
plus_with_lock是加了互斥锁的,
plus_without_lock是没有互斥锁的
"""
import threading
from concurrent.futures import ThreadPoolExecutor


number_with_lock = 0
number_without_lock = 0
lock = threading.Lock()


def plus_with_lock():
global number_with_lock
with lock:
for _ in range(1000000):
number_with_lock += 1


def plus_without_lock():
global number_without_lock
for _ in range(1000000):
number_without_lock += 1


if __name__ == '__main__':
t1 = threading.Thread(target=plus_with_lock,)
t2 = threading.Thread(target=plus_with_lock,)
t1.start()
t2.start()
t1.join()
t2.join()
print(number_with_lock)

t3 = threading.Thread(target=plus_without_lock,)
t4 = threading.Thread(target=plus_without_lock,)
t3.start()
t4.start()
t3.join()
t4.join()
print(number_without_lock)

线程池

根据我个人的实际使用经验来看,在应用多线程的时候,大部分情况下都是使用线程池,而不是像前面的两个案例那样手工控制每个线程的行为。使用线程池有两个好处:

  1. 降低性能消耗
    创建线程这个动作会消耗一定的资源,像上面那样每次需要的时候都创建一个新的子线程,如果创建很多个子线程的话对性能有一定的影响
  2. 代码简单
    线程池在代码实现上相对简单一点

下面是一个以爬虫为目标函数的线程池案例

以下代码写在multi_thread_pool.py中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from blog_spider import craw, parse, urls
from concurrent.futures import ThreadPoolExecutor
import concurrent
import time


start = time.time()
with ThreadPoolExecutor(max_workers=5) as executer:
htmls = executer.map(craw, urls)
# map 方法
url_html_maps = list(zip(urls, htmls))
for url, html in url_html_maps:
print(url)
print(len(html))
end = time.time()
print("多线程爬虫耗时:%s " % (end - start))

with ThreadPoolExecutor(max_workers=5) as executer:
fs = {}
for url, html in url_html_maps:
future = executer.submit(parse, html)
fs[future] = url

for future in concurrent.futures.as_completed(fs):
# as_completed的作用是当fs中有任何一个future完成的时候会先返回,而不是顺序等待
# https://blog.csdn.net/panguangyuu/article/details/105335900
url = fs[future]
print(url, future.result())

```

在上面的代码中可以看出来,我比较喜欢配合with(上下文管理器)来使用线/进程池,因为这样不用手工管理创建和关闭线/进程池,代码更简单。

可以看到,ThreadPoolExecutor有map和submit两种运行子线程,map在代码上简单一些,适合提交线程后不用再对其进行操作和管理的情况,submit适合在线程提交后还要对其进行操作和管理的操作。个人感觉可以优先考虑使用map方法,如果map不能满足需求在考虑使用submit。

## 多线程的回调函数
ThreadPoolExecuter还有一个add_done_callback方法也是非常有用的,他可以为进程添加一个回调函数,当线程执行完成后可以触发这个回调函数,比如可以用于发送邮件、钉钉等消息通知。

这里做一个简单的示例
```python
from blog_spider import craw, parse, urls
from concurrent.futures import ThreadPoolExecutor
import concurrent
import time


def notify():
"""
模拟一个消息通知函数
"""
pass



with ThreadPoolExecutor(max_workers=5) as executer:
fs = {}
for url, html in url_html_maps:
future = executer.submit(parse, html)
fs[future] = url

for future in concurrent.futures.as_completed(fs):
future.add_done_callback(notify)

多进程

多进程和多线程在代码实现上是非常类似的,我正常也是配合with使用进程池,而不是手动控制每一个进程的创建和运行,所以只将进程池的用法。

下面的代码内容比较简单,也有详细的注释,就不多解释,只说两点:

  1. 代码中同时对比了单线程、多线程和多进程在CPU消耗型的场景下性能对比
  2. 代码中调用了一个对代码执行时间计时的计数器,起代码如下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    """
    可以为函数计时的装饰器
    """
    import time


    def func_timer(function):
    """
    :param function: function that will be timed
    :return: duration
    """

    def function_timer(*args, **kwargs):
    t0 = time.time()
    result = function(*args, **kwargs)
    t1 = time.time()
    print(
    "[Function: {name} finished, spent time: {time:.4f}s]".format(
    name=function.__name__, time=t1 - t0
    )
    )
    return result

    return function_timer

    以下代码写在multi_process_pool.py中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    """
    计算一个大数是不是一个素数,
    这是一个CPU消耗型的代码,更适合多进程,
    这段代码会对比单线程、多线程和多进程的性能区别
    """
    """
    计算一个大数是不是一个素数,
    这是一个CPU消耗型的代码,更适合多进程,
    这段代码会对比单线程、多线程和多进程的性能区别
    """
    import math
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import ProcessPoolExecutor
    from utils.function_timer import func_timer


    def is_prime(n):
    """
    判断一个数是不是素数,
    n 要能走完所有的逻辑,这样才能消耗大量的CPU,
    如果从中间某一步就结束的话,后面三中情况的对比结果可能就不是预期的那样
    """
    if n < 2:
    return False
    if n == 2:
    return True
    if n % 2 == 0:
    return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
    if n % i == 0:
    return False
    return True


    @func_timer
    def single_thread(numbers):
    for number in numbers:
    is_prime(number)


    @func_timer
    def multi_thread(numbers):
    with ThreadPoolExecutor(max_workers=10) as executer:
    executer.map(is_prime, numbers)


    @func_timer
    def multi_process(numbers):
    with ProcessPoolExecutor(max_workers=10) as executer:
    executer.map(is_prime, numbers)


    if __name__ == '__main__':

    numbers_1 = [112272535095293] * 50 # 这个数会导致代码消耗大量CPU
    numbers_2 = [112272535095290] * 50 # 这个数不是素数,在判断过程中就退出了,不会消耗大量CPU

    single_thread(numbers_1)
    multi_thread(numbers_1)
    multi_process(numbers_1)

    # 以下代码说明多进程只有在CPU消耗型的情况下才有优势
    single_thread(numbers_2)
    multi_thread(numbers_2)
    multi_process(numbers_2)

一些有用的文档

在学习Python多任务的过程中找到了一些个人感觉很不错的文档,而且这篇笔记中有一些细节的捏没有写,比如thread.join那一块就写的很简单,其实背后的知识点是守护线程,所以再此做一个分享

  1. C编程网的《Python编发编程》
    这个教程非常详细的介绍了关于python多任务的细节和案例,十分推荐
  2. Python的官方文档
    毕竟所有的其他文档都来源这里
  3. 廖雪峰的Python教程——进程和线程
    这里面关于进程和线程本身的解释比较好,但是关于Python中的多人介绍有点旧了
  4. 刘江的Python教程——多线程和多进程
    示例代码比较好,对于各种常见的方法也有比较详细的解释