From 2f445aaa4c366f274fe1cceca1d0d553ebe0c8f6 Mon Sep 17 00:00:00 2001
From: Moshbbab <132464244+Moshbbab@users.noreply.github.com>
Date: Mon, 2 Feb 2026 16:11:40 +0300
Subject: [PATCH] Add Hemmah Pro IVS 2025 valuation module

---
 hemmah_pro_ivs_2025.py | 869 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 869 insertions(+)
 create mode 100644 hemmah_pro_ivs_2025.py

diff --git a/hemmah_pro_ivs_2025.py b/hemmah_pro_ivs_2025.py
new file mode 100644
index 00000000000..2f14fc0dc60
--- /dev/null
+++ b/hemmah_pro_ivs_2025.py
@@ -0,0 +1,869 @@
"""
🏗️ HEMMAH PRO - IVS 2025 COMPLIANT VALUATION SYSTEM
Hemmah professional real-estate valuation system, compliant with the
International Valuation Standards (IVS) 2025.
"""

import os
import sys
import warnings
from datetime import datetime, date
from typing import Dict, List, Optional, Tuple

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error
import shap
import xgboost as xgb
import ipywidgets as widgets
from IPython.display import display, clear_output
from fpdf import FPDF
import arabic_reshaper
from bidi.algorithm import get_display

warnings.filterwarnings("ignore")

# Silent dependency installation (Colab only)
if "google.colab" in sys.modules:
    print("🔧 جاري تثبيت المكتبات المطلوبة...")
    os.system("pip install -q fpdf arabic-reshaper python-bidi shap xgboost")

    # Download the Arabic fonts used for PDF reports
    if not os.path.exists("Amiri-Regular.ttf"):
        os.system(
            "wget -q https://github.com/google/fonts/raw/main/ofl/amiri/Amiri-Regular.ttf"
        )
        os.system(
            "wget -q https://github.com/google/fonts/raw/main/ofl/amiri/Amiri-Bold.ttf"
        )
    print("✅ تم التثبيت")


# Plot display settings
plt.rcParams["figure.figsize"] = (12, 6)
plt.rcParams["font.size"] = 10


IVS_2025_FRAMEWORK = {
    "ivs_101_scope": {
        "valuation_purpose": "Mortgage Financing / تقييم للرهن العقاري",
        "client_type": "Banking Sector",
        "property_type": "Residential & Commercial Land",
        "basis_of_value": "Market Value (IVS Definition)",
        "valuation_date": str(date.today()),
        "report_date": str(date.today()),
        "valuer_name": "مشبب القحطاني",
        "valuer_license": "[رقم الترخيص]",
        "inspection_date": "[تاريخ المعاينة]",
    },
    "ivs_102_bases": {
        "market_value_def": "The estimated amount for which an asset should exchange...",
        "assumptions": [
            "البيع في السوق المفتوحة",
            "الطرفان على دراية تامة",
            "لا إكراه أو تسرع في البيع",
        ],
    },
    "ivs_103_approaches": {
        "primary": "Market Approach (Comparable Sales)",
        "secondary": "Income Approach (for rental properties)",
        "tertiary": "Cost Approach (for special properties)",
    },
    "compliance_statement": "This valuation is prepared in accordance with IVS 2025",
}
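

# Illustrative sketch: one way the IVS_2025_FRAMEWORK dict above might be
# consumed, e.g. to render the IVS 101 scope section in a report header.
# The helper name `format_ivs_scope` is hypothetical, for illustration only.
def format_ivs_scope(framework: Optional[Dict] = None) -> str:
    """Return the IVS 101 scope block as printable key/value lines."""
    framework = framework or IVS_2025_FRAMEWORK
    scope = framework["ivs_101_scope"]
    return "\n".join(f"{key}: {value}" for key, value in scope.items())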


class HemmahDataEngine:
    """
    Professional data engine for Ministry of Justice and Aqar records.
    """

    def __init__(self) -> None:
        self.raw_data: Optional[pd.DataFrame] = None
        self.processed_data: Optional[pd.DataFrame] = None
        self.quality_metrics: Dict = {}
        self.feature_columns: List[str] = []

    def load_data(self, file_path: str) -> "HemmahDataEngine":
        """Load data from a CSV or Excel file."""
        print(f"📂 جاري تحميل: {file_path}")

        try:
            if file_path.endswith(".csv"):
                # Try encodings commonly produced by Arabic data exports
                for encoding in ["utf-8", "utf-8-sig", "cp1256", "iso-8859-1"]:
                    try:
                        self.raw_data = pd.read_csv(file_path, encoding=encoding)
                        break
                    except Exception:
                        continue
            else:
                self.raw_data = pd.read_excel(file_path)

            if self.raw_data is None:
                raise ValueError("لا يمكن قراءة الملف بالترميزات المتاحة.")

            print(f"✅ تم التحميل: {len(self.raw_data):,} سجل")
            return self

        except Exception as exc:
            print(f"❌ خطأ في التحميل: {exc}")
            raise

    def ivs_quality_check(self) -> Dict:
        """
        Data quality checks per IVS 104.
        """
        if self.raw_data is None:
            raise ValueError("لا توجد بيانات محملة")

        df = self.raw_data.copy()
        metrics = {
            "total_records": len(df),
            "timestamp": datetime.now().isoformat(),
            "checks": {},
        }

        # 1. Completeness
        completeness = {}
        for col in df.columns:
            null_pct = (df[col].isnull().sum() / len(df)) * 100
            completeness[col] = round(100 - null_pct, 2)
        metrics["checks"]["completeness"] = completeness

        # 2. Uniqueness (duplicate detection)
        duplicates = df.duplicated().sum()
        metrics["checks"]["uniqueness"] = {
            "duplicate_count": int(duplicates),
            "unique_percentage": round(((len(df) - duplicates) / len(df)) * 100, 2),
        }

        # 3. Outliers (IQR fences on price-like columns)
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        outlier_report = {}

        for col in numeric_cols:
            if any(keyword in col.lower() for keyword in ["price", "سعر", "value", "قيمة"]):
                q1 = df[col].quantile(0.25)
                q3 = df[col].quantile(0.75)
                iqr = q3 - q1
                lower = q1 - 1.5 * iqr
                upper = q3 + 1.5 * iqr
                outliers = df[(df[col] < lower) | (df[col] > upper)]
                outlier_report[col] = {
                    "count": len(outliers),
                    "percentage": round(len(outliers) / len(df) * 100, 2),
                    "bounds": {"lower": lower, "upper": upper},
                }

        metrics["checks"]["outliers"] = outlier_report

        # 4. Timeliness
        date_cols = [
            c
            for c in df.columns
            if any(x in c.lower() for x in ["date", "تاريخ", "sale", "بيع"])
        ]
        if date_cols:
            try:
                latest = pd.to_datetime(df[date_cols[0]], errors="coerce").max()
                metrics["checks"]["timeliness"] = {
                    "latest_record": str(latest.date()) if pd.notna(latest) else "Unknown",
                    "data_age_days": (datetime.now() - latest).days
                    if pd.notna(latest)
                    else None,
                }
            except Exception:
                metrics["checks"]["timeliness"] = "Unable to parse dates"

        self.quality_metrics = metrics
        return metrics

    def clean_and_engineer(self) -> "HemmahDataEngine":
        """
        Clean the data and engineer features.
        """
        if self.raw_data is None:
            raise ValueError("لا توجد بيانات محملة")

        df = self.raw_data.copy()
        initial_count = len(df)

        # Clean price columns
        price_cols = [
            c
            for c in df.columns
            if any(x in c.lower() for x in ["price", "سعر", "value", "قيمة"])
        ]
        for col in price_cols:
            df[col] = df[col].astype(str).str.replace(",", "").str.replace('"', "").str.strip()
            df[col] = pd.to_numeric(df[col], errors="coerce")

        # Clean area columns
        area_cols = [
            c
            for c in df.columns
            if any(x in c.lower() for x in ["area", "مساحة", "size", "المساحة"])
        ]
        for col in area_cols:
            df[col] = df[col].astype(str).str.replace(",", "").str.replace('"', "").str.strip()
            df[col] = pd.to_numeric(df[col], errors="coerce")

        # Drop non-positive values
        for col in price_cols + area_cols:
            if col in df.columns:
                df = df[df[col] > 0]

        # Price per square metre
        if price_cols and area_cols:
            df["price_per_sqm"] = df[price_cols[0]] / df[area_cols[0]]
            # Trim extreme outliers
            df = df[df["price_per_sqm"] < df["price_per_sqm"].quantile(0.995)]
            df = df[df["price_per_sqm"] > df["price_per_sqm"].quantile(0.005)]

        # Engineer location features
        location_cols = [
            c
            for c in df.columns
            if any(x in c.lower() for x in ["district", "حي", "city", "مدينة", "region", "منطقة"])
        ]
        if "price_per_sqm" in df.columns:
            for col in location_cols[:2]:
                # District average price (target encoding)
                district_avg = df.groupby(col)["price_per_sqm"].transform("mean")
                df[f"{col}_avg_price"] = district_avg

                # District tier (price quintile, A = most expensive)
                df[f"{col}_tier"] = pd.qcut(
                    df[col].map(df.groupby(col)["price_per_sqm"].mean()),
                    q=5,
                    labels=["E", "D", "C", "B", "A"],
                )

        # Additional features
        if area_cols:
            main_area = area_cols[0]
            df["area_category"] = pd.cut(
                df[main_area],
                bins=[0, 300, 600, 1000, 2000, float("inf")],
                labels=["Small", "Medium", "Large", "XLarge", "Estate"],
            )

        self.processed_data = df
        final_count = len(df)

        print(f"✅ تم التنظيف: {initial_count:,} → {final_count:,} سجل صالح")
        print(f"📊 المتغيرات بعد الهندسة: {len(df.columns)}")

        return self

    def get_modeling_data(self) -> Tuple[pd.DataFrame, List[str], str]:
        """
        Prepare the data for modelling.
        """
        if self.processed_data is None:
            raise ValueError("لا توجد بيانات معالجة")

        df = self.processed_data.copy()

        # Select the target
        target = "price_per_sqm" if "price_per_sqm" in df.columns else None
        if target is None:
            raise ValueError("لا يوجد عمود للسعر")

        # Select independent variables (numeric, excluding price/date columns)
        exclude = ["price", "سعر", "value", "قيمة", "price_per_sqm", "date", "تاريخ"]
        features = [c for c in df.columns if not any(x in c.lower() for x in exclude)]
        features = [
            c for c in features if df[c].dtype in ["int64", "float64", "int32", "float32"]
        ]

        # Drop rows with missing values
        model_df = df[features + [target]].dropna()

        return model_df, features, target
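

# Illustrative usage sketch: how the data engine above is meant to be driven
# end-to-end. The file name "moj_deals.csv" and the helper name
# `_demo_data_engine` are hypothetical, added only for illustration; any
# Ministry of Justice export with price/area/district columns should work.
def _demo_data_engine(file_path: str = "moj_deals.csv") -> HemmahDataEngine:
    engine = HemmahDataEngine().load_data(file_path)
    quality = engine.ivs_quality_check()  # IVS 104 quality report (dict)
    print("Duplicates:", quality["checks"]["uniqueness"]["duplicate_count"])
    engine.clean_and_engineer()
    model_df, feature_cols, target_col = engine.get_modeling_data()
    print(f"{len(model_df):,} rows ready for modelling, target = {target_col}")
    return engine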
["district", "حي", "city", "مدينة", "region", "منطقة"]) + ] + if "price_per_sqm" in df.columns: + for col in location_cols[:2]: + # حساب متوسط السعر للحي (Target Encoding) + district_avg = df.groupby(col)["price_per_sqm"].transform("mean") + df[f"{col}_avg_price"] = district_avg + + # ترتيب الحي (percentile) + df[f"{col}_tier"] = pd.qcut( + df[col].map(df.groupby(col)["price_per_sqm"].mean()), + q=5, + labels=["E", "D", "C", "B", "A"], + ) + + # متغيرات إضافية + if area_cols: + main_area = area_cols[0] + df["area_category"] = pd.cut( + df[main_area], + bins=[0, 300, 600, 1000, 2000, float("inf")], + labels=["Small", "Medium", "Large", "XLarge", "Estate"], + ) + + self.processed_data = df + final_count = len(df) + + print(f"✅ تم التنظيف: {initial_count:,} → {final_count:,} سجل صالح") + print(f"📊 المتغيرات الم engineered: {len(df.columns)}") + + return self + + def get_modeling_data(self) -> Tuple[pd.DataFrame, List[str], str]: + """ + إعداد البيانات للنمذجة + """ + if self.processed_data is None: + raise ValueError("لا توجد بيانات معالجة") + + df = self.processed_data.copy() + + # تحديد الهدف + target = "price_per_sqm" if "price_per_sqm" in df.columns else None + if target is None: + raise ValueError("لا يوجد عمود للسعر") + + # تحديد المتغيرات المستقلة + exclude = ["price", "سعر", "value", "قيمة", "price_per_sqm", "date", "تاريخ"] + features = [c for c in df.columns if not any(x in c.lower() for x in exclude)] + features = [ + c for c in features if df[c].dtype in ["int64", "float64", "int32", "float32"] + ] + + # إزالة القيم الناقصة + model_df = df[features + [target]].dropna() + + return model_df, features, target + + +class HemmahMLEngine: + """ + محرك تعلم آلة متقدم للتقييم العقاري + """ + + def __init__(self) -> None: + self.models: Dict[str, object] = {} + self.best_model_name: Optional[str] = None + self.best_model: Optional[object] = None + self.feature_importance: Optional[pd.DataFrame] = None + self.shap_explainer: Optional[object] = None + self.metrics: Dict = {} + self.training_data: Optional[pd.DataFrame] = None + + def train_multiple_models( + self, df: pd.DataFrame, features: List[str], target: str + ) -> Dict: + """ + تدريب عدة نماذج واختيار الأفضل + """ + print("🤖 جاري تدريب النماذج...") + + x_train, x_test, y_train, y_test = train_test_split( + df[features], df[target], test_size=0.2, random_state=42 + ) + self.training_data = x_train + + models_config = { + "Random Forest": RandomForestRegressor( + n_estimators=200, max_depth=20, random_state=42, n_jobs=-1 + ), + "XGBoost": xgb.XGBRegressor( + n_estimators=200, max_depth=6, learning_rate=0.1, random_state=42 + ), + "Gradient Boosting": GradientBoostingRegressor( + n_estimators=200, max_depth=5, random_state=42 + ), + } + + results = {} + + for name, model in models_config.items(): + print(f" ⚙️ تدريب {name}...") + + # Cross-validation + cv_scores = cross_val_score(model, x_train, y_train, cv=5, scoring="r2", n_jobs=-1) + + # Training + model.fit(x_train, y_train) + y_pred = model.predict(x_test) + + # Metrics + results[name] = { + "cv_r2_mean": cv_scores.mean(), + "cv_r2_std": cv_scores.std(), + "test_r2": r2_score(y_test, y_pred), + "test_mae": mean_absolute_error(y_test, y_pred), + "test_mape": mean_absolute_percentage_error(y_test, y_pred) * 100, + "model": model, + } + + # اختيار الأفضل + self.best_model_name = max(results, key=lambda x: results[x]["test_r2"]) + self.best_model = results[self.best_model_name]["model"] + self.metrics = results[self.best_model_name] + + # Feature Importance + if hasattr(self.best_model, 
"feature_importances_"): + self.feature_importance = pd.DataFrame( + {"feature": features, "importance": self.best_model.feature_importances_} + ).sort_values("importance", ascending=False) + + # SHAP Setup + try: + self.shap_explainer = shap.TreeExplainer(self.best_model) + print("✅ تم إعداد SHAP للتفسير") + except Exception as exc: + print(f"⚠️ لا يمكن إعداد SHAP: {exc}") + + # طباعة النتائج + print("\n" + "=" * 60) + print("📊 نتائج مقارنة النماذج (IVS 105)") + print("=" * 60) + for name, res in results.items(): + marker = "★" if name == self.best_model_name else " " + print(f"{marker} {name:20} | R²: {res['test_r2']:.3f} | MAPE: {res['test_mape']:.1f}%") + print("=" * 60) + + return results + + def predict(self, input_data: pd.DataFrame) -> Dict: + """ + التنبؤ مع تفسير كامل + """ + if self.best_model is None: + raise ValueError("لا يوجد نموذج مدرب") + + prediction = self.best_model.predict(input_data)[0] + + result = { + "predicted_price_per_sqm": prediction, + "confidence_interval": {"lower": prediction * 0.85, "upper": prediction * 1.15}, + "model_used": self.best_model_name, + "r2_score": self.metrics.get("test_r2", 0), + } + + # SHAP Explanation + if self.shap_explainer is not None: + shap_values = self.shap_explainer.shap_values(input_data) + result["shap_values"] = shap_values + result["feature_contributions"] = self._explain_features(input_data, shap_values) + + return result + + def _explain_features(self, x_data: pd.DataFrame, shap_values: np.ndarray) -> List[Dict]: + """ + تفسير مساهمة كل متغير + """ + contributions = [] + for i, col in enumerate(x_data.columns): + value = x_data[col].iloc[0] + if isinstance(shap_values, list): + impact = shap_values[0][0][i] + else: + impact = shap_values[0][i] if len(shap_values.shape) > 1 else shap_values[i] + contributions.append( + {"feature": col, "value": value, "impact": impact, "direction": "↑" if impact > 0 else "↓"} + ) + + return sorted(contributions, key=lambda x: abs(x["impact"]), reverse=True) + + def sensitivity_analysis( + self, + base_input: pd.DataFrame, + feature: str, + variations: List[float] = None, + ) -> pd.DataFrame: + """ + تحليل الحساسية (IVS 105) + """ + if variations is None: + variations = [-0.2, -0.1, 0, 0.1, 0.2] + + base_pred = self.best_model.predict(base_input)[0] + results = [] + + for var in variations: + modified = base_input.copy() + modified[feature] = modified[feature] * (1 + var) + new_pred = self.best_model.predict(modified)[0] + + results.append( + { + "variation": f"{var:+.0%}", + "predicted_value": new_pred, + "change_from_base": ((new_pred - base_pred) / base_pred) * 100, + "absolute_change": new_pred - base_pred, + } + ) + + return pd.DataFrame(results) + + +class HemmahDashboard: + """ + لوحة تحكم تفاعلية احترافية + """ + + def __init__(self, data_engine: HemmahDataEngine, ml_engine: HemmahMLEngine) -> None: + self.data_engine = data_engine + self.ml_engine = ml_engine + self.current_prediction: Optional[Dict] = None + + def create_interface(self) -> None: + """ + إنشاء الواجهة الكاملة + """ + # العنوان + header = widgets.HTML( + """ +
    def create_interface(self) -> None:
        """
        Build the full interface.
        """
        # Header
        header = widgets.HTML(
            """
متوافق مع المعايير الدولية للتقييم | يستخدم بيانات وزارة العدل
+