《操作系统》的实验代码。
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

337 line
14 KiB

  1. From:
  2. - http://www.laurentluce.com/posts/python-threads-synchronization-locks-rlocks-semaphores-conditions-events-and-queues/
  3. - http://yoyzhou.github.io/blog/2013/02/28/python-threads-synchronization-locks/
  4. - http://blog.chinaunix.net/uid-429659-id-3186991.html
  5. - http://blog.csdn.net/yidangui/article/details/8707187
  6. - http://blog.csdn.net/yidangui/article/details/8707205
  7. - http://blog.csdn.net/yidangui/article/details/8707209
  8. - http://blog.csdn.net/yidangui/article/details/8707197
  9. ## threads: Python threads synchronization: Locks, RLocks, Semaphores, Conditions, Events and Queues.
  10. ### threading简介
  11. python是支持多线程的,并且是native的线程。主要是通过thread和threading这两个模块来实现的。
  12. #### 实现模块
  13. - thread:多线程的底层支持模块,一般不建议使用;
  14. - threading:对thread进行了封装,将一些线程的操作对象化。
  15. #### threading模块
  16. - Timer与Thread类似,但要等待一段时间后才开始运行;
  17. - Lock 锁原语,这个我们可以对全局变量互斥时使用;
  18. - RLock 可重入锁,使单线程可以再次获得已经获得的锁;
  19. - Condition 条件变量,能让一个线程停下来,等待其他线程满足某个“条件”;
  20. - Event 通用的条件变量。多个线程可以等待某个事件发生,在事件发生后,所有的线程都被激活;
  21. - Semaphore为等待锁的线程提供一个类似“等候室”的结构;
  22. - BoundedSemaphore 与semaphore类似,但不允许超过初始值;
  23. - Queue:实现了多生产者(Producer)、多消费者(Consumer)的队列,支持锁原语,能够在多个线程之间提供很好的同步支持。
  24. thread是比较底层的模块,threading是对thread做了一些包装的,可以更加方便的被使用。创建thread的方式有:
  25. - 第一种方式:创建一个threading.Thread()的实例对象,给它一个函数。在它的初始化函数(__init__)中将可调用对象作为参数传入
  26. - 第二种方式:创建一个threading.Thread的实例,传给它一个可调用类对象,类中使用__call__()函数调用函数
  27. - 第三种方式:是通过继承Thread类,重写它的run方法;
  28. 第一种和第三种常用。
  29. 第一种方式举例:
  30. ```
  31. #coding=utf-8
  32. import threading
  33. def thread_fun(num):
  34. for n in range(0, int(num)):
  35. print " I come from %s, num: %s" %( threading.currentThread().getName(), n)
  36. def main(thread_num):
  37. thread_list = list();
  38. # 先创建线程对象
  39. for i in range(0, thread_num):
  40. thread_name = "thread_%s" %i
  41. thread_list.append(threading.Thread(target = thread_fun, name = thread_name, args = (20,)))
  42. # 启动所有线程
  43. for thread in thread_list:
  44. thread.start()
  45. # 主线程中等待所有子线程退出
  46. for thread in thread_list:
  47. thread.join()
  48. if __name__ == "__main__":
  49. main(3)
  50. ```
  51. 第三种方式举例1:
  52. ```
  53. #!/usr/bin/env python
  54. import threading
  55. import time
  56. count=1
  57. class KissThread(threading.Thread):
  58. def run(self):
  59. global count
  60. print "Thread # %s:Pretending to do stuff" % count
  61. count+=1
  62. time.sleep(2)
  63. print "done with stuff"
  64. for t in range(5):
  65. KissThread().start()
  66. ```
  67. 第三种方式举例2:
  68. ```
  69. import threading
  70. class MyThread(threading.Thread):
  71. def __init__(self):
  72. threading.Thread.__init__(self)
  73. def run(self):
  74. print "I am %s" % (self.name)
  75. if __name__ == "__main__":
  76. for i in range(0, 5):
  77. my_thread = MyThread()
  78. my_thread.start()
  79. ```
  80. ### Thread类常用方法
  81. #### getName(self)
  82. 返回线程的名字
  83. #### setName方法
  84. 可以指定每一个thread的name
  85. ```
  86. def __init__(self):
  87. threading.Thread.__init__(self)
  88. self.setName("new" + self.name)
  89. ```
  90. #### isAlive(self)
  91. 布尔标志,表示这个线程是否还在运行中
  92. #### isDaemon(self)
  93. 返回线程的daemon标志
  94. #### run(self)
  95. 定义线程的功能函数
  96. #### start方法
  97. 启动线程
  98. #### join方法
  99. join方法原型如下,这个方法是用来程序挂起,直到线程结束,如果给出timeout,则最多阻塞timeout秒
  100. ```
  101. def join(self, timeout=None):
  102. ```
  103. #### setDaemon方法
  104. 当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,当主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是,只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon方法,并设置其参数为True。
  105. ### Queue提供的类
  106. - Queue队列
  107. - LifoQueue后入先出(LIFO)队列
  108. - PriorityQueue 优先队列
  109. ### 互斥锁
  110. Python编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。在Python中我们使用threading模块提供的Lock类。添加一个互斥锁变量mutex = threading.Lock(),然后在争夺资源的时候之前我们会先抢占这把锁mutex.acquire(),对资源使用完成之后我们在释放这把锁mutex.release()。
  111. 当一个线程调用Lock对象的acquire()方法获得锁时,这把锁就进入“locked”状态。因为每次只有一个线程可以获得锁,所以如果此时另一个线程试图获得这个锁,该线程就会变为同步阻塞状态。直到拥有锁的线程调用锁的release()方法释放锁之后,该锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
  112. ```
  113. import threading
  114. import time
  115. counter = 0
  116. mutex = threading.Lock()
  117. class MyThread(threading.Thread):
  118. def __init__(self):
  119. threading.Thread.__init__(self)
  120. def run(self):
  121. global counter, mutex
  122. time.sleep(1);
  123. if mutex.acquire():
  124. counter += 1
  125. print "I am %s, set counter:%s" % (self.name, counter)
  126. mutex.release()
  127. if __name__ == "__main__":
  128. for i in range(0, 100):
  129. my_thread = MyThread()
  130. my_thread.start()
  131. ```
  132. ### Condition条件变量
  133. Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。使用Condition的主要方式为:线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
  134. 另外:Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock;除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。
  135. #### “生产者-消费者”模型
  136. 代码中主要实现了生产者和消费者线程,双方将会围绕products来产生同步问题,首先是2个生成者生产products ,而接下来的4个消费者将会消耗products.
  137. 实现举例:
  138. ```
  139. #coding=utf-8
  140. #!/usr/bin/env python
  141. import threading
  142. import time
  143. condition = threading.Condition()
  144. products = 0
  145. class Producer(threading.Thread):
  146. def __init__(self):
  147. threading.Thread.__init__(self)
  148. def run(self):
  149. global condition, products
  150. while True:
  151. if condition.acquire():
  152. if products < 10:
  153. products += 1;
  154. print "Producer(%s):deliver one, now products:%s" %(self.name, products)
  155. condition.notify()
  156. else:
  157. print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)
  158. condition.wait();
  159. condition.release()
  160. time.sleep(1)
  161. class Consumer(threading.Thread):
  162. def __init__(self):
  163. threading.Thread.__init__(self)
  164. def run(self):
  165. global condition, products
  166. while True:
  167. if condition.acquire():
  168. if products > 1:
  169. products -= 1
  170. print "Consumer(%s):consume one, now products:%s" %(self.name, products)
  171. condition.notify()
  172. else:
  173. print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)
  174. condition.wait();
  175. condition.release()
  176. time.sleep(2)
  177. if __name__ == "__main__":
  178. for p in range(0, 2):
  179. p = Producer()
  180. p.start()
  181. for c in range(0, 4):
  182. c = Consumer()
  183. c.start()
  184. ```
  185. ### 信号量semaphore
  186. semaphore是一个变量,控制着对公共资源或者临界区的访问。信号量维护着一个计数器,指定可同时访问资源或者进入临界区的线程数。每次有一个线程获得信号量时,计数器-1。若计数器为0,其他线程就停止访问信号量,直到另一个线程释放信号量。
  187. ```
  188. #coding=utf-8
  189. import threading
  190. import random
  191. import time
  192. class SemaphoreThread(threading.Thread):
  193. """class using semaphore"""
  194. availableTables=['A','B','C','D','E']
  195. def __init__(self,threadName,semaphore):
  196. """initialize thread"""
  197. threading.Thread.__init__(self,name=threadName)
  198. self.sleepTime=random.randrange(1,6)
  199. #set the semaphore as a data attribute of the class
  200. self.threadSemaphore=semaphore
  201. def run(self):
  202. """Print message and release semaphore"""
  203. #acquire the semaphore
  204. self.threadSemaphore.acquire()
  205. #remove a table from the list
  206. table=SemaphoreThread.availableTables.pop()
  207. print "%s entered;seated at table %s." %(self.getName(),table),
  208. print SemaphoreThread.availableTables
  209. time.sleep(self.sleepTime)
  210. #free a table
  211. print " %s exiting;freeing table %s." %(self.getName(),table),
  212. SemaphoreThread.availableTables.append(table)
  213. print SemaphoreThread.availableTables
  214. #release the semaphore after execution finishes
  215. self.threadSemaphore.release()
  216. threads=[] #list of threads
  217. #semaphore allows five threads to enter critical section
  218. threadSemaphore=threading.Semaphore(len(SemaphoreThread.availableTables))
  219. #创建一个threading.Semaphore对象,他最多允许5个线程访问临界区。
  220. #Semaphore类的一个对象用计数器跟踪获取和释放信号量的线程数量。
  221. #create ten threads
  222. for i in range(1,11):
  223. threads.append(SemaphoreThread("thread"+str(i),threadSemaphore))
  224. #创建一个列表,该列表由SemaphoreThread对象构成,start方法开始列表中的每个线程
  225. #start each thread
  226. for thread in threads:
  227. thread.start()
  228. ```
  229. SemaphoreThread类的每个对象代表饭馆里的一个客人。类属性availableTables跟踪饭馆中可用的桌子。
  230. 信号量有个内建的计数器,用于跟踪他的acquire和release方法调用的次数。内部计数器的初始值可作为参数传给Semaphore构造函数。默认值为1.计数器大于0,Semaphore的acquire方法就为线程获得信号量,并计数器自减。
  231. ### 死锁现象
  232. 所谓死锁: 是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。 由于资源占用是互斥的,当某个进程提出申请资源后,使得有关进程在无外力协助下,永远分配不到必需的资源而无法继续运行,这就产生了一种特殊现象死锁。
  233. ```
  234. import threading
  235. counterA = 0
  236. counterB = 0
  237. mutexA = threading.Lock()
  238. mutexB = threading.Lock()
  239. class MyThread(threading.Thread):
  240. def __init__(self):
  241. threading.Thread.__init__(self)
  242. def run(self):
  243. self.fun1()
  244. self.fun2()
  245. def fun1(self):
  246. global mutexA, mutexB
  247. if mutexA.acquire():
  248. print "I am %s , get res: %s" %(self.name, "ResA")
  249. if mutexB.acquire():
  250. print "I am %s , get res: %s" %(self.name, "ResB")
  251. mutexB.release()
  252. mutexA.release()
  253. def fun2(self):
  254. global mutexA, mutexB
  255. if mutexB.acquire():
  256. print "I am %s , get res: %s" %(self.name, "ResB")
  257. if mutexA.acquire():
  258. print "I am %s , get res: %s" %(self.name, "ResA")
  259. mutexA.release()
  260. mutexB.release()
  261. if __name__ == "__main__":
  262. for i in range(0, 100):
  263. my_thread = MyThread()
  264. my_thread.start()
  265. ```