Introduce log writer APIs (log_io.cpp, log_io.hpp)#1144
Conversation
| { | ||
| public: | ||
| async_disk_writer_t(int validate_flushed_batch_efd, int signal_checkpoint_efd); | ||
| async_disk_writer_t(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd); |
There was a problem hiding this comment.
shouldn't the names be event_fd instead of eventfd?
There was a problem hiding this comment.
No, because eventfd is a kernel object type (like memfd), and has its own open(2)-like system call eventfd() (which really should have been eventfd_create()).
| constexpr size_t c_max_chunks_per_txn = 8; | ||
|
|
||
| // 8 chunks can hold up to 8 * (2^16 - 2^8) = 522240 64B objects, | ||
| constexpr size_t c_max_log_records = 522240; |
There was a problem hiding this comment.
Can you express this value in terms of c_max_chunks_per_txn, so that changing that value doesn't require recalculating this one?
There was a problem hiding this comment.
Hmm, this will conflict with my current changes (which limit txn logs to 2^16 records), so I think we can postpone discussion until after those are merged.
There was a problem hiding this comment.
In the meantime, here are some numbers to consider: we can address up to 2^16 txn logs in metadata (16 bits to embed an fd/offset), and we can have at most 2^32 live object versions (we have 2^38 bytes in the data segment and objects are at least 2^6 bytes). I think it makes sense to limit the total number of txn records to the max number of live versions (otherwise some records must be addressing versions that don't exist), so we also can have at most 2^32 txn log records, so to ensure this limit is respected we can have at most 2^16 records/txn log. That's a bit smaller than I'd like, but I think the consistency arguments above are compelling. We can discuss if this limit is actually adequate for customer use cases (ditto our 64KB object size limit).
Given the above, we can address at most 2^32 bytes of new object versions in a txn log: max object size is 2^16 bytes and at most 2^16 new versions can be addressed in a txn log. Chunk size is 2^22 bytes, so that means that a txn log can address at most 2^10 chunks. Given 16-bit chunk IDs, that's 2KB for chunk ID storage, or half a 4KB page.
There was a problem hiding this comment.
Also, I think my current changes to redesign in-memory txn log storage unfortunately need to be merged before these changes, even though these changes are further along, because the scalability issues with mmap() revealed in benchmarking seem to have much more perf impact than any deficiencies of the current persistence implementation.
| uint32_t decision_count; | ||
| }; | ||
|
|
||
| char padding[3]; |
There was a problem hiding this comment.
Can you add a comment to explain what this is used for / why it is necessary?
There was a problem hiding this comment.
Also, it would help if the 3 would be a constant so its name would be more descriptive of why 3 is an appropriate value.
senderista
left a comment
There was a problem hiding this comment.
This is really looking good, just a few things could be improved. Also a heads-up that my pending changes to replace txn log fds will require some rework (but not really that much).
|
|
||
| // Keep track of session threads to unblock. | ||
| std::unordered_map<gaia_txn_id_t, int> m_ts_to_session_decision_fd_map; | ||
| std::unordered_map<gaia_txn_id_t, int> m_ts_to_session_decision_eventfd_map; |
There was a problem hiding this comment.
Is this data structure only accessed by a single thread? (Obviously it's not threadsafe.)
| // Track maximum number of new chunks (apart from the one that the txn is already using) | ||
| // that can be allocated per transaction. | ||
| // This sets an upper bound on txn size: 32MB < txn_size < 36MB | ||
| constexpr size_t c_max_chunks_per_txn = 8; |
There was a problem hiding this comment.
See my comments elsewhere: based on existing limits (including those in my pending changes for txn log storage), I think we need to allow up to 2^10 chunks/txn.
Also, the max txn log size in bytes is now 1MB (2^16 16-byte records).
| @@ -108,6 +113,8 @@ struct txn_log_t | |||
| // convenient place for shared state between the client and server. | |||
| memory_manager::chunk_offset_t current_chunk; | |||
There was a problem hiding this comment.
FYI I'm removing this in my current changes (it never worked anyway for its intended purpose as shared state).
| // convenient place for shared state between the client and server. | ||
| memory_manager::chunk_offset_t current_chunk; | ||
| size_t record_count; | ||
| int session_decision_eventfd; |
There was a problem hiding this comment.
I don't think it makes sense to put a datum that is valid for only one process (i.e. an fd) in shared-memory state. We should discuss alternatives, e.g. registering this eventfd as part of txn submission, or registering some general index into an array of session shared state (which I think we will need anyway for reliable recovery from crashed sessions).
| memory_manager::chunk_offset_t current_chunk; | ||
| size_t record_count; | ||
| int session_decision_eventfd; | ||
| size_t chunk_count; |
There was a problem hiding this comment.
As you know, we can always infer the set of chunks used in a txn by scanning the offsets in the txn log (which we need to do anyway during conflict detection), so we should discuss if this is necessary.
| // The txn_log is sorted on the client for the correct validation impl, thus this map is used to track order of writes. | ||
| // Note that writes beloning to a txn can be assigned in arbitrary chunk order (due to chunk reuse) which is another reason to | ||
| // track chunk ids in the log. | ||
| std::map<chunk_offset_t, std::set<gaia_offset_t>> chunk_to_offsets_map; |
There was a problem hiding this comment.
I see no reason to track the full set of offsets in a chunk at all, when all you care about is the smallest and largest ones. Why not just track min and max offsets per chunk?
| std::map<chunk_offset_t, std::set<gaia_offset_t>> chunk_to_offsets_map; | ||
| for (size_t i = 0; i < log.data()->chunk_count; i++) | ||
| { | ||
| auto chunk = log.data()->chunks + i; |
There was a problem hiding this comment.
I think it would be clearer to use array indexing rather than pointer arithmetic.
| /** | ||
| * Validate the result of I/O calls submitted to the kernel for processing. | ||
| */ | ||
| void validate_flushed_batch(); |
There was a problem hiding this comment.
Not sure what the meaning of "validate" is here when the return type is void rather than bool.
| #else | ||
| size_t allocation_size_in_slots = (allocation_size_in_bytes + c_slot_size_in_bytes - 1) / c_slot_size_in_bytes; | ||
| #endif | ||
| size_t allocation_size_in_slots = calculate_allocation_size_in_slots(allocation_size_in_bytes); |
There was a problem hiding this comment.
I like this refactoring!
|
|
||
| // Create iovec entries. | ||
| size_t payload_size = 0; | ||
| for (size_t i = 0; i < contiguous_address_offsets.size(); i += 2) |
There was a problem hiding this comment.
Note how the logic in this loop would be simplified by just storing start/end offset pairs in contiguous_address_offsets.
| m_async_disk_writer->perform_post_completion_maintenance(); | ||
| } | ||
|
|
||
| void log_handler_t::submit_writes(bool sync) |
There was a problem hiding this comment.
Please use the same parameter name as async_disk_writer_t::submit_and_swap_in_progress_batch(): should_wait_for_completion.
| /** | ||
| * Submit async_disk_writer's internal I/O request queue to the kernel for processing. | ||
| */ | ||
| void submit_writes(bool sync); |
There was a problem hiding this comment.
Please use the same parameter name as async_disk_writer_t::submit_and_swap_in_progress_batch(): should_wait_for_completion.
| @@ -116,10 +116,10 @@ void async_disk_writer_t::perform_post_completion_maintenance() | |||
| transactions::txn_metadata_t::set_txn_durable(decision.commit_ts); | |||
There was a problem hiding this comment.
Setting the TXN_DURABLE flag can only be correct if both the committed data and the decision have been persisted. How do you know at this point that the data is also durable? Isn't the log batch committed asynchronously with the decision batch?
There was a problem hiding this comment.
A bit of a higher-level design digression: generally speaking, if we have dedicated txn metadata flags for a property, we don't need a watermark for it, and vice versa. The key difference is that metadata flags can be updated out-of-order, while a watermark is in-order by construction (it can only represent a "prefix property", i.e. a predicate which is true for every timestamp <= the watermark). If txns were made durable strictly in commit_ts order (or if there were no latency penalty from requiring a full prefix of txns to be durable before advancing the watermark), then I think a durability watermark could replace the TXN_DURABLE flag. This also seems reasonable given the current batch approach, since we can't signal a session thread that its committing txn is durable until fdatasync() has completed for that txn's log and decision batches. If we required batches to observe the prefix property as well (i.e. we couldn't finalize a batch as long as there were any "holes" in it), then I think this would be a natural approach. However, in view of the longer-term design goal to make durability fully asynchronous using O_DIRECT | O_DATASYNC (so we can signal a session thread as soon as its log/decision write has completed, without having to wait for fdatasync() to complete for the whole batch), I think it's probably better to stick with the metadata flags approach, since that would allow committing session threads to be signaled out-of-order by the logging thread (when we move to O_DIRECT), which might be important for txns with large variance in update size (to prevent large txns from blocking commit notification of small txns). Anyway, curious to hear your thoughts on this.
|
I think all of my and others' feedback has been addressed, and I also rewrote some clearly inefficient code. |
simone-gaia
left a comment
There was a problem hiding this comment.
LGTM, but honestly don't understand much of what is going on...
Is this code hooked up anywhere or at least tested?
|
|
||
| m_validate_flush_efd = validate_flush_efd; | ||
| m_signal_checkpoint_efd = signal_checkpoint_efd; | ||
| m_validate_flush_eventfd = validate_flush_efd; |
There was a problem hiding this comment.
nit: should we put an _ before fd? (i.e m_validate_flush_event_fd).
There was a problem hiding this comment.
no, eventfd is the name of a kernel object type and a syscall
| void perform_flushed_batch_maintenance(); | ||
|
|
||
| private: | ||
| static constexpr char c_gaia_wal_dir_name[] = "/wal_dir"; |
There was a problem hiding this comment.
I don't think it is a good idea to prepend / to a directory name.
There was a problem hiding this comment.
agree, will fix (I certainly haven't found all the issues in the original code)
| { | ||
| auto dirpath = wal_dir_path; | ||
| ASSERT_PRECONDITION(!dirpath.empty(), "Gaia persistent directory path shouldn't be empty."); | ||
| s_wal_dir_path = dirpath.append(c_gaia_wal_dir_name); |
There was a problem hiding this comment.
I think the operator is just /.
Neither, but there's no time for that. I'll do lots of testing for the next 2 PRs. |
| // operations on the file. | ||
| log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_seq, size_t size) | ||
| log_file_t::log_file_t(const std::string& dir_name, int dir_fd, file_sequence_t file_seq, size_t file_size) | ||
| : m_file_size(file_size), m_file_seq(file_seq), m_dir_name(dir_name), m_dir_fd(dir_fd) |
There was a problem hiding this comment.
Don't we also need m_current_offset(0)?
There was a problem hiding this comment.
Also, could we perform the initializations in the same order in which the arguments are passed?
There was a problem hiding this comment.
m_current_offset is already initialized to a default value (see its declaration). We get a warning if initialization order doesn't match declaration order.
There was a problem hiding this comment.
Can we change declaration order or does it matter to optimize storage?
|
|
||
| log_handler_t::log_handler_t(const std::string& wal_dir_path) | ||
| { | ||
| auto dirpath = wal_dir_path; |
There was a problem hiding this comment.
dirpath -> dir_path
| log_handler_t::log_handler_t(const std::string& wal_dir_path) | ||
| { | ||
| auto dirpath = wal_dir_path; | ||
| ASSERT_PRECONDITION(!dirpath.empty(), "Gaia persistent directory path shouldn't be empty."); |
There was a problem hiding this comment.
Should "persistent" be "persistence". It's not a directory that is persistent, but a directory used for persistence. Perhaps we should also say something like "data", i.e. "Gaia data persistence directory" instead of "Gaia persistent directory".
There was a problem hiding this comment.
Or "Gaia persistent data directory" - this goes well with "persistent log directory" that is used further down.
|
|
||
| void log_handler_t::open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd) | ||
| { | ||
| ASSERT_PRECONDITION(validate_flushed_batch_eventfd != -1, "Eventfd to signal post flush maintenance operations invalid!"); |
There was a problem hiding this comment.
I'm not sure if I got this right:
Eventfd to signal post flush maintenance operations invalid!
->
Eventfd to signal post-flush maintenance operations are invalid!
| writes_to_submit.push_back({header_ptr, sizeof(record_header_t)}); | ||
| writes_to_submit.push_back({txn_decisions_ptr, txn_decision_size}); | ||
|
|
||
| m_async_disk_writer->enqueue_pwritev_requests(writes_to_submit, m_current_file->get_file_fd(), m_current_file->get_current_offset(), uring_op_t::pwritev_decision); |
There was a problem hiding this comment.
Can you break the arguments of this call?
| std::vector<chunk_data_t> chunk_data; | ||
|
|
||
| // Obtain deleted IDs and min/max offsets per chunk. | ||
| for (size_t i = 0; i < txn_log->record_count; i++) |
LaurentiuCristofor
left a comment
There was a problem hiding this comment.
Left some comments on minor issues, nothing blocking.
Introduce log writer APIs (log_io.cpp, log_io.hpp)