-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
302 lines (259 loc) · 14.1 KB
/
main.py
File metadata and controls
302 lines (259 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
from flask import Flask, request, render_template, send_from_directory, abort, after_this_request
from werkzeug.utils import secure_filename
import pandas as pd
from docx import Document
import os
import re
import zipfile
import datetime
import logging
# Root logger setup: DEBUG and above, timestamp + level prefix, emitted
# through a single stream handler.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.DEBUG,
    handlers=[logging.StreamHandler()],
)

# Flask application and its storage/size settings.
app = Flask(__name__)
app.config.update(
    UPLOAD_FOLDER='/tmp/uploads',
    OUTPUT_FOLDER='/tmp/output',
    MAX_CONTENT_LENGTH=5 * 1024 * 1024,  # 5 MB for free hosting
)

# Both working folders must exist before the first request is served.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['OUTPUT_FOLDER'], exist_ok=True)

# Accepted upload extensions (lowercase, leading dot included).
ALLOWED_WORD_EXTENSIONS = {'.docx'}
ALLOWED_EXCEL_EXTENSIONS = {'.xls', '.xlsx'}
def allowed_file(filename, allowed_extensions):
    """Return True when *filename* carries one of *allowed_extensions*.

    The filename's extension is lowercased before the lookup, so entries in
    *allowed_extensions* should be lowercase and include the leading dot
    (for example ``'.docx'``).

    Args:
        filename: Client-supplied file name; may be ``None`` or empty.
        allowed_extensions: Container of accepted extensions.

    Returns:
        bool: ``False`` for a missing name, a name without a dot, or an
        extension that is not in *allowed_extensions*.
    """
    # A missing name used to raise TypeError on the ``in`` test; treat it as
    # "not allowed" so callers can answer with a clean 400 instead.
    if not filename or '.' not in filename:
        return False
    extension = os.path.splitext(filename)[1].lower()
    return extension in allowed_extensions
@app.route('/', methods=['GET'])
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    logging.info("Rendering index page")
    landing_page = render_template('index.html')
    return landing_page
@app.route('/upload', methods=['POST'])
def upload_files():
    """Store the uploaded Word template and Excel workbook, then ask for a column.

    Reads the ``wordfile`` and ``excelfile`` parts of the request, checks
    their extensions, saves both under ``UPLOAD_FOLDER`` and renders
    ``choose_column.html`` with the workbook's column labels and the two
    stored paths.

    Aborts with:
        400: a file is missing, has a disallowed extension, or the workbook
            cannot be parsed.
        500: the files cannot be written to disk.
    """
    import uuid  # function-scope import; the module import block is left as-is

    logging.info("Handling file upload")
    word_file = request.files.get('wordfile')
    excel_file = request.files.get('excelfile')

    if not word_file or not allowed_file(word_file.filename, ALLOWED_WORD_EXTENSIONS):
        logging.error("Invalid or missing Word (.docx) file")
        abort(400, 'Invalid or missing Word (.docx) file.')
    if not excel_file or not allowed_file(excel_file.filename, ALLOWED_EXCEL_EXTENSIONS):
        logging.error("Invalid or missing Excel (.xlsx or .xls) file")
        abort(400, 'Invalid or missing Excel (.xlsx or .xls) file.')

    # One random prefix per request: two clients uploading "template.docx" at
    # the same time no longer overwrite each other's files.
    upload_id = uuid.uuid4().hex

    def storage_path(original_name):
        safe_name = secure_filename(original_name)
        if '.' not in safe_name:
            # The sanitiser may strip the name down to nothing or lose the
            # dot; rebuild a name from the (already validated) extension.
            safe_name = 'upload' + os.path.splitext(original_name)[1].lower()
        return os.path.join(app.config['UPLOAD_FOLDER'], f"{upload_id}_{safe_name}")

    word_filepath = storage_path(word_file.filename)
    excel_filepath = storage_path(excel_file.filename)

    def discard_uploads():
        # Best-effort removal so failed requests do not pile up files.
        for path in (word_filepath, excel_filepath):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
            except OSError as e:
                logging.warning(f"Could not remove {path}: {str(e)}")

    try:
        word_file.save(word_filepath)
        excel_file.save(excel_filepath)
    except Exception as e:
        logging.error(f"Failed to save files: {str(e)}")
        discard_uploads()
        abort(500, 'Failed to save uploaded files.')

    try:
        data = pd.read_excel(excel_filepath)
    except Exception as e:
        logging.error(f"Failed to read Excel file: {str(e)}")
        discard_uploads()
        abort(400, 'Failed to read the Excel file. Ensure it is a valid format.')

    columns = data.columns.tolist()
    logging.info(f"Excel file read successfully. Columns: {columns}")
    return render_template(
        'choose_column.html',
        columns=columns,
        word_filepath=word_filepath,
        excel_filepath=excel_filepath,
    )
