1 | from abc import ABC
|
---|
2 | from collections import defaultdict
|
---|
3 | from typing import List
|
---|
4 | from .negotiation_data import NegotiationData
|
---|
5 | import math
|
---|
6 |
|
---|
7 |
|
---|
class PersistentData(ABC):
    """Statistics accumulated across negotiation sessions.

    The object keeps a global running mean / standard deviation of the
    agreement utilities passed to :meth:`update`, and, per opponent name:

    * how many sessions were recorded against that opponent,
    * the running mean of the maximum utility received from it,
    * the running mean of the opponent utility reported for the session,
    * a blended utility-over-time profile (``_t_split`` entries), and
    * an ``alpha`` value derived from the smoothed profile.
    """

    def __init__(self):
        # --- tuning constants -------------------------------------------
        # Number of entries in every per-opponent utility-over-time profile.
        self._t_split: int = 40
        # Constant offset/scale used when mapping a profile index to the
        # time value fed into the alpha polynomial (see _calc_alpha).
        self._t_phase: float = 0.2
        # Weight given to the newest session when blending time profiles.
        self._new_weight: float = 0.3
        # Half-width of the moving-average window used for smoothing.
        self._smooth_width: int = 3
        # Fraction of the (max - min) drop searched for in _calc_alpha.
        self._opponent_decrease: float = 0.65
        # Alpha returned when no usable profile/trend is available.
        self._default_alpha: float = 10.7

        # --- global statistics ------------------------------------------
        self._avg_utility: float = 0.0
        self._negotiations: int = 0
        self._std_utility: float = 0.0
        self._nego_results: List[float] = []

        # --- per-opponent statistics (keyed by opponent name) -----------
        # NOTE: defaultdict() without a factory behaves like a plain dict;
        # the container type is kept unchanged on purpose.
        # opponent name -> float
        self._avg_max_utility_opponent = defaultdict()
        # opponent name -> int
        self._opponent_encounters = defaultdict()
        # opponent name -> float
        self._avg_opponent_utility = defaultdict()
        # opponent name -> float
        self._opponent_alpha = defaultdict()
        # opponent name -> list of float, length _t_split
        self._opponent_utility_by_time = defaultdict()

    def update(self, negotiation_data: "NegotiationData"):
        """Fold the outcome of one finished session into the statistics.

        ``negotiation_data`` must provide ``get_agreement_util()``,
        ``get_opponent_name()``, ``get_max_received_util()``,
        ``get_opponent_util()`` and ``get_opponent_util_by_time()``; the
        latter is indexed from ``0`` to ``_t_split - 1``.

        Per-opponent statistics are only touched when the opponent name is
        not ``None``.
        """
        agreement_util = negotiation_data.get_agreement_util()

        # A non-positive agreement utility is replaced by a penalised value
        # derived from the current mean and variance before it enters the
        # running mean.
        if agreement_util > 0:
            new_util = agreement_util
        else:
            new_util = self._avg_utility - 1.1 * math.pow(self._std_utility, 2)
        self._avg_utility = (self._avg_utility * self._negotiations + new_util) / (self._negotiations + 1)
        self._negotiations += 1

        # The standard deviation uses the raw agreement utilities (not the
        # penalised ones) around the updated mean.
        self._nego_results.append(agreement_util)
        squared_error = 0.0
        for util in self._nego_results:
            squared_error += math.pow(util - self._avg_utility, 2)
        self._std_utility = math.sqrt(squared_error / self._negotiations)

        opponent = negotiation_data.get_opponent_name()
        if opponent is None:
            return

        # Number of sessions recorded against this opponent *before* this one.
        encounters = self._opponent_encounters.get(opponent, 0)
        self._opponent_encounters[opponent] = encounters + 1

        # Running mean of the best utility this opponent has offered.
        # BUG FIX: the previous mean is held in a local variable; storing it
        # in self._avg_utility overwrote the global running average above.
        prev_avg_max = self._avg_max_utility_opponent.get(opponent, 0.0)
        self._avg_max_utility_opponent[opponent] = (
            (prev_avg_max * encounters + negotiation_data.get_max_received_util()) / (encounters + 1))

        # Running mean of the opponent utility reported for each session.
        prev_avg_op = self._avg_opponent_utility.get(opponent, 0.0)
        self._avg_opponent_utility[opponent] = (
            (prev_avg_op * encounters + negotiation_data.get_opponent_util()) / (encounters + 1))

        # Blend the new utility-over-time samples into the stored profile.
        if opponent in self._opponent_utility_by_time:
            opponent_time_util = self._opponent_utility_by_time[opponent]
        else:
            opponent_time_util = [0.0] * self._t_split
        new_util_data: List[float] = negotiation_data.get_opponent_util_by_time()

        # Relative change of the first entry under the blend; applied to the
        # entries for which the new session has no positive sample.
        if opponent_time_util[0] > 0.0:
            ratio = ((1 - self._new_weight) * opponent_time_util[0]
                     + self._new_weight * new_util_data[0]) / opponent_time_util[0]
        else:
            ratio = 1

        for i in range(self._t_split):
            if new_util_data[i] > 0:
                opponent_time_util[i] = (
                    (1 - self._new_weight) * opponent_time_util[i] + self._new_weight * new_util_data[i])
            else:
                opponent_time_util[i] *= ratio

        self._opponent_utility_by_time[opponent] = opponent_time_util
        self._opponent_alpha[opponent] = self._calc_alpha(opponent)

    def _known_opponent(self, opponent: str) -> bool:
        """Return True if at least one session was recorded for ``opponent``."""
        return opponent in self._opponent_encounters

    def get_opponent_encounters(self, opponent):
        """Return the number of recorded sessions with ``opponent``, or None if unknown."""
        return self._opponent_encounters.get(opponent)

    def get_smooth_threshold_over_time(self, opponent: str):
        """Return the moving-average-smoothed time profile of ``opponent``.

        Each output entry ``i`` is the mean of the stored profile over the
        indices ``i - _smooth_width .. i + _smooth_width`` clipped to the
        valid range. Returns ``None`` for an unknown opponent.
        """
        if not self._known_opponent(opponent):
            return None

        opponent_time_util = self._opponent_utility_by_time.get(opponent)
        smoothed_time_util: List[float] = [0.0] * self._t_split

        for i in range(self._t_split):
            lo = max(0, i - self._smooth_width)
            hi = min(i + self._smooth_width + 1, self._t_split)
            # Plain left-to-right accumulation keeps the float result
            # identical on every Python version.
            for j in range(lo, hi):
                smoothed_time_util[i] += opponent_time_util[j]
            smoothed_time_util[i] /= (hi - lo)

        return smoothed_time_util

    def _calc_alpha(self, opponent: str):
        """Derive the alpha value for ``opponent`` from its smoothed profile.

        Returns ``_default_alpha`` when the opponent is unknown or when the
        smoothed profile drops by less than 0.1 between its first entry and
        the reference entry near the end of the usable data.
        """
        alpha_arr = self.get_smooth_threshold_over_time(opponent)
        if alpha_arr is None:
            return self._default_alpha

        # First index whose smoothed value falls below 0.2.
        # NOTE(review): when no entry is below 0.2 the loop leaves max_idx at
        # _t_split - 1 (not _t_split) - confirm this is the intended end value.
        max_idx = 0
        for max_idx in range(self._t_split):
            if alpha_arr[max_idx] < 0.2:
                break

        max_val = alpha_arr[0]
        # Step back past the smoothing window so the reference value is not
        # diluted by entries at or beyond max_idx.
        min_val = alpha_arr[max(max_idx - self._smooth_width - 1, 0)]
        if max_val - min_val < 0.1:
            return self._default_alpha

        # First index at which the profile has dropped by the configured
        # fraction of (max_val - min_val); stays at the last visited index
        # (or 0 when max_idx is 0) if no entry qualifies.
        drop_threshold = max_val - self._opponent_decrease * (max_val - min_val)
        t = 0
        for t in range(max_idx):
            if alpha_arr[t] <= drop_threshold:
                break

        calibrated_polynom = [572.83, -1186.7, 899.29, -284.68, 32.911]
        t_time = self._t_phase + (1 - self._t_phase) * (
            max_idx * (t / self._t_split) + (self._t_split - max_idx) * 0.85) / self._t_split

        # Horner evaluation of the calibrated polynomial at t_time.
        alpha = calibrated_polynom[0]
        for i in range(1, len(calibrated_polynom)):
            alpha = alpha * t_time + calibrated_polynom[i]

        # Diagnostic output, kept unchanged for backward compatibility.
        print("alpha={0}".format(alpha))
        return alpha

    def get_avg_max_utility(self, opponent: str):
        """Return the mean of the maximum utilities received from ``opponent``, or None if unknown."""
        if opponent in self._avg_max_utility_opponent:
            return self._avg_max_utility_opponent[opponent]
        return None

    def get_opponent_utility(self, opponent):
        """Return the mean opponent utility recorded for ``opponent`` (0.0 if unknown)."""
        return self._avg_opponent_utility.get(opponent) if self._known_opponent(opponent) else 0.0

    def get_opponent_alpha(self, opponent):
        """Return the alpha stored for ``opponent`` (0.0 if unknown)."""
        return self._opponent_alpha.get(opponent) if self._known_opponent(opponent) else 0.0

    def get_std_utility(self) -> float:
        """Return the standard deviation of all recorded agreement utilities."""
        return self._std_utility

    def get_avg_utility(self) -> float:
        """Return the running mean of the (penalty-adjusted) agreement utilities."""
        return self._avg_utility