source: protocol/src/main/java/geniusweb/protocol/session/saop/SAOP.java@ 28

Last change on this file since 28 was 28, checked in by bart, 4 years ago

minor fixes to improve extendability

File size: 11.9 KB
Line 
1package geniusweb.protocol.session.saop;
2
3import java.io.IOException;
4import java.sql.Date;
5import java.text.SimpleDateFormat;
6import java.util.List;
7import java.util.Timer;
8import java.util.TimerTask;
9import java.util.concurrent.atomic.AtomicBoolean;
10import java.util.logging.Level;
11import java.util.stream.Collectors;
12
13import geniusweb.actions.Action;
14import geniusweb.actions.PartyId;
15import geniusweb.deadline.Deadline;
16import geniusweb.events.ProtocolEvent;
17import geniusweb.inform.ActionDone;
18import geniusweb.inform.Finished;
19import geniusweb.inform.Inform;
20import geniusweb.inform.Settings;
21import geniusweb.inform.YourTurn;
22import geniusweb.progress.ProgressFactory;
23import geniusweb.protocol.CurrentNegoState;
24import geniusweb.protocol.ProtocolException;
25import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
26import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory;
27import geniusweb.protocol.session.SessionProtocol;
28import geniusweb.protocol.session.SessionSettings;
29import geniusweb.protocol.session.SessionState;
30import geniusweb.references.Parameters;
31import geniusweb.references.PartyWithProfile;
32import geniusweb.references.ProfileRef;
33import geniusweb.references.ProtocolRef;
34import geniusweb.references.Reference;
35import tudelft.utilities.listener.DefaultListenable;
36import tudelft.utilities.logging.Reporter;
37import tudelft.utilities.repository.NoResourcesNowException;
38
39/**
40 * The protocol runs as follows
41 * <ol>
42 * <li>The protocol tries to start all parties. If not all parties start, the
43 * parties are freed up and another attempt is done to start all parties some
44 * time later.
45 * <li>All parties are sent the {@link SessionSettings}. Only parties specified
46 * initially in the settings do participate.
47 * <li>The session deadline clock now starts ticking.
48 * <li>All parties are sent their settings.
49 * <li>All parties get YourTurn in clockwise order. A party must do exactly one
50 * action after it received YourTurn.
51 * <li>The negotiation continues until an agreement is reached (all parties
52 * agreed to the last bid), the {@link Deadline} is reached, or a party fails to
53 * adhere to the protocol.
54 * <li>If the session times out, the connections are cut and the negotiation
55 * completes without errors and without agreement.
56 * </ol>
57 * <p>
58 * This logs to "Protocol" logger if there are issues
59 * <p>
60 * This object is mutable: the internal state changes as parties interact with
61 * the protocol.
62 * <p>
63 * Thread safe: all entry points are synchronized.
64 */
65public class SAOP extends DefaultListenable<ProtocolEvent>
66 implements SessionProtocol {
67 public static final int TIME_MARGIN = 20;// ms extra delay after deadline
68 public static final int MINDURATION = 100;
69 public static final int MIN_SLEEP_TIME = 1000;
70 public static final int MAX_SLEEP_TIME = 60000;
71 private static final ProtocolRef SAOP = new ProtocolRef("SAOP");
72 private final Reporter log;
73
74 private SAOPState state = null; // mutable!
75 private volatile AtomicBoolean isFinishedInfoSent = new AtomicBoolean(
76 false);
77 private volatile Timer deadlinetimer = null;
78
79 /**
80 *
81 * @param state normally the initial state coming from SAOPSettings
82 * @param logger the Reporter to log to
83 */
84 public SAOP(SAOPState state, Reporter logger) {
85 if (state == null) {
86 throw new NullPointerException("state must be not null");
87 }
88 if (state.getSettings().getDeadline().getDuration() < MINDURATION) {
89 throw new IllegalArgumentException(
90 "Duration must be at least " + MINDURATION);
91 }
92 this.log = logger;
93 this.state = state;
94 }
95
96 @Override
97 public synchronized void start(
98 ProtocolToPartyConnFactory connectionfactory) {
99
100 try {
101 connect(connectionfactory);
102 setDeadline();
103 setupParties();
104 nextTurn();
105 } catch (Throwable e) {
106 handleError("Failed to start up session", null, e);
107 }
108 }
109
110 @Override
111 public String getDescription() {
112 return "All parties get YourTurn in clockwise order, after which they can do their next action. "
113 + "No new participants after start. End after prescribed deadline or when some bid is unanimously Accepted."
114 + "Parties can only act on their own behalf and only when it is their turn.";
115 }
116
117 @Override
118 public void addParticipant(PartyWithProfile party)
119 throws IllegalStateException {
120 throw new IllegalStateException(
121 "Dynamic joining a negotiation is not allowed in SAOP");
122 }
123
124 @Override
125 public SessionState getState() {
126 return state;
127 }
128
129 @Override
130 public ProtocolRef getRef() {
131 return SAOP;
132 }
133
134 /*******************************************************************
135 * private functions. Some are protected only, for testing purposes
136 ********************************************************************/
137 /**
138 * step 1 in protocol: connect all involved parties and start the clock.
139 * This always "succeeds" with a valid (but possibly final) state
140 * <p>
141 * This is 'protected' to allow junit testing, this code is not a 'public'
142 * part of the interface.
143 *
144 * @param connectionfactory the connectionfactory for making party
145 * connections
146 *
147 * @throws InterruptedException if the connection procedure is unterrupted
148 *
149 * @throws IOException if this fails to properly conect to the
150 * parties, eg interrupted or server not
151 * responding..
152 */
153 protected synchronized void connect(
154 ProtocolToPartyConnFactory connectionfactory)
155 throws InterruptedException, IOException {
156 List<PartyWithProfile> participants = state.getSettings()
157 .getAllParties();
158 List<Reference> parties = participants.stream()
159 .map(parti -> (parti.getParty().getPartyRef()))
160 .collect(Collectors.toList());
161 List<ProtocolToPartyConn> connections = null;
162 log.log(Level.INFO, "SAOP connect " + parties);
163 while (connections == null) {
164 try {
165 connections = connectionfactory.connect(parties);
166 } catch (NoResourcesNowException e) {
167 long waitms = e.getLater().getTime()
168 - System.currentTimeMillis();
169 log.log(Level.INFO,
170 "No resources available to run session, waiting"
171 + waitms);
172 Thread.sleep(Math.min(MAX_SLEEP_TIME,
173 Math.max(MIN_SLEEP_TIME, waitms)));
174 }
175 }
176 for (int i = 0; i < participants.size(); i++) {
177 setState(this.state.with(connections.get(i), participants.get(i)));
178 }
179 }
180
181 /**
182 * Set state to proper deadline. Starts the timer tasks. This tasks triggers
183 * a call to handleError when the session times out.
184 */
185 private synchronized void setDeadline() {
186 long now = System.currentTimeMillis();
187 Deadline deadline = state.getSettings().getDeadline();
188 setState(state.with(ProgressFactory.create(deadline, now)));
189 deadlinetimer = new Timer();
190 TimerTask task = new TimerTask() {
191 @Override
192 public void run() {
193 if (!state.isFinal(System.currentTimeMillis())) {
194 log.log(Level.SEVERE,
195 "BUG. Deadline timer has triggered but state is not final");
196 }
197 log.log(Level.WARNING,
198 "SAOP deadline reached. Terminating session.");
199 finish();
200 }
201 };
202 // set timer TIME_MARGIN after real deadline to ensure we're not too
203 // early
204 deadlinetimer.schedule(task, deadline.getDuration() + TIME_MARGIN);
205 log.log(Level.INFO, "SAOP deadline set to "
206 + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
207 .format(new Date(System.currentTimeMillis()
208 + deadline.getDuration() + TIME_MARGIN)));
209 }
210
211 /**
212 * step 2 in protocol: listen to connections and send settings to the
213 * parties.
214 * <p>
215 * This is 'protected' to allow junit testing, this code is not a 'public'
216 * part of the interface.
217 *
218 * @throws ProtocolException if a party does not follow the protocol
219 */
220 protected synchronized void setupParties() throws ProtocolException {
221 for (ProtocolToPartyConn conn : state.getConnections()) {
222 conn.addListener(action -> actionRequest(conn, action));
223 }
224
225 for (ProtocolToPartyConn connection : state.getConnections()) {
226 try {
227 sendSettings(connection);
228 } catch (IOException e) {
229 throw new ProtocolException("Failed to initialize",
230 connection.getParty(), e);
231 }
232 }
233
234 }
235
236 /**
237 * Inform a party about its settings
238 *
239 * @param connection
240 * @throws IOException if party got disconnected
241 */
242 private synchronized void sendSettings(ProtocolToPartyConn connection)
243 throws IOException {
244 PartyId partyid = connection.getParty();
245 ProfileRef profile = state.getPartyProfiles().get(partyid).getProfile();
246 Parameters params = state.getPartyProfiles().get(partyid).getParty()
247 .getParameters();
248 if (profile == null) {
249 throw new IllegalArgumentException(
250 "Missing profile for party " + connection.getReference());
251 }
252 connection.send(new Settings(connection.getParty(), profile, getRef(),
253 state.getProgress(), params));
254 }
255
256 /**
257 * This is called when one of the party connections does an action.
258 * Synchronized so that we always handle only 1 action at a time.
259 *
260 * @param partyconn the connection on which the action came in.
261 * @param action the {@link Action} taken by some party
262 */
263 protected synchronized void actionRequest(
264 final ProtocolToPartyConn partyconn, final Action action) {
265 if (action == null) {
266 Throwable err = partyconn.getError();
267 if (err == null) {
268 err = new ProtocolException("Party sent a null action",
269 partyconn.getParty());
270 }
271 handleError(partyconn + "Protocol error", partyconn.getParty(),
272 err);
273 return;
274 }
275
276 try {
277 if (!partyconn.getParty().equals(state.getNextActor())) {
278 // party does not have the turn.
279 throw new ProtocolException(
280 "Party acts without having the turn",
281 partyconn.getParty());
282 }
283 // FIXME? this ignores possible broadcast errors
284 state.getConnections().broadcast(new ActionDone(action));
285 setState(state.with(partyconn.getParty(), action));
286 if (!state.isFinal(System.currentTimeMillis()))
287 nextTurn();
288 } catch (Throwable e) {
289 handleError("failed to handle action " + action,
290 partyconn.getParty(), e);
291 }
292
293 }
294
295 /**
296 * Signal next participant it's his turn
297 *
298 * @throws IOException
299 */
300 private synchronized void nextTurn() {
301 PartyId party = state.getNextActor();
302 try {
303 state.getConnections().get(party).send(new YourTurn());
304 } catch (IOException e) {
305 handleError("failed to send YourTurn", party, e);
306 }
307 }
308
309 /**
310 * Update state to include the given error and finishes up the session.
311 *
312 * @param message The message to attach to the error
313 * @param party the party where the error occured
314 * @param e the exception that occured.
315 */
316 private synchronized void handleError(final String message,
317 final PartyId party, final Throwable e) {
318 if (e instanceof ProtocolException) {
319 setState(state.with((ProtocolException) e));
320 } else {
321 setState(state.with(new ProtocolException(message, party, e)));
322 }
323 log.log(Level.WARNING, "SAOP protocol intercepted error due to party "
324 + party + ":" + message, e);
325 }
326
327 /**
328 * Sets the new state. If the new state is final, the finish-up procedure is
329 * executed.
330 *
331 * @param newstate the new state.
332 */
333 private synchronized void setState(SAOPState newstate) {
334 long now = System.currentTimeMillis();
335 if (state.isFinal(now)) {
336 finish();
337 return;
338 }
339 this.state = newstate;
340 if (newstate.isFinal(now)) {
341 finish();
342 }
343 }
344
345 /**
346 * Called when we reach final state. Cancels deadline timer. Send finished
347 * info to all parties, notify current nego state as final and set
348 * {@link #isFinishedInfoSent}. Double calls are automatically ignored.
349 */
350 private synchronized void finish() {
351 if (deadlinetimer != null) {
352 deadlinetimer.cancel();
353 deadlinetimer = null;
354 }
355 if (!isFinishedInfoSent.compareAndSet(false, true))
356 return;
357 Inform finished = new Finished(state.getAgreements());
358 for (ProtocolToPartyConn conn : state.getConnections()) {
359 sendFinish(conn, finished);
360 }
361 notifyListeners(new CurrentNegoState(state));
362 }
363
364 private void sendFinish(ProtocolToPartyConn connection, Inform finished) {
365 try {
366 connection.send(finished);
367 connection.close();
368 } catch (Exception e) {
369 log.log(Level.INFO, "Failed to send Finished to " + connection, e);
370 }
371 }
372
373}
Note: See TracBrowser for help on using the repository browser.