source: src/main/java/geniusweb/partiesserver/websocket/PartySocket.java@ 46

Last change on this file since 46 was 46, checked in by ruud, 20 months ago

Fixed small issues in domaineditor.

File size: 9.1 KB
RevLine 
[46]1package geniusweb.partiesserver.websocket;
2
3import java.io.EOFException;
4import java.io.IOException;
5import java.net.URI;
6import java.util.Date;
7import java.util.List;
8import java.util.concurrent.CopyOnWriteArrayList;
9import java.util.logging.Level;
10
11import javax.websocket.CloseReason;
12import javax.websocket.CloseReason.CloseCodes;
13import javax.websocket.OnClose;
14import javax.websocket.OnError;
15import javax.websocket.OnMessage;
16import javax.websocket.OnOpen;
17import javax.websocket.Session;
18import javax.websocket.server.PathParam;
19import javax.websocket.server.ServerEndpoint;
20
21import geniusweb.actions.Action;
22import geniusweb.actions.PartyId;
23import geniusweb.connection.ConnectionEnd;
24import geniusweb.inform.Inform;
25import geniusweb.inform.Settings;
26import geniusweb.partiesserver.Constants;
27import geniusweb.partiesserver.repository.RunningPartiesRepo;
28import geniusweb.partiesserver.repository.RunningParty;
29import geniusweb.party.Party;
30import geniusweb.references.Reference;
31import tudelft.utilities.listener.Listener;
32import tudelft.utilities.logging.ReportToLogger;
33import tudelft.utilities.logging.Reporter;
34
35/**
36 * Returns a websocket that communicates with the {@link RunningParty}. The
37 * websocket is initiated/called/requested by an external party, typicall hte
38 * protocol running on another server. We close the socket when we detect the
39 * party is gone (eg, due to a bug in the party or a time-out). If the socket
40 * breaks, we close the party as it becomes isolated.
41 */
42@ServerEndpoint("/party/{party}")
43public class PartySocket implements ConnectionEnd<Inform, Action> {
44
45 private final Reporter log;
46 private final RunningPartiesRepo runningparties;
47 private final List<Listener<Inform>> listeners = new CopyOnWriteArrayList<Listener<Inform>>();
48 private final ActiveThreads threads;
49
50 // should all be final, except that we can only set them when start is
51 // called...
52 private Session session;
53
54 private PartyId partyID;
55
56 private SendBuffer outstream;
57 private Throwable error = null;
58
59 private StringBuffer buffer = new StringBuffer();
60
61 public PartySocket() {
62 this(RunningPartiesRepo.instance(),
63 new ReportToLogger("partiesserver"));
64 }
65
66 public PartySocket(RunningPartiesRepo parties, Reporter reporter) {
67 this.runningparties = parties;
68 this.log = reporter;
69 this.threads = new ActiveThreads(log);
70 }
71
72 @OnOpen
73 public void start(Session session,
74 @PathParam("party") final String partyidname) throws IOException {
75 if (partyidname == null) {
76 throw new IllegalArgumentException("party can't be null");
77 }
78
79 this.session = session;
80 this.partyID = new PartyId(partyidname);
81 this.outstream = new SendBuffer(session.getBasicRemote(), log);
82
83 // listen to parties repo, so that we can act if party is terminated
84 runningparties.addListener(new Listener<PartyId>() {
85 @Override
86 public void notifyChange(PartyId data) {
87 if (runningparties.get(partyID) == null) {
88 // party was removed, probably due to time out.
89 try {
90 log.log(Level.INFO,
91 "Party " + partyID + " was terminated ");
92 runningparties.removeListener(this);
93 session.close(new CloseReason(CloseCodes.GOING_AWAY,
94 "detected that party vanished"));
95 } catch (IOException e) {
96 log.log(Level.WARNING,
97 "failed to close the socket for " + partyID, e);
98 }
99 }
100 }
101 });
102 RunningParty runningparty = runningparties.get(partyID);
103
104 if (runningparty == null) {
105 session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT,
106 "No such party: " + partyID));
107 } else {
108 runningparties.replace(runningparty.withConnection(this));
109 }
110
111 // we do not listen for changes on the Parties factory as we can't
112 // replace the running party anyway
113 }
114
115 /**
116 * Called when a message comes in on the server socket that needs to be
117 * passed into the {@link Party}. We also sniff the message for termination
118 * indicators.
119 *
120 * @param informmessage the incoming string, supposedly a JSON-fornatted
121 * {@link Inform}
122 * @param last true if this is the last part of the incoming
123 * message. We queue these manually here, to avoid
124 * pre-allocating a fixed, very large memory block.
125 * @param session the session. We already have this info.
126 * @throws IOException if there is a socket/connection error, we can't parse
127 * the received data, etc
128 */
129 @OnMessage
130 public void onMessage(String informpart, boolean islast, Session session)
131 throws IOException {
132 if (this.session != session) {
133 throw new IllegalStateException("Unexpected change of session ID");
134 }
135 if (!threads.isEmpty()) {
136 log.log(Level.WARNING, "Party " + partyID
137 + " is still busy with previous message");
138 }
139 buffer.append(informpart);
140 if (!islast)
141 return;
142
143 String informmessage = buffer.toString();
144 buffer = new StringBuffer();
145
146 // if we get here, full message is now in
147
148 Inform info = Constants.getJackson().readValue(informmessage, Inform.class);
149 RunningParty party = runningparties.get(partyID);
150 if (party != null) {
151 // first sniff, to ensure the deadlines are updated first.
152 sniff(info, party);
153 log.log(Level.FINE, "Inform " + partyID + ": " + info);
154 try {
155 threads.add();
156 party.inform(info);
157 threads.remove();
158 } catch (Throwable e) {
159 threads.remove();
160 if (e instanceof ThreadDeath)
161 /*
162 * ThreadDeath always prints stacktrace anwyay, stacktrace
163 * is useless because it contains the thrower's stacktrace
164 * instead of a useful message.
165 */
166 log.log(Level.WARNING,
167 "Party was killed while handling inform.");
168 else
169 log.log(Level.WARNING, "Party failed on inform.", e);
170 /*
171 * severe as someone wants to debug that. Not severe for us we
172 * don't use jackson.writeValueAsString(e) here because
173 * CloseReason has 123 char limit. The error will be too large
174 * to fit and will be truncated therefore we send a plain string
175 * and hope that enough will arrive at the other side (the
176 * protocol)
177 */
178 session.close(new CloseReason(
179 CloseReason.CloseCodes.CLOSED_ABNORMALLY,
180 "party threw exception: " + collectReasons(e)));
181 }
182 } // else party is dead but that's handled in the listener above
183 }
184
185 /**
186 *
187 * @param e an exception
188 * @return collect all reasons and sub-reasons
189 */
190 private String collectReasons(Throwable e) {
191 String reason = "";
192 do {
193 reason = reason + " : " + e.getClass().getSimpleName() + ":"
194 + e.getMessage();
195 e = e.getCause();
196 } while (e != null);
197 return reason;
198 }
199
200 /**
201 * Sniffs the Inform for termination-related information
202 *
203 * @param info
204 * @param party the party we're handling
205 * @throws IOException
206 */
207 private void sniff(Inform info, RunningParty party) throws IOException {
208 // do NOT sniff Finished messages and close sockets. Parties and their
209 // sockets need to stay running till all messages are handled
210 if (info instanceof Settings) {
211 Date end = ((Settings) info).getProgress().getTerminationTime();
212 runningparties.replace(party.withEndTime(end));
213 }
214 }
215
216 @OnClose
217 public void onClose() throws IOException {
218 log.log(Level.INFO, "socket closed to " + partyID);
219 if (!threads.isEmpty()) {
220 log.log(Level.WARNING, "Party " + partyID
221 + " failed to terminate. Trying to kill the remaining threads.");
222 threads.killall();
223 }
224 runningparties.remove(partyID);
225 outstream.stop();
226 }
227
228 @OnError
229 public void onError(Throwable t) throws Throwable {
230 if (t instanceof EOFException) {
231 // This is a standard exception from Apache when a socket closes.
232 // Just ignore it...
233 log.log(Level.FINEST,
234 "apache reported EOF from (probably closed) socket, ignoring");
235 return;
236 }
237 log.log(Level.WARNING, "Unhandled exception was reported by Tomcat", t);
238 }
239
240 @Override
241 public void send(Action action) throws IOException {
242 if (action == null) {
243 log.log(Level.INFO, "null action received. Closing websocket");
244 // signals that party closed the stream
245 close();
246 return;
247 }
248 log.log(Level.FINE, "Action " + partyID + ": " + action);
249 outstream.send(Constants.getJackson().writeValueAsString(action));
250 }
251
252 @Override
253 public Reference getReference() {
254 return null;
255 }
256
257 @Override
258 public URI getRemoteURI() {
259 // In serverendpoints we apparently can't see who was contacting us.
260 throw new UnsupportedOperationException();
261 }
262
263 @Override
264 public void close() {
265 try {
266 session.close();
267 } catch (IOException e) {
268 log.log(Level.SEVERE, "failed to close " + partyID, e);
269 }
270 }
271
272 @Override
273 public Throwable getError() {
274 return error;
275 }
276
277 @Override
278 public void addListener(Listener<Inform> l) {
279 listeners.add(l);
280 }
281
282 @Override
283 public void removeListener(Listener<Inform> l) {
284 listeners.remove(l);
285 }
286
287 /**
288 * This should only be called by the owner of the listenable, not by
289 * listeners or others. Avoid calling this from synchronized blocks as a
290 * notified listener might immediately make more calls to you.
291 * <p>
292 * Any listeners that throw an exception will be intercepted and their
293 * stacktrace is printed.
294 *
295 * @param data information about the change.
296 */
297 public void notifyListeners(Inform data) {
298 for (Listener<Inform> l : listeners) {
299 l.notifyChange(data);
300 }
301 }
302
303}
Note: See TracBrowser for help on using the repository browser.