[python] Refactor FullStartingScanner by introducing PartialStartingScanner class #7032
base: master
Conversation
```python
if idx_of_this_subtask >= number_of_para_subtasks:
    raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
if self.start_pos_of_this_subtask is not None:
    raise Exception("with_shard and with_slice cannot be used simultaneously")
```
Use a more specific exception type instead of the generic Exception, such as ValueError.
👌
```python
    split.shard_file_idx_map[file.file_name] = (0, plan_end_pos - file_begin_pos)
elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
    split.shard_file_idx_map[file.file_name] = (-1, -1)
return file_end_pos
```
Define a named constant for (-1, -1); that makes its meaning clear to other developers.
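A minimal sketch of what that constant could look like; the name `NO_OVERLAP` and the standalone helper are illustrative assumptions, not the PR's actual code:

```python
# (-1, -1) marks a file whose row range does not overlap this subtask's slice.
NO_OVERLAP = (-1, -1)

def shard_range(file_begin_pos, file_end_pos, plan_start_pos, plan_end_pos):
    """Return a file's (start, end) row index range within the plan slice,
    or NO_OVERLAP if the file lies entirely outside the slice."""
    if file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
        return NO_OVERLAP
    start = max(plan_start_pos - file_begin_pos, 0)
    end = min(plan_end_pos, file_end_pos) - file_begin_pos
    return (start, end)
```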
```python
return filtered_partitioned_files, (plan_start_pos, plan_end_pos)


def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
    """
```
This is similar to the code in _data_evolution_filter_by_shard; can you refactor to share it?
```python
if is_blob and not self._is_blob_file(file.file_name):
if self._partial_read():
    partitioned_files = self._filter_by_pos(partitioned_files)
```
Do we need to sort before calling _filter_by_pos?
```python
self.partial_read = True
self.starting_scanner = self._create_starting_scanner()
self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
return self
```
If INCREMENTAL_BETWEEN_TIMESTAMP is configured, _create_starting_scanner() returns an IncrementalStartingScanner, which doesn't have with_shard(). This will cause an AttributeError.
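One way to address this, sketched with stand-in classes rather than the PR's real scanners: guard the call so an unsupported scanner fails with a clear error instead of an AttributeError.

```python
# Stand-in for the PR's scanner that lacks with_shard().
class IncrementalStartingScanner:
    pass

def apply_shard(scanner, idx_of_this_subtask, number_of_para_subtasks):
    """Apply sharding, rejecting scanners that don't support it up front."""
    if not hasattr(scanner, "with_shard"):
        raise ValueError(
            f"{type(scanner).__name__} does not support with_shard(); "
            "sharding cannot be combined with this scan mode")
    return scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
```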
```python
def _filter_by_pos(self, files):
    if self.table.is_primary_key_table:
        return self._primary_key_filter_by_shard(files)
```
This may raise an exception if with_slice is called on a primary-key table.
Purpose
Tests
API and Format
Documentation