source: geniuswebcore/geniusweb/profileconnection/WebSocketContainer.py@ 71

Last change on this file since 71 was 71, checked in by Bart Vastenhouw, 3 years ago

LEARN algorithm now also available. Fixed small issues.

File size: 2.0 KB
RevLine 
[71]1from abc import ABC, abstractmethod
2import threading
3
4from uri.uri import URI # type:ignore
5from websocket._app import WebSocketApp # type:ignore
6
7from geniusweb.profileconnection.Session import Session
8from geniusweb.profileconnection.WebSocketClient import WebSocketClient
9
10
class WebSocketContainer(ABC):
    '''
    Abstract view on the websocket system, so that it can be mocked.
    With a mock in place the WebSocketProfileConnector can be tested
    while no profiles server is running.
    The real implementation relies on the websocket_client module
    (to be pip-installed).
    '''

    @abstractmethod
    def setDefaultMaxTextMessageBufferSize(self, bufsize: int):
        '''
        @param bufsize the new buffer size for websocket
        '''

    @abstractmethod
    def connectToServer(self, uri: URI, client: WebSocketClient) -> Session:
        '''
        @param uri the websocket uri to contact
        @param client the WebSocketClient to be attached
        @return a Session for the requested uri
        '''
30
31
class WsSession(Session):
    '''
    Session implementation that runs on top of a websocket
    (a WebSocketApp).
    '''

    def __init__(self, uri: URI, client: WebSocketClient):
        '''
        Creates the WebSocketApp for the given uri, forwards its events
        to the client and starts its run_forever loop in a new thread.

        @param uri the websocket uri to connect with
        @param client the WebSocketClient that receives the onOpen,
            onMessage, onError and onClose calls.
        '''

        def opened(ws):
            # the client gets this session, not the raw websocket
            return client.onOpen(self)

        def received(ws, text):
            return client.onMessage(text)

        def failed(ws, error):
            return client.onError(error)

        def closed(ws, a, b):
            # Takes 3 arguments, as noted in the original code;
            # the two extra ones are not passed on to the client.
            return client.onClose()

        self._ws = WebSocketApp(
            str(uri),
            on_message=received,
            on_error=failed,
            on_close=closed,
            on_open=opened,
        )
        # run_forever is executed in its own thread, so that
        # the constructor returns right away.
        runner = threading.Thread(target=self._ws.run_forever)
        runner.start()

    def send(self, text: str):
        '''
        @param text the text to send over the websocket
        '''
        self._ws.send(text)

    def close(self):
        '''
        Closes the underlying websocket.
        '''
        self._ws.close()

    def __repr__(self):
        return f'Session to {self._ws.url!s}'
53
54
class DefaultWebSocketContainer(WebSocketContainer):
    '''
    The "real" (non-mock) WebSocketContainer. The sessions it hands out
    are WsSession objects.
    '''

    def setDefaultMaxTextMessageBufferSize(self, bufsize: int):
        '''
        Not implemented: only prints a warning, bufsize is not used.

        @param bufsize the requested buffer size for websocket
        '''
        warning = "WARNING setDefaultMaxTextMessageBufferSize not implemented"
        print(warning)

    def connectToServer(self, uri: URI, client: WebSocketClient) -> Session:
        '''
        @param uri the websocket uri to contact
        @param client the WebSocketClient to be attached
        @return a new WsSession for uri and client
        '''
        session = WsSession(uri, client)
        return session
64
Note: See TracBrowser for help on using the repository browser.