source: geniuswebcore/test/geniusweb/simplerunner/BlockingQueueTest.py@ 82

Last change on this file since 82 was 81, checked in by Bart Vastenhouw, 3 years ago

Added python timedependent parties (conceder, hardliner, etc)

File size: 891 bytes
Line 
1from threading import Thread
2import time
3import unittest
4from geniusweb.simplerunner.BlockingQueue import BlockingQueue
5
6
7
8class BlockingQueueTest(unittest.TestCase):
9 def testSimple(self):
10 q=BlockingQueue[int](4)
11 self.assertEqual(0, q.size())
12 q.put(1)
13 q.put(2)
14 q.put(3)
15 self.assertEqual(3, q.size())
16 self.assertEqual(1,q.take())
17 self.assertEqual(2,q.take())
18 self.assertEqual(3,q.take())
19 self.assertEqual(0, q.size())
20
21 def testThreading(self):
22 q=BlockingQueue[int](4)
23
24 def pushnumbers():
25 for n in range(4):
26 time.sleep(1)
27 q.put(n)
28
29 t = Thread(target = pushnumbers)
30 t.start()
31
32 for n in range(4):
33 val=q.take()
34 print(val)
35 self.assertEqual(n, val)
36
37
Note: See TracBrowser for help on using the repository browser.