package geniusweb.partiesserver.websocket; import java.io.EOFException; import java.io.IOException; import java.net.URI; import java.util.Date; import java.util.logging.Level; import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import geniusweb.actions.Action; import geniusweb.actions.PartyId; import geniusweb.connection.DefaultConnection; import geniusweb.partiesserver.repository.RunningPartiesRepo; import geniusweb.partiesserver.repository.RunningParty; import geniusweb.party.Party; import geniusweb.party.inform.Finished; import geniusweb.party.inform.Inform; import geniusweb.party.inform.Settings; import geniusweb.references.Reference; import tudelft.utilities.listener.Listener; import tudelft.utilities.logging.ReportToLogger; import tudelft.utilities.logging.Reporter; /** * Returns a websocket that communicates with the {@link RunningParty}. We close * the socket when we detect the party is gone (eg, due to a bug in the party or * a time-out). If the socket breaks, we close the party as it becomes isolated. */ @ServerEndpoint("/party/{party}") public class PartySocket extends DefaultConnection { private final static ObjectMapper jackson = new ObjectMapper(); private final Reporter log; private final RunningPartiesRepo runningparties; // should all be final, except that we can only set them when start is // called... private Session session; private PartyId partyID; private SendBuffer outstream; private Throwable error = null; public PartySocket() { this(RunningPartiesRepo.instance(), new ReportToLogger("partiesserver")); } public PartySocket(RunningPartiesRepo parties, Reporter reporter) { this.runningparties = parties; this.log = reporter; } @OnOpen public void start(Session session, @PathParam("party") final String partyidname) throws IOException { if (partyidname == null) { throw new IllegalArgumentException("party can't be null"); } this.session = session; this.partyID = new PartyId(partyidname); this.outstream = new SendBuffer(session.getBasicRemote(), log); // listen to parties repo, so that we can act if party is terminated runningparties.addListener(new Listener() { @Override public void notifyChange(PartyId data) { if (runningparties.get(partyID) == null) { // party was removed, probably due to time out. try { log.log(Level.INFO, "Party " + partyID + " was terminated "); runningparties.removeListener(this); session.close(new CloseReason(CloseCodes.GOING_AWAY, "party died")); } catch (IOException e) { log.log(Level.WARNING, "failed to close the socket for " + partyID, e); } } } }); RunningParty runningparty = runningparties.get(partyID); if (runningparty == null) { session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "No such party: " + partyID)); } else { runningparties.replace(runningparty.withConnection(this)); } // we do not listen for changes on the Parties factory as we can't // replace the running party anyway } /** * Called when a message comes in on the server socket that needs to be * passed into the {@link Party}. We also sniff the message for termination * indicators. * * @param informmessage the incoming string, supposedly a JSON-fornatted * {@link Inform} * @param session the session. We already have this info. */ @OnMessage public void onMessage(String informmessage, Session session) throws JsonParseException, JsonMappingException, IOException { if (this.session != session) { throw new IllegalStateException("Unexpected change of session ID"); } Inform info = jackson.readValue(informmessage, Inform.class); RunningParty party = runningparties.get(partyID); if (party != null) { // first sniff, to ensure the deadlines are updated first. sniff(info, party); log.log(Level.FINE, "Inform " + partyID + ": " + info); try { party.inform(info); } catch (Throwable e) { // simple sandbox log.log(Level.WARNING, "Party failed on inform:", e); } } // else dead but that's handled in the listener above } /** * Sniffs the Inform for termination-related information * * @param info * @param party the party we're handling * @throws IOException */ private void sniff(Inform info, RunningParty party) throws IOException { if (info instanceof Finished) { session.close(new CloseReason(CloseCodes.GOING_AWAY, "session reached agreement")); } else if (info instanceof Settings) { Date end = ((Settings) info).getProgress().getTerminationTime(); runningparties.replace(party.withEndTime(end)); } } @OnClose public void onClose() throws IOException { log.log(Level.INFO, "socket closed to " + partyID); /* * for now just kill the party. Maybe we can do something with reconnect * but that will be complex. */ runningparties.remove(partyID); outstream.stop(); } @OnError public void onError(Throwable t) throws Throwable { if (t instanceof EOFException) { // This is a standard exception from Apache when a socket closes. // Just ignore // it... log.log(Level.FINEST, "apache reported EOF from (probably closed) socket, ignoring"); return; } log.log(Level.WARNING, "Unhandled exception was reported by Tomcat", t); } @Override public void send(Action action) throws IOException { log.log(Level.FINE, "Action " + partyID + ": " + action); outstream.send(jackson.writeValueAsString(action)); } @Override public Reference getReference() { return null; } @Override public URI getRemoteURI() { // In serverendpoints we apparently can't see who was contacting us. throw new UnsupportedOperationException(); } @Override public void close() { try { session.close(); } catch (IOException e) { log.log(Level.SEVERE, "failed to close " + partyID, e); } } @Override public Throwable getError() { return error; } }