source: geniuswebcore/test/geniusweb/simplerunner/BlockingQueueTest.py@ 93

Last change on this file since 93 was 90, checked in by Bart Vastenhouw, 3 years ago

Refactor to help reusing partiesserver.

File size: 890 bytes
Line 
1from threading import Thread
2import time
3import unittest
4from geniusweb.simplerunner.BlockingQueue import BlockingQueue
5
6
class BlockingQueueTest(unittest.TestCase):
    """Tests for BlockingQueue: ordering, size tracking and thread hand-off."""

    def testSimple(self):
        """Check single-threaded put/take ordering and size bookkeeping.

        Three values are put on a queue constructed with argument 4
        (presumably its capacity -- confirm against BlockingQueue) and must
        come back from take() in insertion order, with size() reporting
        0 before the puts, 3 after them and 0 again once drained.
        """
        q = BlockingQueue[int](4)
        self.assertEqual(0, q.size())
        q.put(1)
        q.put(2)
        q.put(3)
        self.assertEqual(3, q.size())
        self.assertEqual(1, q.take())
        self.assertEqual(2, q.take())
        self.assertEqual(3, q.take())
        self.assertEqual(0, q.size())

    def testThreading(self):
        """Check that values put by another thread arrive in order.

        A producer thread puts 0..3 on the queue, sleeping before each put,
        while this thread calls take() four times and expects 0, 1, 2, 3.
        The test therefore relies on take() waiting for a value that has not
        been put yet.
        """
        q = BlockingQueue[int](4)

        def pushnumbers():
            for n in range(4):
                # Delay each put so the consumer is normally already inside
                # take() when the value arrives.
                time.sleep(1)
                q.put(n)

        producer = Thread(target=pushnumbers)
        producer.start()
        # Wait for the producer even when an assertion below fails, so the
        # thread never outlives this test and leaks into later ones.
        self.addCleanup(producer.join)

        for expected in range(4):
            self.assertEqual(expected, q.take())
36
Note: See TracBrowser for help on using the repository browser.