python多线程原理和详解(一)

shell 85 0

python多线程原理和详解

线程概念

1 . 线程是什么?

线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行。

2 . 线程和进程关系

进程就是一个应用程序在处理机上的一次执行过程,它是一个动态的概念,而线程是进程中的一部分,进程包含多个线程在运行。

​ 多线程可以共享全局变量,多进程不能。多线程中,所有子线程的进程号相同;多进程中,不同的子进程进程号不同。

​ 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。

a. 地址空间,进程内的一个执行单元,进程至少有一个线程,它们共享进程的地址空间,而进程有自己独立的地址空间;
b. 资源拥有:进程是资源分配和拥有的单位,同一个进程内的线程共享进程的资源;
c. 线程是处理器调度的基本单位,但进程不是;
d. 二者均可并发执行;

3 . 线程创建方式

python主要是通过thread和threading这两个模块来实现多线程支持。python的thread模块是比较底层的模块,python的threading模块是对thread做了一些封装,可以更加方便的被使用。但是python(cpython)由于GIL的存在无法使用threading充分利用CPU资源,如果想充分发挥多核CPU的计算能力需要使用multiprocessing模块。

3.1 如何创建线程

python3.x中已经摒弃了Python2.x中采用函数式thread模块中的start_new_thread()函数来产生新线程方式。

​ python3.x中通过threading模块创建新的线程有两种方法:一种是通过threading.Thread(Target=executable Method)-即传递给Thread对象一个可执行方法(或对象);第二种是继承threading.Thread定义子类并重写run()方法。第二种方法中,唯一必须重写的方法是run()

3.1.1 通过threading.Thread进行创建多线程
import threading
import time
def target():
    print("the current threading %s is runing"
       %(threading.current_thread().name))
    time.sleep(1)
    print("the current threading %s is ended"%(threading.current_thread().name))

print("the current threading %s is runing"%(threading.current_thread().name))
## 属于线程t的部分
t = threading.Thread(target=target)
t.start()
## 属于线程t的部分
t.join() # join是阻塞当前线程(此处的当前线程时主线程) 主线程直到Thread-1结束之后才结束
print("the current threading %s is ended"%(threading.current_thread().name))
3.1.2 通过继承threading.Thread定义子类创建多线程

使用Threading模块创建线程,直接从threading.Thread继承,然后重写init方法和run方法:

import threading
import time

class myThread(threading.Thread):  # 继承父类threading.Thread
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter

   def run(self):  # 把要执行的代码写到run函数里面 线程在创建后会直接运行run函数
      print("Starting " + self.name)
      print_time(self.name, self.counter, 5)
      print("Exiting " + self.name)


def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print("%s process at: %s" % (threadName, time.ctime(time.time())))
      counter -= 1


# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 开启线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()

print("Exiting Main Thread")

通过以上案例可以知道,thread1和thread2执行顺序是乱序的。要使之有序,需要进行线程同步。下面文章会讲线程间同步。

4 . 并发和并行

并发:在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行。简言之,是指系统具有处理多个任务的能力。

并行:当系统有一个以上CPU时,则线程的操作有可能非并发。当一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。简言之,是指系统具有同时处理多个任务的能力。

下面我们来两个例子

import threading #线程
import time


def music():
    print('begin to listen music {}'.format(time.ctime()))
    time.sleep(3)
    print('stop to listen music {}'.format(time.ctime()))


def game():
    print('begin to play game {}'.format(time.ctime()))
    time.sleep(5)
    print('stop to play game {}'.format(time.ctime()))


if __name__ == '__main__':
    music()
    game()
    print('ending.....')

 执行结果:
begin to listen music Sun Dec  6 17:43:00 2020
stop to listen music Sun Dec  6 17:43:03 2020
begin to play game Sun Dec  6 17:43:03 2020
stop to play game Sun Dec  6 17:43:08 2020
ending.....

music的时间为3秒,game的时间为5秒,如果按照我们正常的执行,直接执行函数,那么将按顺序顺序执行,整个过程8秒。

import threading #线程
import time


def music():
    print('begin to listen music {}'.format(time.ctime()))
    time.sleep(3)
    print('stop to listen music {}'.format(time.ctime()))


def game():
    print('begin to play game {}'.format(time.ctime()))
    time.sleep(5)
    print('stop to play game {}'.format(time.ctime()))


