source: src/main/java/genius/core/session/SessionManager.java

Last change on this file was 127, checked in by Wouter Pasman, 6 years ago

#41 ROLL BACK of rev.126 . So this version is equal to rev. 125

File size: 15.9 KB
Line 
1package genius.core.session;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.List;
6import java.util.Map;
7import java.util.concurrent.Callable;
8import java.util.concurrent.ExecutionException;
9import java.util.concurrent.TimeoutException;
10
11import genius.core.AgentID;
12import genius.core.Bid;
13import genius.core.DeadlineType;
14import genius.core.actions.Action;
15import genius.core.events.AgentLogEvent;
16import genius.core.events.BrokenPartyException;
17import genius.core.events.MultipartyNegoActionEvent;
18import genius.core.events.NegotiationEvent;
19import genius.core.events.RecoverableSessionErrorEvent;
20import genius.core.events.SessionEndedNormallyEvent;
21import genius.core.events.SessionFailedEvent;
22import genius.core.exceptions.NegotiationPartyTimeoutException;
23import genius.core.list.Tuple;
24import genius.core.listener.DefaultListenable;
25import genius.core.listener.Listener;
26import genius.core.parties.NegotiationParty;
27import genius.core.parties.NegotiationPartyInternal;
28import genius.core.persistent.PersistentDataContainer;
29import genius.core.persistent.PersistentDataType;
30import genius.core.protocol.MultilateralProtocol;
31import genius.core.protocol.Protocol;
32import genius.core.timeline.DiscreteTimeline;
33
34/**
35 * The {@link SessionManager} is responsible for enforcing the
36 * {@link MultilateralProtocol} during the {@link Session}. This is the entry
37 * point for the negotiation algorithm. The protocol and session parameters are
38 * passed on from the GUI.
39 *
40 * This logs all events to the {@link Listener}. You need to subscribe to hear
41 * the log events, eg for display or for writing to file.
42 *
43 *
44 * @author David Festen
45 *
46 */
47public class SessionManager extends DefaultListenable<NegotiationEvent>
48 implements Runnable {
49
50 private static final int SESSION_ENDED_MAXTIME = 1000;
51
52 private static final long SAVE_INFO_MAXTIME = 1000;
53
54 private final Session session;
55
56 // participating parties with all the session and utilspace info
57 private final List<NegotiationPartyInternal> partiesInternal;
58
59 private ExecutorWithTimeout executor;
60
61 /**
62 * just the parties of this session. Basically a copy of
63 * {@link #partiesInternal}. Needed by the {@link Protocol}.
64 */
65 private ArrayList<NegotiationParty> parties;
66
67 /**
68 * We need to collect this, for the updates on persistentData.
69 */
70 private List<Action> actions = new ArrayList<Action>();
71
72 /**
73 * map of agent name and a short name for his profile. Needed for by
74 * {@link PersistentDataContainer}.
75 */
76 private Map<String, String> profiles = new HashMap<>();
77
78 private SessionConfiguration config;
79
80 /**
81 * Initializes a new instance of the {@link SessionManager} object. After
82 * initialization this {@link SessionManager} can be {@link #run()}.
83 *
84 * @param theparties
85 * The parties to use in this session (including agents and
86 * optionally mediators)
87 * @param session
88 * A session object containing preset information (can also be a
89 * new instance)
90 * @param exec
91 * the executor to use when running
92 */
93 public SessionManager(SessionConfiguration config,
94 List<NegotiationPartyInternal> theparties, Session session,
95 ExecutorWithTimeout exec) {
96 this.config = config;
97 this.session = session;
98 this.partiesInternal = theparties;
99 this.executor = exec;
100
101 parties = new ArrayList<NegotiationParty>();
102 for (NegotiationPartyInternal p : theparties) {
103 parties.add(p.getParty());
104 profiles.put(p.getID().toString(),
105 "Profile" + p.getUtilitySpace().getName().hashCode());
106 }
107 }
108
109 /**
110 * Run and wait for completion. Can be used from a thread. Throws from the
111 * underlying call are wrapped in a {@link RuntimeException} because
112 * {@link #run()} doesn't allow throwing checked exceptions.
113 */
114 public void run() {
115 try {
116 runAndWait();
117 } catch (Exception e) {
118 throw new RuntimeException("Run failed:" + e.getMessage(), e);
119 }
120 }
121
122 /**
123 * Runs the negotiation session and wait for it to complete. After the run
124 * (or failure) of the session the listeners are notified. All events,
125 * success and errors are reported to the listeners. Should not throw any
126 * checked exceptions.
127 *
128 */
129 public void runAndWait() {
130 // force GC to clean up mess of previous runs.
131 System.gc();
132 Bid agreement = null;
133
134 try {
135 executeProtocol();
136 agreement = session.getInfo().getProtocol()
137 .getCurrentAgreement(session, parties);
138 notifyChange(new SessionEndedNormallyEvent(session, agreement,
139 partiesInternal));
140 } catch (Exception e) {
141 notifyChange(new SessionFailedEvent(new BrokenPartyException(
142 "Failed to execute protocol", config, session, e)));
143 }
144
145 // Session completed (maybe not succesfully). functions in here
146 // should not throw, this is just aftermath.
147
148 // do the agents aftermath
149 callPartiesSessionEnded(agreement);
150 savePartiesInfo(agreement);
151 }
152
153 /**
154 * Save the {@link PersistentDataType} of the parties if the type requires
155 * saving. If this fails, we just record an AfterSessionErrorEvent.
156 *
157 * @throws TimeoutException
158 * @throws ExecutionException
159 */
160 private void savePartiesInfo(Bid agreementBid) {
161 for (final NegotiationPartyInternal party : partiesInternal) {
162 final Tuple<Bid, Double> agreement = new Tuple<>(agreementBid,
163 party.getUtility(agreementBid));
164 try {
165 new ExecutorWithTimeout(SAVE_INFO_MAXTIME).execute(
166 "saving info for " + party.getID(),
167 new Callable<String>() {
168
169 @Override
170 public String call() throws Exception {
171 party.saveStorage(actions, profiles, agreement);
172 return null;
173 }
174 });
175 } catch (TimeoutException | ExecutionException e) {
176 notifyChange(
177 new RecoverableSessionErrorEvent(session, party, e));
178 }
179
180 }
181 }
182
183 /**
184 * Tell all parties {@link NegotiationParty#negotiationEnded(Bid)}.
185 *
186 * @param agreement
187 * the agreement bid, or null if no agreement was reached.
188 */
189 private void callPartiesSessionEnded(final Bid agreement) {
190 for (final NegotiationPartyInternal party : partiesInternal) {
191 try {
192 Map<String, String> result = new ExecutorWithTimeout(
193 SESSION_ENDED_MAXTIME).execute(
194 "session ended " + party.getID(),
195 new Callable<Map<String, String>>() {
196
197 @Override
198 public Map<String, String> call()
199 throws Exception {
200 return party.getParty()
201 .negotiationEnded(agreement);
202 }
203 });
204
205 if (result != null && !result.isEmpty()) {
206 notifyChange(new AgentLogEvent(party.getID().toString(),
207 result));
208 }
209 } catch (ExecutionException | TimeoutException e1) {
210 notifyChange(
211 new RecoverableSessionErrorEvent(session, party, e1));
212 }
213 }
214 }
215
216 /**
217 * execute main loop (using the protocol's round structure). do
218 * before-session stuff. Then Run main loop till protocol is finished or
219 * deadline is reached. Then do after-session stuff.
220 *
221 * @throws InvalidActionError
222 * when a party did an invalid action
223 * @throws InterruptedException
224 * when a party was interrupted
225 * @throws ExecutionException
226 * when a party threw a exception
227 * @throws NegotiationPartyTimeoutException
228 * when a party timed out
229 */
230 private void executeProtocol() throws ActionException, InterruptedException,
231 ExecutionException, NegotiationPartyTimeoutException {
232 session.startTimer();
233
234 handleBeforeSession();
235
236 do {
237
238 // generate new round
239 Round round = session.getInfo().getProtocol()
240 .getRoundStructure(parties, session);
241
242 // add round to session
243 session.startNewRound(round);
244
245 if (checkDeadlineReached())
246 break;
247 int turnNumber = 0;
248
249 // Let each party do an action
250 for (Turn turn : round.getTurns()) {
251 if (checkDeadlineReached())
252 break;
253 // for each party, set the round-based timeline again (to avoid
254 // tempering)
255 if (session.getTimeline() instanceof DiscreteTimeline) {
256 ((DiscreteTimeline) session.getTimeline())
257 .setcRound(session.getRoundNumber());
258 }
259
260 turnNumber++;
261 doPartyTurn(turnNumber, turn);
262
263 // Do not start new turn in current round if protocol is
264 // finished at this point
265 if (session.getInfo().getProtocol().isFinished(session,
266 parties)) {
267 break;
268 }
269 }
270 if (checkDeadlineReached())
271 break;
272
273 } while (!session.getInfo().getProtocol().isFinished(session, parties)
274 && !checkDeadlineReached());
275
276 // stop timers if running
277 if (session.isTimerRunning())
278 session.stopTimer();
279
280 // post session protocol call
281 session.getInfo().getProtocol().afterSession(session, parties);
282 }
283
284 /**
285 * Handle the before-session information to be sent to the parties.
286 *
287 * @throws NegotiationPartyTimeoutException
288 * @throws ExecutionException
289 * @throws InterruptedException
290 * @throws TimeoutException
291 */
292 private void handleBeforeSession() throws NegotiationPartyTimeoutException,
293 ExecutionException, InterruptedException {
294 List<NegotiationParty> negoparties = new ArrayList<NegotiationParty>();
295 for (NegotiationPartyInternal party : partiesInternal) {
296 negoparties.add(party.getParty());
297 }
298
299 Map<NegotiationParty, List<Action>> preparatoryActions = session
300 .getInfo().getProtocol().beforeSession(session, negoparties);
301
302 for (final NegotiationParty party : preparatoryActions.keySet()) {
303 for (final Action act : preparatoryActions.get(party)) {
304 try {
305 executor.execute(getPartyID(party).toString(),
306 new Callable<Object>() {
307 @Override
308 public Object call() throws Exception {
309 party.receiveMessage(null, act);
310 return null;
311 }
312 });
313 } catch (TimeoutException e) {
314 throw new NegotiationPartyTimeoutException(party,
315 "party timed out in the before-session update", e);
316 }
317 }
318
319 }
320 }
321
322 /**
323 * Let a party decide for an action and create events for the taken action.
324 *
325 * @param turnNumber
326 * @param turn
327 * a party's {@link Turn}.
328 * @throws InvalidActionError
329 * @throws InterruptedException
330 * @throws ExecutionException
331 * @throws NegotiationPartyTimeoutException
332 */
333 private void doPartyTurn(int turnNumber, Turn turn)
334 throws ActionException, InterruptedException, ExecutionException,
335 NegotiationPartyTimeoutException {
336 NegotiationParty party = turn.getParty();
337 Action action = requestAction(party, turn.getValidActions());
338 turn.setAction(action);
339 actions.add(action);
340 updateListeners(party, action);
341 notifyChange(new MultipartyNegoActionEvent(action,
342 session.getRoundNumber(), session.getTurnNumber(),
343 session.getTimeline().getTime(), partiesInternal,
344 session.getInfo().getProtocol().getCurrentAgreement(session,
345 parties)));
346 }
347
348 private boolean checkDeadlineReached() {
349 // look at the time, if this is over time, remove last round and count
350 // previous round
351 // as most recent round
352 if (session.isDeadlineReached()) {
353 System.out.println("Deadline reached. " + session.getDeadlines());
354 session.removeLastRound();
355 if (session.getDeadlines().getType() == DeadlineType.TIME) {
356 double runTimeInSeconds = (Integer) session.getDeadlines()
357 .getValue();
358 session.setRuntimeInSeconds(runTimeInSeconds);
359 }
360 return true;
361 }
362 return false;
363 }
364
365 /**
366 * Request an {@link Action} from the
367 * {@link genius.core.parties.NegotiationParty} given a list of valid
368 * actions and apply it according to
369 * {@link MultilateralProtocol#applyAction(Action, Session)}
370 *
371 * @param party
372 * The party to request an action of
373 * @param validActions
374 * the actions the party can choose
375 * @return the chosen action-
376 * @throws TimeoutException
377 */
378 private Action requestAction(final NegotiationParty party,
379 final List<Class<? extends Action>> validActions)
380 throws ActionException, InterruptedException, ExecutionException,
381 NegotiationPartyTimeoutException {
382
383 Action action;
384 try {
385 action = executor.execute(getPartyID(party).toString(),
386 new Callable<Action>() {
387 @Override
388 public Action call() throws Exception {
389 ArrayList<Class<? extends Action>> possibleactions = new ArrayList<Class<? extends Action>>();
390 possibleactions.addAll(validActions);
391 return party.chooseAction(possibleactions);
392 }
393 });
394 } catch (TimeoutException e) {
395 String msg = "Negotiating party " + getPartyID(party)
396 + " timed out in chooseAction() method.";
397 throw new NegotiationPartyTimeoutException(party, msg, e);
398 }
399
400 checkAction(party, action, validActions);
401 // execute action according to protocol
402 session.getInfo().getProtocol().applyAction(action, session);
403
404 // return the chosen action
405 return action;
406 }
407
408 /**
409 * Check if the action ID has been filled in properly and contains an action
410 * from the list of valid actions. No action details are checked as this is
411 * protocol dependent.
412 *
413 * @param validActions
414 * the allowed/valid actions
415 *
416 * @return the given action if it is found ok
417 * @throws InvalidActionContentsError
418 * @throws InvalidActionError
419 *
420 */
421 private Action checkAction(NegotiationParty party, Action action,
422 List<Class<? extends Action>> validActions) throws ActionException {
423 if (action == null || action.getAgent() == null
424 || !validActions.contains(action.getClass())) {
425 throw new InvalidActionError(party, validActions, action);
426 }
427 if (!action.getAgent().equals(getPartyID(party))) {
428 throw new InvalidActionContentsError(getPartyID(party),
429 "partyID " + getPartyID(party)
430 + " does not match agentID in action"
431 + action.getAgent());
432 }
433 return action;
434 }
435
436 /**
437 * Update all {@link NegotiationParty}s in the listeners map with the new
438 * action. Has to be here since interface (correctly) does not deal with
439 * implementation details
440 *
441 * @param actionOwner
442 * The Party that initiated the action
443 * @param action
444 * The action it did.
445 */
446 private void updateListeners(final NegotiationParty actionOwner,
447 final Action action) throws NegotiationPartyTimeoutException,
448 ExecutionException, InterruptedException {
449 Map<NegotiationParty, List<NegotiationParty>> listeners = session
450 .getInfo().getProtocol().getActionListeners(parties);
451
452 if (listeners == null)
453 return;
454
455 // if anyone is listening, notify any and all observers
456 if (listeners.get(actionOwner) != null)
457 for (final NegotiationParty observer : listeners.get(actionOwner)) {
458 try {
459 executor.execute(getPartyID(actionOwner).toString(),
460 new Callable<Object>() {
461 @Override
462 public Object call() {
463 observer.receiveMessage(
464 getPartyID(actionOwner), action);
465 return null;
466 }
467 });
468 } catch (TimeoutException e) {
469 String msg = String.format(
470 "Negotiating party %s timed out in receiveMessage() method.",
471 getPartyID(observer));
472 throw new NegotiationPartyTimeoutException(observer, msg,
473 e);
474 }
475 }
476 }
477
478 /**
479 * @param party
480 * the NegotiationParty
481 * @return the {@link NegotiationPartyInternal} that contains this party.
482 */
483 private NegotiationPartyInternal getNegoPartyInternal(
484 NegotiationParty needed) {
485 for (NegotiationPartyInternal party : partiesInternal) {
486 if (party.getParty() == needed) {
487 return party;
488 }
489 }
490 throw new IllegalStateException("The referenced NegotiationParty "
491 + needed + " is not one of the actual running parties.");
492 }
493
494 /**
495 * @param party
496 * a {@link NegotiationParty}
497 * @return the agent ID of this party.
498 */
499 private AgentID getPartyID(NegotiationParty party) {
500 return getNegoPartyInternal(party).getID();
501 }
502
503}
Note: See TracBrowser for help on using the repository browser.