source: src/main/java/geniusweb/runserver/WebSocketProtToPartyConn.java@ 33

Last change on this file since 33 was 33, checked in by bart, 3 years ago

LEARN algorithm now also available. Fixed small issues.

File size: 7.7 KB
RevLine 
[33]1package geniusweb.runserver;
2
3import java.io.BufferedReader;
4import java.io.IOException;
5import java.io.InputStreamReader;
6import java.net.HttpURLConnection;
7import java.net.URI;
8import java.net.URISyntaxException;
9import java.util.logging.Level;
10
11import javax.websocket.ClientEndpoint;
12import javax.websocket.CloseReason;
13import javax.websocket.ContainerProvider;
14import javax.websocket.DeploymentException;
15import javax.websocket.OnClose;
16import javax.websocket.OnError;
17import javax.websocket.OnMessage;
18import javax.websocket.OnOpen;
19import javax.websocket.Session;
20import javax.websocket.WebSocketContainer;
21
22import geniusweb.actions.Action;
23import geniusweb.actions.PartyId;
24import geniusweb.inform.Inform;
25import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
26import geniusweb.references.Reference;
27import tudelft.utilities.listener.DefaultListenable;
28import tudelft.utilities.logging.ReportToLogger;
29import tudelft.utilities.logging.Reporter;
30
31/**
32 * A websocket based {@link ProtocolToPartyConn}. it contains a websocket
33 * connection to a party (that will be started using the partyserver
34 * start-new-party protocol). Must be public so that it can be accessed as
35 * ClientEndpoint by apache tomcat. This is internally used.
36 *
37 * <p>
38 * The @ClientEndpoint annotation is important, the runserver is creating these
39 * sockets programatically, from a received ws: URL.
40 */
41@ClientEndpoint
42public class WebSocketProtToPartyConn extends DefaultListenable<Action>
43 implements ProtocolToPartyConn {
44 // 20MB seems reasonable limit?
45 private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024;
46 private final Reporter log;
47
48 private final Reference reference;
49
50 private WebSocketContainer container;
51 private Session session = null;
52 private URI partyuri;
53 private Throwable error = null;
54
55 /**
56 * Constructor that uses default {@link Reporter}
57 *
58 * @param reference the address to to a http GET to get a websocket to a new
59 * running party.
60 */
61 public WebSocketProtToPartyConn(Reference reference) {
62 this(reference, new ReportToLogger("runserver"));
63 }
64
65 /**
66 *
67 * @param reference the address to to a http GET to get a websocket to a new
68 * running party.
69 * @param reporter the {@link Reporter} used for logging important
70 * events/issues
71 * @throws IOException if the party does not start properly.
72 */
73 public WebSocketProtToPartyConn(Reference reference, Reporter reporter) {
74 this.reference = reference;
75 this.log = reporter;
76
77 }
78
79 public WebSocketProtToPartyConn init() throws IOException {
80 log.log(Level.INFO, "Trying to start up party " + reference);
81 URI partyuri = startParty(reference);
82
83 container = ContainerProvider.getWebSocketContainer();
84 container.setDefaultMaxTextMessageBufferSize(200000);
85
86 log.log(Level.INFO,
87 "Trying to make websocket connection to running party "
88 + partyuri);
89 try {
90 container.connectToServer(this, partyuri);
91 } catch (DeploymentException e) {
92 throw new IOException(
93 "Failed to connect with running party at " + partyuri, e);
94 }
95 log.log(Level.INFO, "Created websocket connection at " + partyuri);
96 return this;
97 }
98
99 /**
100 *
101 * @param reference the address to to a http GET to get a websocket to a new
102 * running party.
103 * @return URI of websocket behind which is a running party
104 * @throws IOException if party did not start correctly
105 */
106 private URI startParty(Reference reference) throws IOException {
107 HttpURLConnection conn = (HttpURLConnection) reference.getURI().toURL()
108 .openConnection();
109 // if this throws, the IOException seems to contain the "retry later"
110 // message already
111 conn.setRequestMethod("GET");
112 conn.setReadTimeout(20000);
113 conn.connect();
114 BufferedReader rd;
115 try {
116 rd = new BufferedReader(
117 new InputStreamReader(conn.getInputStream()));
118 } catch (IOException e) {
119 if (conn.getErrorStream() == null) {
120 throw new IOException("Failed to connect " + reference
121 + ", no server error message", e);
122 }
123 // read the detail message from the error stream
124 rd = new BufferedReader(
125 new InputStreamReader(conn.getErrorStream()));
126 throw new IOException(rd.readLine(), e);
127 }
128 String partysocketaddress = rd.readLine();
129 log.log(Level.INFO, "startparty returned " + partysocketaddress);
130 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
131 throw new IOException(
132 "Something went wrong connecting to the party:"
133 + conn.getResponseCode());
134 }
135 try {
136 this.partyuri = new URI(partysocketaddress);
137 } catch (URISyntaxException e1) {
138 throw new IOException("Failed to start party at " + reference, e1);
139 }
140 return partyuri;
141 }
142
143 /************* implements {@link ConnectionWithParty} ***************/
144 @Override
145 public void send(Inform info) throws IOException {
146 if (session == null)
147 return;
148 session.getBasicRemote()
149 .sendText(Jackson.instance().writeValueAsString(info));
150 }
151
152 @Override
153 public Reference getReference() {
154 return this.reference;
155 }
156
157 @Override
158 public PartyId getParty() {
159 String name = partyuri.getPath();
160 return new PartyId(name.substring(name.lastIndexOf('/') + 1));
161 }
162
163 @Override
164 public void close() {
165 log.log(Level.INFO, "Party close called");
166 // a close event is not necessarily a bug?
167 // setError(new SocketException("socket is closed"));
168 try {
169 session.close();
170 this.session = null;
171 } catch (IOException e) {
172 log.log(Level.SEVERE, "failed to close websocket", e);
173 }
174 }
175
176 /************* implements ClientEndpoint ***************/
177
178 @OnOpen
179 public void onOpen(Session session) {
180 log.log(Level.INFO, "Party is being connected: " + session);
181 session.setMaxTextMessageBufferSize(MAX_MESSAGE_SIZE);
182 session.setMaxBinaryMessageBufferSize(MAX_MESSAGE_SIZE);
183 this.session = session;
184 }
185
186 @OnClose
187 public void onClose(Session userSession, CloseReason reason) {
188 log.log(Level.INFO, "closing websocket " + reference + ":" + reason);
189 if (reason.getReasonPhrase() != null
190 && !reason.getReasonPhrase().isEmpty()) {
191 setError(new RemoteException("Session " + reference
192 + " closed abnormally (see party server for more details) "
193 + reason.getReasonPhrase()));
194 }
195 // seems NOT an error. Unless the party deliberately and silently
196 // closes its connection.
197 // Setting error will cause a print of stacktrace that is disturbing.
198 // It seems we can not check at this point if session was finished.
199 // setError(new SocketException("socket has been closed"));
200 }
201
202 @OnMessage
203 public void processMessage(String message) {
204 try {
205 Action act = Jackson.instance().readValue(message, Action.class);
206 notifyListeners(act);
207 } catch (IOException e) {
208 log.log(Level.WARNING,
209 "received bad message from client " + reference, e);
210 setError(e);
211 close();
212 }
213 }
214
215 @OnError
216 public void processError(Throwable t) {
217 log.log(Level.SEVERE, "Something went wrong internally!", t);
218 setError(t);
219 }
220
221 @Override
222 public URI getRemoteURI() {
223 return partyuri;
224 }
225
226 @Override
227 public Throwable getError() {
228 return error;
229 }
230
231 /**
232 * Sets the connection to final error state and report null to listeners.
233 * Only the first error is reported, the rest is ignored.
234 *
235 * @param err the error that occurred.
236 */
237 private void setError(Throwable err) {
238 boolean isSet = false;
239 synchronized (this) {
240 if (this.error == null) {
241 this.error = err;
242 isSet = true;
243 }
244 }
245 if (isSet) {
246 // special event that tells protocol that we're probably in error
247 // state.
248 notifyListeners(null);
249 }
250 }
251
252}
253
/**
 * Indicates an exception occurred on a remote machine. The remote machine did
 * not pass a stacktrace (because it does not fit into a websocket close call).
 * Instances therefore carry only a message: they have no cause, no stack
 * trace, and suppressed exceptions are disabled.
 */
class RemoteException extends Exception {
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of what went wrong on the remote machine
     */
    public RemoteException(String message) {
        // cause=null, enableSuppression=false, writableStackTrace=false:
        // a local stack trace would say nothing about the remote failure.
        super(message, null, false, false);
    }
}
Note: See TracBrowser for help on using the repository browser.