package geniusweb.protocol.session.amop; import java.io.IOException; import java.sql.Date; import java.text.SimpleDateFormat; import java.util.List; import java.util.Map; import java.util.logging.Level; import java.util.stream.Collectors; import geniusweb.actions.Action; import geniusweb.actions.EndNegotiation; import geniusweb.actions.Offer; import geniusweb.actions.PartyId; import geniusweb.actions.Vote; import geniusweb.actions.Votes; import geniusweb.deadline.Deadline; import geniusweb.events.ProtocolEvent; import geniusweb.inform.ActionDone; import geniusweb.inform.Agreements; import geniusweb.inform.Finished; import geniusweb.inform.Inform; import geniusweb.inform.Settings; import geniusweb.inform.Voting; import geniusweb.inform.YourTurn; import geniusweb.issuevalue.Bid; import geniusweb.progress.ProgressFactory; import geniusweb.protocol.CurrentNegoState; import geniusweb.protocol.ProtocolException; import geniusweb.protocol.partyconnection.ProtocolToPartyConn; import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory; import geniusweb.protocol.partyconnection.ProtocolToPartyConnections; import geniusweb.protocol.session.SessionProtocol; import geniusweb.protocol.session.SessionSettings; import geniusweb.protocol.session.amop.AMOPState.Phase; import geniusweb.references.Parameters; import geniusweb.references.PartyWithProfile; import geniusweb.references.ProfileRef; import geniusweb.references.ProtocolRef; import geniusweb.references.Reference; import tudelft.utilities.listener.DefaultListenable; import tudelft.utilities.logging.Reporter; import tudelft.utilities.repository.NoResourcesNowException; /** * * All parties are first sent the {@link SessionSettings}. Only parties * specified initially in the settings do participate. * *

parameter

AMOP parties can receive a parameter: minVotes containing * a Double. If set, the AMOP protocol checks that all {@link Votes} from that * party have {@link Vote#getMinPower()} > minVotes. *

Protocol steps

*
    *
  1. The protocol tries to start all parties. If not all parties start, the * parties are freed up and another attempt is done to start all parties some * time later. *
  2. the variable remainingparties = {all parties} *
  3. The session deadline clock now starts ticking. *
  4. All parties are sent their settings. *
  5. Loop until {@link Deadline} is reached or |remainingparties|<2: *
      *
    1. protocol sends {@link YourTurn} to all remainingparties. Each party now * must submit an {@link Offer} within {@link #PHASE_TIME} milliseconds. If a party * fails to submit it is sent Finished and removed from remainingparties. *
    2. protocol sends a {@link Voting} containing a List of {@link Bid} * containing all received {@link Bid}s. Each party must place its {@link Votes} * within {@link #PHASE_TIME} milliseconds. If a party does not submit, it is sent a * {@link Finished} and removed from remainingparties. Previous votes for the * same bid do not count. But see {@link Agreements}. *
    3. The protocol determines a maximum sized subset of size N≥2 of the * votes for bid B for which the vote conditions hold. If there is such a * subset, the parties that placed these votes reached an agreement. They are * added to the agreement set, sent the {@link Finished} info and terminated. * They are removed from the remainingparties. *
    *
  6. Any remaining parties are sent a {@link Finished} object without * agreement bid and are terminated. *
*

* A party can send {@link EndNegotiation} at any time to remove itself from the * negotiation. The other parties may continue anyway without him. *

* Parties receive the {@link Finished} information at the very end when all * agreements have been collected. *

* This logs to "Protocol" logger if there are issues *

* This object is mutable: the internal state changes as parties interact with * the protocol. *

* Threadsafe: all entrypoints are synhronized. */ public class AMOP extends DefaultListenable implements SessionProtocol { private static final long PHASE_TIME = 30000; // millis private static final long TIME_MARGIN = 20l;// ms extra delay after deadline private static final int MINDURATION = 100; private static final int MIN_SLEEP_TIME = 1000; private static final int MAX_SLEEP_TIME = 60000; private static final ProtocolRef AMOP = new ProtocolRef("AMOP"); private final Reporter log; private AMOPState state = null; // mutable! private volatile WillBeCalled finish = new WillBeCalled(() -> finish(), null); private volatile WillBeCalled endPhase = new WillBeCalled(() -> endPhase(), null); /** * * @param state normally the initial state coming from SAOPSettings * @param logger the {@link Reporter} to use */ public AMOP(AMOPState state, Reporter logger) { if (state == null) { throw new NullPointerException("state must be not null"); } if (state.getSettings().getDeadline().getDuration() < MINDURATION) { throw new IllegalArgumentException( "Duration must be at least " + MINDURATION); } if (logger == null) { throw new NullPointerException("Logger must be not null"); } this.log = logger; this.state = state; } @Override public synchronized void start( ProtocolToPartyConnFactory connectionfactory) { try { // we're in Phase.INIT still connect(connectionfactory); // this moves to setDeadline(); setupParties(); endPhase.complete(); // end of INIT } catch (Throwable e) { handleError("Failed to start up session", new PartyId("UNKNOWN"), e); } } @Override public String getDescription() { return "All parties get YourTurn. They now can submit their Offer within 30 seconds. " + "Next they receive a ElicitComparison. They can now submit their Votes." + "If one of their Vote succeeds, they finish with an agreement. " + "Parties that did not yet reach agreement continue negotiating. 
" + "A parameter 'minVotes', if set, contains the the smallest value to be used in each Vote"; } @Override public void addParticipant(PartyWithProfile party) throws IllegalStateException { throw new IllegalStateException( "Dynamic joining a negotiation is not supported in AMOP"); } @Override public AMOPState getState() { return state; } @Override public ProtocolRef getRef() { return AMOP; } /******************************************************************* * private functions. Some are protected only, for testing purposes ********************************************************************/ /** * step 1 in protocol: connect all involved parties and start the clock. * This always "succeeds" with a valid (but possibly final) state *

* This is 'protected' to allow junit testing, this code is not a 'public' * part of the interface. * * @param connectionfactory the connectionfactory for making party * connections * * @throws InterruptedException if the connection procedure is unterrupted * * @throws IOException if this fails to properly conect to the * parties, eg interrupted or server not * responding.. */ protected synchronized void connect( ProtocolToPartyConnFactory connectionfactory) throws InterruptedException, IOException { List participants = state.getSettings() .getAllParties(); List parties = participants.stream() .map(parti -> (parti.getParty().getPartyRef())) .collect(Collectors.toList()); List connections = null; log.log(Level.INFO, "AMOP connect " + parties); while (connections == null) { try { connections = connectionfactory.connect(parties); } catch (NoResourcesNowException e) { long waitms = e.getLater().getTime() - System.currentTimeMillis(); log.log(Level.INFO, "No resources available to run session, waiting" + waitms); Thread.sleep(Math.min(MAX_SLEEP_TIME, Math.max(MIN_SLEEP_TIME, waitms))); } } for (int i = 0; i < participants.size(); i++) { setState(this.state.with(connections.get(i), participants.get(i))); } } /** * overridable factory method, used for testing. * * @param r the {@link Runnable} that will be called * @param delayMs the time after which r will be called * @return new WillBeCalled(r,delayms) */ protected WillBeCalled createWillBeCalled(Runnable r, Long delayMs) { return new WillBeCalled(r, delayMs); } /** * Set state to proper deadline. Starts the timer tasks. This tasks triggers * a call to handleError when the session times out. 
*/ private void setDeadline() { long now = System.currentTimeMillis(); Deadline deadline = state.getSettings().getDeadline(); setState(state.with(ProgressFactory.create(deadline, now))); finish = createWillBeCalled(() -> finish(), TIME_MARGIN + deadline.getDuration()); log.log(Level.INFO, "AMOP deadline set to " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") .format(new Date(System.currentTimeMillis() + deadline.getDuration() + TIME_MARGIN))); } /** * step 2 in protocol: listen to connections and send settings to the * parties. *

* This is 'protected' to allow junit testing, this code is not a 'public' * part of the interface. * * @throws ProtocolException if a party does not follow the protocol */ protected synchronized void setupParties() throws ProtocolException { for (ProtocolToPartyConn conn : state.getConnections()) { conn.addListener(action -> actionRequest(conn, action)); } for (ProtocolToPartyConn connection : state.getConnections()) { try { sendSettings(connection); } catch (IOException e) { throw new ProtocolException("Failed to initialize", connection.getParty(), e); } } } /** * Inform a party about its settings * * @param connection * @throws IOException if party got disconnected */ private synchronized void sendSettings(ProtocolToPartyConn connection) throws IOException { PartyId partyid = connection.getParty(); ProfileRef profile = state.getPartyProfiles().get(partyid).getProfile(); Parameters params = state.getPartyProfiles().get(partyid).getParty() .getParameters(); if (profile == null) { throw new IllegalArgumentException( "Missing profile for party " + connection.getReference()); } connection.send(new Settings(connection.getParty(), profile, getRef(), state.getProgress(), params)); } /** * This is called when one of the {@link ProtocolToPartyConn}s does an * action. Synchronized so that we always handle only 1 action at a time. 
* * @param partyconn the connection on which the action came in * @param action the {@link Action} taken by some party */ protected synchronized void actionRequest( final ProtocolToPartyConn partyconn, final Action action) { if (action == null) { Throwable err = partyconn.getError(); if (err == null) { err = new ProtocolException("Party sent a null action", partyconn.getParty()); } handleError(partyconn + "Protocol error", partyconn.getParty(), err); return; } try { setState(state.with(partyconn.getParty(), action)); } catch (Throwable e) { handleError("failed to handle action " + action, partyconn.getParty(), e); } // we ignore broadcast errors here, may need to be fixed. broadcast(new ActionDone(action)); if (state.isAllPartiesActed()) { endPhase.complete(); } } /** * The current phase is completed. Proceed to next phase as needed. Reset * the deadline timers and inform parties. Increase progress if necessary. * Must only be called through {@link #endPhase} to ensure this is called * only once. */ private synchronized void endPhase() { if (state.isFinal(System.currentTimeMillis())) return; List bids = null; if (state.getPhase() == Phase.OFFER) { bids = state.getPhaseActions().values().stream() .map(act -> ((Offer) act)).collect(Collectors.toList()); } setState(state.nextPhase()); if (state.getPhase() == Phase.OFFER) { broadcast(new YourTurn()); } else { Map powers = state.getActiveParties().stream() .collect(Collectors.toMap(pid -> pid, pid -> 1)); broadcast(new Voting(bids, powers)); } /** * Really tricky but we can safely replace {@link #endPhase}: all * functions are synchronized so the state is always consistent when * this is called. MNMMM Yes and then?? need proof that not called right * after replace. 
I think we need proof that #endPhase is always * consistent with state , that phase in state is coupled */ endPhase = createWillBeCalled(() -> endPhase(), PHASE_TIME + TIME_MARGIN); } /** * Alternative to {@link ProtocolToPartyConnections#broadcast(Inform)}, * sending only to active parties * * @param info the info to broadcast */ private void broadcast(Inform info) { for (PartyId party : state.getActiveParties()) { try { state.getConnections().get(party).send(info); } catch (IOException e) { handleError("Party seems to have disconnected", party, e); } } } /** * Update state to include the given error and closes the party. * * @param message The message to attach to the error * @param party the party where the error occured * @param e the exception that occured. */ private synchronized void handleError(final String message, final PartyId partyid, final Throwable e) { if (e instanceof ProtocolException) { setState(state.with(partyid, (ProtocolException) e)); } else { setState(state.with(partyid, new ProtocolException(message, partyid, e))); } log.log(Level.WARNING, "AMOP intercepted error from party " + partyid + ":" + message, e); } /** * Sets the new state. If the new state is final, the finish-up procedure is * executed. All state changes go through here to ensure we check isFinal * before any state change. * * @param newstate the new state. */ private synchronized void setState(AMOPState newstate) { long now = System.currentTimeMillis(); if (state.isFinal(now)) { finish.complete(); return; } this.state = newstate; if (newstate.isFinal(now)) { finish.complete(); } } /** * Called when we reach final state. Must be called through {@link #finish} * to ensure this is called only once. Send finished info to all parties, * notify current nego state as final and set {@link #isFinishedInfoSent}. * Double calls are automatically ignored. 
*/ private synchronized void finish() { Finished finished = new Finished(state.getAgreements()); for (ProtocolToPartyConn conn : state.getConnections()) { try { conn.send(finished); conn.close(); } catch (Exception e) { log.log(Level.INFO, "Failed to send Finished to " + conn, e); } } notifyListeners(new CurrentNegoState(state)); } }