1 | from collections import deque
|
---|
2 | from multiprocessing import Queue
|
---|
3 | from threading import Thread, Condition
|
---|
4 | import threading
|
---|
5 | import traceback
|
---|
6 | from typing import TypeVar, Generic, List, Optional
|
---|
7 |
|
---|
8 | from tudelft.utilities.listener.Listener import Listener
|
---|
9 | from uri.uri import URI
|
---|
10 |
|
---|
11 | from geniusweb.connection.ConnectionEnd import ConnectionEnd
|
---|
12 | from geniusweb.references.Reference import Reference
|
---|
13 | from geniusweb.simplerunner.BlockingQueue import BlockingQueue
|
---|
14 |
|
---|
15 |
|
---|
16 | S=TypeVar("S")
|
---|
17 |
|
---|
18 | class Info (Generic[S]):
|
---|
19 | '''
|
---|
20 | Wrapper around data so that we can put Null end EOS in a
|
---|
21 | {@link ArrayBlockingQueue}
|
---|
22 | @param <S> the type of contained data.
|
---|
23 | '''
|
---|
24 |
|
---|
25 | class Data (Info[S]):
|
---|
26 |
|
---|
27 | def __init__(self, data:S):
|
---|
28 | self._data = data
|
---|
29 |
|
---|
30 | def get(self)->S:
|
---|
31 | return self._data;
|
---|
32 |
|
---|
33 | def __repr__(self):
|
---|
34 | return str(self._data)
|
---|
35 |
|
---|
36 | class EOS (Info[S]):
|
---|
37 | '''
|
---|
38 | End of stream.
|
---|
39 | '''
|
---|
40 | def __repr__(self):
|
---|
41 | return "EOS"
|
---|
42 |
|
---|
43 | # I use this single instance everywhere
|
---|
44 | THE_EOS:EOS = EOS()
|
---|
45 |
|
---|
46 |
|
---|
47 | IN = TypeVar('IN')
|
---|
48 | OUT = TypeVar('OUT')
|
---|
49 |
|
---|
50 | class BasicConnection(ConnectionEnd[IN, OUT]):
|
---|
51 | '''
|
---|
52 | A basic connection that implements connection with direct calls
|
---|
53 |
|
---|
54 | @param <IN> the type of the incoming data
|
---|
55 | @param <OUT> the type of outgoing data
|
---|
56 | '''
|
---|
57 |
|
---|
58 | def __init__(self, reference:Reference , uri:URI ):
|
---|
59 | '''
|
---|
60 | @param reference Reference that was used to create this connection.
|
---|
61 | @param uri the URI of the remote endpoint that makes up the
|
---|
62 | connection. This is a URI that uniquely identifies the
|
---|
63 | remote object
|
---|
64 | '''
|
---|
65 | self._reference = reference
|
---|
66 | self._uri = uri
|
---|
67 | self._listeners:List[Listener[IN]] = []
|
---|
68 | self._synclock = threading.RLock()
|
---|
69 | self._error:Optional[Exception]=None
|
---|
70 |
|
---|
71 | # to be initialized
|
---|
72 | self._handler:Optional[Listener[OUT]] = None
|
---|
73 | self._messages = BlockingQueue[Info](4)
|
---|
74 |
|
---|
75 |
|
---|
76 | def init(self, newhandler:Listener[OUT] ) :
|
---|
77 | '''
|
---|
78 | To be called to hook up the other side that will handle a send action
|
---|
79 | from us. Must be called first.
|
---|
80 |
|
---|
81 | @param newhandler a Listener<OUT> that can handle send actions.
|
---|
82 | '''
|
---|
83 | if self._handler:
|
---|
84 | raise ValueError("already initialized")
|
---|
85 | self._handler = newhandler
|
---|
86 | this=self
|
---|
87 |
|
---|
88 | class MyHandlerThread(Thread):
|
---|
89 | '''
|
---|
90 | thread that handles this._messages until EOS is hit.
|
---|
91 | It runs in scope of init and uses 'this'
|
---|
92 | '''
|
---|
93 | def run(self):
|
---|
94 | try:
|
---|
95 | while (True):
|
---|
96 | #print("INTO"+str(self))
|
---|
97 | mess = this._messages.take()
|
---|
98 | #print("OUT"+str(self))
|
---|
99 | if mess==THE_EOS:
|
---|
100 | break;
|
---|
101 | this._handler.notifyChange(mess.get())
|
---|
102 | except Exception as e:
|
---|
103 | this.setError(e)
|
---|
104 | this._handler = None
|
---|
105 | #print("BasicConnection closed");
|
---|
106 |
|
---|
107 | handlerThread=MyHandlerThread()
|
---|
108 | handlerThread.start();
|
---|
109 |
|
---|
110 | def setError(self, e:Exception):
|
---|
111 | '''
|
---|
112 | Error condition occurs. Record error and close connection
|
---|
113 |
|
---|
114 | @param e
|
---|
115 | '''
|
---|
116 | with self._synclock:
|
---|
117 | if not self._error:
|
---|
118 | # maybe log instead?
|
---|
119 | traceback.print_exc()
|
---|
120 | self._error = e
|
---|
121 | self.close()
|
---|
122 |
|
---|
123 | def send(self, data:OUT ) :
|
---|
124 | with self._synclock:
|
---|
125 | if not self._handler:
|
---|
126 | raise ValueError(
|
---|
127 | "BasicConnection has not been initialized or was closed")
|
---|
128 | # it seems there is no InterruptedException possible in python.
|
---|
129 | self._messages.put(Data(data))
|
---|
130 |
|
---|
131 | def getReference(self) -> Reference :
|
---|
132 | return self._reference
|
---|
133 |
|
---|
134 | def getRemoteURI(self)->URI:
|
---|
135 | return self._uri
|
---|
136 |
|
---|
137 | def close(self):
|
---|
138 | with self._synclock:
|
---|
139 | print("flushing and terminating " + str(self))
|
---|
140 | if not self._handler or self._messages.contains(THE_EOS):
|
---|
141 | return
|
---|
142 | # it seems there is no InterruptedException possible in python.
|
---|
143 | self._messages.put(THE_EOS)
|
---|
144 |
|
---|
145 | def __repr__(self):
|
---|
146 | return "BasicConnection[" + str(self._reference) + "]"
|
---|
147 |
|
---|
148 | def getError(self)->Optional[Exception]:
|
---|
149 | return self._error;
|
---|
150 |
|
---|
151 | def isOpen(self)->bool:
|
---|
152 | '''
|
---|
153 | @return true iff this connection is open. Returns false also when then
|
---|
154 | handler is in the close-down process
|
---|
155 | '''
|
---|
156 | return self._handler != None and not self._messages.contains(THE_EOS)
|
---|
157 |
|
---|
158 | #****************** implements listenable ****************
|
---|
159 | # override because notifyListeners should throw exceptions.
|
---|
160 | def addListener(self, l:Listener[IN]):
|
---|
161 | self._listeners.append(l)
|
---|
162 |
|
---|
163 | def removeListener(self, l:Listener[IN] ) :
|
---|
164 | self._listeners.remove(l)
|
---|
165 |
|
---|
166 | def notifyListeners(self, data:IN ) :
|
---|
167 | for l in self._listeners:
|
---|
168 | l.notifyChange(data)
|
---|
169 |
|
---|