Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 73 additions & 35 deletions paimon-python/pypaimon/common/file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,12 @@ def new_output_stream(self, path: str):
def get_file_status(self, path: str):
path_str = self.to_filesystem_path(path)
file_infos = self.filesystem.get_file_info([path_str])
return file_infos[0]
file_info = file_infos[0]

if file_info.type == pyarrow.fs.FileType.NotFound:
raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist")

return file_info

def list_status(self, path: str):
path_str = self.to_filesystem_path(path)
Expand All @@ -211,51 +216,73 @@ def list_directories(self, path: str):
return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]

def exists(self, path: str) -> bool:
try:
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
result = file_info.type != pyarrow.fs.FileType.NotFound
return result
except Exception:
return False
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
return file_info.type != pyarrow.fs.FileType.NotFound

def delete(self, path: str, recursive: bool = False) -> bool:
try:
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
if file_info.type == pyarrow.fs.FileType.Directory:
if recursive:
self.filesystem.delete_dir_contents(path_str)
else:
self.filesystem.delete_dir(path_str)
else:
self.filesystem.delete_file(path_str)
return True
except Exception as e:
self.logger.warning(f"Failed to delete {path}: {e}")
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]

if file_info.type == pyarrow.fs.FileType.NotFound:
return False

if file_info.type == pyarrow.fs.FileType.Directory:
if not recursive:
selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
dir_contents = self.filesystem.get_file_info(selector)
if len(dir_contents) > 0:
raise OSError(f"Directory {path} is not empty")
if recursive:
self.filesystem.delete_dir_contents(path_str)
self.filesystem.delete_dir(path_str)
else:
self.filesystem.delete_dir(path_str)
else:
self.filesystem.delete_file(path_str)
return True

def mkdirs(self, path: str) -> bool:
try:
path_str = self.to_filesystem_path(path)
self.filesystem.create_dir(path_str, recursive=True)
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]

if file_info.type == pyarrow.fs.FileType.Directory:
return True
except Exception as e:
self.logger.warning(f"Failed to create directory {path}: {e}")
return False
elif file_info.type == pyarrow.fs.FileType.File:
raise FileExistsError(f"Path exists but is not a directory: {path}")

self.filesystem.create_dir(path_str, recursive=True)
return True

def rename(self, src: str, dst: str) -> bool:
dst_str = self.to_filesystem_path(dst)
dst_parent = Path(dst_str).parent
if str(dst_parent) and not self.exists(str(dst_parent)):
self.mkdirs(str(dst_parent))

src_str = self.to_filesystem_path(src)

try:
dst_str = self.to_filesystem_path(dst)
dst_parent = Path(dst_str).parent
if str(dst_parent) and not self.exists(str(dst_parent)):
self.mkdirs(str(dst_parent))

src_str = self.to_filesystem_path(src)
if hasattr(self.filesystem, 'rename'):
return self.filesystem.rename(src_str, dst_str)

dst_file_info = self.filesystem.get_file_info([dst_str])[0]
if dst_file_info.type != pyarrow.fs.FileType.NotFound:
if dst_file_info.type == pyarrow.fs.FileType.File:
return False
# Make it compatible with HadoopFileIO: if dst is an existing directory,
# dst=dst/srcFileName
src_name = Path(src_str).name
dst_str = str(Path(dst_str) / src_name)
final_dst_info = self.filesystem.get_file_info([dst_str])[0]
if final_dst_info.type != pyarrow.fs.FileType.NotFound:
return False

self.filesystem.move(src_str, dst_str)
return True
except Exception as e:
self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
except FileNotFoundError:
return False
except (PermissionError, OSError):
return False

def delete_quietly(self, path: str):
Expand Down Expand Up @@ -304,6 +331,12 @@ def read_file_utf8(self, path: str) -> str:
return input_stream.read().decode('utf-8')

def try_to_write_atomic(self, path: str, content: str) -> bool:
if self.exists(path):
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
if file_info.type == pyarrow.fs.FileType.Directory:
return False

temp_path = path + str(uuid.uuid4()) + ".tmp"
success = False
try:
Expand Down Expand Up @@ -331,6 +364,11 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False)

source_str = self.to_filesystem_path(source_path)
target_str = self.to_filesystem_path(target_path)
target_parent = Path(target_str).parent

if str(target_parent) and not self.exists(str(target_parent)):
self.mkdirs(str(target_parent))

self.filesystem.copy_file(source_str, target_str)

def copy_files(self, source_directory: str, target_directory: str, overwrite: bool = False):
Expand Down
31 changes: 23 additions & 8 deletions paimon-python/pypaimon/filesystem/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,33 @@

import threading
import pyarrow
from pathlib import Path
from pyarrow._fs import LocalFileSystem


class PaimonLocalFileSystem(LocalFileSystem):

rename_lock = threading.Lock()

def move(self, src, dst):
with PaimonLocalFileSystem.rename_lock:
file_info = self.get_file_info([dst])[0]
result = file_info.type != pyarrow.fs.FileType.NotFound
if (result is True):
raise Exception("Target file already exists")

super(PaimonLocalFileSystem, self).move(src, dst)
def rename(self, src, dst):
try:
with PaimonLocalFileSystem.rename_lock:
dst_file_info = self.get_file_info([dst])[0]
if dst_file_info.type != pyarrow.fs.FileType.NotFound:
if dst_file_info.type == pyarrow.fs.FileType.File:
return False
# Make it compatible with HadoopFileIO: if dst is an existing directory,
# dst=dst/srcFileName
src_name = Path(src).name
dst = str(Path(dst) / src_name)
final_dst_info = self.get_file_info([dst])[0]
if final_dst_info.type != pyarrow.fs.FileType.NotFound:
return False

# Perform atomic move
super(PaimonLocalFileSystem, self).move(src, dst)
return True
except FileNotFoundError:
return False
except (PermissionError, OSError):
return False
Loading