-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRegressionTree.py
More file actions
183 lines (148 loc) · 8.54 KB
/
RegressionTree.py
File metadata and controls
183 lines (148 loc) · 8.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import numpy as np
class TreeNode:
"""Node của cây: internal node (split) hoặc leaf node (predict)"""
def __init__(self, feature_index=None, threshold=None, left=None, right=None, value=None):
"""
Args:
feature_index: Feature nào dùng để split
threshold: Ngưỡng split (x[feature] <= threshold → left, else → right)
left: Cây con trái
right: Cây con phải
value: Giá trị dự đoán (chỉ có ở leaf node)
"""
self.feature_index = feature_index
self.threshold = threshold
self.left = left
self.right = right
self.value = value
class MyDecisionTreeRegressor:
"""Decision Tree: chia đệ quy để giảm phương sai"""
def __init__(self, max_depth=100, min_samples_split=2, min_samples_leaf=1, max_features=None):
"""
Args:
max_depth: Giới hạn độ sâu cây
min_samples_split: Số samples tối thiểu để chia
min_samples_leaf: Số samples tối thiểu ở lá
max_features: Số features tối đa để xem xét khi split
None: dùng tất cả features
int: dùng max_features features
float: dùng max_features * n_features features
'sqrt': dùng sqrt(n_features) features
'log2': dùng log2(n_features) features
"""
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.max_features = max_features
self.tree = None
def fit(self, X, y):
"""Build cây bằng recursive splitting"""
X = np.array(X) if not isinstance(X, np.ndarray) else X # Chuyển về numpy array
y = np.array(y).flatten() if not isinstance(y, np.ndarray) else y.flatten() # Chuyển y từ dạng ma trận sang vector
self.tree = self._build_tree(X, y) # Xây dựng cây từ root
def _build_tree(self, X, y, depth=0):
"""Xây dựng cây xử dụng đệ quy"""
num_samples, num_features = X.shape # Lấy số samples và features
# Điều kiện dừng: không đủ samples hoặc đạt max_depth
if num_samples < self.min_samples_split or depth >= self.max_depth:
return self._calculate_leaf_value(y) # Tạo leaf node
best_split = self._get_best_split(X, y, num_features) # Tìm split tốt nhất
# Không tìm được split hợp lệ → tạo leaf
if not best_split or best_split.get("variance_reduction", 0) <= 0:
return self._calculate_leaf_value(y)
# Chia data thành 2 nhánh
left_indices = X[:, best_split["feature_index"]] <= best_split["threshold"] # Samples đi left
right_indices = X[:, best_split["feature_index"]] > best_split["threshold"] # Samples đi right
# Kiểm tra constraint min_samples_leaf
if np.sum(left_indices) < self.min_samples_leaf or np.sum(right_indices) < self.min_samples_leaf:
return self._calculate_leaf_value(y) # Không split nếu vi phạm constraint
# Đệ quy build cây con
left_subtree = self._build_tree(X[left_indices], y[left_indices], depth + 1) # Build cây trái
right_subtree = self._build_tree(X[right_indices], y[right_indices], depth + 1) # Build cây phải
# Trả về internal node
return TreeNode(
feature_index=best_split["feature_index"],
threshold=best_split["threshold"],
left=left_subtree,
right=right_subtree
)
def _get_best_split(self, X, y, num_features):
"""Tìm split tốt nhất (variance reduction lớn nhất)"""
best_split = {} # Dict lưu thông tin split tốt nhất
max_variance_reduction = -float("inf") # Khởi tạo giá trị tối đa
# Xác định số features để xem xét
feature_indices = self._get_feature_indices(num_features)
for feature_index in feature_indices: # Thử từng feature
feature_values = X[:, feature_index] # Lấy values của feature
unique_values = np.unique(feature_values) # Lấy các giá trị unique
if len(unique_values) == 1: # Feature có 1 giá trị duy nhất
continue # Không thể split → bỏ qua
thresholds = (unique_values[:-1] + unique_values[1:]) / 2 # Midpoint giữa các giá trị liên tiếp
for threshold in thresholds: # Thử từng threshold
left_indices = feature_values <= threshold # Boolean mask cho left
right_indices = feature_values > threshold # Boolean mask cho right
left_count = np.sum(left_indices) # Đếm samples bên left
right_count = np.sum(right_indices) # Đếm samples bên right
# Kiểm tra constraint min_samples_leaf
if left_count < self.min_samples_leaf or right_count < self.min_samples_leaf:
continue # Bỏ qua split này
# Tính variance reduction của split này
variance_reduction = self._calculate_variance_reduction(
y, y[left_indices], y[right_indices]
)
# Cập nhật best split nếu tốt hơn
if variance_reduction > max_variance_reduction:
max_variance_reduction = variance_reduction
best_split = {
"feature_index": feature_index,
"threshold": threshold,
"variance_reduction": variance_reduction
}
return best_split # Trả về split tốt nhất
def _get_feature_indices(self, num_features):
"""Xác định các feature indices để xem xét dựa trên max_features"""
if self.max_features is None:
# Dùng tất cả features
return range(num_features)
# Tính số features cần chọn
if isinstance(self.max_features, int):
max_features = min(self.max_features, num_features)
elif isinstance(self.max_features, float):
max_features = max(1, int(self.max_features * num_features))
elif self.max_features == 'sqrt':
max_features = max(1, int(np.sqrt(num_features)))
elif self.max_features == 'log2':
max_features = max(1, int(np.log2(num_features)))
else:
raise ValueError(f"Invalid max_features value: {self.max_features}")
# Chọn ngẫu nhiên max_features features
return np.random.choice(num_features, max_features, replace=False)
def _calculate_variance_reduction(self, parent, left_child, right_child):
"""Tính độ giảm phương sai: Var(parent) - trọng số Var(children)"""
weight_left = len(left_child) / len(parent) # Tỷ lệ mẫu bên trái
weight_right = len(right_child) / len(parent) # Tỷ lệ mẫu bên phải
# Variance reduction = Var(parent) - weighted average Var(children)
variance_reduction = self._variance(parent) - (
weight_left * self._variance(left_child) +
weight_right * self._variance(right_child)
)
return variance_reduction
def _variance(self, y):
"""Tính variance của y"""
return np.var(y) if len(y) > 0 else 0 # Trả về 0 nếu y rỗng
def _calculate_leaf_value(self, y):
"""Tạo nút lá với giá trị = mean(y)"""
return TreeNode(value=np.mean(y)) # Dự đoán = trung bình y
def predict(self, X):
"""Dự đoán cho tất cả samples"""
X = np.array(X) if not isinstance(X, np.ndarray) else X # Chuyển về numpy array
return np.array([self._predict_sample(sample, self.tree) for sample in X]) # Predict từng sample
def _predict_sample(self, sample, tree):
"""Dự đoán 1 sample bằng cách đi xuống cây"""
if tree.value is not None: # Leaf node
return tree.value # Trả về giá trị dự đoán
# Internal node: đi xuống left hoặc right
if sample[tree.feature_index] <= tree.threshold: # Điều kiện đi left
return self._predict_sample(sample, tree.left) # Đệ quy left
else:
return self._predict_sample(sample, tree.right) # Đệ quy right