if __name__ == '__main__':
    t1 = threading.Thread(target=music) #创建一个线程对象t1 子线程
    t2 = threading.Thread(target=game) #创建一个线程对象t2 子线程

    t1.start()
    t2.start()

    # t1.join() #等待子线程执行完 t1不执行完,谁也不准往下走
    t2.join()

    print('ending.......') #主线程
    print(time.ctime())

执行结果:
begin to listen music Sun Dec  6 17:45:13 2020
begin to play game Sun Dec  6 17:45:13 2020
stop to listen music Sun Dec  6 17:45:16 2020
stop to play game Sun Dec  6 17:45:18 2020
ending.......
Sun Dec  6 17:45:18 2020

在这个例子中,我们开了两个线程,将music和game两个函数分别通过线程执行,运行结果显示两个线程同时开始,由于听音乐时间3秒,玩游戏时间5秒,所以整个过程完成时间为5秒。我们发现,通过开启多个线程,原本8秒的时间缩短为5秒,原本顺序执行现在是不是看起来好像是并行执行的?看起来好像是这样,听音乐的同时在玩游戏,整个过程的时间随最长的任务时间变化。但真的是这样吗?那么下面我来提出一个GIL锁的概念。

5 . GIL(全局解释器锁)

无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行。如下两个case:

case1:

import time
from threading import Thread


def add():
    sum = 0
    i = 1
    while i<=1000000:
        sum += i
        i += 1
    print('sum:',sum)


def mul():
    sum2 = 1
    i = 1
    while i<=100000:
        sum2 = sum2 * i
        i += 1
    print('sum2:',sum2)


start = time.time()

add()
mul() #串行比多线程还快

print('cost time %s'%(time.time()-start))

执行结果:
sum: 500000500000
sum2: 282422940796034....
cost time 4.465214014053345

case2:

import time
from threading import Thread


def add():
    sum = 0
    i = 1
    while i<=1000000:
        sum += i
        i += 1
    print('sum:',sum)


def mul():
    sum2 = 1
    i = 1
    while i<=100000:
        sum2 = sum2 * i
        i += 1
    print('sum2:',sum2)


start = time.time()
t1 = Thread(target=add)
t2 = Thread(target=mul)

l = []
l.append(t1)
l.append(t2)

for t in l:
    t.start()

for t in l:
    t.join()

print('cost time %s'%(time.time()-start))

执行结果:
sum: 500000500000
sum2: 282422940796034....
cost time 4.523313760757446

这是怎么回事,串行执行比多线程还快?不符合常理呀。是不是颠覆了你的人生观,这个就和GIL锁有关,同一时刻,系统只允许一个线程执行,那么,就是说,本质上我们之前理解的多线程的并行是不存在的,那么之前的例子为什么时间确实缩短了呢?这里有涉及到一个任务的类型。

6 . IO和CPU密集型任务

任务:

   1.IO密集型(会有cpu空闲的时间)  注:sleep等同于IO操作, socket通信也是IO   
   2.计算密集型

而之前那个例子恰好是IO密集型的例子,后面这个由于涉及到了加法和乘法,属于计算密集型操作,那么,就产生了一个结论,多线程对于IO密集型任务有作用,而计算密集型任务不推荐使用多线程。而其中我们还可以得到一个结论:由于GIL锁,多线程不可能真正实现并行,所谓的并行也只是宏观上并行微观上并发,本质上是由于遇到io操作不断的cpu切换所造成并行的现象。由于cpu切换速度极快,所以看起来就像是在同时执行。问题:没有利用多核的优势,这就造成了多线程不能同时执行,并且增加了切换的开销,串行的效率可能更高。

线程间同步

线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完 成对数据的操作。

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。

交替执行的线程安全吗? 先来看一下下面的这个例子:

import time
import threading
share_data = 0

def tstart(arg):
    time.sleep(0.1)
    global share_data
    for i in range(1000000):
        share_data += 1

if __name__ == '__main__':
    t1 = threading.Thread(target = tstart, args = ('',))
    t2 = threading.Thread(target = tstart, args = ('',))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('share_data result:', share_data)

上面这段代码执行结果share_data多数情况下会小于2000000,上一篇文章介绍过,python解释器CPython中引入了一个全局解释器锁(GIL),也就是任一时刻都只有一个线程在执行,但是这里还会出问题,为什么?

