diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py index 8d1b485b7425..65a453896696 100644 --- a/paimon-python/pypaimon/catalog/filesystem_catalog.py +++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py @@ -46,7 +46,7 @@ def __init__(self, catalog_options: Options): raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE.key()}' path must be set") self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE) self.catalog_options = catalog_options - self.file_io = FileIO(self.warehouse, self.catalog_options) + self.file_io = FileIO.get(self.warehouse, self.catalog_options) def get_database(self, name: str) -> Database: if self.file_io.exists(self.get_database_path(name)): diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index 5d9462f6c377..41a3061fb93c 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -252,7 +252,7 @@ def to_table_metadata(self, db: str, response: GetTableResponse) -> TableMetadat ) def file_io_from_options(self, table_path: str) -> FileIO: - return FileIO(table_path, self.context.options) + return FileIO.get(table_path, self.context.options) def file_io_for_data(self, table_path: str, identifier: Identifier): return RESTTokenFileIO(identifier, table_path, self.context.options) \ diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index 2cec5df7216c..f686dc66ea37 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -20,70 +20,164 @@ import time from typing import Optional -from pyarrow._fs import FileSystem +from cachetools import TTLCache from pypaimon.api.rest_api import RESTApi from pypaimon.api.rest_util import RESTUtil from pypaimon.catalog.rest.rest_token import RESTToken from pypaimon.common.file_io import FileIO +from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO from pypaimon.common.identifier import Identifier from pypaimon.common.options import Options from pypaimon.common.options.config import CatalogOptions, OssOptions +from pypaimon.common.uri_reader import UriReaderFactory class RESTTokenFileIO(FileIO): - + """ + A FileIO to support getting token from REST Server. 
+ """ + + _FILE_IO_CACHE_MAXSIZE = 1000 + _FILE_IO_CACHE_TTL = 36000 # 10 hours in seconds + def __init__(self, identifier: Identifier, path: str, catalog_options: Optional[Options] = None): self.identifier = identifier self.path = path + self.catalog_options = catalog_options + self.properties = catalog_options or Options({}) # For compatibility with refresh_token() self.token: Optional[RESTToken] = None self.api_instance: Optional[RESTApi] = None self.lock = threading.Lock() self.log = logging.getLogger(__name__) - super().__init__(path, catalog_options) + self._uri_reader_factory_cache: Optional[UriReaderFactory] = None + self._file_io_cache: TTLCache = TTLCache( + maxsize=self._FILE_IO_CACHE_MAXSIZE, + ttl=self._FILE_IO_CACHE_TTL + ) def __getstate__(self): state = self.__dict__.copy() # Remove non-serializable objects state.pop('lock', None) state.pop('api_instance', None) + state.pop('_file_io_cache', None) + state.pop('_uri_reader_factory_cache', None) # token can be serialized, but we'll refresh it on deserialization return state def __setstate__(self, state): self.__dict__.update(state) - # Recreate lock after deserialization + # Recreate lock and cache after deserialization self.lock = threading.Lock() + self._file_io_cache = TTLCache( + maxsize=self._FILE_IO_CACHE_MAXSIZE, + ttl=self._FILE_IO_CACHE_TTL + ) + self._uri_reader_factory_cache = None # api_instance will be recreated when needed self.api_instance = None - def _initialize_oss_fs(self, path) -> FileSystem: + def file_io(self) -> FileIO: self.try_to_refresh_token() - merged_token = self._merge_token_with_catalog_options(self.token.token) - merged_properties = RESTUtil.merge( - self.properties.to_map() if self.properties else {}, - merged_token - ) - merged_options = Options(merged_properties) - original_properties = self.properties - self.properties = merged_options - try: - return super()._initialize_oss_fs(path) - finally: - self.properties = original_properties + + if self.token is None: + return FileIO.get(self.path, self.catalog_options or Options({})) + + cache_key = self.token + + file_io = self._file_io_cache.get(cache_key) + if file_io is not None: + return file_io + + with self.lock: + file_io = self._file_io_cache.get(cache_key) + if file_io is not None: + return file_io + + merged_token = self._merge_token_with_catalog_options(self.token.token) + merged_properties = RESTUtil.merge( + self.catalog_options.to_map() if self.catalog_options else {}, + merged_token + ) + merged_options = Options(merged_properties) + + file_io = PyArrowFileIO(self.path, merged_options) + self._file_io_cache[cache_key] = file_io + return file_io def _merge_token_with_catalog_options(self, token: dict) -> dict: """Merge token with catalog options, DLF OSS endpoint should override the standard OSS endpoint.""" merged_token = dict(token) - dlf_oss_endpoint = self.properties.get(CatalogOptions.DLF_OSS_ENDPOINT) - if dlf_oss_endpoint and dlf_oss_endpoint.strip(): - merged_token[OssOptions.OSS_ENDPOINT.key()] = dlf_oss_endpoint + if self.catalog_options: + dlf_oss_endpoint = self.catalog_options.get(CatalogOptions.DLF_OSS_ENDPOINT) + if dlf_oss_endpoint and dlf_oss_endpoint.strip(): + merged_token[OssOptions.OSS_ENDPOINT.key()] = dlf_oss_endpoint return merged_token + def new_input_stream(self, path: str): + return self.file_io().new_input_stream(path) + def new_output_stream(self, path: str): - # Call parent class method to ensure path conversion and parent directory creation - return super().new_output_stream(path) + return 
self.file_io().new_output_stream(path) + + def get_file_status(self, path: str): + return self.file_io().get_file_status(path) + + def list_status(self, path: str): + return self.file_io().list_status(path) + + def exists(self, path: str) -> bool: + return self.file_io().exists(path) + + def delete(self, path: str, recursive: bool = False) -> bool: + return self.file_io().delete(path, recursive) + + def mkdirs(self, path: str) -> bool: + return self.file_io().mkdirs(path) + + def rename(self, src: str, dst: str) -> bool: + return self.file_io().rename(src, dst) + + def copy_file(self, source_path: str, target_path: str, overwrite: bool = False): + return self.file_io().copy_file(source_path, target_path, overwrite) + + def to_filesystem_path(self, path: str) -> str: + return self.file_io().to_filesystem_path(path) + + def try_to_write_atomic(self, path: str, content: str) -> bool: + return self.file_io().try_to_write_atomic(path, content) + + def write_parquet(self, path: str, data, compression: str = 'zstd', + zstd_level: int = 1, **kwargs): + return self.file_io().write_parquet(path, data, compression, zstd_level, **kwargs) + + def write_orc(self, path: str, data, compression: str = 'zstd', + zstd_level: int = 1, **kwargs): + return self.file_io().write_orc(path, data, compression, zstd_level, **kwargs) + + def write_avro(self, path: str, data, avro_schema=None, + compression: str = 'zstd', zstd_level: int = 1, **kwargs): + return self.file_io().write_avro(path, data, avro_schema, compression, zstd_level, **kwargs) + + def write_lance(self, path: str, data, **kwargs): + return self.file_io().write_lance(path, data, **kwargs) + + def write_blob(self, path: str, data, blob_as_descriptor: bool, **kwargs): + return self.file_io().write_blob(path, data, blob_as_descriptor, **kwargs) + + @property + def uri_reader_factory(self): + if self._uri_reader_factory_cache is None: + catalog_options = self.catalog_options or Options({}) + self._uri_reader_factory_cache = UriReaderFactory(catalog_options) + + return self._uri_reader_factory_cache + + @property + def filesystem(self): + return self.file_io().filesystem def try_to_refresh_token(self): if self.should_refresh(): @@ -111,3 +205,12 @@ def refresh_token(self): def valid_token(self): self.try_to_refresh_token() return self.token + + def close(self): + with self.lock: + for file_io in self._file_io_cache.values(): + try: + file_io.close() + except Exception as e: + self.log.warning(f"Error closing cached FileIO: {e}") + self._file_io_cache.clear() diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 3e039248b798..536e06c0b13d 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -16,298 +16,78 @@ # limitations under the License. 
################################################################################ import logging -import os -import subprocess import uuid +from abc import ABC, abstractmethod from pathlib import Path -from typing import Any, Dict, List, Optional -from urllib.parse import splitport, urlparse +from typing import List, Optional import pyarrow -from packaging.version import parse -from pyarrow._fs import FileSystem from pypaimon.common.options import Options -from pypaimon.common.options.config import OssOptions, S3Options -from pypaimon.common.uri_reader import UriReaderFactory -from pypaimon.filesystem.local import PaimonLocalFileSystem -from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser -from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob -from pypaimon.table.row.generic_row import GenericRow -from pypaimon.table.row.row_kind import RowKind -from pypaimon.write.blob_format_writer import BlobFormatWriter - - -class FileIO: - def __init__(self, path: str, catalog_options: Options): - self.properties = catalog_options - self.logger = logging.getLogger(__name__) - scheme, netloc, _ = self.parse_location(path) - self.uri_reader_factory = UriReaderFactory(catalog_options) - if scheme in {"oss"}: - self.filesystem = self._initialize_oss_fs(path) - elif scheme in {"s3", "s3a", "s3n"}: - self.filesystem = self._initialize_s3_fs() - elif scheme in {"hdfs", "viewfs"}: - self.filesystem = self._initialize_hdfs_fs(scheme, netloc) - elif scheme in {"file"}: - self.filesystem = self._initialize_local_fs() - else: - raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") - - @staticmethod - def parse_location(location: str): - uri = urlparse(location) - if not uri.scheme: - return "file", uri.netloc, os.path.abspath(location) - elif uri.scheme in ("hdfs", "viewfs"): - return uri.scheme, uri.netloc, uri.path - else: - return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" - - @staticmethod - def _create_s3_retry_config( - max_attempts: int = 10, - request_timeout: int = 60, - connect_timeout: int = 60 - ) -> Dict[str, Any]: - """ - AwsStandardS3RetryStrategy and timeout parameters are only available - in PyArrow >= 8.0.0. - """ - if parse(pyarrow.__version__) >= parse("8.0.0"): - config = { - 'request_timeout': request_timeout, - 'connect_timeout': connect_timeout - } - try: - from pyarrow.fs import AwsStandardS3RetryStrategy - retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts) - config['retry_strategy'] = retry_strategy - except ImportError: - pass - return config - else: - return {} - - def _extract_oss_bucket(self, location) -> str: - uri = urlparse(location) - if uri.scheme and uri.scheme != "oss": - raise ValueError("Not an OSS URI: {}".format(location)) - - netloc = uri.netloc or "" - # parse oss://access_id:secret_key@Endpoint/bucket/path/to/object - if (getattr(uri, "username", None) or getattr(uri, "password", None)) or ("@" in netloc): - first_segment = uri.path.lstrip("/").split("/", 1)[0] - if not first_segment: - raise ValueError("Invalid OSS URI without bucket: {}".format(location)) - return first_segment - - # parse oss://bucket/... or oss://bucket.endpoint/... 
- host = getattr(uri, "hostname", None) or netloc - if not host: - raise ValueError("Invalid OSS URI without host: {}".format(location)) - bucket = host.split(".", 1)[0] - if not bucket: - raise ValueError("Invalid OSS URI without bucket: {}".format(location)) - return bucket - - def _initialize_oss_fs(self, path) -> FileSystem: - from pyarrow.fs import S3FileSystem - - client_kwargs = { - "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID), - "secret_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET), - "session_token": self.properties.get(OssOptions.OSS_SECURITY_TOKEN), - "region": self.properties.get(OssOptions.OSS_REGION), - } - - # Based on https://github.com/apache/arrow/issues/40506 - if parse(pyarrow.__version__) >= parse("7.0.0"): - client_kwargs['force_virtual_addressing'] = True - client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT) - else: - oss_bucket = self._extract_oss_bucket(path) - client_kwargs['endpoint_override'] = (oss_bucket + "." + - self.properties.get(OssOptions.OSS_ENDPOINT)) - - retry_config = self._create_s3_retry_config() - client_kwargs.update(retry_config) - return S3FileSystem(**client_kwargs) - - def _initialize_s3_fs(self) -> FileSystem: - from pyarrow.fs import S3FileSystem - - client_kwargs = { - "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT), - "access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID), - "secret_key": self.properties.get(S3Options.S3_ACCESS_KEY_SECRET), - "session_token": self.properties.get(S3Options.S3_SECURITY_TOKEN), - "region": self.properties.get(S3Options.S3_REGION), - "force_virtual_addressing": True, - } - - retry_config = self._create_s3_retry_config() - client_kwargs.update(retry_config) - - return S3FileSystem(**client_kwargs) - - def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: - from pyarrow.fs import HadoopFileSystem - - if 'HADOOP_HOME' not in os.environ: - raise RuntimeError("HADOOP_HOME environment variable is not set.") - if 'HADOOP_CONF_DIR' not in os.environ: - raise RuntimeError("HADOOP_CONF_DIR environment variable is not set.") - - hadoop_home = os.environ.get("HADOOP_HOME") - native_lib_path = f"{hadoop_home}/lib/native" - os.environ['LD_LIBRARY_PATH'] = f"{native_lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}" - - class_paths = subprocess.run( - [f'{hadoop_home}/bin/hadoop', 'classpath', '--glob'], - capture_output=True, - text=True, - check=True - ) - os.environ['CLASSPATH'] = class_paths.stdout.strip() - - host, port_str = splitport(netloc) - return HadoopFileSystem( - host=host, - port=int(port_str), - user=os.environ.get('HADOOP_USER_NAME', 'hadoop') - ) - - def _initialize_local_fs(self) -> FileSystem: - - return PaimonLocalFileSystem() +class FileIO(ABC): + """ + File IO interface to read and write files. 
+ """ + + @abstractmethod def new_input_stream(self, path: str): - path_str = self.to_filesystem_path(path) - return self.filesystem.open_input_file(path_str) + pass + @abstractmethod def new_output_stream(self, path: str): - path_str = self.to_filesystem_path(path) - parent_dir = Path(path_str).parent - if str(parent_dir) and not self.exists(str(parent_dir)): - self.mkdirs(str(parent_dir)) - - return self.filesystem.open_output_stream(path_str) - + pass + + @abstractmethod def get_file_status(self, path: str): - path_str = self.to_filesystem_path(path) - file_infos = self.filesystem.get_file_info([path_str]) - file_info = file_infos[0] - - if file_info.type == pyarrow.fs.FileType.NotFound: - raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") - - return file_info - + pass + + @abstractmethod def list_status(self, path: str): - path_str = self.to_filesystem_path(path) - selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True) - return self.filesystem.get_file_info(selector) - - def list_directories(self, path: str): - file_infos = self.list_status(path) - return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory] + pass + @abstractmethod def exists(self, path: str) -> bool: - path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - return file_info.type != pyarrow.fs.FileType.NotFound + pass + @abstractmethod def delete(self, path: str, recursive: bool = False) -> bool: - path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - - if file_info.type == pyarrow.fs.FileType.NotFound: - return False - - if file_info.type == pyarrow.fs.FileType.Directory: - if not recursive: - selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True) - dir_contents = self.filesystem.get_file_info(selector) - if len(dir_contents) > 0: - raise OSError(f"Directory {path} is not empty") - if recursive: - self.filesystem.delete_dir_contents(path_str) - self.filesystem.delete_dir(path_str) - else: - self.filesystem.delete_dir(path_str) - else: - self.filesystem.delete_file(path_str) - return True - + pass + + @abstractmethod def mkdirs(self, path: str) -> bool: - path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - - if file_info.type == pyarrow.fs.FileType.Directory: - return True - elif file_info.type == pyarrow.fs.FileType.File: - raise FileExistsError(f"Path exists but is not a directory: {path}") - - self.filesystem.create_dir(path_str, recursive=True) - return True - + pass + + @abstractmethod def rename(self, src: str, dst: str) -> bool: - dst_str = self.to_filesystem_path(dst) - dst_parent = Path(dst_str).parent - if str(dst_parent) and not self.exists(str(dst_parent)): - self.mkdirs(str(dst_parent)) - - src_str = self.to_filesystem_path(src) - - try: - if hasattr(self.filesystem, 'rename'): - return self.filesystem.rename(src_str, dst_str) - - dst_file_info = self.filesystem.get_file_info([dst_str])[0] - if dst_file_info.type != pyarrow.fs.FileType.NotFound: - if dst_file_info.type == pyarrow.fs.FileType.File: - return False - # Make it compatible with HadoopFileIO: if dst is an existing directory, - # dst=dst/srcFileName - src_name = Path(src_str).name - dst_str = str(Path(dst_str) / src_name) - final_dst_info = self.filesystem.get_file_info([dst_str])[0] - if final_dst_info.type != pyarrow.fs.FileType.NotFound: - return False - - self.filesystem.move(src_str, dst_str) 
- return True - except FileNotFoundError: - return False - except (PermissionError, OSError): - return False - + pass + def delete_quietly(self, path: str): - if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug(f"Ready to delete {path}") + logger = logging.getLogger(__name__) + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Ready to delete {path}") try: if not self.delete(path, False) and self.exists(path): - self.logger.warning(f"Failed to delete file {path}") + logger.warning(f"Failed to delete file {path}") except Exception: - self.logger.warning(f"Exception occurs when deleting file {path}", exc_info=True) + logger.warning(f"Exception occurs when deleting file {path}", exc_info=True) def delete_files_quietly(self, files: List[str]): for file_path in files: self.delete_quietly(file_path) def delete_directory_quietly(self, directory: str): - if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug(f"Ready to delete {directory}") + logger = logging.getLogger(__name__) + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Ready to delete {directory}") try: if not self.delete(directory, True) and self.exists(directory): - self.logger.warning(f"Failed to delete directory {directory}") + logger.warning(f"Failed to delete directory {directory}") except Exception: - self.logger.warning(f"Exception occurs when deleting directory {directory}", exc_info=True) + logger.warning(f"Exception occurs when deleting directory {directory}", exc_info=True) def get_file_size(self, path: str) -> int: file_info = self.get_file_status(path) @@ -332,9 +112,7 @@ def read_file_utf8(self, path: str) -> str: def try_to_write_atomic(self, path: str, content: str) -> bool: if self.exists(path): - path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - if file_info.type == pyarrow.fs.FileType.Directory: + if self.is_dir(path): return False temp_path = path + str(uuid.uuid4()) + ".tmp" @@ -345,7 +123,7 @@ def try_to_write_atomic(self, path: str, content: str) -> bool: finally: if not success: self.delete_quietly(temp_path) - return success + return success def write_file(self, path: str, content: str, overwrite: bool = False): if not overwrite and self.exists(path): @@ -362,14 +140,15 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False) if not overwrite and self.exists(target_path): raise FileExistsError(f"Target file {target_path} already exists and overwrite=False") - source_str = self.to_filesystem_path(source_path) target_str = self.to_filesystem_path(target_path) target_parent = Path(target_str).parent if str(target_parent) and not self.exists(str(target_parent)): self.mkdirs(str(target_parent)) - self.filesystem.copy_file(source_str, target_str) + with self.new_input_stream(source_path) as input_stream: + with self.new_output_stream(target_path) as output_stream: + output_stream.write(input_stream.read()) def copy_files(self, source_directory: str, target_directory: str, overwrite: bool = False): file_infos = self.list_status(source_directory) @@ -407,196 +186,58 @@ def read_overwritten_file_utf8(self, path: str) -> Optional[str]: return None - def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'zstd', - zstd_level: int = 1, **kwargs): - try: - import pyarrow.parquet as pq + def to_filesystem_path(self, path: str) -> str: + return path - with self.new_output_stream(path) as output_stream: - if compression.lower() == 'zstd': - kwargs['compression_level'] = zstd_level - 
pq.write_table(data, output_stream, compression=compression, **kwargs) + def parse_location(self, location: str): + from urllib.parse import urlparse + import os - except Exception as e: - self.delete_quietly(path) - raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e + uri = urlparse(location) + if not uri.scheme: + return "file", uri.netloc, os.path.abspath(location) + elif uri.scheme in ("hdfs", "viewfs"): + return uri.scheme, uri.netloc, uri.path + else: + return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" - def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', + def write_parquet(self, path: str, data, compression: str = 'zstd', + zstd_level: int = 1, **kwargs): + raise NotImplementedError("write_parquet must be implemented by FileIO subclasses") + + def write_orc(self, path: str, data, compression: str = 'zstd', zstd_level: int = 1, **kwargs): - try: - """Write ORC file using PyArrow ORC writer. - - Note: PyArrow's ORC writer doesn't support compression_level parameter. - ORC files will use zstd compression with default level - (which is 3, see https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c) - instead of the specified level. - """ - import sys - import pyarrow.orc as orc - - with self.new_output_stream(path) as output_stream: - # Check Python version - if 3.6, don't use compression parameter - if sys.version_info[:2] == (3, 6): - orc.write_table(data, output_stream, **kwargs) - else: - orc.write_table( - data, - output_stream, - compression=compression, - **kwargs - ) - - except Exception as e: - self.delete_quietly(path) - raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e - - def write_avro( - self, path: str, data: pyarrow.Table, - avro_schema: Optional[Dict[str, Any]] = None, - compression: str = 'zstd', zstd_level: int = 1, **kwargs): - import fastavro - if avro_schema is None: - from pypaimon.schema.data_types import PyarrowFieldParser - avro_schema = PyarrowFieldParser.to_avro_schema(data.schema) - - records_dict = data.to_pydict() - - def record_generator(): - num_rows = len(list(records_dict.values())[0]) - for i in range(num_rows): - yield {col: records_dict[col][i] for col in records_dict.keys()} - - records = record_generator() - - codec_map = { - 'null': 'null', - 'deflate': 'deflate', - 'snappy': 'snappy', - 'bzip2': 'bzip2', - 'xz': 'xz', - 'zstandard': 'zstandard', - 'zstd': 'zstandard', # zstd is commonly used in Paimon - } - compression_lower = compression.lower() + raise NotImplementedError("write_orc must be implemented by FileIO subclasses") + + def write_avro(self, path: str, data, avro_schema=None, + compression: str = 'zstd', zstd_level: int = 1, **kwargs): + raise NotImplementedError("write_avro must be implemented by FileIO subclasses") + + def write_lance(self, path: str, data, **kwargs): + raise NotImplementedError("write_lance must be implemented by FileIO subclasses") + + def write_blob(self, path: str, data, blob_as_descriptor: bool, **kwargs): + """Write Blob format file. Must be implemented by subclasses.""" + raise NotImplementedError("write_blob must be implemented by FileIO subclasses") + + def close(self): + pass + + @staticmethod + def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO': + """ + Returns a FileIO instance for accessing the file system identified by the given path. + - LocalFileIO for local file system (file:// or no scheme) + - PyArrowFileIO for remote file systems (oss://, s3://, hdfs://, etc.) 
+ """ + from urllib.parse import urlparse - codec = codec_map.get(compression_lower) - if codec is None: - raise ValueError( - f"Unsupported compression '{compression}' for Avro format. " - f"Supported compressions: {', '.join(sorted(codec_map.keys()))}." - ) - - with self.new_output_stream(path) as output_stream: - if codec == 'zstandard': - kwargs['codec_compression_level'] = zstd_level - fastavro.writer(output_stream, avro_schema, records, codec=codec, **kwargs) - - def write_lance(self, path: str, data: pyarrow.Table, **kwargs): - try: - import lance - from pypaimon.read.reader.lance_utils import to_lance_specified - file_path_for_lance, storage_options = to_lance_specified(self, path) - - writer = lance.file.LanceFileWriter( - file_path_for_lance, data.schema, storage_options=storage_options, **kwargs) - try: - # Write all batches - for batch in data.to_batches(): - writer.write_batch(batch) - finally: - writer.close() - except Exception as e: - self.delete_quietly(path) - raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e - - def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs): - try: - # Validate input constraints - if data.num_columns != 1: - raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns") - # Check for null values - column = data.column(0) - if column.null_count > 0: - raise RuntimeError("Blob format does not support null values") - # Convert PyArrow schema to Paimon DataFields - # For blob files, we expect exactly one blob column - field = data.schema[0] - if pyarrow.types.is_large_binary(field.type): - fields = [DataField(0, field.name, AtomicType("BLOB"))] - else: - # Convert other types as needed - paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable) - fields = [DataField(0, field.name, paimon_type)] - # Convert PyArrow Table to records - records_dict = data.to_pydict() - num_rows = data.num_rows - field_name = fields[0].name - with self.new_output_stream(path) as output_stream: - writer = BlobFormatWriter(output_stream) - # Write each row - for i in range(num_rows): - col_data = records_dict[field_name][i] - # Convert to appropriate type based on field type - if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB": - if blob_as_descriptor: - blob_descriptor = BlobDescriptor.deserialize(col_data) - uri_reader = self.uri_reader_factory.create(blob_descriptor.uri) - blob_data = Blob.from_descriptor(uri_reader, blob_descriptor) - elif isinstance(col_data, bytes): - blob_data = BlobData(col_data) - else: - # Convert to bytes if needed - if hasattr(col_data, 'as_py'): - col_data = col_data.as_py() - if isinstance(col_data, str): - col_data = col_data.encode('utf-8') - blob_data = BlobData(col_data) - row_values = [blob_data] - else: - row_values = [col_data] - # Create GenericRow and write - row = GenericRow(row_values, fields, RowKind.INSERT) - writer.add_element(row) - writer.close() - - except Exception as e: - self.delete_quietly(path) - raise RuntimeError(f"Failed to write blob file {path}: {e}") from e - - def to_filesystem_path(self, path: str) -> str: - from pyarrow.fs import S3FileSystem - import re - - parsed = urlparse(path) - normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else '' - - if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc: - # This is likely a Windows drive letter, not a URI scheme - return str(path) - - if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'): - # 
file://C:/path format - netloc is 'C:', need to reconstruct path with drive letter - drive_letter = parsed.netloc.rstrip(':') - path_part = normalized_path.lstrip('/') - return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:" - - if isinstance(self.filesystem, S3FileSystem): - # For S3, return "bucket/path" format - if parsed.scheme: - if parsed.netloc: - # Has netloc (bucket): return "bucket/path" format - path_part = normalized_path.lstrip('/') - return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc - else: - # Has scheme but no netloc: return path without scheme - result = normalized_path.lstrip('/') - return result if result else '.' - return str(path) - - if parsed.scheme: - # Handle empty path: return '.' for current directory - if not normalized_path: - return '.' - return normalized_path - - return str(path) + uri = urlparse(path) + scheme = uri.scheme + + if not scheme or scheme == "file": + from pypaimon.filesystem.local_file_io import LocalFileIO + return LocalFileIO(path, catalog_options) + + from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO + return PyArrowFileIO(path, catalog_options or Options({})) diff --git a/paimon-python/pypaimon/common/uri_reader.py b/paimon-python/pypaimon/common/uri_reader.py index b05ce7725321..50718f0849da 100644 --- a/paimon-python/pypaimon/common/uri_reader.py +++ b/paimon-python/pypaimon/common/uri_reader.py @@ -148,7 +148,7 @@ def _new_reader(self, key: UriKey, parsed_uri: ParseResult) -> UriReader: # Import FileIO here to avoid circular imports from pypaimon.common.file_io import FileIO uri_string = parsed_uri.geturl() - file_io = FileIO(uri_string, self.catalog_options) + file_io = FileIO.get(uri_string, self.catalog_options) return UriReader.from_file(file_io) except Exception as e: raise RuntimeError(f"Failed to create reader for URI {parsed_uri.geturl()}") from e diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py b/paimon-python/pypaimon/filesystem/local_file_io.py new file mode 100644 index 000000000000..183eb0ea130d --- /dev/null +++ b/paimon-python/pypaimon/filesystem/local_file_io.py @@ -0,0 +1,439 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import logging +import os +import shutil +import threading +import uuid +from pathlib import Path +from typing import Any, Dict, Optional +from urllib.parse import urlparse + +import pyarrow +import pyarrow.fs + +from pypaimon.common.file_io import FileIO +from pypaimon.common.options import Options +from pypaimon.common.uri_reader import UriReaderFactory +from pypaimon.filesystem.local import PaimonLocalFileSystem +from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser +from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.row.row_kind import RowKind +from pypaimon.write.blob_format_writer import BlobFormatWriter + + +class LocalFileIO(FileIO): + """ + Local file system implementation of FileIO. + """ + + RENAME_LOCK = threading.Lock() + + def __init__(self, path: str = None, catalog_options: Optional[Options] = None): + self.logger = logging.getLogger(__name__) + self.path = path + self.properties = catalog_options or Options({}) + self.filesystem = PaimonLocalFileSystem() + self.uri_reader_factory = UriReaderFactory(self.properties) + + @staticmethod + def create(): + return LocalFileIO() + + def _to_file(self, path: str) -> Path: + parsed = urlparse(path) + + if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc: + return Path(path) + + if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'): + drive_letter = parsed.netloc.rstrip(':') + path_part = parsed.path.lstrip('/') if parsed.path else '' + if path_part: + return Path(f"{drive_letter}:/{path_part}") + else: + return Path(f"{drive_letter}:") + + local_path = parsed.path if parsed.scheme else path + + if not local_path: + return Path(".") + + return Path(local_path) + + def new_input_stream(self, path: str): + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Invoking new_input_stream for {path}") + + file_path = self._to_file(path) + if not file_path.exists(): + raise FileNotFoundError(f"File {path} does not exist") + + return open(file_path, 'rb') + + def new_output_stream(self, path: str): + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Invoking new_output_stream for {path}") + + file_path = self._to_file(path) + # Create parent directories if needed + parent = file_path.parent + if parent and not parent.exists(): + parent.mkdir(parents=True, exist_ok=True) + + return open(file_path, 'wb') + + def get_file_status(self, path: str): + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Invoking get_file_status for {path}") + + file_path = self._to_file(path) + if not file_path.exists(): + import getpass + user = getpass.getuser() + raise FileNotFoundError( + f"File {path} does not exist or the user running " + f"Paimon ('{user}') has insufficient permissions to access it." 
+ ) + + class LocalFileStatus: + def __init__(self, file_path: Path, original_path: str): + stat_info = file_path.stat() + self.path = str(file_path.absolute()) + self.original_path = original_path + self.size = stat_info.st_size if file_path.is_file() else None + self.type = ( + pyarrow.fs.FileType.Directory if file_path.is_dir() + else pyarrow.fs.FileType.File if file_path.is_file() + else pyarrow.fs.FileType.NotFound + ) + self.mtime = stat_info.st_mtime + + return LocalFileStatus(file_path, path) + + def list_status(self, path: str): + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Invoking list_status for {path}") + + file_path = self._to_file(path) + results = [] + + if not file_path.exists(): + return results + + if file_path.is_file(): + results.append(self.get_file_status(path)) + elif file_path.is_dir(): + try: + for item in file_path.iterdir(): + try: + if path.startswith('file://'): + item_path = f"file://{item}" + else: + item_path = str(item) + results.append(self.get_file_status(item_path)) + except FileNotFoundError: + pass + except PermissionError: + pass + + return results + + def exists(self, path: str) -> bool: + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Invoking exists for {path}") + + file_path = self._to_file(path) + return file_path.exists() + + def delete(self, path: str, recursive: bool = False) -> bool: + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Invoking delete for {path}") + + file_path = self._to_file(path) + + if not file_path.exists(): + return False + + if file_path.is_file(): + file_path.unlink() + return True + elif file_path.is_dir(): + if not recursive: + try: + items = list(file_path.iterdir()) + if items: + raise OSError(f"Directory {path} is not empty") + except PermissionError: + raise OSError( + f"Directory {path} does not exist or an I/O error occurred" + ) + file_path.rmdir() + else: + shutil.rmtree(file_path) + return True + + return False + + def mkdirs(self, path: str) -> bool: + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Invoking mkdirs for {path}") + + file_path = self._to_file(path) + + if file_path.is_dir(): + return True + elif file_path.exists() and not file_path.is_dir(): + raise FileExistsError(f"Path exists but is not a directory: {path}") + + file_path.mkdir(parents=True, exist_ok=True) + return True + + def rename(self, src: str, dst: str) -> bool: + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Invoking rename for {src} to {dst}") + + src_file = self._to_file(src) + dst_file = self._to_file(dst) + + dst_parent = dst_file.parent + if dst_parent and not dst_parent.exists(): + dst_parent.mkdir(parents=True, exist_ok=True) + + try: + with LocalFileIO.RENAME_LOCK: + if dst_file.exists(): + if dst_file.is_file(): + return False + # Make it compatible with HadoopFileIO: if dst is an existing directory, + # dst=dst/srcFileName + dst_file = dst_file / src_file.name + if dst_file.exists(): + return False + + # Perform atomic move + src_file.rename(dst_file) + return True + except FileNotFoundError: + return False + except (PermissionError, OSError): + return False + + def try_to_write_atomic(self, path: str, content: str) -> bool: + file_path = self._to_file(path) + if file_path.exists() and file_path.is_dir(): + return False + + parent = file_path.parent + if parent and not parent.exists(): + parent.mkdir(parents=True, exist_ok=True) + + temp_path = file_path.parent / f"{file_path.name}.{uuid.uuid4()}.tmp" + success = False + try: + 
with open(temp_path, 'w', encoding='utf-8') as f: + f.write(content) + success = self.rename(str(temp_path), path) + finally: + if not success and temp_path.exists(): + self.delete_quietly(str(temp_path)) + return success + + def copy_file(self, source_path: str, target_path: str, overwrite: bool = False): + if not overwrite and self.exists(target_path): + raise FileExistsError(f"Target file {target_path} already exists and overwrite=False") + + source_file = self._to_file(source_path) + target_file = self._to_file(target_path) + + target_parent = target_file.parent + if target_parent and not target_parent.exists(): + target_parent.mkdir(parents=True, exist_ok=True) + + shutil.copy2(source_file, target_file) + + def to_filesystem_path(self, path: str) -> str: + file_path = self._to_file(path) + result = str(file_path) + parsed = urlparse(path) + original_path = parsed.path if parsed.scheme else path + if original_path.startswith('./') and not result.startswith('./'): + result = './' + result + return result + + @staticmethod + def parse_location(location: str): + uri = urlparse(location) + if not uri.scheme: + return "file", uri.netloc, os.path.abspath(location) + elif uri.scheme == "file": + return "file", uri.netloc, uri.path + else: + raise ValueError(f"LocalFileIO only supports file:// scheme, got {uri.scheme}") + + def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'zstd', + zstd_level: int = 1, **kwargs): + try: + import pyarrow.parquet as pq + + file_path = self._to_file(path) + parent = file_path.parent + if parent and not parent.exists(): + parent.mkdir(parents=True, exist_ok=True) + + with open(file_path, 'wb') as f: + if compression.lower() == 'zstd': + kwargs['compression_level'] = zstd_level + pq.write_table(data, f, compression=compression, **kwargs) + except Exception as e: + self.delete_quietly(path) + raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e + + def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', + zstd_level: int = 1, **kwargs): + try: + import sys + import pyarrow.orc as orc + + file_path = self._to_file(path) + parent = file_path.parent + if parent and not parent.exists(): + parent.mkdir(parents=True, exist_ok=True) + + with open(file_path, 'wb') as f: + if sys.version_info[:2] == (3, 6): + orc.write_table(data, f, **kwargs) + else: + orc.write_table(data, f, compression=compression, **kwargs) + except Exception as e: + self.delete_quietly(path) + raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e + + def write_avro(self, path: str, data: pyarrow.Table, + avro_schema: Optional[Dict[str, Any]] = None, + compression: str = 'zstd', zstd_level: int = 1, **kwargs): + import fastavro + if avro_schema is None: + avro_schema = PyarrowFieldParser.to_avro_schema(data.schema) + + records_dict = data.to_pydict() + + def record_generator(): + num_rows = len(list(records_dict.values())[0]) + for i in range(num_rows): + yield {col: records_dict[col][i] for col in records_dict.keys()} + + records = record_generator() + + codec_map = { + 'null': 'null', + 'deflate': 'deflate', + 'snappy': 'snappy', + 'bzip2': 'bzip2', + 'xz': 'xz', + 'zstandard': 'zstandard', + 'zstd': 'zstandard', + } + compression_lower = compression.lower() + + codec = codec_map.get(compression_lower) + if codec is None: + raise ValueError( + f"Unsupported compression '{compression}' for Avro format. " + f"Supported compressions: {', '.join(sorted(codec_map.keys()))}." 
+ ) + + file_path = self._to_file(path) + parent = file_path.parent + if parent and not parent.exists(): + parent.mkdir(parents=True, exist_ok=True) + + with open(file_path, 'wb') as output_stream: + if codec == 'zstandard': + kwargs['codec_compression_level'] = zstd_level + fastavro.writer(output_stream, avro_schema, records, codec=codec, **kwargs) + + def write_lance(self, path: str, data: pyarrow.Table, **kwargs): + try: + import lance + from pypaimon.read.reader.lance_utils import to_lance_specified + file_path_for_lance, storage_options = to_lance_specified(self, path) + + writer = lance.file.LanceFileWriter( + file_path_for_lance, data.schema, storage_options=storage_options, **kwargs) + try: + for batch in data.to_batches(): + writer.write_batch(batch) + finally: + writer.close() + except Exception as e: + self.delete_quietly(path) + raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e + + def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs): + try: + if data.num_columns != 1: + raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns") + + column = data.column(0) + if column.null_count > 0: + raise RuntimeError("Blob format does not support null values") + + field = data.schema[0] + if pyarrow.types.is_large_binary(field.type): + fields = [DataField(0, field.name, AtomicType("BLOB"))] + else: + paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable) + fields = [DataField(0, field.name, paimon_type)] + + records_dict = data.to_pydict() + num_rows = data.num_rows + field_name = fields[0].name + + file_path = self._to_file(path) + parent = file_path.parent + if parent and not parent.exists(): + parent.mkdir(parents=True, exist_ok=True) + + with open(file_path, 'wb') as output_stream: + writer = BlobFormatWriter(output_stream) + for i in range(num_rows): + col_data = records_dict[field_name][i] + if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB": + if blob_as_descriptor: + blob_descriptor = BlobDescriptor.deserialize(col_data) + uri_reader = self.uri_reader_factory.create(blob_descriptor.uri) + blob_data = Blob.from_descriptor(uri_reader, blob_descriptor) + elif isinstance(col_data, bytes): + blob_data = BlobData(col_data) + else: + if hasattr(col_data, 'as_py'): + col_data = col_data.as_py() + if isinstance(col_data, str): + col_data = col_data.encode('utf-8') + blob_data = BlobData(col_data) + row_values = [blob_data] + else: + row_values = [col_data] + row = GenericRow(row_values, fields, RowKind.INSERT) + writer.add_element(row) + writer.close() + except Exception as e: + self.delete_quietly(path) + raise RuntimeError(f"Failed to write blob file {path}: {e}") from e diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py new file mode 100644 index 000000000000..a2b83ed2cc2f --- /dev/null +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -0,0 +1,505 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import logging +import os +import subprocess +import uuid +from pathlib import Path +from typing import Any, Dict, List, Optional +from urllib.parse import splitport, urlparse + +import pyarrow +from packaging.version import parse +from pyarrow._fs import FileSystem + +from pypaimon.common.file_io import FileIO +from pypaimon.common.options import Options +from pypaimon.common.options.config import OssOptions, S3Options +from pypaimon.common.uri_reader import UriReaderFactory +from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser +from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.row.row_kind import RowKind +from pypaimon.write.blob_format_writer import BlobFormatWriter + + +class PyArrowFileIO(FileIO): + def __init__(self, path: str, catalog_options: Options): + self.properties = catalog_options + self.logger = logging.getLogger(__name__) + scheme, netloc, _ = self.parse_location(path) + self.uri_reader_factory = UriReaderFactory(catalog_options) + if scheme in {"oss"}: + self.filesystem = self._initialize_oss_fs(path) + elif scheme in {"s3", "s3a", "s3n"}: + self.filesystem = self._initialize_s3_fs() + elif scheme in {"hdfs", "viewfs"}: + self.filesystem = self._initialize_hdfs_fs(scheme, netloc) + else: + raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") + + @staticmethod + def parse_location(location: str): + uri = urlparse(location) + if not uri.scheme: + return "file", uri.netloc, os.path.abspath(location) + elif uri.scheme in ("hdfs", "viewfs"): + return uri.scheme, uri.netloc, uri.path + else: + return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" + + @staticmethod + def _create_s3_retry_config( + max_attempts: int = 10, + request_timeout: int = 60, + connect_timeout: int = 60 + ) -> Dict[str, Any]: + if parse(pyarrow.__version__) >= parse("8.0.0"): + config = { + 'request_timeout': request_timeout, + 'connect_timeout': connect_timeout + } + try: + from pyarrow.fs import AwsStandardS3RetryStrategy + retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts) + config['retry_strategy'] = retry_strategy + except ImportError: + pass + return config + else: + return {} + + def _extract_oss_bucket(self, location) -> str: + uri = urlparse(location) + if uri.scheme and uri.scheme != "oss": + raise ValueError("Not an OSS URI: {}".format(location)) + + netloc = uri.netloc or "" + if (getattr(uri, "username", None) or getattr(uri, "password", None)) or ("@" in netloc): + first_segment = uri.path.lstrip("/").split("/", 1)[0] + if not first_segment: + raise ValueError("Invalid OSS URI without bucket: {}".format(location)) + return first_segment + + host = getattr(uri, "hostname", None) or netloc + if not host: + raise ValueError("Invalid OSS URI without host: {}".format(location)) + bucket = host.split(".", 1)[0] + if not bucket: + raise ValueError("Invalid OSS URI without bucket: {}".format(location)) + return bucket + + def _initialize_oss_fs(self, path) 
-> FileSystem: + from pyarrow.fs import S3FileSystem + + client_kwargs = { + "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID), + "secret_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET), + "session_token": self.properties.get(OssOptions.OSS_SECURITY_TOKEN), + "region": self.properties.get(OssOptions.OSS_REGION), + } + + if parse(pyarrow.__version__) >= parse("7.0.0"): + client_kwargs['force_virtual_addressing'] = True + client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT) + else: + oss_bucket = self._extract_oss_bucket(path) + client_kwargs['endpoint_override'] = (oss_bucket + "." + + self.properties.get(OssOptions.OSS_ENDPOINT)) + + retry_config = self._create_s3_retry_config() + client_kwargs.update(retry_config) + + return S3FileSystem(**client_kwargs) + + def _initialize_s3_fs(self) -> FileSystem: + from pyarrow.fs import S3FileSystem + + client_kwargs = { + "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT), + "access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID), + "secret_key": self.properties.get(S3Options.S3_ACCESS_KEY_SECRET), + "session_token": self.properties.get(S3Options.S3_SECURITY_TOKEN), + "region": self.properties.get(S3Options.S3_REGION), + "force_virtual_addressing": True, + } + + retry_config = self._create_s3_retry_config() + client_kwargs.update(retry_config) + + return S3FileSystem(**client_kwargs) + + def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: + from pyarrow.fs import HadoopFileSystem + + if 'HADOOP_HOME' not in os.environ: + raise RuntimeError("HADOOP_HOME environment variable is not set.") + if 'HADOOP_CONF_DIR' not in os.environ: + raise RuntimeError("HADOOP_CONF_DIR environment variable is not set.") + + hadoop_home = os.environ.get("HADOOP_HOME") + native_lib_path = f"{hadoop_home}/lib/native" + os.environ['LD_LIBRARY_PATH'] = f"{native_lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}" + + class_paths = subprocess.run( + [f'{hadoop_home}/bin/hadoop', 'classpath', '--glob'], + capture_output=True, + text=True, + check=True + ) + os.environ['CLASSPATH'] = class_paths.stdout.strip() + + host, port_str = splitport(netloc) + return HadoopFileSystem( + host=host, + port=int(port_str), + user=os.environ.get('HADOOP_USER_NAME', 'hadoop') + ) + + def new_input_stream(self, path: str): + path_str = self.to_filesystem_path(path) + return self.filesystem.open_input_file(path_str) + + def new_output_stream(self, path: str): + path_str = self.to_filesystem_path(path) + parent_dir = Path(path_str).parent + if str(parent_dir) and not self.exists(str(parent_dir)): + self.mkdirs(str(parent_dir)) + + return self.filesystem.open_output_stream(path_str) + + def get_file_status(self, path: str): + path_str = self.to_filesystem_path(path) + file_infos = self.filesystem.get_file_info([path_str]) + file_info = file_infos[0] + + if file_info.type == pyarrow.fs.FileType.NotFound: + raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") + + return file_info + + def list_status(self, path: str): + path_str = self.to_filesystem_path(path) + selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True) + return self.filesystem.get_file_info(selector) + + def list_directories(self, path: str): + file_infos = self.list_status(path) + return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory] + + def exists(self, path: str) -> bool: + path_str = self.to_filesystem_path(path) + file_info = 
self.filesystem.get_file_info([path_str])[0] + return file_info.type != pyarrow.fs.FileType.NotFound + + def delete(self, path: str, recursive: bool = False) -> bool: + path_str = self.to_filesystem_path(path) + file_info = self.filesystem.get_file_info([path_str])[0] + + if file_info.type == pyarrow.fs.FileType.NotFound: + return False + + if file_info.type == pyarrow.fs.FileType.Directory: + if not recursive: + selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True) + dir_contents = self.filesystem.get_file_info(selector) + if len(dir_contents) > 0: + raise OSError(f"Directory {path} is not empty") + if recursive: + self.filesystem.delete_dir_contents(path_str) + self.filesystem.delete_dir(path_str) + else: + self.filesystem.delete_dir(path_str) + else: + self.filesystem.delete_file(path_str) + return True + + def mkdirs(self, path: str) -> bool: + path_str = self.to_filesystem_path(path) + file_info = self.filesystem.get_file_info([path_str])[0] + + if file_info.type == pyarrow.fs.FileType.Directory: + return True + elif file_info.type == pyarrow.fs.FileType.File: + raise FileExistsError(f"Path exists but is not a directory: {path}") + + self.filesystem.create_dir(path_str, recursive=True) + return True + + def rename(self, src: str, dst: str) -> bool: + dst_str = self.to_filesystem_path(dst) + dst_parent = Path(dst_str).parent + if str(dst_parent) and not self.exists(str(dst_parent)): + self.mkdirs(str(dst_parent)) + + src_str = self.to_filesystem_path(src) + + try: + if hasattr(self.filesystem, 'rename'): + return self.filesystem.rename(src_str, dst_str) + + dst_file_info = self.filesystem.get_file_info([dst_str])[0] + if dst_file_info.type != pyarrow.fs.FileType.NotFound: + if dst_file_info.type == pyarrow.fs.FileType.File: + return False + # Make it compatible with HadoopFileIO: if dst is an existing directory, + # dst=dst/srcFileName + src_name = Path(src_str).name + dst_str = str(Path(dst_str) / src_name) + final_dst_info = self.filesystem.get_file_info([dst_str])[0] + if final_dst_info.type != pyarrow.fs.FileType.NotFound: + return False + + self.filesystem.move(src_str, dst_str) + return True + except FileNotFoundError: + return False + except (PermissionError, OSError): + return False + + def delete_quietly(self, path: str): + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Ready to delete {path}") + + try: + if not self.delete(path, False) and self.exists(path): + self.logger.warning(f"Failed to delete file {path}") + except Exception: + self.logger.warning(f"Exception occurs when deleting file {path}", exc_info=True) + + def delete_files_quietly(self, files: List[str]): + for file_path in files: + self.delete_quietly(file_path) + + def delete_directory_quietly(self, directory: str): + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f"Ready to delete {directory}") + + try: + if not self.delete(directory, True) and self.exists(directory): + self.logger.warning(f"Failed to delete directory {directory}") + except Exception: + self.logger.warning(f"Exception occurs when deleting directory {directory}", exc_info=True) + + def try_to_write_atomic(self, path: str, content: str) -> bool: + if self.exists(path): + path_str = self.to_filesystem_path(path) + file_info = self.filesystem.get_file_info([path_str])[0] + if file_info.type == pyarrow.fs.FileType.Directory: + return False + + temp_path = path + str(uuid.uuid4()) + ".tmp" + success = False + try: + self.write_file(temp_path, content, False) + success = 
self.rename(temp_path, path) + finally: + if not success: + self.delete_quietly(temp_path) + return success + + def copy_file(self, source_path: str, target_path: str, overwrite: bool = False): + if not overwrite and self.exists(target_path): + raise FileExistsError(f"Target file {target_path} already exists and overwrite=False") + + source_str = self.to_filesystem_path(source_path) + target_str = self.to_filesystem_path(target_path) + target_parent = Path(target_str).parent + + if str(target_parent) and not self.exists(str(target_parent)): + self.mkdirs(str(target_parent)) + + self.filesystem.copy_file(source_str, target_str) + + def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'zstd', + zstd_level: int = 1, **kwargs): + try: + import pyarrow.parquet as pq + + with self.new_output_stream(path) as output_stream: + if compression.lower() == 'zstd': + kwargs['compression_level'] = zstd_level + pq.write_table(data, output_stream, compression=compression, **kwargs) + + except Exception as e: + self.delete_quietly(path) + raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e + + def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', + zstd_level: int = 1, **kwargs): + try: + """Write ORC file using PyArrow ORC writer. + + Note: PyArrow's ORC writer doesn't support compression_level parameter. + ORC files will use zstd compression with default level + (which is 3, see https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c) + instead of the specified level. + """ + import sys + import pyarrow.orc as orc + + with self.new_output_stream(path) as output_stream: + # Check Python version - if 3.6, don't use compression parameter + if sys.version_info[:2] == (3, 6): + orc.write_table(data, output_stream, **kwargs) + else: + orc.write_table( + data, + output_stream, + compression=compression, + **kwargs + ) + + except Exception as e: + self.delete_quietly(path) + raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e + + def write_avro( + self, path: str, data: pyarrow.Table, + avro_schema: Optional[Dict[str, Any]] = None, + compression: str = 'zstd', zstd_level: int = 1, **kwargs): + import fastavro + if avro_schema is None: + from pypaimon.schema.data_types import PyarrowFieldParser + avro_schema = PyarrowFieldParser.to_avro_schema(data.schema) + + records_dict = data.to_pydict() + + def record_generator(): + num_rows = len(list(records_dict.values())[0]) + for i in range(num_rows): + yield {col: records_dict[col][i] for col in records_dict.keys()} + + records = record_generator() + + codec_map = { + 'null': 'null', + 'deflate': 'deflate', + 'snappy': 'snappy', + 'bzip2': 'bzip2', + 'xz': 'xz', + 'zstandard': 'zstandard', + 'zstd': 'zstandard', # zstd is commonly used in Paimon + } + compression_lower = compression.lower() + + codec = codec_map.get(compression_lower) + if codec is None: + raise ValueError( + f"Unsupported compression '{compression}' for Avro format. " + f"Supported compressions: {', '.join(sorted(codec_map.keys()))}." 
+            )
+
+        with self.new_output_stream(path) as output_stream:
+            if codec == 'zstandard':
+                kwargs['codec_compression_level'] = zstd_level
+            fastavro.writer(output_stream, avro_schema, records, codec=codec, **kwargs)
+
+    def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
+        try:
+            import lance
+            from pypaimon.read.reader.lance_utils import to_lance_specified
+            file_path_for_lance, storage_options = to_lance_specified(self, path)
+
+            writer = lance.file.LanceFileWriter(
+                file_path_for_lance, data.schema, storage_options=storage_options, **kwargs)
+            try:
+                # Write all batches
+                for batch in data.to_batches():
+                    writer.write_batch(batch)
+            finally:
+                writer.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e
+
+    def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
+        try:
+            if data.num_columns != 1:
+                raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns")
+            column = data.column(0)
+            if column.null_count > 0:
+                raise RuntimeError("Blob format does not support null values")
+            field = data.schema[0]
+            if pyarrow.types.is_large_binary(field.type):
+                fields = [DataField(0, field.name, AtomicType("BLOB"))]
+            else:
+                paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable)
+                fields = [DataField(0, field.name, paimon_type)]
+            records_dict = data.to_pydict()
+            num_rows = data.num_rows
+            field_name = fields[0].name
+            with self.new_output_stream(path) as output_stream:
+                writer = BlobFormatWriter(output_stream)
+                for i in range(num_rows):
+                    col_data = records_dict[field_name][i]
+                    if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
+                        if blob_as_descriptor:  # value is a serialized BlobDescriptor
+                            blob_descriptor = BlobDescriptor.deserialize(col_data)
+                            uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
+                            blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
+                        elif isinstance(col_data, bytes):
+                            blob_data = BlobData(col_data)
+                        else:
+                            if hasattr(col_data, 'as_py'):
+                                col_data = col_data.as_py()
+                            if isinstance(col_data, str):
+                                col_data = col_data.encode('utf-8')
+                            blob_data = BlobData(col_data)
+                        row_values = [blob_data]
+                    else:
+                        row_values = [col_data]
+                    row = GenericRow(row_values, fields, RowKind.INSERT)
+                    writer.add_element(row)
+                writer.close()
+
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
+
+    def to_filesystem_path(self, path: str) -> str:
+        from pyarrow.fs import S3FileSystem
+        import re
+
+        parsed = urlparse(path)
+        normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else ''
+
+        # Windows drive-letter path such as "C:\path": urlparse reports the drive as a one-char scheme
+        if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc:
+            return str(path)
+
+        # "file://C:/..." style URI: rebuild the Windows drive path
+        if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'):
+            drive_letter = parsed.netloc.rstrip(':')
+            path_part = normalized_path.lstrip('/')
+            return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:"
+
+        # S3/OSS filesystems expect "bucket/key" without the scheme
+        if isinstance(self.filesystem, S3FileSystem):
+            if parsed.scheme:
+                if parsed.netloc:
+                    path_part = normalized_path.lstrip('/')
+                    return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
+                else:
+                    result = normalized_path.lstrip('/')
+                    return result if result else '.'
+            return str(path)
+
+        if parsed.scheme:
+            if not normalized_path:
+                return '.'
+ return normalized_path + + return str(path) diff --git a/paimon-python/pypaimon/read/reader/lance_utils.py b/paimon-python/pypaimon/read/reader/lance_utils.py index 60c7763aa3c2..c219dc67043f 100644 --- a/paimon-python/pypaimon/read/reader/lance_utils.py +++ b/paimon-python/pypaimon/read/reader/lance_utils.py @@ -26,6 +26,9 @@ def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[Dict[str, str]]]: """Convert path and extract storage options for Lance format.""" + if hasattr(file_io, 'file_io'): + file_io = file_io.file_io() + scheme, _, _ = file_io.parse_location(file_path) storage_options = None file_path_for_lance = file_io.to_filesystem_path(file_path) diff --git a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py index 802d1def1023..2770fad136a9 100644 --- a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py +++ b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py @@ -23,8 +23,8 @@ from pypaimon import Schema from pypaimon.table.row.blob import BlobDescriptor, Blob -from pypaimon.common.file_io import FileIO from pypaimon.common.options import Options +from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO def write_table_with_blob(catalog, video_file_path: str, external_oss_options: dict): @@ -66,7 +66,7 @@ def write_table_with_blob(catalog, video_file_path: str, external_oss_options: d # Access external OSS file to get file size try: - external_file_io = FileIO(video_file_path, Options(external_oss_options)) + external_file_io = PyArrowFileIO(video_file_path, Options(external_oss_options)) video_file_size = external_file_io.get_file_size(video_file_path) except Exception as e: raise FileNotFoundError( diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index b92ddb3e8281..a9a2e2fd490b 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -155,7 +155,7 @@ def from_local(file: str) -> 'Blob': file_uri = file else: file_uri = f"file://{file}" - file_io = FileIO(file_uri, {}) + file_io = FileIO.get(file_uri, {}) uri_reader = FileUriReader(file_io) descriptor = BlobDescriptor(file, 0, -1) return Blob.from_descriptor(uri_reader, descriptor) diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index e12d031c7abe..91808bc01413 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -25,6 +25,7 @@ from pypaimon import CatalogFactory from pypaimon.common.file_io import FileIO +from pypaimon.filesystem.local_file_io import LocalFileIO from pypaimon.common.options import Options from pypaimon.read.reader.format_blob_reader import FormatBlobReader from pypaimon.schema.data_types import AtomicType, DataField @@ -108,7 +109,7 @@ def test_from_local(self): def test_from_file_with_offset_and_length(self): """Test Blob.from_file() method with offset and length.""" - file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", Options({})) + file_io = LocalFileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", Options({})) blob = Blob.from_file(file_io, self.file, 0, 4) # Verify it returns a BlobRef instance @@ -204,7 +205,7 @@ def test_blob_factory_methods_return_correct_types(self): self.assertIsInstance(blob_ref, Blob) # from_file should return BlobRef - file_io = FileIO(self.file if 
self.file.startswith('file://') else f"file://{self.file}", Options({})) + file_io = LocalFileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", Options({})) blob_file = Blob.from_file(file_io, self.file, 0, os.path.getsize(self.file)) self.assertIsInstance(blob_file, BlobRef) self.assertIsInstance(blob_file, Blob) @@ -563,7 +564,7 @@ def tearDown(self): def test_blob_end_to_end(self): # Set up file I/O - file_io = FileIO(self.temp_dir, Options({})) + file_io = LocalFileIO(self.temp_dir, Options({})) blob_field_name = "blob_field" # ========== Step 1: Check Type Validation ========== @@ -612,12 +613,11 @@ def test_blob_complex_types_throw_exception(self): """Test that complex types containing BLOB elements throw exceptions during read/write operations.""" from pypaimon.schema.data_types import DataField, AtomicType, ArrayType, MultisetType, MapType from pypaimon.table.row.blob import BlobData - from pypaimon.common.file_io import FileIO from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer from pypaimon.table.row.row_kind import RowKind # Set up file I/O - file_io = FileIO(self.temp_dir, Options({})) + file_io = LocalFileIO(self.temp_dir, Options({})) # ========== Test ArrayType(nullable=True, element_type=AtomicType("BLOB")) ========== array_fields = [ @@ -755,11 +755,10 @@ def test_blob_complex_types_throw_exception(self): def test_blob_advanced_scenarios(self): """Test advanced blob scenarios: corruption, truncation, zero-length, large blobs, compression, cross-format.""" from pypaimon.schema.data_types import DataField, AtomicType - from pypaimon.common.file_io import FileIO from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor # Set up file I/O - file_io = FileIO(self.temp_dir, Options({})) + file_io = LocalFileIO(self.temp_dir, Options({})) # ========== Test 1: Corrupted file header test ========== @@ -999,7 +998,7 @@ def test_blob_advanced_scenarios(self): def test_blob_end_to_end_with_descriptor(self): # Set up file I/O - file_io = FileIO(self.temp_dir, Options({})) + file_io = LocalFileIO(self.temp_dir, Options({})) # ========== Step 1: Write data to local file ========== # Create test data and write it to a local file diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index ec5d6c4e829a..6cdf713b0c4f 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -23,9 +23,11 @@ from unittest.mock import MagicMock, patch import pyarrow -from pyarrow.fs import S3FileSystem, LocalFileSystem +from pyarrow.fs import S3FileSystem -from pypaimon.common.file_io import FileIO +from pypaimon.common.options import Options +from pypaimon.filesystem.local_file_io import LocalFileIO +from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO class FileIOTest(unittest.TestCase): @@ -33,7 +35,7 @@ class FileIOTest(unittest.TestCase): def test_s3_filesystem_path_conversion(self): """Test S3FileSystem path conversion with various formats.""" - file_io = FileIO("s3://bucket/warehouse", {}) + file_io = PyArrowFileIO("s3://bucket/warehouse", Options({})) self.assertIsInstance(file_io.filesystem, S3FileSystem) # Test bucket and path @@ -64,9 +66,8 @@ def test_s3_filesystem_path_conversion(self): self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str) def test_local_filesystem_path_conversion(self): - """Test LocalFileSystem path conversion with various formats.""" - file_io = FileIO("file:///tmp/warehouse", {}) - 
self.assertIsInstance(file_io.filesystem, LocalFileSystem) + file_io = LocalFileIO("file:///tmp/warehouse", Options({})) + self.assertIsInstance(file_io, LocalFileIO) # Test file:// scheme self.assertEqual(file_io.to_filesystem_path("file:///tmp/path/to/file.txt"), @@ -93,11 +94,9 @@ def test_local_filesystem_path_conversion(self): self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str) def test_windows_path_handling(self): - """Test Windows path handling (drive letters, file:// scheme).""" - file_io = FileIO("file:///tmp/warehouse", {}) - self.assertIsInstance(file_io.filesystem, LocalFileSystem) + file_io = LocalFileIO("file:///tmp/warehouse", Options({})) + self.assertIsInstance(file_io, LocalFileIO) - # Windows absolute paths self.assertEqual(file_io.to_filesystem_path("C:\\path\\to\\file.txt"), "C:\\path\\to\\file.txt") self.assertEqual(file_io.to_filesystem_path("C:/path/to/file.txt"), @@ -113,17 +112,17 @@ def test_windows_path_handling(self): "/C:/path/to/file.txt") # Windows path with S3FileSystem (should preserve) - s3_file_io = FileIO("s3://bucket/warehouse", {}) + s3_file_io = PyArrowFileIO("s3://bucket/warehouse", Options({})) self.assertEqual(s3_file_io.to_filesystem_path("C:\\path\\to\\file.txt"), "C:\\path\\to\\file.txt") def test_path_normalization(self): """Test path normalization (multiple slashes).""" - file_io = FileIO("file:///tmp/warehouse", {}) + file_io = LocalFileIO("file:///tmp/warehouse", Options({})) self.assertEqual(file_io.to_filesystem_path("file://///tmp///path///file.txt"), "/tmp/path/file.txt") - s3_file_io = FileIO("s3://bucket/warehouse", {}) + s3_file_io = PyArrowFileIO("s3://bucket/warehouse", Options({})) self.assertEqual(s3_file_io.to_filesystem_path("s3://my-bucket///path///to///file.txt"), "my-bucket/path/to/file.txt") @@ -131,7 +130,7 @@ def test_write_file_with_overwrite_flag(self): temp_dir = tempfile.mkdtemp(prefix="file_io_write_test_") try: warehouse_path = f"file://{temp_dir}" - file_io = FileIO(warehouse_path, {}) + file_io = LocalFileIO(warehouse_path, Options({})) test_file_uri = f"file://{temp_dir}/overwrite_test.txt" expected_path = os.path.join(temp_dir, "overwrite_test.txt") @@ -161,7 +160,7 @@ def test_exists_does_not_catch_exception(self): temp_dir = tempfile.mkdtemp(prefix="file_io_exists_test_") try: warehouse_path = f"file://{temp_dir}" - file_io = FileIO(warehouse_path, {}) + file_io = LocalFileIO(warehouse_path, Options({})) test_file = os.path.join(temp_dir, "test_file.txt") with open(test_file, "w") as f: @@ -169,35 +168,48 @@ def test_exists_does_not_catch_exception(self): self.assertTrue(file_io.exists(f"file://{test_file}")) self.assertFalse(file_io.exists(f"file://{temp_dir}/nonexistent.txt")) - mock_filesystem = MagicMock() - mock_filesystem.get_file_info.side_effect = OSError("Permission denied") - file_io.filesystem = mock_filesystem + mock_path = MagicMock(spec=Path) + mock_path.exists.side_effect = OSError("Permission denied") + with patch.object(file_io, '_to_file', return_value=mock_path): + with self.assertRaises(OSError) as context: + file_io.exists("file:///some/path") + self.assertIn("Permission denied", str(context.exception)) - with self.assertRaises(OSError) as context: - file_io.exists("file:///some/path") - self.assertIn("Permission denied", str(context.exception)) - - with self.assertRaises(OSError): - file_io.new_output_stream("file:///some/path/file.txt") - - with self.assertRaises(OSError): - file_io.check_or_mkdirs("file:///some/path") - - with self.assertRaises(OSError): - 
file_io.write_file("file:///some/path", "content", overwrite=False) - - with self.assertRaises(OSError): - file_io.copy_file("file:///src", "file:///dst", overwrite=False) + with patch('builtins.open', side_effect=OSError("Permission denied")): + with self.assertRaises(OSError): + file_io.new_output_stream("file:///some/path/file.txt") - with patch.object(file_io, 'read_file_utf8', side_effect=Exception("Read error")): + mock_path = MagicMock(spec=Path) + mock_path.is_dir.side_effect = OSError("Permission denied") + with patch.object(file_io, '_to_file', return_value=mock_path): with self.assertRaises(OSError): - file_io.read_overwritten_file_utf8("file:///some/path") + file_io.check_or_mkdirs("file:///some/path") - mock_filesystem.get_file_info.side_effect = OSError("Network error") - file_io.filesystem = mock_filesystem + with patch('builtins.open', side_effect=OSError("Permission denied")): + with self.assertRaises(OSError): + file_io.write_file("file:///some/path", "content", overwrite=False) - with self.assertRaises(OSError): - file_io.rename("file:///src", "file:///dst") + with patch('builtins.open', side_effect=OSError("Permission denied")): + with self.assertRaises(OSError): + file_io.copy_file("file:///src", "file:///dst", overwrite=False) + + with patch.object(file_io, 'exists', return_value=True): + with patch.object(file_io, 'read_file_utf8', side_effect=OSError("Read error")): + with self.assertRaises(OSError) as context: + file_io.read_overwritten_file_utf8("file:///some/path") + self.assertIn("Read error", str(context.exception)) + + # rename() catches OSError and returns False (consistent with Java implementation) + mock_src_path = MagicMock(spec=Path) + mock_dst_path = MagicMock(spec=Path) + mock_dst_path.parent = MagicMock() + mock_dst_path.parent.exists.return_value = True + mock_dst_path.exists.return_value = False + mock_src_path.rename.side_effect = OSError("Network error") + with patch.object(file_io, '_to_file', side_effect=[mock_src_path, mock_dst_path]): + # rename() catches OSError and returns False, doesn't raise + result = file_io.rename("file:///src", "file:///dst") + self.assertFalse(result, "rename() should return False when OSError occurs") file_io.delete_quietly("file:///some/path") file_io.delete_directory_quietly("file:///some/path") @@ -208,7 +220,7 @@ def test_delete_non_empty_directory_raises_error(self): temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_") try: warehouse_path = f"file://{temp_dir}" - file_io = FileIO(warehouse_path, {}) + file_io = LocalFileIO(warehouse_path, Options({})) test_dir = os.path.join(temp_dir, "test_dir") os.makedirs(test_dir) @@ -226,7 +238,7 @@ def test_delete_returns_false_when_file_not_exists(self): temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_") try: warehouse_path = f"file://{temp_dir}" - file_io = FileIO(warehouse_path, {}) + file_io = LocalFileIO(warehouse_path, Options({})) result = file_io.delete(f"file://{temp_dir}/nonexistent.txt") self.assertFalse(result, "delete() should return False when file does not exist") @@ -240,7 +252,7 @@ def test_mkdirs_raises_error_when_path_is_file(self): temp_dir = tempfile.mkdtemp(prefix="file_io_mkdirs_test_") try: warehouse_path = f"file://{temp_dir}" - file_io = FileIO(warehouse_path, {}) + file_io = LocalFileIO(warehouse_path, Options({})) test_file = os.path.join(temp_dir, "test_file.txt") with open(test_file, "w") as f: @@ -256,7 +268,7 @@ def test_rename_returns_false_when_dst_exists(self): temp_dir = tempfile.mkdtemp(prefix="file_io_rename_test_") try: 
warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             src_file = os.path.join(temp_dir, "src.txt")
             dst_file = os.path.join(temp_dir, "dst.txt")
@@ -274,7 +286,7 @@ def test_get_file_status_raises_error_when_file_not_exists(self):
         temp_dir = tempfile.mkdtemp(prefix="file_io_get_file_status_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             with self.assertRaises(FileNotFoundError) as context:
                 file_io.get_file_status(f"file://{temp_dir}/nonexistent.txt")
@@ -302,7 +314,7 @@ def test_copy_file(self):
         temp_dir = tempfile.mkdtemp(prefix="file_io_copy_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             source_file = os.path.join(temp_dir, "source.txt")
             target_file = os.path.join(temp_dir, "target.txt")
@@ -338,22 +350,29 @@ def test_try_to_write_atomic(self):
         temp_dir = tempfile.mkdtemp(prefix="file_io_try_write_atomic_test_")
         try:
-            warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            normal_file = os.path.join(temp_dir, "normal_file.txt")
+
+            local_file_io = LocalFileIO(f"file://{temp_dir}", Options({}))
             target_dir = os.path.join(temp_dir, "target_dir")
             os.makedirs(target_dir)
+            self.assertFalse(
+                local_file_io.try_to_write_atomic(f"file://{target_dir}", "test content"),
+                "LocalFileIO should return False when target is a directory")
+            self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory")
 
-            result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content")
-            self.assertFalse(result, "try_to_write_atomic should return False when target is a directory")
+            self.assertTrue(local_file_io.try_to_write_atomic(f"file://{normal_file}", "test content"))
+            with open(normal_file, "r") as f:
+                self.assertEqual(f.read(), "test content")
 
-            self.assertTrue(os.path.isdir(target_dir))
+            os.remove(normal_file)
+            local_file_io = LocalFileIO(f"file://{temp_dir}", Options({}))
+            self.assertFalse(
+                local_file_io.try_to_write_atomic(f"file://{target_dir}", "test content"),
+                "LocalFileIO should return False when target is a directory")
             self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory")
 
-            normal_file = os.path.join(temp_dir, "normal_file.txt")
-            result = file_io.try_to_write_atomic(f"file://{normal_file}", "test content")
-            self.assertTrue(result, "try_to_write_atomic should succeed for a normal file path")
-            self.assertTrue(os.path.exists(normal_file))
+            self.assertTrue(local_file_io.try_to_write_atomic(f"file://{normal_file}", "test content"))
             with open(normal_file, "r") as f:
                 self.assertEqual(f.read(), "test content")
         finally:
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index 6bdba47b81a6..fe2c19f18ac9 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -1,5 +1,4 @@
 # Licensed to the Apache Software Foundation (ASF) under one
-# Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
 # distributed with this work for additional information
 # regarding copyright ownership.
The ASF licenses this file @@ -181,11 +180,21 @@ def test_get_database_propagates_exists_error(self): with self.assertRaises(DatabaseNotExistException): catalog.get_database("nonexistent_db") - mock_filesystem = MagicMock() - mock_filesystem.get_file_info.side_effect = OSError("Permission denied") - catalog.file_io.filesystem = mock_filesystem + catalog.create_database("test_db", False) + + # FileSystemCatalog has file_io attribute + from pypaimon.catalog.filesystem_catalog import FileSystemCatalog + self.assertIsInstance(catalog, FileSystemCatalog) + filesystem_catalog = catalog # type: FileSystemCatalog + + original_exists = filesystem_catalog.file_io.exists + filesystem_catalog.file_io.exists = MagicMock(side_effect=OSError("Permission denied")) + # Now get_database should propagate OSError, not DatabaseNotExistException with self.assertRaises(OSError) as context: catalog.get_database("test_db") self.assertIn("Permission denied", str(context.exception)) self.assertNotIsInstance(context.exception, DatabaseNotExistException) + + # Restore original method + filesystem_catalog.file_io.exists = original_exists diff --git a/paimon-python/pypaimon/tests/lance_utils_test.py b/paimon-python/pypaimon/tests/lance_utils_test.py index 2676414d8bb1..71b7b2d5719a 100644 --- a/paimon-python/pypaimon/tests/lance_utils_test.py +++ b/paimon-python/pypaimon/tests/lance_utils_test.py @@ -20,7 +20,7 @@ from pypaimon.common.options import Options from pypaimon.common.options.config import OssOptions -from pypaimon.common.file_io import FileIO +from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO from pypaimon.read.reader.lance_utils import to_lance_specified @@ -35,7 +35,7 @@ def test_oss_url_bucket_extraction_correctness(self): OssOptions.OSS_ACCESS_KEY_SECRET.key(): "test-secret", }) - file_io = FileIO(file_path, properties) + file_io = PyArrowFileIO(file_path, properties) file_path_for_lance, storage_options = to_lance_specified(file_io, file_path) self.assertEqual( @@ -61,7 +61,7 @@ def test_oss_url_with_security_token(self): OssOptions.OSS_SECURITY_TOKEN.key(): "test-token", }) - file_io = FileIO(file_path, properties) + file_io = PyArrowFileIO(file_path, properties) file_path_for_lance, storage_options = to_lance_specified(file_io, file_path) self.assertEqual(file_path_for_lance, "oss://my-bucket/path/to/file.lance") diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py index 183a3a72a688..f6bcfa976711 100644 --- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py @@ -26,7 +26,7 @@ TableNotExistException) from pypaimon.common.options import Options from pypaimon.common.options.config import OssOptions -from pypaimon.common.file_io import FileIO +from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO from pypaimon.tests.py36.pyarrow_compat import table_sort_by from pypaimon.tests.rest.rest_base_test import RESTBaseTest @@ -404,19 +404,19 @@ def test_initialize_oss_fs_pyarrow_lt_7(self): with patch("pypaimon.common.file_io.pyarrow.__version__", "6.0.0"), \ patch("pyarrow.fs.S3FileSystem") as mock_s3fs: - FileIO("oss://oss-bucket/paimon-database/paimon-table", Options(props)) + PyArrowFileIO("oss://oss-bucket/paimon-database/paimon-table", Options(props)) mock_s3fs.assert_called_once_with(access_key="AKID", secret_key="SECRET", session_token="TOKEN", region="cn-hangzhou", endpoint_override="oss-bucket." 
+ props[OssOptions.OSS_ENDPOINT.key()]) - FileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", Options(props)) + PyArrowFileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", Options(props)) mock_s3fs.assert_called_with(access_key="AKID", secret_key="SECRET", session_token="TOKEN", region="cn-hangzhou", endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT.key()]) - FileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table", Options(props)) + PyArrowFileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table", Options(props)) mock_s3fs.assert_called_with(access_key="AKID", secret_key="SECRET", session_token="TOKEN", diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py index e0bedeac1bb9..d556f7f55c99 100755 --- a/paimon-python/pypaimon/tests/rest/rest_server.py +++ b/paimon-python/pypaimon/tests/rest/rest_server.py @@ -800,7 +800,7 @@ def _get_file_io(self): from pypaimon.common.options import Options warehouse_path = str(Path(self.data_path) / self.warehouse) options = Options({"warehouse": warehouse_path}) - return FileIO(warehouse_path, options) + return FileIO.get(warehouse_path, options) def _create_table_metadata(self, identifier: Identifier, schema_id: int, schema: Schema, uuid_str: str, is_external: bool) -> TableMetadata: diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py index 07e445b12a4f..47ea8e6cb626 100644 --- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py @@ -18,16 +18,15 @@ import os import pickle import tempfile -import time import unittest from unittest.mock import patch from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO -from pypaimon.common.file_io import FileIO from pypaimon.common.identifier import Identifier from pypaimon.common.options import Options from pypaimon.common.options.config import CatalogOptions, OssOptions +from pypaimon.filesystem.local_file_io import LocalFileIO class RESTTokenFileIOTest(unittest.TestCase): @@ -92,6 +91,30 @@ def test_new_output_stream_path_conversion_and_parent_creation(self): finally: os.chdir(original_cwd) + def test_try_to_write_atomic_directory_check(self): + with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): + file_io = RESTTokenFileIO( + self.identifier, + self.warehouse_path, + self.catalog_options + ) + + target_dir = os.path.join(self.temp_dir, "target_dir") + os.makedirs(target_dir) + + result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content") + self.assertFalse(result, "try_to_write_atomic should return False when target is a directory") + + self.assertTrue(os.path.isdir(target_dir)) + self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory") + + normal_file = os.path.join(self.temp_dir, "normal_file.txt") + result = file_io.try_to_write_atomic(f"file://{normal_file}", "test content") + self.assertTrue(result, "try_to_write_atomic should succeed for a normal file path") + self.assertTrue(os.path.exists(normal_file)) + with open(normal_file, "r") as f: + self.assertEqual(f.read(), "test content") + def test_new_output_stream_behavior_matches_parent(self): """Test that RESTTokenFileIO.new_output_stream behaves like FileIO.new_output_stream.""" with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): @@ -100,7 +123,7 @@ def 
test_new_output_stream_behavior_matches_parent(self): self.warehouse_path, self.catalog_options ) - regular_file_io = FileIO(self.warehouse_path, self.catalog_options) + regular_file_io = LocalFileIO(self.warehouse_path, self.catalog_options) test_file_path = f"file://{self.temp_dir}/comparison/test.txt" test_content = b"comparison content" @@ -195,9 +218,7 @@ def test_dlf_oss_endpoint_overrides_token_endpoint(self): def test_catalog_options_not_modified(self): from pypaimon.api.rest_util import RESTUtil - from pypaimon.catalog.rest.rest_token import RESTToken - from pyarrow.fs import LocalFileSystem - + original_catalog_options = Options({ CatalogOptions.URI.key(): "http://test-uri", "custom.key": "custom.value" @@ -217,10 +238,8 @@ def test_catalog_options_not_modified(self): OssOptions.OSS_ACCESS_KEY_SECRET.key(): "token-secret-key", OssOptions.OSS_ENDPOINT.key(): "token-endpoint" } - file_io.token = RESTToken(token_dict, int(time.time() * 1000) + 3600000) - with patch.object(FileIO, '_initialize_oss_fs', return_value=LocalFileSystem()): - file_io._initialize_oss_fs("file:///test/path") + merged_token = file_io._merge_token_with_catalog_options(token_dict) self.assertEqual( original_catalog_options.to_map(), @@ -230,7 +249,7 @@ def test_catalog_options_not_modified(self): merged_properties = RESTUtil.merge( original_catalog_options.to_map(), - file_io._merge_token_with_catalog_options(token_dict) + merged_token ) self.assertIn("custom.key", merged_properties) @@ -238,6 +257,53 @@ def test_catalog_options_not_modified(self): self.assertIn(OssOptions.OSS_ACCESS_KEY_ID.key(), merged_properties) self.assertEqual(merged_properties[OssOptions.OSS_ACCESS_KEY_ID.key()], "token-access-key") + def test_filesystem_property(self): + with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): + file_io = RESTTokenFileIO( + self.identifier, + self.warehouse_path, + self.catalog_options + ) + + self.assertTrue(hasattr(file_io, 'filesystem'), "RESTTokenFileIO should have filesystem property") + filesystem = file_io.filesystem + self.assertIsNotNone(filesystem, "filesystem should not be None") + + self.assertTrue(hasattr(filesystem, 'open_input_file'), + "filesystem should support open_input_file method") + + def test_uri_reader_factory_property(self): + with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): + file_io = RESTTokenFileIO( + self.identifier, + self.warehouse_path, + self.catalog_options + ) + + self.assertTrue(hasattr(file_io, 'uri_reader_factory'), + "RESTTokenFileIO should have uri_reader_factory property") + uri_reader_factory = file_io.uri_reader_factory + self.assertIsNotNone(uri_reader_factory, "uri_reader_factory should not be None") + + self.assertTrue(hasattr(uri_reader_factory, 'create'), + "uri_reader_factory should support create method") + + def test_filesystem_and_uri_reader_factory_after_serialization(self): + with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): + original_file_io = RESTTokenFileIO( + self.identifier, + self.warehouse_path, + self.catalog_options + ) + + pickled = pickle.dumps(original_file_io) + restored_file_io = pickle.loads(pickled) + + self.assertIsNotNone(restored_file_io.filesystem, + "filesystem should work after deserialization") + self.assertIsNotNone(restored_file_io.uri_reader_factory, + "uri_reader_factory should work after deserialization") + if __name__ == '__main__': unittest.main()