def _resolve_upload_path(candidate):
    """Return the real path of *candidate* if it lies inside UPLOAD_FOLDER, else None."""
    upload_root = os.path.realpath(app.config['UPLOAD_FOLDER'])
    try:
        resolved = os.path.realpath(candidate)
        inside = os.path.commonpath([upload_root, resolved]) == upload_root
    except ValueError:
        # Raised for embedded NUL bytes or paths on a different drive.
        return None
    if not inside or resolved == upload_root:
        return None
    return resolved


def _row_to_placeholder_values(record, columns):
    """Map each column label (as ``str``) to the display text of its cell in *record*."""
    values = {}
    for col in columns:
        value = record[col]
        if isinstance(value, datetime.datetime) and pd.notna(value):
            value = value.strftime('%d/%m/%Y')
        elif pd.isna(value):
            value = ""
        else:
            value = str(value).strip()
        # String keys: the chosen column arrives from the form as text, and
        # the placeholder helpers do string operations on the names.
        values[str(col)] = value
    return values


def _log_document_placeholders(doc):
    """Debug aid: log the runs, unusual characters and placeholders found in *doc*."""
    whitespace_re = re.compile(r'[\s\u00a0\u200b\u00ad\u200c\u200d\u2028\u200e\u200f]+')
    placeholder_re = re.compile(r"[\u00ab\u2039<](.*?[\u00bb\u203a>])")
    placeholders_found = set()

    def scan(paragraph, label):
        full_text = ''.join(run.text for run in paragraph.runs)
        normalized_text = whitespace_re.sub(' ', full_text).strip()
        run_texts = [repr(run.text) for run in paragraph.runs]
        unicode_chars = [(char, hex(ord(char))) for char in full_text if ord(char) > 127 or ord(char) < 32]
        matches = placeholder_re.findall(normalized_text)
        placeholders_found.update(matches)
        logging.debug(f"{label} runs: {run_texts}")
        logging.debug(f"{label} unicode chars: {unicode_chars}")
        logging.debug(f"{label} full text: {full_text!r}, normalized: {normalized_text!r}, matches: {matches}")

    for table in doc.tables:
        for row_idx, table_row in enumerate(table.rows):
            for col_idx, cell in enumerate(table_row.cells):
                for paragraph in cell.paragraphs:
                    scan(paragraph, f"Table cell [row {row_idx}, col {col_idx}]")
    for para_idx, paragraph in enumerate(doc.paragraphs):
        scan(paragraph, f"Paragraph {para_idx}")
    logging.info(f"Placeholders found in document: {placeholders_found}")


