-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathFileData.py
More file actions
440 lines (369 loc) · 18.8 KB
/
FileData.py
File metadata and controls
440 lines (369 loc) · 18.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
# archive_agent/data/FileData.py
# Copyright © 2025 Dr.-Ing. Paul Wilhelm <paul@wilhelm.dev>
# This file is part of Archive Agent. See LICENSE for details.
from archive_agent import __version__
import time
import uuid
from typing import List, Optional, Dict, Callable, Any
from archive_agent.core.ProgressManager import ProgressManager, ProgressInfo
from PIL import Image
from qdrant_client.models import PointStruct
from archive_agent.data.processor.EmbedProcessor import EmbedProcessor
from archive_agent.db.QdrantSchema import QdrantPayload
from archive_agent.ai.AiManager import AiManager
from archive_agent.ai.AiManagerFactory import AiManagerFactory
from archive_agent.ai.chunk.AiChunk import ChunkSchema
from archive_agent.ai.vision.AiVisionEntity import AiVisionEntity
from archive_agent.ai.vision.AiVisionOCR import AiVisionOCR
from archive_agent.ai.vision.AiVisionSchema import VisionSchema
from archive_agent.config.DecoderSettings import DecoderSettings
from archive_agent.data.DocumentContent import DocumentContent
from archive_agent.util.format import get_point_page_line_info, format_filename_short
from archive_agent.data.loader.pdf import is_pdf_document, load_pdf_document
from archive_agent.data.loader.image import is_image, load_image
from archive_agent.data.loader.text import is_plaintext, load_plaintext
from archive_agent.data.loader.text import is_ascii_document, load_ascii_document
from archive_agent.data.loader.text import is_binary_document, load_binary_document
from archive_agent.util.image_util import image_resize_safe, image_to_base64
from archive_agent.data.chunk import get_chunks_with_reference_ranges, get_sentences_with_reference_ranges
DecoderCallable = Callable[[ProgressInfo], Optional[DocumentContent]]
class FileData:
def __init__(
self,
ai_factory: AiManagerFactory,
decoder_settings: DecoderSettings,
file_path: str,
file_meta: Dict[str, Any],
max_workers_vision: int,
max_workers_embed: int,
):
"""
Initialize file data.
:param ai_factory: AI manager factory for creating instances.
:param decoder_settings: Decoder settings.
:param file_path: Path to the file.
:param file_meta: File metadata.
:param max_workers_vision: Max. workers for vision.
:param max_workers_embed: Max. workers for embedding.
"""
# Core dependencies and configuration
self.ai_factory = ai_factory
self.ai = ai_factory.get_ai() # Primary AI instance for vision, chunking, config
self.decoder_settings = decoder_settings
self.chunk_lines_block = self.ai.chunk_lines_block
self.max_workers_vision = max_workers_vision
self.max_workers_embed = max_workers_embed
# File metadata and logging
self.file_path = file_path
self.file_meta = file_meta
self.logger = self.ai.cli.get_prefixed_logger(prefix=format_filename_short(self.file_path))
# Processing components
self.chunk_processor = EmbedProcessor(ai_factory, self.logger, file_path, self.max_workers_embed)
self.points: List[PointStruct] = []
# Vision callback configuration based on AI provider capabilities
self.image_to_text_callback_combined = self.image_to_text_combined if self.ai.ai_provider.supports_vision else None
self.image_to_text_callback_entity = self.image_to_text_entity if self.ai.ai_provider.supports_vision else None
self.image_to_text_callback_ocr = self.image_to_text_ocr if self.ai.ai_provider.supports_vision else None
self.image_to_text_callback_page = self.image_to_text_callback_ocr
# Select appropriate vision callback based on decoder settings
if self.decoder_settings.image_ocr and self.decoder_settings.image_entity_extract:
self.image_to_text_callback_image = self.image_to_text_callback_combined
elif self.decoder_settings.image_ocr:
self.image_to_text_callback_image = self.image_to_text_ocr
elif self.decoder_settings.image_entity_extract:
self.image_to_text_callback_image = self.image_to_text_callback_entity
else:
self.image_to_text_callback_image = None
# Determine decoder function based on file type
self.decoder_func: Optional[DecoderCallable] = self.get_decoder_func()
def get_decoder_func(self) -> Optional[DecoderCallable]:
"""
Determine the appropriate decoder function based on file type.
:return: Decoder function or None if unsupported.
"""
if is_image(self.file_path):
return lambda progress_info: load_image(
ai_factory=self.ai_factory,
logger=self.logger,
file_path=self.file_path,
image_to_text_callback=self.image_to_text_callback_image,
progress_info=progress_info,
)
elif is_plaintext(self.file_path):
return lambda progress_info: load_plaintext(
logger=self.logger,
file_path=self.file_path,
)
elif is_ascii_document(self.file_path):
return lambda progress_info: load_ascii_document(
logger=self.logger,
file_path=self.file_path,
)
elif is_binary_document(self.file_path):
return lambda progress_info: load_binary_document(
ai_factory=self.ai_factory,
logger=self.logger,
verbose=self.ai.cli.VERBOSE_LOADER,
file_path=self.file_path,
max_workers_vision=self.max_workers_vision,
image_to_text_callback=self.image_to_text_callback_image,
progress_info=progress_info,
)
elif is_pdf_document(self.file_path):
return lambda progress_info: load_pdf_document(
ai_factory=self.ai_factory,
logger=self.logger,
verbose=self.ai.cli.VERBOSE_LOADER,
file_path=self.file_path,
max_workers_vision=self.max_workers_vision,
image_to_text_callback_page=self.image_to_text_callback_page,
image_to_text_callback_image=self.image_to_text_callback_image,
decoder_settings=self.decoder_settings,
progress_info=progress_info,
)
return None
def is_processable(self) -> bool:
"""
Check if the file is processable based on decoder availability.
:return: True if processable, False otherwise.
"""
return self.decoder_func is not None
# IMAGE PROCESSING AND VISION CALLBACKS
def image_to_text(self, ai: AiManager, image: Image.Image) -> Optional[VisionSchema]:
"""
Convert image to RGB if needed, resize, and process with AI vision.
:param ai: AI manager.
:param image: PIL Image object.
:return: VisionSchema result or None if failed.
"""
if image.mode != "RGB":
self.logger.debug(f"Converted image from '{image.mode}' to 'RGB'")
image = image.convert("RGB")
image_possibly_resized = image_resize_safe(image=image, logger=self.logger, verbose=self.ai.cli.VERBOSE_VISION)
if image_possibly_resized is None:
self.logger.warning(f"Failed to resize image")
return None
image_base64 = image_to_base64(image_possibly_resized)
vision_result = ai.vision(image_base64)
if vision_result.is_rejected:
self.logger.error(f"⚠️ Image rejected: \"{vision_result.rejection_reason}\"")
return None
return vision_result
def image_to_text_ocr(self, ai: AiManager, image: Image.Image, progress_info: ProgressInfo) -> Optional[str]:
"""
Request vision with OCR on the image and format the result.
:param ai: AI manager.
:param image: PIL Image object.
:param progress_info: Progress tracking information.
:return: OCR text or None if failed.
"""
if self.ai.cli.VERBOSE_VISION:
self.logger.info("Requesting vision feature: OCR")
ai.request_ocr()
vision_result = self.image_to_text(ai=ai, image=image)
progress_info.progress_manager.update_task(progress_info.parent_key, advance=1)
if vision_result is not None:
return AiVisionOCR.format_vision_answer(vision_result=vision_result)
else:
return None
def image_to_text_entity(self, ai: AiManager, image: Image.Image, progress_info: ProgressInfo) -> Optional[str]:
"""
Request vision with entity extraction on the image and format the result.
:param ai: AI manager.
:param image: PIL Image object.
:param progress_info: Progress tracking information.
:return: Entity text or None if failed.
"""
if self.ai.cli.VERBOSE_VISION:
self.logger.info("Requesting vision feature: Entity Extraction")
ai.request_entity()
vision_result = self.image_to_text(ai=ai, image=image)
progress_info.progress_manager.update_task(progress_info.parent_key, advance=1)
if vision_result is not None:
return AiVisionEntity.format_vision_answer(vision_result=vision_result)
else:
return None
def image_to_text_combined(self, ai: AiManager, image: Image.Image, progress_info: ProgressInfo) -> Optional[str]:
"""
Request vision with OCR and entity extraction on the image, format and join the results.
:param ai: AI manager.
:param image: PIL Image object.
:param progress_info: Progress tracking information.
:return: Combined text or None if any part failed.
"""
if self.ai.cli.VERBOSE_VISION:
self.logger.info("Requesting vision features: OCR, Entity Extraction")
ai.request_ocr()
vision_result_ocr = self.image_to_text(ai=ai, image=image)
progress_info.progress_manager.update_task(progress_info.parent_key, advance=1)
if vision_result_ocr is None:
return None
text_ocr = AiVisionOCR.format_vision_answer(vision_result=vision_result_ocr)
ai.request_entity()
vision_result_entity = self.image_to_text(ai=ai, image=image)
progress_info.progress_manager.update_task(progress_info.parent_key, advance=1)
if vision_result_entity is None:
return None
text_entity = AiVisionEntity.format_vision_answer(vision_result=vision_result_entity)
# Join with a single space
return text_ocr + " " + text_entity
def decode(self, progress_info: ProgressInfo) -> Optional[DocumentContent]:
"""
Decode the file using the determined decoder function.
:param progress_info: Progress tracking information
:return: DocumentContent or None if failed or unsupported
"""
if self.decoder_func is not None:
try:
return self.decoder_func(progress_info)
except Exception as e:
self.logger.error(f"Failed to process file: {e}")
return None
self.logger.warning(f"Cannot process file")
return None
# AI CHUNKING CALLBACK
# noinspection PyMethodMayBeStatic
def chunk_callback(self, ai: AiManager, block_of_sentences: List[str]) -> ChunkSchema:
"""
Callback for chunking a block of sentences using AI.
:param ai: AI manager.
:param block_of_sentences: List of sentences to chunk.
:return: ChunkSchema result.
"""
chunk_result = ai.chunk(block_of_sentences)
return chunk_result
def process(self, progress_manager: ProgressManager, file_progress_key: str) -> bool:
"""
Process the file through the complete RAG pipeline:
Phase 1: Document decoding and vision processing (PDF/Binary only)
Phase 2: Sentence extraction and AI chunking
Phase 3: Reference range analysis and setup
Phase 4: Parallel embedding and vector point creation
:param progress_manager: Progress manager for progress reporting.
:param file_progress_key: File progress key for progress tracking.
:return: True if successful, False otherwise.
"""
file_t0 = time.monotonic()
# PHASE 1: Document Decoding and Image Processing
phase_t0 = time.monotonic()
vision_progress_key = None
if is_pdf_document(self.file_path) or is_binary_document(self.file_path):
# Use generic interface - create child task under file
vision_progress_key = progress_manager.start_task("Image Processing", parent=file_progress_key, weight=0.33)
progress_manager.activate_task(vision_progress_key)
# Call the loader function assigned to this file data.
# NOTE: DocumentContent is an array of text lines, mapped to page or line numbers.
# Create ProgressInfo for clean parameter passing
if vision_progress_key:
vision_progress_info = progress_manager.create_progress_info(vision_progress_key)
else:
# For text-only files, decode() reports against file-level progress
vision_progress_info = progress_manager.create_progress_info(file_progress_key)
doc_content: Optional[DocumentContent] = self.decode(vision_progress_info)
# Complete image processing phase if it was created
if vision_progress_key is not None:
progress_manager.complete_task(vision_progress_key)
self.logger.info(f"Phase 1 (decode/vision) completed in {time.monotonic() - phase_t0:.1f}s")
# Decoder may fail, e.g. on I/O error, exhausted AI attempts, …
if doc_content is None:
self.logger.warning(f"Failed to decode document")
return False
# PHASE 2: Sentence Extraction and AI Chunking
phase_t0 = time.monotonic()
doc_content.strip_lines()
# Use preprocessing and NLP (spaCy) to split text into sentences, keeping track of references.
if self.ai.cli.VERBOSE_CHUNK:
self.logger.info(f"Extracting sentences across ({len(doc_content.lines)}) lines")
total_chars = sum(len(line) for line in doc_content.lines)
self.logger.info(
f"Submitting spaCy tokenization to subprocess "
f"({len(doc_content.lines)} lines, {total_chars:,} chars — large documents may take minutes)"
)
nlp_t0 = time.monotonic()
sentences_with_reference_ranges = get_sentences_with_reference_ranges(doc_content)
self.logger.info(
f"Sentence extraction completed in {time.monotonic() - nlp_t0:.1f}s "
f"({len(sentences_with_reference_ranges)} sentences)"
)
# Create chunking phase - use generic interface
has_vision = is_pdf_document(self.file_path) or is_binary_document(self.file_path)
chunking_weight = 0.34 if has_vision else 0.50
chunking_progress_key = progress_manager.start_task("Chunking", parent=file_progress_key, weight=chunking_weight)
progress_manager.activate_task(chunking_progress_key)
# Group sentences into chunks, keeping track of references.
if self.ai.cli.VERBOSE_CHUNK:
self.logger.info(f"Extracting chunks across ({len(sentences_with_reference_ranges)}) sentences")
chunks = get_chunks_with_reference_ranges(
ai_factory=self.ai_factory,
sentences_with_references=sentences_with_reference_ranges,
chunk_callback=self.chunk_callback,
chunk_lines_block=self.chunk_lines_block,
file_path=self.file_path,
progress_info=progress_manager.create_progress_info(chunking_progress_key),
logger=self.logger,
verbose=self.ai.cli.VERBOSE_CHUNK,
)
# Complete chunking phase
progress_manager.complete_task(chunking_progress_key)
self.logger.info(f"Phase 2 (chunking) completed in {time.monotonic() - phase_t0:.1f}s ({len(chunks)} chunks)")
# PHASE 3: Reference Range Analysis and Point Creation Setup
is_page_based = doc_content.pages_per_line is not None
if is_page_based:
max_page = max(doc_content.pages_per_line) if doc_content.pages_per_line else 0
reference_total_info = f"{max_page}"
else:
max_line = max(doc_content.lines_per_line) if doc_content.lines_per_line else 0
reference_total_info = f"{max_line}"
# Create embedding phase - use generic interface
phase_t0 = time.monotonic()
embedding_weight = 0.33 if has_vision else 0.50
embedding_progress_key = progress_manager.start_task(
"Embedding", parent=file_progress_key, weight=embedding_weight, total=len(chunks)
)
progress_manager.activate_task(embedding_progress_key)
# PHASE 4: Parallel Embedding and Vector Point Creation
# Process chunks in parallel for embedding
embedding_results = self.chunk_processor.process_chunks_parallel(
chunks=chunks,
verbose=self.ai.cli.VERBOSE_CHUNK,
progress_info=progress_manager.create_progress_info(embedding_progress_key)
)
# Process results and create points
for chunk_index, (chunk, vector) in enumerate(embedding_results):
if vector is None:
self.logger.warning(f"Failed to embed chunk ({chunk_index + 1}) / ({len(chunks)})")
continue
payload_model = QdrantPayload(
file_path=self.file_path,
file_mtime=self.file_meta['mtime'],
chunk_index=chunk_index,
chunks_total=len(chunks),
chunk_text=chunk.text,
version=f"v{__version__}",
page_range=None,
line_range=None,
)
min_r, max_r = chunk.reference_range
range_list = [min_r, max_r] if min_r != max_r else [min_r]
if is_page_based:
payload_model.page_range = range_list
else:
payload_model.line_range = range_list
payload = payload_model.model_dump()
point = PointStruct(
id=str(uuid.uuid4()),
vector=vector,
payload=payload,
)
if self.ai.cli.VERBOSE_CHUNK:
self.logger.info(
f"Reference for chunk ({chunk_index + 1}) / ({len(chunks)}): "
f"{get_point_page_line_info(point)} "
f"of {reference_total_info}"
)
self.points.append(point)
# Complete embedding phase
progress_manager.complete_task(embedding_progress_key)
self.logger.info(f"Phase 3+4 (embedding) completed in {time.monotonic() - phase_t0:.1f}s ({len(self.points)} points)")
self.logger.info(f"File processing completed in {time.monotonic() - file_t0:.1f}s")
return True