source: protocol/src/main/java/geniusweb/protocol/session/learn/Learn.java

Last change on this file was 52, checked in by ruud, 14 months ago

Fixed small issues in domaineditor.

File size: 9.7 KB
Line 
1package geniusweb.protocol.session.learn;
2
3import java.io.IOException;
4import java.sql.Date;
5import java.text.SimpleDateFormat;
6import java.util.Collections;
7import java.util.List;
8import java.util.Timer;
9import java.util.TimerTask;
10import java.util.concurrent.atomic.AtomicBoolean;
11import java.util.logging.Level;
12import java.util.stream.Collectors;
13
14import geniusweb.actions.Action;
15import geniusweb.actions.PartyId;
16import geniusweb.deadline.Deadline;
17import geniusweb.events.ProtocolEvent;
18import geniusweb.inform.Finished;
19import geniusweb.inform.Inform;
20import geniusweb.inform.Settings;
21import geniusweb.progress.ProgressFactory;
22import geniusweb.protocol.CurrentNegoState;
23import geniusweb.protocol.NegoState;
24import geniusweb.protocol.ProtocolException;
25import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
26import geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory;
27import geniusweb.protocol.partyconnection.ProtocolToPartyConnections;
28import geniusweb.protocol.session.SessionProtocol;
29import geniusweb.references.Parameters;
30import geniusweb.references.PartyWithProfile;
31import geniusweb.references.ProfileRef;
32import geniusweb.references.ProtocolRef;
33import geniusweb.references.Reference;
34import tudelft.utilities.listener.DefaultListenable;
35import tudelft.utilities.logging.Reporter;
36import tudelft.utilities.repository.NoResourcesNowException;
37
38/**
39 * The Learn protocol allows parties to learn until the deadline set in the
40 * {@link LearnSettings}.
41 */
42public class Learn extends DefaultListenable<ProtocolEvent>
43 implements SessionProtocol {
44 public static final int MIN_SLEEP_TIME = 1000;
45 public static final int MAX_SLEEP_TIME = 60000;
46 public static final int TIME_MARGIN = 20;// ms extra delay after deadline
47 private LearnState state;
48 private final Reporter log;
49 private static final ProtocolRef LEARN = new ProtocolRef("Learn");
50 private volatile Timer deadlinetimer = null;
51 private volatile AtomicBoolean isFinishedInfoSent = new AtomicBoolean(
52 false);
53 private volatile ProtocolToPartyConnections conns = new ProtocolToPartyConnections(
54 Collections.emptyList());
55
56 public Learn(LearnState state, Reporter logger) {
57 this.state = state;
58 this.log = logger;
59 }
60
61 @Override
62 public void start(ProtocolToPartyConnFactory connectionfactory) {
63 try {
64 connect(connectionfactory);
65 setDeadline();
66 setupParties();
67 } catch (Throwable e) {
68 handleError("Failed to start up session", null, e);
69 }
70
71 }
72
73 @Override
74 public String getDescription() {
75 return "Sends all parties the Settings. Parties can start learning immediately but must respect the deadline. When party is done, it should send LearningDone. ";
76 }
77
78 @Override
79 public NegoState getState() {
80 return state;
81 }
82
83 @Override
84 public ProtocolRef getRef() {
85 return LEARN;
86 }
87
88 @Override
89 public void addParticipant(PartyWithProfile party) {
90 throw new IllegalStateException(
91 "Dynamic joining a negotiation is not allowed in LEARN");
92 }
93
94 /*******************************************************************
95 * private functions. Some are protected only, for testing purposes
96 ********************************************************************/
97 /**
98 * step 1 in protocol: connect all involved parties and start the clock.
99 * This always "succeeds" with a valid (but possibly final) state
100 * <p>
101 * This is 'protected' to allow junit testing, this code is not a 'public'
102 * part of the interface.
103 *
104 * @param connectionfactory the connectionfactory for making party
105 * connections
106 *
107 * @throws InterruptedException if the connection procedure is unterrupted
108 *
109 * @throws IOException if this fails to properly conect to the
110 * parties, eg interrupted or server not
111 * responding..
112 */
113 protected synchronized void connect(
114 ProtocolToPartyConnFactory connectionfactory)
115 throws InterruptedException, IOException {
116 List<PartyWithProfile> participants = state.getSettings()
117 .getAllParties();
118 List<Reference> parties = participants.stream()
119 .map(parti -> (parti.getParty().getPartyRef()))
120 .collect(Collectors.toList());
121 List<ProtocolToPartyConn> connections = null;
122 log.log(Level.INFO, "LEARN connect " + parties);
123 while (connections == null) {
124 try {
125 connections = connectionfactory.connect(parties);
126 } catch (NoResourcesNowException e) {
127 long waitms = e.getLater().getTime()
128 - System.currentTimeMillis();
129 log.log(Level.INFO,
130 "No resources available to run session, waiting"
131 + waitms);
132 Thread.sleep(Math.min(MAX_SLEEP_TIME,
133 Math.max(MIN_SLEEP_TIME, waitms)));
134 }
135 }
136 for (int i = 0; i < participants.size(); i++) {
137 conns = conns.with(connections.get(i));
138 setState(this.state.with(connections.get(i).getParty(),
139 participants.get(i)));
140 }
141 }
142
143 /**
144 * This is called when one of the party connections does an action.
145 * Synchronized so that we always handle only 1 action at a time.
146 *
147 * @param partyconn the connection on which the action came in.
148 * @param action the {@link Action} taken by some party
149 */
150 protected synchronized void actionRequest(
151 final ProtocolToPartyConn partyconn, final Action action) {
152 if (action == null) {
153 Throwable err = partyconn.getError();
154 if (err == null) {
155 err = new ProtocolException("Party sent a null action",
156 partyconn.getParty());
157 }
158 handleError(partyconn + "Protocol error", partyconn.getParty(),
159 err);
160 return;
161 }
162
163 try {
164 setState(state.with(partyconn.getParty(), action));
165 } catch (Throwable e) {
166 handleError("failed to handle action " + action,
167 partyconn.getParty(), e);
168 }
169
170 }
171
172 /**
173 * step 2 in protocol: listen to connections and send settings to the
174 * parties.
175 * <p>
176 * This is 'protected' to allow junit testing, this code is not a 'public'
177 * part of the interface.
178 *
179 * @throws ProtocolException if a party does not follow the protocol
180 */
181 protected synchronized void setupParties() throws ProtocolException {
182 for (ProtocolToPartyConn conn : conns) {
183 conn.addListener(action -> actionRequest(conn, action));
184 }
185
186 for (ProtocolToPartyConn connection : conns) {
187 try {
188 sendSettings(connection);
189 } catch (IOException e) {
190 throw new ProtocolException("Failed to initialize",
191 connection.getParty(), e);
192 }
193 }
194
195 }
196
197 /**
198 * Inform a party about its settings
199 *
200 * @param connection
201 * @throws IOException if party got disconnected
202 */
203 private synchronized void sendSettings(ProtocolToPartyConn connection)
204 throws IOException {
205 PartyId partyid = connection.getParty();
206 ProfileRef profile = state.getPartyProfiles().get(partyid).getProfile();
207 Parameters params = state.getPartyProfiles().get(partyid).getParty()
208 .getParameters();
209 if (profile == null) {
210 throw new IllegalArgumentException(
211 "Missing profile for party " + connection.getReference());
212 }
213 connection.send(new Settings(connection.getParty(), profile, getRef(),
214 state.getProgress(), params));
215 }
216
217 /**
218 * Set state to proper deadline. Starts the timer tasks. This tasks triggers
219 * a call to handleError when the session times out.
220 */
221 private synchronized void setDeadline() {
222 long now = System.currentTimeMillis();
223 Deadline deadline = state.getSettings().getDeadline();
224 setState(state.with(ProgressFactory.create(deadline, now)));
225 deadlinetimer = new Timer();
226 TimerTask task = new TimerTask() {
227 @Override
228 public void run() {
229 if (!state.isFinal(System.currentTimeMillis())) {
230 log.log(Level.SEVERE,
231 "BUG. Deadline timer has triggered but state is not final");
232 }
233 log.log(Level.INFO,
234 "LEARN deadline reached. Terminating session.");
235 finish();
236 }
237 };
238 // set timer TIME_MARGIN after real deadline to ensure we're not too
239 // early
240 deadlinetimer.schedule(task, deadline.getDuration() + TIME_MARGIN);
241 log.log(Level.INFO, "LEARN deadline set to "
242 + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
243 .format(new Date(System.currentTimeMillis()
244 + deadline.getDuration() + TIME_MARGIN)));
245 }
246
247 /**
248 * Update state to include the given error and finishes up the session.
249 *
250 * @param message The message to attach to the error
251 * @param party the party where the error occured
252 * @param e the exception that occured.
253 */
254 private synchronized void handleError(final String message,
255 final PartyId party, final Throwable e) {
256 if (e instanceof ProtocolException) {
257 setState(state.with((ProtocolException) e));
258 } else {
259 setState(state.with(new ProtocolException(message, party, e)));
260 }
261 log.log(Level.WARNING, "LEARN protocol intercepted error due to party "
262 + party + ":" + message, e);
263 }
264
265 /**
266 * Sets the new state. If the new state is final, the finish-up procedure is
267 * executed.
268 *
269 * @param newstate the new state.
270 */
271 private synchronized void setState(LearnState newstate) {
272 long now = System.currentTimeMillis();
273 if (state.isFinal(now)) {
274 finish();
275 return;
276 }
277 this.state = newstate;
278 if (newstate.isFinal(now)) {
279 finish();
280 }
281 }
282
283 /**
284 * Called when we reach final state. Cancels deadline timer. Send finished
285 * info to all parties, notify current nego state as final and set
286 * {@link #isFinishedInfoSent}. Double calls are automatically ignored.
287 */
288 private synchronized void finish() {
289 if (deadlinetimer != null) {
290 deadlinetimer.cancel();
291 deadlinetimer = null;
292 }
293 if (!isFinishedInfoSent.compareAndSet(false, true))
294 return;
295 Inform finished = new Finished(state.getAgreements());
296 for (ProtocolToPartyConn conn : conns) {
297 sendFinish(conn, finished);
298 }
299 notifyListeners(new CurrentNegoState(state));
300 }
301
302 private void sendFinish(ProtocolToPartyConn connection, Inform finished) {
303 try {
304 connection.send(finished);
305 connection.close();
306 } catch (Exception e) {
307 log.log(Level.INFO, "Failed to send Finished to " + connection, e);
308 }
309 }
310}
Note: See TracBrowser for help on using the repository browser.