-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnormalization.py
More file actions
121 lines (94 loc) · 3.57 KB
/
normalization.py
File metadata and controls
121 lines (94 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import numpy as np
class RunningMeanStd:
    """
    Track a running mean and standard deviation incrementally using
    Welford's online algorithm, for state/reward normalization.
    """
    def __init__(self, shape):
        """
        Initialize running statistics.
        Args:
            shape: The dimension/shape of input data to track
        """
        self.n = 0                   # Count of samples seen so far
        self.mean = np.zeros(shape)  # Running mean of samples
        self.S = np.zeros(shape)     # Running sum of squared deviations
        self.std = np.sqrt(self.S)   # Running standard deviation (0 initially)
    def update(self, x):
        """
        Update statistics with a new data point.
        Uses Welford's online algorithm for numerical stability.
        Args:
            x: New data point(s), array-like matching `shape`
        """
        x = np.array(x)
        self.n += 1
        if self.n == 1:
            # First sample: mean is the sample itself; the std of a
            # single sample is 0. (The original code incorrectly set
            # std = x here, which distorted the very first normalization.)
            self.mean = x.astype(float)
            self.std = np.zeros_like(self.mean)
        else:
            # Welford's numerically stable incremental update
            old_mean = self.mean.copy()
            self.mean = old_mean + (x - old_mean) / self.n
            self.S = self.S + (x - old_mean) * (x - self.mean)
            self.std = np.sqrt(self.S / self.n)
class Normalization:
    """
    Standardize states to zero mean and unit variance using running
    statistics, improving RL training stability.
    """
    def __init__(self, shape):
        """
        Set up the running-statistics tracker.
        Args:
            shape: The dimension/shape of input data to normalize
        """
        self.running_ms = RunningMeanStd(shape=shape)
    def __call__(self, x, update=True):
        """
        Normalize input data.
        Args:
            x: Input data to normalize
            update: If True (training), fold `x` into the running
                statistics before normalizing; pass False at evaluation
        Returns:
            The standardized input (zero mean, unit variance)
        """
        if update:
            self.running_ms.update(x)
        stats = self.running_ms
        # Epsilon guards against division by zero when std is tiny
        return (x - stats.mean) / (stats.std + 1e-8)
class RewardScaling:
    """
    Scale rewards by the running standard deviation of the discounted
    return, a common trick for stabilizing RL training.
    """
    def __init__(self, shape, gamma):
        """
        Initialize the reward scaler.
        Args:
            shape: The dimension/shape of rewards (typically 1)
            gamma: Discount factor used for the running return
        """
        self.shape = shape
        self.gamma = gamma  # Discount factor
        self.running_ms = RunningMeanStd(shape=self.shape)
        self.R = np.zeros(self.shape)  # Running discounted return
    def __call__(self, x):
        """
        Scale a single reward.
        Args:
            x: Raw reward value
        Returns:
            The reward divided by the running std of the discounted
            return (scale changes, sign is preserved)
        """
        # Fold the new reward into the discounted return, then update stats
        self.R = self.gamma * self.R + x
        self.running_ms.update(self.R)
        # Divide by std only — no mean subtraction, so reward signs survive
        return x / (self.running_ms.std + 1e-8)
    def reset(self):
        """Zero the running discounted return at an episode boundary."""
        self.R = np.zeros(self.shape)