source: geniuswebcore/geniusweb/protocol/session/mopac/MOPAC.py@ 77

Last change on this file since 77 was 73, checked in by Bart Vastenhouw, 3 years ago

Fix for IssueValue hashcode.

File size: 12.6 KB
Line 
1import logging
2from threading import Timer
3import threading
4from time import time, sleep
5from typing import Optional, List
6
7from tudelft.utilities.listener.DefaultListenable import DefaultListenable
8from tudelft.utilities.listener.Listener import Listener
9from tudelft.utilities.repository.NoResourcesNowException import NoResourcesNowException
10from tudelft_utilities_logging.Reporter import Reporter
11from uri.uri import URI
12
13from geniusweb.actions.Action import Action
14from geniusweb.events.ProtocolEvent import ProtocolEvent
15from geniusweb.inform.Finished import Finished
16from geniusweb.inform.Settings import Settings
17from geniusweb.progress.ProgressFactory import ProgressFactory
18from geniusweb.protocol.CurrentNegoState import CurrentNegoState
19from geniusweb.protocol.ProtocolException import ProtocolException
20from geniusweb.protocol.partyconnection.ProtocolToPartyConn import ProtocolToPartyConn
21from geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory import ProtocolToPartyConnFactory
22from geniusweb.protocol.partyconnection.ProtocolToPartyConnections import ProtocolToPartyConnections
23from geniusweb.protocol.session.SessionProtocol import SessionProtocol
24from geniusweb.protocol.session.mopac.MOPACState import MOPACState
25from geniusweb.references.PartyWithProfile import PartyWithProfile
26from geniusweb.references.ProtocolRef import ProtocolRef
27from geniusweb.references.Reference import Reference
28from geniusweb.utils import val
29
30
# Minimum session duration accepted by MOPAC.__init__: it is compared against
# the deadline's getDuration() (presumably milliseconds -- TODO confirm).
MINDURATION = 100
# Lower and upper bound, in milliseconds, of the pause between two attempts to
# connect all parties when no resources are available (see MOPAC._connect,
# which divides by 1000 before sleeping).
MIN_SLEEP_TIME = 1000
MAX_SLEEP_TIME = 60000
# Reference identifying this protocol; returned by MOPAC.getRef().
MOPACPROTOCOL = ProtocolRef(URI("MOPAC"))
35
36
class MOPAC (DefaultListenable[ProtocolEvent], SessionProtocol):
    '''
    All parties are first sent the {@link SessionSettings}. Only parties
    specified initially in the settings do participate (no late joining in)

    <h2>parameter</h2> MOPAC parties can receive a parameter <code>power</code>
    containing an Integer. This parameter is picked up by this protocol, for
    handling the agreement extraction from the offers. Power=1 for parties that
    do not have the power parameter.
    <h2>Protocol steps</h2>

    <ol>
    <li>The protocol tries to start all parties. If not all parties start, the
    parties are freed up and another attempt is done to start all parties some
    time later.
    <li>The session deadline clock now starts ticking.
    <li>remainingparties are sent their settings.
    <li>Loop until {@link Deadline} is reached or |remainingparties| <= 2:
    <ol>
    <li>protocol sends {@link YourTurn} to all remainingparties. Each party now
    must submit an {@link Offer} within {@link Phase#PHASE_MAXTIME} seconds. If a
    party fails to submit it is removed from remainingparties.
    <li>protocol sends a {@link Voting} containing a List of Bid containing all
    received {@link Bid}s. Each party must place his {@link Votes} within the
    provided deadline. If a party does not submit, it is terminated and removed
    from remainingparties. Previous votes for the same bid do not count. But see
    {@link Agreements}. A party that misbehaves after submitting its vote is
    removed but its Votes remain standing.
    <li>protocol sends to all remainingparties a OptIn containing all received
    votes. Each party now must submit again a {@link Votes} object within the
    deadline. This new Votes must equal or extend the previous Votes of the
    party.
    <li>The protocol uses the {@link Phase#getEvaluator()} to determine the
    agreements from the votes. If there are agreements, the parties that placed
    these votes reached an agreement. They are moved to the agreement set and
    removed from the remainingparties.
    </ol>
    <li>Any remaining parties are sent a {@link Finished} object without
    agreement bid and are terminated.
    </ol>
    <p>
    A party can send {@link EndNegotiation} at any time to remove itself from the
    negotiation. The other parties may continue anyway without him.
    <p>
    Parties receive the {@link Finished} information at the very end when all
    agreements have been collected.
    <p>
    This logs to "Protocol" logger if there are issues
    <p>
    This object is mutable: the internal state changes as parties interact with
    the protocol.
    <p>
    Threadsafe: all entrypoints are synchronized on a per-instance lock.
    '''
    # Tech notes. 0. Everything here is phase driven and runs due to callback
    # from parties. 1. Connect all parties. 2. If any party fails at any time,
    # add it to exceptions list in the state 3. Run timer to check the phase at
    # possible end time. The timer just checks the state, and therefore it does
    # no harm if it is double checked. 4. We run phases always with either
    # PHASE_TIME or time to the global deadline, whichever comes first.
    # Therefore phases naturally end at end of nego. 5. Phases must have at
    # least {@link #MINDURATION} otherwise it makes no sense to even start it.

    _state: Optional[MOPACState] = None  # mutable!

    # the existing party connections. we assume ownership of this so it should
    # not be modified although connections may of course break. mutable!
    # (the attribute is only ever re-assigned, see _connect)
    _connections: ProtocolToPartyConnections = ProtocolToPartyConnections([])

    # Set to true after all is done: we sent final outcomes. This is needed
    # because we can't tell from state what we did at the very end.
    _finished = False
    # The running phase time-out timer, or None if no timer is pending.
    _timer: Optional[Timer] = None

    # Fallback only, kept for code that bypasses __init__. Every properly
    # constructed instance gets its own lock in __init__, so that independent
    # sessions do not block each other.
    _synclock = threading.RLock()

    def __init__(self, state: MOPACState, logger: Reporter):
        '''
        @param state  normally the initial state coming from the MOPAC
                      settings
        @param logger the {@link Reporter} to use
        @throws ValueError if state or logger is None, or if the deadline
                           duration in the settings is below MINDURATION.
        '''
        super().__init__()
        if state is None:
            raise ValueError("state must be not null")
        if state.getSettings().getDeadline().getDuration() < MINDURATION:
            raise ValueError(
                "Duration must be at least " + str(MINDURATION))
        if logger is None:
            raise ValueError("Logger must be not null")

        self._log = logger
        self._state = state
        # re-entrant because synchronized methods call each other
        # (e.g. _actionRequest -> _checkEndPhase -> _endNegotiation).
        self._synclock = threading.RLock()

    def start(self, connectionfactory: ProtocolToPartyConnFactory):
        '''
        Connect all parties, initialize the first phase, send the settings
        to the parties and start the first phase.

        @param connectionfactory the factory used to connect to the parties
        @throws ConnectionError if connecting the parties failed. The
                                original exception is attached as cause.
        '''
        with self._synclock:
            try:
                self._connect(connectionfactory)
            except Exception as e:
                # We can't {@link #handleError} yet. FIXME
                raise ConnectionError("Failed to connect", e) from e

            now = int(1000 * time())
            connected = val(self._state)
            progress = ProgressFactory.create(
                connected.getSettings().getDeadline(), now)
            self._state = connected.initPhase(progress, now)
            self._setupParties()
            self._startPhase(now)

    def getDescription(self) -> str:
        '''
        @return human readable description of this protocol.
        '''
        return "All parties get YourTurn. They now can submit their Offer within 30 seconds. "\
            +"Next they receive a Voting. They can now submit their Votes. "\
            +"Then they receive an OptIn, now they can widen up their Votes. "\
            +"If one of their Vote succeeds, they finish with an agreement. "\
            +"The VotingEvaluator setting determines the exact voting behaviour "\
            +"and if this process repeats"

    def addParticipant(self, party: PartyWithProfile):
        '''
        Not supported by this protocol.

        @throws ValueError always
        '''
        raise ValueError("Dynamic joining a negotiation is not supported in MOPAC")

    def getState(self) -> MOPACState:
        '''
        @return the current state of the protocol
        '''
        return val(self._state)

    def getRef(self) -> ProtocolRef:
        '''
        @return the {@link ProtocolRef} of this protocol
        '''
        return MOPACPROTOCOL

    #*******************************************************************
    # private functions. Some are protected only, for testing purposes
    # ********************************************************************/
    def _connect(self, connectionfactory: ProtocolToPartyConnFactory):
        '''
        step 1 in protocol: connect all involved parties. Keeps retrying
        (with a pause between attempts) as long as the factory reports
        {@link NoResourcesNowException}. Afterwards every connection is
        stored and registered in the state together with its participant.
        <p>
        This is 'protected' to allow unit testing, this code is not a 'public'
        part of the interface.

        @param connectionfactory the connectionfactory for making party
                                 connections

        Any other exception raised by the connection factory propagates to
        the caller.
        '''
        with self._synclock:
            participants = val(self._state).getSettings().getAllParties()
            parties: List[Reference] = \
                [parti.getParty().getPartyRef() for parti in participants]
            self._log.log(logging.INFO, "MOPAC connect " + str(parties))
            newconnections: Optional[List[ProtocolToPartyConn]] = None

            while newconnections is None:
                try:
                    newconnections = connectionfactory.connectAll(parties)
                except NoResourcesNowException as e:
                    # wait till the time suggested by the exception, but
                    # never shorter than MIN_SLEEP_TIME or longer than
                    # MAX_SLEEP_TIME (all in ms).
                    waitms = int(1000 * e.getLater().timestamp()) \
                        -int(1000 * time())
                    self._log.log(logging.INFO,
                        "No resources available to run session, waiting "
                        +str(waitms) + "ms")
                    sleep(min(MAX_SLEEP_TIME,
                            max(MIN_SLEEP_TIME, waitms)) / 1000.)

            # connection i belongs to participant i. Indexing (rather than
            # zip) keeps the IndexError if too few connections came back.
            for i, participant in enumerate(participants):
                conn = newconnections[i]
                self._connections = self._connections.With(conn)
                self._state = val(self._state).With(conn.getParty(),
                                                    participant)

    def _setupParties(self):
        '''
        step 2 in protocol: listen to connections and send settings to the
        parties. A party that can not be reached is recorded in the state
        as a {@link ProtocolException}.
        <p>
        This is 'protected' to allow unit testing, this code is not a 'public'
        part of the interface.
        '''
        protocol = self

        class ActListener(Listener[Action]):
            '''
            Forwards the actions coming in on one connection to the protocol.
            '''

            def __init__(self, conn):
                self._conn = conn

            def notifyChange(self, action: Action):
                protocol._actionRequest(self._conn, action,
                                        int(1000 * time()))

        with self._synclock:
            # first hook up all listeners, then start talking to the parties
            for conn in self._connections:
                conn.addListener(ActListener(conn))

            for connection in self._connections:
                try:
                    self._sendSettings(connection)
                except ConnectionError as e:
                    self._state = val(self._state).WithException(
                        ProtocolException("Failed to initialize",
                                          connection.getParty(), e))

    def _startPhase(self, now: int):
        '''
        Send the {@link Inform} for the current phase to the remaining parties
        and start the time-out checker. Must be called while holding the
        lock.

        @param now the current time (ms since 1970)
        '''
        phase = val(self._state).getPhase()
        info = phase.getInform()
        for pid in phase.getPartyStates().getNotYetActed():
            try:
                val(self._connections.get(pid)).send(info)
            except ConnectionError as e:
                self._state = val(self._state).WithException(ProtocolException(
                    "Party seems to have disconnected", pid, e))
        # +1 so that the check runs just after the phase deadline.
        self._startTimer(1 + val(self._state).getPhase().getDeadline() - now)

    def _startTimer(self, deadln: int):
        '''
        Check at given deadline if we already ended the phase.

        @param deadln the deadline (ms from now)
        @throws ValueError if a previous timer is still registered
        '''
        if self._timer is not None:
            raise ValueError("Timer is still running!")
        self._timer = Timer(deadln / 1000.0, self._timertask)
        self._timer.start()

    def _timertask(self):
        '''
        Called by the timer: check if the current phase ended.
        '''
        self._checkEndPhase(int(1000 * time()))

    def _sendSettings(self, connection: ProtocolToPartyConn):
        '''
        Inform a party about its settings

        @param connection the connection to the party to be informed
        @throws ConnectionError if party got disconnected
        '''
        with self._synclock:
            partyid = connection.getParty()
            partyprofile = val(self._state).getPartyProfiles()[partyid]
            profile = partyprofile.getProfile()
            params = partyprofile.getParty().getParameters()
            connection.send(Settings(partyid, profile, self.getRef(),
                    val(val(self._state).getProgress()), params))

    def _actionRequest(self, partyconn: ProtocolToPartyConn, action: Action,
            now: int):
        '''
        This is called when one of the {@link ProtocolToPartyConn}s does an
        action. Synchronized so that we always handle only 1 action at a time.
        Actions arriving after the negotiation finished are ignored.

        @param partyconn the connection on which the action came in
        @param action    the {@link Action} taken by some party
        @param now       current time
        '''
        with self._synclock:
            if self._finished:
                return
            self._state = val(self._state).WithAction(partyconn.getParty(),
                                                      action, now)
            self._checkEndPhase(int(1000 * time()))

    def _checkEndPhase(self, now: int):
        '''
        The current phase may be completed. We check, because it may already
        have been handled. Proceed to next phase as needed. Reset the deadline
        timer and inform parties. Ends the negotiation if the state is final
        after finishing the phase.

        @param now the current time (ms since 1970)
        '''
        with self._synclock:
            if not val(self._state).getPhase().isFinal(now):
                return
            # phase indeed ended. Check what's up.
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
            self._state = val(self._state).finishPhase()
            if self._state.isFinal(now):
                self._log.log(logging.INFO,
                              "state " + str(self._state) + " is final")
                self._endNegotiation()
                return

            self._state = self._state.nextPhase(now)

            self._startPhase(now)

    def _endNegotiation(self):
        '''
        To be called when we reach final state. Must only be called if
        {@link MOPACState#isFinal(long)}. Send finished info to all parties
        and close their connections, then notify our listeners.
        Double calls are automatically ignored using the global finished flag.
        '''
        with self._synclock:
            if self._finished:
                return
            self._finished = True
            info = Finished(val(self._state).getAgreements())
            for conn in self._connections:
                try:
                    try:
                        conn.send(info)
                    finally:
                        # always release the connection, also if the party
                        # could not be informed anymore.
                        conn.close()
                except Exception as e:
                    self._log.log(logging.INFO,
                        "Failed to send Finished to " + str(conn), e)
            self.notifyListeners(CurrentNegoState(self._state))
Note: See TracBrowser for help on using the repository browser.