1 | from typing import Dict, Set, FrozenSet
|
---|
2 |
|
---|
3 | from tudelft.utilities.immutablelist.FixedList import FixedList
|
---|
4 | from tudelft.utilities.immutablelist.PowerSet import PowerSet
|
---|
5 |
|
---|
6 | from geniusweb.PriorityQueue import PriorityQueue
|
---|
7 | from geniusweb.actions.PartyId import PartyId
|
---|
8 | from geniusweb.actions.Vote import Vote
|
---|
9 | from geniusweb.actions.Votes import Votes
|
---|
10 | from geniusweb.issuevalue.Bid import Bid
|
---|
11 | from geniusweb.utils import toTuple, toStr
|
---|
12 |
|
---|
13 |
|
---|
14 | # who removed sign function from python?
|
---|
15 | sign = lambda a: (a>0) - (a<0)
|
---|
16 |
|
---|
17 | class CollectedVotes:
|
---|
18 | '''
|
---|
19 | Utility class. Typically contains all the votes collected from 1 round. Can
|
---|
20 | find the best combinations of votes, eg to get the maximum consensus group
|
---|
21 | '''
|
---|
22 |
|
---|
23 |
|
---|
24 | def __init__(self, votesMap:Dict[PartyId, Votes] , powers:Dict[PartyId, int] ) :
|
---|
25 | ''''
|
---|
26 | @param votesMap the {@link Votes} done for each party
|
---|
27 | @param powers the power of each party. The power is an integer number.
|
---|
28 | In the voting process, the party powers add up to form
|
---|
29 | the total voting power. This set may contain parties that
|
---|
30 | are not in newmap.
|
---|
31 | '''
|
---|
32 | self._allvotes = dict(votesMap)
|
---|
33 | self._powers = dict(powers)
|
---|
34 | self._allBids = None
|
---|
35 |
|
---|
36 | if not set(votesMap.keys()).issubset(set(powers.keys())):
|
---|
37 | raise ValueError("All voting parties must have a power")
|
---|
38 |
|
---|
39 | def With(self, votes:Votes , power:int)->"CollectedVotes":
|
---|
40 | '''
|
---|
41 | @param votes additional set of votes for a new party that did not yet
|
---|
42 | vote.
|
---|
43 | @param power powers for the new party, can be null if power already set
|
---|
44 | previously.
|
---|
45 | @return new CollectedVotes with the new votes added.
|
---|
46 | '''
|
---|
47 | actor = votes.getActor()
|
---|
48 | if actor in self._allvotes:
|
---|
49 | raise ValueError("Party " + str(actor) + " already voted")
|
---|
50 |
|
---|
51 | newpowers:Dict[PartyId, int] = dict(self._powers)
|
---|
52 | if not actor in self._powers:
|
---|
53 | if power == None:
|
---|
54 | raise ValueError("Power must be set for new actor " + str(actor))
|
---|
55 | newpowers[actor]= power
|
---|
56 |
|
---|
57 | newmap = dict(self._allvotes)
|
---|
58 | newmap[actor]= votes
|
---|
59 | return CollectedVotes(newmap, newpowers)
|
---|
60 |
|
---|
61 | def getVotes(self)->Dict[PartyId, Votes] :
|
---|
62 | '''
|
---|
63 | @return all the votes placed so far
|
---|
64 | '''
|
---|
65 | return dict(self._allvotes)
|
---|
66 |
|
---|
67 | def getPowers(self) ->Dict[PartyId, int] :
|
---|
68 | '''
|
---|
69 | @return the powers of all parties
|
---|
70 | '''
|
---|
71 | return dict(self._powers)
|
---|
72 |
|
---|
73 | def getMaxAgreements(self)->Dict[Bid, FrozenSet[PartyId]]:
|
---|
74 | '''
|
---|
75 | @return a map containing for each {@link Bid}s a subset of
|
---|
76 | {@link PartyId}s that placed a satisfied vote for that bid, such
|
---|
77 | that the subset has maximum power. Maximum power means that there
|
---|
78 | is no other subset of satisfied votes for that bid with more
|
---|
79 | group power. There may be other subsets of equal power for the
|
---|
80 | bid. If there are no concensus possibilities at all for a bid,
|
---|
81 | the bid is not placed in the map.
|
---|
82 | '''
|
---|
83 | agreements: Dict[Bid, FrozenSet[PartyId]] = {}
|
---|
84 | allbidsmap = self.getAllBids()
|
---|
85 | for bid in allbidsmap.keys():
|
---|
86 | mx = self._getMaxPowerGroup(allbidsmap[bid])
|
---|
87 | if len(mx)>0:
|
---|
88 | agreements[bid]=mx
|
---|
89 | return agreements
|
---|
90 |
|
---|
91 | def getAllBids(self) -> Dict[Bid, Set[Vote]]:
|
---|
92 | '''
|
---|
93 | @return all bids that were voted on and the votes for that bid. This
|
---|
94 | basically reverses keys and values in {@link #allvotes}
|
---|
95 | '''
|
---|
96 | if self._allBids == None: #type:ignore
|
---|
97 | self._allBids = self._getAllBids1() #type:ignore
|
---|
98 | return self._allBids #type:ignore
|
---|
99 |
|
---|
100 | def Without(self, parties:Set[PartyId] )->"CollectedVotes":
|
---|
101 | '''
|
---|
102 | @param parties the parties to be removed
|
---|
103 | @return new CollectedVotes without given parties
|
---|
104 | '''
|
---|
105 | newvotes = dict(self._allvotes)
|
---|
106 | newpower = dict(self._powers)
|
---|
107 | for party in parties:
|
---|
108 | newvotes.pop(party)
|
---|
109 | newpower.pop(party)
|
---|
110 |
|
---|
111 | return CollectedVotes(newvotes, newpower)
|
---|
112 |
|
---|
113 | def getTotalPower(self, parties:FrozenSet[PartyId] ) -> int:
|
---|
114 | '''
|
---|
115 | @param parties a list of parties
|
---|
116 | @return total voting power of the parties
|
---|
117 | '''
|
---|
118 | return sum([self._powers[p] for p in parties])
|
---|
119 |
|
---|
120 | def _getMaxPowerGroupBreathFirst(self, allvotes:Set[Vote] ) -> FrozenSet[PartyId] :
|
---|
121 | '''
|
---|
122 | Breath-first optimized search, similar as {@link #getMaxPowerGroup(Set)}
|
---|
123 | but more efficient on average (remains to be proven). Notice that this
|
---|
124 | algorithm may be more expensive particularly if there is no consensus at
|
---|
125 | all, because of the added overhead of the breath-first search
|
---|
126 |
|
---|
127 | @param allvotes a list of all votes for a particular bid.
|
---|
128 | @return a consensus-group with maximum power, so there is no other
|
---|
129 | consensus group that has more power (but there may be other
|
---|
130 | groups with same power). returns empty set if no consensus found
|
---|
131 | '''
|
---|
132 |
|
---|
133 | # CHECK how about scoping. What is 'this' referring to?
|
---|
134 | this=self
|
---|
135 | def comparator(vs1:Set[Vote], vs2:Set[Vote]):
|
---|
136 | parties1 = frozenset([vote.getActor() for vote in vs1])
|
---|
137 | parties2 = frozenset([vote.getActor() for vote in vs2])
|
---|
138 | return sign(this.getTotalPower(parties1)-
|
---|
139 | this.getTotalPower(parties2))
|
---|
140 |
|
---|
141 | '''
|
---|
142 | queue that keeps the options for the breath-first search sorted from
|
---|
143 | highest to lowest power of the voters.
|
---|
144 | '''
|
---|
145 | queue = PriorityQueue[Set[Vote]] (comparator)
|
---|
146 | queue.put(allvotes)
|
---|
147 |
|
---|
148 | while not queue.empty():
|
---|
149 | # try one with the highest power
|
---|
150 | vs = queue.get()
|
---|
151 | if self._isViable(vs):
|
---|
152 | return self.getParties(vs)
|
---|
153 | if len(vs) > 1:
|
---|
154 | # vote can be splitted. push all new subsets
|
---|
155 | for vote in vs:
|
---|
156 | subvotes = set(vs)
|
---|
157 | subvotes.remove(vote)
|
---|
158 | # avoid duplicate work. Since this has a removed party,
|
---|
159 | # and assuming power of each party >=1,
|
---|
160 | # we can not already have checked subvotes
|
---|
161 | if not subvotes in queue:
|
---|
162 | queue.put(subvotes)
|
---|
163 |
|
---|
164 | return frozenset()
|
---|
165 |
|
---|
166 | def _getMaxPowerGroup(self, votes:Set[Vote] )-> FrozenSet[PartyId] :
|
---|
167 | '''
|
---|
168 | @param votes a list of all votes for a particular bid. The parties that
|
---|
169 | placed the votes are called 'voters'
|
---|
170 | @return a consensus-group with maximum power, so there is no other
|
---|
171 | consensus group that has more power (but there may be other
|
---|
172 | groups with same power). returns empty set if no consensus found
|
---|
173 | '''
|
---|
174 | maxgroup:FrozenSet[PartyId] = frozenset()
|
---|
175 | maxpower = 0
|
---|
176 |
|
---|
177 | for parties in self._getConsensusGroups(votes):
|
---|
178 | power = self.getTotalPower(parties)
|
---|
179 | if power > maxpower:
|
---|
180 | maxgroup = parties
|
---|
181 | maxpower = power
|
---|
182 | return frozenset(maxgroup)
|
---|
183 |
|
---|
184 | def _getConsensusGroups(self, votes: Set[Vote] )-> FrozenSet[FrozenSet[PartyId]] :
|
---|
185 | '''
|
---|
186 | @param votes a set of {@link Votes} for one single {@link Bid}. It is
|
---|
187 | assumed all votes are by different parties.
|
---|
188 | @return all subsets sets of parties that create a viable consensus vote.
|
---|
189 | A viable consensus voteset VS is a subset of the votes, such that
|
---|
190 | <ol>
|
---|
191 | <li>|VS| ≥ 2
|
---|
192 | <li>TP = The sum of the powers of the parties in VS
|
---|
193 | <li>forall v in VS: v_minpower ≤ TP ≤ v_maxpower
|
---|
194 | </ol>
|
---|
195 | Notice that this algorithm works in exponential space
|
---|
196 | (=expensive) and not suited for large numbers of parties. Should
|
---|
197 | work fine for up to 10 parties.
|
---|
198 | '''
|
---|
199 | groups:Set[FrozenSet[PartyId]] = set()
|
---|
200 |
|
---|
201 | allVotePermutations = PowerSet[Vote](FixedList[Vote](votes))
|
---|
202 | # check each possible subset.
|
---|
203 | for voteList in allVotePermutations:
|
---|
204 | if self._isViable(voteList):
|
---|
205 | groups.add(self.getParties(voteList))
|
---|
206 | return frozenset(groups)
|
---|
207 |
|
---|
208 | def _isViable(self, voteList) -> bool:
|
---|
209 | '''
|
---|
210 | @param voteList a :list_iterator of votes for a single bid.
|
---|
211 | @return true if this votelist satisfies the requirements of all
|
---|
212 | participants and there are ≥ 2 parties
|
---|
213 | '''
|
---|
214 | parties = self.getParties(voteList)
|
---|
215 | totalpower = self.getTotalPower(parties);
|
---|
216 | return len(parties) >= 2 and totalpower >= self._getMinPower(voteList)\
|
---|
217 | and totalpower <= self._getMaxPower(voteList)
|
---|
218 |
|
---|
219 | @staticmethod
|
---|
220 | def getParties(voteList) -> FrozenSet[PartyId] :
|
---|
221 | '''
|
---|
222 | @param voteList iterable of Vote, list of all {@link Vote}s
|
---|
223 | @return frozen set with all parties that did a vote
|
---|
224 | '''
|
---|
225 | parties:Set[PartyId] = set()
|
---|
226 | for vote in voteList:
|
---|
227 | parties.add(vote.getActor())
|
---|
228 | return frozenset(parties)
|
---|
229 |
|
---|
230 | def _getMinPower(self, votes) ->int:
|
---|
231 | '''
|
---|
232 | @param votes a iterable of Vote, list of {@link Vote}s on one Bid. All parties must be
|
---|
233 | known
|
---|
234 | @return the minimum voting power needed for this group, so that all can
|
---|
235 | accept. This is equal to the max of the vote.getMinPower
|
---|
236 | '''
|
---|
237 | mx = 0
|
---|
238 | for vote in votes:
|
---|
239 | mx = max(mx, vote.getMinPower())
|
---|
240 | return mx
|
---|
241 |
|
---|
242 | def _getMaxPower(self, votes) -> int:
|
---|
243 | '''
|
---|
244 | @param votes a iterable Vote list of {@link Vote}s on one Bid. All parties must be
|
---|
245 | known
|
---|
246 | @return the maximum voting power needed for this group, so that all can
|
---|
247 | accept. This is equal to the min of the vote.getMaxPower
|
---|
248 | '''
|
---|
249 | mn = 9999999999999999 #int.maxvalue?
|
---|
250 | for vote in votes:
|
---|
251 | mn = min(mn, vote.getMaxPower())
|
---|
252 | return mn
|
---|
253 |
|
---|
254 | def _getAllBids1(self) -> Dict[Bid, Set[Vote]]:
|
---|
255 | bids:Dict[Bid, Set[Vote]] = {}
|
---|
256 | for votes in self._allvotes.values():
|
---|
257 | for vote in votes.getVotes():
|
---|
258 | bid = vote.getBid()
|
---|
259 | if not bid in bids:
|
---|
260 | bids[bid]=set()
|
---|
261 | bids[bid].add(vote)
|
---|
262 | return bids
|
---|
263 |
|
---|
264 | def __hash__(self):
|
---|
265 | return hash((toTuple(self._allvotes), toTuple(self._powers)))
|
---|
266 |
|
---|
267 | def __eq__(self, other):
|
---|
268 | return isinstance(other, self.__class__) \
|
---|
269 | and self._allvotes == other._allvotes\
|
---|
270 | and self._powers==other._powers
|
---|
271 |
|
---|
272 | def __repr__(self)->str:
|
---|
273 | return "CollectedVotes[" + toStr(self._allvotes) \
|
---|
274 | + "," + toStr(self._powers) + "]"
|
---|