source: src/main/java/geniusweb/partiesserver/websocket/PartySocket.java@ 9

Last change on this file since 9 was 9, checked in by bart, 5 years ago

Update 28 jan 2020

File size: 8.1 KB
RevLine 
[1]1package geniusweb.partiesserver.websocket;
2
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Date;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;

import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import geniusweb.actions.Action;
import geniusweb.actions.PartyId;
import geniusweb.connection.ConnectionEnd;
import geniusweb.partiesserver.repository.RunningPartiesRepo;
import geniusweb.partiesserver.repository.RunningParty;
import geniusweb.party.Party;
import geniusweb.party.inform.Inform;
import geniusweb.party.inform.Settings;
import geniusweb.references.Reference;
import tudelft.utilities.listener.Listener;
import tudelft.utilities.logging.ReportToLogger;
import tudelft.utilities.logging.Reporter;
37
38/**
39 * Returns a websocket that communicates with the {@link RunningParty}. We close
40 * the socket when we detect the party is gone (eg, due to a bug in the party or
41 * a time-out). If the socket breaks, we close the party as it becomes isolated.
42 */
43@ServerEndpoint("/party/{party}")
[8]44public class PartySocket implements ConnectionEnd<Inform, Action> {
[1]45 private final static ObjectMapper jackson = new ObjectMapper();
46 private final Reporter log;
47 private final RunningPartiesRepo runningparties;
[8]48 private final List<Listener<Inform>> listeners = new CopyOnWriteArrayList<Listener<Inform>>();
[9]49 private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024;
[1]50
51 // should all be final, except that we can only set them when start is
52 // called...
53 private Session session;
54
55 private PartyId partyID;
56
57 private SendBuffer outstream;
58 private Throwable error = null;
59
/**
 * Default constructor: uses the repository returned by
 * {@link RunningPartiesRepo#instance()} and a {@link ReportToLogger}
 * created for "partiesserver".
 */
public PartySocket() {
    this(RunningPartiesRepo.instance(), new ReportToLogger("partiesserver"));
}
64
/**
 * @param parties  the repository in which the party behind this socket is
 *                 looked up
 * @param reporter receives all log messages of this socket
 */
public PartySocket(RunningPartiesRepo parties, Reporter reporter) {
    this.log = reporter;
    this.runningparties = parties;
}
69
70 @OnOpen
71 public void start(Session session,
72 @PathParam("party") final String partyidname) throws IOException {
73 if (partyidname == null) {
74 throw new IllegalArgumentException("party can't be null");
75 }
[9]76 session.setMaxTextMessageBufferSize(MAX_MESSAGE_SIZE);
77 session.setMaxBinaryMessageBufferSize(MAX_MESSAGE_SIZE);
78
[1]79 this.session = session;
80 this.partyID = new PartyId(partyidname);
81 this.outstream = new SendBuffer(session.getBasicRemote(), log);
82
83 // listen to parties repo, so that we can act if party is terminated
84 runningparties.addListener(new Listener<PartyId>() {
85 @Override
86 public void notifyChange(PartyId data) {
87 if (runningparties.get(partyID) == null) {
88 // party was removed, probably due to time out.
89 try {
90 log.log(Level.INFO,
91 "Party " + partyID + " was terminated ");
92 runningparties.removeListener(this);
93 session.close(new CloseReason(CloseCodes.GOING_AWAY,
[8]94 "detected that party vanished"));
[1]95 } catch (IOException e) {
96 log.log(Level.WARNING,
97 "failed to close the socket for " + partyID, e);
98 }
99 }
100 }
101 });
102 RunningParty runningparty = runningparties.get(partyID);
103
104 if (runningparty == null) {
105 session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT,
106 "No such party: " + partyID));
107 } else {
108 runningparties.replace(runningparty.withConnection(this));
109 }
110
111 // we do not listen for changes on the Parties factory as we can't
112 // replace the running party anyway
113 }
114
115 /**
116 * Called when a message comes in on the server socket that needs to be
117 * passed into the {@link Party}. We also sniff the message for termination
118 * indicators.
119 *
120 * @param informmessage the incoming string, supposedly a JSON-fornatted
121 * {@link Inform}
122 * @param session the session. We already have this info.
123 */
124 @OnMessage
125 public void onMessage(String informmessage, Session session)
126 throws JsonParseException, JsonMappingException, IOException {
127 if (this.session != session) {
128 throw new IllegalStateException("Unexpected change of session ID");
129 }
130 Inform info = jackson.readValue(informmessage, Inform.class);
131 RunningParty party = runningparties.get(partyID);
132 if (party != null) {
[4]133 // first sniff, to ensure the deadlines are updated first.
134 sniff(info, party);
[1]135 log.log(Level.FINE, "Inform " + partyID + ": " + info);
136 try {
137 party.inform(info);
138 } catch (Throwable e) {
[8]139 log.log(Level.WARNING, "Party failed on inform.", e);
[9]140 e.printStackTrace();
141 // severe as someone wants to debug that. Not severe for us
[8]142 // we don't use jackson.writeValueAsString(e) here because
143 // CloseReason has 123 char limit.
144 // The error will be too large to fit and will be truncated
145 // therefore we send a plain string and hope that enough
146 // will arrive at the other side (the protocol)
147 session.close(new CloseReason(
148 CloseReason.CloseCodes.CLOSED_ABNORMALLY,
149 "party threw exception: " + collectReasons(e)));
[1]150 }
[8]151 } // else party is dead but that's handled in the listener above
[1]152 }
153
154 /**
[8]155 *
156 * @param e an exception
157 * @return collect all reasons and sub-reasons
158 */
159 private String collectReasons(Throwable e) {
160 String reason = "";
161 do {
162 reason = reason + " : " + e.getClass().getSimpleName() + ":"
163 + e.getMessage();
164 e = e.getCause();
165 } while (e != null);
166 return reason;
167 }
168
169 /**
[1]170 * Sniffs the Inform for termination-related information
171 *
172 * @param info
173 * @param party the party we're handling
174 * @throws IOException
175 */
176 private void sniff(Inform info, RunningParty party) throws IOException {
[8]177 // do NOT sniff Finished messages and close sockets. Parties and their
178 // sockets need to stay running till all messages are handled
179 if (info instanceof Settings) {
[1]180 Date end = ((Settings) info).getProgress().getTerminationTime();
181 runningparties.replace(party.withEndTime(end));
182 }
183 }
184
185 @OnClose
186 public void onClose() throws IOException {
187 log.log(Level.INFO, "socket closed to " + partyID);
188 /*
189 * for now just kill the party. Maybe we can do something with reconnect
190 * but that will be complex.
191 */
192 runningparties.remove(partyID);
193 outstream.stop();
194 }
195
196 @OnError
197 public void onError(Throwable t) throws Throwable {
198 if (t instanceof EOFException) {
199 // This is a standard exception from Apache when a socket closes.
200 // Just ignore
201 // it...
202 log.log(Level.FINEST,
203 "apache reported EOF from (probably closed) socket, ignoring");
204 return;
205 }
206 log.log(Level.WARNING, "Unhandled exception was reported by Tomcat", t);
207 }
208
209 @Override
210 public void send(Action action) throws IOException {
211 log.log(Level.FINE, "Action " + partyID + ": " + action);
212 outstream.send(jackson.writeValueAsString(action));
213 }
214
215 @Override
216 public Reference getReference() {
217 return null;
218 }
219
220 @Override
221 public URI getRemoteURI() {
222 // In serverendpoints we apparently can't see who was contacting us.
223 throw new UnsupportedOperationException();
224 }
225
226 @Override
227 public void close() {
228 try {
229 session.close();
230 } catch (IOException e) {
231 log.log(Level.SEVERE, "failed to close " + partyID, e);
232 }
233 }
234
235 @Override
236 public Throwable getError() {
237 return error;
238 }
239
[8]240 @Override
241 public void addListener(Listener<Inform> l) {
242 listeners.add(l);
243 }
244
245 @Override
246 public void removeListener(Listener<Inform> l) {
247 listeners.remove(l);
248 }
249
250 /**
251 * This should only be called by the owner of the listenable, not by
252 * listeners or others. Avoid calling this from synchronized blocks as a
253 * notified listener might immediately make more calls to you.
254 * <p>
255 * Any listeners that throw an exception will be intercepted and their
256 * stacktrace is printed.
257 *
258 * @param data information about the change.
259 */
260 public void notifyListeners(Inform data) {
261 for (Listener<Inform> l : listeners) {
262 l.notifyChange(data);
263 }
264 }
265
[1]266}
Note: See TracBrowser for help on using the repository browser.