source: geniuswebcore/test/geniusweb/simplerunner/BlockingQueueTest.py

Last change on this file was 100, checked in by ruud, 21 months ago

python installs also wheel to avoid error messages

File size: 890 bytes
Line 
1from threading import Thread
2import time
3import unittest
4from geniusweb.simplerunner.BlockingQueue import BlockingQueue
5
6
7class BlockingQueueTest(unittest.TestCase):
8
9 def testSimple(self):
10 q = BlockingQueue[int](4)
11 self.assertEqual(0, q.size())
12 q.put(1)
13 q.put(2)
14 q.put(3)
15 self.assertEqual(3, q.size())
16 self.assertEqual(1, q.take())
17 self.assertEqual(2, q.take())
18 self.assertEqual(3, q.take())
19 self.assertEqual(0, q.size())
20
21 def testThreading(self):
22 q = BlockingQueue[int](4)
23
24 def pushnumbers():
25 for n in range(4):
26 time.sleep(1)
27 q.put(n)
28
29 t = Thread(target=pushnumbers)
30 t.start()
31
32 for n in range(4):
33 val = q.take()
34 print(val)
35 self.assertEqual(n, val)
36
Note: See TracBrowser for help on using the repository browser.