-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtask_DB.py
More file actions
128 lines (114 loc) Β· 4.56 KB
/
task_DB.py
File metadata and controls
128 lines (114 loc) Β· 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
CodeCraft PMS Project
νμΌλͺ
: task_DB.py
λ§μ§λ§ μμ λ μ§ : 2025/02/21
"""
import pymysql
from mysql_connection import db_connect
from task import *
# μ
무λ₯Ό μ‘°ννλ ν¨μ
# νλ‘μ νΈ λ²νΈμ νλ²μ λ§€κ° λ³μλ‘ λ°κ³ , WHERE μ μ 쑰건μΌλ‘ νν°λ§ν΄μΌ νλ€
# λ§μ½, νλ²μΌλ‘λ§ μ‘°νν κ²½μ°, ν λͺ
μ μ¬μ©μκ° 2κ° μ΄μμ νλ‘μ νΈμ μ°Έμ¬νκ³ μλ€λ©΄, λ€λ₯Έ νλ‘μ νΈμ μ
무λ κ°μ΄ μ‘°νλ μ μμ
def fetch_task_info(pid, univ_id):
connection = db_connect()
cur = connection.cursor(pymysql.cursors.DictCursor)
try:
load_work = "SELECT w_no, w_name, w_person, w_start, w_end, w_checked, s_no FROM work WHERE p_no = %s AND s_no = %s"
cur.execute(load_work, (pid, univ_id))
result = cur.fetchall()
return result
except Exception as e:
print(f"Error [fetch_task_info] : {e}")
return e
finally:
cur.close()
connection.close()
# νΉμ νλ‘μ νΈμ λͺ¨λ μ
무λ₯Ό μ‘°ννλ ν¨μ
# νλ‘μ νΈ λ²νΈλ₯Ό λ§€κ° λ³μλ‘ λ°λλ€
def fetch_all_task_info(pid):
connection = db_connect()
cur = connection.cursor(pymysql.cursors.DictCursor)
try:
cur.execute("SELECT * FROM work WHERE p_no = %s", (pid,))
result = cur.fetchall()
return result
except Exception as e:
print(f"Error [fetch_all_task_info] : {e}")
return e
finally:
cur.close()
connection.close()
# μ
무λ₯Ό μΆκ°νλ ν¨μ
# μΆκ°νλ €λ μ
무μ λ΄μ©, νλ‘μ νΈ λ²νΈ, νλ²μ λ§€κ° λ³μλ‘ λ°λλ€
# μ
무λ₯Ό μΆκ°ν λ νλ‘μ νΈ λ²νΈμ νλ²λ λ§€κ° λ³μλ‘ λ°μμΌ νλ€ (work ν
μ΄λΈμ p_no, s_no μ»¬λΌ λͺ¨λ NOT NULL)
def add_task_info(tname, tperson, tstart, tend, pid, univ_id):
connection = db_connect()
cur = connection.cursor(pymysql.cursors.DictCursor)
try:
add_work = """
INSERT INTO work(w_name, w_person, w_start, w_end, w_checked, p_no, s_no)
VALUES (%s, %s, %s, %s, 0, %s, %s)
"""
cur.execute(add_work, (tname, tperson, tstart, tend, pid, univ_id))
connection.commit()
# ORDERY BY μ μ μ΄μ©νμ¬ μ
무 λ²νΈλ₯Ό λ΄λ¦Όμ°¨μμΌλ‘ μ‘°ννκ³ ,
# κ·Έ 첫 λ²μ§Έ νλ§ κ°μ Έμ¨ νμ λ°©κΈ μΆκ°ν μ
무μ μ
무 λ²νΈλ₯Ό μ‘°ννμ¬ λ°ν
cur.execute("SELECT /*+ INDEX(work idx_work_pno_sno_wno) */ * FROM work WHERE p_no = %s AND s_no = %s ORDER BY w_no DESC", (pid, univ_id))
row = cur.fetchone()
return row['w_no']
except Exception as e:
connection.rollback()
print(f"Error [add_task_info] : {e}")
return e
finally:
cur.close()
connection.close()
# μ
무λ₯Ό μμ νλ ν¨μ
# μμ νλ €λ μ
무μ λ΄μ©κ³Ό μ
무 λ²νΈλ₯Ό λ§€κ° λ³μλ‘ λ°λλ€
# μμ ν μ
무μ ν΄λΉνλ μ
무 λ²νΈλ λ§€κ° λ³μλ‘ κ°μ΄ λ°μμΌ νλ©°, κ·Έλ μ§ μμΌλ©΄ λͺ¨λ μ
λ¬΄κ° κ°μ λ΄μ©μΌλ‘ μμ λλ λ¬Έμ κ° λ°μ
def update_task_info(tname, tperson, tstart, tend, tfinish, univ_id, w_no):
connection = db_connect()
cur = connection.cursor(pymysql.cursors.DictCursor)
try:
edit_work = """
UPDATE work
SET w_name = %s,
w_person = %s,
w_start = %s,
w_end = %s,
w_checked = %s,
s_no = %s
WHERE w_no = %s
"""
cur.execute(edit_work, (tname, tperson, tstart, tend, tfinish, univ_id, w_no))
connection.commit()
return True
except Exception as e:
connection.rollback()
print(f"Error [update_task_info]: {e}")
return e
finally:
cur.close()
connection.close()
# μ
무λ₯Ό μμ νλ ν¨μ
# μμ νλ €λ μ
무μ μ
무 λ²νΈλ₯Ό λ§€κ° λ³μλ‘ λ°λλ€
def delete_task_info(w_no):
connection = db_connect()
cur = connection.cursor(pymysql.cursors.DictCursor)
try:
cur.execute("SELECT COUNT(*) AS cnt FROM work WHERE w_no = %s", (w_no,))
result = cur.fetchone()
if result['cnt'] == 0:
print(f"Error [delete_task_info] : Work number {w_no} does not exist.")
return False
# μ
무 λ²νΈλ‘ ν΄λΉ μ
무 μμ
cur.execute("DELETE FROM work WHERE w_no = %s", (w_no,))
connection.commit()
return True
except Exception as e:
connection.rollback()
print(f"Error [delete_task_info] : {e}")
return e
finally:
cur.close()
connection.close()