@app.route('/process', methods=['POST'])
def process_files():
    """Fill the Word template once per workbook row and return the results as a zip.

    Form fields read: ``word_filepath``, ``excel_filepath`` (paths produced
    by the upload step) and ``chosen_column`` (column whose value names each
    output file). For every row the template is loaded afresh, placeholders
    are replaced in body paragraphs and tables, and the document is saved to
    ``OUTPUT_FOLDER`` as ``<value>_<row index>.docx``. All generated
    documents are zipped into ``processed_documents.zip`` and sent as an
    attachment; uploads, outputs and the zip are removed after the response.

    Aborts with:
        400: a path points outside the upload folder, the workbook cannot be
            read, or the chosen column does not exist.
        500: no document could be generated or the zip cannot be written.

    NOTE(review): the zip name is fixed, so two simultaneous requests share
    the same archive path — confirm whether concurrent use must be supported.
    """
    logging.info("Processing files")
    chosen_column = request.form['chosen_column']

    # The paths round-trip through the browser, so they are untrusted: this
    # view reads them and later deletes them. Accept only files that really
    # live in the upload folder.
    word_filepath = _resolve_upload_path(request.form['word_filepath'])
    excel_filepath = _resolve_upload_path(request.form['excel_filepath'])
    if word_filepath is None or excel_filepath is None:
        logging.error("Rejected file path outside the upload folder")
        abort(400, 'Invalid uploaded file reference.')

    try:
        # No explicit engine: forcing openpyxl made every accepted .xls
        # upload fail here even though the upload step had read it. pandas
        # selects the reader itself (assumes the matching engine is installed).
        data = pd.read_excel(excel_filepath)
        logging.info(f"Excel columns: {data.columns.tolist()}")
    except Exception as e:
        logging.error(f"Error reading Excel file during processing: {str(e)}")
        abort(400, 'Error reading the Excel file during processing.')

    # Compare as text so numeric or date column labels can be chosen too.
    if chosen_column not in {str(col) for col in data.columns}:
        logging.error(f"Chosen column '{chosen_column}' not found in Excel file")
        abort(400, f"Column '{chosen_column}' not found in Excel file.")

    filenames = []
    for row_index, record in data.iterrows():
        try:
            # Reload per row: the replacement step mutates the document.
            doc = Document(word_filepath)
        except Exception as e:
            logging.error(f"Error loading Word document: {str(e)}")
            continue

        row_dict = _row_to_placeholder_values(record, data.columns)
        logging.info(f"Row data keys: {list(row_dict.keys())}")
        logging.info(f"Row data: {row_dict}")

        _log_document_placeholders(doc)

        total_replacements = 0
        for para_idx, paragraph in enumerate(doc.paragraphs):
            total_replacements += _replace_placeholders_in_paragraph(paragraph, row_dict, para_idx)
        for table_idx, table in enumerate(doc.tables):
            total_replacements += _replace_placeholders_in_table(table, row_dict, table_idx)
        logging.info(f"Total replacements made: {total_replacements}")
        if total_replacements == 0:
            logging.warning("No placeholders were replaced in the document")

        # Built outside the try so the error message below can always name it.
        output_filename = f"{secure_filename(str(row_dict[chosen_column]))}_{row_index}.docx"
        output_path = os.path.join(app.config['OUTPUT_FOLDER'], output_filename)
        try:
            doc.save(output_path)
        except Exception as e:
            logging.error(f"Error saving output file {output_filename}: {str(e)}")
            continue
        filenames.append(output_path)

    if not filenames:
        logging.error("No files were generated")
        abort(500, 'No files were generated due to processing errors.')

    zip_path = os.path.join(app.config['OUTPUT_FOLDER'], "processed_documents.zip")
    try:
        with zipfile.ZipFile(zip_path, 'w') as doc_zip:
            for generated in filenames:
                doc_zip.write(generated, arcname=os.path.basename(generated))
        logging.info(f"Created zip file: {zip_path}")
    except Exception as e:
        logging.error(f"Error creating zip file: {str(e)}")
        abort(500, 'Error creating zip file.')

    @after_this_request
    def remove_files(response):
        logging.info("Cleaning up temporary files")
        # Each removal is guarded on its own so one failure does not leave
        # the remaining files behind.
        for path in [*filenames, word_filepath, excel_filepath, zip_path]:
            try:
                if os.path.exists(path):
                    os.remove(path)
            except Exception as e:
                logging.error(f"Error cleaning up files: {str(e)}")
        return response

    logging.info("Sending zip file to client")
    return send_from_directory(app.config['OUTPUT_FOLDER'], "processed_documents.zip", as_attachment=True)
def _placeholder_matches_column(placeholder, column_name):
    """Return True when the two names are equal ignoring case, underscores or spaces."""
    left = placeholder.lower()
    right = column_name.lower()
    return (
        left == right
        or left.replace("_", "") == right.replace("_", "")
        or left.replace(" ", "") == right.replace(" ", "")
    )


