source: protocol/src/main/java/geniusweb/protocol/session/amop/AMOP.java@ 52

Last change on this file since 52 was 52, checked in by ruud, 14 months ago

Fixed small issues in domaineditor.

File size: 14.7 KB
Line 
1package geniusweb.protocol.session.amop;
2
3import java.io.IOException;
4import java.sql.Date;
5import java.text.SimpleDateFormat;
6import java.util.List;
7import java.util.Map;
8import java.util.logging.Level;
9import java.util.stream.Collectors;
10
11import geniusweb.actions.Action;
12import geniusweb.actions.EndNegotiation;
13import geniusweb.actions.Offer;
14import geniusweb.actions.PartyId;
15import geniusweb.actions.Vote;
16import geniusweb.actions.Votes;
17import geniusweb.deadline.Deadline;
18import geniusweb.events.ProtocolEvent;
19import geniusweb.inform.ActionDone;
20import geniusweb.inform.Agreements;
21import geniusweb.inform.Finished;
22import geniusweb.inform.Inform;
23import geniusweb.inform.Settings;
24import geniusweb.inform.Voting;
25import geniusweb.inform.YourTurn;
26import geniusweb.issuevalue.Bid;
27import geniusweb.progress.ProgressFactory;
28import geniusweb.protocol.CurrentNegoState;
29import geniusweb.protocol.ProtocolException;
30import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
31import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory;
32import geniusweb.protocol.partyconnection.ProtocolToPartyConnections;
33import geniusweb.protocol.session.SessionProtocol;
34import geniusweb.protocol.session.SessionSettings;
35import geniusweb.protocol.session.amop.AMOPState.Phase;
36import geniusweb.references.Parameters;
37import geniusweb.references.PartyWithProfile;
38import geniusweb.references.ProfileRef;
39import geniusweb.references.ProtocolRef;
40import geniusweb.references.Reference;
41import tudelft.utilities.listener.DefaultListenable;
42import tudelft.utilities.logging.Reporter;
43import tudelft.utilities.repository.NoResourcesNowException;
44
45/**
46 *
47 * All parties are first sent the {@link SessionSettings}. Only parties
48 * specified initially in the settings do participate.
49 *
50 * <h2>parameter</h2> AMOP parties can receive a parameter: minVotes containing
51 * a Double. If set, the AMOP protocol checks that all {@link Votes} from that
52 * party have {@link Vote#getMinPower()} &gt; minVotes.
53 * <h2>Protocol steps</h2>
54 * <ol>
55 * <li>The protocol tries to start all parties. If not all parties start, the
56 * parties are freed up and another attempt is done to start all parties some
57 * time later.
58 * <li>the variable remainingparties = {all parties}
59 * <li>The session deadline clock now starts ticking.
60 * <li>All parties are sent their settings.
61 * <li>Loop until {@link Deadline} is reached or |remainingparties|&lt;2:
62 * <ol>
63 * <li>protocol sends {@link YourTurn} to all remainingparties. Each party now
64 * must submit an {@link Offer} within {@link #PHASE_TIME} seconds. If a party
65 * fails to submit it is send Finished and removed from from remainingparties.
66 * <li>protocol sends a {@link Voting} containing a List of {@link Bid}
67 * containing all received {@link Bid}s. Each party must place his {@link Votes}
68 * within {@link #PHASE_TIME} seconds. If a party does not submit, it is send a
69 * {@link Finished} and removed from remainingparties. Previous votes for the
70 * same bid do not count. But see {@link Agreements}.
71 * <li>The protocol determines a maximum sized subset of size N&ge;2 of the
72 * votes for bid B for which the vote conditions hold. If there is such a
73 * subset, the parties that placed these votes reached an agreement. They are
74 * added to the agreement set, sent the {@link Finished} info and terminated.
75 * They are removed from the remainingparties.
76 * </ol>
77 * <li>Any remaining parties are sent a {@link Finished} object without
78 * agreement bid and are terminated.
79 * </ol>
80 * <p>
81 * A party can send {@link EndNegotiation} at any time to remove itself from the
82 * negotiation. The other parties may continue anyawy without him.
83 * <p>
84 * Parties receive the {@link Finished} information at the very end when all
85 * agreements have been collected.
86 * <p>
87 * This logs to "Protocol" logger if there are issues
88 * <p>
89 * This object is mutable: the internal state changes as parties interact with
90 * the protocol.
91 * <p>
92 * Threadsafe: all entrypoints are synhronized.
93 */
94public class AMOP extends DefaultListenable<ProtocolEvent>
95 implements SessionProtocol {
96 private static final long PHASE_TIME = 30000; // millis
97 private static final long TIME_MARGIN = 20l;// ms extra delay after deadline
98 private static final int MINDURATION = 100;
99 private static final int MIN_SLEEP_TIME = 1000;
100 private static final int MAX_SLEEP_TIME = 60000;
101 private static final ProtocolRef AMOP = new ProtocolRef("AMOP");
102
103 private final Reporter log;
104 private AMOPState state = null; // mutable!
105 private volatile WillBeCalled finish = new WillBeCalled(() -> finish(),
106 null);
107 private volatile WillBeCalled endPhase = new WillBeCalled(() -> endPhase(),
108 null);
109
110 /**
111 *
112 * @param state normally the initial state coming from SAOPSettings
113 * @param logger the {@link Reporter} to use
114 */
115 public AMOP(AMOPState state, Reporter logger) {
116 if (state == null) {
117 throw new NullPointerException("state must be not null");
118 }
119 if (state.getSettings().getDeadline().getDuration() < MINDURATION) {
120 throw new IllegalArgumentException(
121 "Duration must be at least " + MINDURATION);
122 }
123 if (logger == null) {
124 throw new NullPointerException("Logger must be not null");
125 }
126 this.log = logger;
127 this.state = state;
128 }
129
130 @Override
131 public synchronized void start(
132 ProtocolToPartyConnFactory connectionfactory) {
133 try {
134 // we're in Phase.INIT still
135 connect(connectionfactory);
136 // this moves to
137 setDeadline();
138 setupParties();
139 endPhase.complete(); // end of INIT
140 } catch (Throwable e) {
141 handleError("Failed to start up session", new PartyId("UNKNOWN"),
142 e);
143 }
144 }
145
146 @Override
147 public String getDescription() {
148 return "All parties get YourTurn. They now can submit their Offer within 30 seconds. "
149 + "Next they receive a ElicitComparison. They can now submit their Votes."
150 + "If one of their Vote succeeds, they finish with an agreement. "
151 + "Parties that did not yet reach agreement continue negotiating. "
152 + "A parameter 'minVotes', if set, contains the the smallest value to be used in each Vote";
153 }
154
155 @Override
156 public void addParticipant(PartyWithProfile party)
157 throws IllegalStateException {
158 throw new IllegalStateException(
159 "Dynamic joining a negotiation is not supported in AMOP");
160 }
161
162 @Override
163 public AMOPState getState() {
164 return state;
165 }
166
167 @Override
168 public ProtocolRef getRef() {
169 return AMOP;
170 }
171
172 /*******************************************************************
173 * private functions. Some are protected only, for testing purposes
174 ********************************************************************/
175 /**
176 * step 1 in protocol: connect all involved parties and start the clock.
177 * This always "succeeds" with a valid (but possibly final) state
178 * <p>
179 * This is 'protected' to allow junit testing, this code is not a 'public'
180 * part of the interface.
181 *
182 * @param connectionfactory the connectionfactory for making party
183 * connections
184 *
185 * @throws InterruptedException if the connection procedure is unterrupted
186 *
187 * @throws IOException if this fails to properly conect to the
188 * parties, eg interrupted or server not
189 * responding..
190 */
191 protected synchronized void connect(
192 ProtocolToPartyConnFactory connectionfactory)
193 throws InterruptedException, IOException {
194 List<PartyWithProfile> participants = state.getSettings()
195 .getAllParties();
196 List<Reference> parties = participants.stream()
197 .map(parti -> (parti.getParty().getPartyRef()))
198 .collect(Collectors.toList());
199 List<ProtocolToPartyConn> connections = null;
200 log.log(Level.INFO, "AMOP connect " + parties);
201 while (connections == null) {
202 try {
203 connections = connectionfactory.connect(parties);
204 } catch (NoResourcesNowException e) {
205 long waitms = e.getLater().getTime()
206 - System.currentTimeMillis();
207 log.log(Level.INFO,
208 "No resources available to run session, waiting"
209 + waitms);
210 Thread.sleep(Math.min(MAX_SLEEP_TIME,
211 Math.max(MIN_SLEEP_TIME, waitms)));
212 }
213 }
214 for (int i = 0; i < participants.size(); i++) {
215 setState(this.state.with(connections.get(i), participants.get(i)));
216 }
217 }
218
219 /**
220 * overridable factory method, used for testing.
221 *
222 * @param r the {@link Runnable} that will be called
223 * @param delayMs the time after which r will be called
224 * @return new WillBeCalled(r,delayms)
225 */
226 protected WillBeCalled createWillBeCalled(Runnable r, Long delayMs) {
227 return new WillBeCalled(r, delayMs);
228 }
229
230 /**
231 * Set state to proper deadline. Starts the timer tasks. This tasks triggers
232 * a call to handleError when the session times out.
233 */
234 private void setDeadline() {
235 long now = System.currentTimeMillis();
236 Deadline deadline = state.getSettings().getDeadline();
237 setState(state.with(ProgressFactory.create(deadline, now)));
238
239 finish = createWillBeCalled(() -> finish(),
240 TIME_MARGIN + deadline.getDuration());
241 log.log(Level.INFO, "AMOP deadline set to "
242 + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
243 .format(new Date(System.currentTimeMillis()
244 + deadline.getDuration() + TIME_MARGIN)));
245 }
246
247 /**
248 * step 2 in protocol: listen to connections and send settings to the
249 * parties.
250 * <p>
251 * This is 'protected' to allow junit testing, this code is not a 'public'
252 * part of the interface.
253 *
254 * @throws ProtocolException if a party does not follow the protocol
255 */
256 protected synchronized void setupParties() throws ProtocolException {
257 for (ProtocolToPartyConn conn : state.getConnections()) {
258 conn.addListener(action -> actionRequest(conn, action));
259 }
260
261 for (ProtocolToPartyConn connection : state.getConnections()) {
262 try {
263 sendSettings(connection);
264 } catch (IOException e) {
265 throw new ProtocolException("Failed to initialize",
266 connection.getParty(), e);
267 }
268 }
269
270 }
271
272 /**
273 * Inform a party about its settings
274 *
275 * @param connection
276 * @throws IOException if party got disconnected
277 */
278 private synchronized void sendSettings(ProtocolToPartyConn connection)
279 throws IOException {
280 PartyId partyid = connection.getParty();
281 ProfileRef profile = state.getPartyProfiles().get(partyid).getProfile();
282 Parameters params = state.getPartyProfiles().get(partyid).getParty()
283 .getParameters();
284 if (profile == null) {
285 throw new IllegalArgumentException(
286 "Missing profile for party " + connection.getReference());
287 }
288 connection.send(new Settings(connection.getParty(), profile, getRef(),
289 state.getProgress(), params));
290 }
291
292 /**
293 * This is called when one of the {@link ProtocolToPartyConn}s does an
294 * action. Synchronized so that we always handle only 1 action at a time.
295 *
296 * @param partyconn the connection on which the action came in
297 * @param action the {@link Action} taken by some party
298 */
299 protected synchronized void actionRequest(
300 final ProtocolToPartyConn partyconn, final Action action) {
301 if (action == null) {
302 Throwable err = partyconn.getError();
303 if (err == null) {
304 err = new ProtocolException("Party sent a null action",
305 partyconn.getParty());
306 }
307 handleError(partyconn + "Protocol error", partyconn.getParty(),
308 err);
309 return;
310 }
311
312 try {
313 setState(state.with(partyconn.getParty(), action));
314 } catch (Throwable e) {
315 handleError("failed to handle action " + action,
316 partyconn.getParty(), e);
317 }
318 // we ignore broadcast errors here, may need to be fixed.
319 broadcast(new ActionDone(action));
320
321 if (state.isAllPartiesActed()) {
322 endPhase.complete();
323 }
324
325 }
326
327 /**
328 * The current phase is completed. Proceed to next phase as needed. Reset
329 * the deadline timers and inform parties. Increase progress if necessary.
330 * Must only be called through {@link #endPhase} to ensure this is called
331 * only once.
332 */
333 private synchronized void endPhase() {
334 if (state.isFinal(System.currentTimeMillis()))
335 return;
336
337 List<Offer> bids = null;
338 if (state.getPhase() == Phase.OFFER) {
339 bids = state.getPhaseActions().values().stream()
340 .map(act -> ((Offer) act)).collect(Collectors.toList());
341 }
342
343 setState(state.nextPhase());
344
345 if (state.getPhase() == Phase.OFFER) {
346 broadcast(new YourTurn());
347 } else {
348 Map<PartyId, Integer> powers = state.getActiveParties().stream()
349 .collect(Collectors.toMap(pid -> pid, pid -> 1));
350 broadcast(new Voting(bids, powers));
351 }
352 /**
353 * Really tricky but we can safely replace {@link #endPhase}: all
354 * functions are synchronized so the state is always consistent when
355 * this is called. MNMMM Yes and then?? need proof that not called right
356 * after replace. I think we need proof that #endPhase is always
357 * consistent with state , that phase in state is coupled
358 */
359 endPhase = createWillBeCalled(() -> endPhase(),
360 PHASE_TIME + TIME_MARGIN);
361 }
362
363 /**
364 * Alternative to {@link ProtocolToPartyConnections#broadcast(Inform)},
365 * sending only to active parties
366 *
367 * @param info the info to broadcast
368 */
369 private void broadcast(Inform info) {
370 for (PartyId party : state.getActiveParties()) {
371 try {
372 state.getConnections().get(party).send(info);
373 } catch (IOException e) {
374 handleError("Party seems to have disconnected", party, e);
375 }
376 }
377 }
378
379 /**
380 * Update state to include the given error and closes the party.
381 *
382 * @param message The message to attach to the error
383 * @param party the party where the error occured
384 * @param e the exception that occured.
385 */
386 private synchronized void handleError(final String message,
387 final PartyId partyid, final Throwable e) {
388 if (e instanceof ProtocolException) {
389 setState(state.with(partyid, (ProtocolException) e));
390 } else {
391 setState(state.with(partyid,
392 new ProtocolException(message, partyid, e)));
393 }
394 log.log(Level.WARNING,
395 "AMOP intercepted error from party " + partyid + ":" + message,
396 e);
397 }
398
399 /**
400 * Sets the new state. If the new state is final, the finish-up procedure is
401 * executed. All state changes go through here to ensure we check isFinal
402 * before any state change.
403 *
404 * @param newstate the new state.
405 */
406 private synchronized void setState(AMOPState newstate) {
407 long now = System.currentTimeMillis();
408 if (state.isFinal(now)) {
409 finish.complete();
410 return;
411 }
412 this.state = newstate;
413 if (newstate.isFinal(now)) {
414 finish.complete();
415 }
416 }
417
418 /**
419 * Called when we reach final state. Must be called through {@link #finish}
420 * to ensure this is called only once. Send finished info to all parties,
421 * notify current nego state as final and set {@link #isFinishedInfoSent}.
422 * Double calls are automatically ignored.
423 */
424 private synchronized void finish() {
425 Finished finished = new Finished(state.getAgreements());
426 for (ProtocolToPartyConn conn : state.getConnections()) {
427 try {
428 conn.send(finished);
429 conn.close();
430 } catch (Exception e) {
431 log.log(Level.INFO, "Failed to send Finished to " + conn, e);
432 }
433 }
434 notifyListeners(new CurrentNegoState(state));
435 }
436
437}
Note: See TracBrowser for help on using the repository browser.