Python历程、线程、协程详解
历程与线程的汗青
我们都知道计较机是由硬件和软件构成的。硬件中的CPU是计较机的焦点,它包袱计较机的所有任务。 操纵系统是运行在硬件之上的软件,是计较机的打点者,它认真资源的打点和分派、任务的调治。 措施是运行在系统上的具有某种成果的软件,好比说欣赏器,音乐播放器等。 每次执行措施的时候,城市完成必然的成果,好比说欣赏器帮我们打开网页,为了担保其独立性,就需要一个专门的打点和节制执行措施的数据布局——历程节制块。 历程就是一个措施在一个数据集上的一次动态执行进程。 历程一般由措施、数据集、历程节制块三部门构成。我们编写的措施用来描写历程要完成哪些成果以及如何完成;数据集则是措施在执行进程中所需要利用的资源;历程节制块用来记录历程的外部特征,描写历程的执行变革进程,系统可以操作它来节制和打点历程,它是系统感知历程存在的独一符号。
在早期的操纵系统里,计较机只有一个焦点,历程执行措施的最小单元,任务调治回收时间片轮转的抢占式方法举办历程调治。每个历程都有各自的一块独立的内存,担保历程互相间的内存地点空间的断绝。 跟着计较机技能的成长,历程呈现了许多漏洞,一是历程的建设、取消和切换的开销较量大,二是由于对称多处理惩罚机(对称多处理惩罚机(SymmetricalMulti-Processing)又叫SMP,是指在一个计较机上搜集了一组处理惩罚器(多CPU),各CPU之间共享内存子系统以及总线布局)的呈现,可以满意多个运行单元,而多历程并行开销过大。 这个时候就引入了线程的观念。 线程也叫轻量级历程,它是一个根基的CPU执行单位,也是措施执行进程中的最小单位,由线程ID、措施计数器、寄存器荟萃 和仓库配合构成。线程的引入减小了措施并发执行时的开销,提高了操纵系统的并发机能。 线程没有本身的系统资源,只拥有在运行时必不行少的资源。但线程可以与同属与同一历程的其他线程共享历程所拥有的其他资源。
历程与线程之间的干系
线程是属于历程的,线程运行在历程空间内,同一历程所发生的线程共享同一内存空间,当历程退出时该历程所发生的线程城市被强制退出并排除。线程可与属于同一历程的其它线程共享历程所拥有的全部资源,可是其自己根基上不拥有系统资源,只拥有一点在运行中必不行少的信息(如措施计数器、一组寄存器和栈)。
python 线程
Threading用于提供线程相关的操纵,线程是应用措施中事情的最小单位。
1、threading模块
threading 模块成立在 _thread 模块之上。thread 模块以初级、原始的方法来处理惩罚和节制线程,而 threading 模块通过对 thread 举办二次封装,提供了更利便的 api 来处理惩罚线程。
import threading import time def worker(num): """ thread worker function :return: """ time.sleep(1) print("The num is %d" % num) return for i in range(20): t = threading.Thread(target=worker,args=(i,),name=“t.%d” % i) t.start()
上述代码建设了20个“前台”线程,然后节制器就交给了CPU,CPU按照指定算法举办调治,分片执行指令。
Thread要领说明
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 配置线程的名称
t.name : 获取或配置线程的名称
t.is_alive() : 判定线程是否为激活状态
t.isAlive() :判定线程是否为激活状态
t.setDaemon() 配置为靠山线程或前台线程(默认:False);通过一个布尔值配置线程是否为守护线程,必需在执行start()要领之后才可以利用。假如是靠山线程,主线程执行进程中,靠山线程也在举办,主线程执行完毕后,靠山线程岂论乐成与否,均遏制;假如是前台线程,主线程执行进程中,前台线程也在举办,主线程执行完毕后,期待前台线程也执行完成后,措施遏制
t.isDaemon() : 判定是否为守护线程
t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在挪用了start()要领之后该属性才有效,不然它只返回None。
t.join() :逐个执行每个线程,执行完毕后继承往下执行,该要领使得多线程变得无意义
t.run() :线程被cpu调治后自动执行线程工具的run要领
2、线程锁threading.RLock和threading.Lock
#p#分页标题#e#
由于线程之间是举办随机调治,而且每个线程大概只执行n条执行之后,CPU接着执行其他线程。为了担保数据的精确性,引入了锁的观念。所以,大概呈现如下问题:
例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,别的一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部门为0,一部门为1,这就导致了数据的纷歧致。锁的呈现办理了这个问题。
import threading import time globals_num = 0 lock = threading.RLock() def Func(): lock.acquire() # 得到锁 global globals_num globals_num += 1 time.sleep(1) print(globals_num) lock.release() # 释放锁 for i in range(10): t = threading.Thread(target=Func) t.start()
3、threading.RLock和threading.Lock 的区别
RLock答允在同一线程中被多次acquire。而Lock却不答允这种环境。 假如利用RLock,那么acquire和release必需成对呈现,即挪用了n次acquire,必需挪用n次的release才气真正释放所占用的琐。
import threading lock = threading.Lock() #Lock工具 lock.acquire() lock.acquire() #发生了死琐。 lock.release() lock.release() import threading rLock = threading.RLock() #RLock工具 rLock.acquire() rLock.acquire() #在同一线程内,措施不会堵塞。 rLock.release() rLock.release()
4、threading.Event
python线程的事件用于主线程节制其他线程的执行,事件主要提供了三个要领 set、wait、clear。
事件处理惩罚的机制:全局界说了一个“Flag”,假如“Flag”值为 False,那么当措施执行 event.wait 要领时就会阻塞,假如“Flag”值为True,那么event.wait 要领时便不再阻塞。
clear:将“Flag”配置为False
set:将“Flag”配置为True
Event.isSet() :判定标识位是否为Ture。
import threading def do(event): print('start') event.wait() print('execute') event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = input('input:') if inp == 'true': event_obj.set()
当线程执行的时候,假如flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了当地和长途的并发性。
5、threading.Condition
一个condition变量老是与某些范例的锁相接洽,这个可以利用默认的环境或建设一个,当几个condition变量必需共享和同一个锁的时候,是很有用的。锁是conditon工具的一部门:没有须要别离跟踪。
condition变量听从上下文打点协议:with语句块关闭之前可以获取与锁的接洽。 acquire() 和 release() 会挪用与锁相关联的相应的要领。
其他和锁关联的要领必需被挪用,wait()要了解释放锁,当别的一个线程利用 notify() or notify_all()叫醒它之前会一直阻塞。一旦被叫醒,wait()会从头得到锁并返回,
Condition类实现了一个conditon变量。 这个conditiaon变量答允一个或多个线程期待,直到他们被另一个线程通知。 假如lock参数,被给定一个非空的值,,那么他必需是一个lock可能Rlock工具,它用来做底层锁。不然,会建设一个新的Rlock工具,用来做底层锁。
wait(timeout=None) : 期待通知,可能比及设定的超时时间。当挪用这wait()要领时,假如挪用它的线程没有获得锁,那么会抛出一个RuntimeError 异常。 wati()释放锁今后,在被挪用沟通条件的另一个历程用notify() or notify_all() 唤醒之前 会一直阻塞。wait() 还可以指定一个超时时间。
假如有期待的线程,notify()要了解叫醒一个在期待conditon变量的线程。notify_all() 则会叫醒所有在期待conditon变量的线程。
留意: notify()和notify_all()不会释放锁,也就是说,线程被叫醒后不会立即返回他们的wait() 挪用。除非线程挪用notify()和notify_all()之后放弃了锁的所有权。
#p#分页标题#e#
在典范的设计气势气魄里,操作condition变量用锁去通许会见一些共享状态,线程在获取到它想获得的状态前,会重复挪用wait()。修改状态的线程在他们状态改变时挪用 notify() or notify_all(),用这种方法,线程会尽大概的获取到想要的一个期待者状态。 例子: 出产者-消费者模子,
import threading import time def consumer(cond): with cond: print("consumer before wait") cond.wait() print("consumer after wait") def producer(cond): with cond: print("producer before notifyAll") cond.notifyAll() print("producer after notifyAll") condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,)) p = threading.Thread(name="p", target=producer, args=(condition,)) c1.start() time.sleep(2) c2.start() time.sleep(2) p.start()
6、queue模块
Queue 就是对行列,它是线程安详的
举例来说,我们去麦当劳用饭。饭馆内里有厨师地位,前台认真把厨房做好的饭卖给顾主,顾主则去前台领取做好的饭。这里的前台就相当于我们的行列。形成管道样,厨师做好饭通过前台传送给顾主,所谓单向行列
这个模子也叫出产者-消费者模子。
import queue q = queue.Queue(maxsize=0) # 结构一个先进显出行列,maxsize指定行列长度,为0 时,暗示行列长度无限制。 q.join() # 比及行列为kong的时候,在执行此外操纵 q.qsize() # 返回行列的巨细 (不行靠) q.empty() # 当行列为空的时候,返回True 不然返回False (不行靠) q.full() # 当行列满的时候,返回True,不然返回False (不行靠) q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必需存在,可以参数block默认为True,暗示当行列满时,会期待行列给出可用位置, 为False时为非阻塞,此时假如行列已满,会激发queue.Full 异常。 可选参数timeout,暗示 会阻塞配置的时间,事后, 假如行列无法给出放入item的位置,则激发 queue.Full 异常 q.get(block=True, timeout=None) # 移除并返回行列头部的一个值,可选参数block默认为True,暗示获取值的时候,假如行列为空,则阻塞,为False时,不阻塞, 若此时行列为空,则激发 queue.Empty异常。 可选参数timeout,暗示会阻塞配置的时候,事后,假如行列为空,则激发Empty异常。 q.put_nowait(item) # 等效于 put(item,block=False) q.get_nowait() # 等效于 get(item,block=False)
代码如下:
#!/usr/bin/env python import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
那就本身做个线程池吧:
# 简朴往行列中传输线程数 import threading import time import queue class Threadingpool(): def __init__(self,max_num = 10): self.queue = queue.Queue(max_num) for i in range(max_num): self.queue.put(threading.Thread) def getthreading(self): return self.queue.get() def addthreading(self): self.queue.put(threading.Thread) def func(p,i): time.sleep(1) print(i) p.addthreading() if __name__ == "__main__": p = Threadingpool() for i in range(20): thread = p.getthreading() t = thread(target = func, args = (p,i)) t.start()
#p#分页标题#e#
#往行列中无限添加任务 import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue() self.max_num = max_num self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或乐成后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) :return: 假如线程池已经终止,则返回True不然None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 建设一个线程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 轮回去获取任务函数并执行任务函数 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() # 获取线程 while event != StopEvent: # 判定获取的线程数不便是全局变量 func, arguments, callback = event # 拆分元祖,得到执行函数,参数,回调函数 try: result = func(*arguments) # 执行函数 status = True except Exception as e: # 函数执行失败 status = False result = e if callback is not None: try: callback(status, result) except Exception as e: pass # self.free_list.append(current_thread) # event = self.q.get() # self.free_list.remove(current_thread) with self.work_state(): event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 封锁线程,给传输全局非元祖的变量来举办封锁 :return: """ for i in range(len(self.generate_list)): self.q.put(StopEvent) def terminate(self): """ 溘然封锁线程 :return: """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def work_state(self): self.free_list.append(threading.currentThread) try: yield finally: self.free_list.remove(threading.currentThread) def work(i): print(i) return i +1 # 返回给回调函数 def callback(ret): print(ret) pool = ThreadPool(10) for item in range(50): pool.run(func=work, args=(item,),callback=callback) pool.terminate() # pool.close()
python 历程
multiprocessing是python的多历程打点包,和threading.Thread雷同。
1、multiprocessing模块
#p#分页标题#e#
直接从侧面用subprocesses替换线程利用GIL的方法,由于这一点,multiprocessing模块可以让措施员在给定的呆板上充实的操作CPU。在multiprocessing中,通过建设Process工具生成历程,然后挪用它的start()要领,
from multiprocessing import Process def func(name): print('hello', name) if __name__ == "__main__": p = Process(target=func,args=('zhangyanlin',)) p.start() p.join() # 期待历程执行完毕
在利用并发设计的时候最好尽大概的制止共享数据,尤其是在利用多历程的时候。 假如你真有需要 要共享数据, multiprocessing提供了两种方法。
(1)multiprocessing,Array,Value
数据可以用Value或Array存储在一个共享内存舆图里,如下:
from multiprocessing import Array,Value,Process def func(a,b): a.value = 3.333333333333333 for i in range(len(b)): b[i] = -b[i] if __name__ == "__main__": num = Value('d',0.0) arr = Array('i',range(11)) c = Process(target=func,args=(num,arr)) d= Process(target=func,args=(num,arr)) c.start() d.start() c.join() d.join() print(num.value) for i in arr: print(i)
输出:
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
#p#分页标题#e#
建设num和arr时,“d”和“i”参数由Array模块利用的typecodes建设:“d”暗示一个双精度的浮点数,“i”暗示一个有标记的整数,这些共享工具将被线程安详的处理惩罚。
Array(‘i’, range(10))中的‘i’参数:
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint
‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double
(2)multiprocessing,Manager
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array范例的支持。
from multiprocessing import Process,Manager def f(d,l): d["name"] = "zhangyanlin" d["age"] = 18 d["Job"] = "pythoner" l.reverse() if __name__ == "__main__": with Manager() as man: d = man.dict() l = man.list(range(10)) p = Process(target=f,args=(d,l)) p.start() p.join() print(d) print(l)
输出:
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Server process manager比 shared memory 更机动,因为它可以支持任意的工具范例。别的,一个单独的manager可以通过历程在网络上差异的计较机之间共享,不外他比shared memory要慢。
2、历程池(Using a pool of workers)
Pool类描写了一个事情历程池,他有几种差异的要领让任务卸载事情历程。
历程池内部维护一个进措施列,当利用时,则去历程池中获取一个历程,假如历程池序列中没有可供利用的进历程,那么措施就会期待,直到历程池中有可用历程为止。
我们可以用Pool类建设一个历程池, 展开提交的任务给历程池。 例:
#apply from multiprocessing import Pool import time def f1(i): time.sleep(0.5) print(i) return i + 100 if __name__ == "__main__": pool = Pool(5) for i in range(1,31): pool.apply(func=f1,args=(i,)) #apply_async def f1(i): time.sleep(0.5) print(i) return i + 100 def f2(arg): print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(1,31): pool.apply_async(func=f1,args=(i,),callback=f2) pool.close() pool.join()
一个历程池工具可以节制事情历程池的哪些事情可以被提交,它支持超时和回调的异步功效,有一个雷同map的实现。
processes :利用的事情历程的数量,假如processes是None那么利用 os.cpu_count()返回的数量。
initializer: 假如initializer是None,那么每一个事情历程在开始的时候会挪用initializer(*initargs)。
#p#分页标题#e#
maxtasksperchild:事情历程退出之前可以完成的任务数,完成后用一个心的事情历程来替代原历程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在事情历程就会一直存活。
context: 用在拟定事情历程启动时的上下文,一般利用 multiprocessing.Pool() 可能一个context工具的Pool()要领来建设一个池,两种要领都适当的配置了context
留意:Pool工具的要领只可以被建设pool的历程所挪用。
New in version 3.2: maxtasksperchild
New in version 3.4: context
历程池的要领
apply(func[, args[, kwds]]) :利用arg和kwds参数挪用func函数,功效返回前会一直阻塞,由于这个原因,apply_async()更适归并发执行,别的,func函数仅被pool中的一个历程运行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()要领的一个变体,会返回一个功效工具。假如callback被指定,那么callback可以吸收一个参数然后被挪用,当功效筹备好回调时会挪用callback,挪用失败时,则用error_callback替换callback。 Callbacks应被当即完成,不然处理惩罚功效的线程会被阻塞。
close() : 阻止更多的任务提交到pool,待任务完成后,事情历程会退出。
terminate() : 不管任务是否完成,当即遏制事情历程。在对pool工具历程垃圾接纳的时候,会当即挪用terminate()。
join() : wait事情线程的退出,在挪用join()前,必需挪用close() or terminate()。这样是因为被终止的历程需要被父历程挪用wait(join等价与wait),不然历程会成为僵尸历程。
map(func, iterable[, chunksize])¶
map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
imap(func, iterable[, chunksize])¶
imap_unordered(func, iterable[, chunksize])
starmap(func, iterable[, chunksize])¶
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
python 协程
线程和历程的操纵是由措施触发系统接口,最后的执行者是系统;协程的操纵则是措施员。
协程存在的意义:对付多线程应用,CPU通过切片的方法来切换线程间的执行,线程切换时需要耗时(生存状态,下次继承)。协程,则只利用一个线程,在一个线程中划定某个代码块执行顺序。
协程的合用场景:当措施中存在大量不需要CPU的操纵时(IO),合用于协程;
event loop是协程执行的节制点, 假如你但愿执行协程, 就需要用到它们。
event loop提供了如下的特性:
注册、执行、打消延时挪用(异步函数)
建设用于通信的client和server协议(东西)
建设和此外措施通信的子历程和协议(东西)
把函数挪用送入线程池中
协程示例:
import asyncio async def cor1(): print("COR1 start") await cor2() print("COR1 end") async def cor2(): print("COR2") loop = asyncio.get_event_loop() loop.run_until_complete(cor1()) loop.close()
最后三行是重点。
asyncio.get_event_loop() : asyncio启动默认的event loop
run_until_complete() : 这个函数是阻塞执行的,知道所有的异步函数执行完成,
close() : 封锁event loop。
1、greenlet
import greenlet def fun1(): print("12") gr2.switch() print("56") gr2.switch() def fun2(): print("34") gr1.switch() print("78") gr1 = greenlet.greenlet(fun1) gr2 = greenlet.greenlet(fun2) gr1.switch()
2、gevent
gevent属于第三方模块需要下载安装包
pip3 install --upgrade pip3 pip3 install gevent
import gevent def fun1(): print("www.baidu.com") # 第一步 gevent.sleep(0) print("end the baidu.com") # 第三步 def fun2(): print("www.zhihu.com") # 第二步 gevent.sleep(0) print("end th zhihu.com") # 第四步 gevent.joinall([ gevent.spawn(fun1), gevent.spawn(fun2), ])
#p#分页标题#e#
碰着IO操纵自动切换:
import gevent import requests def func(url): print("get: %s"%url) gevent.sleep(0) date =requests.get(url) ret = date.text print(url,len(ret)) gevent.joinall([ gevent.spawn(func, 'https://www.pythontab.com/'), gevent.spawn(func, 'https://www.yahoo.com/'), gevent.spawn(func, 'https://github.com/'), ])
文章转自:http://www.cnblogs.com/aylin/p/5601969.html