跳转到内容

Python/Threading

维基教科书,自由的教学读本

Python 中的线程用于同时运行多个线程(任务、函数调用)。请注意,这并不意味着它们在不同的 CPU 上执行。如果程序已经使用了 100% 的 CPU 时间,Python 线程将不会让您的程序运行得更快。在这种情况下,您可能需要研究多进程并行编程。如果您对使用 Python 进行并行编程感兴趣,请参阅 此处

Python 线程用于执行任务需要等待的情况。一个例子是与另一台计算机上托管的服务(如 Web 服务器)交互。线程允许 Python 在等待时执行其他代码;这可以通过 sleep 函数轻松模拟。

Python 3已经停用了thread模块,并改名为 _thread 模块。Python 3在_thread 模块的基础上开发了更高级的 threading 模块。threading 模块除了包含 _thread 模块中的所有方法外,还提供的其他方法,常用的方法如下:

  • threading.current_thread() 返回当前线程的信息
  • threading.enumerate() 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.active_count() 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果

GIL

[编辑]

GIL(Global Interpreter Lock)是 CPython 解释器进程级的互斥锁,它让“同一时刻只有一个线程在执行 Python 字节码”,从而保证解释器内部对象的安全,但也使多线程无法真正并行(CPU 密集任务无效),I/O 密集任务仍可并发。

一个进程一把锁,嵌在 ceval.c 的字节码执行循环入口;任何线程想执行 Python 字节码,必须先抢 GIL;因此同一进程内多个线程在 CPython 层面永远串行。

为什么当初要加 GIL?(1992 年引入)早期大量全局状态(引用计数、内存管理、对象头)无锁保护;单锁方案实现简单、性能高(CPU单核单线程时代);避免细粒度锁带来的复杂度和死锁风险。

线程切换机制(默认时间片 + 信号量)

  • 时间片:默认 5 ms(sys.setswitchinterval,可调);
  • I/O 阻塞:内部 lock->wait() 前主动 drop_gil();
  • 线程抢锁优先级:等待时间最长者优先,防止饥饿。

能否去掉 GIL?理论可行,实现极难:需给所有对象头、引用计数、容器、异常体系加细粒度锁或无锁算法;

PEP 703(“No-GIL”)2023 年合并到主分支,Python 3.12 实验性构建可用,默认仍保留 GIL,预计 3.13/3.14 才提供生产级开关。

创建线程

[编辑]

直接创建线程

[编辑]
threading.Thread(target=None, name=None, args=(), kwargs={})

target 指要创建的线程的方法名,name 指给此线程命名,命名后可以调用 threading.current_thread().name 方法输出该线程的名字, args/kwargs 指 target 指向的方法需要传递的参数,必须是元组形式,如果只有一个参数,需要以添加逗号。

创建一个线程,打印 1-10 的数字,并在每次打印之间等待一秒钟:

import threading
import time

def loop1_10():
    for i in range(1, 11):
        time.sleep(1)
        print(i)

threading.Thread(target=loop1_10).start()

类的继承创建线程

[编辑]

通过直接从 threading.Thread 继承创建一个新的子类。只能重载基类的run方法和_init__构造器。实例化后调用 start() 方法启动新线程,即相当于它调用了线程的 run() 方法。

#!/usr/bin/env python

import threading
import time


class MyThread(threading.Thread):
    def run(self):                                         # Default called function with mythread.start()
        print("{} started!".format(self.getName()))        # "Thread-x started!"
        time.sleep(1)                                      # Pretend to work for a second
        print("{} finished!".format(self.getName()))       # "Thread-x finished!"

def main():
    for x in range(4):                                     # Four times...
        mythread = MyThread(name = "Thread-{}".format(x))  # ...Instantiate a thread and pass a unique ID to it
        mythread.start()                                   # ...Start the thread, run method will be invoked
        time.sleep(.9)                                     # ...Wait 0.9 seconds before starting another

if __name__ == '__main__':
    main()

输出为:

Thread-0 started!
Thread-1 started!
Thread-0 finished!
Thread-2 started!
Thread-1 finished!
Thread-3 started!
Thread-2 finished!
Thread-3 finished!

守护线程

[编辑]

