source: protocol/src/main/java/geniusweb/protocol/session/mopac2/MOPAC2.java@ 52

Last change on this file since 52 was 52, checked in by ruud, 14 months ago

Fixed small issues in domaineditor.

File size: 13.1 KB
Line 
1package geniusweb.protocol.session.mopac2;
2
3import java.io.IOException;
4import java.util.Collections;
5import java.util.List;
6import java.util.Timer;
7import java.util.TimerTask;
8import java.util.logging.Level;
9import java.util.stream.Collectors;
10
11import geniusweb.actions.Action;
12import geniusweb.actions.EndNegotiation;
13import geniusweb.actions.Offer;
14import geniusweb.actions.PartyId;
15import geniusweb.actions.Votes;
16import geniusweb.deadline.Deadline;
17import geniusweb.events.ProtocolEvent;
18import geniusweb.inform.Agreements;
19import geniusweb.inform.Finished;
20import geniusweb.inform.Inform;
21import geniusweb.inform.Settings;
22import geniusweb.inform.Voting;
23import geniusweb.inform.YourTurn;
24import geniusweb.issuevalue.Bid;
25import geniusweb.progress.ProgressFactory;
26import geniusweb.protocol.CurrentNegoState;
27import geniusweb.protocol.ProtocolException;
28import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
29import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory;
30import geniusweb.protocol.partyconnection.ProtocolToPartyConnections;
31import geniusweb.protocol.session.SessionProtocol;
32import geniusweb.protocol.session.SessionSettings;
33import geniusweb.protocol.session.mopac.phase.Phase;
34import geniusweb.references.Parameters;
35import geniusweb.references.PartyWithProfile;
36import geniusweb.references.ProfileRef;
37import geniusweb.references.ProtocolRef;
38import geniusweb.references.Reference;
39import tudelft.utilities.listener.DefaultListenable;
40import tudelft.utilities.logging.Reporter;
41import tudelft.utilities.repository.NoResourcesNowException;
42
43/**
44 *
45 * All parties are first sent the {@link SessionSettings}. Only parties
46 * specified initially in the settings do participate (no late joining in)
47 *
48 * <h2>parameter</h2> MOPAC parties can receive a parameter <code>power</code>
49 * containing an Integer. This parameter is picked up by this protocol, for
50 * handling the agreement extraction from the offers. Power=1 for parties that
51 * do not have the power parameter.
52 * <h2>Protocol steps</h2>
53 *
54 * <ol>
55 * <li>The protocol tries to start all parties. If not all parties start, the
56 * parties are freed up and another attempt is done to start all parties some
57 * time later.
58 * <li>The session deadline clock now starts ticking.
59 * <li>remainingparties are sent their settings.
60 * <li>Loop until {@link Deadline} is reached or |remainingparties| &le; 2:
61 * <ol>
62 * <li>protocol sends {@link YourTurn} to all remainingparties. Each party now
63 * must submit an {@link Offer} within {@link Phase#PHASE_MAXTIME} seconds. If a
64 * party fails to submit it is removed from remainingparties.
65 * <li>protocol sends a {@link Voting} containing a List of Bid containing all
66 * received {@link Bid}s. Each party must place his {@link Votes} within the
67 * provided deadline. If a party does not submit, it is terminated and removed
68 * from remainingparties. Previous votes for the same bid do not count. But see
69 * {@link Agreements}. A party that misbehaves after submitting its vote is
70 * removed but its Votes remain standing.
71 * <li>protocol sends to all remainingparties an OptIn containing all received
72 * votes. Each party now must submit again a {@link Votes} object within the
73 * deadline. This new Votes must equal or extend the previous Votes of the
74 * party.
75 * <li>The protocol uses the {@link Phase#getEvaluator()} to determine the
76 * agreements from the votes. If there are agreements, the parties that placed
77 * these votes reached an agreement. They are moved to the agreement set and
78 * removed from the remainingparties.
79 * </ol>
80 * <li>Any remaining parties are sent a {@link Finished} object without
81 * agreement bid and are terminated.
82 * </ol>
83 * <p>
84 * A party can send {@link EndNegotiation} at any time to remove itself from the
85 * negotiation. The other parties may continue anyway without him.
86 * <p>
87 * Parties receive the {@link Finished} information at the very end when all
88 * agreements have been collected.
89 * <p>
90 * This logs to "Protocol" logger if there are issues
91 * <p>
92 * This object is mutable: the internal state changes as parties interact with
93 * the protocol.
94 * <p>
95 * Threadsafe: all entrypoints are synchronized.
96 */
97public class MOPAC2 extends DefaultListenable<ProtocolEvent>
98 implements SessionProtocol {
99 /**
100 * Tech notes. 0. Everything here is phase driven and runs due to callback
101 * from parties. 1. Connect all parties. 2. If any party fails at any time,
102 * add it to exceptions list in the state 3. Run timer to check the phase at
103 * possible end time. The timer just checks the state, and therefore it does
104 * no harm if it is double checked. 4. We run phases always with either
105 * PHASE_TIME or time to the global deadline, whichever comes first.
106 * Therefore phases naturally end at end of nego. 5. Phases must have at
107 * least {@link #MINDURATION} otherwise it makes no sense to even start it.
108 */
109 private static final int MINDURATION = 100;
110 private static final int MIN_SLEEP_TIME = 1000;
111 private static final int MAX_SLEEP_TIME = 60000;
112 private static final ProtocolRef MOPAC2 = new ProtocolRef("MOPAC2");
113
114 private final Reporter log;
115 private MOPAC2State state = null; // mutable!
116 /**
117 * the existing party connections. we assume ownership of this so it should
118 * not be modified although connections may of course break. mutable!
119 */
120 private ProtocolToPartyConnections connections = new ProtocolToPartyConnections(
121 Collections.emptyList());
122
123 /**
124 * Set to true after all is done: we sent final outcomes. This is needed
125 * because we can't tell from state what we did at the very end.
126 */
127 private boolean finished = false;
128 private Timer timer = null;
129
130 /**
131 *
132 * @param state normally the initial state coming from SAOPSettings
133 * @param logger the {@link Reporter} to use
134 */
135 public MOPAC2(MOPAC2State state, Reporter logger) {
136 if (state == null) {
137 throw new NullPointerException("state must be not null");
138 }
139 if (state.getSettings().getDeadline().getDuration() < MINDURATION) {
140 throw new IllegalArgumentException(
141 "Duration must be at least " + MINDURATION);
142 }
143 if (logger == null) {
144 throw new NullPointerException("Logger must be not null");
145 }
146 this.log = logger;
147 this.state = state;
148 }
149
150 @Override
151 public synchronized void start(
152 ProtocolToPartyConnFactory connectionfactory) {
153 try {
154 connect(connectionfactory);
155 } catch (Throwable e) {
156 /** We can't {@link #handleError} yet. FIXME */
157 throw new RuntimeException("Failed to connect", e);
158 }
159 long now = System.currentTimeMillis();
160 state = state.initPhase(
161 ProgressFactory.create(state.getSettings().getDeadline(), now),
162 now);
163 setupParties(now);
164 startPhase(now);
165
166 }
167
168 @Override
169 public String getDescription() {
170 return "All parties get YourTurn. They now can submit their Offer within 30 seconds. "
171 + "Next they receive a Voting. They can now submit their Votes."
172 + "Then they receive a a OptIn, now they can widen up their Votes."
173 + "If one of their Vote succeeds, they finish with an agreement. "
174 + "The VotingEvaluator setting determines the exact voting behaviour "
175 + "and if this process repeats";
176 }
177
178 @Override
179 public void addParticipant(PartyWithProfile party)
180 throws IllegalStateException {
181 throw new IllegalStateException(
182 "Dynamic joining a negotiation is not supported in AMOP");
183 }
184
185 @Override
186 public MOPAC2State getState() {
187 return state;
188 }
189
190 @Override
191 public ProtocolRef getRef() {
192 return MOPAC2;
193 }
194
195 /*******************************************************************
196 * private functions. Some are protected only, for testing purposes
197 ********************************************************************/
198 /**
199 * step 1 in protocol: connect all involved parties and start the clock.
200 * This always "succeeds" with a valid (but possibly final) state
201 * <p>
202 * This is 'protected' to allow junit testing, this code is not a 'public'
203 * part of the interface.
204 *
205 * @param connectionfactory the connectionfactory for making party
206 * connections
207 *
208 * @throws InterruptedException if the connection procedure is unterrupted
209 *
210 * @throws IOException if this fails to properly conect to the
211 * parties, eg interrupted or server not
212 * responding..
213 */
214 protected synchronized void connect(
215 ProtocolToPartyConnFactory connectionfactory)
216 throws InterruptedException, IOException {
217 List<PartyWithProfile> participants = state.getSettings()
218 .getAllParties();
219 List<Reference> parties = participants.stream()
220 .map(parti -> (parti.getParty().getPartyRef()))
221 .collect(Collectors.toList());
222 log.log(Level.INFO, "MOPAC connect " + parties);
223 List<ProtocolToPartyConn> newconnections = null;
224 while (newconnections == null) {
225 try {
226 newconnections = connectionfactory.connect(parties);
227 } catch (NoResourcesNowException e) {
228 long waitms = e.getLater().getTime()
229 - System.currentTimeMillis();
230 log.log(Level.INFO,
231 "No resources available to run session, waiting"
232 + waitms);
233 Thread.sleep(Math.min(MAX_SLEEP_TIME,
234 Math.max(MIN_SLEEP_TIME, waitms)));
235 }
236 }
237 for (int i = 0; i < participants.size(); i++) {
238 ProtocolToPartyConn conn = newconnections.get(i);
239 connections = connections.with(conn);
240 state = state.with(conn.getParty(), participants.get(i));
241 }
242 }
243
244 /**
245 * step 2 in protocol: listen to connections and send settings to the
246 * parties.
247 * <p>
248 * This is 'protected' to allow junit testing, this code is not a 'public'
249 * part of the interface.
250 *
251 * @param now the current time.
252 */
253 protected synchronized void setupParties(long now) {
254 for (ProtocolToPartyConn conn : connections) {
255 conn.addListener(action -> actionRequest(conn, action, now));
256 }
257 for (ProtocolToPartyConn connection : connections) {
258 try {
259 sendSettings(connection);
260 } catch (IOException e) {
261 state = state.with(new ProtocolException("Failed to initialize",
262 connection.getParty()));
263 }
264 }
265 }
266
267 /**
268 * Send the {@link Inform} for the current phase to the remaining parties
269 * and start the time-out checker
270 */
271 private void startPhase(long now) {
272 Inform info = state.getPhase().getInform();
273 for (PartyId pid : state.getPhase().getPartyStates().getNotYetActed()) {
274 try {
275 connections.get(pid).send(info);
276 } catch (IOException e) {
277 state = state.with(new ProtocolException(
278 "Party seems to have disconnected", pid, e));
279 }
280 }
281 startTimer(1 + state.getPhase().getDeadline() - now);
282 }
283
284 /**
285 * Check at given deadline if we already ended the phase.
286 *
287 * @param deadn the deadline
288 */
289 private void startTimer(long deadln) {
290 if (timer != null)
291 throw new IllegalStateException("Timer is still running!");
292 timer = new Timer("deadlinetimer");
293 timer.schedule(new TimerTask() {
294 @Override
295 public void run() {
296 checkEndPhase(System.currentTimeMillis());
297 }
298 }, deadln);
299
300 }
301
302 /**
303 * Inform a party about its settings
304 *
305 * @param connection
306 * @throws IOException if party got disconnected
307 */
308 private synchronized void sendSettings(ProtocolToPartyConn connection)
309 throws IOException {
310 PartyId partyid = connection.getParty();
311 ProfileRef profile = state.getPartyProfiles().get(partyid).getProfile();
312 Parameters params = state.getPartyProfiles().get(partyid).getParty()
313 .getParameters();
314 connection.send(new Settings(connection.getParty(), profile, getRef(),
315 state.getProgress(), params));
316 }
317
318 /**
319 * This is called when one of the {@link ProtocolToPartyConn}s does an
320 * action. Synchronized so that we always handle only 1 action at a time.
321 *
322 * @param partyconn the connection on which the action came in
323 * @param action the {@link Action} taken by some party
324 * @param now current time
325 */
326 protected synchronized void actionRequest(
327 final ProtocolToPartyConn partyconn, final Action action,
328 long now) {
329 if (finished)
330 return;
331 state = state.with(partyconn.getParty(), action, now);
332 checkEndPhase(System.currentTimeMillis());
333 }
334
335 /**
336 * The current phase may be completed. We check, because it may already been
337 * handled. Proceed to next phase as needed. Reset the deadline timers and
338 * inform parties. Increase progress if necessary. Must only be called
339 * through {@link #endPhase} to ensure this is called only once.
340 *
341 */
342 private synchronized void checkEndPhase(long now) {
343 if (!state.getPhase().isFinal(now))
344 return;
345 // phase indeed ended. Check what's up.
346 if (timer != null) {
347 timer.cancel();
348 timer = null;
349 }
350 state = state.finishPhase();
351 if (state.isFinal(now)) {
352 endNegotiation();
353 return;
354 }
355
356 state = state.nextPhase(now);
357
358 startPhase(now);
359 }
360
361 /**
362 * To be called when we reach final state. Must only be called if
363 * {@link MOPAC2State#isFinal(long)}. Send finished info to all parties.
364 * Double calls are automatically ignored using the global finished flag.
365 */
366 private synchronized void endNegotiation() {
367 if (finished)
368 return;
369 finished = true;
370 Finished info = new Finished(state.getAgreements());
371 for (ProtocolToPartyConn conn : connections) {
372 try {
373 conn.send(info);
374 conn.close();
375 } catch (Exception e) {
376 log.log(Level.INFO, "Failed to send Finished to " + conn, e);
377 }
378 }
379 notifyListeners(new CurrentNegoState(state));
380 }
381
382}
Note: See TracBrowser for help on using the repository browser.