source: ANL2022/super_agent/utils/persistent_data.py

from abc import ABC
from typing import Dict, List
from .negotiation_data import NegotiationData
import math


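# Cross-session memory for the agent: running mean and standard deviation of
# its own results, plus per-opponent encounter counts, average maximum
# received utility, average opponent utility, a utility-over-time curve, and
# the concession parameter alpha derived from that curve.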
class PersistentData(ABC):
    def __init__(self):
        self._t_split: int = 40
        self._t_phase: float = 0.2
        self._new_weight: float = 0.3
        self._smooth_width: int = 3
        self._opponent_decrease: float = 0.65
        self._default_alpha: float = 10.7

        self._avg_utility: float = 0.0
        self._negotiations: int = 0
        # per-opponent average of the maximum utility received
        self._avg_max_utility_opponent: Dict[str, float] = {}
        # per-opponent number of sessions played
        self._opponent_encounters: Dict[str, int] = {}

        self._std_utility: float = 0.0
        self._nego_results: List[float] = []

        self._avg_opponent_utility: Dict[str, float] = {}
        self._opponent_alpha: Dict[str, float] = {}
        self._opponent_utility_by_time: Dict[str, List[float]] = {}

    def update(self, negotiation_data: NegotiationData):
        # A failed negotiation contributes a pessimistic estimate (the running
        # average minus 1.1 * variance) instead of its zero agreement utility.
        agreement_util = negotiation_data.get_agreement_util()
        new_util = agreement_util if agreement_util > 0 else (
            self._avg_utility - 1.1 * math.pow(self._std_utility, 2))
        self._avg_utility = (self._avg_utility * self._negotiations + new_util) / (self._negotiations + 1)

        self._negotiations += 1

        self._nego_results.append(agreement_util)

        # Recompute the standard deviation of all results around the new mean.
        self._std_utility = 0.0
        for util in self._nego_results:
            self._std_utility += math.pow(util - self._avg_utility, 2)
        self._std_utility = math.sqrt(self._std_utility / self._negotiations)

        opponent = negotiation_data.get_opponent_name()

        if opponent is not None:
            encounters = self._opponent_encounters.get(opponent, 0)
            self._opponent_encounters[opponent] = encounters + 1

            # Running average of the maximum utility this opponent has offered
            # us (a local variable; the global self._avg_utility computed
            # above must not be overwritten here).
            avg_max_util = self._avg_max_utility_opponent.get(opponent, 0.0)
            self._avg_max_utility_opponent[opponent] = (
                (avg_max_util * encounters + negotiation_data.get_max_received_util()) / (encounters + 1))

            avg_op_util = self._avg_opponent_utility.get(opponent, 0.0)
            self._avg_opponent_utility[opponent] = (
                (avg_op_util * encounters + negotiation_data.get_opponent_util()) / (encounters + 1))

            opponent_time_util: List[float] = self._opponent_utility_by_time.get(
                opponent, [0.0] * self._t_split)

            new_util_data: List[float] = negotiation_data.get_opponent_util_by_time()

            # Buckets without fresh data are rescaled by the same factor that
            # the blend below applies to the first bucket.
            ratio = (((1 - self._new_weight) * opponent_time_util[0] + self._new_weight * new_util_data[0])
                     / opponent_time_util[0]) if opponent_time_util[0] > 0.0 else 1.0

            # Exponential blend: keep (1 - new_weight) of the history and mix
            # in new_weight of this session's data.
            for i in range(self._t_split):
                if new_util_data[i] > 0:
                    opponent_time_util[i] = (
                        (1 - self._new_weight) * opponent_time_util[i] + self._new_weight * new_util_data[i])
                else:
                    opponent_time_util[i] *= ratio

            self._opponent_utility_by_time[opponent] = opponent_time_util
            self._opponent_alpha[opponent] = self._calc_alpha(opponent)

    def _known_opponent(self, opponent: str):
        return opponent in self._opponent_encounters

    def get_opponent_encounters(self, opponent):
        return self._opponent_encounters.get(opponent)

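    # Smooth the stored utility-over-time curve with a centered moving average
    # of half-width self._smooth_width; the window is truncated at both ends
    # of the array, so boundary buckets average over fewer neighbors.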
    def get_smooth_threshold_over_time(self, opponent: str):
        if not self._known_opponent(opponent):
            return None

        opponent_time_util = self._opponent_utility_by_time.get(opponent)
        smoothed_time_util: List[float] = [0.0] * self._t_split

        for i in range(self._t_split):
            lo = max(0, i - self._smooth_width)
            hi = min(i + self._smooth_width + 1, self._t_split)
            for j in range(lo, hi):
                smoothed_time_util[i] += opponent_time_util[j]
            smoothed_time_util[i] /= (hi - lo)

        return smoothed_time_util

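    # Map the shape of the opponent's smoothed concession curve to a single
    # parameter alpha: find the bucket where the curve first falls below 0.2
    # (max_idx) and the bucket where it has dropped by self._opponent_decrease
    # of its range (t), fold both into a normalized time t_time, and evaluate
    # a calibrated quartic polynomial at t_time with Horner's rule.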
    def _calc_alpha(self, opponent: str):
        alpha_arr = self.get_smooth_threshold_over_time(opponent)
        if alpha_arr is None:
            return self._default_alpha

        # First bucket where the smoothed threshold drops below 0.2.
        max_idx = 0
        t = 0  # stays 0 if the loop further below does not run
        for max_idx in range(self._t_split):
            if alpha_arr[max_idx] < 0.2:
                break

        max_val = alpha_arr[0]
        min_val = alpha_arr[max(max_idx - self._smooth_width - 1, 0)]
        if max_val - min_val < 0.1:
            # The curve is essentially flat; fall back to the default.
            return self._default_alpha

        # First bucket where the threshold has dropped by the configured
        # fraction (self._opponent_decrease) of its observed range.
        for t in range(max_idx):
            if alpha_arr[t] <= (max_val - self._opponent_decrease * (max_val - min_val)):
                break

        # Evaluate the calibrated quartic at t_time using Horner's rule.
        calibrated_polynom = [572.83, -1186.7, 899.29, -284.68, 32.911]
        alpha = calibrated_polynom[0]
        t_time = self._t_phase + (1 - self._t_phase) * (
            max_idx * (t / self._t_split) + (self._t_split - max_idx) * 0.85) / self._t_split
        for i in range(1, len(calibrated_polynom)):
            alpha = alpha * t_time + calibrated_polynom[i]
        print("alpha={0}".format(alpha))
        return alpha

    def get_avg_max_utility(self, opponent: str):
        return self._avg_max_utility_opponent.get(opponent)

    def get_opponent_utility(self, opponent):
        return self._avg_opponent_utility.get(opponent) if self._known_opponent(opponent) else 0.0

    def get_opponent_alpha(self, opponent):
        return self._opponent_alpha.get(opponent) if self._known_opponent(opponent) else 0.0

    def get_std_utility(self):
        return self._std_utility

    def get_avg_utility(self):
        return self._avg_utility
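

# Minimal usage sketch: _StubNegotiationData is a hypothetical stand-in for
# NegotiationData that implements only the getters used above; the values fed
# in are illustrative. Because this module uses a relative import, run it as a
# module (e.g. `python -m ANL2022.super_agent.utils.persistent_data`, assuming
# the packages are importable) rather than as a script.
if __name__ == "__main__":
    class _StubNegotiationData:
        def __init__(self, agreement_util: float, max_received_util: float,
                     opponent_util: float, opponent_name: str, t_split: int = 40):
            self._agreement_util = agreement_util
            self._max_received_util = max_received_util
            self._opponent_util = opponent_util
            self._opponent_name = opponent_name
            # A flat utility-over-time curve, just for illustration.
            self._util_by_time = [opponent_util] * t_split

        def get_agreement_util(self) -> float:
            return self._agreement_util

        def get_max_received_util(self) -> float:
            return self._max_received_util

        def get_opponent_util(self) -> float:
            return self._opponent_util

        def get_opponent_name(self) -> str:
            return self._opponent_name

        def get_opponent_util_by_time(self) -> List[float]:
            return self._util_by_time

    data = PersistentData()
    data.update(_StubNegotiationData(0.8, 0.9, 0.7, "boulware"))
    data.update(_StubNegotiationData(0.6, 0.8, 0.5, "boulware"))
    print(data.get_avg_utility())                    # running average utility
    print(data.get_std_utility())                    # standard deviation
    print(data.get_opponent_encounters("boulware"))  # 2
    print(data.get_opponent_alpha("boulware"))       # default alpha: flat curve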