source: src/main/java/geniusweb/runserver/WebSocketProtToPartyConn.java@ 27

Last change on this file since 27 was 27, checked in by bart, 4 years ago

New protocols Learn and APPLearn. Fixed memory leak.

File size: 7.5 KB
Line 
1package geniusweb.runserver;
2
3import java.io.BufferedReader;
4import java.io.IOException;
5import java.io.InputStreamReader;
6import java.net.HttpURLConnection;
7import java.net.SocketException;
8import java.net.URI;
9import java.net.URISyntaxException;
10import java.util.logging.Level;
11
12import javax.websocket.ClientEndpoint;
13import javax.websocket.CloseReason;
14import javax.websocket.ContainerProvider;
15import javax.websocket.DeploymentException;
16import javax.websocket.OnClose;
17import javax.websocket.OnError;
18import javax.websocket.OnMessage;
19import javax.websocket.OnOpen;
20import javax.websocket.Session;
21import javax.websocket.WebSocketContainer;
22
23import geniusweb.actions.Action;
24import geniusweb.actions.PartyId;
25import geniusweb.inform.Inform;
26import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
27import geniusweb.references.Reference;
28import tudelft.utilities.listener.DefaultListenable;
29import tudelft.utilities.logging.ReportToLogger;
30import tudelft.utilities.logging.Reporter;
31
32/**
33 * A websocket based {@link ProtocolToPartyConn}. it contains a websocket
34 * connection to a party (that will be started using the partyserver
35 * start-new-party protocol). Must be public so that it can be accessed as
36 * ClientEndpoint by apache tomcat. This is internally used.
37 *
38 * <p>
39 * The @ClientEndpoint annotation is important, the runserver is creating these
40 * sockets programatically, from a received ws: URL.
41 */
42@ClientEndpoint
43public class WebSocketProtToPartyConn extends DefaultListenable<Action>
44 implements ProtocolToPartyConn {
45 // 20MB seems reasonable limit?
46 private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024;
47 private final Reporter log;
48
49 private final Reference reference;
50
51 private WebSocketContainer container;
52 private Session session = null;
53 private URI partyuri;
54 private Throwable error = null;
55
56 /**
57 * Constructor that uses default {@link Reporter}
58 *
59 * @param reference the address to to a http GET to get a websocket to a new
60 * running party.
61 */
62 public WebSocketProtToPartyConn(Reference reference) {
63 this(reference, new ReportToLogger("runserver"));
64 }
65
66 /**
67 *
68 * @param reference the address to to a http GET to get a websocket to a new
69 * running party.
70 * @param reporter the {@link Reporter} used for logging important
71 * events/issues
72 * @throws IOException if the party does not start properly.
73 */
74 public WebSocketProtToPartyConn(Reference reference, Reporter reporter) {
75 this.reference = reference;
76 this.log = reporter;
77
78 }
79
80 public WebSocketProtToPartyConn init() throws IOException {
81 log.log(Level.INFO, "Trying to start up party " + reference);
82 URI partyuri = startParty(reference);
83
84 container = ContainerProvider.getWebSocketContainer();
85 container.setDefaultMaxTextMessageBufferSize(200000);
86
87 log.log(Level.INFO,
88 "Trying to make websocket connection to running party "
89 + partyuri);
90 try {
91 container.connectToServer(this, partyuri);
92 } catch (DeploymentException e) {
93 throw new IOException(
94 "Failed to connect with running party at " + partyuri, e);
95 }
96 log.log(Level.INFO, "Created websocket connection at " + partyuri);
97 return this;
98 }
99
100 /**
101 *
102 * @param reference the address to to a http GET to get a websocket to a new
103 * running party.
104 * @return URI of websocket behind which is a running party
105 * @throws IOException if party did not start correctly
106 */
107 private URI startParty(Reference reference) throws IOException {
108 HttpURLConnection conn = (HttpURLConnection) reference.getURI().toURL()
109 .openConnection();
110 // if this throws, the IOException seems to contain the "retry later"
111 // message already
112 conn.setRequestMethod("GET");
113 conn.setReadTimeout(20000);
114 conn.connect();
115 BufferedReader rd;
116 try {
117 rd = new BufferedReader(
118 new InputStreamReader(conn.getInputStream()));
119 } catch (IOException e) {
120 if (conn.getErrorStream() == null) {
121 throw new IOException("Failed to connect " + reference
122 + ", no server error message", e);
123 }
124 // read the detail message from the error stream
125 rd = new BufferedReader(
126 new InputStreamReader(conn.getErrorStream()));
127 throw new IOException(rd.readLine(), e);
128 }
129 String partysocketaddress = rd.readLine();
130 log.log(Level.INFO, "startparty returned " + partysocketaddress);
131 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
132 throw new IOException(
133 "Something went wrong connecting to the party:"
134 + conn.getResponseCode());
135 }
136 try {
137 this.partyuri = new URI(partysocketaddress);
138 } catch (URISyntaxException e1) {
139 throw new IOException("Failed to start party at " + reference, e1);
140 }
141 return partyuri;
142 }
143
144 /************* implements {@link ConnectionWithParty} ***************/
145 @Override
146 public void send(Inform info) throws IOException {
147 if (session == null)
148 return;
149 session.getBasicRemote()
150 .sendText(Jackson.instance().writeValueAsString(info));
151 }
152
153 @Override
154 public Reference getReference() {
155 return this.reference;
156 }
157
158 @Override
159 public PartyId getParty() {
160 String name = partyuri.getPath();
161 return new PartyId(name.substring(name.lastIndexOf('/') + 1));
162 }
163
164 @Override
165 public void close() {
166 log.log(Level.INFO, "Party close called");
167 // a close event is not necessarily a bug?
168 // setError(new SocketException("socket is closed"));
169 try {
170 session.close();
171 this.session = null;
172 } catch (IOException e) {
173 log.log(Level.SEVERE, "failed to close websocket", e);
174 }
175 }
176
177 /************* implements ClientEndpoint ***************/
178
179 @OnOpen
180 public void onOpen(Session session) {
181 log.log(Level.INFO, "Party is being connected: " + session);
182 session.setMaxTextMessageBufferSize(MAX_MESSAGE_SIZE);
183 session.setMaxBinaryMessageBufferSize(MAX_MESSAGE_SIZE);
184 this.session = session;
185 }
186
187 @OnClose
188 public void onClose(Session userSession, CloseReason reason) {
189 log.log(Level.INFO, "closing websocket " + reference + ":" + reason);
190 if (reason.getReasonPhrase() != null) {
191 setError(new RemoteException("Session " + reference
192 + " closed abnoramlly (see party server for more details) "
193 + reason.getReasonPhrase()));
194 return;
195 }
196 setError(new SocketException("socket has been closed"));
197 }
198
199 @OnMessage
200 public void processMessage(String message) {
201 try {
202 Action act = Jackson.instance().readValue(message, Action.class);
203 notifyListeners(act);
204 } catch (IOException e) {
205 log.log(Level.WARNING,
206 "received bad message from client " + reference, e);
207 setError(e);
208 close();
209 }
210 }
211
212 @OnError
213 public void processError(Throwable t) {
214 log.log(Level.SEVERE, "Something went wrong internally!", t);
215 setError(t);
216 }
217
218 @Override
219 public URI getRemoteURI() {
220 return partyuri;
221 }
222
223 @Override
224 public Throwable getError() {
225 return error;
226 }
227
228 /**
229 * Sets the connection to final error state and report null to listeners.
230 * Only the first error is reported, the rest is ignored.
231 *
232 * @param err the error that occurred.
233 */
234 private void setError(Throwable err) {
235 boolean isSet = false;
236 synchronized (this) {
237 if (this.error == null) {
238 this.error = err;
239 isSet = true;
240 }
241 }
242 if (isSet) {
243 // special event that tells protocol that we're probably in error
244 // state.
245 notifyListeners(null);
246 }
247 }
248
249}
250
/**
 * Indicates that an exception occurred on a remote machine. The remote
 * machine did not pass a stacktrace (because it does not fit into a websocket
 * close call), so this exception carries only a message: it has no cause, no
 * suppressed exceptions and no stack trace of its own.
 */
class RemoteException extends Exception {
	// flags for the protected Exception constructor
	private static final boolean ENABLE_SUPPRESSION = false;
	private static final boolean WRITABLE_STACK_TRACE = false;

	/**
	 * @param message the message describing what happened on the remote
	 *                machine
	 */
	public RemoteException(String message) {
		super(message, null, ENABLE_SUPPRESSION, WRITABLE_STACK_TRACE);
	}
}
Note: See TracBrowser for help on using the repository browser.