def _replace_placeholders_in_paragraph(paragraph, row_data, para_idx):
    """Replace ``«Name»`` / ``‹Name›`` / ``<Name>`` placeholders in one paragraph.

    Strategy, in order:
      1. Case-insensitive match of each column name on the paragraph's joined,
         whitespace-normalised run text.
      2. If nothing matched, a looser comparison of each placeholder found
         (ignoring case, underscores or spaces) against the column names.
      3. If still nothing matched, the same looser comparison run by run.

    When step 1 or 2 replaces anything, every existing run is emptied and the
    new text is appended as a single run, so the paragraph's text becomes the
    normalised, substituted string.

    Args:
        paragraph: Object exposing ``runs`` (items with a ``text`` attribute)
            and ``add_run(text)``.
        row_data: Mapping of column name to replacement value; names and
            values are converted to ``str`` (``None`` becomes ``''``).
        para_idx: Label used only in log messages.

    Returns:
        int: Number of replacements performed (0 when nothing matched).
    """
    opener = "[\u00ab\u2039<]"
    closer = "[\u00bb\u203a>]"
    placeholder_re = re.compile(opener + "(.*?" + closer + ")")

    full_text = ''.join(run.text for run in paragraph.runs)
    normalized_text = re.sub(
        r'[\s\u00a0\u200b\u00ad\u200c\u200d\u2028\u200e\u200f]+', ' ', full_text
    ).strip()
    if not normalized_text:
        return 0
    original_text = normalized_text

    # Workbook headers may be numbers or dates; coerce once so the string
    # operations below cannot fail on them.
    values = {
        str(name): '' if value is None else str(value)
        for name, value in row_data.items()
    }

    def literal(text):
        # re.sub treats a replacement *string* as a template in which
        # backslashes and \1 are special. A callable is inserted verbatim, so
        # cell values such as r"C:\new" are kept intact.
        return lambda _match: text

    replacements = 0
    logging.debug(f"Processing paragraph {para_idx} text: {full_text!r}, normalized: {normalized_text!r}")

    # Step 1: exact column names. IGNORECASE already covers the lower/upper
    # variants, so one pattern per column is enough.
    for column_name, value in values.items():
        pattern = opener + "\\s*" + re.escape(column_name) + "\\s*" + closer
        normalized_text, hits = re.subn(pattern, literal(value), normalized_text, flags=re.IGNORECASE)
        if hits:
            replacements += 1
            logging.debug(f"Replaced '{pattern}' with '{value}' in paragraph {para_idx}")

    # Step 2: looser name comparison on the placeholders actually present.
    if replacements == 0:
        for placeholder in placeholder_re.findall(original_text):
            # The captured group ends with the closing bracket; drop it.
            clean_placeholder = placeholder[:-1].strip()
            for column_name, value in values.items():
                if not _placeholder_matches_column(clean_placeholder, column_name):
                    continue
                pattern = opener + "\\s*" + re.escape(clean_placeholder) + "\\s*" + closer
                normalized_text, hits = re.subn(pattern, literal(value), normalized_text, flags=re.IGNORECASE)
                if hits:
                    replacements += 1
                    logging.debug(f"Fuzzy match: Replaced '{pattern}' with '{value}' in paragraph {para_idx}")
                    break  # first matching column wins

    if replacements > 0:
        # Placeholders are often split across runs, so the text is rewritten
        # as one run; per-run formatting of the old text is not carried over.
        for run in paragraph.runs:
            run.text = ''
        paragraph.add_run(normalized_text)
        logging.debug(f"Updated paragraph {para_idx} text: {normalized_text!r}")
        return replacements

    # Step 3: inspect each run on its own (first placeholder in the run only).
    for i, run in enumerate(paragraph.runs):
        run_text = run.text
        placeholder_match = placeholder_re.search(run_text)
        if not placeholder_match:
            continue
        placeholder = placeholder_match.group(1)[:-1].strip()
        for column_name, value in values.items():
            if _placeholder_matches_column(placeholder, column_name):
                original_placeholder = placeholder_match.group(0)
                run.text = run_text.replace(original_placeholder, value)
                replacements += 1
                logging.debug(f"Direct run replacement: '{original_placeholder}' with '{value}' in run {i} of paragraph {para_idx}")
                break  # first matching column wins, counted once
    return replacements
def _replace_placeholders_in_table(table, row_data, table_idx):
    """Replace placeholders in every cell of *table*, including nested tables.

    Args:
        table: python-docx table whose rows and cells are walked.
        row_data: Mapping of column name to replacement value, passed through
            to ``_replace_placeholders_in_paragraph``.
        table_idx: Label used in log messages and to name nested tables.

    Returns:
        int: Total number of replacements reported for the table and the
        tables nested inside its cells.
    """
    replacements = 0
    for row_idx, table_row in enumerate(table.rows):
        for col_idx, cell in enumerate(table_row.cells):
            cell_label = f"table_{table_idx}_cell_{row_idx}_{col_idx}"
            for paragraph in cell.paragraphs:
                replacements += _replace_placeholders_in_paragraph(paragraph, row_data, cell_label)

            # cell.tables lists the tables placed directly in this cell. The
            # earlier XPath search started at the cell, so it never returned
            # the enclosing table, and skipping its first hit dropped the
            # first nested table. Deeper levels are reached by the recursion.
            for i, nested_table in enumerate(cell.tables):
                try:
                    replacements += _replace_placeholders_in_table(
                        nested_table, row_data, f"{table_idx}_nested_{i}"
                    )
                except Exception as e:
                    # Best effort: a broken nested table must not stop the rest.
                    logging.error(f"Error processing nested table: {str(e)}")
    return replacements
if __name__ == '__main__':
    # Script entry point: listen on all interfaces, on the port given by the
    # PORT environment variable (10000 when unset), with debug mode off.
    app.run(
        host='0.0.0.0',
        port=int(os.environ.get('PORT', 10000)),
        debug=False,
    )