source: CSE3210/agent2/agent2.py

Last change on this file was 74, checked in by wouter, 21 months ago

#6 Added CSE3210 parties

File size: 13.4 KB
Line 
1from decimal import Decimal
2import logging
3import math
4from random import randint
5import time
6from typing import Callable, cast
7from geniusweb.profile.utilityspace.LinearAdditive import LinearAdditive
8
9from geniusweb.profile.utilityspace.UtilitySpace import UtilitySpace
10from tudelft.utilities.immutablelist.ImmutableList import ImmutableList
11from ...time_dependent_agent.extended_util_space import ExtendedUtilSpace
12
13from geniusweb.actions.Accept import Accept
14from geniusweb.actions.Action import Action
15from geniusweb.actions.Offer import Offer
16from geniusweb.bidspace.AllBidsList import AllBidsList
17from geniusweb.inform.ActionDone import ActionDone
18from geniusweb.inform.Finished import Finished
19from geniusweb.inform.Inform import Inform
20from geniusweb.inform.Settings import Settings
21from geniusweb.inform.YourTurn import YourTurn
22from geniusweb.issuevalue.Bid import Bid
23from geniusweb.party.Capabilities import Capabilities
24from geniusweb.party.DefaultParty import DefaultParty
25from geniusweb.profile.Profile import Profile
26from geniusweb.profileconnection.ProfileConnectionFactory import (
27 ProfileConnectionFactory,
28)
29from geniusweb.profileconnection.ProfileInterface import ProfileInterface
30from .group2_frequency_analyzer import FrequencyAnalyzer
31from .group2_plot_trace import plot_characteristics
32from geniusweb.progress.ProgressRounds import ProgressRounds
33from tudelft_utilities_logging.Reporter import Reporter
34
35
36class Agent2(DefaultParty):
37 """
38 Template agent that offers random bids until a bid with sufficient utility is offered.
39 """
40
41 def __init__(self, reporter: Reporter = None):
42 super().__init__(reporter)
43 self.getReporter().log(logging.INFO, "party is initialized")
44 self._profileint: ProfileInterface = None # type:ignore
45 self._last_received_bid: Bid = None # type:ignore
46 self._utilspace: UtilitySpace = None # type:ignore
47 self._extendedspace: ExtendedUtilSpace = None # type:ignore
48
49 self.highest_social_welfare_bid: list[Bid] = []
50
51 # General settings
52 self.opponent_model = FrequencyAnalyzer()
53 self.reservation_utility: float = .0 # not sure if this is a good value to have, since any agreement is better than no agreement...
54 self.concession_speed: float = 11.0 # higher will concede slower (1/e is approximately linear) [0.0, ...]
55 self.attempts: int = 500 # the number of iterations it will go through to look for an 'optimal' bid
56 self.hard_to_get: float = .2 # the moment from which we'll consider playing nice [0.0, 1.0]
57 self.niceness: Decimal = Decimal(.05) # utility we're considering to give up for the sake of being nice [0.0, 1.0]
58
59 # Agent characteristics:
60 # Can be included in plotting, make sure the dimensionality of all of them match up
61 self.lower_utility_bound: list[float] = []
62 self.our_social_welfare: list[float] = []
63 self.their_social_welfare: list[float] = []
64 self.their_social_welfare: list[float] = []
65 self.esitmated_opponent_utility: list[float] = []
66
67 def notifyChange(self, info: Inform):
68 """This is the entry point of all interaction with your agent after is has been initialised.
69
70 Args:
71 info (Inform): Contains either a request for action or information.
72 """
73
74 # a Settings message is the first message that will be send to your
75 # agent containing all the information about the negotiation session.
76 if isinstance(info, Settings):
77 self._settings: Settings = cast(Settings, info)
78 self._me = self._settings.getID()
79
80 # progress towards the deadline has to be tracked manually through the use of the Progress object
81 self._progress = self._settings.getProgress()
82
83 # the profile contains the preferences of the agent over the domain
84 self._profileint = ProfileConnectionFactory.create(
85 info.getProfile().getURI(), self.getReporter()
86 )
87 self.opponent_model.set_domain(self._profileint.getProfile().getDomain())
88
89 reservation_bid = self._profileint.getProfile().getReservationBid()
90
91 if reservation_bid is not None:
92 profile, _ = self._get_profile_and_progress()
93 self.reservation_utility = profile.getUtility(reservation_bid)
94
95 # ActionDone is an action send by an opponent (an offer or an accept)
96 elif isinstance(info, ActionDone):
97 action: Action = cast(ActionDone, info).getAction()
98
99 # if it is an offer, set the last received bid
100 if isinstance(action, Offer):
101 self._last_received_bid = cast(Offer, action).getBid()
102 # YourTurn notifies you that it is your turn to act
103 elif isinstance(info, YourTurn):
104 action = self._my_turn()
105 if isinstance(self._progress, ProgressRounds):
106 self._progress = self._progress.advance()
107 self.getConnection().send(action)
108
109 # Finished will be send if the negotiation has ended (through agreement or deadline)
110 elif isinstance(info, Finished):
111 # terminate the agent MUST BE CALLED
112 self.terminate()
113 else:
114 self.getReporter().log(
115 logging.WARNING, "Ignoring unknown info " + str(info)
116 )
117
118 # lets the geniusweb system know what settings this agent can handle
119 # leave it as it is for this competition
120 def getCapabilities(self) -> Capabilities:
121 return Capabilities(
122 set(["SAOP"]),
123 set(["geniusweb.profile.utilityspace.LinearAdditive"]),
124 )
125
126 # terminates the agent and its connections
127 # leave it as it is for this competition
128 def terminate(self):
129 self.getReporter().log(logging.INFO, "party is terminating:")
130 # self._plot_characteristics()
131 super().terminate()
132 if self._profileint is not None:
133 self._profileint.close()
134
135 # ===================
136 # === AGENT LOGIC ===
137 # ===================
138
139 # give a description of your agent
140 def getDescription(self) -> str:
141 return "Shaken, not stirred"
142
143 # execute a turn
144 def _my_turn(self):
145 self._update_utilspace()
146 self.opponent_model.add_bid(self._last_received_bid)
147
148 next_bid = self._find_bid(self.attempts)
149
150 if self._is_acceptable(self._last_received_bid, next_bid):
151 action = Accept(self._me, self._last_received_bid)
152 else:
153 action = Offer(self._me, next_bid)
154
155 # send the action
156 return action
157
158 # =================
159 # === ACCEPTING ===
160 # =================
161
162 def _is_acceptable(self, bid: Bid, our_next_bid: Bid) -> bool:
163 if bid is None:
164 return False
165
166 profile, _ = self._get_profile_and_progress()
167 bid_utility = profile.getUtility(bid)
168
169 threshold = self._lower_util_bound()
170
171 self.lower_utility_bound.append(threshold)
172 self.our_social_welfare.append(float(self._social_welfare(our_next_bid)))
173 self.their_social_welfare.append(float(self._social_welfare(bid)))
174 self.esitmated_opponent_utility.append(float(self.opponent_model.get_utility(our_next_bid)))
175
176 # has to be higher than the reservation value and our threshold, but if the bid is better than we expect we'll always accept
177 return (bid_utility > self.reservation_utility and bid_utility > threshold) or bid_utility > profile.getUtility(our_next_bid)
178
179 def _lower_util_bound(self) -> float:
180 _, progress = self._get_profile_and_progress()
181
182 threshold = self._exponential_decrease(progress, self.concession_speed)
183
184 return threshold
185
186 """
187 A function which is 1.0 at x=0.0, and 0.0 at x=1.0.
188 k determines how quickly it falls to 0.0; higher k is slower decrease
189 - k > 1/e will first fall slowly, then fast
190 - k < 1/e will first fall fast, then slow
191 - k = 1/e ~ linear
192 Rounding errors make x=1.0 not actually intersect (worse with higher k),
193 intersection with zero can be forced by setting force_intersect
194 """
195 def _exponential_decrease(self, x, k, force_intersect=True):
196 if force_intersect and x > 0.99:
197 return 0.0
198
199 return -math.exp(x**k) + 2 - x**k * (-math.e + 2)
200
201 # ===============
202 # === BIDDING ===
203 # ===============
204
205 def _find_bid(self, attempts) -> Bid:
206 # compose a list of all possible bids
207 _, progress = self._get_profile_and_progress()
208
209 # it the beginning we'll play hard to get
210 # after that we'll consider playing nice
211 # => this makes us indicate our interests and gives us
212 # the opportunity to collect information about our opponent
213 if progress < self.hard_to_get:
214 return self._find_max_bid()
215 else:
216 return self._find_max_nice_bid(attempts)
217
218 """
219 Gets a random bid from the given list of all_bids
220 """
221 def _get_random_bid(self, all_bids: ImmutableList[Bid]):
222 return all_bids.get(randint(0, all_bids.size() - 1))
223
224 """
225 Finds the maximum bid according to a certain proposition
226 """
227 def _find_bid_with(self, proposition: Callable[[Bid, Bid], bool], attempts: int):
228 # compose a list of all possible bids
229 # TODO Make the selection more constrained, the frequency analyzer performs relatively well
230 # but the amount of time it takes to find a good/nice bid can be reduced significantly
231 all_bids = AllBidsList(self._profileint.getProfile().getDomain())
232
233 # TODO Also consider doing this differently
234 maxBid = self._find_lower_bid()
235
236 if maxBid is None:
237 if len(self.highest_social_welfare_bid) == 0:
238 maxBid = self._find_max_bid()
239 else:
240 maxBid = self.highest_social_welfare_bid[-1]
241
242 for _ in range(attempts):
243 bid = self._get_random_bid(all_bids)
244 maxBid = bid if proposition(bid, maxBid) else maxBid
245
246 self.highest_social_welfare_bid.append(maxBid)
247 return maxBid
248
249 """
250 Find a bid according to the current lower bound
251 returns _find_max_bid if no bid in that range can be found
252 """
253 def _find_lower_bid(self):
254 lower_bound_bids = self._extendedspace.getBids(min(Decimal(self._lower_util_bound()), Decimal(1) - self.niceness))
255
256 if lower_bound_bids.size() == 0:
257 return None
258
259 return self._get_random_bid(lower_bound_bids)
260
261 """
262 Find the maximum bids from the domain
263 """
264 def _find_max_bid(self) -> Bid:
265 max_bids = self._extendedspace.getBids(self._extendedspace.getMax())
266 return self._get_random_bid(max_bids)
267
268 """
269 Finds the maximum bid while trying to also accomodate the opponents interests
270 according to _is_better_bid with be_nice set to True
271 """
272 def _find_max_nice_bid(self, attempts) -> Bid:
273 # some cheeky CPL currying
274 return self._find_bid_with((lambda a, b: self._is_better_bid(a, b, self.niceness, be_nice=True) and self._is_acceptable(a, b)), attempts)
275
276 """
277 Checks if bid a is better than bid b.
278 If be_nice is True, will also consider the opponents utility according to opponent_model and
279 is willing to sacrifice a niceness amount of utility when comparing in order to create a win-win
280 """
281 def _is_better_bid(self, a: Bid, b: Bid, niceness: Decimal, be_nice=False) -> bool:
282 profile, _ = self._get_profile_and_progress()
283
284 if not be_nice:
285 return profile.getUtility(a) >= profile.getUtility(b)
286 else:
287 # TODO look into niceness possibly accumulating over multiple self.attempts
288 # TODO Social welfare metric?
289 return profile.getUtility(a) >= profile.getUtility(b) - - niceness \
290 and self.opponent_model.get_utility(a) >= self.opponent_model.get_utility(b)
291
292 def _social_welfare(self, bid: Bid) -> Decimal:
293 profile, _ = self._get_profile_and_progress()
294
295 return (profile.getUtility(bid) + Decimal(self.opponent_model.get_utility(bid)))/Decimal(2.0)
296
297 # ==============
298 # === UTILS ====
299 # ==============
300
301 def _get_profile_and_progress(self) -> tuple[LinearAdditive, float]:
302 profile: Profile = self._profileint.getProfile()
303 progress: float = self._progress.get(time.time() * 1000)
304
305 return cast(LinearAdditive, profile), progress
306
307 def _update_utilspace(self) -> None: # throws IOException
308 newutilspace = self._profileint.getProfile()
309 if not newutilspace == self._utilspace:
310 self._utilspace = cast(LinearAdditive, newutilspace)
311 self._extendedspace = ExtendedUtilSpace(self._utilspace)
312
313 # ===================
314 # === DEBUG TOOLS ===
315 # ===================
316
317 def _print_utility(self, bid: Bid) -> None:
318 profile, _ = self._get_profile_and_progress()
319 print("Bid:", bid, "with utility:", profile.getUtility(bid))
320
321 def _plot_characteristics(self) -> None:
322 characteristics = {
323 "lowest acceptable utility": self._plot_space(self.lower_utility_bound, "gray"),
324 "social welfare (ours, estimation)": self._plot_space(self.our_social_welfare, "green"),
325 "social welfare (theirs, estimation)": self._plot_space(self.their_social_welfare, "blue"),
326 "opponent utility (estimation)": self._plot_space(self.esitmated_opponent_utility, "red")
327 }
328 plot_characteristics(characteristics, len(self.lower_utility_bound))
329
330 def _plot_space(self, arr: list, color: str) -> tuple[list, list, str]:
331 return (list(range(len(arr))), arr, color)
Note: See TracBrowser for help on using the repository browser.