根本原因在于对share_data的写不是原子操作,线程在写的过程中被打断,然后切换线程执行,回来时会继续执行被打断的写操作,不过可能覆盖掉这段时间另一个线程写的结果。

这就是多线程写操作带来的线程安全问题。具体来说这种线程同步属于互斥关系。接下来看一下python提供的多线程同步措施。

threading模块的给python线程提供了一些同步机制,具体用法可以参照官网上的文档说明。

Lock:互斥锁,只能有一个线程获取,获取该锁的线程才能执行,否则阻塞;
RLock:递归锁,也称可重入锁,已经获得该锁的线程可以继续多次获得该锁,而不会被阻塞,释放的次数必须和获取的次数相同才会真正释放该锁;
Condition:条件变量,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值。然后会主动通知另一个线程,并主动放弃锁;
Semaphore:信号锁。为线程间共享的有限资源提供一个”计数器”,如果没有可用资源则会被阻塞;
Event:事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活;
Timer:一种计时器(其用法比较简单,不算同步机制暂不介绍)

互斥锁Lock

其基本用法非常简单:

  1. 创建锁:Lock()
  2. 获得锁:acquire([blocking])
  3. 释放锁:release()
import threading
import time
lock = threading.Lock()     # step 1: 创建互斥锁
share_data = 0

def tstart(arg):
    time.sleep(0.1)
    global share_data
    if lock.acquire():       # step 2: 获取互斥锁,否则阻塞当前线程
        share_data += 1
    lock.release()          # step 3: 释放互斥锁

if __name__ == '__main__':
    tlst = list()
    for i in range(10):
        t = threading.Thread(target=tstart, args=('',))
        tlst.append(t)
    for t in tlst:
        t.start()
    tlst[2].join()
    print("This is main function at:%s" % time.time())
    print('share_data result:', share_data)

结果:
This is main function at:1607262802.907295
share_data result: 6

上面的share_data结果有一定的随机性,因为我们只等待第二个线程执行结束就直接读取结果然后结束主线程了。

不过从上面这个结果我们可以推断出,当第三个线程结束且主线程执行到输出share_data的结果时,至少6个线程完成了对share_data的加1操作。

重入锁RLock

由于当前线程获得锁之后,在释放锁之前有可能再次获取锁导致死锁。python引入了重入锁。

  • 与线程相关,可在一个线程中获取锁,并可继续在同一线程中不阻塞多次获取锁
  • 当锁未释放完,其它线程获取锁就会阻塞,直到当前持有锁的线程释放完锁
  • 锁都应该使用完后释放。可重入锁也是锁,应该acquire多少次,就release多少次
import threading
import time
rlock = threading.RLock()     # step 1: 创建重入锁
share_data = 0

def check_data():
    global share_data
    if rlock.acquire():
        share_data = 11
        if share_data > 10:
            share_data = 1
    rlock.release()

def tstart(arg):
    time.sleep(0.1)
    global share_data
    if rlock.acquire():       # step 2: 获取重入锁,否则阻塞当前线程
        check_data()
        share_data += 1
    rlock.release()          # step 3: 释放重入锁

if __name__ == '__main__':
    t1 = threading.Thread(target = tstart, args = ('',))
    t1.start()
    t1.join()
    print("This is main function at:%s" % time.time())
    print('share_data result:', share_data)

这个例子如果使用互斥锁,就会导致当前线程阻塞。

信号量Semaphore

和Lock很像,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求 的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。

注意: 计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞。 信号量没有做超界限制

import time
import threading
import random
def get_wait_time():
    return random.random()/5.0

# 资源数0
S = threading.Semaphore(0)
def consumer(name):
    S.acquire()
    time.sleep(get_wait_time())
    print(name)

def producer(name):
    # time.sleep(0.1)
    time.sleep(get_wait_time())
    print(name)
    S.release()

if __name__ == "__main__":
    for i in range(5, 10):
        c = threading.Thread(target=consumer, args=("consumer:%s"%i, ))
        c.start()
    for i in range(5):
        p = threading.Thread(target=producer, args=("producer:%s"%i, ))
        p.start()
    time.sleep(2)

结果:
producer:1
producer:4
producer:3
producer:0
consumer:5
producer:2
consumer:6
consumer:7
consumer:8
consumer:9

python 线程其他锁下次我们在一一讲解。

收藏
评论区
发表评论
最新文章
30min搭建kubernetes 2020-12-14 23:49
pytest介绍 2020-11-29 16:34

导读