source: protocol/src/main/java/geniusweb/protocol/session/saop/SAOP.java@ 52

Last change on this file since 52 was 52, checked in by ruud, 14 months ago

Fixed small issues in domaineditor.

File size: 12.5 KB
Line 
1package geniusweb.protocol.session.saop;
2
3import java.io.IOException;
4import java.sql.Date;
5import java.text.SimpleDateFormat;
6import java.util.Collections;
7import java.util.List;
8import java.util.Timer;
9import java.util.TimerTask;
10import java.util.concurrent.atomic.AtomicBoolean;
11import java.util.logging.Level;
12import java.util.stream.Collectors;
13
14import geniusweb.actions.Action;
15import geniusweb.actions.PartyId;
16import geniusweb.deadline.Deadline;
17import geniusweb.events.ProtocolEvent;
18import geniusweb.inform.ActionDone;
19import geniusweb.inform.Finished;
20import geniusweb.inform.Inform;
21import geniusweb.inform.Settings;
22import geniusweb.inform.YourTurn;
23import geniusweb.progress.ProgressFactory;
24import geniusweb.protocol.CurrentNegoState;
25import geniusweb.protocol.ProtocolException;
26import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
27import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory;
28import geniusweb.protocol.partyconnection.ProtocolToPartyConnections;
29import geniusweb.protocol.session.SessionProtocol;
30import geniusweb.protocol.session.SessionSettings;
31import geniusweb.protocol.session.SessionState;
32import geniusweb.references.Parameters;
33import geniusweb.references.PartyWithProfile;
34import geniusweb.references.ProfileRef;
35import geniusweb.references.ProtocolRef;
36import geniusweb.references.Reference;
37import tudelft.utilities.listener.DefaultListenable;
38import tudelft.utilities.logging.Reporter;
39import tudelft.utilities.repository.NoResourcesNowException;
40
41/**
42 * The protocol runs as follows
43 * <ol>
44 * <li>The protocol tries to start all parties. If not all parties start, the
45 * parties are freed up and another attempt is done to start all parties some
46 * time later.
47 * <li>All parties are sent the {@link SessionSettings}. Only parties specified
48 * initially in the settings do participate.
49 * <li>The session deadline clock now starts ticking.
50 * <li>All parties are sent their settings.
51 * <li>All parties get YourTurn in clockwise order. A party must do exactly one
52 * action after it received YourTurn.
53 * <li>The negotiation continues until an agreement is reached (all parties
54 * agreed to the last bid), the {@link Deadline} is reached, or a party fails to
55 * adhere to the protocol.
56 * <li>If the session times out, the connections are cut and the negotiation
57 * completes without errors and without agreement.
58 * </ol>
59 * <p>
60 * This logs to "Protocol" logger if there are issues
61 * <p>
62 * This object is mutable: the internal state changes as parties interact with
63 * the protocol.
64 * <p>
65 * Thread safe: all entry points are synchronized.
66 */
67public class SAOP extends DefaultListenable<ProtocolEvent>
68 implements SessionProtocol {
69 public static final int TIME_MARGIN = 20;// ms extra delay after deadline
70 public static final int MINDURATION = 100;
71 public static final int MIN_SLEEP_TIME = 1000;
72 public static final int MAX_SLEEP_TIME = 60000;
73 private static final ProtocolRef SAOP = new ProtocolRef("SAOP");
74 private final Reporter log;
75
76 // these 4 variables are why protocols are so complex.
77 private SAOPState state = null; // mutable!
78 private volatile AtomicBoolean isFinishedInfoSent = new AtomicBoolean(
79 false);
80 private volatile Timer deadlinetimer = null;
81 private volatile ProtocolToPartyConnections conns;
82
83 public SAOP(SAOPState state, Reporter logger,
84 ProtocolToPartyConnections connects) {
85 if (state == null) {
86 throw new NullPointerException("state must be not null");
87 }
88 if (state.getSettings().getDeadline().getDuration() < MINDURATION) {
89 throw new IllegalArgumentException(
90 "Duration must be at least " + MINDURATION);
91 }
92 this.log = logger;
93 this.state = state;
94 this.conns = connects;
95 }
96
97 /**
98 *
99 * @param state normally the initial state coming from SAOPSettings
100 * @param logger the Reporter to log to
101 */
102 public SAOP(SAOPState state, Reporter logger) {
103 this(state, logger,
104 new ProtocolToPartyConnections(Collections.emptyList()));
105 }
106
107 @Override
108 public synchronized void start(
109 ProtocolToPartyConnFactory connectionfactory) {
110
111 try {
112 connect(connectionfactory);
113 setDeadline();
114 setupParties();
115 nextTurn();
116 } catch (Throwable e) {
117 handleError("Failed to start up session", null, e);
118 }
119 }
120
121 @Override
122 public String getDescription() {
123 return "All parties get YourTurn in clockwise order, after which they can do their next action. "
124 + "No new participants after start. End after prescribed deadline or when some bid is unanimously Accepted."
125 + "Parties can only act on their own behalf and only when it is their turn.";
126 }
127
128 @Override
129 public void addParticipant(PartyWithProfile party)
130 throws IllegalStateException {
131 throw new IllegalStateException(
132 "Dynamic joining a negotiation is not allowed in SAOP");
133 }
134
135 @Override
136 public SessionState getState() {
137 return state;
138 }
139
140 @Override
141 public ProtocolRef getRef() {
142 return SAOP;
143 }
144
145 /*******************************************************************
146 * private functions. Some are protected only, for testing purposes
147 ********************************************************************/
148 /**
149 * step 1 in protocol: connect all involved parties and start the clock.
150 * This always "succeeds" with a valid (but possibly final) state
151 * <p>
152 * This is 'protected' to allow junit testing, this code is not a 'public'
153 * part of the interface.
154 *
155 * @param connectionfactory the connectionfactory for making party
156 * connections
157 *
158 * @throws InterruptedException if the connection procedure is unterrupted
159 *
160 * @throws IOException if this fails to properly conect to the
161 * parties, eg interrupted or server not
162 * responding..
163 */
164 protected synchronized void connect(
165 ProtocolToPartyConnFactory connectionfactory)
166 throws InterruptedException, IOException {
167 List<PartyWithProfile> participants = state.getSettings()
168 .getAllParties();
169 List<Reference> parties = participants.stream()
170 .map(parti -> (parti.getParty().getPartyRef()))
171 .collect(Collectors.toList());
172 List<ProtocolToPartyConn> connections = null;
173 log.log(Level.INFO, "SAOP connect " + parties);
174 while (connections == null) {
175 try {
176 connections = connectionfactory.connect(parties);
177 } catch (NoResourcesNowException e) {
178 long waitms = e.getLater().getTime()
179 - System.currentTimeMillis();
180 log.log(Level.INFO,
181 "No resources available to run session, waiting"
182 + waitms);
183 Thread.sleep(Math.min(MAX_SLEEP_TIME,
184 Math.max(MIN_SLEEP_TIME, waitms)));
185 }
186 }
187 for (int i = 0; i < participants.size(); i++) {
188 // now we bookkeep the connections ourselves,
189 // and update the state to keep in sync.
190 conns = conns.with(connections.get(i));
191 setState(this.state.with(connections.get(i).getParty(),
192 participants.get(i)));
193 }
194 }
195
196 /**
197 * Set state to proper deadline. Starts the timer tasks. This tasks triggers
198 * a call to handleError when the session times out.
199 */
200 private synchronized void setDeadline() {
201 long now = System.currentTimeMillis();
202 Deadline deadline = state.getSettings().getDeadline();
203 setState(state.with(ProgressFactory.create(deadline, now)));
204 deadlinetimer = new Timer();
205 TimerTask task = new TimerTask() {
206 @Override
207 public void run() {
208 if (!state.isFinal(System.currentTimeMillis())) {
209 log.log(Level.SEVERE,
210 "BUG. Deadline timer has triggered but state is not final");
211 }
212 log.log(Level.INFO,
213 "SAOP deadline reached. Terminating session.");
214 finish();
215 }
216 };
217 // set timer TIME_MARGIN after real deadline to ensure we're not too
218 // early
219 deadlinetimer.schedule(task, deadline.getDuration() + TIME_MARGIN);
220 log.log(Level.INFO, "SAOP deadline set to "
221 + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
222 .format(new Date(System.currentTimeMillis()
223 + deadline.getDuration() + TIME_MARGIN)));
224 }
225
226 /**
227 * step 2 in protocol: listen to connections and send settings to the
228 * parties.
229 * <p>
230 * This is 'protected' to allow junit testing, this code is not a 'public'
231 * part of the interface.
232 *
233 * @throws ProtocolException if a party does not follow the protocol
234 */
235 protected synchronized void setupParties() throws ProtocolException {
236 for (ProtocolToPartyConn conn : conns) {
237 conn.addListener(action -> actionRequest(conn, action));
238 }
239
240 for (ProtocolToPartyConn connection : conns) {
241 try {
242 sendSettings(connection);
243 } catch (IOException e) {
244 throw new ProtocolException("Failed to initialize",
245 connection.getParty(), e);
246 }
247 }
248
249 }
250
251 /**
252 * Inform a party about its settings
253 *
254 * @param connection
255 * @throws IOException if party got disconnected
256 */
257 private synchronized void sendSettings(ProtocolToPartyConn connection)
258 throws IOException {
259 PartyId partyid = connection.getParty();
260 ProfileRef profile = state.getPartyProfiles().get(partyid).getProfile();
261 Parameters params = state.getPartyProfiles().get(partyid).getParty()
262 .getParameters();
263 if (profile == null) {
264 throw new IllegalArgumentException(
265 "Missing profile for party " + connection.getReference());
266 }
267 connection.send(new Settings(connection.getParty(), profile, getRef(),
268 state.getProgress(), params));
269 }
270
271 /**
272 * This is called when one of the party connections does an action.
273 * Synchronized so that we always handle only 1 action at a time.
274 *
275 * @param partyconn the connection on which the action came in.
276 * @param action the {@link Action} taken by some party
277 */
278 protected synchronized void actionRequest(
279 final ProtocolToPartyConn partyconn, final Action action) {
280 if (action == null) {
281 Throwable err = partyconn.getError();
282 if (err == null) {
283 err = new ProtocolException("Party sent a null action",
284 partyconn.getParty());
285 }
286 handleError(partyconn + "Protocol error", partyconn.getParty(),
287 err);
288 return;
289 }
290
291 try {
292 if (!partyconn.getParty().equals(state.getNextActor())) {
293 // party does not have the turn.
294 throw new ProtocolException(
295 "Party acts without having the turn",
296 partyconn.getParty());
297 }
298 // FIXME? this ignores possible broadcast errors
299 conns.broadcast(new ActionDone(action));
300 setState(state.with(partyconn.getParty(), action));
301 if (!state.isFinal(System.currentTimeMillis()))
302 nextTurn();
303 } catch (Throwable e) {
304 handleError("failed to handle action " + action,
305 partyconn.getParty(), e);
306 }
307
308 }
309
310 /**
311 * Signal next participant it's his turn
312 *
313 * @throws IOException
314 */
315 synchronized void nextTurn() {
316 PartyId party = state.getNextActor();
317 try {
318 conns.get(party).send(new YourTurn());
319 } catch (IOException e) {
320 handleError("failed to send YourTurn", party, e);
321 }
322 }
323
324 /**
325 * Update state to include the given error and finishes up the session.
326 *
327 * @param message The message to attach to the error
328 * @param party the party where the error occured. Can be null in
329 * exceptional cases.
330 * @param e the exception that occured.
331 */
332 private synchronized void handleError(final String message,
333 final PartyId party, final Throwable e) {
334 if (e instanceof ProtocolException) {
335 setState(state.with((ProtocolException) e));
336
337 } else {
338 setState(state.with(new ProtocolException(message, party, e)));
339 }
340 if (party != null)
341 setState(state.withoutParty(party));
342 log.log(Level.WARNING, "SAOP protocol intercepted error due to party "
343 + party + ":" + message, e);
344 }
345
346 /**
347 * Sets the new state. If the new state is final, the finish-up procedure is
348 * executed.
349 *
350 * @param newstate the new state.
351 */
352 private synchronized void setState(SAOPState newstate) {
353 long now = System.currentTimeMillis();
354 if (state.isFinal(now)) {
355 finish();
356 return;
357 }
358 this.state = newstate;
359 if (newstate.isFinal(now)) {
360 finish();
361 }
362 }
363
364 /**
365 * Called when we reach final state. Cancels deadline timer. Send finished
366 * info to all parties, notify current nego state as final and set
367 * {@link #isFinishedInfoSent}. Double calls are automatically ignored.
368 */
369 private synchronized void finish() {
370 if (deadlinetimer != null) {
371 deadlinetimer.cancel();
372 deadlinetimer = null;
373 }
374 if (!isFinishedInfoSent.compareAndSet(false, true))
375 return;
376 Inform finished = new Finished(state.getAgreements());
377 for (ProtocolToPartyConn conn : conns) {
378 sendFinish(conn, finished);
379 }
380 notifyListeners(new CurrentNegoState(state));
381 }
382
383 private void sendFinish(ProtocolToPartyConn connection, Inform finished) {
384 try {
385 connection.send(finished);
386 connection.close();
387 } catch (Exception e) {
388 log.log(Level.INFO, "Failed to send Finished to " + connection, e);
389 }
390 }
391
392}
Note: See TracBrowser for help on using the repository browser.