diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 80430aef9c8b..3e039248b798 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -199,7 +199,12 @@ def new_output_stream(self, path: str): def get_file_status(self, path: str): path_str = self.to_filesystem_path(path) file_infos = self.filesystem.get_file_info([path_str]) - return file_infos[0] + file_info = file_infos[0] + + if file_info.type == pyarrow.fs.FileType.NotFound: + raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") + + return file_info def list_status(self, path: str): path_str = self.to_filesystem_path(path) @@ -211,51 +216,73 @@ def list_directories(self, path: str): return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory] def exists(self, path: str) -> bool: - try: - path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - result = file_info.type != pyarrow.fs.FileType.NotFound - return result - except Exception: - return False + path_str = self.to_filesystem_path(path) + file_info = self.filesystem.get_file_info([path_str])[0] + return file_info.type != pyarrow.fs.FileType.NotFound def delete(self, path: str, recursive: bool = False) -> bool: - try: - path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - if file_info.type == pyarrow.fs.FileType.Directory: - if recursive: - self.filesystem.delete_dir_contents(path_str) - else: - self.filesystem.delete_dir(path_str) - else: - self.filesystem.delete_file(path_str) - return True - except Exception as e: - self.logger.warning(f"Failed to delete {path}: {e}") + path_str = self.to_filesystem_path(path) + file_info = self.filesystem.get_file_info([path_str])[0] + + if file_info.type == pyarrow.fs.FileType.NotFound: return False + + if file_info.type == pyarrow.fs.FileType.Directory: + if not recursive: + selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True) + dir_contents = self.filesystem.get_file_info(selector) + if len(dir_contents) > 0: + raise OSError(f"Directory {path} is not empty") + if recursive: + self.filesystem.delete_dir_contents(path_str) + self.filesystem.delete_dir(path_str) + else: + self.filesystem.delete_dir(path_str) + else: + self.filesystem.delete_file(path_str) + return True def mkdirs(self, path: str) -> bool: - try: - path_str = self.to_filesystem_path(path) - self.filesystem.create_dir(path_str, recursive=True) + path_str = self.to_filesystem_path(path) + file_info = self.filesystem.get_file_info([path_str])[0] + + if file_info.type == pyarrow.fs.FileType.Directory: return True - except Exception as e: - self.logger.warning(f"Failed to create directory {path}: {e}") - return False + elif file_info.type == pyarrow.fs.FileType.File: + raise FileExistsError(f"Path exists but is not a directory: {path}") + + self.filesystem.create_dir(path_str, recursive=True) + return True def rename(self, src: str, dst: str) -> bool: + dst_str = self.to_filesystem_path(dst) + dst_parent = Path(dst_str).parent + if str(dst_parent) and not self.exists(str(dst_parent)): + self.mkdirs(str(dst_parent)) + + src_str = self.to_filesystem_path(src) + try: - dst_str = self.to_filesystem_path(dst) - dst_parent = Path(dst_str).parent - if str(dst_parent) and not self.exists(str(dst_parent)): - self.mkdirs(str(dst_parent)) - - src_str = self.to_filesystem_path(src) + if hasattr(self.filesystem, 'rename'): + return self.filesystem.rename(src_str, dst_str) + + dst_file_info = self.filesystem.get_file_info([dst_str])[0] + if dst_file_info.type != pyarrow.fs.FileType.NotFound: + if dst_file_info.type == pyarrow.fs.FileType.File: + return False + # Make it compatible with HadoopFileIO: if dst is an existing directory, + # dst=dst/srcFileName + src_name = Path(src_str).name + dst_str = str(Path(dst_str) / src_name) + final_dst_info = self.filesystem.get_file_info([dst_str])[0] + if final_dst_info.type != pyarrow.fs.FileType.NotFound: + return False + self.filesystem.move(src_str, dst_str) return True - except Exception as e: - self.logger.warning(f"Failed to rename {src} to {dst}: {e}") + except FileNotFoundError: + return False + except (PermissionError, OSError): return False def delete_quietly(self, path: str): @@ -304,6 +331,12 @@ def read_file_utf8(self, path: str) -> str: return input_stream.read().decode('utf-8') def try_to_write_atomic(self, path: str, content: str) -> bool: + if self.exists(path): + path_str = self.to_filesystem_path(path) + file_info = self.filesystem.get_file_info([path_str])[0] + if file_info.type == pyarrow.fs.FileType.Directory: + return False + temp_path = path + str(uuid.uuid4()) + ".tmp" success = False try: @@ -331,6 +364,11 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False) source_str = self.to_filesystem_path(source_path) target_str = self.to_filesystem_path(target_path) + target_parent = Path(target_str).parent + + if str(target_parent) and not self.exists(str(target_parent)): + self.mkdirs(str(target_parent)) + self.filesystem.copy_file(source_str, target_str) def copy_files(self, source_directory: str, target_directory: str, overwrite: bool = False): diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py index c845f8547cce..c48eab6459a7 100644 --- a/paimon-python/pypaimon/filesystem/local.py +++ b/paimon-python/pypaimon/filesystem/local.py @@ -17,6 +17,7 @@ import threading import pyarrow +from pathlib import Path from pyarrow._fs import LocalFileSystem @@ -24,11 +25,25 @@ class PaimonLocalFileSystem(LocalFileSystem): rename_lock = threading.Lock() - def move(self, src, dst): - with PaimonLocalFileSystem.rename_lock: - file_info = self.get_file_info([dst])[0] - result = file_info.type != pyarrow.fs.FileType.NotFound - if (result is True): - raise Exception("Target file already exists") - - super(PaimonLocalFileSystem, self).move(src, dst) + def rename(self, src, dst): + try: + with PaimonLocalFileSystem.rename_lock: + dst_file_info = self.get_file_info([dst])[0] + if dst_file_info.type != pyarrow.fs.FileType.NotFound: + if dst_file_info.type == pyarrow.fs.FileType.File: + return False + # Make it compatible with HadoopFileIO: if dst is an existing directory, + # dst=dst/srcFileName + src_name = Path(src).name + dst = str(Path(dst) / src_name) + final_dst_info = self.get_file_info([dst])[0] + if final_dst_info.type != pyarrow.fs.FileType.NotFound: + return False + + # Perform atomic move + super(PaimonLocalFileSystem, self).move(src, dst) + return True + except FileNotFoundError: + return False + except (PermissionError, OSError): + return False diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index bf4003f8efff..ec5d6c4e829a 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -20,7 +20,9 @@ import tempfile import unittest from pathlib import Path +from unittest.mock import MagicMock, patch +import pyarrow from pyarrow.fs import S3FileSystem, LocalFileSystem from pypaimon.common.file_io import FileIO @@ -155,5 +157,207 @@ def test_write_file_with_overwrite_flag(self): finally: shutil.rmtree(temp_dir, ignore_errors=True) + def test_exists_does_not_catch_exception(self): + temp_dir = tempfile.mkdtemp(prefix="file_io_exists_test_") + try: + warehouse_path = f"file://{temp_dir}" + file_io = FileIO(warehouse_path, {}) + + test_file = os.path.join(temp_dir, "test_file.txt") + with open(test_file, "w") as f: + f.write("test") + self.assertTrue(file_io.exists(f"file://{test_file}")) + self.assertFalse(file_io.exists(f"file://{temp_dir}/nonexistent.txt")) + + mock_filesystem = MagicMock() + mock_filesystem.get_file_info.side_effect = OSError("Permission denied") + file_io.filesystem = mock_filesystem + + with self.assertRaises(OSError) as context: + file_io.exists("file:///some/path") + self.assertIn("Permission denied", str(context.exception)) + + with self.assertRaises(OSError): + file_io.new_output_stream("file:///some/path/file.txt") + + with self.assertRaises(OSError): + file_io.check_or_mkdirs("file:///some/path") + + with self.assertRaises(OSError): + file_io.write_file("file:///some/path", "content", overwrite=False) + + with self.assertRaises(OSError): + file_io.copy_file("file:///src", "file:///dst", overwrite=False) + + with patch.object(file_io, 'read_file_utf8', side_effect=Exception("Read error")): + with self.assertRaises(OSError): + file_io.read_overwritten_file_utf8("file:///some/path") + + mock_filesystem.get_file_info.side_effect = OSError("Network error") + file_io.filesystem = mock_filesystem + + with self.assertRaises(OSError): + file_io.rename("file:///src", "file:///dst") + + file_io.delete_quietly("file:///some/path") + file_io.delete_directory_quietly("file:///some/path") + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_delete_non_empty_directory_raises_error(self): + temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_") + try: + warehouse_path = f"file://{temp_dir}" + file_io = FileIO(warehouse_path, {}) + + test_dir = os.path.join(temp_dir, "test_dir") + os.makedirs(test_dir) + test_file = os.path.join(test_dir, "test_file.txt") + with open(test_file, "w") as f: + f.write("test") + + with self.assertRaises(OSError) as context: + file_io.delete(f"file://{test_dir}", recursive=False) + self.assertIn("is not empty", str(context.exception)) + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_delete_returns_false_when_file_not_exists(self): + temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_") + try: + warehouse_path = f"file://{temp_dir}" + file_io = FileIO(warehouse_path, {}) + + result = file_io.delete(f"file://{temp_dir}/nonexistent.txt") + self.assertFalse(result, "delete() should return False when file does not exist") + + result = file_io.delete(f"file://{temp_dir}/nonexistent_dir", recursive=False) + self.assertFalse(result, "delete() should return False when directory does not exist") + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_mkdirs_raises_error_when_path_is_file(self): + temp_dir = tempfile.mkdtemp(prefix="file_io_mkdirs_test_") + try: + warehouse_path = f"file://{temp_dir}" + file_io = FileIO(warehouse_path, {}) + + test_file = os.path.join(temp_dir, "test_file.txt") + with open(test_file, "w") as f: + f.write("test") + + with self.assertRaises(FileExistsError) as context: + file_io.mkdirs(f"file://{test_file}") + self.assertIn("is not a directory", str(context.exception)) + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_rename_returns_false_when_dst_exists(self): + temp_dir = tempfile.mkdtemp(prefix="file_io_rename_test_") + try: + warehouse_path = f"file://{temp_dir}" + file_io = FileIO(warehouse_path, {}) + + src_file = os.path.join(temp_dir, "src.txt") + dst_file = os.path.join(temp_dir, "dst.txt") + with open(src_file, "w") as f: + f.write("src") + with open(dst_file, "w") as f: + f.write("dst") + + result = file_io.rename(f"file://{src_file}", f"file://{dst_file}") + self.assertFalse(result) + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_get_file_status_raises_error_when_file_not_exists(self): + temp_dir = tempfile.mkdtemp(prefix="file_io_get_file_status_test_") + try: + warehouse_path = f"file://{temp_dir}" + file_io = FileIO(warehouse_path, {}) + + with self.assertRaises(FileNotFoundError) as context: + file_io.get_file_status(f"file://{temp_dir}/nonexistent.txt") + self.assertIn("does not exist", str(context.exception)) + + test_file = os.path.join(temp_dir, "test_file.txt") + with open(test_file, "w") as f: + f.write("test content") + + file_info = file_io.get_file_status(f"file://{test_file}") + self.assertEqual(file_info.type, pyarrow.fs.FileType.File) + self.assertIsNotNone(file_info.size) + + with self.assertRaises(FileNotFoundError) as context: + file_io.get_file_size(f"file://{temp_dir}/nonexistent.txt") + self.assertIn("does not exist", str(context.exception)) + + with self.assertRaises(FileNotFoundError) as context: + file_io.is_dir(f"file://{temp_dir}/nonexistent_dir") + self.assertIn("does not exist", str(context.exception)) + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_copy_file(self): + temp_dir = tempfile.mkdtemp(prefix="file_io_copy_test_") + try: + warehouse_path = f"file://{temp_dir}" + file_io = FileIO(warehouse_path, {}) + + source_file = os.path.join(temp_dir, "source.txt") + target_file = os.path.join(temp_dir, "target.txt") + + with open(source_file, "w") as f: + f.write("source content") + + # Test 1: Raises FileExistsError when target exists and overwrite=False + with open(target_file, "w") as f: + f.write("target content") + + with self.assertRaises(FileExistsError) as context: + file_io.copy_file(f"file://{source_file}", f"file://{target_file}", overwrite=False) + self.assertIn("already exists", str(context.exception)) + + with open(target_file, "r") as f: + self.assertEqual(f.read(), "target content") + + # Test 2: Overwrites when overwrite=True + file_io.copy_file(f"file://{source_file}", f"file://{target_file}", overwrite=True) + with open(target_file, "r") as f: + self.assertEqual(f.read(), "source content") + + # Test 3: Creates parent directory if it doesn't exist + target_file_in_subdir = os.path.join(temp_dir, "subdir", "target.txt") + file_io.copy_file(f"file://{source_file}", f"file://{target_file_in_subdir}", overwrite=False) + self.assertTrue(os.path.exists(target_file_in_subdir)) + with open(target_file_in_subdir, "r") as f: + self.assertEqual(f.read(), "source content") + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_try_to_write_atomic(self): + temp_dir = tempfile.mkdtemp(prefix="file_io_try_write_atomic_test_") + try: + warehouse_path = f"file://{temp_dir}" + file_io = FileIO(warehouse_path, {}) + + target_dir = os.path.join(temp_dir, "target_dir") + os.makedirs(target_dir) + + result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content") + self.assertFalse(result, "try_to_write_atomic should return False when target is a directory") + + self.assertTrue(os.path.isdir(target_dir)) + self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory") + + normal_file = os.path.join(temp_dir, "normal_file.txt") + result = file_io.try_to_write_atomic(f"file://{normal_file}", "test content") + self.assertTrue(result, "try_to_write_atomic should succeed for a normal file path") + self.assertTrue(os.path.exists(normal_file)) + with open(normal_file, "r") as f: + self.assertEqual(f.read(), "test content") + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py index 02c7102ab817..6bdba47b81a6 100644 --- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py +++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py @@ -18,6 +18,7 @@ import shutil import tempfile import unittest +from unittest.mock import MagicMock from pypaimon import CatalogFactory, Schema from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException, @@ -171,3 +172,20 @@ def test_alter_table(self): ) table = catalog.get_table(identifier) self.assertEqual(len(table.fields), 2) + + def test_get_database_propagates_exists_error(self): + catalog = CatalogFactory.create({ + "warehouse": self.warehouse + }) + + with self.assertRaises(DatabaseNotExistException): + catalog.get_database("nonexistent_db") + + mock_filesystem = MagicMock() + mock_filesystem.get_file_info.side_effect = OSError("Permission denied") + catalog.file_io.filesystem = mock_filesystem + + with self.assertRaises(OSError) as context: + catalog.get_database("test_db") + self.assertIn("Permission denied", str(context.exception)) + self.assertNotIsInstance(context.exception, DatabaseNotExistException)