如果当前python线程是守护线程,那么意味着这个线程是“不重要”的,“不重要”意味着如果他的主线程结束了但该守护线程没有运行完,守护线程就会被强制结束。如果线程是非守护线程,那么父进程只有等到非守护线程运行完毕后才能结束。

守护daemon这里的含义是替主线程打杂,主线程一旦退出,它立即被抛弃;“后台随时可扔的仆从”。

只要当前主线程中尚存任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束是,守护线程随着主线程一同结束工作。

import threading
import time

# 每1秒加1
def job1(num):
    while True:
        num += 1
        print('{} is running >> {}'.format(threading.current_thread().name, num))
        time.sleep(1)

# 每2秒加2
def job2(num):
    while True:
        num += 2
        print('{} is running >> {}'.format(threading.current_thread().name, num))
        time.sleep(2)

# 线程1,一秒加一
new_job1 = threading.Thread(target=job1, name='Add1', args=(100,))
# 设置为守护线程
new_job1.setDaemon(True)
new_job1.start()

# 线程2,两秒加二
new_job2 = threading.Thread(target=job2, name='Add2', args=(1,))
new_job2.setDaemon(True)
new_job2.start()

# 主线程等待9秒
time.sleep(9)
print('{} Ending'.format(threading.current_thread().name))

随着输出 MainThread Ending 后,程序就运行结束了,这表明子线程全为守护线程时,会随着主线程的结束而强制结束。

线程的阻塞会合

[编辑]

join([timeout])方法会使线程进入等待状态(阻塞),直到调用join()方法的子线程运行结束。同时也可以通过设置 timeout 参数来设定等待的时间。

线程并发控制

[编辑]

互斥锁(Lock)

[编辑]

互斥锁只能加锁一次然后释放一次。

import threading
import time


num = 0

lock = threading.Lock()

def job1():
    global num
    for i in range(1000000):
        lock.acquire() # 加锁
        num += 1
        lock.release() # 释放锁
        # 上述代码也可以直接写为
        # with lock:
        # 	 num += 1

new_job1 = threading.Thread(target=job1, name='Add1')
new_job1.start()

for i in range(1000000):
    lock.acquire() # 加锁
    num += 2
    lock.release() # 释放锁

# 等待线程执行完毕
time.sleep(3)
print('num = {}'.format(num))

递归锁

[编辑]

递归锁(Rlock)允许多层加锁、释放锁:

import threading, time


def run1():
    lock.acquire()
    print("grab the first part data")
    global num
    num += 1
    lock.release()
    return num


def run2():
    lock.acquire()
    print("grab the second part data")
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(3):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

信号量

[编辑]
import threading
import time

# 设置信号量,即同时执行的线程数为3
lock = threading.BoundedSemaphore(3)

def job1():
    lock.acquire()
    print('{} is coming, {}'.format(threading.current_thread().name, time.strftime('%H:%M:%S',time.localtime(time.time()))))
    time.sleep(3)
    lock.release()

for i in range(10):
    new_job1 = threading.Thread(target=job1, name='Thread{}'.format(i))
    new_job1.start()

事件

[编辑]

threading.Event()类会在全局定义一个Flag,当 Flag=False 时,调用 wait()方法会阻塞所有线程;而当 Flag=True 时,调用 wait() 方法不再阻塞。形象的比喻就是“红绿灯”:在红灯时阻塞所有线程,而在绿灯时又会一次性放行所有排队中的线程。Event类有四个方法:

  • set() 将Flag设置为True
  • wait() 阻塞所有线程
  • clear() 将Flag设置为False
  • is_set() 返回bool值,判断Flag是否为True

例如,主线程通知所有子线程可以开工:

import threading
import time

evt = threading.Event()          # 默认 False

def worker():
    print("worker: 等待信号...")
    evt.wait()                   # 阻塞
    print("worker: 收到信号,开始干活")

t = threading.Thread(target=worker)
t.start()

time.sleep(2)
print("main: 发信号")
evt.set()                        # 置位并唤醒等待线程
t.join()

队列

[编辑]

queue模块是Python内置的标准模块,模块实现了三种类型的队列,它们的区别仅仅是条目取回的顺序,分别对应3个类:Queue,LifoQueue,PriorityQueue。共同的成员方法:

  • Queue.qsize()返回队列的大致大小。
  • Queue.empty()如果队列为空,返回 True ,否则返回 False 。
  • Queue.full()如果队列是满的返回 True ,否则返回 False 。
  • Queue.put(item, block=True, timeout=None) 将 item 放入队列。如果可选参数 block 是 true 并且 timeout 是 None (默认),则在必要时阻塞至有空闲插槽可用。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间没有可用的空闲插槽,将引发 Full 异常。反之 (block 是 false),如果空闲插槽立即可用,则把 item 放入队列,否则引发 Full 异常 ( 在这种情况下,timeout 将被忽略)。
  • Queue.put_nowait(item)相当于 put(item, block=False)。
  • Queue.get(block=True, timeout=None)从队列中移除并返回一个项目。如果可选参数 block 是 true 并且 timeout 是 None (默认值),则在必要时阻塞至项目可得到。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间内项目不能得到,将引发 Empty 异常。反之 (block 是 false) , 如果一个项目立即可得到,则返回一个项目,否则引发 Empty 异常 (这种情况下,timeout 将被忽略)。
  • Queue.get_nowait()相当于 get(block=False) 。
  • Queue.task_done()在完成一项工作以后,task_done()告诉队列,该任务已处理完成。注意,从业务逻辑上,获取数据get()不等价于该任务已经处理完成。
  • Queue.join()阻塞至队列中所有的元素都被接收put()和处理完毕task_done()。队列添加新工作时,未完成任务的计数就会增一,当调用task_done()函数后,就代表执行完一个工作,未完成任务的计数就会减一,当计数为0时 join() 阻塞被解除。

先进先出队列 queue.Queue(maxsize=0)

maxsize 是个整数,用于设置可以放入队列中的项目数的上限。当达到这个大小的时候,插入操作将被阻塞,直至队列中的项目被消费掉。如果 maxsize 小于等于零,队列尺寸为无限大。

from queue import Queue
# FIFO
queue_obj = Queue()  # 创建一个队列对象
for i in range(4):
    queue_obj.put(i)
while not queue_obj.empty():
    print(queue_obj.get())

后进先出队列即栈 queue.LifoQueue(maxsize=0)

from queue import Queue,LifoQueue
# LIFO
queue_obj = LifoQueue()  # 创建一个队列对象
for i in range(4):
    queue_obj.put(i)
while not queue_obj.empty():
    print(queue_obj.get())

优先级队列queue.PriorityQueue(maxsize=0),按照级别顺序取出元素,级别最低的最先取出。队列中的元素一般采取元组(priority_number, data)的形式进行存储

  • 优先级不同,可以比较大小
  • 优先级一样,数据部分可以比较大小
  • 优先级一样,数据部分不可以比较大小则报错 ypeError: '<' not supported between instances of 'dict' and 'dict'

优先级队列本质上是元素可以全序(total order)比较,内部用堆(heap)实现。所以,可以自己实现一个类作为优先级队列的元素的类型,将数据包装到类中,在类中自定义或重写 def __lt__(self, other)魔法方法。

SimpleQueue queue.SimpleQueue是Python 3.7版本新增的一个轻量级FIFO队列类,提供了线程安全的简单队列实现。与标准的queue.Queue相比,它功能更简单,但性能更好,并且具有重入性安全特性。主要特性

  • 无界队列:SimpleQueue是无界的,不支持设置最大容量(没有maxsize参数)
  • 线程安全:支持多生产者、多消费者场景
  • 重入性安全:可以在同一线程中被安全地中断调用(例如同一线程中signal_handler函数读写队列),适合在析构函数、weakref回调或信号处理器中使用
  • 高性能:相比Queue,实现更简单,性能更好
  • 内存效率:比Queue更节省内存

内存屏障Barrier

[编辑]

threading.Barrier(parties, action=None, timeout=None)

条件锁

[编辑]
cond = threading.Condition()
with cond:            # 必须先拿底层锁
    cond.wait()        # 挂起直到被 notify(会暂时放锁)
    cond.wait_for(谓词) # 挂起直到谓词为真,自带重试
    cond.notify(n=1)   # 唤醒 n 个等待线程
    cond.notify_all()  # 唤醒全部

线程局部存储

[编辑]
import threading

# 创建全局ThreadLocal对象
local_data = threading.local()
def add():
	# 取出ThreadLocal中的数据
    n = local_data.num
    local_data.num_add = n + n

