package geniusweb.partiesserver.websocket; import java.io.EOFException; import java.io.IOException; import java.net.URI; import java.util.Date; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import com.fasterxml.jackson.databind.ObjectMapper; import geniusweb.actions.Action; import geniusweb.actions.PartyId; import geniusweb.connection.ConnectionEnd; import geniusweb.inform.Inform; import geniusweb.inform.Settings; import geniusweb.partiesserver.repository.RunningPartiesRepo; import geniusweb.partiesserver.repository.RunningParty; import geniusweb.party.Party; import geniusweb.references.Reference; import tudelft.utilities.listener.Listener; import tudelft.utilities.logging.ReportToLogger; import tudelft.utilities.logging.Reporter; /** * Returns a websocket that communicates with the {@link RunningParty}. We close * the socket when we detect the party is gone (eg, due to a bug in the party or * a time-out). If the socket breaks, we close the party as it becomes isolated. */ @ServerEndpoint("/party/{party}") public class PartySocket implements ConnectionEnd { private final static ObjectMapper jackson = new ObjectMapper(); private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024; private final Reporter log; private final RunningPartiesRepo runningparties; private final List> listeners = new CopyOnWriteArrayList>(); private final ActiveThreads threads; // should all be final, except that we can only set them when start is // called... private Session session; private PartyId partyID; private SendBuffer outstream; private Throwable error = null; public PartySocket() { this(RunningPartiesRepo.instance(), new ReportToLogger("partiesserver")); } public PartySocket(RunningPartiesRepo parties, Reporter reporter) { this.runningparties = parties; this.log = reporter; this.threads = new ActiveThreads(log); } @OnOpen public void start(Session session, @PathParam("party") final String partyidname) throws IOException { if (partyidname == null) { throw new IllegalArgumentException("party can't be null"); } session.setMaxTextMessageBufferSize(MAX_MESSAGE_SIZE); session.setMaxBinaryMessageBufferSize(MAX_MESSAGE_SIZE); this.session = session; this.partyID = new PartyId(partyidname); this.outstream = new SendBuffer(session.getBasicRemote(), log); // listen to parties repo, so that we can act if party is terminated runningparties.addListener(new Listener() { @Override public void notifyChange(PartyId data) { if (runningparties.get(partyID) == null) { // party was removed, probably due to time out. try { log.log(Level.INFO, "Party " + partyID + " was terminated "); runningparties.removeListener(this); session.close(new CloseReason(CloseCodes.GOING_AWAY, "detected that party vanished")); } catch (IOException e) { log.log(Level.WARNING, "failed to close the socket for " + partyID, e); } } } }); RunningParty runningparty = runningparties.get(partyID); if (runningparty == null) { session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "No such party: " + partyID)); } else { runningparties.replace(runningparty.withConnection(this)); } // we do not listen for changes on the Parties factory as we can't // replace the running party anyway } /** * Called when a message comes in on the server socket that needs to be * passed into the {@link Party}. We also sniff the message for termination * indicators. * * @param informmessage the incoming string, supposedly a JSON-fornatted * {@link Inform} * @param session the session. We already have this info. * @throws IOException if there is a socket/connection error, we can't parse * the received data, etc */ @OnMessage public void onMessage(String informmessage, Session session) throws IOException { if (this.session != session) { throw new IllegalStateException("Unexpected change of session ID"); } if (!threads.isEmpty()) { log.log(Level.WARNING, "Party " + partyID + " is still busy with previous message"); } Inform info = jackson.readValue(informmessage, Inform.class); RunningParty party = runningparties.get(partyID); if (party != null) { // first sniff, to ensure the deadlines are updated first. sniff(info, party); log.log(Level.FINE, "Inform " + partyID + ": " + info); try { threads.add(); party.inform(info); threads.remove(); } catch (Throwable e) { threads.remove(); if (e instanceof ThreadDeath) /* * ThreadDeath always prints stacktrace anwyay, stacktrace * is useless because it contains the thrower's stacktrace * instead of a useful message. */ log.log(Level.WARNING, "Party was killed while handling inform."); else log.log(Level.WARNING, "Party failed on inform.", e); /* * severe as someone wants to debug that. Not severe for us we * don't use jackson.writeValueAsString(e) here because * CloseReason has 123 char limit. The error will be too large * to fit and will be truncated therefore we send a plain string * and hope that enough will arrive at the other side (the * protocol) */ session.close(new CloseReason( CloseReason.CloseCodes.CLOSED_ABNORMALLY, "party threw exception: " + collectReasons(e))); } } // else party is dead but that's handled in the listener above } /** * * @param e an exception * @return collect all reasons and sub-reasons */ private String collectReasons(Throwable e) { String reason = ""; do { reason = reason + " : " + e.getClass().getSimpleName() + ":" + e.getMessage(); e = e.getCause(); } while (e != null); return reason; } /** * Sniffs the Inform for termination-related information * * @param info * @param party the party we're handling * @throws IOException */ private void sniff(Inform info, RunningParty party) throws IOException { // do NOT sniff Finished messages and close sockets. Parties and their // sockets need to stay running till all messages are handled if (info instanceof Settings) { Date end = ((Settings) info).getProgress().getTerminationTime(); runningparties.replace(party.withEndTime(end)); } } @OnClose public void onClose() throws IOException { log.log(Level.INFO, "socket closed to " + partyID); if (!threads.isEmpty()) { log.log(Level.WARNING, "Party " + partyID + " failed to terminate. Trying to kill the remaining threads."); threads.killall(); } runningparties.remove(partyID); outstream.stop(); } @OnError public void onError(Throwable t) throws Throwable { if (t instanceof EOFException) { // This is a standard exception from Apache when a socket closes. // Just ignore // it... log.log(Level.FINEST, "apache reported EOF from (probably closed) socket, ignoring"); return; } log.log(Level.WARNING, "Unhandled exception was reported by Tomcat", t); } @Override public void send(Action action) throws IOException { log.log(Level.FINE, "Action " + partyID + ": " + action); outstream.send(jackson.writeValueAsString(action)); } @Override public Reference getReference() { return null; } @Override public URI getRemoteURI() { // In serverendpoints we apparently can't see who was contacting us. throw new UnsupportedOperationException(); } @Override public void close() { try { session.close(); } catch (IOException e) { log.log(Level.SEVERE, "failed to close " + partyID, e); } } @Override public Throwable getError() { return error; } @Override public void addListener(Listener l) { listeners.add(l); } @Override public void removeListener(Listener l) { listeners.remove(l); } /** * This should only be called by the owner of the listenable, not by * listeners or others. Avoid calling this from synchronized blocks as a * notified listener might immediately make more calls to you. *

* Any listeners that throw an exception will be intercepted and their * stacktrace is printed. * * @param data information about the change. */ public void notifyListeners(Inform data) { for (Listener l : listeners) { l.notifyChange(data); } } }