1 | from builtins import Exception
|
---|
2 | import json
|
---|
3 | import logging
|
---|
4 | from time import sleep
|
---|
5 |
|
---|
6 | from pyson.ObjectMapper import ObjectMapper
|
---|
7 | from tudelft.utilities.listener.DefaultListenable import DefaultListenable
|
---|
8 | from tudelft_utilities_logging.Reporter import Reporter
|
---|
9 | from uri import URI # type:ignore
|
---|
10 | import websocket # type:ignore
|
---|
11 |
|
---|
12 | from geniusweb.profile.Profile import Profile
|
---|
13 | from geniusweb.profile.Profile import Profile
|
---|
14 | from geniusweb.profileconnection.ProfileInterface import ProfileInterface
|
---|
15 | from geniusweb.profileconnection.Session import Session
|
---|
16 | from geniusweb.profileconnection.WebSocketClient import WebSocketClient
|
---|
17 | from geniusweb.profileconnection.WebSocketContainer import WebSocketContainer
|
---|
18 |
|
---|
19 |
|
---|
class WebsocketProfileConnector(DefaultListenable[Profile], ProfileInterface, WebSocketClient):
    '''
    A {@link ProfileInterface} that receives its {@link Profile} over a
    websocket.

    The constructor asks the given {@link WebSocketContainer} to connect to
    the profile URI with this object as the websocket client. Every message
    that arrives is parsed as a {@link Profile}, stored as the current
    profile and passed on to the registered listeners.
    '''

    # shared (class-level) parser used to turn the received json into a Profile
    _pyson: ObjectMapper = ObjectMapper()
    # maximum time (milliseconds) that getProfile waits for the first profile
    TIMEOUT_MS = 2000
    # time (milliseconds) between two checks in getProfile
    _POLL_MS = 100
    # latest profile received, None as long as nothing was received
    _profile: Profile = None  # type:ignore
    # the session passed to onOpen, None when not (or no longer) connected
    _session = None

    def __init__(self, uri: URI, reporter: Reporter, wscontainer: WebSocketContainer):
        '''
        @param uri         the URI that will provide the {@link Profile}
        @param reporter    the {@link Reporter} for logging issues
        @param wscontainer the {@link WebSocketContainer} that can provide new
                           websockets.
        @throws ValueError if uri, reporter or wscontainer is None.
        NOTE(review): wscontainer.connectToServer may raise errors of its own
        (e.g. on network problems); that is not visible from this class.
        '''
        super().__init__()
        # identity checks: "== None" would call the __eq__ of the argument,
        # which a class is free to override.
        if uri is None or reporter is None or wscontainer is None:
            raise ValueError("uri, reporter and wsconnector must be not null")

        self._uri = uri
        self._logger = reporter
        self._wscontainer = wscontainer
        # see #1763 expected max size of profiles.
        wscontainer.setDefaultMaxTextMessageBufferSize(200000)
        wscontainer.connectToServer(uri, self)

    def onOpen(self, session: Session):
        '''
        Stores the session of the opened connection and logs it.
        @param session the {@link Session} of the new connection
        '''
        self._session = session
        self._logger.log(logging.INFO, "Connected: " + str(session))

    def onClose(self):
        '''
        Logs that the websocket closed and forgets the session.
        Does nothing if there is no session.
        '''
        if self._session:
            self._logger.log(logging.INFO, "Closed websocket: " + str(self._session))
            self._session = None

    def onMessage(self, message: str):
        '''
        Parses the message as json containing a {@link Profile}, stores the
        result as the current profile and notifies the listeners.
        Any failure is logged and not re-raised.
        @param message the text received from the websocket
        '''
        try:
            # this will be called every time the profile changes.
            self._logger.log(logging.INFO, "Received profile: " + message)
            profile: Profile = self._pyson.parse(json.loads(message), Profile)  # type:ignore
            self._profile = profile
            self.notifyListeners(profile)
        except Exception as e:
            self._logger.log(logging.CRITICAL,
                "Something went wrong while processing message " + repr(message), e)

    def onError(self, e: BaseException):
        '''
        Logs the error together with the profile URI.
        @param e the error that was reported
        '''
        self._logger.log(logging.CRITICAL,
            "Something went wrong while processing downloaded profile " + repr(self._uri), e)

    # Override
    def getProfile(self) -> Profile:
        '''
        @return the latest received {@link Profile}. If none was received
                yet, this waits up to TIMEOUT_MS milliseconds for one,
                checking every _POLL_MS milliseconds.
        @throws IOError if there still is no profile after waiting.
        '''
        remaining_wait = self.TIMEOUT_MS
        # identity check: "== None" would call Profile.__eq__
        while self._profile is None and remaining_wait > 0:
            remaining_wait -= self._POLL_MS
            sleep(self._POLL_MS / 1000)
        if self._profile is None:
            raise IOError("Server is not responding. failed to fetch profile from " + str(self._uri))

        return self._profile

    def close(self):
        '''
        Closes the session if there is one. Does nothing otherwise.
        '''
        # Take one snapshot so that the None-check and the close() call use
        # the same object: onClose sets self._session to None and may run in
        # between (assumes the callbacks come from another thread).
        session = self._session
        if session is not None:
            session.close()