From 7bc1e8aa8fac4da01a8563b0f22c80c8794dff87 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 3 Jan 2025 14:02:39 -0500 Subject: [PATCH 01/30] Create class CLPRemoteHandler --- src/clp_logging/remote_handlers.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 src/clp_logging/remote_handlers.py diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py new file mode 100644 index 0000000..459d3f4 --- /dev/null +++ b/src/clp_logging/remote_handlers.py @@ -0,0 +1,30 @@ +import boto3 +from botocore.exceptions import NoCredentialsError +from clp_logging.handlers import * +from pathlib import Path +from typing import Optional + +class CLPRemoteHandler(): + """ + Handles CLP file upload and comparison to AWS S3 bucket. + Configuration of AWS access key is required. Run command `aws configure` + """ + + def __init__( + self, + s3_bucket: str, + ) -> None: + self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") + self.s3_client: boto3.client = boto3.client("s3") + self.bucket: str = s3_bucket + + self.log_name: Optional[str] = None + self.log_path: Optional[Path] = None + self.remote_folder_path: Optional[str] = None + self.obj_key: Optional[str] = None + + def get_obj_key(self) -> str: + return self.obj_key + + def set_obj_key(self, obj_key) -> None: + self.obj_key = obj_key From 0ae4a2a446278981568c322843769061b93482ee Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 3 Jan 2025 14:04:57 -0500 Subject: [PATCH 02/30] Create multipart upload initiation --- src/clp_logging/remote_handlers.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index 459d3f4..d6478c5 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -1,8 +1,9 @@ import boto3 from botocore.exceptions import NoCredentialsError from clp_logging.handlers import * +import datetime from pathlib import Path -from typing import Optional +from typing import Optional, Any, List class CLPRemoteHandler(): """ @@ -23,8 +24,31 @@ def __init__( self.remote_folder_path: Optional[str] = None self.obj_key: Optional[str] = None + def _remote_log_naming(self, timestamp: datetime.datetime) -> str: + new_filename: str + ext: int = self.log_name.find(".") + upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S") + + if ext != -1: + new_filename = f'log_{upload_time}{self.log_name[ext:]}' + else: + new_filename = f'{upload_time}_{self.log_name}' + new_filename = f"{self.remote_folder_path}/{new_filename}" + return new_filename + def get_obj_key(self) -> str: return self.obj_key def set_obj_key(self, obj_key) -> None: self.obj_key = obj_key + + def initiate_upload(self, log_path: Path) -> None: + self.log_path: Path = log_path + self.log_name: str = log_path.name + timestamp: datetime.datetime = datetime.datetime.now() + self.remote_folder_path: str = f'logs/{timestamp.year}/{timestamp.month}/{timestamp.day}' + + self.obj_key: str = self._remote_log_naming(timestamp) + create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload(Bucket=self.bucket, Key=self.obj_key, + ChecksumAlgorithm='SHA256') + self.upload_id = create_ret['UploadId'] From 8895b37daad19aa1ed1510481bab80617a0ce542 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 3 Jan 2025 14:07:09 -0500 Subject: [PATCH 03/30] Create function that performs multipart upload on 5mb segments --- src/clp_logging/remote_handlers.py | 88 +++++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 2 deletions(-) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index d6478c5..278a553 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -1,11 +1,13 @@ +import base64 import boto3 from botocore.exceptions import NoCredentialsError from clp_logging.handlers import * import datetime +import hashlib from pathlib import Path -from typing import Optional, Any, List +from typing import Any, Dict, List, Optional -class CLPRemoteHandler(): +class CLPRemoteHandler(CLPFileHandler): """ Handles CLP file upload and comparison to AWS S3 bucket. Configuration of AWS access key is required. Run command `aws configure` @@ -24,6 +26,19 @@ def __init__( self.remote_folder_path: Optional[str] = None self.obj_key: Optional[str] = None + self.multipart_upload_config: Dict[str, int] = { + 'size': 1024 * 1024 * 5, + 'index': 1, + 'pos': 0, + } + self.uploaded_parts: List[Dict[str, int | str]] = [] + self.upload_id: Optional[int] = None + + def _calculate_part_sha256(self, data: bytes) -> str: + sha256_hash: hashlib.Hash = hashlib.sha256() + sha256_hash.update(data) + return base64.b64encode(sha256_hash.digest()).decode('utf-8') + def _remote_log_naming(self, timestamp: datetime.datetime) -> str: new_filename: str ext: int = self.log_name.find(".") @@ -36,6 +51,45 @@ def _remote_log_naming(self, timestamp: datetime.datetime) -> str: new_filename = f"{self.remote_folder_path}/{new_filename}" return new_filename + def _upload_part(self, upload_id) -> Dict[str, int | str]: + upload_data: bytes + # Read the latest file + try: + with open(self.log_path, 'rb') as file: + file.seek(self.multipart_upload_config['pos']) + upload_data = file.read(self.multipart_upload_config['size']) + except FileNotFoundError as e: + raise FileNotFoundError(f'The log file {self.log_path} cannot be found: {e}') from e + except IOError as e: + raise IOError(f'IO Error occurred while reading file {self.log_path}: {e}') from e + except Exception as e: + raise e + + try: + sha256_checksum: str = self._calculate_part_sha256(upload_data) + response: Dict[str, Any] = self.s3_client.upload_part( + Bucket=self.bucket, + Key=self.obj_key, + Body=upload_data, + PartNumber=self.multipart_upload_config['index'], + UploadId=upload_id, + ChecksumSHA256=sha256_checksum + ) + print(f'Uploaded Part {self.multipart_upload_config["index"]}') + print(response) + + # Store both ETag and SHA256 for validation + return { + 'PartNumber': self.multipart_upload_config['index'], + 'ETag': response['ETag'], + 'ChecksumSHA256': response['ChecksumSHA256'], + } + except Exception as e: + self.s3_client.abort_multipart_upload( + Bucket=self.bucket, Key=self.obj_key, UploadId=upload_id + ) + raise Exception(f'Error occurred during multipart upload on part {self.multipart_upload_config["index"]}: {e}') from e + def get_obj_key(self) -> str: return self.obj_key @@ -52,3 +106,33 @@ def initiate_upload(self, log_path: Path) -> None: create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload(Bucket=self.bucket, Key=self.obj_key, ChecksumAlgorithm='SHA256') self.upload_id = create_ret['UploadId'] + + def multipart_upload(self) -> None: + # Upload initiation is required before upload + if not self.upload_id: + raise Exception('No upload process.') + + file_size: int = self.log_path.stat().st_size + print(file_size) + try: + while ( + file_size - self.multipart_upload_config['pos'] + >= self.multipart_upload_config['size'] + ): + upload_status: Dict[str, int | str] = self._upload_part(self.upload_id) + print(upload_status) + self.multipart_upload_config['index'] += 1 + self.multipart_upload_config['pos'] += self.multipart_upload_config['size'] + self.uploaded_parts.append(upload_status) + + # AWS S3 limits object part count to 10000 + if self.multipart_upload_config['index'] > 10000: + break + + except NoCredentialsError as e: + raise e + except Exception as e: + self.s3_client.abort_multipart_upload( + Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id + ) + raise e From 7475ed5ae4d7282d82722bb4dddff4b622a216e7 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 3 Jan 2025 14:10:12 -0500 Subject: [PATCH 04/30] Create function that completes multipart upload --- src/clp_logging/remote_handlers.py | 40 ++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index 278a553..bebf9a9 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -136,3 +136,43 @@ def multipart_upload(self) -> None: Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id ) raise e + + def complete_upload(self) -> None: + if not self.upload_id: + raise Exception('No upload process to complete.') + + file_size: int = self.log_path.stat().st_size + try: + # Upload the remaining segment + if file_size - self.multipart_upload_config['pos'] < self.multipart_upload_config['size']: + self.multipart_upload_config['size'] = file_size - self.multipart_upload_config['pos'] + upload_status: Dict[str, int | str] = self._upload_part(self.upload_id) + self.multipart_upload_config['index'] += 1 + self.uploaded_parts.append(upload_status) + except Exception as e: + self.s3_client.abort_multipart_upload( + Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id + ) + raise e + + print(self.obj_key) + response = self.s3_client.complete_multipart_upload( + Bucket=self.bucket, + Key=self.obj_key, + UploadId=self.upload_id, + MultipartUpload={ + 'Parts': [ + {'PartNumber': part['PartNumber'], 'ETag': part['ETag'], 'ChecksumSHA256': part['ChecksumSHA256']} + for part in self.uploaded_parts + ] + }, + ) + print(response) + print('Complete multipart upload') + try: + response = self.s3_client.head_object(Bucket=self.bucket, Key=self.obj_key) + print('Object metadata:', response) + except Exception as e: + print('Object not found:', e) + self.upload_id = None + self.obj_key = None From 96a831bdd4a9754851788e13ed5502ff2d247fb4 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 3 Jan 2025 14:14:39 -0500 Subject: [PATCH 05/30] Handle corner case: file rotation when part number exceeds 10000 --- src/clp_logging/remote_handlers.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index bebf9a9..cc2684b 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -33,6 +33,7 @@ def __init__( } self.uploaded_parts: List[Dict[str, int | str]] = [] self.upload_id: Optional[int] = None + self.remote_file_count: int = 0 def _calculate_part_sha256(self, data: bytes) -> str: sha256_hash: hashlib.Hash = hashlib.sha256() @@ -43,6 +44,8 @@ def _remote_log_naming(self, timestamp: datetime.datetime) -> str: new_filename: str ext: int = self.log_name.find(".") upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S") + if self.remote_file_count != 0: + upload_time += "-" + str(self.remote_file_count) if ext != -1: new_filename = f'log_{upload_time}{self.log_name[ext:]}' @@ -127,7 +130,16 @@ def multipart_upload(self) -> None: # AWS S3 limits object part count to 10000 if self.multipart_upload_config['index'] > 10000: - break + self.complete_upload() + + # Initiate multipart upload to a new S3 object + self.remote_file_count += 1 + self.obj_key = self._remote_log_naming() + self.multipart_upload_config['index'] = 1 + self.uploaded_parts = [] + create_ret = self.s3_client.create_multipart_upload(Bucket=self.bucket, Key=self.obj_key, + ChecksumAlgorithm='SHA256') + self.upload_id = create_ret['UploadId'] except NoCredentialsError as e: raise e From 055eaf0b74aa163ea1406d0369c56648fa30db80 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 3 Jan 2025 14:16:35 -0500 Subject: [PATCH 06/30] Integrate timeout functionalities for CLPLogLevelTimeout usage and close for CLPFileHandler usage --- src/clp_logging/remote_handlers.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index cc2684b..421b9da 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -188,3 +188,19 @@ def complete_upload(self) -> None: print('Object not found:', e) self.upload_id = None self.obj_key = None + + def timeout(self, log_path: Path) -> None: + print("time out start") + if not self.upload_id: + super().__init__(fpath=log_path) + self.initiate_upload(log_path) + + self.multipart_upload() + print("time out end") + + def close(self): + print("close start") + super().close() + if self.closed: + self.complete_upload() + print("close end") \ No newline at end of file From b32b5f251a9b53cf8d914431a5991ab3a1725998 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 3 Jan 2025 14:21:48 -0500 Subject: [PATCH 07/30] Avoid new upload initialization when previous upload has not completed --- src/clp_logging/remote_handlers.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index 421b9da..a124703 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -34,6 +34,7 @@ def __init__( self.uploaded_parts: List[Dict[str, int | str]] = [] self.upload_id: Optional[int] = None self.remote_file_count: int = 0 + self.upload_in_progress: bool = False def _calculate_part_sha256(self, data: bytes) -> str: sha256_hash: hashlib.Hash = hashlib.sha256() @@ -100,8 +101,12 @@ def set_obj_key(self, obj_key) -> None: self.obj_key = obj_key def initiate_upload(self, log_path: Path) -> None: + if self.upload_in_progress: + raise Exception('An upload is already in progress. Cannot initiate another upload.') + self.log_path: Path = log_path self.log_name: str = log_path.name + self.upload_in_progress = True timestamp: datetime.datetime = datetime.datetime.now() self.remote_folder_path: str = f'logs/{timestamp.year}/{timestamp.month}/{timestamp.day}' @@ -186,6 +191,7 @@ def complete_upload(self) -> None: print('Object metadata:', response) except Exception as e: print('Object not found:', e) + self.upload_in_progress = False self.upload_id = None self.obj_key = None From ed3a57b751e8b96b5a0b28b2619eb57b5658608a Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 3 Jan 2025 14:25:21 -0500 Subject: [PATCH 08/30] Ensure consistent string quotation --- src/clp_logging/remote_handlers.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index a124703..90853e4 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -17,8 +17,8 @@ def __init__( self, s3_bucket: str, ) -> None: - self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") - self.s3_client: boto3.client = boto3.client("s3") + self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource('s3') + self.s3_client: boto3.client = boto3.client('s3') self.bucket: str = s3_bucket self.log_name: Optional[str] = None @@ -43,16 +43,16 @@ def _calculate_part_sha256(self, data: bytes) -> str: def _remote_log_naming(self, timestamp: datetime.datetime) -> str: new_filename: str - ext: int = self.log_name.find(".") - upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S") + ext: int = self.log_name.find('.') + upload_time: str = timestamp.strftime('%Y-%m-%d-%H%M%S') if self.remote_file_count != 0: - upload_time += "-" + str(self.remote_file_count) + upload_time += '-' + str(self.remote_file_count) if ext != -1: new_filename = f'log_{upload_time}{self.log_name[ext:]}' else: new_filename = f'{upload_time}_{self.log_name}' - new_filename = f"{self.remote_folder_path}/{new_filename}" + new_filename = f'{self.remote_folder_path}/{new_filename}' return new_filename def _upload_part(self, upload_id) -> Dict[str, int | str]: From 964ae20a5ec673d1ee70c7ca0788fc7d5cc5e2d7 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Sat, 4 Jan 2025 19:29:22 -0500 Subject: [PATCH 09/30] Remove print statements and fix aws segment limitation error --- src/clp_logging/remote_handlers.py | 40 ++++++++++++++---------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index 90853e4..e38abc8 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -45,6 +45,7 @@ def _remote_log_naming(self, timestamp: datetime.datetime) -> str: new_filename: str ext: int = self.log_name.find('.') upload_time: str = timestamp.strftime('%Y-%m-%d-%H%M%S') + # File rotation if self.remote_file_count != 0: upload_time += '-' + str(self.remote_file_count) @@ -79,8 +80,6 @@ def _upload_part(self, upload_id) -> Dict[str, int | str]: UploadId=upload_id, ChecksumSHA256=sha256_checksum ) - print(f'Uploaded Part {self.multipart_upload_config["index"]}') - print(response) # Store both ETag and SHA256 for validation return { @@ -121,25 +120,36 @@ def multipart_upload(self) -> None: raise Exception('No upload process.') file_size: int = self.log_path.stat().st_size - print(file_size) try: while ( file_size - self.multipart_upload_config['pos'] >= self.multipart_upload_config['size'] ): upload_status: Dict[str, int | str] = self._upload_part(self.upload_id) - print(upload_status) self.multipart_upload_config['index'] += 1 self.multipart_upload_config['pos'] += self.multipart_upload_config['size'] self.uploaded_parts.append(upload_status) # AWS S3 limits object part count to 10000 - if self.multipart_upload_config['index'] > 10000: - self.complete_upload() + if self.multipart_upload_config['index'] >= 10000: + self.s3_client.complete_multipart_upload( + Bucket=self.bucket, + Key=self.obj_key, + UploadId=self.upload_id, + MultipartUpload={ + 'Parts': [ + {'PartNumber': part['PartNumber'], 'ETag': part['ETag'], + 'ChecksumSHA256': part['ChecksumSHA256']} + for part in self.uploaded_parts + ] + }, + ) # Initiate multipart upload to a new S3 object self.remote_file_count += 1 - self.obj_key = self._remote_log_naming() + timestamp: datetime.datetime = datetime.datetime.now() + self.remote_folder_path = f'logs/{timestamp.year}/{timestamp.month}/{timestamp.day}' + self.obj_key = self._remote_log_naming(timestamp) self.multipart_upload_config['index'] = 1 self.uploaded_parts = [] create_ret = self.s3_client.create_multipart_upload(Bucket=self.bucket, Key=self.obj_key, @@ -172,8 +182,7 @@ def complete_upload(self) -> None: ) raise e - print(self.obj_key) - response = self.s3_client.complete_multipart_upload( + self.s3_client.complete_multipart_upload( Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id, @@ -184,29 +193,18 @@ def complete_upload(self) -> None: ] }, ) - print(response) - print('Complete multipart upload') - try: - response = self.s3_client.head_object(Bucket=self.bucket, Key=self.obj_key) - print('Object metadata:', response) - except Exception as e: - print('Object not found:', e) self.upload_in_progress = False self.upload_id = None self.obj_key = None def timeout(self, log_path: Path) -> None: - print("time out start") if not self.upload_id: super().__init__(fpath=log_path) self.initiate_upload(log_path) self.multipart_upload() - print("time out end") def close(self): - print("close start") super().close() if self.closed: - self.complete_upload() - print("close end") \ No newline at end of file + self.complete_upload() \ No newline at end of file From 1fa016363790c34ce69032d7ac7ae13be903600e Mon Sep 17 00:00:00 2001 From: IreneLi Date: Sat, 4 Jan 2025 23:16:25 -0500 Subject: [PATCH 10/30] Modify code to pass typing tests. --- src/clp_logging/remote_handlers.py | 36 +++++++++++++++++++----------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index e38abc8..e44db18 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -37,11 +37,14 @@ def __init__( self.upload_in_progress: bool = False def _calculate_part_sha256(self, data: bytes) -> str: - sha256_hash: hashlib.Hash = hashlib.sha256() + sha256_hash: hashlib._Hash = hashlib.sha256() sha256_hash.update(data) return base64.b64encode(sha256_hash.digest()).decode('utf-8') def _remote_log_naming(self, timestamp: datetime.datetime) -> str: + if self.log_name is None: + raise ValueError("No input file.") + new_filename: str ext: int = self.log_name.find('.') upload_time: str = timestamp.strftime('%Y-%m-%d-%H%M%S') @@ -56,7 +59,10 @@ def _remote_log_naming(self, timestamp: datetime.datetime) -> str: new_filename = f'{self.remote_folder_path}/{new_filename}' return new_filename - def _upload_part(self, upload_id) -> Dict[str, int | str]: + def _upload_part(self) -> Dict[str, int | str]: + if self.log_path is None: + raise ValueError("No input file.") + upload_data: bytes # Read the latest file try: @@ -77,7 +83,7 @@ def _upload_part(self, upload_id) -> Dict[str, int | str]: Key=self.obj_key, Body=upload_data, PartNumber=self.multipart_upload_config['index'], - UploadId=upload_id, + UploadId=self.upload_id, ChecksumSHA256=sha256_checksum ) @@ -89,27 +95,27 @@ def _upload_part(self, upload_id) -> Dict[str, int | str]: } except Exception as e: self.s3_client.abort_multipart_upload( - Bucket=self.bucket, Key=self.obj_key, UploadId=upload_id + Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id ) raise Exception(f'Error occurred during multipart upload on part {self.multipart_upload_config["index"]}: {e}') from e - def get_obj_key(self) -> str: + def get_obj_key(self) -> str | None: return self.obj_key - def set_obj_key(self, obj_key) -> None: + def set_obj_key(self, obj_key: str) -> None: self.obj_key = obj_key def initiate_upload(self, log_path: Path) -> None: if self.upload_in_progress: raise Exception('An upload is already in progress. Cannot initiate another upload.') - self.log_path: Path = log_path - self.log_name: str = log_path.name + self.log_path = log_path + self.log_name = log_path.name self.upload_in_progress = True timestamp: datetime.datetime = datetime.datetime.now() - self.remote_folder_path: str = f'logs/{timestamp.year}/{timestamp.month}/{timestamp.day}' + self.remote_folder_path = f'logs/{timestamp.year}/{timestamp.month}/{timestamp.day}' - self.obj_key: str = self._remote_log_naming(timestamp) + self.obj_key = self._remote_log_naming(timestamp) create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload(Bucket=self.bucket, Key=self.obj_key, ChecksumAlgorithm='SHA256') self.upload_id = create_ret['UploadId'] @@ -118,6 +124,8 @@ def multipart_upload(self) -> None: # Upload initiation is required before upload if not self.upload_id: raise Exception('No upload process.') + if self.log_path is None: + raise ValueError("No input file.") file_size: int = self.log_path.stat().st_size try: @@ -125,7 +133,7 @@ def multipart_upload(self) -> None: file_size - self.multipart_upload_config['pos'] >= self.multipart_upload_config['size'] ): - upload_status: Dict[str, int | str] = self._upload_part(self.upload_id) + upload_status: Dict[str, int | str] = self._upload_part() self.multipart_upload_config['index'] += 1 self.multipart_upload_config['pos'] += self.multipart_upload_config['size'] self.uploaded_parts.append(upload_status) @@ -167,13 +175,15 @@ def multipart_upload(self) -> None: def complete_upload(self) -> None: if not self.upload_id: raise Exception('No upload process to complete.') + if self.log_path is None: + raise ValueError("No input file.") file_size: int = self.log_path.stat().st_size try: # Upload the remaining segment if file_size - self.multipart_upload_config['pos'] < self.multipart_upload_config['size']: self.multipart_upload_config['size'] = file_size - self.multipart_upload_config['pos'] - upload_status: Dict[str, int | str] = self._upload_part(self.upload_id) + upload_status: Dict[str, int | str] = self._upload_part() self.multipart_upload_config['index'] += 1 self.uploaded_parts.append(upload_status) except Exception as e: @@ -204,7 +214,7 @@ def timeout(self, log_path: Path) -> None: self.multipart_upload() - def close(self): + def close(self) -> None: super().close() if self.closed: self.complete_upload() \ No newline at end of file From 85cac30ee6e5f48f4025a14fada3008fdf671235 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Sat, 4 Jan 2025 23:30:23 -0500 Subject: [PATCH 11/30] Code format & linting test --- src/clp_logging/remote_handlers.py | 144 +++++++++++++++++------------ 1 file changed, 85 insertions(+), 59 deletions(-) diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py index e44db18..e667234 100644 --- a/src/clp_logging/remote_handlers.py +++ b/src/clp_logging/remote_handlers.py @@ -1,24 +1,29 @@ import base64 -import boto3 -from botocore.exceptions import NoCredentialsError -from clp_logging.handlers import * import datetime import hashlib from pathlib import Path from typing import Any, Dict, List, Optional +import boto3 +from botocore.exceptions import NoCredentialsError + +from clp_logging.handlers import CLPFileHandler + + class CLPRemoteHandler(CLPFileHandler): """ - Handles CLP file upload and comparison to AWS S3 bucket. - Configuration of AWS access key is required. Run command `aws configure` + Handles CLP file upload and comparison to AWS S3 bucket. Configuration of + AWS access key is required. Run command `aws configure`. + + :param s3_bucket: Name of the AWS S3 Bucket where log files are transferred """ def __init__( - self, - s3_bucket: str, + self, + s3_bucket: str, ) -> None: - self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource('s3') - self.s3_client: boto3.client = boto3.client('s3') + self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") + self.s3_client: boto3.client = boto3.client("s3") self.bucket: str = s3_bucket self.log_name: Optional[str] = None @@ -27,9 +32,9 @@ def __init__( self.obj_key: Optional[str] = None self.multipart_upload_config: Dict[str, int] = { - 'size': 1024 * 1024 * 5, - 'index': 1, - 'pos': 0, + "size": 1024 * 1024 * 5, + "index": 1, + "pos": 0, } self.uploaded_parts: List[Dict[str, int | str]] = [] self.upload_id: Optional[int] = None @@ -39,24 +44,24 @@ def __init__( def _calculate_part_sha256(self, data: bytes) -> str: sha256_hash: hashlib._Hash = hashlib.sha256() sha256_hash.update(data) - return base64.b64encode(sha256_hash.digest()).decode('utf-8') + return base64.b64encode(sha256_hash.digest()).decode("utf-8") def _remote_log_naming(self, timestamp: datetime.datetime) -> str: if self.log_name is None: raise ValueError("No input file.") new_filename: str - ext: int = self.log_name.find('.') - upload_time: str = timestamp.strftime('%Y-%m-%d-%H%M%S') - # File rotation + ext: int = self.log_name.find(".") + upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S") + # Naming of multiple remote files from the same local file if self.remote_file_count != 0: - upload_time += '-' + str(self.remote_file_count) + upload_time += "-" + str(self.remote_file_count) if ext != -1: - new_filename = f'log_{upload_time}{self.log_name[ext:]}' + new_filename = f"log_{upload_time}{self.log_name[ext:]}" else: - new_filename = f'{upload_time}_{self.log_name}' - new_filename = f'{self.remote_folder_path}/{new_filename}' + new_filename = f"{upload_time}_{self.log_name}" + new_filename = f"{self.remote_folder_path}/{new_filename}" return new_filename def _upload_part(self) -> Dict[str, int | str]: @@ -64,15 +69,15 @@ def _upload_part(self) -> Dict[str, int | str]: raise ValueError("No input file.") upload_data: bytes - # Read the latest file + # Read the latest version of the file try: - with open(self.log_path, 'rb') as file: - file.seek(self.multipart_upload_config['pos']) - upload_data = file.read(self.multipart_upload_config['size']) + with open(self.log_path, "rb") as file: + file.seek(self.multipart_upload_config["pos"]) + upload_data = file.read(self.multipart_upload_config["size"]) except FileNotFoundError as e: - raise FileNotFoundError(f'The log file {self.log_path} cannot be found: {e}') from e + raise FileNotFoundError(f"The log file {self.log_path} cannot be found: {e}") from e except IOError as e: - raise IOError(f'IO Error occurred while reading file {self.log_path}: {e}') from e + raise IOError(f"IO Error occurred while reading file {self.log_path}: {e}") from e except Exception as e: raise e @@ -82,22 +87,24 @@ def _upload_part(self) -> Dict[str, int | str]: Bucket=self.bucket, Key=self.obj_key, Body=upload_data, - PartNumber=self.multipart_upload_config['index'], + PartNumber=self.multipart_upload_config["index"], UploadId=self.upload_id, - ChecksumSHA256=sha256_checksum + ChecksumSHA256=sha256_checksum, ) # Store both ETag and SHA256 for validation return { - 'PartNumber': self.multipart_upload_config['index'], - 'ETag': response['ETag'], - 'ChecksumSHA256': response['ChecksumSHA256'], + "PartNumber": self.multipart_upload_config["index"], + "ETag": response["ETag"], + "ChecksumSHA256": response["ChecksumSHA256"], } except Exception as e: self.s3_client.abort_multipart_upload( Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id ) - raise Exception(f'Error occurred during multipart upload on part {self.multipart_upload_config["index"]}: {e}') from e + raise Exception( + f'Multipart Upload on Part {self.multipart_upload_config["index"]}: {e}' + ) from e def get_obj_key(self) -> str | None: return self.obj_key @@ -107,47 +114,52 @@ def set_obj_key(self, obj_key: str) -> None: def initiate_upload(self, log_path: Path) -> None: if self.upload_in_progress: - raise Exception('An upload is already in progress. Cannot initiate another upload.') + raise Exception("An upload is already in progress. Cannot initiate another upload.") self.log_path = log_path self.log_name = log_path.name self.upload_in_progress = True timestamp: datetime.datetime = datetime.datetime.now() - self.remote_folder_path = f'logs/{timestamp.year}/{timestamp.month}/{timestamp.day}' + self.remote_folder_path = f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}" self.obj_key = self._remote_log_naming(timestamp) - create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload(Bucket=self.bucket, Key=self.obj_key, - ChecksumAlgorithm='SHA256') - self.upload_id = create_ret['UploadId'] + create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload( + Bucket=self.bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256" + ) + self.upload_id = create_ret["UploadId"] def multipart_upload(self) -> None: - # Upload initiation is required before upload + # Upload initiation is required before multipart_upload if not self.upload_id: - raise Exception('No upload process.') + raise Exception("No upload process.") if self.log_path is None: raise ValueError("No input file.") file_size: int = self.log_path.stat().st_size try: while ( - file_size - self.multipart_upload_config['pos'] - >= self.multipart_upload_config['size'] + file_size - self.multipart_upload_config["pos"] + >= self.multipart_upload_config["size"] ): + # Perform upload and label the uploaded part upload_status: Dict[str, int | str] = self._upload_part() - self.multipart_upload_config['index'] += 1 - self.multipart_upload_config['pos'] += self.multipart_upload_config['size'] + self.multipart_upload_config["index"] += 1 + self.multipart_upload_config["pos"] += self.multipart_upload_config["size"] self.uploaded_parts.append(upload_status) # AWS S3 limits object part count to 10000 - if self.multipart_upload_config['index'] >= 10000: + if self.multipart_upload_config["index"] >= 10000: self.s3_client.complete_multipart_upload( Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id, MultipartUpload={ - 'Parts': [ - {'PartNumber': part['PartNumber'], 'ETag': part['ETag'], - 'ChecksumSHA256': part['ChecksumSHA256']} + "Parts": [ + { + "PartNumber": part["PartNumber"], + "ETag": part["ETag"], + "ChecksumSHA256": part["ChecksumSHA256"], + } for part in self.uploaded_parts ] }, @@ -156,13 +168,16 @@ def multipart_upload(self) -> None: # Initiate multipart upload to a new S3 object self.remote_file_count += 1 timestamp: datetime.datetime = datetime.datetime.now() - self.remote_folder_path = f'logs/{timestamp.year}/{timestamp.month}/{timestamp.day}' + self.remote_folder_path = ( + f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}" + ) self.obj_key = self._remote_log_naming(timestamp) - self.multipart_upload_config['index'] = 1 + self.multipart_upload_config["index"] = 1 self.uploaded_parts = [] - create_ret = self.s3_client.create_multipart_upload(Bucket=self.bucket, Key=self.obj_key, - ChecksumAlgorithm='SHA256') - self.upload_id = create_ret['UploadId'] + create_ret = self.s3_client.create_multipart_upload( + Bucket=self.bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256" + ) + self.upload_id = create_ret["UploadId"] except NoCredentialsError as e: raise e @@ -173,18 +188,24 @@ def multipart_upload(self) -> None: raise e def complete_upload(self) -> None: + # Upload initiation is required before complete_upload if not self.upload_id: - raise Exception('No upload process to complete.') + raise Exception("No upload process to complete.") if self.log_path is None: raise ValueError("No input file.") file_size: int = self.log_path.stat().st_size try: # Upload the remaining segment - if file_size - self.multipart_upload_config['pos'] < self.multipart_upload_config['size']: - self.multipart_upload_config['size'] = file_size - self.multipart_upload_config['pos'] + if ( + file_size - self.multipart_upload_config["pos"] + < self.multipart_upload_config["size"] + ): + self.multipart_upload_config["size"] = ( + file_size - self.multipart_upload_config["pos"] + ) upload_status: Dict[str, int | str] = self._upload_part() - self.multipart_upload_config['index'] += 1 + self.multipart_upload_config["index"] += 1 self.uploaded_parts.append(upload_status) except Exception as e: self.s3_client.abort_multipart_upload( @@ -197,8 +218,12 @@ def complete_upload(self) -> None: Key=self.obj_key, UploadId=self.upload_id, MultipartUpload={ - 'Parts': [ - {'PartNumber': part['PartNumber'], 'ETag': part['ETag'], 'ChecksumSHA256': part['ChecksumSHA256']} + "Parts": [ + { + "PartNumber": part["PartNumber"], + "ETag": part["ETag"], + "ChecksumSHA256": part["ChecksumSHA256"], + } for part in self.uploaded_parts ] }, @@ -208,6 +233,7 @@ def complete_upload(self) -> None: self.obj_key = None def timeout(self, log_path: Path) -> None: + # Upload latest segment upon CLPLogLevelTimeout if not self.upload_id: super().__init__(fpath=log_path) self.initiate_upload(log_path) @@ -217,4 +243,4 @@ def timeout(self, log_path: Path) -> None: def close(self) -> None: super().close() if self.closed: - self.complete_upload() \ No newline at end of file + self.complete_upload() From 065c28cf628f9043efca0441b86062a402c4e9dc Mon Sep 17 00:00:00 2001 From: IreneLi Date: Sun, 9 Feb 2025 11:03:52 -0500 Subject: [PATCH 12/30] Redesign CLPS3Handler architecture and complete a functional draft of the handler --- src/clp_logging/handlers.py | 159 ++++++++++++++++++- src/clp_logging/remote_handlers.py | 246 ----------------------------- 2 files changed, 158 insertions(+), 247 deletions(-) delete mode 100644 src/clp_logging/remote_handlers.py diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index ede0003..524fd4c 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -10,12 +10,16 @@ from signal import SIGINT, signal, SIGTERM from threading import Thread, Timer from types import FrameType -from typing import Callable, ClassVar, Dict, IO, Optional, Tuple, Union +from typing import Any, Callable, ClassVar, Dict, List, IO, Optional, Tuple, Union import tzlocal from clp_ffi_py.ir import FourByteEncoder from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor +import datetime +import io +import boto3 + from clp_logging.protocol import ( BYTE_ORDER, EOF_CHAR, @@ -792,3 +796,156 @@ def __init__( super().__init__( open(fpath, mode), enable_compression, timestamp_format, timezone, loglevel_timeout ) + + +class CLPS3Handler(CLPBaseHandler): + """ + Log is written to stream in CLP IR encoding, and uploaded to s3_bucket + + :param s3_bucket: S3 bucket to upload CLP encoded log messages to + """ + + def init(self, stream: IO[bytes]) -> None: + self.cctx: ZstdCompressor = ZstdCompressor() + self.ostream: Union[ZstdCompressionWriter, IO[bytes]] = ( + self.cctx.stream_writer(self.local_buffer) if self.enable_compression else stream + ) + self.last_timestamp_ms: int = floor(time.time() * 1000) # convert to ms and truncate + self.ostream.write( + FourByteEncoder.encode_preamble( + self.last_timestamp_ms, self.timestamp_format, self.timezone + ) + ) + + def __init__( + self, + s3_bucket: str, + stream: Optional[IO[bytes]] = None, + enable_compression: bool = True, + timestamp_format: Optional[str] = None, + timezone: Optional[str] = None, + ) -> None: + super().__init__() + self.closed: bool = False + self.enable_compression: bool = enable_compression + self.local_buffer: io.BytesIO = io.BytesIO() + if stream is None: + stream = self.local_buffer + self.ostream: IO[bytes] = stream + self.timestamp_format: str + self.timezone: str + self.timestamp_format, self.timezone = _init_timeinfo(timestamp_format, timezone) + self.init(stream) + + self.s3_bucket: str = s3_bucket + self.remote_folder_path: Optional[str] = None + self.obj_key: str = self._remote_log_naming(datetime.datetime.now()) + self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") + self.s3_client: boto3.client = boto3.client("s3") + self.buffer_size: int = 1024 * 1024 * 5 + self.uploaded_parts: List[Dict[str, int | str]] = [] + self.upload_index: int = 1 + create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload( + Bucket=self.s3_bucket, Key=self.obj_key#, ChecksumAlgorithm="SHA256" + ) + self.upload_id: int = create_ret["UploadId"] + + + def _remote_log_naming(self, timestamp: datetime.datetime) -> str: + self.remote_folder_path: str = f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}" + + new_filename: str + upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S") + + # HARD-CODED TO .clp.zst FOR NOW + if self.enable_compression: + new_filename = f"{self.remote_folder_path}/{upload_time}_log.clp.zst" + else: + new_filename = f"{self.remote_folder_path}/{upload_time}_log.clp" + return new_filename + + # override + def _write(self, loglevel: int, msg: str) -> None: + if self.closed: + raise RuntimeError("Stream already closed") + clp_msg: bytearray + clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms) + self.ostream.write(clp_msg) + self._flush() + if self.local_buffer.tell() >= self.buffer_size: + self.upload_index += 1 + self.local_buffer.seek(0) + self.local_buffer.truncate(0) + + def _flush(self) -> None: + self.ostream.flush() + data = self.local_buffer.getvalue() + + try: + # sha256_checksum: str = self._calculate_part_sha256(upload_data) + response: Dict[str, Any] = self.s3_client.upload_part( + Bucket=self.s3_bucket, + Key=self.obj_key, + Body=data, + PartNumber=self.upload_index, + UploadId=self.upload_id, + # ChecksumSHA256=sha256_checksum, + ) + + # Store both ETag and SHA256 for validation + upload_status: Dict[str, int | str] = { + "PartNumber": self.upload_index, + "ETag": response["ETag"], + # "ChecksumSHA256": response["ChecksumSHA256"], + } + + # Determine the part to which the new upload_status belongs + if len(self.uploaded_parts) > self.upload_index - 1: + self.uploaded_parts[self.upload_index-1] = upload_status + else: + self.uploaded_parts.append(upload_status) + + except Exception as e: + self.s3_client.abort_multipart_upload( + Bucket=self.s3_bucket, Key=self.obj_key, UploadId=self.upload_id + ) + raise Exception( + f'Multipart Upload on Part {self.upload_index}: {e}' + ) from e + + + # override + def close(self) -> None: + self.ostream.write(EOF_CHAR) + self._flush() + self.local_buffer.seek(0) + self.local_buffer.truncate(0) + + try: + print(self.uploaded_parts) + self.s3_client.complete_multipart_upload( + Bucket=self.s3_bucket, + Key=self.obj_key, + UploadId=self.upload_id, + MultipartUpload={ + "Parts": [ + { + "PartNumber": part["PartNumber"], + "ETag": part["ETag"], + # "ChecksumSHA256": part["ChecksumSHA256"], + } + for part in self.uploaded_parts + ] + }, + ) + except Exception as e: + self.s3_client.abort_multipart_upload( + Bucket=self.s3_bucket, Key=self.obj_key, UploadId=self.upload_id + ) + raise Exception( + f'Multipart Upload on Part {self.upload_index}: {e}' + ) from e + finally: + self.ostream.close() + self.closed = True + super().close() \ No newline at end of file diff --git a/src/clp_logging/remote_handlers.py b/src/clp_logging/remote_handlers.py deleted file mode 100644 index e667234..0000000 --- a/src/clp_logging/remote_handlers.py +++ /dev/null @@ -1,246 +0,0 @@ -import base64 -import datetime -import hashlib -from pathlib import Path -from typing import Any, Dict, List, Optional - -import boto3 -from botocore.exceptions import NoCredentialsError - -from clp_logging.handlers import CLPFileHandler - - -class CLPRemoteHandler(CLPFileHandler): - """ - Handles CLP file upload and comparison to AWS S3 bucket. Configuration of - AWS access key is required. Run command `aws configure`. - - :param s3_bucket: Name of the AWS S3 Bucket where log files are transferred - """ - - def __init__( - self, - s3_bucket: str, - ) -> None: - self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") - self.s3_client: boto3.client = boto3.client("s3") - self.bucket: str = s3_bucket - - self.log_name: Optional[str] = None - self.log_path: Optional[Path] = None - self.remote_folder_path: Optional[str] = None - self.obj_key: Optional[str] = None - - self.multipart_upload_config: Dict[str, int] = { - "size": 1024 * 1024 * 5, - "index": 1, - "pos": 0, - } - self.uploaded_parts: List[Dict[str, int | str]] = [] - self.upload_id: Optional[int] = None - self.remote_file_count: int = 0 - self.upload_in_progress: bool = False - - def _calculate_part_sha256(self, data: bytes) -> str: - sha256_hash: hashlib._Hash = hashlib.sha256() - sha256_hash.update(data) - return base64.b64encode(sha256_hash.digest()).decode("utf-8") - - def _remote_log_naming(self, timestamp: datetime.datetime) -> str: - if self.log_name is None: - raise ValueError("No input file.") - - new_filename: str - ext: int = self.log_name.find(".") - upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S") - # Naming of multiple remote files from the same local file - if self.remote_file_count != 0: - upload_time += "-" + str(self.remote_file_count) - - if ext != -1: - new_filename = f"log_{upload_time}{self.log_name[ext:]}" - else: - new_filename = f"{upload_time}_{self.log_name}" - new_filename = f"{self.remote_folder_path}/{new_filename}" - return new_filename - - def _upload_part(self) -> Dict[str, int | str]: - if self.log_path is None: - raise ValueError("No input file.") - - upload_data: bytes - # Read the latest version of the file - try: - with open(self.log_path, "rb") as file: - file.seek(self.multipart_upload_config["pos"]) - upload_data = file.read(self.multipart_upload_config["size"]) - except FileNotFoundError as e: - raise FileNotFoundError(f"The log file {self.log_path} cannot be found: {e}") from e - except IOError as e: - raise IOError(f"IO Error occurred while reading file {self.log_path}: {e}") from e - except Exception as e: - raise e - - try: - sha256_checksum: str = self._calculate_part_sha256(upload_data) - response: Dict[str, Any] = self.s3_client.upload_part( - Bucket=self.bucket, - Key=self.obj_key, - Body=upload_data, - PartNumber=self.multipart_upload_config["index"], - UploadId=self.upload_id, - ChecksumSHA256=sha256_checksum, - ) - - # Store both ETag and SHA256 for validation - return { - "PartNumber": self.multipart_upload_config["index"], - "ETag": response["ETag"], - "ChecksumSHA256": response["ChecksumSHA256"], - } - except Exception as e: - self.s3_client.abort_multipart_upload( - Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id - ) - raise Exception( - f'Multipart Upload on Part {self.multipart_upload_config["index"]}: {e}' - ) from e - - def get_obj_key(self) -> str | None: - return self.obj_key - - def set_obj_key(self, obj_key: str) -> None: - self.obj_key = obj_key - - def initiate_upload(self, log_path: Path) -> None: - if self.upload_in_progress: - raise Exception("An upload is already in progress. Cannot initiate another upload.") - - self.log_path = log_path - self.log_name = log_path.name - self.upload_in_progress = True - timestamp: datetime.datetime = datetime.datetime.now() - self.remote_folder_path = f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}" - - self.obj_key = self._remote_log_naming(timestamp) - create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload( - Bucket=self.bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256" - ) - self.upload_id = create_ret["UploadId"] - - def multipart_upload(self) -> None: - # Upload initiation is required before multipart_upload - if not self.upload_id: - raise Exception("No upload process.") - if self.log_path is None: - raise ValueError("No input file.") - - file_size: int = self.log_path.stat().st_size - try: - while ( - file_size - self.multipart_upload_config["pos"] - >= self.multipart_upload_config["size"] - ): - # Perform upload and label the uploaded part - upload_status: Dict[str, int | str] = self._upload_part() - self.multipart_upload_config["index"] += 1 - self.multipart_upload_config["pos"] += self.multipart_upload_config["size"] - self.uploaded_parts.append(upload_status) - - # AWS S3 limits object part count to 10000 - if self.multipart_upload_config["index"] >= 10000: - self.s3_client.complete_multipart_upload( - Bucket=self.bucket, - Key=self.obj_key, - UploadId=self.upload_id, - MultipartUpload={ - "Parts": [ - { - "PartNumber": part["PartNumber"], - "ETag": part["ETag"], - "ChecksumSHA256": part["ChecksumSHA256"], - } - for part in self.uploaded_parts - ] - }, - ) - - # Initiate multipart upload to a new S3 object - self.remote_file_count += 1 - timestamp: datetime.datetime = datetime.datetime.now() - self.remote_folder_path = ( - f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}" - ) - self.obj_key = self._remote_log_naming(timestamp) - self.multipart_upload_config["index"] = 1 - self.uploaded_parts = [] - create_ret = self.s3_client.create_multipart_upload( - Bucket=self.bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256" - ) - self.upload_id = create_ret["UploadId"] - - except NoCredentialsError as e: - raise e - except Exception as e: - self.s3_client.abort_multipart_upload( - Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id - ) - raise e - - def complete_upload(self) -> None: - # Upload initiation is required before complete_upload - if not self.upload_id: - raise Exception("No upload process to complete.") - if self.log_path is None: - raise ValueError("No input file.") - - file_size: int = self.log_path.stat().st_size - try: - # Upload the remaining segment - if ( - file_size - self.multipart_upload_config["pos"] - < self.multipart_upload_config["size"] - ): - self.multipart_upload_config["size"] = ( - file_size - self.multipart_upload_config["pos"] - ) - upload_status: Dict[str, int | str] = self._upload_part() - self.multipart_upload_config["index"] += 1 - self.uploaded_parts.append(upload_status) - except Exception as e: - self.s3_client.abort_multipart_upload( - Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id - ) - raise e - - self.s3_client.complete_multipart_upload( - Bucket=self.bucket, - Key=self.obj_key, - UploadId=self.upload_id, - MultipartUpload={ - "Parts": [ - { - "PartNumber": part["PartNumber"], - "ETag": part["ETag"], - "ChecksumSHA256": part["ChecksumSHA256"], - } - for part in self.uploaded_parts - ] - }, - ) - self.upload_in_progress = False - self.upload_id = None - self.obj_key = None - - def timeout(self, log_path: Path) -> None: - # Upload latest segment upon CLPLogLevelTimeout - if not self.upload_id: - super().__init__(fpath=log_path) - self.initiate_upload(log_path) - - self.multipart_upload() - - def close(self) -> None: - super().close() - if self.closed: - self.complete_upload() From 828897e8b4538120f3a1c5d8b112d635d38cd69f Mon Sep 17 00:00:00 2001 From: IreneLi Date: Mon, 17 Mar 2025 10:53:13 -0400 Subject: [PATCH 13/30] Add aws credential configuration --- src/clp_logging/handlers.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 524fd4c..94b0a7c 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -824,6 +824,8 @@ def __init__( enable_compression: bool = True, timestamp_format: Optional[str] = None, timezone: Optional[str] = None, + aws_access_key_id: Optional[str] = None, + aws_secret_access_key: Optional[str] = None ) -> None: super().__init__() self.closed: bool = False @@ -841,7 +843,11 @@ def __init__( self.remote_folder_path: Optional[str] = None self.obj_key: str = self._remote_log_naming(datetime.datetime.now()) self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") - self.s3_client: boto3.client = boto3.client("s3") + self.s3_client: boto3.client + if aws_access_key_id and aws_secret_access_key: + self.s3_client = boto3.client("s3", aws_access_key_id, aws_secret_access_key) + else: + self.s3_client = boto3.client("s3") self.buffer_size: int = 1024 * 1024 * 5 self.uploaded_parts: List[Dict[str, int | str]] = [] self.upload_index: int = 1 From 2267849208d17774ef77ccd0dadc2d9e944e2809 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Mon, 17 Mar 2025 11:00:16 -0400 Subject: [PATCH 14/30] Add rotation after 10000 parts --- src/clp_logging/handlers.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 94b0a7c..35f2543 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -841,7 +841,9 @@ def __init__( self.s3_bucket: str = s3_bucket self.remote_folder_path: Optional[str] = None - self.obj_key: str = self._remote_log_naming(datetime.datetime.now()) + self.remote_file_count: int = 0 + self.start_timestamp: datetime = datetime.datetime.now() + self.obj_key: str = self._remote_log_naming(self.start_timestamp) self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") self.s3_client: boto3.client if aws_access_key_id and aws_secret_access_key: @@ -863,11 +865,15 @@ def _remote_log_naming(self, timestamp: datetime.datetime) -> str: new_filename: str upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S") + file_count: str = "" + if self.remote_file_count != 0: + file_count = f"-{self.remote_file_count}" + # HARD-CODED TO .clp.zst FOR NOW if self.enable_compression: - new_filename = f"{self.remote_folder_path}/{upload_time}_log.clp.zst" + new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}.clp.zst" else: - new_filename = f"{self.remote_folder_path}/{upload_time}_log.clp" + new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}.clp" return new_filename # override @@ -883,6 +889,17 @@ def _write(self, loglevel: int, msg: str) -> None: self.local_buffer.seek(0) self.local_buffer.truncate(0) + # Rotate after 10000 parts (limitaion by s3) + if self.upload_index > 10000: + self.remote_file_count += 1 + self.obj_key = self._remote_log_naming(self.start_timestamp) + self.uploaded_parts = [] + self.upload_index = 1 + create_ret = self.s3_client.create_multipart_upload( + Bucket=self.s3_bucket, Key=self.obj_key#, ChecksumAlgorithm="SHA256" + ) + self.upload_id = create_ret["UploadId"] + def _flush(self) -> None: self.ostream.flush() data = self.local_buffer.getvalue() From ba5c3dd03ade7a31e9448e5d3a8030da5ea4a3da Mon Sep 17 00:00:00 2001 From: IreneLi Date: Mon, 17 Mar 2025 23:27:58 -0400 Subject: [PATCH 15/30] Fix part limit rotation --- src/clp_logging/handlers.py | 54 ++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 35f2543..0d61d60 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -825,7 +825,8 @@ def __init__( timestamp_format: Optional[str] = None, timezone: Optional[str] = None, aws_access_key_id: Optional[str] = None, - aws_secret_access_key: Optional[str] = None + aws_secret_access_key: Optional[str] = None, + part_limit: Optional[int] = None ) -> None: super().__init__() self.closed: bool = False @@ -851,6 +852,7 @@ def __init__( else: self.s3_client = boto3.client("s3") self.buffer_size: int = 1024 * 1024 * 5 + self.part_limit: int = part_limit if part_limit else 10000 self.uploaded_parts: List[Dict[str, int | str]] = [] self.upload_index: int = 1 create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload( @@ -885,20 +887,27 @@ def _write(self, loglevel: int, msg: str) -> None: self.ostream.write(clp_msg) self._flush() if self.local_buffer.tell() >= self.buffer_size: - self.upload_index += 1 - self.local_buffer.seek(0) - self.local_buffer.truncate(0) - - # Rotate after 10000 parts (limitaion by s3) - if self.upload_index > 10000: - self.remote_file_count += 1 - self.obj_key = self._remote_log_naming(self.start_timestamp) - self.uploaded_parts = [] - self.upload_index = 1 - create_ret = self.s3_client.create_multipart_upload( - Bucket=self.s3_bucket, Key=self.obj_key#, ChecksumAlgorithm="SHA256" - ) - self.upload_id = create_ret["UploadId"] + # Rotate after 10000 parts (limitaion by s3) + if self.upload_index >= self.part_limit: + self._complete_upload() + self.ostream.close() + self.local_buffer = io.BytesIO() + self.init(self.local_buffer) + self.remote_file_count += 1 + self.obj_key = self._remote_log_naming(self.start_timestamp) + self.uploaded_parts = [] + self.upload_index = 1 + create_ret = self.s3_client.create_multipart_upload( + Bucket=self.s3_bucket, Key=self.obj_key#, ChecksumAlgorithm="SHA256" + ) + self.upload_id = create_ret["UploadId"] + if not self.upload_id: + raise RuntimeError("Failed to initialize new upload ID.") + else: + self.upload_index += 1 + self.local_buffer.seek(0) + self.local_buffer.truncate(0) + def _flush(self) -> None: self.ostream.flush() @@ -936,9 +945,7 @@ def _flush(self) -> None: f'Multipart Upload on Part {self.upload_index}: {e}' ) from e - - # override - def close(self) -> None: + def _complete_upload(self) -> None: self.ostream.write(EOF_CHAR) self._flush() self.local_buffer.seek(0) @@ -968,7 +975,10 @@ def close(self) -> None: raise Exception( f'Multipart Upload on Part {self.upload_index}: {e}' ) from e - finally: - self.ostream.close() - self.closed = True - super().close() \ No newline at end of file + + # override + def close(self) -> None: + self._complete_upload() + self.ostream.close() + self.closed = True + super().close() \ No newline at end of file From a9ec06dca35323a56c796440f837ecf946d9f391 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Mon, 17 Mar 2025 23:29:45 -0400 Subject: [PATCH 16/30] Fix aws credential checking --- src/clp_logging/handlers.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 0d61d60..4746585 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -846,11 +846,11 @@ def __init__( self.start_timestamp: datetime = datetime.datetime.now() self.obj_key: str = self._remote_log_naming(self.start_timestamp) self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") - self.s3_client: boto3.client - if aws_access_key_id and aws_secret_access_key: - self.s3_client = boto3.client("s3", aws_access_key_id, aws_secret_access_key) - else: - self.s3_client = boto3.client("s3") + self.s3_client: boto3.client = boto3.client( + "s3", + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key + ) if aws_access_key_id and aws_secret_access_key else boto3.client("s3") self.buffer_size: int = 1024 * 1024 * 5 self.part_limit: int = part_limit if part_limit else 10000 self.uploaded_parts: List[Dict[str, int | str]] = [] From 0aef71d6c858ba5570067a4efb7e2158369acccf Mon Sep 17 00:00:00 2001 From: IreneLi Date: Mon, 17 Mar 2025 23:43:40 -0400 Subject: [PATCH 17/30] Enable sha256 checksum on multipart upload --- src/clp_logging/handlers.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 4746585..e4d413b 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -16,9 +16,12 @@ from clp_ffi_py.ir import FourByteEncoder from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor +import boto3 +import base64 import datetime +import hashlib import io -import boto3 + from clp_logging.protocol import ( BYTE_ORDER, @@ -856,7 +859,7 @@ def __init__( self.uploaded_parts: List[Dict[str, int | str]] = [] self.upload_index: int = 1 create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload( - Bucket=self.s3_bucket, Key=self.obj_key#, ChecksumAlgorithm="SHA256" + Bucket=self.s3_bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256" ) self.upload_id: int = create_ret["UploadId"] @@ -898,7 +901,7 @@ def _write(self, loglevel: int, msg: str) -> None: self.uploaded_parts = [] self.upload_index = 1 create_ret = self.s3_client.create_multipart_upload( - Bucket=self.s3_bucket, Key=self.obj_key#, ChecksumAlgorithm="SHA256" + Bucket=self.s3_bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256" ) self.upload_id = create_ret["UploadId"] if not self.upload_id: @@ -914,21 +917,21 @@ def _flush(self) -> None: data = self.local_buffer.getvalue() try: - # sha256_checksum: str = self._calculate_part_sha256(upload_data) + sha256_checksum: str = base64.b64encode(hashlib.sha256(data).digest()).decode('utf-8') response: Dict[str, Any] = self.s3_client.upload_part( Bucket=self.s3_bucket, Key=self.obj_key, Body=data, PartNumber=self.upload_index, UploadId=self.upload_id, - # ChecksumSHA256=sha256_checksum, + ChecksumSHA256=sha256_checksum, ) # Store both ETag and SHA256 for validation upload_status: Dict[str, int | str] = { "PartNumber": self.upload_index, "ETag": response["ETag"], - # "ChecksumSHA256": response["ChecksumSHA256"], + "ChecksumSHA256": response["ChecksumSHA256"], } # Determine the part to which the new upload_status belongs @@ -952,7 +955,6 @@ def _complete_upload(self) -> None: self.local_buffer.truncate(0) try: - print(self.uploaded_parts) self.s3_client.complete_multipart_upload( Bucket=self.s3_bucket, Key=self.obj_key, @@ -962,7 +964,7 @@ def _complete_upload(self) -> None: { "PartNumber": part["PartNumber"], "ETag": part["ETag"], - # "ChecksumSHA256": part["ChecksumSHA256"], + "ChecksumSHA256": part["ChecksumSHA256"], } for part in self.uploaded_parts ] From 4839528aa7003147c894c4c940820243ae33c72e Mon Sep 17 00:00:00 2001 From: IreneLi Date: Tue, 18 Mar 2025 00:21:57 -0400 Subject: [PATCH 18/30] Add error checking throughout the upload process --- src/clp_logging/handlers.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index e4d413b..bf8b6d6 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -16,8 +16,9 @@ from clp_ffi_py.ir import FourByteEncoder from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor -import boto3 import base64 +import boto3 +import botocore import datetime import hashlib import io @@ -849,12 +850,17 @@ def __init__( self.start_timestamp: datetime = datetime.datetime.now() self.obj_key: str = self._remote_log_naming(self.start_timestamp) self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") - self.s3_client: boto3.client = boto3.client( - "s3", - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key - ) if aws_access_key_id and aws_secret_access_key else boto3.client("s3") - self.buffer_size: int = 1024 * 1024 * 5 + try: + self.s3_client = boto3.client( + "s3", + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key + ) if aws_access_key_id and aws_secret_access_key else boto3.client("s3") + except botocore.exceptions.NoCredentialsError: + raise RuntimeError("AWS credentials not found. Please configure your credentials.") + except botocore.exceptions.ClientError as e: + raise RuntimeError(f"Failed to initialize AWS client: {e}") + self.buffer_size: int = 1024 * 1024 * 0.5 self.part_limit: int = part_limit if part_limit else 10000 self.uploaded_parts: List[Dict[str, int | str]] = [] self.upload_index: int = 1 @@ -862,6 +868,8 @@ def __init__( Bucket=self.s3_bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256" ) self.upload_id: int = create_ret["UploadId"] + if not self.upload_id or not isinstance(self.upload_id, str): + raise RuntimeError("Failed to obtain a valid Upload ID from S3.") def _remote_log_naming(self, timestamp: datetime.datetime) -> str: @@ -927,6 +935,9 @@ def _flush(self) -> None: ChecksumSHA256=sha256_checksum, ) + if response["ChecksumSHA256"] != sha256_checksum: + raise ValueError(f"Checksum mismatch for part {self.upload_index}. Upload aborted.") + # Store both ETag and SHA256 for validation upload_status: Dict[str, int | str] = { "PartNumber": self.upload_index, From 8ab75d046c9bf2698d9880103ba7d67da160a6ff Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 21 Mar 2025 00:05:29 -0400 Subject: [PATCH 19/30] Define macros and add options to specify multipart upload size --- src/clp_logging/handlers.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index bf8b6d6..4f0f249 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -40,6 +40,11 @@ DEFAULT_LOG_FORMAT: str = " %(levelname)s %(name)s %(message)s" WARN_PREFIX: str = " [WARN][clp_logging]" +# Define the multipart upload size limits +MIN_UPLOAD_PART_SIZE = 5 * 1024 * 1024 # 5 MB +MAX_UPLOAD_PART_SIZE = 5 * 1024 * 1024 * 1024 # 5 GB +MAX_PART_NUM_PER_UPLOAD = 10000 + def _init_timeinfo(fmt: Optional[str], tz: Optional[str]) -> Tuple[str, str]: """ @@ -830,7 +835,8 @@ def __init__( timezone: Optional[str] = None, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, - part_limit: Optional[int] = None + max_part_num: Optional[int] = None, + upload_part_size: Optional[int] = MIN_UPLOAD_PART_SIZE ) -> None: super().__init__() self.closed: bool = False @@ -860,8 +866,16 @@ def __init__( raise RuntimeError("AWS credentials not found. Please configure your credentials.") except botocore.exceptions.ClientError as e: raise RuntimeError(f"Failed to initialize AWS client: {e}") - self.buffer_size: int = 1024 * 1024 * 0.5 - self.part_limit: int = part_limit if part_limit else 10000 + + self.upload_part_size: int + if MIN_UPLOAD_PART_SIZE <= upload_part_size <= MAX_UPLOAD_PART_SIZE: + self.upload_part_size = upload_part_size + else: + raise RuntimeError( + f"Invalid upload_part_size: {upload_part_size}. " + f"It must be between {MIN_UPLOAD_PART_SIZE} and {MAX_UPLOAD_PART_SIZE}." + ) + self.max_part_num: int = max_part_num if max_part_num else MAX_PART_NUM_PER_UPLOAD self.uploaded_parts: List[Dict[str, int | str]] = [] self.upload_index: int = 1 create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload( @@ -897,9 +911,9 @@ def _write(self, loglevel: int, msg: str) -> None: clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms) self.ostream.write(clp_msg) self._flush() - if self.local_buffer.tell() >= self.buffer_size: - # Rotate after 10000 parts (limitaion by s3) - if self.upload_index >= self.part_limit: + if self.local_buffer.tell() >= self.upload_part_size: + # Rotate after maximum number of parts + if self.upload_index >= self.max_part_num: self._complete_upload() self.ostream.close() self.local_buffer = io.BytesIO() From c273d0c5d879d43a39c0e66c180da968169ecc62 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 21 Mar 2025 00:06:34 -0400 Subject: [PATCH 20/30] Always define remote file index --- src/clp_logging/handlers.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 4f0f249..623d3fb 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -852,7 +852,7 @@ def __init__( self.s3_bucket: str = s3_bucket self.remote_folder_path: Optional[str] = None - self.remote_file_count: int = 0 + self.remote_file_count: int = 1 self.start_timestamp: datetime = datetime.datetime.now() self.obj_key: str = self._remote_log_naming(self.start_timestamp) self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") @@ -892,9 +892,7 @@ def _remote_log_naming(self, timestamp: datetime.datetime) -> str: new_filename: str upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S") - file_count: str = "" - if self.remote_file_count != 0: - file_count = f"-{self.remote_file_count}" + file_count: str = f"-{self.remote_file_count}" # HARD-CODED TO .clp.zst FOR NOW if self.enable_compression: From b737d92ecc1112a7f4eed001eee0997178c88e45 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 21 Mar 2025 00:10:19 -0400 Subject: [PATCH 21/30] Use unix timestamp on file name --- src/clp_logging/handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 623d3fb..d01acec 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -890,7 +890,7 @@ def _remote_log_naming(self, timestamp: datetime.datetime) -> str: self.remote_folder_path: str = f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}" new_filename: str - upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S") + upload_time: str = str(int(timestamp.timestamp())) file_count: str = f"-{self.remote_file_count}" From 8f94e68916a40026d4d1f9ca7eb9cc0c1dc3628c Mon Sep 17 00:00:00 2001 From: IreneLi Date: Fri, 21 Mar 2025 00:24:52 -0400 Subject: [PATCH 22/30] Add configurable parameter on users's s3 directory --- src/clp_logging/handlers.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index d01acec..52e2541 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -836,7 +836,8 @@ def __init__( aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, max_part_num: Optional[int] = None, - upload_part_size: Optional[int] = MIN_UPLOAD_PART_SIZE + upload_part_size: Optional[int] = MIN_UPLOAD_PART_SIZE, + s3_directory: Optional[str] = None ) -> None: super().__init__() self.closed: bool = False @@ -854,7 +855,8 @@ def __init__( self.remote_folder_path: Optional[str] = None self.remote_file_count: int = 1 self.start_timestamp: datetime = datetime.datetime.now() - self.obj_key: str = self._remote_log_naming(self.start_timestamp) + self.s3_directory: str = (s3_directory.rstrip('/') + '/') if s3_directory else '' + self.obj_key: str = self._remote_log_naming() self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") try: self.s3_client = boto3.client( @@ -886,11 +888,11 @@ def __init__( raise RuntimeError("Failed to obtain a valid Upload ID from S3.") - def _remote_log_naming(self, timestamp: datetime.datetime) -> str: - self.remote_folder_path: str = f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}" + def _remote_log_naming(self) -> str: + self.remote_folder_path: str = f"{self.s3_directory}{self.start_timestamp.year}/{self.start_timestamp.month}/{self.start_timestamp.day}" new_filename: str - upload_time: str = str(int(timestamp.timestamp())) + upload_time: str = str(int(self.start_timestamp.timestamp())) file_count: str = f"-{self.remote_file_count}" @@ -917,7 +919,7 @@ def _write(self, loglevel: int, msg: str) -> None: self.local_buffer = io.BytesIO() self.init(self.local_buffer) self.remote_file_count += 1 - self.obj_key = self._remote_log_naming(self.start_timestamp) + self.obj_key = self._remote_log_naming() self.uploaded_parts = [] self.upload_index = 1 create_ret = self.s3_client.create_multipart_upload( From b70df58a7f9078110af8b2749a5b9c8fabf4ad1f Mon Sep 17 00:00:00 2001 From: Ruihao Li Date: Fri, 21 Mar 2025 17:10:07 -0400 Subject: [PATCH 23/30] Updated pyproject.toml to include new dev and testing dependencies for S3 handler --- pyproject.toml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e5ca9d4..37b23d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "clp_logging" -version = "0.0.12" +version = "0.0.14" license = { text = "Apache License 2.0" } authors = [ { name="david lion", email="david.lion@yscope.com" }, @@ -14,7 +14,7 @@ readme = "README.md" requires-python = ">=3.7" dependencies = [ "backports.zoneinfo >= 0.2.1; python_version < '3.9'", - "clp-ffi-py >= 0.0.11", + "clp-ffi-py >= 0.0.14", "typing-extensions >= 3.7.4", "tzlocal == 5.1; python_version < '3.8'", "tzlocal >= 5.2; python_version >= '3.8'", @@ -27,6 +27,8 @@ classifiers = [ [project.optional-dependencies] dev = [ "black >= 24.4.0", + "boto3 >= 1.37.18", + "botocore >= 1.37.18", "build >= 0.8.0", "docformatter >= 1.7.5", "mypy >= 1.9.0", @@ -34,6 +36,7 @@ dev = [ "types-python-dateutil >= 2.8.19.2", ] test = [ + "moto >= 5.1.1", "smart_open == 6.4.0", ] From 587d6cbc04bb594916d447e14c47b2c4dce3c73c Mon Sep 17 00:00:00 2001 From: IreneLi Date: Tue, 25 Mar 2025 17:38:25 -0400 Subject: [PATCH 24/30] Modify naming of non-interface level variables --- src/clp_logging/handlers.py | 129 ++++++++++++++++++------------------ 1 file changed, 64 insertions(+), 65 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 52e2541..1bce787 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -816,11 +816,11 @@ class CLPS3Handler(CLPBaseHandler): def init(self, stream: IO[bytes]) -> None: self.cctx: ZstdCompressor = ZstdCompressor() - self.ostream: Union[ZstdCompressionWriter, IO[bytes]] = ( - self.cctx.stream_writer(self.local_buffer) if self.enable_compression else stream + self._ostream: Union[ZstdCompressionWriter, IO[bytes]] = ( + self.cctx.stream_writer(self._local_buffer) if self.enable_compression else stream ) self.last_timestamp_ms: int = floor(time.time() * 1000) # convert to ms and truncate - self.ostream.write( + self._ostream.write( FourByteEncoder.encode_preamble( self.last_timestamp_ms, self.timestamp_format, self.timezone ) @@ -842,24 +842,23 @@ def __init__( super().__init__() self.closed: bool = False self.enable_compression: bool = enable_compression - self.local_buffer: io.BytesIO = io.BytesIO() + self._local_buffer: io.BytesIO = io.BytesIO() if stream is None: - stream = self.local_buffer - self.ostream: IO[bytes] = stream + stream = self._local_buffer + self._ostream: IO[bytes] = stream self.timestamp_format: str self.timezone: str self.timestamp_format, self.timezone = _init_timeinfo(timestamp_format, timezone) self.init(stream) self.s3_bucket: str = s3_bucket - self.remote_folder_path: Optional[str] = None - self.remote_file_count: int = 1 - self.start_timestamp: datetime = datetime.datetime.now() + self._remote_folder_path: Optional[str] = None + self._remote_file_count: int = 1 + self._start_timestamp: datetime = datetime.datetime.now() self.s3_directory: str = (s3_directory.rstrip('/') + '/') if s3_directory else '' - self.obj_key: str = self._remote_log_naming() - self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3") + self._obj_key: str = self._remote_log_naming() try: - self.s3_client = boto3.client( + self._s3_client = boto3.client( "s3", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key @@ -878,29 +877,29 @@ def __init__( f"It must be between {MIN_UPLOAD_PART_SIZE} and {MAX_UPLOAD_PART_SIZE}." ) self.max_part_num: int = max_part_num if max_part_num else MAX_PART_NUM_PER_UPLOAD - self.uploaded_parts: List[Dict[str, int | str]] = [] - self.upload_index: int = 1 - create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload( - Bucket=self.s3_bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256" + self._uploaded_parts: List[Dict[str, int | str]] = [] + self._upload_index: int = 1 + create_ret: Dict[str, Any] = self._s3_client.create_multipart_upload( + Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256" ) - self.upload_id: int = create_ret["UploadId"] - if not self.upload_id or not isinstance(self.upload_id, str): + self._upload_id: int = create_ret["UploadId"] + if not self._upload_id or not isinstance(self._upload_id, str): raise RuntimeError("Failed to obtain a valid Upload ID from S3.") def _remote_log_naming(self) -> str: - self.remote_folder_path: str = f"{self.s3_directory}{self.start_timestamp.year}/{self.start_timestamp.month}/{self.start_timestamp.day}" + self._remote_folder_path: str = f"{self.s3_directory}{self._start_timestamp.year}/{self._start_timestamp.month}/{self._start_timestamp.day}" new_filename: str - upload_time: str = str(int(self.start_timestamp.timestamp())) + upload_time: str = str(int(self._start_timestamp.timestamp())) - file_count: str = f"-{self.remote_file_count}" + file_count: str = f"-{self._remote_file_count}" # HARD-CODED TO .clp.zst FOR NOW if self.enable_compression: - new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}.clp.zst" + new_filename = f"{self._remote_folder_path}/{upload_time}_log{file_count}.clp.zst" else: - new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}.clp" + new_filename = f"{self._remote_folder_path}/{upload_time}_log{file_count}.clp" return new_filename # override @@ -909,81 +908,81 @@ def _write(self, loglevel: int, msg: str) -> None: raise RuntimeError("Stream already closed") clp_msg: bytearray clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms) - self.ostream.write(clp_msg) + self._ostream.write(clp_msg) self._flush() - if self.local_buffer.tell() >= self.upload_part_size: + if self._local_buffer.tell() >= self.upload_part_size: # Rotate after maximum number of parts - if self.upload_index >= self.max_part_num: + if self._upload_index >= self.max_part_num: self._complete_upload() - self.ostream.close() - self.local_buffer = io.BytesIO() - self.init(self.local_buffer) - self.remote_file_count += 1 - self.obj_key = self._remote_log_naming() - self.uploaded_parts = [] - self.upload_index = 1 - create_ret = self.s3_client.create_multipart_upload( - Bucket=self.s3_bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256" + self._ostream.close() + self._local_buffer = io.BytesIO() + self.init(self._local_buffer) + self._remote_file_count += 1 + self._obj_key = self._remote_log_naming() + self._uploaded_parts = [] + self._upload_index = 1 + create_ret = self._s3_client.create_multipart_upload( + Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256" ) - self.upload_id = create_ret["UploadId"] - if not self.upload_id: + self._upload_id = create_ret["UploadId"] + if not self._upload_id: raise RuntimeError("Failed to initialize new upload ID.") else: - self.upload_index += 1 - self.local_buffer.seek(0) - self.local_buffer.truncate(0) + self._upload_index += 1 + self._local_buffer.seek(0) + self._local_buffer.truncate(0) def _flush(self) -> None: - self.ostream.flush() - data = self.local_buffer.getvalue() + self._ostream.flush() + data = self._local_buffer.getvalue() try: sha256_checksum: str = base64.b64encode(hashlib.sha256(data).digest()).decode('utf-8') - response: Dict[str, Any] = self.s3_client.upload_part( + response: Dict[str, Any] = self._s3_client.upload_part( Bucket=self.s3_bucket, - Key=self.obj_key, + Key=self._obj_key, Body=data, - PartNumber=self.upload_index, - UploadId=self.upload_id, + PartNumber=self._upload_index, + UploadId=self._upload_id, ChecksumSHA256=sha256_checksum, ) if response["ChecksumSHA256"] != sha256_checksum: - raise ValueError(f"Checksum mismatch for part {self.upload_index}. Upload aborted.") + raise ValueError(f"Checksum mismatch for part {self._upload_index}. Upload aborted.") # Store both ETag and SHA256 for validation upload_status: Dict[str, int | str] = { - "PartNumber": self.upload_index, + "PartNumber": self._upload_index, "ETag": response["ETag"], "ChecksumSHA256": response["ChecksumSHA256"], } # Determine the part to which the new upload_status belongs - if len(self.uploaded_parts) > self.upload_index - 1: - self.uploaded_parts[self.upload_index-1] = upload_status + if len(self._uploaded_parts) > self._upload_index - 1: + self._uploaded_parts[self._upload_index-1] = upload_status else: - self.uploaded_parts.append(upload_status) + self._uploaded_parts.append(upload_status) except Exception as e: - self.s3_client.abort_multipart_upload( - Bucket=self.s3_bucket, Key=self.obj_key, UploadId=self.upload_id + self._s3_client.abort_multipart_upload( + Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id ) raise Exception( - f'Multipart Upload on Part {self.upload_index}: {e}' + f'Multipart Upload on Part {self._upload_index}: {e}' ) from e def _complete_upload(self) -> None: - self.ostream.write(EOF_CHAR) + self._ostream.write(EOF_CHAR) self._flush() - self.local_buffer.seek(0) - self.local_buffer.truncate(0) + self._local_buffer.seek(0) + self._local_buffer.truncate(0) try: - self.s3_client.complete_multipart_upload( + self._s3_client.complete_multipart_upload( Bucket=self.s3_bucket, - Key=self.obj_key, - UploadId=self.upload_id, + Key=self._obj_key, + UploadId=self._upload_id, MultipartUpload={ "Parts": [ { @@ -991,21 +990,21 @@ def _complete_upload(self) -> None: "ETag": part["ETag"], "ChecksumSHA256": part["ChecksumSHA256"], } - for part in self.uploaded_parts + for part in self._uploaded_parts ] }, ) except Exception as e: - self.s3_client.abort_multipart_upload( - Bucket=self.s3_bucket, Key=self.obj_key, UploadId=self.upload_id + self._s3_client.abort_multipart_upload( + Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id ) raise Exception( - f'Multipart Upload on Part {self.upload_index}: {e}' + f'Multipart Upload on Part {self._upload_index}: {e}' ) from e # override def close(self) -> None: self._complete_upload() - self.ostream.close() + self._ostream.close() self.closed = True super().close() \ No newline at end of file From 9c8653ad9147cae2690c7ba534929261a39de10d Mon Sep 17 00:00:00 2001 From: IreneLi Date: Tue, 25 Mar 2025 17:56:57 -0400 Subject: [PATCH 25/30] Remove duplicate code --- src/clp_logging/handlers.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 1bce787..5f2b6cb 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -814,18 +814,6 @@ class CLPS3Handler(CLPBaseHandler): :param s3_bucket: S3 bucket to upload CLP encoded log messages to """ - def init(self, stream: IO[bytes]) -> None: - self.cctx: ZstdCompressor = ZstdCompressor() - self._ostream: Union[ZstdCompressionWriter, IO[bytes]] = ( - self.cctx.stream_writer(self._local_buffer) if self.enable_compression else stream - ) - self.last_timestamp_ms: int = floor(time.time() * 1000) # convert to ms and truncate - self._ostream.write( - FourByteEncoder.encode_preamble( - self.last_timestamp_ms, self.timestamp_format, self.timezone - ) - ) - def __init__( self, s3_bucket: str, @@ -849,7 +837,7 @@ def __init__( self.timestamp_format: str self.timezone: str self.timestamp_format, self.timezone = _init_timeinfo(timestamp_format, timezone) - self.init(stream) + self._init_stream(stream) self.s3_bucket: str = s3_bucket self._remote_folder_path: Optional[str] = None @@ -886,6 +874,17 @@ def __init__( if not self._upload_id or not isinstance(self._upload_id, str): raise RuntimeError("Failed to obtain a valid Upload ID from S3.") + def _init_stream(self, stream: IO[bytes]) -> None: + self.cctx: ZstdCompressor = ZstdCompressor() + self._ostream: Union[ZstdCompressionWriter, IO[bytes]] = ( + self.cctx.stream_writer(self._local_buffer) if self.enable_compression else stream + ) + self.last_timestamp_ms: int = floor(time.time() * 1000) # convert to ms and truncate + self._ostream.write( + FourByteEncoder.encode_preamble( + self.last_timestamp_ms, self.timestamp_format, self.timezone + ) + ) def _remote_log_naming(self) -> str: self._remote_folder_path: str = f"{self.s3_directory}{self._start_timestamp.year}/{self._start_timestamp.month}/{self._start_timestamp.day}" @@ -914,9 +913,8 @@ def _write(self, loglevel: int, msg: str) -> None: # Rotate after maximum number of parts if self._upload_index >= self.max_part_num: self._complete_upload() - self._ostream.close() self._local_buffer = io.BytesIO() - self.init(self._local_buffer) + self._init_stream(self._local_buffer) self._remote_file_count += 1 self._obj_key = self._remote_log_naming() self._uploaded_parts = [] @@ -935,10 +933,10 @@ def _write(self, loglevel: int, msg: str) -> None: def _flush(self) -> None: self._ostream.flush() - data = self._local_buffer.getvalue() + data: bytes = self._local_buffer.getvalue() + sha256_checksum: str = base64.b64encode(hashlib.sha256(data).digest()).decode('utf-8') try: - sha256_checksum: str = base64.b64encode(hashlib.sha256(data).digest()).decode('utf-8') response: Dict[str, Any] = self._s3_client.upload_part( Bucket=self.s3_bucket, Key=self._obj_key, @@ -1002,9 +1000,10 @@ def _complete_upload(self) -> None: f'Multipart Upload on Part {self._upload_index}: {e}' ) from e + self._ostream.close() + # override def close(self) -> None: self._complete_upload() - self._ostream.close() self.closed = True super().close() \ No newline at end of file From be9bcbab4710fa3e8793894caefba42e4e3f1bb1 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Tue, 25 Mar 2025 18:10:10 -0400 Subject: [PATCH 26/30] Add PutObject option to flush to remote --- src/clp_logging/handlers.py | 138 +++++++++++++++++++++--------------- 1 file changed, 79 insertions(+), 59 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 5f2b6cb..33d4626 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -825,7 +825,8 @@ def __init__( aws_secret_access_key: Optional[str] = None, max_part_num: Optional[int] = None, upload_part_size: Optional[int] = MIN_UPLOAD_PART_SIZE, - s3_directory: Optional[str] = None + s3_directory: Optional[str] = None, + use_multipart_upload: Optional[bool] = True ) -> None: super().__init__() self.closed: bool = False @@ -839,12 +840,7 @@ def __init__( self.timestamp_format, self.timezone = _init_timeinfo(timestamp_format, timezone) self._init_stream(stream) - self.s3_bucket: str = s3_bucket - self._remote_folder_path: Optional[str] = None - self._remote_file_count: int = 1 - self._start_timestamp: datetime = datetime.datetime.now() - self.s3_directory: str = (s3_directory.rstrip('/') + '/') if s3_directory else '' - self._obj_key: str = self._remote_log_naming() + try: self._s3_client = boto3.client( "s3", @@ -855,24 +851,32 @@ def __init__( raise RuntimeError("AWS credentials not found. Please configure your credentials.") except botocore.exceptions.ClientError as e: raise RuntimeError(f"Failed to initialize AWS client: {e}") + self.s3_bucket: str = s3_bucket + self._remote_folder_path: Optional[str] = None + self._remote_file_count: int = 1 + self._start_timestamp: datetime = datetime.datetime.now() + self.s3_directory: str = (s3_directory.rstrip('/') + '/') if s3_directory else '' + self._obj_key: str = self._remote_log_naming() - self.upload_part_size: int - if MIN_UPLOAD_PART_SIZE <= upload_part_size <= MAX_UPLOAD_PART_SIZE: - self.upload_part_size = upload_part_size - else: - raise RuntimeError( - f"Invalid upload_part_size: {upload_part_size}. " - f"It must be between {MIN_UPLOAD_PART_SIZE} and {MAX_UPLOAD_PART_SIZE}." + self.use_multipart_upload = use_multipart_upload + if self.use_multipart_upload: + self.upload_part_size: int + if MIN_UPLOAD_PART_SIZE <= upload_part_size <= MAX_UPLOAD_PART_SIZE: + self.upload_part_size = upload_part_size + else: + raise RuntimeError( + f"Invalid upload_part_size: {upload_part_size}. " + f"It must be between {MIN_UPLOAD_PART_SIZE} and {MAX_UPLOAD_PART_SIZE}." + ) + self.max_part_num: int = max_part_num if max_part_num else MAX_PART_NUM_PER_UPLOAD + self._uploaded_parts: List[Dict[str, int | str]] = [] + self._upload_index: int = 1 + create_ret: Dict[str, Any] = self._s3_client.create_multipart_upload( + Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256" ) - self.max_part_num: int = max_part_num if max_part_num else MAX_PART_NUM_PER_UPLOAD - self._uploaded_parts: List[Dict[str, int | str]] = [] - self._upload_index: int = 1 - create_ret: Dict[str, Any] = self._s3_client.create_multipart_upload( - Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256" - ) - self._upload_id: int = create_ret["UploadId"] - if not self._upload_id or not isinstance(self._upload_id, str): - raise RuntimeError("Failed to obtain a valid Upload ID from S3.") + self._upload_id: int = create_ret["UploadId"] + if not self._upload_id or not isinstance(self._upload_id, str): + raise RuntimeError("Failed to obtain a valid Upload ID from S3.") def _init_stream(self, stream: IO[bytes]) -> None: self.cctx: ZstdCompressor = ZstdCompressor() @@ -908,11 +912,14 @@ def _write(self, loglevel: int, msg: str) -> None: clp_msg: bytearray clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms) self._ostream.write(clp_msg) + if not self.use_multipart_upload: + self._ostream.write(EOF_CHAR) self._flush() - if self._local_buffer.tell() >= self.upload_part_size: + if self.use_multipart_upload and self._local_buffer.tell() >= self.upload_part_size: # Rotate after maximum number of parts if self._upload_index >= self.max_part_num: self._complete_upload() + self._ostream.close() self._local_buffer = io.BytesIO() self._init_stream(self._local_buffer) self._remote_file_count += 1 @@ -936,39 +943,52 @@ def _flush(self) -> None: data: bytes = self._local_buffer.getvalue() sha256_checksum: str = base64.b64encode(hashlib.sha256(data).digest()).decode('utf-8') - try: - response: Dict[str, Any] = self._s3_client.upload_part( - Bucket=self.s3_bucket, - Key=self._obj_key, - Body=data, - PartNumber=self._upload_index, - UploadId=self._upload_id, - ChecksumSHA256=sha256_checksum, - ) - - if response["ChecksumSHA256"] != sha256_checksum: - raise ValueError(f"Checksum mismatch for part {self._upload_index}. Upload aborted.") - - # Store both ETag and SHA256 for validation - upload_status: Dict[str, int | str] = { - "PartNumber": self._upload_index, - "ETag": response["ETag"], - "ChecksumSHA256": response["ChecksumSHA256"], - } - - # Determine the part to which the new upload_status belongs - if len(self._uploaded_parts) > self._upload_index - 1: - self._uploaded_parts[self._upload_index-1] = upload_status - else: - self._uploaded_parts.append(upload_status) + if self.use_multipart_upload: + try: + response: Dict[str, Any] = self._s3_client.upload_part( + Bucket=self.s3_bucket, + Key=self._obj_key, + Body=data, + PartNumber=self._upload_index, + UploadId=self._upload_id, + ChecksumSHA256=sha256_checksum, + ) - except Exception as e: - self._s3_client.abort_multipart_upload( - Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id - ) - raise Exception( - f'Multipart Upload on Part {self._upload_index}: {e}' - ) from e + if response["ChecksumSHA256"] != sha256_checksum: + raise ValueError(f"Checksum mismatch for part {self._upload_index}. Upload aborted.") + + # Store both ETag and SHA256 for validation + upload_status: Dict[str, int | str] = { + "PartNumber": self._upload_index, + "ETag": response["ETag"], + "ChecksumSHA256": response["ChecksumSHA256"], + } + + # Determine the part to which the new upload_status belongs + if len(self._uploaded_parts) > self._upload_index - 1: + self._uploaded_parts[self._upload_index-1] = upload_status + else: + self._uploaded_parts.append(upload_status) + + except Exception as e: + self._s3_client.abort_multipart_upload( + Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id + ) + raise Exception( + f'Multipart Upload on Part {self._upload_index}: {e}' + ) from e + else: + self._ostream.write(EOF_CHAR) + try: + self._s3_client.put_object( + Bucket=self.s3_bucket, + Key=self._obj_key, + Body=data, + ContentEncoding='zstd' if self.enable_compression else 'binary', + ChecksumSHA256=sha256_checksum + ) + except Exception as e: + raise Exception(f'Failed to upload using PutObject: {e}') def _complete_upload(self) -> None: self._ostream.write(EOF_CHAR) @@ -1000,10 +1020,10 @@ def _complete_upload(self) -> None: f'Multipart Upload on Part {self._upload_index}: {e}' ) from e - self._ostream.close() - # override def close(self) -> None: - self._complete_upload() + if self.use_multipart_upload: + self._complete_upload() + self._ostream.close() self.closed = True super().close() \ No newline at end of file From 8744baa3b0c1e4b5cb3ebe2eec8b47ca59537e4a Mon Sep 17 00:00:00 2001 From: IreneLi Date: Tue, 25 Mar 2025 18:53:25 -0400 Subject: [PATCH 27/30] Add comments and error checking --- src/clp_logging/handlers.py | 76 +++++++++++++++++++++++++++++++------ 1 file changed, 65 insertions(+), 11 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 999d3d9..a88900b 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -1012,14 +1012,32 @@ def _serialize_kv_pair_log_event( serialize_dict_to_msgpack(auto_gen_kv_pairs), serialize_dict_to_msgpack(user_gen_kv_pairs), ) - - + class CLPS3Handler(CLPBaseHandler): """ Log is written to stream in CLP IR encoding, and uploaded to s3_bucket :param s3_bucket: S3 bucket to upload CLP encoded log messages to + :param stream: Target stream to write log messages to + :param enable_compression: Option to enable/disable stream compression + :param timestamp_format: Timestamp format written in preamble to be + used when generating the logs with a reader. + :param timezone: Timezone written in preamble to be used when + generating the timestamp from Unix epoch time. + :param aws_access_key_id: User's public access key for the S3 bucket. + :param aws_secret_access_key: User's private access key for the S3 bucket. + :param s3_directory: S3 remote directory to upload objects to. + :param use_multipart_upload: Option to use multipart upload to upload + stream segments or use PutObject to upload the entire buffer. + :param max_part_num: Maximum number of parts allowed for a multipart upload + session before uploading to a new object. + :param upload_part_size: Maximum size of a part in a multipart upload + session before writing to a new part. + """ + + def __init__( + self, s3_bucket: str, stream: Optional[IO[bytes]] = None, enable_compression: bool = True, @@ -1027,10 +1045,10 @@ class CLPS3Handler(CLPBaseHandler): timezone: Optional[str] = None, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, - max_part_num: Optional[int] = None, - upload_part_size: Optional[int] = MIN_UPLOAD_PART_SIZE, s3_directory: Optional[str] = None, - use_multipart_upload: Optional[bool] = True + use_multipart_upload: Optional[bool] = True, + max_part_num: Optional[int] = None, + upload_part_size: Optional[int] = MIN_UPLOAD_PART_SIZE ) -> None: super().__init__() self.closed: bool = False @@ -1044,7 +1062,8 @@ class CLPS3Handler(CLPBaseHandler): self.timestamp_format, self.timezone = _init_timeinfo(timestamp_format, timezone) self._init_stream(stream) - + # Configure s3-related variables + self.s3_bucket: str = s3_bucket try: self._s3_client = boto3.client( "s3", @@ -1055,7 +1074,6 @@ class CLPS3Handler(CLPBaseHandler): raise RuntimeError("AWS credentials not found. Please configure your credentials.") except botocore.exceptions.ClientError as e: raise RuntimeError(f"Failed to initialize AWS client: {e}") - self.s3_bucket: str = s3_bucket self._remote_folder_path: Optional[str] = None self._remote_file_count: int = 1 self._start_timestamp: datetime = datetime.datetime.now() @@ -1064,6 +1082,7 @@ class CLPS3Handler(CLPBaseHandler): self.use_multipart_upload = use_multipart_upload if self.use_multipart_upload: + # Configure size limit of a part in multipart upload self.upload_part_size: int if MIN_UPLOAD_PART_SIZE <= upload_part_size <= MAX_UPLOAD_PART_SIZE: self.upload_part_size = upload_part_size @@ -1083,6 +1102,11 @@ class CLPS3Handler(CLPBaseHandler): raise RuntimeError("Failed to obtain a valid Upload ID from S3.") def _init_stream(self, stream: IO[bytes]) -> None: + """ + Initialize and configure output stream + + :param stream: Target stream to write log messages to + """ self.cctx: ZstdCompressor = ZstdCompressor() self._ostream: Union[ZstdCompressionWriter, IO[bytes]] = ( self.cctx.stream_writer(self._local_buffer) if self.enable_compression else stream @@ -1095,6 +1119,9 @@ def _init_stream(self, stream: IO[bytes]) -> None: ) def _remote_log_naming(self) -> str: + """ + Set the name of the target S3 object key to upload to + """ self._remote_folder_path: str = f"{self.s3_directory}{self._start_timestamp.year}/{self._start_timestamp.month}/{self._start_timestamp.day}" new_filename: str @@ -1102,7 +1129,7 @@ def _remote_log_naming(self) -> str: file_count: str = f"-{self._remote_file_count}" - # HARD-CODED TO .clp.zst FOR NOW + # Compression uses zstd format if self.enable_compression: new_filename = f"{self._remote_folder_path}/{upload_time}_log{file_count}.clp.zst" else: @@ -1111,18 +1138,26 @@ def _remote_log_naming(self) -> str: # override def _write(self, loglevel: int, msg: str) -> None: + """ + Write the log message stream into a local buffer. + (With use_multipart_upload) Update the part number if the local buffer + exceeds a predetermined buffer size. Then clear the local buffer. + """ if self.closed: raise RuntimeError("Stream already closed") clp_msg: bytearray clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms) + + # Write log stream to a local buffer and flush to upload self._ostream.write(clp_msg) if not self.use_multipart_upload: self._ostream.write(EOF_CHAR) self._flush() + if self.use_multipart_upload and self._local_buffer.tell() >= self.upload_part_size: # Rotate after maximum number of parts if self._upload_index >= self.max_part_num: - self._complete_upload() + self._complete_multipart_upload() self._ostream.close() self._local_buffer = io.BytesIO() self._init_stream(self._local_buffer) @@ -1143,6 +1178,10 @@ def _write(self, loglevel: int, msg: str) -> None: def _flush(self) -> None: + """ + Upload local buffer to the S3 bucket using upload_part if + use_multipart_upload = True, otherwise use put_object. + """ self._ostream.flush() data: bytes = self._local_buffer.getvalue() sha256_checksum: str = base64.b64encode(hashlib.sha256(data).digest()).decode('utf-8') @@ -1158,6 +1197,7 @@ def _flush(self) -> None: ChecksumSHA256=sha256_checksum, ) + # Verify integrity of the uploaded part using SHA256 Checksum if response["ChecksumSHA256"] != sha256_checksum: raise ValueError(f"Checksum mismatch for part {self._upload_index}. Upload aborted.") @@ -1191,10 +1231,21 @@ def _flush(self) -> None: ContentEncoding='zstd' if self.enable_compression else 'binary', ChecksumSHA256=sha256_checksum ) + + # Verify integrity of the upload using SHA256 Checksum + response = self.s3_client.head_object(Bucket=self.s3_bucket, Key=self.obj_key) + if 'ChecksumSHA256' in response: + s3_checksum = response['ChecksumSHA256'] + if s3_checksum != sha256_checksum: + raise ValueError(f"Checksum mismatch. Upload aborted.") + except Exception as e: raise Exception(f'Failed to upload using PutObject: {e}') - def _complete_upload(self) -> None: + def _complete_multipart_upload(self) -> None: + """ + Complete a multipart upload session and clear the local buffer. + """ self._ostream.write(EOF_CHAR) self._flush() self._local_buffer.seek(0) @@ -1226,8 +1277,11 @@ def _complete_upload(self) -> None: # override def close(self) -> None: + """ + Complete the upload if needed. Close the stream and the handler. + """ if self.use_multipart_upload: - self._complete_upload() + self._complete_multipart_upload() self._ostream.close() self.closed = True super().close() From 238d62576612f8e212604653d4bfe0676232d2be Mon Sep 17 00:00:00 2001 From: IreneLi Date: Wed, 26 Mar 2025 01:29:55 -0400 Subject: [PATCH 28/30] Add comments to explain parameters --- src/clp_logging/handlers.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index a88900b..27dbea1 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -1021,6 +1021,7 @@ class CLPS3Handler(CLPBaseHandler): :param s3_bucket: S3 bucket to upload CLP encoded log messages to :param stream: Target stream to write log messages to :param enable_compression: Option to enable/disable stream compression + Default: True :param timestamp_format: Timestamp format written in preamble to be used when generating the logs with a reader. :param timezone: Timezone written in preamble to be used when @@ -1030,10 +1031,11 @@ class CLPS3Handler(CLPBaseHandler): :param s3_directory: S3 remote directory to upload objects to. :param use_multipart_upload: Option to use multipart upload to upload stream segments or use PutObject to upload the entire buffer. + Default: True :param max_part_num: Maximum number of parts allowed for a multipart upload - session before uploading to a new object. + session before uploading to a new object. Default: 10000 :param upload_part_size: Maximum size of a part in a multipart upload - session before writing to a new part. + session before writing to a new part. Default: 5MB """ def __init__( @@ -1246,6 +1248,7 @@ def _complete_multipart_upload(self) -> None: """ Complete a multipart upload session and clear the local buffer. """ + # Flush EOF marker to the local buffer and upload self._ostream.write(EOF_CHAR) self._flush() self._local_buffer.seek(0) From 2c0a34adb8d6bf949a72a1dc520ed7b10742200b Mon Sep 17 00:00:00 2001 From: IreneLi Date: Wed, 26 Mar 2025 14:28:19 -0400 Subject: [PATCH 29/30] Maintain consistent naming in S3 handler --- src/clp_logging/handlers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index 27dbea1..af5ef85 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -1235,9 +1235,9 @@ def _flush(self) -> None: ) # Verify integrity of the upload using SHA256 Checksum - response = self.s3_client.head_object(Bucket=self.s3_bucket, Key=self.obj_key) + response: Dict[str, Any] = self._s3_client.head_object(Bucket=self.s3_bucket, Key=self.obj_key) if 'ChecksumSHA256' in response: - s3_checksum = response['ChecksumSHA256'] + s3_checksum: str = response['ChecksumSHA256'] if s3_checksum != sha256_checksum: raise ValueError(f"Checksum mismatch. Upload aborted.") From b44814353e236fade082ea8cb23131ce469bd0b5 Mon Sep 17 00:00:00 2001 From: IreneLi Date: Wed, 26 Mar 2025 14:29:51 -0400 Subject: [PATCH 30/30] Fix object key naming issue --- src/clp_logging/handlers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py index af5ef85..6aab524 100644 --- a/src/clp_logging/handlers.py +++ b/src/clp_logging/handlers.py @@ -1235,7 +1235,10 @@ def _flush(self) -> None: ) # Verify integrity of the upload using SHA256 Checksum - response: Dict[str, Any] = self._s3_client.head_object(Bucket=self.s3_bucket, Key=self.obj_key) + response: Dict[str, Any] = self._s3_client.head_object( + Bucket=self.s3_bucket, + Key=self._obj_key + ) if 'ChecksumSHA256' in response: s3_checksum: str = response['ChecksumSHA256'] if s3_checksum != sha256_checksum: