source: src/main/java/geniusweb/runserver/WebSocketProtToPartyConn.java@ 40

Last change on this file since 40 was 40, checked in by bart, 3 years ago

Refactor to help reusing partiesserver.

File size: 8.1 KB
RevLine 
[40]1package geniusweb.runserver;
2
3import java.io.BufferedReader;
4import java.io.IOException;
5import java.io.InputStreamReader;
6import java.net.HttpURLConnection;
7import java.net.URI;
8import java.net.URISyntaxException;
9import java.util.logging.Level;
10
11import javax.websocket.ClientEndpoint;
12import javax.websocket.CloseReason;
13import javax.websocket.ContainerProvider;
14import javax.websocket.DeploymentException;
15import javax.websocket.OnClose;
16import javax.websocket.OnError;
17import javax.websocket.OnMessage;
18import javax.websocket.OnOpen;
19import javax.websocket.Session;
20import javax.websocket.WebSocketContainer;
21
22import geniusweb.actions.Action;
23import geniusweb.actions.PartyId;
24import geniusweb.inform.Inform;
25import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
26import geniusweb.references.Reference;
27import tudelft.utilities.listener.DefaultListenable;
28import tudelft.utilities.logging.ReportToLogger;
29import tudelft.utilities.logging.Reporter;
30
31/**
32 * A websocket based {@link ProtocolToPartyConn}. it contains a websocket
33 * connection to a party (that will be started using the partyserver
34 * start-new-party protocol). Must be public so that it can be accessed as
35 * ClientEndpoint by apache tomcat. This is internally used.
36 *
37 * <p>
38 * The @ClientEndpoint annotation is important, the runserver is creating these
39 * sockets programatically, from a received ws: URL.
40 */
41@ClientEndpoint
42public class WebSocketProtToPartyConn extends DefaultListenable<Action>
43 implements ProtocolToPartyConn {
44 // 20MB seems reasonable limit?
45 private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024;
46 private final Reporter log;
47
48 private final Reference reference;
49
50 private WebSocketContainer container;
51 private Session session = null;
52 private URI partyuri;
53 private Throwable error = null;
54
55 /**
56 * Buffers incoming text.
57 */
58 private StringBuffer buffer = new StringBuffer();
59
60 /**
61 * Constructor that uses default {@link Reporter}
62 *
63 * @param reference the address to to a http GET to get a websocket to a new
64 * running party.
65 */
66 public WebSocketProtToPartyConn(Reference reference) {
67 this(reference, new ReportToLogger("runserver"));
68 }
69
70 /**
71 *
72 * @param reference the address to to a http GET to get a websocket to a new
73 * running party.
74 * @param reporter the {@link Reporter} used for logging important
75 * events/issues
76 * @throws IOException if the party does not start properly.
77 */
78 public WebSocketProtToPartyConn(Reference reference, Reporter reporter) {
79 this.reference = reference;
80 this.log = reporter;
81
82 }
83
84 public WebSocketProtToPartyConn init() throws IOException {
85 log.log(Level.INFO, "Trying to start up party " + reference);
86 URI partyuri = startParty(reference);
87
88 container = ContainerProvider.getWebSocketContainer();
89 // #50 disable: might cause large buffer. And why is this a good limit
90 // anyway
91 // container.setDefaultMaxTextMessageBufferSize(200000);
92
93 log.log(Level.INFO,
94 "Trying to make websocket connection to running party "
95 + partyuri);
96 try {
97 container.connectToServer(this, partyuri);
98 } catch (DeploymentException e) {
99 throw new IOException(
100 "Failed to connect with running party at " + partyuri, e);
101 }
102 log.log(Level.INFO, "Created websocket connection at " + partyuri);
103 return this;
104 }
105
106 /**
107 *
108 * @param reference the address to to a http GET to get a websocket to a new
109 * running party.
110 * @return URI of websocket behind which is a running party
111 * @throws IOException if party did not start correctly
112 */
113 private URI startParty(Reference reference) throws IOException {
114 HttpURLConnection conn = (HttpURLConnection) reference.getURI().toURL()
115 .openConnection();
116 // if this throws, the IOException seems to contain the "retry later"
117 // message already
118 conn.setRequestMethod("GET");
119 conn.setReadTimeout(20000);
120 conn.connect();
121 BufferedReader rd;
122 try {
123 rd = new BufferedReader(
124 new InputStreamReader(conn.getInputStream()));
125 } catch (IOException e) {
126 if (conn.getErrorStream() == null) {
127 throw new IOException("Failed to connect " + reference
128 + ", no server error message", e);
129 }
130 // read the detail message from the error stream
131 rd = new BufferedReader(
132 new InputStreamReader(conn.getErrorStream()));
133 throw new IOException(rd.readLine(), e);
134 }
135 String partysocketaddress = rd.readLine();
136 log.log(Level.INFO, "startparty returned " + partysocketaddress);
137 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
138 throw new IOException(
139 "Something went wrong connecting to the party:"
140 + conn.getResponseCode());
141 }
142 try {
143 this.partyuri = new URI(partysocketaddress);
144 } catch (URISyntaxException e1) {
145 throw new IOException("Failed to start party at " + reference, e1);
146 }
147 return partyuri;
148 }
149
150 /************* implements {@link ConnectionWithParty} ***************/
151 @Override
152 public void send(Inform info) throws IOException {
153 if (session == null)
154 return;
155 session.getBasicRemote()
156 .sendText(Jackson.instance().writeValueAsString(info));
157 }
158
159 @Override
160 public Reference getReference() {
161 return this.reference;
162 }
163
164 @Override
165 public PartyId getParty() {
166 String name = partyuri.getPath();
167 return new PartyId(name.substring(name.lastIndexOf('/') + 1));
168 }
169
170 @Override
171 public void close() {
172 log.log(Level.INFO, "Party close called");
173 // a close event is not necessarily a bug?
174 // setError(new SocketException("socket is closed"));
175 try {
176 session.close();
177 this.session = null;
178 } catch (IOException e) {
179 log.log(Level.SEVERE, "failed to close websocket", e);
180 }
181 }
182
183 /************* implements ClientEndpoint ***************/
184
185 @OnOpen
186 public void onOpen(Session session) {
187 log.log(Level.INFO, "Party is being connected: " + session);
188// #50 we now buffer partial messages
189// session.setMaxTextMessageBufferSize(MAX_MESSAGE_SIZE);
190// session.setMaxBinaryMessageBufferSize(MAX_MESSAGE_SIZE);
191 this.session = session;
192 }
193
194 @OnClose
195 public void onClose(Session userSession, CloseReason reason) {
196 log.log(Level.INFO, "closing websocket " + reference + ":" + reason);
197 if (reason.getReasonPhrase() != null
198 && !reason.getReasonPhrase().isEmpty()) {
199 setError(new RemoteException("Session " + reference
200 + " closed abnormally (see party server for more details) "
201 + reason.getReasonPhrase()));
202 }
203 // seems NOT an error. Unless the party deliberately and silently
204 // closes its connection.
205 // Setting error will cause a print of stacktrace that is disturbing.
206 // It seems we can not check at this point if session was finished.
207 // setError(new SocketException("socket has been closed"));
208 }
209
210 @OnMessage
211 public void processMessage(String message, boolean islast) {
212 // #50 avoid fixed (large) buffers
213 buffer.append(message);
214 if (islast) {
215 String msg = buffer.toString();
216 buffer = new StringBuffer();
217 try {
218 Action act = Jackson.instance().readValue(msg, Action.class);
219 notifyListeners(act);
220 } catch (IOException e) {
221 log.log(Level.WARNING,
222 "received bad message from client " + reference, e);
223 setError(e);
224 close();
225 }
226 }
227 }
228
229 @OnError
230 public void processError(Throwable t) {
231 log.log(Level.SEVERE, "Something went wrong internally!", t);
232 setError(t);
233 }
234
235 @Override
236 public URI getRemoteURI() {
237 return partyuri;
238 }
239
240 @Override
241 public Throwable getError() {
242 return error;
243 }
244
245 /**
246 * Sets the connection to final error state and report null to listeners.
247 * Only the first error is reported, the rest is ignored.
248 *
249 * @param err the error that occurred.
250 */
251 private void setError(Throwable err) {
252 boolean isSet = false;
253 synchronized (this) {
254 if (this.error == null) {
255 this.error = err;
256 isSet = true;
257 }
258 }
259 if (isSet) {
260 // special event that tells protocol that we're probably in error
261 // state.
262 notifyListeners(null);
263 }
264 }
265
266}
267
/**
 * Indicates an exception occurred on a remote machine. The remote machine did
 * not pass a stacktrace (because it does not fit into a websocket close call).
 * Therefore this exception is created without cause, with suppression disabled
 * and without a (meaningless) local stack trace.
 */
class RemoteException extends Exception {
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of what went wrong on the remote machine
     */
    public RemoteException(String message) {
        // no cause, suppression disabled, stack trace not writable
        super(message, null, false, false);
    }
}
Note: See TracBrowser for help on using the repository browser.