def divid():
    n = local_data.num
    local_data.num_divid = n / 2

def times():
    local_data.result = local_data.num_add * local_data.num_divid

def job1(num):
	# 将数据存入ThreadLocal中
    local_data.num = num
    add()
    divid()
    times()
    print('{} result >> {}'.format(threading.current_thread().name, local_data.result))

for i in range(5):
    t = threading.Thread(target=job1, args=(i,), name='Thread{}'.format(i))
    t.start()

线程池

[编辑]

concurrent.futures.ThreadPoolExecutor

[编辑]

python3新引入的库concurrent.futures.ThreadPoolExecutor可以并行执行多个线程,适用于 I/O 密集型任务。

concurrent.futures.ThreadPoolExecutor类的常用方法:

  • map(func, *iterables, timeout=None, chunksize=1):并行执行一个函数对多个输入迭代器进行映射。使用 map 方法,有两个特点:无需提前使用submit方法;返回结果的顺序和元素的顺序相同,即使子线程先返回也不会获取结果.
  • shutdown(wait=True):停止池的操作,并等待所有提交的任务完成(wait=True)或立即返回(wait=False)。
  • submit(fn, *args, **kwargs) 在线程池中提交任务。返回一个 concurrent.futures.Future 对象,用于表示异步操作的结果。

示例:

from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(task, 2)
    print(future.result())  # 输出: 4

concurrent.futures.Future

[编辑]
  • concurrent.futures.Future 是一个表示异步操作结果的对象,它提供了一种机制来检查异步操作是否完成、获取结果以及处理可能出现的异常。Future 对象是 concurrent.futures 模块中的核心组件,主要用于线程池和进程池中的异步任务管理。
  • cancel():尝试取消这个任务。如果任务已经完成或已经被取消,返回 False。如果任务尚未执行,则任务会被取消,返回 True。
  • cancelled():检查这个任务是否已经被取消。
  • done():检查任务是否已经完成(无论是成功还是失败)。
  • result(timeout=None): 输出对应的线程运行后方法的返回结果,如果线程还在运行,那么其会一直阻塞在那里,直到该线程运行完,当然,也可以设置
  • result(timeout),即如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。
  • exception(timeout=None):获取任务抛出的异常。如果任务成功完成,则返回 None。如果任务抛出异常,exception() 方法会返回该异常。如果指定了 timeout 参数并且任务还未完成,则会阻塞至超时或任务完成。
  • add_done_callback(fn):在任务完成时调用 fn 回调函数。fn 必须是一个可调用对象,接收一个 Future 对象作为参数示例:
  • remove_done_callback(fn):从回调函数列表中移除 fn。只有在回调函数还没有执行时,这个方法才有效
from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 2)
    print(future.result())  # 输出: 4

as_completed

[编辑]
concurrent.futures.as_completed(fs, timeout=None)

返回一个包含fs所指定的Future实例的迭代器。在没有任务完成的时候,会一直阻塞;如果设置了 timeout 参数,timeout 秒之后结果仍不可用,则返回的迭代器将引发concurrent.futures.TimeoutError。如果timeout未指定或为 None,则不限制等待时间。当有某个任务完成的时候,该方法会 yield 这个任务,就能执行 for循环体的语句,然后继续阻塞住,循环到所有的任务结束。先完成的任务会先返回给主线程。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def add(a, b):
    time.sleep(3)
    return a + b

task = []
list_a = [1, 2, 3, 4, 5]
list_b = [6, 7, 8, 9, 10]
with ThreadPoolExecutor(2) as pool:
    for i in range(len(list_a)):
        task.append(pool.submit(add, list_a[i], list_b[i]))
	
	# 使用as_completed遍历
    for i in as_completed(task):
        print('result = {}'.format(i.result()))

该方法与第一种的直接遍历所具有的优势是,不需要等待所有线程全部返回,而是每返回一个子线程就能够处理,上面的result方法会阻塞后面的线程。

wait方法

[编辑]
wait(fs, timeout=None, return_when=ALL_COMPLETED)

fs 为指定的 Future 实例,timeout 可以用来控制返回前最大的等待秒数。 timeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None ,则不限制等待时间。return_when 指定此函数应在何时返回必须为以下常数之一:

  • FIRST_COMPLETED 等待第一个线程结束时返回,即结束等待
  • FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
  • ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回