source: geniuswebcore/test/geniusweb/simplerunner/ClassPathConnectionFactoryTest.py@ 100

Last change on this file since 100 was 100, checked in by ruud, 14 months ago

python installs also wheel to avoid error messages

File size: 2.0 KB
Line 
1import time
2import unittest
3
4from uri.uri import URI
5
6from geniusweb.references.PartyRef import PartyRef
7from geniusweb.simplerunner.ClassPathConnectionFactory import ClassPathConnectionFactory, \
8 ConnectionPair
9
10
11class ClassPathConnectionFactoryTest(unittest.TestCase):
12 factory=ClassPathConnectionFactory()
13 PARTYREF = PartyRef(URI("pythonpath:test.testparty.TestParty.TestParty"))
14
15
16 def testWrongURI(self):
17 self.assertRaises(ValueError,
18 lambda:self.factory.connect(PartyRef(URI("http://blabla"))))
19
20
21 def testNullPath(self):
22 # bad because classpath should not start with //
23 self.assertRaises(ValueError, lambda:self.factory.connect(PartyRef(URI(
24 "pythonpath://some.class.path"))))
25
26 def testUnknownParty(self):
27 self.assertRaises(ValueError, lambda:self.factory.connect(PartyRef(URI(
28 "pythonpath:blabla.bla"))))
29
30 def testParty(self) :
31 conn = self.factory.connect(self.PARTYREF)
32 self.assertIsNotNone(conn)
33 conn.close()
34
35 def testConnectionPairProtocolClosesNormally(self):
36 cpair = ConnectionPair(self.PARTYREF);
37 self.assertEquals(2, len(cpair.getOpenConnections()))
38 cpair.getProtocolToPartyConn().close()
39 time.sleep(0.1)
40 self.assertEqual(0, len(cpair.getOpenConnections()))
41
42 def testConnectionPairProtocolClosesWithExc(self):
43 cpair = ConnectionPair(self.PARTYREF)
44 self.assertEqual(2, len(cpair.getOpenConnections()))
45 cpair.getProtocolToPartyConn().setError(ValueError("test"))
46 time.sleep(0.1)
47 self.assertEqual(0, len(cpair.getOpenConnections()))
48
49 def testConnectionPairPartyClosesNormally(self):
50 cpair = ConnectionPair(self.PARTYREF)
51 self.assertEqual(2, len(cpair.getOpenConnections()))
52 cpair._getPartyToProtocolConn().close()
53 time.sleep(0.1)
54
55 self.assertEqual(0, len(cpair.getOpenConnections()))
56
57 def testConnectionPairPartyClosesWithExc(self):
58 cpair = ConnectionPair(self.PARTYREF)
59 self.assertEqual(2, len(cpair.getOpenConnections()))
60 cpair._getPartyToProtocolConn().setError(ValueError("test"))
61 time.sleep(0.1)
62 self.assertEqual(0, len(cpair.getOpenConnections()))
63
Note: See TracBrowser for help on using the repository browser.