package geniusweb.protocol.session.mopac2; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.logging.Level; import java.util.stream.Collectors; import geniusweb.actions.Action; import geniusweb.actions.EndNegotiation; import geniusweb.actions.Offer; import geniusweb.actions.PartyId; import geniusweb.actions.Votes; import geniusweb.deadline.Deadline; import geniusweb.events.ProtocolEvent; import geniusweb.inform.Agreements; import geniusweb.inform.Finished; import geniusweb.inform.Inform; import geniusweb.inform.Settings; import geniusweb.inform.Voting; import geniusweb.inform.YourTurn; import geniusweb.issuevalue.Bid; import geniusweb.progress.ProgressFactory; import geniusweb.protocol.CurrentNegoState; import geniusweb.protocol.ProtocolException; import geniusweb.protocol.partyconnection.ProtocolToPartyConn; import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory; import geniusweb.protocol.partyconnection.ProtocolToPartyConnections; import geniusweb.protocol.session.SessionProtocol; import geniusweb.protocol.session.SessionSettings; import geniusweb.protocol.session.mopac.phase.Phase; import geniusweb.references.Parameters; import geniusweb.references.PartyWithProfile; import geniusweb.references.ProfileRef; import geniusweb.references.ProtocolRef; import geniusweb.references.Reference; import tudelft.utilities.listener.DefaultListenable; import tudelft.utilities.logging.Reporter; import tudelft.utilities.repository.NoResourcesNowException; /** * * All parties are first sent the {@link SessionSettings}. Only parties * specified initially in the settings do participate (no late joining in) * *

parameter

MOPAC parties can receive a parameter power * containing a Integer. This parameter is picked up by this protocol, for * handling the agreement extraction from the offers. Power=1 for parties that * do not have the power parameter. *

Protocol steps

* *
    *
  1. The protocol tries to start all parties. If not all parties start, the * parties are freed up and another attempt is done to start all parties some * time later. *
  2. The session deadline clock now starts ticking. *
  3. remainingparties are sent their settings. *
  4. Loop until {@link Deadline} is reached or |remainingparties| ≤ 2: *
      *
    1. protocol sends {@link YourTurn} to all remainingparties. Each party now * must submit an {@link Offer} within {@link Phase#PHASE_MAXTIME} seconds. If a * party fails to submit it is removed from from remainingparties. *
    2. protocol sends a {@link Voting} containing a List of Bid containing all * received {@link Bid}s. Each party must place his {@link Votes} within the * provided deadline. If a party does not submit, it is terminated and removed * from remainingparties. Previous votes for the same bid do not count. But see * {@link Agreements}. A party that misbehaves after submitting its vote is * removed but it Votes remain standing. *
    3. protocol sends to all remainingparties a OptIn containing all received * votes. Each party now must submit again a {@link Votes} object within the * deadline. This new Votes must equal or extend the previous Votes of the * party. *
    4. The protocol uses the {@link Phase#getEvaluator()} to determine the * agreements from the votes. If there are agreements, the parties that placed * these votes reached an agreement. They are moved to the agreement set and * removed from the remainingparties. *
    *
  5. Any remaining parties are sent a {@link Finished} object without * agreement bid and are terminated. *
*

* A party can send {@link EndNegotiation} at any time to remove itself from the * negotiation. The other parties may continue anyawy without him. *

* Parties receive the {@link Finished} information at the very end when all * agreements have been collected. *

* This logs to "Protocol" logger if there are issues *

* This object is mutable: the internal state changes as parties interact with * the protocol. *

* Threadsafe: all entrypoints are synhronized. */ public class MOPAC2 extends DefaultListenable implements SessionProtocol { /** * Tech notes. 0. Everything here is phase driven and runs due to callback * from parties. 1. Connect all parties. 2. If any party fails at any time, * add it to exceptions list in the state 3. Run timer to check the phase at * possible end time. The timer just checks the state, and therefore it does * no harm if it is double checked. 4. We run phases always with either * PHASE_TIME or time to the global deadline, whichever comes first. * Therefore phases naturally end at end of nego. 5. Phases must have at * least {@link #MINDURATION} otherwise it makes no sense to even start it. */ private static final int MINDURATION = 100; private static final int MIN_SLEEP_TIME = 1000; private static final int MAX_SLEEP_TIME = 60000; private static final ProtocolRef MOPAC2 = new ProtocolRef("MOPAC2"); private final Reporter log; private MOPAC2State state = null; // mutable! /** * the existing party connections. we assume ownership of this so it should * not be modified although connections may of course break. mutable! */ private ProtocolToPartyConnections connections = new ProtocolToPartyConnections( Collections.emptyList()); /** * Set to true after all is done: we sent final outcomes. This is needed * because we can't tell from state what we did at the very end. */ private boolean finished = false; private Timer timer = null; /** * * @param state normally the initial state coming from SAOPSettings * @param logger the {@link Reporter} to use */ public MOPAC2(MOPAC2State state, Reporter logger) { if (state == null) { throw new NullPointerException("state must be not null"); } if (state.getSettings().getDeadline().getDuration() < MINDURATION) { throw new IllegalArgumentException( "Duration must be at least " + MINDURATION); } if (logger == null) { throw new NullPointerException("Logger must be not null"); } this.log = logger; this.state = state; } @Override public synchronized void start( ProtocolToPartyConnFactory connectionfactory) { try { connect(connectionfactory); } catch (Throwable e) { /** We can't {@link #handleError} yet. FIXME */ throw new RuntimeException("Failed to connect", e); } long now = System.currentTimeMillis(); state = state.initPhase( ProgressFactory.create(state.getSettings().getDeadline(), now), now); setupParties(now); startPhase(now); } @Override public String getDescription() { return "All parties get YourTurn. They now can submit their Offer within 30 seconds. " + "Next they receive a Voting. They can now submit their Votes." + "Then they receive a a OptIn, now they can widen up their Votes." + "If one of their Vote succeeds, they finish with an agreement. " + "The VotingEvaluator setting determines the exact voting behaviour " + "and if this process repeats"; } @Override public void addParticipant(PartyWithProfile party) throws IllegalStateException { throw new IllegalStateException( "Dynamic joining a negotiation is not supported in AMOP"); } @Override public MOPAC2State getState() { return state; } @Override public ProtocolRef getRef() { return MOPAC2; } /******************************************************************* * private functions. Some are protected only, for testing purposes ********************************************************************/ /** * step 1 in protocol: connect all involved parties and start the clock. * This always "succeeds" with a valid (but possibly final) state *

* This is 'protected' to allow junit testing, this code is not a 'public' * part of the interface. * * @param connectionfactory the connectionfactory for making party * connections * * @throws InterruptedException if the connection procedure is unterrupted * * @throws IOException if this fails to properly conect to the * parties, eg interrupted or server not * responding.. */ protected synchronized void connect( ProtocolToPartyConnFactory connectionfactory) throws InterruptedException, IOException { List participants = state.getSettings() .getAllParties(); List parties = participants.stream() .map(parti -> (parti.getParty().getPartyRef())) .collect(Collectors.toList()); log.log(Level.INFO, "MOPAC connect " + parties); List newconnections = null; while (newconnections == null) { try { newconnections = connectionfactory.connect(parties); } catch (NoResourcesNowException e) { long waitms = e.getLater().getTime() - System.currentTimeMillis(); log.log(Level.INFO, "No resources available to run session, waiting" + waitms); Thread.sleep(Math.min(MAX_SLEEP_TIME, Math.max(MIN_SLEEP_TIME, waitms))); } } for (int i = 0; i < participants.size(); i++) { ProtocolToPartyConn conn = newconnections.get(i); connections = connections.with(conn); state = state.with(conn.getParty(), participants.get(i)); } } /** * step 2 in protocol: listen to connections and send settings to the * parties. *

* This is 'protected' to allow junit testing, this code is not a 'public' * part of the interface. * * @param now the current time. */ protected synchronized void setupParties(long now) { for (ProtocolToPartyConn conn : connections) { conn.addListener(action -> actionRequest(conn, action, now)); } for (ProtocolToPartyConn connection : connections) { try { sendSettings(connection); } catch (IOException e) { state = state.with(new ProtocolException("Failed to initialize", connection.getParty())); } } } /** * Send the {@link Inform} for the current phase to the remaining parties * and start the time-out checker */ private void startPhase(long now) { Inform info = state.getPhase().getInform(); for (PartyId pid : state.getPhase().getPartyStates().getNotYetActed()) { try { connections.get(pid).send(info); } catch (IOException e) { state = state.with(new ProtocolException( "Party seems to have disconnected", pid, e)); } } startTimer(1 + state.getPhase().getDeadline() - now); } /** * Check at given deadline if we already ended the phase. * * @param deadn the deadline */ private void startTimer(long deadln) { if (timer != null) throw new IllegalStateException("Timer is still running!"); timer = new Timer("deadlinetimer"); timer.schedule(new TimerTask() { @Override public void run() { checkEndPhase(System.currentTimeMillis()); } }, deadln); } /** * Inform a party about its settings * * @param connection * @throws IOException if party got disconnected */ private synchronized void sendSettings(ProtocolToPartyConn connection) throws IOException { PartyId partyid = connection.getParty(); ProfileRef profile = state.getPartyProfiles().get(partyid).getProfile(); Parameters params = state.getPartyProfiles().get(partyid).getParty() .getParameters(); connection.send(new Settings(connection.getParty(), profile, getRef(), state.getProgress(), params)); } /** * This is called when one of the {@link ProtocolToPartyConn}s does an * action. Synchronized so that we always handle only 1 action at a time. * * @param partyconn the connection on which the action came in * @param action the {@link Action} taken by some party * @param now current time */ protected synchronized void actionRequest( final ProtocolToPartyConn partyconn, final Action action, long now) { if (finished) return; state = state.with(partyconn.getParty(), action, now); checkEndPhase(System.currentTimeMillis()); } /** * The current phase may be completed. We check, because it may already been * handled. Proceed to next phase as needed. Reset the deadline timers and * inform parties. Increase progress if necessary. Must only be called * through {@link #endPhase} to ensure this is called only once. * */ private synchronized void checkEndPhase(long now) { if (!state.getPhase().isFinal(now)) return; // phase indeed ended. Check what's up. if (timer != null) { timer.cancel(); timer = null; } state = state.finishPhase(); if (state.isFinal(now)) { endNegotiation(); return; } state = state.nextPhase(now); startPhase(now); } /** * To be called when we reach final state. Must only be called if * {@link MOPAC2State#isFinal(long)}. Send finished info to all parties. * Double calls are automatically ignored using the global finished flag. */ private synchronized void endNegotiation() { if (finished) return; finished = true; Finished info = new Finished(state.getAgreements()); for (ProtocolToPartyConn conn : connections) { try { conn.send(info); conn.close(); } catch (Exception e) { log.log(Level.INFO, "Failed to send Finished to " + conn, e); } } notifyListeners(new CurrentNegoState(state)); } }