source: src/main/java/geniusweb/partiesserver/websocket/PartySocket.java@ 5

Last change on this file since 5 was 4, checked in by bart, 5 years ago

Faster example parties

File size: 6.3 KB
RevLine 
[1]1package geniusweb.partiesserver.websocket;
2
3import java.io.EOFException;
4import java.io.IOException;
5import java.net.URI;
6import java.util.Date;
7import java.util.logging.Level;
8
9import javax.websocket.CloseReason;
10import javax.websocket.CloseReason.CloseCodes;
11import javax.websocket.OnClose;
12import javax.websocket.OnError;
13import javax.websocket.OnMessage;
14import javax.websocket.OnOpen;
15import javax.websocket.Session;
16import javax.websocket.server.PathParam;
17import javax.websocket.server.ServerEndpoint;
18
19import com.fasterxml.jackson.core.JsonParseException;
20import com.fasterxml.jackson.databind.JsonMappingException;
21import com.fasterxml.jackson.databind.ObjectMapper;
22
23import geniusweb.actions.Action;
24import geniusweb.actions.PartyId;
25import geniusweb.connection.DefaultConnection;
26import geniusweb.partiesserver.repository.RunningPartiesRepo;
27import geniusweb.partiesserver.repository.RunningParty;
28import geniusweb.party.Party;
29import geniusweb.party.inform.Finished;
30import geniusweb.party.inform.Inform;
31import geniusweb.party.inform.Settings;
32import geniusweb.references.Reference;
33import tudelft.utilities.listener.Listener;
34import tudelft.utilities.logging.ReportToLogger;
35import tudelft.utilities.logging.Reporter;
36
37/**
38 * Returns a websocket that communicates with the {@link RunningParty}. We close
39 * the socket when we detect the party is gone (eg, due to a bug in the party or
40 * a time-out). If the socket breaks, we close the party as it becomes isolated.
41 */
42@ServerEndpoint("/party/{party}")
43public class PartySocket extends DefaultConnection<Inform, Action> {
44 private final static ObjectMapper jackson = new ObjectMapper();
45 private final Reporter log;
46 private final RunningPartiesRepo runningparties;
47
48 // should all be final, except that we can only set them when start is
49 // called...
50 private Session session;
51
52 private PartyId partyID;
53
54 private SendBuffer outstream;
55 private Throwable error = null;
56
57 public PartySocket() {
58 this(RunningPartiesRepo.instance(),
59 new ReportToLogger("partiesserver"));
60 }
61
62 public PartySocket(RunningPartiesRepo parties, Reporter reporter) {
63 this.runningparties = parties;
64 this.log = reporter;
65 }
66
67 @OnOpen
68 public void start(Session session,
69 @PathParam("party") final String partyidname) throws IOException {
70 if (partyidname == null) {
71 throw new IllegalArgumentException("party can't be null");
72 }
73 this.session = session;
74 this.partyID = new PartyId(partyidname);
75 this.outstream = new SendBuffer(session.getBasicRemote(), log);
76
77 // listen to parties repo, so that we can act if party is terminated
78 runningparties.addListener(new Listener<PartyId>() {
79 @Override
80 public void notifyChange(PartyId data) {
81 if (runningparties.get(partyID) == null) {
82 // party was removed, probably due to time out.
83 try {
84 log.log(Level.INFO,
85 "Party " + partyID + " was terminated ");
86 runningparties.removeListener(this);
87 session.close(new CloseReason(CloseCodes.GOING_AWAY,
88 "party died"));
89 } catch (IOException e) {
90 log.log(Level.WARNING,
91 "failed to close the socket for " + partyID, e);
92 }
93 }
94 }
95 });
96 RunningParty runningparty = runningparties.get(partyID);
97
98 if (runningparty == null) {
99 session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT,
100 "No such party: " + partyID));
101 } else {
102 runningparties.replace(runningparty.withConnection(this));
103 }
104
105 // we do not listen for changes on the Parties factory as we can't
106 // replace the running party anyway
107 }
108
109 /**
110 * Called when a message comes in on the server socket that needs to be
111 * passed into the {@link Party}. We also sniff the message for termination
112 * indicators.
113 *
114 * @param informmessage the incoming string, supposedly a JSON-fornatted
115 * {@link Inform}
116 * @param session the session. We already have this info.
117 */
118 @OnMessage
119 public void onMessage(String informmessage, Session session)
120 throws JsonParseException, JsonMappingException, IOException {
121 if (this.session != session) {
122 throw new IllegalStateException("Unexpected change of session ID");
123 }
124 Inform info = jackson.readValue(informmessage, Inform.class);
125 RunningParty party = runningparties.get(partyID);
126 if (party != null) {
[4]127 // first sniff, to ensure the deadlines are updated first.
128 sniff(info, party);
[1]129 log.log(Level.FINE, "Inform " + partyID + ": " + info);
130 try {
131 party.inform(info);
132 } catch (Throwable e) {
133 // simple sandbox
134 log.log(Level.WARNING, "Party failed on inform:", e);
135 }
136 } // else dead but that's handled in the listener above
137 }
138
139 /**
140 * Sniffs the Inform for termination-related information
141 *
142 * @param info
143 * @param party the party we're handling
144 * @throws IOException
145 */
146 private void sniff(Inform info, RunningParty party) throws IOException {
147 if (info instanceof Finished) {
148 session.close(new CloseReason(CloseCodes.GOING_AWAY,
149 "session reached agreement"));
150 } else if (info instanceof Settings) {
151 Date end = ((Settings) info).getProgress().getTerminationTime();
152 runningparties.replace(party.withEndTime(end));
153 }
154 }
155
156 @OnClose
157 public void onClose() throws IOException {
158 log.log(Level.INFO, "socket closed to " + partyID);
159 /*
160 * for now just kill the party. Maybe we can do something with reconnect
161 * but that will be complex.
162 */
163 runningparties.remove(partyID);
164 outstream.stop();
165 }
166
167 @OnError
168 public void onError(Throwable t) throws Throwable {
169 if (t instanceof EOFException) {
170 // This is a standard exception from Apache when a socket closes.
171 // Just ignore
172 // it...
173 log.log(Level.FINEST,
174 "apache reported EOF from (probably closed) socket, ignoring");
175 return;
176 }
177 log.log(Level.WARNING, "Unhandled exception was reported by Tomcat", t);
178 }
179
180 @Override
181 public void send(Action action) throws IOException {
182 log.log(Level.FINE, "Action " + partyID + ": " + action);
183 outstream.send(jackson.writeValueAsString(action));
184 }
185
186 @Override
187 public Reference getReference() {
188 return null;
189 }
190
191 @Override
192 public URI getRemoteURI() {
193 // In serverendpoints we apparently can't see who was contacting us.
194 throw new UnsupportedOperationException();
195 }
196
197 @Override
198 public void close() {
199 try {
200 session.close();
201 } catch (IOException e) {
202 log.log(Level.SEVERE, "failed to close " + partyID, e);
203 }
204 }
205
206 @Override
207 public Throwable getError() {
208 return error;
209 }
210
211}
Note: See TracBrowser for help on using the repository browser.