[73] | 1 | from collections import deque
|
---|
| 2 | from multiprocessing import Queue
|
---|
| 3 | from threading import Thread, Condition
|
---|
| 4 | import threading
|
---|
| 5 | import traceback
|
---|
| 6 | from typing import TypeVar, Generic, List, Optional
|
---|
| 7 |
|
---|
| 8 | from tudelft.utilities.listener.Listener import Listener
|
---|
| 9 | from uri.uri import URI
|
---|
| 10 |
|
---|
| 11 | from geniusweb.connection.ConnectionEnd import ConnectionEnd
|
---|
| 12 | from geniusweb.references.Reference import Reference
|
---|
| 13 | from geniusweb.simplerunner.BlockingQueue import BlockingQueue
|
---|
| 14 |
|
---|
| 15 |
|
---|
# Type of the payload carried by an Info item.
S = TypeVar("S")


class Info(Generic[S]):
    '''
    Wrapper around data, so that both Null and EOS (end of stream)
    can be put in a blocking queue as ordinary items.

    @param <S> the type of contained data.
    '''
|
---|
| 24 |
|
---|
class Data(Info[S]):
    '''
    An Info item that holds an actual data object.
    '''

    def __init__(self, data: S):
        '''
        @param data the object to be wrapped
        '''
        self._data = data

    def get(self) -> S:
        '''
        @return the wrapped object, exactly as passed to the constructor
        '''
        return self._data

    def __repr__(self):
        # Show the wrapped object itself rather than the wrapper.
        return str(self._data)
|
---|
| 35 |
|
---|
class EOS(Info[S]):
    '''
    End of stream marker. Holds no data.
    '''

    def __repr__(self):
        return "EOS"


# The single shared end-of-stream instance; this one is used everywhere.
THE_EOS: EOS = EOS()
|
---|
| 45 |
|
---|
| 46 |
|
---|
IN = TypeVar('IN')
OUT = TypeVar('OUT')


class BasicConnection(ConnectionEnd[IN, OUT]):
    '''
    A basic connection that implements connection with direct calls.

    Objects passed to send() are wrapped and placed on an internal queue.
    A thread started by init() takes them off the queue one by one and
    hands them to the handler, until the end-of-stream marker is taken.

    @param <IN> the type of the incoming data
    @param <OUT> the type of outgoing data
    '''

    def __init__(self, reference: Reference, uri: URI):
        '''
        @param reference Reference that was used to create this connection.
        @param uri       the URI of the remote endpoint that makes up the
                         connection. This is a URI that uniquely identifies
                         the remote object
        '''
        self._reference = reference
        self._uri = uri
        self._listeners: List[Listener[IN]] = []
        # Reentrant, because setError() calls close() while holding it.
        self._synclock = threading.RLock()
        self._error: Optional[Exception] = None

        # to be initialized
        self._handler: Optional[Listener[OUT]] = None
        # NOTE(review): 4 is presumably the queue capacity -- confirm
        # against BlockingQueue.
        self._messages = BlockingQueue[Info](4)

    def init(self, newhandler: Listener[OUT]):
        '''
        To be called to hook up the other side that will handle a send action
        from us. Must be called first.

        @param newhandler a Listener<OUT> that can handle send actions.
        @raise ValueError if a handler is currently installed.
        '''
        if self._handler is not None:
            raise ValueError("already initialized")
        self._handler = newhandler
        this = self

        class MyHandlerThread(Thread):
            '''
            thread that handles this._messages until EOS is hit.
            It runs in scope of init and uses 'this'
            '''

            def run(self):
                try:
                    while True:
                        mess = this._messages.take()
                        # THE_EOS is a unique instance: compare by identity.
                        if mess is THE_EOS:
                            break
                        this._handler.notifyChange(mess.get())
                except Exception as e:
                    this.setError(e)
                finally:
                    # Always mark the connection as closed once this thread
                    # stops, also if recording the error itself failed.
                    this._handler = None

        handlerThread = MyHandlerThread()
        handlerThread.start()

    def setError(self, e: Exception):
        '''
        Error condition occurs. Record error and close connection.
        Only the first error is recorded (and printed).

        @param e the exception that occurred
        '''
        with self._synclock:
            if self._error is None:
                # maybe log instead?
                # Print the trace of the given exception. print_exc() would
                # print whatever exception is currently being handled, which
                # is nothing if we are not called from an except clause.
                traceback.print_exception(type(e), e, e.__traceback__)
                self._error = e
            self.close()

    def send(self, data: OUT):
        '''
        Queue data for delivery to the handler.

        @param data the object to be sent.
        @raise ValueError if no handler is installed (init was not called,
               or the handler thread has terminated).
        '''
        with self._synclock:
            if self._handler is None:
                raise ValueError(
                    "BasicConnection has not been initialized or was closed")
            # it seems there is no InterruptedException possible in python.
            self._messages.put(Data(data))

    def getReference(self) -> Reference:
        '''
        @return the Reference that was used to create this connection.
        '''
        return self._reference

    def getRemoteURI(self) -> URI:
        '''
        @return the URI of the remote endpoint, as given to the constructor.
        '''
        return self._uri

    def close(self):
        '''
        Queue the end-of-stream marker, so that the handler thread stops
        after handling the messages queued before it. Does nothing (apart
        from the print) if there is no handler or the marker is already
        in the queue.
        '''
        with self._synclock:
            print("flushing and terminating " + str(self))
            if self._handler is None or self._messages.contains(THE_EOS):
                return
            # it seems there is no InterruptedException possible in python.
            self._messages.put(THE_EOS)

    def __repr__(self):
        return "BasicConnection[" + str(self._reference) + "]"

    def getError(self) -> Optional[Exception]:
        '''
        @return the first error recorded with setError, or None if there
                was no error.
        '''
        return self._error

    def isOpen(self) -> bool:
        '''
        @return true iff this connection is open. Returns false also when the
                handler is in the close-down process
        '''
        return (self._handler is not None
                and not self._messages.contains(THE_EOS))

    #****************** implements listenable ****************
    # override because notifyListeners should throw exceptions.
    def addListener(self, l: Listener[IN]):
        self._listeners.append(l)

    def removeListener(self, l: Listener[IN]):
        self._listeners.remove(l)

    def notifyListeners(self, data: IN):
        '''
        Pass data to all listeners, in order of registration. Exceptions
        raised by a listener are not caught here.

        @param data the data to pass to each listener
        '''
        # Iterate over a snapshot, so that a listener that adds or removes
        # listeners from its callback does not disturb this loop.
        for l in list(self._listeners):
            l.notifyChange(data)
|
---|
| 169 |
|
---|