source: protocol/src/main/java/geniusweb/protocol/session/shaop/SHAOP.java@ 52

Last change on this file since 52 was 52, checked in by ruud, 14 months ago

Fixed small issues in domaineditor.

File size: 13.5 KB
Line 
1package geniusweb.protocol.session.shaop;
2
3import java.io.IOException;
4import java.sql.Date;
5import java.text.SimpleDateFormat;
6import java.util.Arrays;
7import java.util.List;
8import java.util.Timer;
9import java.util.TimerTask;
10import java.util.concurrent.atomic.AtomicBoolean;
11import java.util.logging.Level;
12import java.util.stream.Collectors;
13
14import geniusweb.actions.Accept;
15import geniusweb.actions.Action;
16import geniusweb.actions.Comparison;
17import geniusweb.actions.ElicitComparison;
18import geniusweb.actions.EndNegotiation;
19import geniusweb.actions.Offer;
20import geniusweb.actions.PartyId;
21import geniusweb.deadline.Deadline;
22import geniusweb.events.ProtocolEvent;
23import geniusweb.inform.ActionDone;
24import geniusweb.inform.Finished;
25import geniusweb.inform.Inform;
26import geniusweb.inform.Settings;
27import geniusweb.inform.YourTurn;
28import geniusweb.progress.ProgressFactory;
29import geniusweb.protocol.CurrentNegoState;
30import geniusweb.protocol.ProtocolException;
31import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
32import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory;
33import geniusweb.protocol.session.SessionProtocol;
34import geniusweb.protocol.session.SessionState;
35import geniusweb.protocol.session.TeamInfo;
36import geniusweb.references.Parameters;
37import geniusweb.references.PartyWithProfile;
38import geniusweb.references.ProfileRef;
39import geniusweb.references.ProtocolRef;
40import geniusweb.references.Reference;
41import tudelft.utilities.listener.DefaultListenable;
42import tudelft.utilities.logging.Reporter;
43import tudelft.utilities.repository.NoResourcesNowException;
44
45/**
46 *
47 * This is similar to SAOP but there are two types of parties: a SHAOP party and
48 * an a COB party associated with each SHAOP party.
49 * <p>
50 * Each SHAOP must receive a parameter: elicitationcost containing a Double. A
51 * party of this type can execute a RequestComparison(userid) action, which
52 * increases the total bother with the elicitationcost. The call results in the
53 * associated COB party to execute a Comparison action. If this parameter is not
54 * set, the {@link SHAOPState#DEFAULT_ELICITATATION_COST} is used.
55 * <p>
56 * A SHAOP party keeps the turn until it does an action that is not a
57 * {@link ElicitComparison}. A COB party also receives all ActionDone if a party
58 * does an action. The party takes an action only after CompareWithBid is
59 * received. It reacts with a Comparison action.
60 * <p>
61 * The Comparison/ComparisonTest actions are private between the
62 * {@link TeamInfo}. A SHAOP party can do a {@link ElicitComparison} action at
63 * any time. These actions are completely transparent to the other parties and
64 * are not dependent or influencing the normal turn taking.
65 */
66public class SHAOP extends DefaultListenable<ProtocolEvent>
67 implements SessionProtocol {
68 /**
69 * Tech note: We can not just extend SAOP because SHAOPState and
70 * SHAOPSettings do not extend SAOPState and SAOPSettings
71 */
72 public static final int TIME_MARGIN = 20;// ms extra delay after deadline
73 public static final int MINDURATION = 100;
74 public static final int MIN_SLEEP_TIME = 1000;
75 public static final int MAX_SLEEP_TIME = 60000;
76 private static final ProtocolRef SHAOP = new ProtocolRef("SHAOP");
77 private final Reporter log;
78
79 private SHAOPState state = null; // mutable!
80 private volatile AtomicBoolean isFinishedInfoSent = new AtomicBoolean(
81 false);
82 private volatile Timer deadlinetimer = null;
83 private List ALLOWED_ACTIONS = Arrays.asList(Offer.class,
84 ElicitComparison.class, Accept.class, EndNegotiation.class);
85
86 /**
87 *
88 * @param state normally the initial state coming from SAOPSettings
89 * @param logger the {@link Reporter} to use
90 */
91 public SHAOP(SHAOPState state, Reporter logger) {
92 if (state == null) {
93 throw new NullPointerException("state must be not null");
94 }
95 if (state.getSettings().getDeadline().getDuration() < MINDURATION) {
96 throw new IllegalArgumentException(
97 "Duration must be at least " + MINDURATION);
98 }
99 this.log = logger;
100 this.state = state;
101 }
102
103 @Override
104 public synchronized void start(
105 ProtocolToPartyConnFactory connectionfactory) {
106
107 try {
108 connect(connectionfactory);
109 setDeadline();
110 setupParties();
111 nextTurn();
112 } catch (Throwable e) {
113 handleError("Failed to start up session", null, e);
114 }
115 }
116
117 @Override
118 public String getDescription() {
119 return "We iterate through all PartiesTuple's, first the shaop party and then the cob party. Each gets "
120 + "YourTurn in clockwise order, after which they can do their next action. "
121 + "No new participants after start. End after prescribed deadline or when some bid is unanimously Accepted."
122 + "Parties can only act on their own behalf and only when it is their turn."
123 + "COB requests and actions are stored in the actions list but not broadcasted."
124 + "A SHAOP party keeps the turn until it does a non-COB request (Accept, Offer, EndNegotiation).";
125
126 }
127
128 @Override
129 public void addParticipant(PartyWithProfile party)
130 throws IllegalStateException {
131 throw new IllegalStateException(
132 "Dynamic joining a negotiation is not allowed in SHAOP");
133 }
134
135 @Override
136 public SessionState getState() {
137 return state;
138 }
139
140 @Override
141 public ProtocolRef getRef() {
142 return SHAOP;
143 }
144
145 /*******************************************************************
146 * private functions. Some are protected only, for testing purposes
147 ********************************************************************/
148 /**
149 * step 1 in protocol: connect all involved parties and start the clock.
150 * This always "succeeds" with a valid (but possibly final) state
151 * <p>
152 * This is 'protected' to allow junit testing, this code is not a 'public'
153 * part of the interface.
154 *
155 * @param connectionfactory the connectionfactory for making party
156 * connections
157 *
158 * @throws InterruptedException if the connection procedure is unterrupted
159 *
160 * @throws IOException if this fails to properly conect to the
161 * parties, eg interrupted or server not
162 * responding..
163 */
164 protected synchronized void connect(
165 ProtocolToPartyConnFactory connectionfactory)
166 throws InterruptedException, IOException {
167 List<PartyWithProfile> participants = state.getSettings().getTeams()
168 .stream().map(team -> team.getParties())
169 .flatMap(List::stream).collect(Collectors.toList());
170 List<Reference> parties = participants.stream()
171 .map(parti -> (parti.getParty().getPartyRef()))
172 .collect(Collectors.toList());
173 List<ProtocolToPartyConn> connections = null;
174 log.log(Level.INFO, "SHAOP connect " + parties);
175 while (connections == null) {
176 try {
177 connections = connectionfactory.connect(parties);
178 } catch (NoResourcesNowException e) {
179 long waitms = e.getLater().getTime()
180 - System.currentTimeMillis();
181 log.log(Level.INFO,
182 "No resources available to run session, waiting"
183 + waitms);
184 Thread.sleep(Math.min(MAX_SLEEP_TIME,
185 Math.max(MIN_SLEEP_TIME, waitms)));
186 }
187 }
188 for (int i = 0; i < participants.size(); i++) {
189 setState(this.state.with(connections.get(i)));
190 }
191 }
192
193 /**
194 * Set state to proper deadline. Starts the timer tasks. This tasks triggers
195 * a call to handleError when the session times out.
196 */
197 private synchronized void setDeadline() {
198 long now = System.currentTimeMillis();
199 Deadline deadline = state.getSettings().getDeadline();
200 setState(state.with(ProgressFactory.create(deadline, now)));
201 deadlinetimer = new Timer();
202 TimerTask task = new TimerTask() {
203 @Override
204 public void run() {
205 if (!state.isFinal(System.currentTimeMillis())) {
206 log.log(Level.SEVERE,
207 "BUG. Deadline timer has triggered but state is not final");
208 }
209 log.log(Level.INFO,
210 "SHAOP deadline reached. Terminating session.");
211 finish();
212 }
213 };
214// set timer TIME_MARGIN after real deadline to ensure we're not too
215// early
216 deadlinetimer.schedule(task, deadline.getDuration() + TIME_MARGIN);
217 log.log(Level.INFO, "SHAOP deadline set to "
218 + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
219 .format(new Date(System.currentTimeMillis()
220 + deadline.getDuration() + TIME_MARGIN)));
221 }
222
223 /**
224 * step 2 in protocol: listen to connections and send settings to the
225 * parties.
226 * <p>
227 * This is 'protected' to allow junit testing, this code is not a 'public'
228 * part of the interface.
229 *
230 * @throws ProtocolException if a party does not follow the protocol
231 */
232 protected synchronized void setupParties() throws ProtocolException {
233 for (ProtocolToPartyConn conn : state.connections) {
234 conn.addListener(action -> actionRequest(conn, action));
235 }
236
237 for (ProtocolToPartyConn connection : state.connections) {
238 try {
239 sendSettings(connection);
240 } catch (IOException e) {
241 throw new ProtocolException("Failed to initialize",
242 connection.getParty(), e);
243 }
244 }
245
246 }
247
248 /**
249 * Inform a party about its settings
250 *
251 * @param connection
252 * @throws IOException if party got disconnected
253 */
254 private synchronized void sendSettings(ProtocolToPartyConn connection)
255 throws IOException {
256 PartyId partyid = connection.getParty();
257 PartyWithProfile settings = state.getPartyProfile(partyid);
258 ProfileRef profile = settings.getProfile();
259 Parameters params = settings.getParty().getParameters();
260 connection.send(new Settings(connection.getParty(), profile, getRef(),
261 state.getProgress(), params));
262 }
263
264 /**
265 * This is called when one of the party connections does an action.
266 * Synchronized so that we always handle only 1 action at a time. This is
267 * also called when a connection closes down, but then with a null action.
268 *
269 * @param partyconn the connection on which the action came in
270 * @param action the {@link Action} taken by some party
271 */
272 protected synchronized void actionRequest(
273 final ProtocolToPartyConn partyconn, final Action action) {
274 try {
275 if (partyconn.getError() != null)
276 throw partyconn.getError();
277 actionRequest1(partyconn, action);
278 } catch (Throwable e) {
279 handleError("failed to handle action " + action,
280 partyconn.getParty(), e);
281 }
282 }
283
284 protected synchronized void actionRequest1(
285 final ProtocolToPartyConn partyconn, final Action action)
286 throws ProtocolException, IOException {
287 PartyId partyid = partyconn.getParty();
288 if (state.isFinal(System.currentTimeMillis())) {
289 return;
290 }
291
292 if (action == null)
293 throw new ProtocolException("Party sent a null action", partyid);
294
295 // check if action allowed
296 if (state.isShaopParty(partyid)) {
297 if (!ALLOWED_ACTIONS.contains(action.getClass())) {
298 throw new ProtocolException(
299 "Illegal action for SHAOP Party:" + action, partyid);
300 }
301 } else {
302 if (!(action instanceof Comparison))
303 throw new ProtocolException(
304 "Illegal action for COB Party:" + action, partyid);
305 }
306
307 // broadcast, except if this is about eliciting comparison.
308 if (action instanceof ElicitComparison
309 || action instanceof Comparison) {
310 // send actionDone only to the partner
311 setState(state.with(partyconn.getParty(), action));
312 state.connections.get(state.getPartner(partyid))
313 .send(new ActionDone(action));
314 } else {
315 if (!partyconn.getParty().equals(state.getCurrentTeam()))
316 throw new ProtocolException(
317 "Party acts without having the turn", partyid);
318 setState(state.with(partyconn.getParty(), action));
319 // FIXME? this ignores possible broadcast errors
320 state.connections.broadcast(new ActionDone(action));
321 if (!state.isFinal(System.currentTimeMillis()))
322 nextTurn();
323 }
324
325 }
326
327 /**
328 * Signal the current participant it's his turn
329 *
330 * @throws IOException
331 */
332 private synchronized void nextTurn() {
333 PartyId party = state.getCurrentTeam();
334 try {
335 state.getConnections().get(party).send(new YourTurn());
336 } catch (IOException e) {
337 handleError("failed to send YourTurn", party, e);
338 }
339 }
340
341 /**
342 * Update state to include the given error and finishes up the session.
343 *
344 * @param message The message to attach to the error
345 * @param party the party where the error occured
346 * @param e the exception that occured.
347 */
348 private synchronized void handleError(final String message,
349 final PartyId party, final Throwable e) {
350 log.log(Level.WARNING, "SHAOP protocol intercepted error due to party "
351 + party + ": " + message, e);
352 if (e instanceof ProtocolException) {
353 setState(state.with((ProtocolException) e));
354 } else {
355 setState(state.with(new ProtocolException(message, party, e)));
356 }
357 }
358
359 /**
360 * Sets the new state. If the new state is final, the finish-up procedure is
361 * executed.
362 *
363 * @param newstate the new state.
364 */
365 private synchronized void setState(SHAOPState newstate) {
366 long now = System.currentTimeMillis();
367 if (state.isFinal(now)) {
368 finish();
369 return;
370 }
371 this.state = newstate;
372 if (newstate.isFinal(now)) {
373 finish();
374 }
375 }
376
377 /**
378 * Called when we reach final state. Cancels deadline timer. Send finished
379 * info to all parties, notify current nego state as final and set
380 * {@link #isFinishedInfoSent}. Double calls are automatically ignored.
381 */
382 private synchronized void finish() {
383 if (deadlinetimer != null) {
384 deadlinetimer.cancel();
385 deadlinetimer = null;
386 }
387 if (!isFinishedInfoSent.compareAndSet(false, true))
388 return;
389 Inform finished = new Finished(state.getAgreements());
390 state.connections.stream().forEach(conn -> sendFinish(conn, finished));
391 notifyListeners(new CurrentNegoState(state));
392 }
393
394 private void sendFinish(ProtocolToPartyConn connection, Inform finished) {
395 try {
396 connection.send(finished);
397 connection.close();
398 } catch (Exception e) {
399 log.log(Level.INFO, "Failed to send Finished to " + connection, e);
400 }
401 }
402
403}
Note: See TracBrowser for help on using the repository browser.