-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathretention_policy.py
More file actions
executable file
·502 lines (417 loc) · 19.1 KB
/
retention_policy.py
File metadata and controls
executable file
·502 lines (417 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
#!/usr/bin/env python3
"""
Data Retention Policy Implementation for Mini Data Warehouse
Manages data lifecycle and automated cleanup
"""
import psycopg2
from datetime import datetime, timedelta
import logging
import json
import os
# Module-wide logging: timestamped, INFO-level messages.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)
class RetentionPolicyManager:
    """Manage data-lifecycle (retention) policies for the mini warehouse.

    Responsibilities:
      * maintain a ``retention_audit`` table recording every policy run,
      * archive rows into ``archived_*`` tables before deleting them,
      * delete rows older than the configured retention window,
      * report statistics about recent policy executions.

    A retention value of -1 means "keep forever" (the table is never purged).
    """

    def __init__(self, connection_params=None):
        """Initialize connection settings and default retention policies.

        Args:
            connection_params: dict of psycopg2 connection keyword arguments.
                When None, values are read from ``WAREHOUSE_*`` environment
                variables, falling back to local-development defaults.
        """
        if connection_params is None:
            # Prefer environment variables so credentials are not hard-coded
            # in source control; the fallbacks preserve the previous
            # local-development defaults, so existing setups keep working.
            self.connection_params = {
                'host': os.environ.get('WAREHOUSE_HOST', 'localhost'),
                'port': os.environ.get('WAREHOUSE_PORT', '5432'),
                'database': os.environ.get('WAREHOUSE_DB', 'warehouse'),
                'user': os.environ.get('WAREHOUSE_USER', 'admin'),
                'password': os.environ.get('WAREHOUSE_PASSWORD', 'secret'),
            }
        else:
            self.connection_params = connection_params
        # Default retention policies (in days); -1 means keep forever.
        self.retention_policies = {
            'raw_data': {
                'orders': 1095,        # 3 years
                'order_items': 1095,   # 3 years
                'customers': -1,       # Never delete (keep forever)
                'products': -1,        # Never delete (keep forever)
            },
            'aggregated_data': {
                'fact_sales': 2555,    # 7 years
                'dim_customer': -1,    # Never delete
                'dim_product': -1,     # Never delete
                'dim_date': -1,        # Never delete
            },
            'logs_and_metadata': {
                'data_quality_logs': 90,   # 90 days
                'load_metadata': 365,      # 1 year
            }
        }

    def connect_db(self):
        """Open a PostgreSQL connection; return None on failure.

        The connection is opened with autocommit enabled (DDL and single
        statements commit immediately); methods that need atomicity turn
        autocommit off explicitly for their own transaction.
        """
        try:
            conn = psycopg2.connect(**self.connection_params)
            conn.autocommit = True
            return conn
        except psycopg2.Error as e:
            logger.error(f"Database connection failed: {e}")
            return None

    def create_audit_table(self):
        """Create (if missing) the audit table that tracks policy runs."""
        conn = self.connect_db()
        if not conn:
            return False
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS retention_audit (
                        audit_id SERIAL PRIMARY KEY,
                        table_name VARCHAR(100) NOT NULL,
                        policy_type VARCHAR(50) NOT NULL,
                        retention_days INTEGER,
                        records_before INTEGER,
                        records_deleted INTEGER,
                        execution_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        status VARCHAR(20) DEFAULT 'SUCCESS'
                    )
                """)
                # Create index for better query performance
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_retention_audit_table_date
                    ON retention_audit(table_name, execution_date)
                """)
            logger.info("Retention audit table created/verified")
            return True
        except psycopg2.Error as e:
            logger.error(f"Error creating audit table: {e}")
            return False
        finally:
            conn.close()

    def create_archive_tables(self):
        """Create (if missing) the archive tables that receive purged rows."""
        conn = self.connect_db()
        if not conn:
            return False
        try:
            with conn.cursor() as cur:
                # Archive table for orders
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS archived_orders (
                        archived_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        order_id INTEGER,
                        customer_id INTEGER,
                        order_date DATE,
                        total_amount NUMERIC(12,2)
                    )
                """)
                # Archive table for order_items
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS archived_order_items (
                        archived_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        order_item_id INTEGER,
                        order_id INTEGER,
                        product_id INTEGER,
                        quantity INTEGER,
                        unit_price NUMERIC(10,2)
                    )
                """)
                # Archive table for fact_sales
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS archived_fact_sales (
                        archived_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        sales_key INTEGER,
                        date_key INTEGER,
                        customer_key INTEGER,
                        product_key INTEGER,
                        order_id INTEGER,
                        order_item_id INTEGER,
                        quantity INTEGER,
                        unit_price NUMERIC(10,2),
                        total_amount NUMERIC(12,2),
                        profit_margin NUMERIC(5,2),
                        created_date TIMESTAMP
                    )
                """)
            logger.info("Archive tables created/verified")
            return True
        except psycopg2.Error as e:
            logger.error(f"Error creating archive tables: {e}")
            return False
        finally:
            conn.close()

    def get_table_record_count(self, table_name):
        """Return the current row count of *table_name* (0 on any failure).

        The table name cannot be a bound parameter, so it is interpolated;
        restrict it to a plain SQL identifier to block SQL injection.
        """
        if not table_name.isidentifier():
            logger.error(f"Refusing to count rows of invalid table name: {table_name!r}")
            return 0
        conn = self.connect_db()
        if not conn:
            return 0
        try:
            with conn.cursor() as cur:
                cur.execute(f"SELECT COUNT(*) FROM {table_name}")
                return cur.fetchone()[0]
        except psycopg2.Error as e:
            logger.error(f"Error counting records in {table_name}: {e}")
            return 0
        finally:
            conn.close()

    @staticmethod
    def _log_audit(cur, table_name, policy_type, retention_days,
                   records_before, records_deleted):
        """Insert one retention_audit row using the caller's open cursor."""
        cur.execute("""
            INSERT INTO retention_audit (table_name, policy_type, retention_days, records_before, records_deleted)
            VALUES (%s, %s, %s, %s, %s)
        """, (table_name, policy_type, retention_days,
              records_before, records_deleted))

    def archive_and_delete_orders(self, retention_days):
        """Archive then delete orders (and their items) past retention.

        The archive inserts, the deletes, and the audit rows run in a single
        transaction so a mid-way failure cannot leave rows archived but not
        deleted (or vice versa).

        Args:
            retention_days: window in days; <= 0 means keep forever.

        Returns:
            True on success (or when the policy is keep-forever), else False.
        """
        if retention_days <= 0:
            logger.info("Orders retention policy: Keep forever")
            return True
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        logger.info(f"Applying retention policy to orders: deleting records older than {cutoff_date.date()}")
        conn = self.connect_db()
        if not conn:
            return False
        try:
            # Get count before deletion (separate read-only connection).
            records_before = self.get_table_record_count('orders')
            # Run archive + delete + audit atomically.
            conn.autocommit = False
            with conn.cursor() as cur:
                # Archive orders to be deleted
                cur.execute("""
                    INSERT INTO archived_orders (order_id, customer_id, order_date, total_amount)
                    SELECT order_id, customer_id, order_date, total_amount
                    FROM orders
                    WHERE order_date < %s
                """, (cutoff_date.date(),))
                # Archive related order items
                cur.execute("""
                    INSERT INTO archived_order_items (order_item_id, order_id, product_id, quantity, unit_price)
                    SELECT oi.order_item_id, oi.order_id, oi.product_id, oi.quantity, oi.unit_price
                    FROM order_items oi
                    JOIN orders o ON oi.order_id = o.order_id
                    WHERE o.order_date < %s
                """, (cutoff_date.date(),))
                # Delete order items first (foreign key constraint)
                cur.execute("""
                    DELETE FROM order_items
                    WHERE order_id IN (
                        SELECT order_id FROM orders WHERE order_date < %s
                    )
                """, (cutoff_date.date(),))
                deleted_order_items = cur.rowcount
                # Delete orders
                cur.execute("DELETE FROM orders WHERE order_date < %s", (cutoff_date.date(),))
                deleted_orders = cur.rowcount
                # Record audit information
                self._log_audit(cur, 'orders', 'time_based', retention_days,
                                records_before, deleted_orders)
                self._log_audit(cur, 'order_items', 'cascading', retention_days,
                                0, deleted_order_items)
            conn.commit()
            logger.info(f"Archived and deleted {deleted_orders} orders and {deleted_order_items} order items")
            return True
        except psycopg2.Error as e:
            conn.rollback()
            logger.error(f"Error in orders retention policy: {e}")
            return False
        finally:
            conn.close()

    def archive_and_delete_fact_sales(self, retention_days):
        """Archive then delete sales facts past retention, atomically.

        Args:
            retention_days: window in days; <= 0 means keep forever.

        Returns:
            True on success (or when the policy is keep-forever), else False.
        """
        if retention_days <= 0:
            logger.info("Fact sales retention policy: Keep forever")
            return True
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        # date_key is stored as YYYYMMDD, so compare against the same encoding.
        cutoff_date_key = int(cutoff_date.strftime('%Y%m%d'))
        logger.info(f"Applying retention policy to fact_sales: deleting records older than {cutoff_date.date()}")
        conn = self.connect_db()
        if not conn:
            return False
        try:
            # Get count before deletion (separate read-only connection).
            records_before = self.get_table_record_count('fact_sales')
            # Run archive + delete + audit atomically.
            conn.autocommit = False
            with conn.cursor() as cur:
                # Archive sales facts to be deleted
                cur.execute("""
                    INSERT INTO archived_fact_sales (
                        sales_key, date_key, customer_key, product_key, order_id,
                        order_item_id, quantity, unit_price, total_amount, profit_margin, created_date
                    )
                    SELECT
                        sales_key, date_key, customer_key, product_key, order_id,
                        order_item_id, quantity, unit_price, total_amount, profit_margin, created_date
                    FROM fact_sales
                    WHERE date_key < %s
                """, (cutoff_date_key,))
                # Delete old sales facts
                cur.execute("DELETE FROM fact_sales WHERE date_key < %s", (cutoff_date_key,))
                deleted_records = cur.rowcount
                # Record audit information
                self._log_audit(cur, 'fact_sales', 'time_based', retention_days,
                                records_before, deleted_records)
            conn.commit()
            logger.info(f"Archived and deleted {deleted_records} sales fact records")
            return True
        except psycopg2.Error as e:
            conn.rollback()
            logger.error(f"Error in fact_sales retention policy: {e}")
            return False
        finally:
            conn.close()

    def cleanup_audit_logs(self, retention_days=90):
        """Delete retention_audit rows older than *retention_days* days."""
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        logger.info(f"Cleaning up audit logs older than {cutoff_date.date()}")
        conn = self.connect_db()
        if not conn:
            return False
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "DELETE FROM retention_audit WHERE execution_date < %s",
                    (cutoff_date,)
                )
                deleted_records = cur.rowcount
            logger.info(f"Deleted {deleted_records} old audit log entries")
            return True
        except psycopg2.Error as e:
            logger.error(f"Error cleaning up audit logs: {e}")
            return False
        finally:
            conn.close()

    def get_retention_statistics(self):
        """Return a dict of recent policy executions and archive-table sizes.

        Returns an empty dict when the database is unreachable or a query
        fails, so callers can always iterate the result safely.
        """
        conn = self.connect_db()
        if not conn:
            return {}
        try:
            with conn.cursor() as cur:
                # Get recent retention policy executions
                cur.execute("""
                    SELECT
                        table_name,
                        MAX(execution_date) as last_execution,
                        SUM(records_deleted) as total_deleted,
                        COUNT(*) as execution_count
                    FROM retention_audit
                    WHERE execution_date >= CURRENT_DATE - INTERVAL '30 days'
                    GROUP BY table_name
                    ORDER BY last_execution DESC
                """)
                recent_executions = cur.fetchall()
                # Get archive table sizes
                cur.execute("SELECT COUNT(*) FROM archived_orders")
                archived_orders_count = cur.fetchone()[0]
                cur.execute("SELECT COUNT(*) FROM archived_order_items")
                archived_order_items_count = cur.fetchone()[0]
                cur.execute("SELECT COUNT(*) FROM archived_fact_sales")
                archived_fact_sales_count = cur.fetchone()[0]
                statistics = {
                    'recent_executions': [
                        {
                            'table_name': row[0],
                            'last_execution': row[1].isoformat() if row[1] else None,
                            'total_deleted': row[2],
                            'execution_count': row[3]
                        } for row in recent_executions
                    ],
                    'archive_counts': {
                        'archived_orders': archived_orders_count,
                        'archived_order_items': archived_order_items_count,
                        'archived_fact_sales': archived_fact_sales_count
                    }
                }
                return statistics
        except psycopg2.Error as e:
            logger.error(f"Error getting retention statistics: {e}")
            return {}
        finally:
            conn.close()

    def execute_retention_policies(self):
        """Run every configured retention policy; return overall success."""
        logger.info("Starting retention policy execution...")
        # Ensure infrastructure is in place
        if not self.create_audit_table():
            return False
        if not self.create_archive_tables():
            return False
        success = True
        # Execute raw data retention policies
        raw_policies = self.retention_policies['raw_data']
        if not self.archive_and_delete_orders(raw_policies['orders']):
            success = False
        # Execute aggregated data retention policies
        agg_policies = self.retention_policies['aggregated_data']
        if not self.archive_and_delete_fact_sales(agg_policies['fact_sales']):
            success = False
        # Cleanup audit logs. NOTE(review): this uses the default 90-day
        # window; the 'logs_and_metadata' policies configure other tables
        # (data_quality_logs, load_metadata), not retention_audit — confirm
        # whether retention_audit should get its own policy entry.
        if not self.cleanup_audit_logs():
            success = False
        if success:
            logger.info("All retention policies executed successfully")
        else:
            logger.error("Some retention policies failed")
        return success

    def load_retention_config(self, config_file='retention_config.json'):
        """Load retention policies from *config_file*, or create it.

        When the file exists, its top-level keys are merged over the
        defaults. A malformed file is logged and ignored so the built-in
        defaults stay in effect. When the file is missing, the current
        defaults are written out as a starting point.
        """
        if os.path.exists(config_file):
            try:
                with open(config_file, 'r') as f:
                    config = json.load(f)
            except (json.JSONDecodeError, OSError) as e:
                # Keep the built-in defaults rather than crashing the run.
                logger.error(f"Could not load retention configuration {config_file}: {e}")
                return
            self.retention_policies.update(config)
            logger.info(f"Loaded retention configuration from {config_file}")
        else:
            # Create default config file
            with open(config_file, 'w') as f:
                json.dump(self.retention_policies, f, indent=2)
            logger.info(f"Created default retention configuration: {config_file}")

    def print_retention_report(self):
        """Print a formatted retention policy report to stdout."""
        stats = self.get_retention_statistics()
        print("\n" + "="*50)
        print("DATA RETENTION POLICY REPORT")
        print("="*50)
        print(f"Generated: {datetime.now().isoformat()}")
        print()
        print("RECENT POLICY EXECUTIONS (Last 30 days)")
        print("-" * 40)
        for execution in stats.get('recent_executions', []):
            print(f"Table: {execution['table_name']}")
            print(f"  Last execution: {execution['last_execution']}")
            print(f"  Records deleted: {execution['total_deleted']}")
            print(f"  Execution count: {execution['execution_count']}")
            print()
        print("ARCHIVE TABLE SIZES")
        print("-" * 20)
        archive_counts = stats.get('archive_counts', {})
        for table, count in archive_counts.items():
            print(f"{table}: {count:,} records")
        print("\nCURRENT RETENTION POLICIES")
        print("-" * 30)
        for category, policies in self.retention_policies.items():
            print(f"{category.upper().replace('_', ' ')}:")
            for table, days in policies.items():
                if days == -1:
                    policy = "Keep forever"
                else:
                    policy = f"{days} days"
                print(f"  {table}: {policy}")
            print()
def main():
    """CLI entry point: run retention policies and/or print a report.

    Returns 0 on success, 1 if any executed retention policy failed.
    """
    import argparse
    cli = argparse.ArgumentParser(description='Data retention policy manager for Mini Data Warehouse')
    cli.add_argument('--config', help='Path to retention configuration file')
    cli.add_argument('--report', action='store_true', help='Generate retention policy report')
    cli.add_argument('--execute', action='store_true', help='Execute retention policies')
    opts = cli.parse_args()

    manager = RetentionPolicyManager()

    # Load the explicit config when given, otherwise the default file.
    if opts.config:
        manager.load_retention_config(opts.config)
    else:
        manager.load_retention_config()

    # Execute retention policies only when requested.
    ok = manager.execute_retention_policies() if opts.execute else True

    # The report runs when requested, and also as the default action.
    if opts.report or not opts.execute:
        manager.print_retention_report()

    return 0 if ok else 1
if __name__ == "__main__":
    import sys

    # sys.exit is always available; the exit() builtin is a site-module
    # convenience that is absent when Python runs with -S or frozen.
    sys.exit(main())