source: geniuswebcore/geniusweb/protocol/session/learn/Learn.py@ 93

Last change on this file since 93 was 90, checked in by Bart Vastenhouw, 3 years ago

Refactor to help reusing partiesserver.

File size: 9.7 KB
Line 
1from datetime import datetime
2import logging
3from threading import Timer
4import threading
5from time import time, sleep
6from typing import Optional, List
7
8from tudelft.utilities.listener.DefaultListenable import DefaultListenable
9from tudelft.utilities.listener.Listener import Listener
10from tudelft.utilities.repository.NoResourcesNowException import NoResourcesNowException
11from tudelft_utilities_logging.Reporter import Reporter
12from uri.uri import URI
13
14from geniusweb.actions.Action import Action
15from geniusweb.actions.PartyId import PartyId
16from geniusweb.events.ProtocolEvent import ProtocolEvent
17from geniusweb.inform.Finished import Finished
18from geniusweb.inform.Inform import Inform
19from geniusweb.inform.Settings import Settings
20from geniusweb.progress.ProgressFactory import ProgressFactory
21from geniusweb.protocol.CurrentNegoState import CurrentNegoState
22from geniusweb.protocol.NegoState import NegoState
23from geniusweb.protocol.ProtocolException import ProtocolException
24from geniusweb.protocol.partyconnection.ProtocolToPartyConn import ProtocolToPartyConn
25from geniusweb.protocol.partyconnection.ProtocolToPartyConnFactory import ProtocolToPartyConnFactory
26from geniusweb.protocol.partyconnection.ProtocolToPartyConnections import ProtocolToPartyConnections
27from geniusweb.protocol.session.SessionProtocol import SessionProtocol
28from geniusweb.protocol.session.learn.LearnState import LearnState
29from geniusweb.references.PartyWithProfile import PartyWithProfile
30from geniusweb.references.ProtocolRef import ProtocolRef
31from geniusweb.utils import val
32
33
# Lower and upper bound, in milliseconds, for the pause between two attempts
# to obtain party connections when no resources are available right now.
MIN_SLEEP_TIME = 1_000
MAX_SLEEP_TIME = 60_000
# Extra delay in milliseconds added after the deadline before the deadline
# timer fires, so that the timer is never too early.
TIME_MARGIN = 20
# The reference under which this protocol is known.
LEARN = ProtocolRef(URI("Learn"))
38
class Learn(DefaultListenable[ProtocolEvent], SessionProtocol):
    '''
    The Learn protocol allows parties to learn until the deadline set in the
    {@link LearnSettings}.

    Life cycle as implemented below: {@link #start} connects all parties,
    sets the deadline timer and sends each party its Settings. Actions coming
    in from the parties are folded into the {@link LearnState}. As soon as
    the state is final (or the deadline timer fires) all parties receive a
    Finished message, their connections are closed and the listeners of this
    protocol are notified.
    '''
    # Timer that terminates the session after the deadline; None while no
    # timer is pending.
    _deadlinetimer: Optional[Timer] = None
    # Connections to the parties. Only ever replaced through With(), which is
    # presumably non-mutating (verify in ProtocolToPartyConnections), so this
    # shared class-level default is only a starting value.
    _conns = ProtocolToPartyConnections([])
    # Fallback lock; __init__ installs a private lock for every instance.
    _synclock = threading.RLock()
    # True once the Finished info has been sent, to make _finish idempotent.
    _isFinishedInfoSent = False

    def __init__(self, state: LearnState, logger: Reporter):
        '''
        @param state  the initial {@link LearnState}
        @param logger the {@link Reporter} used for all log output
        '''
        super().__init__()
        self._state = state
        self._log = logger
        # One lock per session. With only the class-level lock, a session
        # sleeping inside _connect would block all other Learn sessions.
        self._synclock = threading.RLock()

    def start(self, connectionfactory: ProtocolToPartyConnFactory):
        '''
        Connect the parties, set the deadline and send out the settings.
        Any exception raised on the way is passed to {@link #_handleError}
        instead of being propagated to the caller.

        @param connectionfactory the factory for making party connections
        '''
        try:
            self._connect(connectionfactory)
            self._setDeadline()
            self._setupParties()
        except Exception as e:
            self._handleError("Failed to start up session", None, e)

    def getDescription(self) -> str:
        '''
        @return a human readable description of this protocol.
        '''
        return "Sends all parties the Settings. Parties can start learning immediately but must respect the deadline. When party is done, it should send LearningDone."

    def getState(self) -> NegoState:
        '''
        @return the current state of this protocol.
        '''
        return self._state

    def getRef(self) -> ProtocolRef:
        '''
        @return the {@link ProtocolRef} of this protocol, {@link #LEARN}.
        '''
        return LEARN

    def addParticipant(self, party: PartyWithProfile):
        '''
        Not supported by this protocol.

        @param party the party that wants to join
        @throws ValueError always
        '''
        raise ValueError(
            "Dynamic joining a negotiation is not allowed in LEARN")

    #####################################################################
    # private functions. Some are protected only, for testing purposes
    #####################################################################
    def _connect(self, connectionfactory: ProtocolToPartyConnFactory):
        '''
        step 1 in protocol: connect all involved parties and register them
        in the state.
        <p>
        While the factory reports {@link NoResourcesNowException}, this
        sleeps and retries. Each sleep lasts between {@link #MIN_SLEEP_TIME}
        and {@link #MAX_SLEEP_TIME} milliseconds.
        <p>
        This is 'protected' to allow testing, this code is not a 'public'
        part of the interface.

        @param connectionfactory the connectionfactory for making party
                                 connections
        '''
        with self._synclock:
            participants: List[PartyWithProfile] = \
                self._state.getSettings().getAllParties()
            parties = [parti.getParty().getPartyRef()
                       for parti in participants]
            connections: Optional[List[ProtocolToPartyConn]] = None
            self._log.log(logging.INFO, "LEARN connect " + str(parties))
            while connections is None:
                try:
                    connections = connectionfactory.connectAll(parties)  # type:ignore
                except NoResourcesNowException as e:
                    # milliseconds until the moment suggested by the factory
                    waitms = (e.getLater().timestamp() - time()) * 1000
                    self._log.log(logging.INFO,
                        "No resources available to run session, waiting" + str(waitms))
                    sleep(min(MAX_SLEEP_TIME,
                              max(MIN_SLEEP_TIME, waitms)) / 1000)
            # need val because mypy fails to see that connections can't be
            # None here.
            conns = val(connections)
            for i, participant in enumerate(participants):
                conn = conns[i]
                self._conns = self._conns.With(conn)
                self._setState(
                    self._state.WithParty(conn.getParty(), participant))

    def _actionRequest(self, partyconn: ProtocolToPartyConn, action: Action):
        '''
        This is called when one of the party connections does an action.
        Synchronized so that we always handle only 1 action at a time.

        @param partyconn the connection on which the action came in.
        @param action    the {@link Action} taken by some party. None is
                         treated as a protocol error of that party.
        '''
        with self._synclock:
            if action is None:
                err = partyconn.getError()
                if err is None:
                    err = ProtocolException("Party sent a null action",
                                            partyconn.getParty())
                self._handleError(str(partyconn) + "Protocol error",
                                  partyconn.getParty(), err)  # type:ignore
                return

            try:
                self._setState(
                    self._state.WithAction(partyconn.getParty(), action))
            except Exception as e:
                self._handleError("failed to handle action " + str(action),
                                  partyconn.getParty(), e)

    def _setupParties(self):
        '''
        step 2 in protocol: listen to connections and send settings to the
        parties.
        <p>
        This is 'protected' to allow testing, this code is not a 'public'
        part of the interface.

        @throws ProtocolException if sending the settings to a party fails
                                  with a ConnectionError
        '''
        this = self

        class MyListener(Listener[Action]):
            '''
            Forwards the actions of one connection to the protocol. The
            connection is stored per listener because _actionRequest needs
            to know on which connection the action came in.
            '''

            def __init__(self, conn: ProtocolToPartyConn):
                super().__init__()
                self._conn = conn

            def notifyChange(self, act: Action):
                this._actionRequest(self._conn, act)

        with self._synclock:
            # first attach all listeners, then start sending settings
            for conn in self._conns:
                conn.addListener(MyListener(conn))

            for connection in self._conns:
                try:
                    self._sendSettings(connection)
                except ConnectionError as e:
                    raise ProtocolException("Failed to initialize",
                                            connection.getParty(), e) from e

    def _sendSettings(self, connection: ProtocolToPartyConn):
        '''
        Inform a party about its settings

        @param connection the connection to the party to be informed
        @throws ValueError if the party has no profile
        '''
        with self._synclock:
            partyid = connection.getParty()
            partywithprofile = self._state.getPartyProfiles()[partyid]
            profile = partywithprofile.getProfile()
            params = partywithprofile.getParty().getParameters()
            if profile is None:
                raise ValueError(
                    "Missing profile for party " + str(connection.getReference()))
            connection.send(Settings(partyid, profile, self.getRef(),
                                     val(self._state.getProgress()), params))

    def _setDeadline(self):
        '''
        Set state to proper deadline. Starts the timer task. This task
        triggers a call to {@link #_finish} when the session times out.
        '''
        with self._synclock:
            now = int(time() * 1000)  # ms since epoch
            deadline = self._state.getSettings().getDeadline()
            self._setState(self._state.WithProgress(
                ProgressFactory.create(deadline, now)))

            # set timer TIME_MARGIN after real deadline to ensure we're not
            # too early. Timer wants seconds, the deadline is in ms.
            duration = (deadline.getDuration() + TIME_MARGIN) / 1000.0
            self._deadlinetimer = Timer(duration, self._timertask)
            self._deadlinetimer.start()
            # now is in ms and duration in seconds: convert before adding.
            self._log.log(logging.INFO, "LEARN deadline set to "
                + datetime.utcfromtimestamp(now / 1000.0 + duration)
                    .strftime('%Y/%m/%d %H:%M:%S'))

    def _timertask(self):
        '''
        Called by the deadline timer. Reports a bug if the state is not yet
        final at this point, and finishes the session in any case.
        '''
        if not self._state.isFinal(1000. * time()):
            self._log.log(logging.CRITICAL,
                "BUG. Deadline timer has triggered but state is not final")
        self._log.log(logging.INFO,
            "LEARN deadline reached. Terminating session.")
        self._finish()

    def _handleError(self, message: str, party: Optional[PartyId],
                     e: Exception):
        '''
        Update state to include the given error and finishes up the session.

        @param message The message to attach to the error
        @param party   the party where the error occured
        @param e       the exception that occured. A ProtocolException is
                       stored as is, others are wrapped in a new
                       ProtocolException.
        '''
        with self._synclock:
            if isinstance(e, ProtocolException):
                self._setState(self._state.WithException(e))
            else:
                self._setState(self._state.WithException(
                    ProtocolException(message, party, e)))
            self._log.log(logging.WARNING,
                "LEARN protocol intercepted error due to party "
                + str(party) + ":" + message, e)

    def _setState(self, newstate: LearnState):
        '''
        Sets the new state. If the new state is final, the finish-up
        procedure is executed. If the current state is already final, the
        new state is ignored and only the finish-up procedure is executed.

        @param newstate the new state.
        '''
        with self._synclock:
            now = int(1000 * time())
            if self._state.isFinal(now):
                self._finish()
                return
            self._state = newstate
            if newstate.isFinal(now):
                self._finish()

    def _finish(self):
        '''
        Called when we reach final state. Cancels deadline timer. Send
        finished info to all parties, notify current nego state as final and
        set _isFinishedInfoSent. Double calls are automatically ignored.
        '''
        with self._synclock:
            if self._deadlinetimer:
                self._deadlinetimer.cancel()
                self._deadlinetimer = None
            if self._isFinishedInfoSent:
                return
            self._isFinishedInfoSent = True
            finished = Finished(self._state.getAgreements())
            for conn in self._conns:
                self._sendFinish(conn, finished)
            self.notifyListeners(CurrentNegoState(self._state))

    def _sendFinish(self, connection: ProtocolToPartyConn, finished: Inform):
        '''
        Send the finished info to a party and close its connection. This is
        best effort: failures are logged and not propagated, so that the
        remaining parties are still informed.

        @param connection the connection to the party
        @param finished   the {@link Inform} to send
        '''
        try:
            connection.send(finished)
            connection.close()
        except Exception as e:
            self._log.log(logging.INFO,
                "Failed to send Finished to " + str(connection), e)
277
278
Note: See TracBrowser for help on using the repository browser.