source: geniuswebcore/geniusweb/protocol/session/saop/SAOP.py@ 77

Last change on this file since 77 was 73, checked in by Bart Vastenhouw, 3 years ago

Fix for IssueValue hashcode.

File size: 12.1 KB
Line 
1from datetime import datetime
2import logging
3from threading import Timer
4import threading
5import time
6import traceback
7from typing import Optional, List
8
9from tudelft.utilities.listener.DefaultListenable import DefaultListenable
10from tudelft.utilities.listener.Listener import Listener
11from tudelft.utilities.repository.NoResourcesNowException import NoResourcesNowException
12from tudelft_utilities_logging.Reporter import Reporter
13from uri.uri import URI
14
15from geniusweb.actions.Action import Action
16from geniusweb.actions.PartyId import PartyId
17from geniusweb.events.ProtocolEvent import ProtocolEvent
18from geniusweb.inform.ActionDone import ActionDone
19from geniusweb.inform.Finished import Finished
20from geniusweb.inform.Inform import Inform
21from geniusweb.inform.Settings import Settings
22from geniusweb.inform.YourTurn import YourTurn
23from geniusweb.progress.ProgressFactory import ProgressFactory
24from geniusweb.protocol.CurrentNegoState import CurrentNegoState
25from geniusweb.protocol.NegoState import NegoState
26from geniusweb.protocol.ProtocolException import ProtocolException
27from geniusweb.protocol.partyconnection.ProtocolToPartyConn import ProtocolToPartyConn
28from geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory import ProtocolToPartyConnFactory
29from geniusweb.protocol.partyconnection.ProtocolToPartyConnections import ProtocolToPartyConnections
30from geniusweb.protocol.session.SessionProtocol import SessionProtocol
31from geniusweb.protocol.session.SessionState import SessionState
32from geniusweb.protocol.session.saop.SAOPState import SAOPState
33from geniusweb.references.PartyWithProfile import PartyWithProfile
34from geniusweb.references.ProtocolRef import ProtocolRef
35from geniusweb.references.Reference import Reference
36from geniusweb.utils import val
37
38
class SAOP(DefaultListenable[ProtocolEvent], SessionProtocol):
    '''
    Stacked Alternating Offers Protocol (SAOP) session.

    The protocol runs as follows:

    1. The protocol tries to connect all parties. If the connection factory
       reports that there are no resources now, the attempt is repeated
       some time later.
    2. The session deadline clock starts ticking.
    3. All parties are sent their Settings. Only the parties specified
       initially in the settings participate.
    4. All parties get YourTurn in turn. A party must do exactly one action
       after it received YourTurn.
    5. The negotiation continues until the state becomes final: an
       agreement is reached, the deadline is reached, or a party fails to
       adhere to the protocol.
    6. When the state is final, all parties are sent Finished and their
       connections are closed.

    Issues are logged to the Reporter passed to the constructor.

    This object is mutable: the internal state changes as parties interact
    with the protocol.

    Thread safety: all entry points take a re-entrant lock that is private
    to this instance.
    '''
    _TIME_MARGIN = 20  # ms extra delay after deadline
    _MINDURATION = 100  # ms, shortest deadline duration that is accepted
    _MIN_SLEEP_TIME = 1000  # ms, shortest wait before retrying to connect
    _MAX_SLEEP_TIME = 60000  # ms, longest wait before retrying to connect
    _SAOP = ProtocolRef(URI("SAOP"))

    # Class-level defaults; the instance overrides them as soon as they change.
    _isFinishedInfoSent = False
    _deadlinetimer: Optional[Timer] = None
    # Fallback only (e.g. for objects built without __init__). __init__
    # installs a per-instance lock so that independent sessions do not
    # serialize on, or block behind, each other.
    _synclock = threading.RLock()

    def __init__(self, state: SAOPState, logger: Reporter,
                 connects: ProtocolToPartyConnections = ProtocolToPartyConnections([])):
        '''
        @param state normally the initial state coming from SAOPSettings
        @param logger the Reporter to log to
        @param connects the connections to the parties. Defaults to
               no-connections.
        @raise ValueError if state is None or the deadline duration is
               shorter than _MINDURATION
        '''
        super().__init__()
        if state is None:
            raise ValueError("state must be not null")

        if state.getSettings().getDeadline().getDuration() < self._MINDURATION:
            raise ValueError("Duration must be at least " + str(self._MINDURATION))

        # One lock per session: _connect may sleep while holding it, which
        # must not stall other SAOP instances in the same process.
        self._synclock = threading.RLock()
        self._log = logger
        self._state = state
        # NOTE(review): the default value is a single shared object. That is
        # only safe because this class never mutates it in place (it uses
        # With(), which is assumed to return a new object) - confirm.
        self._conns = connects

    def start(self, connectionfactory: ProtocolToPartyConnFactory):
        '''
        Run the start-up sequence: connect the parties, start the deadline
        timer, send the settings and give the first party the turn.
        Any exception is reported through _handleError instead of being
        propagated to the caller.

        @param connectionfactory the factory used to connect to the parties
        '''
        with self._synclock:
            try:
                self._connect(connectionfactory)
                self._setDeadline()
                self._setupParties()
                self._nextTurn()
            except Exception as e:
                traceback.print_exc()  # for debugging.
                self._handleError("Failed to start up session", None, e)

    def getDescription(self) -> str:
        '''
        @return a human readable description of this protocol
        '''
        return "All parties get YourTurn in clockwise order, after which they can do their next action. "\
            +"No new participants after start. End after prescribed deadline or when some bid is unanimously Accepted. "\
            +"Parties can only act on their own behalf and only when it is their turn."

    def addParticipant(self, party: PartyWithProfile):
        '''
        Not supported in SAOP.

        @raise ValueError always
        '''
        raise ValueError("Dynamic joining a negotiation is not allowed in SAOP")

    def getState(self) -> SessionState:
        '''
        @return the current state of this session
        '''
        return self._state

    def getRef(self) -> ProtocolRef:
        '''
        @return the ProtocolRef identifying this protocol ("SAOP")
        '''
        return self._SAOP

    #*******************************************************************
    # private functions. Some are protected only, for testing purposes
    # ******************************************************************

    def _connect(self, connectionfactory: ProtocolToPartyConnFactory):
        '''
        Step 1 in protocol: connect all involved parties and register them
        in the state. While the factory raises NoResourcesNowException the
        attempt is repeated after a wait of between _MIN_SLEEP_TIME and
        _MAX_SLEEP_TIME milliseconds.

        This is 'protected' to allow unit testing, this code is not a
        'public' part of the interface.

        @param connectionfactory the connectionfactory for making party
               connections
        '''
        with self._synclock:
            participants = self._state.getSettings().getAllParties()
            parties: List[Reference] = [participant.getParty().getPartyRef()
                                        for participant in participants]
            connections: Optional[List[ProtocolToPartyConn]] = None
            self._log.log(logging.INFO, "SAOP connect " + str(parties))
            # Test for None rather than truthiness: an empty (but valid)
            # connection list must end the loop instead of retrying forever.
            while connections is None:
                try:
                    connections = connectionfactory.connectAll(parties)
                except NoResourcesNowException as e:
                    waitms = (e.getLater().timestamp() - time.time()) * 1000
                    self._log.log(logging.INFO,
                        "No resources available to run session, waiting" + str(waitms))
                    sleepms = min(self._MAX_SLEEP_TIME,
                                  max(self._MIN_SLEEP_TIME, waitms))
                    # time.sleep takes seconds; all our constants are in ms.
                    time.sleep(sleepms / 1000.0)

            for i, participant in enumerate(participants):
                # now we bookkeep the connections ourselves,
                # and update the state to keep in sync.
                conn = connections[i]
                self._conns = self._conns.With(conn)
                self._setState(self._state.WithParty(conn.getParty(),
                                                     participant))

    def _setDeadline(self):
        '''
        Set the progress in the state according to the deadline, counting
        from now, and start the timer. The timer calls _timertask
        _TIME_MARGIN milliseconds after the deadline.
        '''
        # local import: the file's import block only brings in datetime.datetime
        from datetime import timezone

        with self._synclock:
            now = time.time()
            deadline = self._state.getSettings().getDeadline()
            self._setState(self._state.WithProgress(
                ProgressFactory.create(deadline, 1000 * now)))

            # set timer TIME_MARGIN after real deadline to ensure we're not too early
            duration = (deadline.getDuration() + self._TIME_MARGIN) / 1000.0
            self._deadlinetimer = Timer(duration, self._timertask)
            self._deadlinetimer.start()
            # Aware-UTC conversion; prints the same text as the deprecated
            # datetime.utcfromtimestamp did.
            deadlinetext = datetime.fromtimestamp(duration + now, tz=timezone.utc)\
                .strftime('%Y/%m/%d %H:%M:%S')
            self._log.log(logging.INFO, "SAOP deadline set to " + deadlinetext)

    def _timertask(self):
        '''
        Called by the deadline timer. Logs the time-out (and a CRITICAL
        message if the state unexpectedly is not final yet) and finishes
        the session.
        '''
        if not self._state.isFinal(1000. * time.time()):
            self._log.log(logging.CRITICAL,
                "BUG. Deadline timer has triggered but state is not final")
        self._log.log(logging.INFO,
            "SAOP deadline reached. Terminating session.")
        self._finish()

    def _setupParties(self):
        '''
        Step 2 in protocol: listen to the connections and send the settings
        to the parties.

        This is 'protected' to allow unit testing, this code is not a
        'public' part of the interface.

        @raise ProtocolException if sending the settings to a party fails
               with a ConnectionError
        '''
        this = self

        class MyListener(Listener[Action]):
            '''Forwards actions coming in on one connection to the protocol.'''

            def __init__(self, conn):
                self.conn = conn

            def notifyChange(self, action: Action):
                this._actionRequest(self.conn, action)

        with self._synclock:
            # first subscribe to all connections, only then send the settings
            for conn in self._conns:
                conn.addListener(MyListener(conn))

            for connection in self._conns:
                try:
                    self._sendSettings(connection)
                except ConnectionError as e:
                    raise ProtocolException("Failed to initialize",
                            connection.getParty(), e)

    def _sendSettings(self, connection: ProtocolToPartyConn):
        '''
        Inform a party about its settings.

        @param connection the connection to the party to inform
        @raise ValueError if the state holds no profile for the party
        @raise ConnectionError if party got disconnected
        '''
        with self._synclock:
            partyid = connection.getParty()
            partywithprofile = self._state.getPartyProfiles()[partyid]
            profile = partywithprofile.getProfile()
            params = self._state.getPartyProfiles()[partyid].getParty()\
                .getParameters()
            if not profile:
                raise ValueError(
                    "Missing profile for party " + str(connection.getReference()))
            connection.send(Settings(connection.getParty(), profile, self.getRef(),
                    val(self._state.getProgress()), params))

    def _actionRequest(self, partyconn: ProtocolToPartyConn, action: Action):
        '''
        This is called when one of the party connections does an action.
        Synchronized so that we always handle only 1 action at a time.

        @param partyconn the connection on which the action came in.
        @param action the Action taken by some party. An empty action is
               treated as an error of that party.
        '''
        with self._synclock:
            if not action:
                # no action: report the connection's error if it has one
                err = partyconn.getError()
                if not err:
                    err = ProtocolException("Party sent a null action",
                            partyconn.getParty())
                self._handleError(str(partyconn) + "Protocol error",
                        partyconn.getParty(), err)
                return

            try:
                if partyconn.getParty() != self._state._getNextActor():
                    # party does not have the turn.
                    raise ProtocolException(
                            "Party acts without having the turn",
                            partyconn.getParty())

                # FIXME? this ignores possible broadcast errors
                self._conns.broadcast(ActionDone(action))
                self._setState(self._state.WithAction(partyconn.getParty(), action))
                if not self._state.isFinal(int(1000 * time.time())):
                    self._nextTurn()
            except Exception as e:
                self._handleError("failed to handle action " + str(action),
                        partyconn.getParty(), e)

    def _nextTurn(self):
        '''
        Signal the next participant that it is its turn. A ConnectionError
        while sending is reported through _handleError.
        '''
        with self._synclock:
            party = self._state._getNextActor()
            try:
                self._conns.get(party).send(YourTurn())
            except ConnectionError as e:
                self._handleError("failed to send YourTurn", party, e)

    def _handleError(self, message: str, party: Optional[PartyId], e: Exception):
        '''
        Update the state to include the given error and log a warning.

        @param message The message to attach to the error
        @param party the party where the error occurred, or None
        @param e the exception that occurred. A ProtocolException is stored
               as is, any other exception is wrapped in a ProtocolException.
        '''
        with self._synclock:
            if isinstance(e, ProtocolException):
                self._setState(self._state.WithException(e))
            else:
                self._setState(self._state.WithException(
                    ProtocolException(message, party, e)))
            if party:
                self._setState(self._state.WithoutParty(party))
            self._log.log(logging.WARNING, "SAOP protocol intercepted error due to party "\
                    +str(party) + ":" + message, e)

    def _setState(self, newstate: SAOPState):
        '''
        Sets the new state. If the new state is final, the finish-up
        procedure is executed. If the current state already is final, the
        new state is ignored and only the finish-up procedure is executed.

        @param newstate the new state.
        '''
        with self._synclock:
            now = int(1000 * time.time())
            if self._state.isFinal(now):
                self._finish()
                return
            self._state = newstate
            if newstate.isFinal(now):
                self._finish()

    def _finish(self):
        '''
        Called when we reach final state. Cancels the deadline timer, sends
        Finished to all parties, notifies the listeners of the current
        (final) state and sets _isFinishedInfoSent. Double calls are
        automatically ignored.
        '''
        with self._synclock:
            if self._deadlinetimer:
                self._deadlinetimer.cancel()
                self._deadlinetimer = None
            if self._isFinishedInfoSent:
                return
            self._isFinishedInfoSent = True
            finished = Finished(self._state.getAgreements())
            for conn in self._conns:
                self._sendFinish(conn, finished)
            self.notifyListeners(CurrentNegoState(self._state))

    def _sendFinish(self, connection: ProtocolToPartyConn, finished: Inform):
        '''
        Send the finished info over the connection and close the
        connection. Failures are logged and otherwise ignored.

        @param connection the connection to inform and close
        @param finished the Inform object to send
        '''
        try:
            connection.send(finished)
            connection.close()
        except Exception as e:
            self._log.log(logging.INFO, "Failed to send Finished to " + str(connection), e)
342
Note: See TracBrowser for help on using the repository browser.