source: geniuswebcore/geniusweb/voting/CollectedVotes.py@ 77

Last change on this file since 77 was 73, checked in by Bart Vastenhouw, 3 years ago

Fix for IssueValue hashcode.

File size: 9.3 KB
Line 
1from typing import Dict, Set, FrozenSet
2
3from tudelft.utilities.immutablelist.FixedList import FixedList
4from tudelft.utilities.immutablelist.PowerSet import PowerSet
5
6from geniusweb.PriorityQueue import PriorityQueue
7from geniusweb.actions.PartyId import PartyId
8from geniusweb.actions.Vote import Vote
9from geniusweb.actions.Votes import Votes
10from geniusweb.issuevalue.Bid import Bid
11from geniusweb.utils import toTuple, toStr
12
13
14# who removed sign function from python?
15sign = lambda a: (a>0) - (a<0)
16
17class CollectedVotes:
18 '''
19 Utility class. Typically contains all the votes collected from 1 round. Can
20 find the best combinations of votes, eg to get the maximum consensus group
21 '''
22
23
24 def __init__(self, votesMap:Dict[PartyId, Votes] , powers:Dict[PartyId, int] ) :
25 ''''
26 @param votesMap the {@link Votes} done for each party
27 @param powers the power of each party. The power is an integer number.
28 In the voting process, the party powers add up to form
29 the total voting power. This set may contain parties that
30 are not in newmap.
31 '''
32 self._allvotes = dict(votesMap)
33 self._powers = dict(powers)
34 self._allBids = None
35
36 if not set(votesMap.keys()).issubset(set(powers.keys())):
37 raise ValueError("All voting parties must have a power")
38
39 def With(self, votes:Votes , power:int)->"CollectedVotes":
40 '''
41 @param votes additional set of votes for a new party that did not yet
42 vote.
43 @param power powers for the new party, can be null if power already set
44 previously.
45 @return new CollectedVotes with the new votes added.
46 '''
47 actor = votes.getActor()
48 if actor in self._allvotes:
49 raise ValueError("Party " + str(actor) + " already voted")
50
51 newpowers:Dict[PartyId, int] = dict(self._powers)
52 if not actor in self._powers:
53 if power == None:
54 raise ValueError("Power must be set for new actor " + str(actor))
55 newpowers[actor]= power
56
57 newmap = dict(self._allvotes)
58 newmap[actor]= votes
59 return CollectedVotes(newmap, newpowers)
60
61 def getVotes(self)->Dict[PartyId, Votes] :
62 '''
63 @return all the votes placed so far
64 '''
65 return dict(self._allvotes)
66
67 def getPowers(self) ->Dict[PartyId, int] :
68 '''
69 @return the powers of all parties
70 '''
71 return dict(self._powers)
72
73 def getMaxAgreements(self)->Dict[Bid, FrozenSet[PartyId]]:
74 '''
75 @return a map containing for each {@link Bid}s a subset of
76 {@link PartyId}s that placed a satisfied vote for that bid, such
77 that the subset has maximum power. Maximum power means that there
78 is no other subset of satisfied votes for that bid with more
79 group power. There may be other subsets of equal power for the
80 bid. If there are no concensus possibilities at all for a bid,
81 the bid is not placed in the map.
82 '''
83 agreements: Dict[Bid, FrozenSet[PartyId]] = {}
84 allbidsmap = self.getAllBids()
85 for bid in allbidsmap.keys():
86 mx = self._getMaxPowerGroup(allbidsmap[bid])
87 if len(mx)>0:
88 agreements[bid]=mx
89 return agreements
90
91 def getAllBids(self) -> Dict[Bid, Set[Vote]]:
92 '''
93 @return all bids that were voted on and the votes for that bid. This
94 basically reverses keys and values in {@link #allvotes}
95 '''
96 if self._allBids == None: #type:ignore
97 self._allBids = self._getAllBids1() #type:ignore
98 return self._allBids #type:ignore
99
100 def Without(self, parties:Set[PartyId] )->"CollectedVotes":
101 '''
102 @param parties the parties to be removed
103 @return new CollectedVotes without given parties
104 '''
105 newvotes = dict(self._allvotes)
106 newpower = dict(self._powers)
107 for party in parties:
108 newvotes.pop(party)
109 newpower.pop(party)
110
111 return CollectedVotes(newvotes, newpower)
112
113 def getTotalPower(self, parties:FrozenSet[PartyId] ) -> int:
114 '''
115 @param parties a list of parties
116 @return total voting power of the parties
117 '''
118 return sum([self._powers[p] for p in parties])
119
120 def _getMaxPowerGroupBreathFirst(self, allvotes:Set[Vote] ) -> FrozenSet[PartyId] :
121 '''
122 Breath-first optimized search, similar as {@link #getMaxPowerGroup(Set)}
123 but more efficient on average (remains to be proven). Notice that this
124 algorithm may be more expensive particularly if there is no consensus at
125 all, because of the added overhead of the breath-first search
126
127 @param allvotes a list of all votes for a particular bid.
128 @return a consensus-group with maximum power, so there is no other
129 consensus group that has more power (but there may be other
130 groups with same power). returns empty set if no consensus found
131 '''
132
133 # CHECK how about scoping. What is 'this' referring to?
134 this=self
135 def comparator(vs1:Set[Vote], vs2:Set[Vote]):
136 parties1 = frozenset([vote.getActor() for vote in vs1])
137 parties2 = frozenset([vote.getActor() for vote in vs2])
138 return sign(this.getTotalPower(parties1)-
139 this.getTotalPower(parties2))
140
141 '''
142 queue that keeps the options for the breath-first search sorted from
143 highest to lowest power of the voters.
144 '''
145 queue = PriorityQueue[Set[Vote]] (comparator)
146 queue.put(allvotes)
147
148 while not queue.empty():
149 # try one with the highest power
150 vs = queue.get()
151 if self._isViable(vs):
152 return self.getParties(vs)
153 if len(vs) > 1:
154 # vote can be splitted. push all new subsets
155 for vote in vs:
156 subvotes = set(vs)
157 subvotes.remove(vote)
158 # avoid duplicate work. Since this has a removed party,
159 # and assuming power of each party >=1,
160 # we can not already have checked subvotes
161 if not subvotes in queue:
162 queue.put(subvotes)
163
164 return frozenset()
165
166 def _getMaxPowerGroup(self, votes:Set[Vote] )-> FrozenSet[PartyId] :
167 '''
168 @param votes a list of all votes for a particular bid. The parties that
169 placed the votes are called 'voters'
170 @return a consensus-group with maximum power, so there is no other
171 consensus group that has more power (but there may be other
172 groups with same power). returns empty set if no consensus found
173 '''
174 maxgroup:FrozenSet[PartyId] = frozenset()
175 maxpower = 0
176
177 for parties in self._getConsensusGroups(votes):
178 power = self.getTotalPower(parties)
179 if power > maxpower:
180 maxgroup = parties
181 maxpower = power
182 return frozenset(maxgroup)
183
184 def _getConsensusGroups(self, votes: Set[Vote] )-> FrozenSet[FrozenSet[PartyId]] :
185 '''
186 @param votes a set of {@link Votes} for one single {@link Bid}. It is
187 assumed all votes are by different parties.
188 @return all subsets sets of parties that create a viable consensus vote.
189 A viable consensus voteset VS is a subset of the votes, such that
190 <ol>
191 <li>|VS| &ge; 2
192 <li>TP = The sum of the powers of the parties in VS
193 <li>forall v in VS: v_minpower &le; TP &le; v_maxpower
194 </ol>
195 Notice that this algorithm works in exponential space
196 (=expensive) and not suited for large numbers of parties. Should
197 work fine for up to 10 parties.
198 '''
199 groups:Set[FrozenSet[PartyId]] = set()
200
201 allVotePermutations = PowerSet[Vote](FixedList[Vote](votes))
202 # check each possible subset.
203 for voteList in allVotePermutations:
204 if self._isViable(voteList):
205 groups.add(self.getParties(voteList))
206 return frozenset(groups)
207
208 def _isViable(self, voteList) -> bool:
209 '''
210 @param voteList a :list_iterator of votes for a single bid.
211 @return true if this votelist satisfies the requirements of all
212 participants and there are &ge; 2 parties
213 '''
214 parties = self.getParties(voteList)
215 totalpower = self.getTotalPower(parties);
216 return len(parties) >= 2 and totalpower >= self._getMinPower(voteList)\
217 and totalpower <= self._getMaxPower(voteList)
218
219 @staticmethod
220 def getParties(voteList) -> FrozenSet[PartyId] :
221 '''
222 @param voteList iterable of Vote, list of all {@link Vote}s
223 @return frozen set with all parties that did a vote
224 '''
225 parties:Set[PartyId] = set()
226 for vote in voteList:
227 parties.add(vote.getActor())
228 return frozenset(parties)
229
230 def _getMinPower(self, votes) ->int:
231 '''
232 @param votes a iterable of Vote, list of {@link Vote}s on one Bid. All parties must be
233 known
234 @return the minimum voting power needed for this group, so that all can
235 accept. This is equal to the max of the vote.getMinPower
236 '''
237 mx = 0
238 for vote in votes:
239 mx = max(mx, vote.getMinPower())
240 return mx
241
242 def _getMaxPower(self, votes) -> int:
243 '''
244 @param votes a iterable Vote list of {@link Vote}s on one Bid. All parties must be
245 known
246 @return the maximum voting power needed for this group, so that all can
247 accept. This is equal to the min of the vote.getMaxPower
248 '''
249 mn = 9999999999999999 #int.maxvalue?
250 for vote in votes:
251 mn = min(mn, vote.getMaxPower())
252 return mn
253
254 def _getAllBids1(self) -> Dict[Bid, Set[Vote]]:
255 bids:Dict[Bid, Set[Vote]] = {}
256 for votes in self._allvotes.values():
257 for vote in votes.getVotes():
258 bid = vote.getBid()
259 if not bid in bids:
260 bids[bid]=set()
261 bids[bid].add(vote)
262 return bids
263
264 def __hash__(self):
265 return hash((toTuple(self._allvotes), toTuple(self._powers)))
266
267 def __eq__(self, other):
268 return isinstance(other, self.__class__) \
269 and self._allvotes == other._allvotes\
270 and self._powers==other._powers
271
272 def __repr__(self)->str:
273 return "CollectedVotes[" + toStr(self._allvotes) \
274 + "," + toStr(self._powers) + "]"
Note: See TracBrowser for help on using the repository browser.