package geniusweb.protocol.session.shaop; import java.io.IOException; import java.sql.Date; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.stream.Collectors; import geniusweb.actions.Accept; import geniusweb.actions.Action; import geniusweb.actions.Comparison; import geniusweb.actions.ElicitComparison; import geniusweb.actions.EndNegotiation; import geniusweb.actions.Offer; import geniusweb.actions.PartyId; import geniusweb.deadline.Deadline; import geniusweb.events.ProtocolEvent; import geniusweb.inform.ActionDone; import geniusweb.inform.Finished; import geniusweb.inform.Inform; import geniusweb.inform.Settings; import geniusweb.inform.YourTurn; import geniusweb.progress.ProgressFactory; import geniusweb.protocol.CurrentNegoState; import geniusweb.protocol.ProtocolException; import geniusweb.protocol.partyconnection.ProtocolToPartyConn; import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory; import geniusweb.protocol.session.SessionProtocol; import geniusweb.protocol.session.SessionState; import geniusweb.protocol.session.TeamInfo; import geniusweb.references.Parameters; import geniusweb.references.PartyWithProfile; import geniusweb.references.ProfileRef; import geniusweb.references.ProtocolRef; import geniusweb.references.Reference; import tudelft.utilities.listener.DefaultListenable; import tudelft.utilities.logging.Reporter; import tudelft.utilities.repository.NoResourcesNowException;

/**
 * This is similar to SAOP but there are two types of parties: a SHAOP party and
 * a COB party associated with each SHAOP party.
 *

* <p>
 * Each SHAOP party must receive a parameter: elicitationcost containing a
 * Double. A party of this type can execute a RequestComparison(userid) action,
 * which increases the total bother with the elicitationcost. The call results
 * in the associated COB party executing a Comparison action. If this parameter
 * is not set, the {@link SHAOPState#DEFAULT_ELICITATATION_COST} is used.
 *

* <p>
 * A SHAOP party keeps the turn until it does an action that is not an
 * {@link ElicitComparison}. A COB party also receives all ActionDone if a party
 * does an action. The COB party takes an action only after CompareWithBid is
 * received. It reacts with a Comparison action.
 *

* <p>
 * The Comparison/ComparisonTest actions are private between the
 * {@link TeamInfo}. A SHAOP party can do a {@link ElicitComparison} action at
 * any time. These actions are completely transparent to the other parties and
 * do not depend on or influence the normal turn taking.
 */
