package geniusweb.protocol.session.amop;
import java.io.IOException;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.stream.Collectors;
import geniusweb.actions.Action;
import geniusweb.actions.EndNegotiation;
import geniusweb.actions.Offer;
import geniusweb.actions.PartyId;
import geniusweb.actions.Vote;
import geniusweb.actions.Votes;
import geniusweb.deadline.Deadline;
import geniusweb.events.ProtocolEvent;
import geniusweb.inform.ActionDone;
import geniusweb.inform.Agreements;
import geniusweb.inform.Finished;
import geniusweb.inform.Inform;
import geniusweb.inform.Settings;
import geniusweb.inform.Voting;
import geniusweb.inform.YourTurn;
import geniusweb.issuevalue.Bid;
import geniusweb.progress.ProgressFactory;
import geniusweb.protocol.CurrentNegoState;
import geniusweb.protocol.ProtocolException;
import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory;
import geniusweb.protocol.partyconnection.ProtocolToPartyConnections;
import geniusweb.protocol.session.SessionProtocol;
import geniusweb.protocol.session.SessionSettings;
import geniusweb.protocol.session.amop.AMOPState.Phase;
import geniusweb.references.Parameters;
import geniusweb.references.PartyWithProfile;
import geniusweb.references.ProfileRef;
import geniusweb.references.ProtocolRef;
import geniusweb.references.Reference;
import tudelft.utilities.listener.DefaultListenable;
import tudelft.utilities.logging.Reporter;
import tudelft.utilities.repository.NoResourcesNowException;
/**
*
* All parties are first sent the {@link SessionSettings}. Only parties
* specified initially in the settings do participate.
*
*
* <h2>Parameter</h2>
* AMOP parties can receive a parameter: minVotes containing
* a Double. If set, the AMOP protocol checks that all {@link Votes} from that
* party have {@link Vote#getMinPower()} &gt; minVotes.
* <h2>Protocol steps</h2>
*
* - The protocol tries to start all parties. If not all parties start, the
* parties are freed up and another attempt is done to start all parties some
* time later.
*
* - the variable remainingparties = {all parties}
*
* - The session deadline clock now starts ticking.
*
* - All parties are sent their settings.
*
* - Loop until {@link Deadline} is reached or |remainingparties| &lt; 2:
*
* - protocol sends {@link YourTurn} to all remainingparties. Each party now
* must submit an {@link Offer} within {@link #PHASE_TIME} milliseconds. If a
* party fails to submit it is sent {@link Finished} and removed from
* remainingparties.
*
* - protocol sends a {@link Voting} containing a List of all received
* {@link Offer}s. Each party must place his {@link Votes} within
* {@link #PHASE_TIME} milliseconds. If a party does not submit, it is sent a
* {@link Finished} and removed from remainingparties. Previous votes for the
* same bid do not count. But see {@link Agreements}.
*
* - The protocol determines a maximum sized subset of size N≥2 of the
* votes for bid B for which the vote conditions hold. If there is such a
* subset, the parties that placed these votes reached an agreement. They are
* added to the agreement set, sent the {@link Finished} info and terminated.
* They are removed from the remainingparties.
*
* - Any remaining parties are sent a {@link Finished} object without
* agreement bid and are terminated.
*
*
* A party can send {@link EndNegotiation} at any time to remove itself from the
* negotiation. The other parties may continue anyway without him.
*
* Parties receive the {@link Finished} information at the very end when all
* agreements have been collected.
*
* This logs to "Protocol" logger if there are issues
*
* This object is mutable: the internal state changes as parties interact with
* the protocol.
*
* Threadsafe: all entrypoints are synchronized.
*/
public class AMOP extends DefaultListenable
implements SessionProtocol {
/** Time in milliseconds that a single phase may last before it is ended. */
private static final long PHASE_TIME = 30000;
/** Extra delay in milliseconds that is added after a deadline. */
private static final long TIME_MARGIN = 20L;
/** Smallest deadline duration that the constructor accepts. */
private static final int MINDURATION = 100;
/** Lower bound in milliseconds for the pause between connect attempts. */
private static final int MIN_SLEEP_TIME = 1000;
/** Upper bound in milliseconds for the pause between connect attempts. */
private static final int MAX_SLEEP_TIME = 60000;
/** The reference under which this protocol identifies itself. */
private static final ProtocolRef AMOP = new ProtocolRef("AMOP");

/** Destination for all log messages of this protocol. */
private final Reporter log;
/** The current protocol state. Mutable: it is replaced on every change. */
private AMOPState state;
/** Guard around {@link #finish()}; replaced once the deadline is set. */
private volatile WillBeCalled finish = new WillBeCalled(this::finish, null);
/** Guard around {@link #endPhase()}; replaced every time a phase ends. */
private volatile WillBeCalled endPhase = new WillBeCalled(this::endPhase,
        null);
/**
*
* @param state normally the initial state coming from SAOPSettings
* @param logger the {@link Reporter} to use
*/
public AMOP(AMOPState state, Reporter logger) {
if (state == null) {
throw new NullPointerException("state must be not null");
}
if (state.getSettings().getDeadline().getDuration() < MINDURATION) {
throw new IllegalArgumentException(
"Duration must be at least " + MINDURATION);
}
if (logger == null) {
throw new NullPointerException("Logger must be not null");
}
this.log = logger;
this.state = state;
}
/**
 * Starts the session: connects the parties, sets the deadline, sends the
 * settings to the parties and then completes the initial phase. Any failure
 * in these steps is recorded through {@link #handleError} rather than
 * thrown to the caller.
 *
 * @param connectionfactory the factory used to connect to the parties
 */
@Override
public synchronized void start(
        ProtocolToPartyConnFactory connectionfactory) {
    try {
        // we're in Phase.INIT still
        connect(connectionfactory);
        setDeadline();
        setupParties();
        endPhase.complete(); // end of INIT
    } catch (Throwable e) {
        handleError("Failed to start up session", new PartyId("UNKNOWN"),
                e);
        if (e instanceof InterruptedException) {
            // The interrupt flag was cleared when the exception was thrown.
            // Restore it, after the error has been recorded, so the calling
            // thread can still see that it was interrupted.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * @return a fixed, human readable summary of how this protocol proceeds:
 *         offer round, voting round, agreements and the 'minVotes'
 *         parameter.
 */
@Override
public String getDescription() {
    return "All parties get YourTurn. They now can submit their Offer within 30 seconds. "
            + "Next they receive a Voting. They can now submit their Votes. "
            + "If one of their Vote succeeds, they finish with an agreement. "
            + "Parties that did not yet reach agreement continue negotiating. "
            + "A parameter 'minVotes', if set, contains the smallest value to be used in each Vote";
}
/**
 * Always refuses: parties can not join a running AMOP session.
 *
 * @param party the party that asks to join; not used
 * @throws IllegalStateException always
 */
@Override
public void addParticipant(PartyWithProfile party)
        throws IllegalStateException {
    final String reason = "Dynamic joining a negotiation is not supported in AMOP";
    throw new IllegalStateException(reason);
}
/**
 * @return the state object currently held by this protocol. Note that this
 *         getter itself is not declared synchronized.
 */
@Override
public AMOPState getState() {
    return this.state;
}
/**
 * @return the constant {@link ProtocolRef} that identifies this protocol
 *         (created with the name "AMOP").
 */
@Override
public ProtocolRef getRef() {
// refers to the static field AMOP, which obscures the class name here
return AMOP;
}
/*******************************************************************
* private functions. Some are protected only, for testing purposes
********************************************************************/
/**
 * Step 1 in protocol: connect all involved parties.
 *
 * The connection factory is asked to connect all parties at once. When it
 * reports {@link NoResourcesNowException}, this method sleeps and retries.
 * The sleep time is the time suggested by the exception, clamped to the
 * range [{@link #MIN_SLEEP_TIME}, {@link #MAX_SLEEP_TIME}]. After a
 * successful connect, each connection is stored in the state together with
 * the participant at the same list index.
 *
 * This is 'protected' to allow junit testing, this code is not a 'public'
 * part of the interface.
 *
 * @param connectionfactory the connectionfactory for making party
 *                          connections
 *
 * @throws InterruptedException if the connection procedure is interrupted
 *
 * @throws IOException          if this fails to properly connect to the
 *                              parties, eg interrupted or server not
 *                              responding.
 */
protected synchronized void connect(
        ProtocolToPartyConnFactory connectionfactory)
        throws InterruptedException, IOException {
    List<PartyWithProfile> participants = state.getSettings()
            .getAllParties();
    List<Reference> parties = participants.stream()
            .map(parti -> (parti.getParty().getPartyRef()))
            .collect(Collectors.toList());
    List<ProtocolToPartyConn> connections = null;
    log.log(Level.INFO, "AMOP connect " + parties);
    while (connections == null) {
        try {
            connections = connectionfactory.connect(parties);
        } catch (NoResourcesNowException e) {
            long waitms = e.getLater().getTime()
                    - System.currentTimeMillis();
            // never sleep shorter than MIN nor longer than MAX
            long sleepms = Math.min(MAX_SLEEP_TIME,
                    Math.max(MIN_SLEEP_TIME, waitms));
            log.log(Level.INFO,
                    "No resources available to run session, waiting "
                            + sleepms + "ms");
            Thread.sleep(sleepms);
        }
    }
    // connections are assumed to come back in the order of the references
    for (int i = 0; i < participants.size(); i++) {
        setState(this.state.with(connections.get(i), participants.get(i)));
    }
}
/**
 * Factory method for {@link WillBeCalled} objects. It is overridable so
 * that tests can substitute their own instance.
 *
 * @param r       the {@link Runnable} that will be called
 * @param delayMs the time after which r will be called
 * @return new WillBeCalled(r,delayms)
 */
protected WillBeCalled createWillBeCalled(Runnable r, Long delayMs) {
    final WillBeCalled scheduled = new WillBeCalled(r, delayMs);
    return scheduled;
}
/**
 * Puts the progress derived from the session deadline into the state and
 * replaces {@link #finish} with a {@link WillBeCalled} that runs
 * {@link #finish()} after the deadline duration plus {@link #TIME_MARGIN}.
 * The resulting time is logged.
 */
private void setDeadline() {
    final long startMillis = System.currentTimeMillis();
    final Deadline sessionDeadline = state.getSettings().getDeadline();
    setState(state.with(ProgressFactory.create(sessionDeadline, startMillis)));

    finish = createWillBeCalled(() -> finish(),
            TIME_MARGIN + sessionDeadline.getDuration());

    // the logged time uses a fresh clock reading, not startMillis
    final SimpleDateFormat formatter = new SimpleDateFormat(
            "yyyy/MM/dd HH:mm:ss");
    final long expiryMillis = System.currentTimeMillis()
            + sessionDeadline.getDuration() + TIME_MARGIN;
    final String expiryText = formatter.format(new Date(expiryMillis));
    log.log(Level.INFO, "AMOP deadline set to " + expiryText);
}
/**
 * Step 2 in protocol: start listening to all connections, then send every
 * party its settings.
 *
 * All listeners are attached before the first settings message goes out.
 *
 * This is 'protected' to allow junit testing, this code is not a 'public'
 * part of the interface.
 *
 * @throws ProtocolException if sending the settings to a party fails
 */
protected synchronized void setupParties() throws ProtocolException {
    // first pass: route incoming actions of each party to actionRequest
    state.getConnections().forEach(partyConn -> partyConn
            .addListener(incoming -> actionRequest(partyConn, incoming)));

    // second pass: hand out the settings
    for (ProtocolToPartyConn target : state.getConnections()) {
        try {
            sendSettings(target);
        } catch (IOException sendFailure) {
            throw new ProtocolException("Failed to initialize",
                    target.getParty(), sendFailure);
        }
    }
}
/**
 * Inform a party about its settings: its id, profile, the protocol
 * reference, the current progress and its parameters.
 *
 * @param connection the connection to the party to inform
 * @throws IOException              if party got disconnected
 * @throws IllegalArgumentException if the state holds no profile for the
 *                                  party of this connection
 */
private synchronized void sendSettings(ProtocolToPartyConn connection)
        throws IOException {
    PartyId partyid = connection.getParty();
    // look up once, and check before dereferencing, so that an unknown
    // party gives the descriptive error instead of a NullPointerException
    PartyWithProfile partyprofile = state.getPartyProfiles().get(partyid);
    ProfileRef profile = partyprofile == null ? null
            : partyprofile.getProfile();
    if (profile == null) {
        throw new IllegalArgumentException(
                "Missing profile for party " + connection.getReference());
    }
    Parameters params = partyprofile.getParty().getParameters();
    connection.send(new Settings(partyid, profile, getRef(),
            state.getProgress(), params));
}
/**
 * Callback for an action arriving on one of the
 * {@link ProtocolToPartyConn}s. Synchronized so that we always handle only
 * 1 action at a time.
 *
 * A null action is treated as an error of that party. Any other action is
 * applied to the state, announced to the active parties, and ends the phase
 * if all parties have acted.
 *
 * @param partyconn the connection on which the action came in
 * @param action    the {@link Action} taken by some party, or null
 */
protected synchronized void actionRequest(
        final ProtocolToPartyConn partyconn, final Action action) {
    if (action != null) {
        try {
            setState(state.with(partyconn.getParty(), action));
        } catch (Throwable failure) {
            handleError("failed to handle action " + action,
                    partyconn.getParty(), failure);
        }
        // NOTE(review): the ActionDone is broadcast also when applying the
        // action failed above. Broadcast errors are ignored here.
        broadcast(new ActionDone(action));
        if (state.isAllPartiesActed()) {
            endPhase.complete();
        }
        return;
    }

    // null action: report the connection's own error if it has one
    final Throwable connError = partyconn.getError();
    final Throwable reported = (connError != null) ? connError
            : new ProtocolException("Party sent a null action",
                    partyconn.getParty());
    handleError(partyconn + "Protocol error", partyconn.getParty(),
            reported);
}
/**
 * The current phase is completed. Proceed to next phase as needed. Reset
 * the phase timer and inform parties. Must only be called through
 * {@link #endPhase} to ensure this is called only once per phase.
 *
 * Does nothing if the state is already final. If the phase that ends is
 * {@link Phase#OFFER}, the offers placed in it are collected. After the
 * state has moved to the next phase, the active parties get a
 * {@link YourTurn} if that phase is OFFER, and otherwise a {@link Voting}
 * with the collected offers and a power of 1 for every active party.
 */
private synchronized void endPhase() {
    if (state.isFinal(System.currentTimeMillis()))
        return;
    // collect the offers before nextPhase() moves the state on
    List<Offer> bids = null;
    if (state.getPhase() == Phase.OFFER) {
        bids = state.getPhaseActions().values().stream()
                .map(act -> ((Offer) act)).collect(Collectors.toList());
    }
    setState(state.nextPhase());
    if (state.getPhase() == Phase.OFFER) {
        broadcast(new YourTurn());
    } else {
        Map<PartyId, Integer> powers = state.getActiveParties().stream()
                .collect(Collectors.toMap(pid -> pid, pid -> 1));
        broadcast(new Voting(bids, powers));
    }
    /*
     * Replace the phase timer for the phase that just started. This relies
     * on this method and its callers being synchronized, so the state is
     * consistent at this point. NOTE(review): it has not been shown that
     * the replaced timer can never fire right after this assignment.
     */
    endPhase = createWillBeCalled(() -> endPhase(),
            PHASE_TIME + TIME_MARGIN);
}
/**
 * Alternative to {@link ProtocolToPartyConnections#broadcast(Inform)},
 * sending only to active parties. A party to which sending fails is
 * reported through {@link #handleError}; the remaining parties are still
 * served.
 *
 * @param info the info to broadcast
 */
private void broadcast(Inform info) {
    for (PartyId recipient : state.getActiveParties()) {
        try {
            final ProtocolToPartyConn channel = state.getConnections()
                    .get(recipient);
            channel.send(info);
        } catch (IOException sendFailure) {
            handleError("Party seems to have disconnected", recipient,
                    sendFailure);
        }
    }
}
/**
 * Records an error of a party in the state and logs it as a warning.
 *
 * @param message the message to attach to the error
 * @param partyid the party where the error occurred
 * @param e       the exception that occurred. A {@link ProtocolException}
 *                is stored as is; anything else is wrapped in a new
 *                {@link ProtocolException} carrying the message.
 */
private synchronized void handleError(final String message,
        final PartyId partyid, final Throwable e) {
    final ProtocolException problem = (e instanceof ProtocolException)
            ? (ProtocolException) e
            : new ProtocolException(message, partyid, e);
    setState(state.with(partyid, problem));
    log.log(Level.WARNING,
            "AMOP intercepted error from party " + partyid + ":" + message,
            e);
}
/**
 * Single entry point for all state changes. If the current state is
 * already final, the new state is discarded. Whenever the current state was
 * final or the new state is final, the finish-up procedure is triggered
 * through {@link #finish}.
 *
 * @param newstate the new state.
 */
private synchronized void setState(AMOPState newstate) {
    final long timestamp = System.currentTimeMillis();
    final boolean alreadyFinal = state.isFinal(timestamp);
    if (!alreadyFinal) {
        this.state = newstate;
    }
    // short-circuit: newstate is not inspected if we were final already
    if (alreadyFinal || newstate.isFinal(timestamp)) {
        finish.complete();
    }
}
/**
 * Called when we reach final state. Must be called through {@link #finish}
 * to ensure this is called only once. Sends a {@link Finished} with the
 * agreements to all connections, closes every connection and then notifies
 * the listeners of the final {@link CurrentNegoState}.
 *
 * Failures to send or to close are logged and do not stop the others from
 * being handled.
 */
private synchronized void finish() {
    Finished finished = new Finished(state.getAgreements());
    for (ProtocolToPartyConn conn : state.getConnections()) {
        try {
            conn.send(finished);
        } catch (Exception e) {
            log.log(Level.INFO, "Failed to send Finished to " + conn, e);
        }
        // close separately, so that a failed send does not leave the
        // connection open
        try {
            conn.close();
        } catch (Exception e) {
            log.log(Level.INFO, "Failed to close connection " + conn, e);
        }
    }
    notifyListeners(new CurrentNegoState(state));
}
}