source: geniuswebcore/geniusweb/simplerunner/BasicConnection.py@ 92

Last change on this file since 92 was 90, checked in by Bart Vastenhouw, 3 years ago

Refactor to help reusing partiesserver.

File size: 4.2 KB
RevLine 
[90]1from collections import deque
2from multiprocessing import Queue
3from threading import Thread, Condition
4import threading
5import traceback
6from typing import TypeVar, Generic, List, Optional
7
8from tudelft.utilities.listener.Listener import Listener
9from uri.uri import URI
10
11from geniusweb.connection.ConnectionEnd import ConnectionEnd
12from geniusweb.references.Reference import Reference
13from geniusweb.simplerunner.BlockingQueue import BlockingQueue
14
15
# Type variable for the payload type carried by the Info wrappers.
S = TypeVar("S")
17
class Info(Generic[S]):
    '''
    Wrapper around data, so that both plain data (possibly None) and the
    end-of-stream marker can be put in a blocking queue.

    @param <S> the type of contained data.
    '''
24
class Data(Info[S]):
    '''
    Info that carries one actual data item.

    @param <S> the type of contained data.
    '''

    def __init__(self, data: S):
        '''
        @param data the item to wrap.
        '''
        self._data = data

    def get(self) -> S:
        '''
        @return the wrapped item, exactly as it was given to the constructor.
        '''
        return self._data

    def __repr__(self) -> str:
        # shown as the plain string form of the wrapped item
        return f"{self._data!s}"
35
class EOS(Info[S]):
    '''
    End of stream: an Info that carries no data and only marks that
    nothing more follows.
    '''

    def __repr__(self) -> str:
        return "EOS"
42
# The single, shared end-of-stream marker; this one instance is used everywhere.
THE_EOS: EOS = EOS()
45
46
# Type variables for a connection: IN is the type of the incoming data,
# OUT the type of the outgoing data.
IN = TypeVar("IN")
OUT = TypeVar("OUT")
49
class BasicConnection(ConnectionEnd[IN, OUT]):
    '''
    A basic connection that implements connection with direct calls.

    Data passed to send() is wrapped and put in a small queue. A thread
    started by init() takes the queued items one by one and passes them to
    the handler, until the end-of-stream marker is taken from the queue or
    the handler raises.

    @param <IN> the type of the incoming data
    @param <OUT> the type of outgoing data
    '''

    def __init__(self, reference: Reference, uri: URI):
        '''
        @param reference Reference that was used to create this connection.
        @param uri the URI of the remote endpoint that makes up the
               connection. This is a URI that uniquely identifies the
               remote object
        '''
        self._reference = reference
        self._uri = uri
        self._listeners: List[Listener[IN]] = []
        self._synclock = threading.RLock()
        # first error that was reported through setError, if any
        self._error: Optional[Exception] = None

        # to be initialized
        self._handler: Optional[Listener[OUT]] = None
        self._messages = BlockingQueue[Info](4)

    def init(self, newhandler: Listener[OUT]):
        '''
        To be called to hook up the other side that will handle a send action
        from us. Must be called first. Starts the thread that passes the
        queued messages to the handler.

        @param newhandler a Listener[OUT] that can handle send actions.
        @raise ValueError if a handler is currently set.
        '''
        if self._handler is not None:
            raise ValueError("already initialized")
        self._handler = newhandler
        Thread(target=self._handleMessages).start()

    def _handleMessages(self):
        '''
        Body of the handler thread: passes queued messages to the handler
        until THE_EOS is taken from the queue. An exception ends the loop
        and is reported through setError. In both cases the handler is
        cleared afterwards, which marks this connection as closed.
        '''
        try:
            while True:
                mess = self._messages.take()
                # THE_EOS is a single shared instance, so compare identity
                if mess is THE_EOS:
                    break
                self._handler.notifyChange(mess.get())
        except Exception as e:
            self.setError(e)
        self._handler = None

    def setError(self, e: Exception):
        '''
        Error condition occurs. Record error and close connection.
        Only the first reported error is printed and kept.

        @param e the exception that occurred
        '''
        with self._synclock:
            if self._error is None:
                # maybe log instead?
                # Print the traceback of e itself; print_exc() would only
                # work while an exception is being handled.
                traceback.print_exception(type(e), e, e.__traceback__)
                self._error = e
            # close() returns early when already closed or closing
            self.close()

    def send(self, data: OUT):
        '''
        Queue data for delivery to the handler.

        @param data the data to send
        @raise ValueError if no handler is set: init was not called yet or
               the connection was closed.
        '''
        with self._synclock:
            if self._handler is None:
                raise ValueError(
                    "BasicConnection has not been initialized or was closed")
            # it seems there is no InterruptedException possible in python.
            # NOTE(review): the lock is held while put() runs; presumably
            # put() waits when the queue is full - confirm this can not
            # stall a setError/close call from the handler thread.
            self._messages.put(Data(data))

    def getReference(self) -> Reference:
        '''
        @return the Reference that was given to the constructor.
        '''
        return self._reference

    def getRemoteURI(self) -> URI:
        '''
        @return the URI that was given to the constructor.
        '''
        return self._uri

    def close(self):
        '''
        Queue the end-of-stream marker, so that the handler thread stops
        after the messages that are queued before it. Does nothing (apart
        from printing) if no handler is set or the marker is already queued.
        '''
        with self._synclock:
            print("flushing and terminating " + str(self))
            if self._handler is None or self._messages.contains(THE_EOS):
                return
            # it seems there is no InterruptedException possible in python.
            self._messages.put(THE_EOS)

    def __repr__(self) -> str:
        return f"BasicConnection[{self._reference!s}]"

    def getError(self) -> Optional[Exception]:
        '''
        @return the first error that was passed to setError, or None if
                there was no error.
        '''
        return self._error

    def isOpen(self) -> bool:
        '''
        @return true iff this connection is open. Returns false also when then
                handler is in the close-down process
        '''
        return (self._handler is not None
                and not self._messages.contains(THE_EOS))

    #****************** implements listenable ****************
    # override because notifyListeners should throw exceptions.
    def addListener(self, l: Listener[IN]):
        '''
        @param l the listener to add
        '''
        self._listeners.append(l)

    def removeListener(self, l: Listener[IN]):
        '''
        @param l the listener to remove
        @raise ValueError if l is not a registered listener.
        '''
        self._listeners.remove(l)

    def notifyListeners(self, data: IN):
        '''
        Pass data to all listeners. Exceptions raised by a listener are
        not caught here.

        @param data the data to pass to the listeners
        '''
        # Iterate over a snapshot: a listener may add or remove listeners
        # while being notified, which must not disturb this loop.
        for listener in list(self._listeners):
            listener.notifyChange(data)
169
Note: See TracBrowser for help on using the repository browser.