public class SHAOP extends DefaultListenable<ProtocolEvent>
        implements SessionProtocol {
    /*
     * Tech note: We can not just extend SAOP because SHAOPState and
     * SHAOPSettings do not extend SAOPState and SAOPSettings
     */

    /** Extra delay (ms) after the deadline before the deadline timer fires. */
    public static final int TIME_MARGIN = 20;

    /**
     * Minimum deadline duration accepted by the constructor, in milliseconds
     * (the same duration value is used as the {@link Timer} delay).
     */
    public static final int MINDURATION = 100;

    /** Lower bound (ms) for one sleep while waiting for free resources. */
    public static final int MIN_SLEEP_TIME = 1000;

    /** Upper bound (ms) for one sleep while waiting for free resources. */
    public static final int MAX_SLEEP_TIME = 60000;

    private static final ProtocolRef SHAOP = new ProtocolRef("SHAOP");

    /** Actions that a SHAOP party (as opposed to a COB party) may do. */
    private static final List<Class<? extends Action>> ALLOWED_ACTIONS = Arrays
            .asList(Offer.class, ElicitComparison.class, Accept.class,
                    EndNegotiation.class);

    private final Reporter log;
    private SHAOPState state = null; // mutable!
    /** Guards against sending the Finished info more than once. */
    private final AtomicBoolean isFinishedInfoSent = new AtomicBoolean(false);
    private volatile Timer deadlinetimer = null;

    /**
     * @param state  the initial {@link SHAOPState}, normally created from the
     *               session settings. Must not be null and its deadline
     *               duration must be at least {@link #MINDURATION}.
     * @param logger the {@link Reporter} to use
     * @throws NullPointerException     if state is null
     * @throws IllegalArgumentException if the deadline duration is smaller
     *                                  than {@link #MINDURATION}
     */
    public SHAOP(SHAOPState state, Reporter logger) {
        if (state == null) {
            throw new NullPointerException("state must be not null");
        }
        if (state.getSettings().getDeadline().getDuration() < MINDURATION) {
            throw new IllegalArgumentException(
                    "Duration must be at least " + MINDURATION);
        }
        this.log = logger;
        this.state = state;
    }

    /**
     * Runs the start-up sequence: connect the parties, arm the deadline
     * timer, send the settings and give the first party the turn. Any failure
     * is routed to the error handler instead of being thrown to the caller.
     *
     * @param connectionfactory the factory used to connect to the parties
     */
    @Override
    public synchronized void start(
            ProtocolToPartyConnFactory connectionfactory) {
        try {
            connect(connectionfactory);
            setDeadline();
            setupParties();
            nextTurn();
        } catch (Throwable e) {
            handleError("Failed to start up session", null, e);
            if (e instanceof InterruptedException) {
                // Restore the flag only after the error handling, so the
                // caller can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public String getDescription() {
        return "We iterate through all PartiesTuple's, first the shaop party and then the cob party. Each gets "
                + "YourTurn in clockwise order, after which they can do their next action. "
                + "No new participants after start. End after prescribed deadline or when some bid is unanimously Accepted."
                + "Parties can only act on their own behalf and only when it is their turn."
                + "COB requests and actions are stored in the actions list but not broadcasted."
                + "A SHAOP party keeps the turn until it does a non-COB request (Accept, Offer, EndNegotiation).";
    }

    /**
     * Not supported in SHAOP.
     *
     * @throws IllegalStateException always
     */
    @Override
    public void addParticipant(PartyWithProfile party)
            throws IllegalStateException {
        throw new IllegalStateException(
                "Dynamic joining a negotiation is not allowed in SHAOP");
    }

    @Override
    public SessionState getState() {
        return state;
    }

    @Override
    public ProtocolRef getRef() {
        return SHAOP;
    }

    /*******************************************************************
     * private functions. Some are protected only, for testing purposes
     ********************************************************************/

    /**
     * step 1 in protocol: connect all involved parties and start the clock.
     * This always "succeeds" with a valid (but possibly final) state
     * <p>
     * This is 'protected' to allow junit testing, this code is not a 'public'
     * part of the interface.
     *
     * @param connectionfactory the connectionfactory for making party
     *                          connections
     *
     * @throws InterruptedException if the connection procedure is interrupted
     *
     * @throws IOException          if this fails to properly connect to the
     *                              parties, eg interrupted or server not
     *                              responding..
     */
    protected synchronized void connect(
            ProtocolToPartyConnFactory connectionfactory)
            throws InterruptedException, IOException {
        // Flatten all teams into one list of participants.
        List<PartyWithProfile> participants = state.getSettings().getTeams()
                .stream().map(team -> team.getParties())
                .flatMap(List::stream).collect(Collectors.toList());
        List<Reference> parties = participants.stream()
                .map(parti -> (parti.getParty().getPartyRef()))
                .collect(Collectors.toList());
        List<ProtocolToPartyConn> connections = null;
        log.log(Level.INFO, "SHAOP connect " + parties);
        // Retry until the factory has resources; each wait is clamped to
        // [MIN_SLEEP_TIME, MAX_SLEEP_TIME].
        while (connections == null) {
            try {
                connections = connectionfactory.connect(parties);
            } catch (NoResourcesNowException e) {
                long waitms = e.getLater().getTime()
                        - System.currentTimeMillis();
                log.log(Level.INFO,
                        "No resources available to run session, waiting"
                                + waitms);
                Thread.sleep(Math.min(MAX_SLEEP_TIME,
                        Math.max(MIN_SLEEP_TIME, waitms)));
            }
        }
        for (int i = 0; i < participants.size(); i++) {
            setState(this.state.with(connections.get(i)));
        }
    }

    /**
     * Set state to proper deadline. Starts the timer task. This task logs the
     * timeout and calls {@link #finish()} when the session times out.
     */
    private synchronized void setDeadline() {
        long now = System.currentTimeMillis();
        Deadline deadline = state.getSettings().getDeadline();
        setState(state.with(ProgressFactory.create(deadline, now)));
        deadlinetimer = new Timer();
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                if (!state.isFinal(System.currentTimeMillis())) {
                    log.log(Level.SEVERE,
                            "BUG. Deadline timer has triggered but state is not final");
                }
                log.log(Level.INFO,
                        "SHAOP deadline reached. Terminating session.");
                finish();
            }
        };
        // set timer TIME_MARGIN after real deadline to ensure we're not too
        // early
        deadlinetimer.schedule(task, deadline.getDuration() + TIME_MARGIN);
        log.log(Level.INFO, "SHAOP deadline set to "
                + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
                        .format(new Date(System.currentTimeMillis()
                                + deadline.getDuration() + TIME_MARGIN)));
    }

    /**
     * step 2 in protocol: listen to connections and send settings to the
     * parties.
     * <p>
     * This is 'protected' to allow junit testing, this code is not a 'public'
     * part of the interface.
     *
     * @throws ProtocolException if sending the settings to a party fails
     */
    protected synchronized void setupParties() throws ProtocolException {
        // Register all listeners before any settings are sent.
        for (ProtocolToPartyConn conn : state.connections) {
            conn.addListener(action -> actionRequest(conn, action));
        }

        for (ProtocolToPartyConn connection : state.connections) {
            try {
                sendSettings(connection);
            } catch (IOException e) {
                throw new ProtocolException("Failed to initialize",
                        connection.getParty(), e);
            }
        }
    }

    /**
     * Inform a party about its settings
     *
     * @param connection the connection to the party to inform
     * @throws IOException if party got disconnected
     */
    private synchronized void sendSettings(ProtocolToPartyConn connection)
            throws IOException {
        PartyId partyid = connection.getParty();
        PartyWithProfile settings = state.getPartyProfile(partyid);
        ProfileRef profile = settings.getProfile();
        Parameters params = settings.getParty().getParameters();
        connection.send(new Settings(partyid, profile, getRef(),
                state.getProgress(), params));
    }

    /**
     * This is called when one of the party connections does an action.
     * Synchronized so that we always handle only 1 action at a time. This is
     * also called when a connection closes down, but then with a null action.
     * Any failure, including an error already recorded on the connection, is
     * passed to the error handler.
     *
     * @param partyconn the connection on which the action came in
     * @param action    the {@link Action} taken by some party
     */
    protected synchronized void actionRequest(
            final ProtocolToPartyConn partyconn, final Action action) {
        try {
            if (partyconn.getError() != null) {
                throw partyconn.getError();
            }
            actionRequest1(partyconn, action);
        } catch (Throwable e) {
            handleError("failed to handle action " + action,
                    partyconn.getParty(), e);
        }
    }

    /**
     * Validates and processes one action. Does nothing if the state is
     * already final.
     *
     * @param partyconn the connection on which the action came in
     * @param action    the {@link Action} taken by some party
     * @throws ProtocolException if the action is null, is not allowed for this
     *                           type of party, or a turn-taking action comes
     *                           from a party that does not have the turn
     * @throws IOException       if forwarding the action fails
     */
    protected synchronized void actionRequest1(
            final ProtocolToPartyConn partyconn, final Action action)
            throws ProtocolException, IOException {
        PartyId partyid = partyconn.getParty();
        if (state.isFinal(System.currentTimeMillis())) {
            return;
        }

        if (action == null) {
            throw new ProtocolException("Party sent a null action", partyid);
        }

        // check if action allowed
        if (state.isShaopParty(partyid)) {
            if (!ALLOWED_ACTIONS.contains(action.getClass())) {
                throw new ProtocolException(
                        "Illegal action for SHAOP Party:" + action, partyid);
            }
        } else {
            if (!(action instanceof Comparison)) {
                throw new ProtocolException(
                        "Illegal action for COB Party:" + action, partyid);
            }
        }

        // broadcast, except if this is about eliciting comparison.
        if (action instanceof ElicitComparison
                || action instanceof Comparison) {
            // send actionDone only to the partner
            setState(state.with(partyconn.getParty(), action));
            state.connections.get(state.getPartner(partyid))
                    .send(new ActionDone(action));
        } else {
            if (!partyconn.getParty().equals(state.getCurrentTeam())) {
                throw new ProtocolException(
                        "Party acts without having the turn", partyid);
            }
            setState(state.with(partyconn.getParty(), action));
            // FIXME? this ignores possible broadcast errors
            state.connections.broadcast(new ActionDone(action));
            if (!state.isFinal(System.currentTimeMillis())) {
                nextTurn();
            }
        }
    }

    /**
     * Signal the current participant it's his turn. A failure to send is
     * passed to the error handler; nothing is thrown.
     */
    private synchronized void nextTurn() {
        PartyId party = state.getCurrentTeam();
        try {
            state.getConnections().get(party).send(new YourTurn());
        } catch (IOException e) {
            handleError("failed to send YourTurn", party, e);
        }
    }

    /**
     * Update state to include the given error and finishes up the session.
     *
     * @param message The message to attach to the error
     * @param party   the party where the error occurred
     * @param e       the exception that occurred.
     */
    private synchronized void handleError(final String message,
            final PartyId party, final Throwable e) {
        log.log(Level.WARNING, "SHAOP protocol intercepted error due to party "
                + party + ": " + message, e);
        if (e instanceof ProtocolException) {
            setState(state.with((ProtocolException) e));
        } else {
            setState(state.with(new ProtocolException(message, party, e)));
        }
    }

    /**
     * Sets the new state. If the new state is final, the finish-up procedure is
     * executed. If the current state is already final, the new state is
     * discarded and only the finish-up procedure is run.
     *
     * @param newstate the new state.
     */
    private synchronized void setState(SHAOPState newstate) {
        long now = System.currentTimeMillis();
        if (state.isFinal(now)) {
            finish();
            return;
        }
        this.state = newstate;
        if (newstate.isFinal(now)) {
            finish();
        }
    }

    /**
     * Called when we reach final state. Cancels deadline timer. Send finished
     * info to all parties, notify current nego state as final and set
     * {@link #isFinishedInfoSent}. Double calls are automatically ignored.
     */
    private synchronized void finish() {
        if (deadlinetimer != null) {
            deadlinetimer.cancel();
            deadlinetimer = null;
        }
        if (!isFinishedInfoSent.compareAndSet(false, true)) {
            return;
        }
        Inform finished = new Finished(state.getAgreements());
        state.connections.stream().forEach(conn -> sendFinish(conn, finished));
        notifyListeners(new CurrentNegoState(state));
    }

    /**
     * Best-effort: sends the finished info over the connection and then closes
     * it. The connection is closed even if sending fails; failures are only
     * logged.
     *
     * @param connection the connection to notify and close
     * @param finished   the info to send
     */
    private void sendFinish(ProtocolToPartyConn connection, Inform finished) {
        try {
            connection.send(finished);
        } catch (Exception e) {
            log.log(Level.INFO, "Failed to send Finished to " + connection, e);
        } finally {
            try {
                connection.close();
            } catch (Exception e) {
                log.log(Level.INFO,
                        "Failed to close connection " + connection, e);
            }
        }
    }
}