diff --git a/CosmOS-rootfs b/CosmOS-rootfs index 95ab8a8..57eb5cc 160000 --- a/CosmOS-rootfs +++ b/CosmOS-rootfs @@ -1 +1 @@ -Subproject commit 95ab8a86ff87c7b33d499d96078c9b0864796d1e +Subproject commit 57eb5ccaed0aa2b9d5bff83b48d2220238bae213 diff --git a/Makefile b/Makefile index 19070b6..666db59 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ RUN_TEST_FS ?= .make/sdcard-$(ARCH)-run.img TEST_FS_LA ?= sdcard-la.img RUN_TEST_FS_LA ?= .make/sdcard-la-run.img QEMU_NETDEV ?= user,id=net +FAST_RUN_QEMU_NETDEV ?= user,id=net,hostfwd=tcp::7777-:7777 QEMU_TRACE_ARGS ?= QEMU_COMP_BLK_ARGS = -drive file=$(RUN_TEST_FS),if=none,format=raw,id=x0 -device virtio-blk-device,drive=x0,bus=virtio-mmio-bus.0 QEMU_COMP_EXTRA_BLK_ARGS = -drive file=$(RUN_DISK_IMG),if=none,format=raw,id=x1 -device virtio-blk-device,drive=x1,bus=virtio-mmio-bus.1 @@ -28,6 +29,9 @@ USER_BUILD_STAMP_RV := $(USER_BIN_DIR_RV)/.xxos-build.stamp USER_BUILD_STAMP_LA := $(USER_BIN_DIR_LA)/.xxos-build.stamp KERNEL_BUILD_STAMP_RV := $(STAMP_DIR)/kernel-build-rv.stamp KERNEL_BUILD_STAMP_LA := $(STAMP_DIR)/kernel-build-la.stamp +KERNEL_LOG_KEY := $(if $(strip $(LOG)),$(strip $(LOG)),OFF) +KERNEL_LOG_STAMP_RV := $(STAMP_DIR)/kernel-log-rv-$(KERNEL_LOG_KEY).stamp +KERNEL_LOG_STAMP_LA := $(STAMP_DIR)/kernel-log-la-$(KERNEL_LOG_KEY).stamp USER_BUILD_DEPS := user/Makefile user/Cargo.toml $(shell find user/src -type f | sort) KERNEL_BUILD_DEPS := os/Makefile os/Cargo.toml os/build.rs $(shell find os/src fs/src -type f | sort) LA_BOOTLOADER_DIR ?= bootloader/loongarch64-direct @@ -134,11 +138,17 @@ $(USER_BUILD_STAMP_LA): $(USER_BUILD_DEPS) user-apps: $(USER_BUILD_STAMP_RV) user-apps-la: $(USER_BUILD_STAMP_LA) -$(KERNEL_BUILD_STAMP_RV): $(KERNEL_BUILD_DEPS) | $(STAMP_DIR) +$(KERNEL_LOG_STAMP_RV): | $(STAMP_DIR) + touch $@ + +$(KERNEL_LOG_STAMP_LA): | $(STAMP_DIR) + touch $@ + +$(KERNEL_BUILD_STAMP_RV): $(KERNEL_BUILD_DEPS) $(KERNEL_LOG_STAMP_RV) | $(STAMP_DIR) $(MAKE) -C os kernel ARCH=riscv64 touch $@ -$(KERNEL_BUILD_STAMP_LA): $(KERNEL_BUILD_DEPS) | $(STAMP_DIR) +$(KERNEL_BUILD_STAMP_LA): $(KERNEL_BUILD_DEPS) $(KERNEL_LOG_STAMP_LA) | $(STAMP_DIR) $(MAKE) -C os kernel ARCH=loongarch64 touch $@ @@ -204,7 +214,7 @@ check-kernel: $(RUN_KERNEL) exit 1; \ } -check-kernel-la: +check-kernel-la: kernel-la @test -x kernel-la || { \ echo "missing kernel-la; run 'make all' first" >&2; \ exit 1; \ @@ -304,7 +314,7 @@ run-la: check-kernel-la $(LA_BOOTLOADER_ELF) $(DISK_LA_IMG) prepare-run-test-fs- $(QEMU_LA) -machine virt -cpu la464 -kernel $(LA_BOOTLOADER_ELF) -device loader,file=kernel-la,addr=$(LA_KERNEL_ENTRY_PA) -m $(MEM_LA) -nographic -smp $(SMP) $(QEMU_LA_BLK_ARGS) -device virtio-net-pci,netdev=net0,id=net0 -netdev $(QEMU_LA_NETDEV) -no-reboot -rtc base=utc $(QEMU_LA_EXTRA_BLK_ARGS) fast-run: check-kernel - $(QEMU) -machine virt -kernel kernel-rv -m $(MEM) -nographic -smp $(SMP) -bios default $(QEMU_COMP_BLK_ARGS) -device virtio-net-device,netdev=net -netdev $(QEMU_NETDEV) -no-reboot -rtc base=utc $(QEMU_COMP_EXTRA_BLK_ARGS) $(QEMU_TRACE_ARGS) + $(QEMU) -machine virt -kernel kernel-rv -m $(MEM) -nographic -smp $(SMP) -bios default $(QEMU_COMP_BLK_ARGS) -device virtio-net-device,netdev=net -netdev $(FAST_RUN_QEMU_NETDEV) -no-reboot -rtc base=utc $(QEMU_COMP_EXTRA_BLK_ARGS) $(QEMU_TRACE_ARGS) fast-run-la: check-kernel-la $(LA_BOOTLOADER_ELF) $(QEMU_LA) -machine virt -cpu la464 -kernel $(LA_BOOTLOADER_ELF) -device loader,file=kernel-la,addr=$(LA_KERNEL_ENTRY_PA) -m $(MEM_LA) -nographic -smp $(SMP) $(QEMU_LA_BLK_ARGS) -device virtio-net-pci,netdev=net0,id=net0 -netdev $(QEMU_LA_NETDEV) -no-reboot -rtc base=utc $(QEMU_LA_EXTRA_BLK_ARGS) diff --git a/fs/Cargo.toml b/fs/Cargo.toml index fffd6ad..0349f54 100644 --- a/fs/Cargo.toml +++ b/fs/Cargo.toml @@ -19,3 +19,6 @@ debug = true [features] board_qemu = [] board_k210 = [] +io_perf_counters = [] + +default = ["io_perf_counters"] diff --git a/fs/src/block_cache.rs b/fs/src/block_cache.rs index e189475..6dd7943 100644 --- a/fs/src/block_cache.rs +++ b/fs/src/block_cache.rs @@ -1,12 +1,54 @@ //! Block Cache Layer //! Implements about the disk block cache functionality -use super::{BlockDevice, BLOCK_SZ}; -use alloc::collections::VecDeque; +use super::{BlockDevice, BlockWrite, BLOCK_SZ}; +use alloc::collections::{BTreeMap, VecDeque}; +#[cfg(feature = "io_perf_counters")] +use alloc::string::String; use alloc::sync::Arc; use alloc::vec; use alloc::vec::Vec; +#[cfg(feature = "io_perf_counters")] +use core::fmt::Write; +#[cfg(feature = "io_perf_counters")] +use core::sync::atomic::{AtomicUsize, Ordering}; use lazy_static::*; use spin::Mutex; + +#[cfg(feature = "io_perf_counters")] +static GET_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static GET_HITS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static GET_MISSES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static OVERWRITE_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static OVERWRITE_HITS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static OVERWRITE_MISSES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static OVERWRITE_DIRECT_WRITE_OPS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static OVERWRITE_DIRECT_WRITE_BLOCKS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static OVERWRITE_RANGES_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static OVERWRITE_RANGES_ITEMS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static OVERWRITE_RANGES_SINGLE_ITEM_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static OVERWRITE_RANGES_BLOCKS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static EVICTIONS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static LOOKUP_SCAN_STEPS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static EVICT_SCAN_STEPS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static SYNC_ALL_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static SYNC_BLOCK_VISITS: AtomicUsize = AtomicUsize::new(0); + /// BlockCache is a cache for a block in disk. pub struct BlockCache { cache: Vec, @@ -28,6 +70,20 @@ impl BlockCache { modified: false, } } + + /// Create a cache entry for a block whose entire content is being overwritten. + pub fn new_overwrite(block_id: usize, block_device: Arc, data: &[u8]) -> Self { + assert!(data.len() == BLOCK_SZ); + let mut cache = vec![0u8; BLOCK_SZ]; + cache.copy_from_slice(data); + Self { + cache, + block_id, + block_device, + modified: true, + } + } + /// Get the slice in the block cache according to the offset. fn addr_of_offset(&self, offset: usize) -> usize { &self.cache[offset] as *const _ as usize @@ -99,8 +155,10 @@ const BLOCK_CACHE_SIZE: usize = 2048; /// BlockCacheManager is a manager for BlockCache. pub struct BlockCacheManager { - /// ((device_id, block_id), block_cache) - queue: VecDeque<((usize, usize), Arc>)>, + /// Cache keys in insertion order, used only to choose eviction victims. + queue: VecDeque<(usize, usize)>, + /// Cache entries indexed by `(device_id, block_id)` for fast lookup. + map: BTreeMap<(usize, usize), Arc>>, } impl BlockCacheManager { @@ -108,6 +166,7 @@ impl BlockCacheManager { pub fn new() -> Self { Self { queue: VecDeque::new(), + map: BTreeMap::new(), } } @@ -121,35 +180,209 @@ impl BlockCacheManager { block_id: usize, block_device: Arc, ) -> Arc> { + #[cfg(feature = "io_perf_counters")] + GET_CALLS.fetch_add(1, Ordering::Relaxed); let key = (Self::device_id(&block_device), block_id); - if let Some(pair) = self.queue.iter().find(|pair| pair.0 == key) { - Arc::clone(&pair.1) - } else { - // substitute - if self.queue.len() == BLOCK_CACHE_SIZE { - // from front to tail - if let Some((idx, _)) = self - .queue - .iter() - .enumerate() - .find(|(_, pair)| Arc::strong_count(&pair.1) == 1) - { - self.queue.drain(idx..=idx); - } else { - panic!("Run out of BlockCache!"); + #[cfg(feature = "io_perf_counters")] + LOOKUP_SCAN_STEPS.fetch_add(1, Ordering::Relaxed); + if let Some(block_cache) = self.map.get(&key) { + #[cfg(feature = "io_perf_counters")] + GET_HITS.fetch_add(1, Ordering::Relaxed); + return Arc::clone(block_cache); + } + #[cfg(feature = "io_perf_counters")] + GET_MISSES.fetch_add(1, Ordering::Relaxed); + + // substitute + if self.map.len() == BLOCK_CACHE_SIZE { + // from front to tail + if let Some((idx, _)) = self + .queue + .iter() + .enumerate() + .find(|(_, key)| { + self.map + .get(key) + .map(|cache| Arc::strong_count(cache) == 1) + .unwrap_or(true) + }) + { + #[cfg(feature = "io_perf_counters")] + EVICT_SCAN_STEPS.fetch_add(idx + 1, Ordering::Relaxed); + #[cfg(feature = "io_perf_counters")] + EVICTIONS.fetch_add(1, Ordering::Relaxed); + if let Some(evicted_key) = self.queue.remove(idx) { + self.map.remove(&evicted_key); } + } else { + #[cfg(feature = "io_perf_counters")] + EVICT_SCAN_STEPS.fetch_add(self.queue.len(), Ordering::Relaxed); + panic!("Run out of BlockCache!"); } - // load block into mem and push back - let block_cache = Arc::new(Mutex::new(BlockCache::new( - block_id, - Arc::clone(&block_device), - ))); - self.queue.push_back((key, Arc::clone(&block_cache))); + } + // load block into mem and push back + let block_cache = Arc::new(Mutex::new(BlockCache::new( + block_id, + Arc::clone(&block_device), + ))); + self.queue.push_back(key); + self.map.insert(key, Arc::clone(&block_cache)); + block_cache + } + + pub fn overwrite_block_cache_range( + &mut self, + start_block: usize, + block_device: Arc, + data: &[u8], + ) { + assert!(data.len() % BLOCK_SZ == 0); + let block_count = data.len() / BLOCK_SZ; + #[cfg(feature = "io_perf_counters")] + OVERWRITE_CALLS.fetch_add(block_count, Ordering::Relaxed); + if block_count == 0 { + return; + } + let device_id = Self::device_id(&block_device); + let end_block = start_block + .checked_add(block_count) + .expect("block cache overwrite range overflow"); + let start_key = (device_id, start_block); + let end_key = (device_id, end_block); + + #[cfg(feature = "io_perf_counters")] + let mut hits = 0usize; + #[cfg(not(feature = "io_perf_counters"))] + let hits = (); + + for ((_, block_id), block_cache) in self.map.range(start_key..end_key) { + let idx = *block_id - start_block; + let offset = idx * BLOCK_SZ; block_cache + .lock() + .write_bytes(0, &data[offset..offset + BLOCK_SZ]); + #[cfg(feature = "io_perf_counters")] + { + hits += 1; + } + } + + #[cfg(feature = "io_perf_counters")] + { + LOOKUP_SCAN_STEPS.fetch_add(1 + hits, Ordering::Relaxed); + OVERWRITE_HITS.fetch_add(hits, Ordering::Relaxed); + OVERWRITE_MISSES.fetch_add(block_count - hits, Ordering::Relaxed); } + #[cfg(not(feature = "io_perf_counters"))] + let _ = hits; } } +#[cfg(feature = "io_perf_counters")] +fn load(counter: &AtomicUsize) -> usize { + counter.load(Ordering::Relaxed) +} + +#[cfg(feature = "io_perf_counters")] +pub fn reset_perf_counters() { + GET_CALLS.store(0, Ordering::Relaxed); + GET_HITS.store(0, Ordering::Relaxed); + GET_MISSES.store(0, Ordering::Relaxed); + OVERWRITE_CALLS.store(0, Ordering::Relaxed); + OVERWRITE_HITS.store(0, Ordering::Relaxed); + OVERWRITE_MISSES.store(0, Ordering::Relaxed); + OVERWRITE_DIRECT_WRITE_OPS.store(0, Ordering::Relaxed); + OVERWRITE_DIRECT_WRITE_BLOCKS.store(0, Ordering::Relaxed); + OVERWRITE_RANGES_CALLS.store(0, Ordering::Relaxed); + OVERWRITE_RANGES_ITEMS.store(0, Ordering::Relaxed); + OVERWRITE_RANGES_SINGLE_ITEM_CALLS.store(0, Ordering::Relaxed); + OVERWRITE_RANGES_BLOCKS.store(0, Ordering::Relaxed); + EVICTIONS.store(0, Ordering::Relaxed); + LOOKUP_SCAN_STEPS.store(0, Ordering::Relaxed); + EVICT_SCAN_STEPS.store(0, Ordering::Relaxed); + SYNC_ALL_CALLS.store(0, Ordering::Relaxed); + SYNC_BLOCK_VISITS.store(0, Ordering::Relaxed); +} + +#[cfg(feature = "io_perf_counters")] +pub fn render_perf_counters() -> String { + let get_calls = load(&GET_CALLS); + let overwrite_calls = load(&OVERWRITE_CALLS); + let lookups = get_calls + overwrite_calls; + let evictions = load(&EVICTIONS); + let lookup_steps = load(&LOOKUP_SCAN_STEPS); + let evict_scan_steps = load(&EVICT_SCAN_STEPS); + let cached_blocks = BLOCK_CACHE_MANAGER.lock().map.len(); + + let mut out = String::new(); + let _ = writeln!(&mut out, "block_cache:"); + let _ = writeln!(&mut out, " cached_blocks {}", cached_blocks); + let _ = writeln!(&mut out, " get_calls {}", get_calls); + let _ = writeln!(&mut out, " get_hits {}", load(&GET_HITS)); + let _ = writeln!(&mut out, " get_misses {}", load(&GET_MISSES)); + let _ = writeln!(&mut out, " overwrite_calls {}", overwrite_calls); + let _ = writeln!(&mut out, " overwrite_hits {}", load(&OVERWRITE_HITS)); + let _ = writeln!(&mut out, " overwrite_misses {}", load(&OVERWRITE_MISSES)); + let _ = writeln!( + &mut out, + " overwrite_direct_write_ops {}", + load(&OVERWRITE_DIRECT_WRITE_OPS) + ); + let _ = writeln!( + &mut out, + " overwrite_direct_write_blocks {}", + load(&OVERWRITE_DIRECT_WRITE_BLOCKS) + ); + let _ = writeln!( + &mut out, + " overwrite_ranges_calls {}", + load(&OVERWRITE_RANGES_CALLS) + ); + let _ = writeln!( + &mut out, + " overwrite_ranges_items {}", + load(&OVERWRITE_RANGES_ITEMS) + ); + let _ = writeln!( + &mut out, + " overwrite_ranges_single_item_calls {}", + load(&OVERWRITE_RANGES_SINGLE_ITEM_CALLS) + ); + let _ = writeln!( + &mut out, + " overwrite_ranges_blocks {}", + load(&OVERWRITE_RANGES_BLOCKS) + ); + let _ = writeln!(&mut out, " evictions {}", evictions); + let _ = writeln!(&mut out, " lookup_steps {}", lookup_steps); + let _ = writeln!( + &mut out, + " avg_lookup_steps_x100 {}", + if lookups == 0 { + 0 + } else { + lookup_steps.saturating_mul(100) / lookups + } + ); + let _ = writeln!(&mut out, " evict_scan_steps {}", evict_scan_steps); + let _ = writeln!( + &mut out, + " avg_evict_scan_x100 {}", + if evictions == 0 { + 0 + } else { + evict_scan_steps.saturating_mul(100) / evictions + } + ); + let _ = writeln!(&mut out, " sync_all_calls {}", load(&SYNC_ALL_CALLS)); + let _ = writeln!( + &mut out, + " sync_block_visits {}", + load(&SYNC_BLOCK_VISITS) + ); + out +} + lazy_static! { /// BLOCK_CACHE_MANAGER: Glocal instance of BlockCacheManager. pub static ref BLOCK_CACHE_MANAGER: Mutex = @@ -164,10 +397,91 @@ pub fn get_block_cache( .lock() .get_block_cache(block_id, block_device) } + +/// Overwrite a complete block cache entry without first loading old disk data. +pub fn overwrite_block_cache(block_id: usize, block_device: Arc, data: &[u8]) { + overwrite_block_cache_range(block_id, block_device, data); +} + +/// Overwrite complete contiguous blocks without first loading old disk data. +pub fn overwrite_block_cache_range( + start_block: usize, + block_device: Arc, + data: &[u8], +) { + assert!(data.len() % BLOCK_SZ == 0); + if data.is_empty() { + return; + } + + BLOCK_CACHE_MANAGER.lock().overwrite_block_cache_range( + start_block, + Arc::clone(&block_device), + data, + ); + #[cfg(feature = "io_perf_counters")] + OVERWRITE_DIRECT_WRITE_OPS.fetch_add(1, Ordering::Relaxed); + #[cfg(feature = "io_perf_counters")] + OVERWRITE_DIRECT_WRITE_BLOCKS.fetch_add(data.len() / BLOCK_SZ, Ordering::Relaxed); + block_device.write_blocks(start_block, data); +} + +/// Overwrite multiple complete contiguous block ranges without first loading old disk data. +pub fn overwrite_block_cache_ranges(block_device: Arc, writes: &[BlockWrite<'_>]) { + if writes.is_empty() { + return; + } + + let mut write_count = 0usize; + #[cfg(feature = "io_perf_counters")] + let mut block_count = 0usize; + { + let mut manager = BLOCK_CACHE_MANAGER.lock(); + for write in writes { + assert!(write.data.len() % BLOCK_SZ == 0); + if write.data.is_empty() { + continue; + } + write_count += 1; + #[cfg(feature = "io_perf_counters")] + { + block_count += write.data.len() / BLOCK_SZ; + } + manager.overwrite_block_cache_range( + write.start_block, + Arc::clone(&block_device), + write.data, + ); + } + } + + if write_count == 0 { + return; + } + #[cfg(feature = "io_perf_counters")] + { + OVERWRITE_RANGES_CALLS.fetch_add(1, Ordering::Relaxed); + OVERWRITE_RANGES_ITEMS.fetch_add(write_count, Ordering::Relaxed); + if write_count == 1 { + OVERWRITE_RANGES_SINGLE_ITEM_CALLS.fetch_add(1, Ordering::Relaxed); + } + OVERWRITE_RANGES_BLOCKS.fetch_add(block_count, Ordering::Relaxed); + } + #[cfg(feature = "io_perf_counters")] + OVERWRITE_DIRECT_WRITE_OPS.fetch_add(write_count, Ordering::Relaxed); + #[cfg(feature = "io_perf_counters")] + OVERWRITE_DIRECT_WRITE_BLOCKS.fetch_add(block_count, Ordering::Relaxed); + block_device.write_blocks_many(writes); +} + /// Sync(write) all the block cache to disk. pub fn block_cache_sync_all() { + #[cfg(feature = "io_perf_counters")] + SYNC_ALL_CALLS.fetch_add(1, Ordering::Relaxed); let manager = BLOCK_CACHE_MANAGER.lock(); - for (_, cache) in manager.queue.iter() { + for cache in manager.map.values() { + #[cfg(feature = "io_perf_counters")] + SYNC_BLOCK_VISITS.fetch_add(1, Ordering::Relaxed); cache.lock().sync(); } } diff --git a/fs/src/block_dev.rs b/fs/src/block_dev.rs index 7bc59f0..92c4f98 100644 --- a/fs/src/block_dev.rs +++ b/fs/src/block_dev.rs @@ -4,9 +4,39 @@ use core::any::Any; +use crate::BLOCK_SZ; + +/// A contiguous 512-byte-block write request. +pub struct BlockWrite<'a> { + pub start_block: usize, + pub data: &'a [u8], +} + pub trait BlockDevice: Send + Sync + Any { /// Read a block from the block device. fn read_block(&self, block_id: usize, buf: &mut [u8]); /// Write a block to the block device. fn write_block(&self, block_id: usize, buf: &[u8]); + /// Read a contiguous range of 512-byte blocks. + fn read_blocks(&self, start_block: usize, buf: &mut [u8]) { + assert!(buf.len() % BLOCK_SZ == 0); + for (idx, block) in buf.chunks_mut(BLOCK_SZ).enumerate() { + self.read_block(start_block + idx, block); + } + } + /// Write a contiguous range of 512-byte blocks. + fn write_blocks(&self, start_block: usize, buf: &[u8]) { + assert!(buf.len() % BLOCK_SZ == 0); + for (idx, block) in buf.chunks(BLOCK_SZ).enumerate() { + self.write_block(start_block + idx, block); + } + } + /// Write multiple independent contiguous ranges. + fn write_blocks_many(&self, writes: &[BlockWrite<'_>]) { + for write in writes { + if !write.data.is_empty() { + self.write_blocks(write.start_block, write.data); + } + } + } } diff --git a/fs/src/ext4/mod.rs b/fs/src/ext4/mod.rs index 6f767c9..d3df5e5 100644 --- a/fs/src/ext4/mod.rs +++ b/fs/src/ext4/mod.rs @@ -2,11 +2,17 @@ use alloc::{string::String, sync::Arc, vec, vec::Vec}; use core::any::Any; use core::cmp::min; use core::fmt; +#[cfg(feature = "io_perf_counters")] +use core::fmt::Write; +#[cfg(feature = "io_perf_counters")] +use core::sync::atomic::{AtomicUsize, Ordering}; use log::{debug, info}; use spin::Mutex; -use crate::block_cache::get_block_cache; -use crate::block_dev::BlockDevice as OsBlockDevice; +use crate::block_cache::{ + get_block_cache, overwrite_block_cache_range, overwrite_block_cache_ranges, +}; +use crate::block_dev::{BlockDevice as OsBlockDevice, BlockWrite as OsBlockWrite}; use crate::dentry_cache::insert_dentry; use crate::errno::FS_ERRNO; use crate::{STATFS_MAGIC_EXT4, STATFS_NAMELEN_DEFAULT, VfsStatFs}; @@ -14,7 +20,7 @@ use crate::vfs::{Inode, InodeTime, VfsAttrs, VfsFileType, VfsNode}; use crate::BLOCK_SZ; use ext4_rs::{ - BlockDevice as Ext4BlockDevice, Ext4, InodeFileType, BLOCK_SIZE + BlockDevice as Ext4BlockDevice, BlockWrite as Ext4BlockWrite, Ext4, InodeFileType, BLOCK_SIZE, }; /// Adapts the OS block-id based device into ext4_rs offset-based IO. @@ -22,6 +28,17 @@ struct Ext4BlockDeviceAdapter { inner: Arc, } +#[cfg(feature = "io_perf_counters")] +static WRITE_OFFSETS_MANY_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static WRITE_OFFSETS_MANY_ITEMS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static WRITE_OFFSETS_MANY_SINGLE_ITEM_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static WRITE_OFFSETS_MANY_ALIGNED_ITEMS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static WRITE_OFFSETS_MANY_UNALIGNED_ITEMS: AtomicUsize = AtomicUsize::new(0); + const EXT4_ROOT_INODE: u32 = 2; #[inline] @@ -76,35 +93,129 @@ impl Ext4BlockDevice for Ext4BlockDeviceAdapter { return; } - let start_block = offset / BLOCK_SZ; - let end_block = (offset + data.len()).div_ceil(BLOCK_SZ); + let mut written = 0usize; - for block_id in start_block..end_block { - let block_start = block_id * BLOCK_SZ; - let seg_start = offset.max(block_start); - let seg_end = (offset + data.len()).min(block_start + BLOCK_SZ); - if seg_start >= seg_end { - continue; - } + if offset % BLOCK_SZ != 0 { + let block_id = offset / BLOCK_SZ; + let dst_start = offset % BLOCK_SZ; + let len = (BLOCK_SZ - dst_start).min(data.len()); + get_block_cache(block_id, Arc::clone(&self.inner)) + .lock() + .write_bytes(dst_start, &data[..len]); + written += len; + } + + let aligned_len = (data.len() - written) / BLOCK_SZ * BLOCK_SZ; + if aligned_len != 0 { + let start_block = (offset + written) / BLOCK_SZ; + overwrite_block_cache_range( + start_block, + Arc::clone(&self.inner), + &data[written..written + aligned_len], + ); + written += aligned_len; + } - let src_start = seg_start - offset; - let src_end = seg_end - offset; - let dst_start = seg_start - block_start; - let dst_end = seg_end - block_start; + if written < data.len() { + let block_id = (offset + written) / BLOCK_SZ; + get_block_cache(block_id, Arc::clone(&self.inner)) + .lock() + .write_bytes(0, &data[written..]); + } + } - if dst_start == 0 && dst_end == BLOCK_SZ { - get_block_cache(block_id, Arc::clone(&self.inner)) - .lock() - .write_bytes(0, &data[src_start..src_end]); + fn write_offsets_many(&self, writes: &[Ext4BlockWrite<'_>]) { + #[cfg(feature = "io_perf_counters")] + { + let non_empty = writes.iter().filter(|write| !write.data.is_empty()).count(); + if non_empty != 0 { + WRITE_OFFSETS_MANY_CALLS.fetch_add(1, Ordering::Relaxed); + WRITE_OFFSETS_MANY_ITEMS.fetch_add(non_empty, Ordering::Relaxed); + if non_empty == 1 { + WRITE_OFFSETS_MANY_SINGLE_ITEM_CALLS.fetch_add(1, Ordering::Relaxed); + } + } + } + + let mut pending: Vec> = Vec::new(); + for write in writes { + if write.data.is_empty() { + continue; + } + if offset_and_len_are_block_aligned(write.offset, write.data.len()) { + #[cfg(feature = "io_perf_counters")] + WRITE_OFFSETS_MANY_ALIGNED_ITEMS.fetch_add(1, Ordering::Relaxed); + pending.push(OsBlockWrite { + start_block: write.offset / BLOCK_SZ, + data: write.data, + }); } else { - get_block_cache(block_id, Arc::clone(&self.inner)) - .lock() - .write_bytes(dst_start, &data[src_start..src_end]); + #[cfg(feature = "io_perf_counters")] + WRITE_OFFSETS_MANY_UNALIGNED_ITEMS.fetch_add(1, Ordering::Relaxed); + if !pending.is_empty() { + overwrite_block_cache_ranges(Arc::clone(&self.inner), &pending); + pending.clear(); + } + self.write_offset(write.offset, write.data); } } + if !pending.is_empty() { + overwrite_block_cache_ranges(Arc::clone(&self.inner), &pending); + } } } +#[inline] +fn offset_and_len_are_block_aligned(offset: usize, len: usize) -> bool { + offset % BLOCK_SZ == 0 && len % BLOCK_SZ == 0 +} + +#[cfg(feature = "io_perf_counters")] +fn perf_load(counter: &AtomicUsize) -> usize { + counter.load(Ordering::Relaxed) +} + +#[cfg(feature = "io_perf_counters")] +pub fn reset_perf_counters() { + WRITE_OFFSETS_MANY_CALLS.store(0, Ordering::Relaxed); + WRITE_OFFSETS_MANY_ITEMS.store(0, Ordering::Relaxed); + WRITE_OFFSETS_MANY_SINGLE_ITEM_CALLS.store(0, Ordering::Relaxed); + WRITE_OFFSETS_MANY_ALIGNED_ITEMS.store(0, Ordering::Relaxed); + WRITE_OFFSETS_MANY_UNALIGNED_ITEMS.store(0, Ordering::Relaxed); +} + +#[cfg(feature = "io_perf_counters")] +pub fn render_perf_counters() -> String { + let mut out = String::new(); + let _ = writeln!(&mut out, "ext4:"); + let _ = writeln!( + &mut out, + " write_offsets_many_calls {}", + perf_load(&WRITE_OFFSETS_MANY_CALLS) + ); + let _ = writeln!( + &mut out, + " write_offsets_many_items {}", + perf_load(&WRITE_OFFSETS_MANY_ITEMS) + ); + let _ = writeln!( + &mut out, + " write_offsets_many_single_item_calls {}", + perf_load(&WRITE_OFFSETS_MANY_SINGLE_ITEM_CALLS) + ); + let _ = writeln!( + &mut out, + " write_offsets_many_aligned_items {}", + perf_load(&WRITE_OFFSETS_MANY_ALIGNED_ITEMS) + ); + let _ = writeln!( + &mut out, + " write_offsets_many_unaligned_items {}", + perf_load(&WRITE_OFFSETS_MANY_UNALIGNED_ITEMS) + ); + out +} + pub struct Ext4FileSystem { ext4: Mutex, } diff --git a/fs/src/ext4_rs b/fs/src/ext4_rs index 016bbbb..95e4209 160000 --- a/fs/src/ext4_rs +++ b/fs/src/ext4_rs @@ -1 +1 @@ -Subproject commit 016bbbba3bb2670472eeda35b3b497c6ff2222ef +Subproject commit 95e420914f7442d9bd1bc0af5cc7ad00cef6ad7d diff --git a/fs/src/lib.rs b/fs/src/lib.rs index 963e752..669b93d 100644 --- a/fs/src/lib.rs +++ b/fs/src/lib.rs @@ -31,7 +31,7 @@ pub mod errno; pub const BLOCK_SZ: usize = 512; -pub use block_dev::BlockDevice; +pub use block_dev::{BlockDevice, BlockWrite}; pub use easyfs::efs::EasyFileSystem; pub use fat32::Fat32FileSystem; pub use ext4::Ext4FileSystem; diff --git a/os/Cargo.toml b/os/Cargo.toml index 8c9dd06..4e0c734 100644 --- a/os/Cargo.toml +++ b/os/Cargo.toml @@ -14,15 +14,18 @@ buddy_system_allocator = "0.6" bitflags = "1.2.1" xmas-elf = "0.7.0" virtio-drivers = "0.12.0" -fs = { path = "../fs" } +# fs = { path = "../fs" } +fs = { path = "../fs", default-features = false, features = ["io_perf_counters"] } hashbrown = "0.12.3" smoltcp = { path = "../vendor/smoltcp", default-features = false, features = [ "alloc", "medium-ethernet", "proto-ipv4", + "proto-ipv6", "proto-ipv4-fragmentation", "socket-udp", "socket-tcp", + "iface-max-addr-count-4", "fragmentation-buffer-size-16384", ] } strum_macros = "0.28.0" @@ -32,4 +35,6 @@ fat32 = [] easyfs = [] ext4 = [] platform-qemu-virt = [] -default = ["ext4", "platform-qemu-virt"] +io_perf_counters = [] +net_perf_counters = [] +default = ["ext4", "platform-qemu-virt", "io_perf_counters", "net_perf_counters"] diff --git a/os/Makefile b/os/Makefile index 7f786a8..57a5223 100644 --- a/os/Makefile +++ b/os/Makefile @@ -1,5 +1,7 @@ # Building ARCH ?= riscv64 +EXTRA_FEATURES ?= +# EXTRA_FEATURES ?= --features io_perf_counters --features net_perf_counters ARCH_REQUEST := $(strip $(or $(target),$(ARCH),riscv64)) ARCH_NORMALIZED := $(shell printf '%s' "$(ARCH_REQUEST)" | tr '[:upper:]' '[:lower:]') @@ -151,9 +153,9 @@ fs-img: kernel: @echo Platform: $(BOARD) ifeq ($(QEMU7_CFG),--cfg qemu7) - @LOG=$(LOG) cargo rustc $(MODE_ARG) --target $(TARGET) --no-default-features --features $(MAIN_FS) -- --cfg qemu7 + @LOG=$(LOG) cargo rustc $(MODE_ARG) --target $(TARGET) --no-default-features --features $(MAIN_FS) $(EXTRA_FEATURES) -- --cfg qemu7 else - @LOG=$(LOG) cargo build $(MODE_ARG) --target $(TARGET) --no-default-features --features $(MAIN_FS) + @LOG=$(LOG) cargo build $(MODE_ARG) --target $(TARGET) --no-default-features --features $(MAIN_FS) $(EXTRA_FEATURES) endif clean: diff --git a/os/src/drivers/block/mod.rs b/os/src/drivers/block/mod.rs index ca01900..be5e6d8 100644 --- a/os/src/drivers/block/mod.rs +++ b/os/src/drivers/block/mod.rs @@ -20,6 +20,18 @@ use crate::platform::{ VIRTIO_MMIO_BASE, VIRTIO_MMIO_IRQ_BASE, VIRTIO_MMIO_SLOTS, VIRTIO_MMIO_STRIDE, }; +/// Reset block driver performance counters. +#[cfg(feature = "io_perf_counters")] +pub fn reset_perf_counters() { + virtio_blk::reset_perf_counters(); +} + +/// Render block driver performance counters. +#[cfg(feature = "io_perf_counters")] +pub fn render_perf_counters() -> String { + virtio_blk::render_perf_counters() +} + fn virtio_blk_name(idx: usize) -> String { alloc::format!("vd{}", (b'a' + idx as u8) as char) } diff --git a/os/src/drivers/block/virtio_blk.rs b/os/src/drivers/block/virtio_blk.rs index 0519cb5..5c76d87 100644 --- a/os/src/drivers/block/virtio_blk.rs +++ b/os/src/drivers/block/virtio_blk.rs @@ -1,10 +1,15 @@ use super::BlockDevice; use crate::sync::SpinNoIrqLock; use crate::task::{current_task, WaitQueueKeyed, WaitReason}; +use alloc::{boxed::Box, string::String, vec::Vec}; +use core::fmt::Write; use core::hint::spin_loop; +use core::sync::atomic::{AtomicUsize, Ordering}; +use fs::BlockWrite; use virtio_drivers::{ device::blk::{BlkReq, BlkResp, RespStatus, VirtIOBlk}, transport::SomeTransport, + Error as VirtIoError, }; use crate::drivers::virtio::VirtioHal; @@ -17,9 +22,56 @@ pub struct VirtIOBlock { // static mut READ_RECORDS: SpinNoIrqLock<([usize; 512], usize)> = SpinNoIrqLock::new(([0; 512], 0)); +static READ_OPS: AtomicUsize = AtomicUsize::new(0); +static READ_BYTES: AtomicUsize = AtomicUsize::new(0); +static WRITE_OPS: AtomicUsize = AtomicUsize::new(0); +static WRITE_BYTES: AtomicUsize = AtomicUsize::new(0); +static WAIT_POLLS: AtomicUsize = AtomicUsize::new(0); +static TASK_WAITS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static WRITE_MANY_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static WRITE_MANY_REQS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static WRITE_MANY_MAX_INFLIGHT: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "io_perf_counters")] +static WRITE_MANY_QUEUE_FULL_WAITS: AtomicUsize = AtomicUsize::new(0); + +const VIRTIO_BLK_QUEUE_SIZE: usize = 16; +const VIRTIO_BLK_WRITE_DESCS: usize = 3; +const MAX_WRITE_IN_FLIGHT: usize = VIRTIO_BLK_QUEUE_SIZE / VIRTIO_BLK_WRITE_DESCS; + +struct PendingWrite<'a> { + token: u16, + block_id: usize, + data: &'a [u8], + req: BlkReq, + resp: BlkResp, +} + +impl<'a> PendingWrite<'a> { + fn new(block_id: usize, data: &'a [u8]) -> Self { + Self { + token: 0, + block_id, + data, + req: BlkReq::default(), + resp: BlkResp::default(), + } + } +} + impl BlockDevice for VirtIOBlock { /// Read a block from the virtio_blk device fn read_block(&self, block_id: usize, buf: &mut [u8]) { + self.read_blocks(block_id, buf); + } + + /// Read contiguous blocks from the virtio_blk device. + fn read_blocks(&self, block_id: usize, buf: &mut [u8]) { + assert!(buf.len() % ::fs::BLOCK_SZ == 0); + READ_OPS.fetch_add(1, Ordering::Relaxed); + READ_BYTES.fetch_add(buf.len(), Ordering::Relaxed); let mut req = BlkReq::default(); let mut resp = BlkResp::default(); @@ -85,6 +137,14 @@ impl BlockDevice for VirtIOBlock { } /// Write a block to the virtio_blk device fn write_block(&self, block_id: usize, buf: &[u8]) { + self.write_blocks(block_id, buf); + } + + /// Write contiguous blocks to the virtio_blk device. + fn write_blocks(&self, block_id: usize, buf: &[u8]) { + assert!(buf.len() % ::fs::BLOCK_SZ == 0); + WRITE_OPS.fetch_add(1, Ordering::Relaxed); + WRITE_BYTES.fetch_add(buf.len(), Ordering::Relaxed); let mut req = BlkReq::default(); let mut resp = BlkResp::default(); let token = unsafe { @@ -132,6 +192,117 @@ impl BlockDevice for VirtIOBlock { ); } } + + /// Write multiple independent contiguous ranges with several requests in flight. + fn write_blocks_many(&self, writes: &[BlockWrite<'_>]) { + let total_reqs = writes.iter().filter(|write| !write.data.is_empty()).count(); + if total_reqs == 0 { + return; + } + if total_reqs == 1 { + if let Some(write) = writes.iter().find(|write| !write.data.is_empty()) { + self.write_blocks(write.start_block, write.data); + } + return; + } + + #[cfg(feature = "io_perf_counters")] + { + WRITE_MANY_CALLS.fetch_add(1, Ordering::Relaxed); + WRITE_MANY_REQS.fetch_add(total_reqs, Ordering::Relaxed); + } + + let mut next = 0usize; + let mut in_flight: Vec>> = Vec::new(); + while next < writes.len() || !in_flight.is_empty() { + while next < writes.len() && in_flight.len() < MAX_WRITE_IN_FLIGHT { + let write = &writes[next]; + next += 1; + if write.data.is_empty() { + continue; + } + assert!(write.data.len() % ::fs::BLOCK_SZ == 0); + let mut pending = Box::new(PendingWrite::new(write.start_block, write.data)); + WRITE_OPS.fetch_add(1, Ordering::Relaxed); + WRITE_BYTES.fetch_add(write.data.len(), Ordering::Relaxed); + let token = unsafe { + let mut inner = self.inner.lock(); + match inner.write_blocks_nb( + pending.block_id, + &mut pending.req, + pending.data, + &mut pending.resp, + ) { + Ok(token) => token, + Err(VirtIoError::QueueFull) => { + #[cfg(feature = "io_perf_counters")] + WRITE_MANY_QUEUE_FULL_WAITS.fetch_add(1, Ordering::Relaxed); + WRITE_OPS.fetch_sub(1, Ordering::Relaxed); + WRITE_BYTES.fetch_sub(write.data.len(), Ordering::Relaxed); + next -= 1; + break; + } + Err(err) => { + let capacity = inner.capacity(); + panic!( + "Error when submitting VirtIOBlk batched write: block_id={} buf_len={} capacity={} err={:?}", + pending.block_id, + pending.data.len(), + capacity, + err + ) + } + } + }; + pending.token = token; + in_flight.push(pending); + update_max_write_in_flight(in_flight.len()); + } + + if !in_flight.is_empty() { + let token = self.wait_owned_used_token(&in_flight); + let idx = in_flight + .iter() + .position(|pending| pending.token == token) + .expect("ready token missing from batched write set"); + let mut pending = in_flight.swap_remove(idx); + let result = unsafe { + self.inner.lock().complete_write_blocks( + pending.token, + &pending.req, + pending.data, + &mut pending.resp, + ) + }; + if let Err(err) = result { + let capacity = self.inner.lock().capacity(); + panic!( + "Error when completing VirtIOBlk batched write: block_id={} token={} buf_len={} capacity={} resp_status={:?} err={:?}", + pending.block_id, + pending.token, + pending.data.len(), + capacity, + pending.resp.status(), + err + ); + } + if pending.resp.status() != RespStatus::OK { + let capacity = self.inner.lock().capacity(); + panic!( + "VirtIOBlk batched write response error: block_id={} token={} buf_len={} capacity={} resp_status={:?}", + pending.block_id, + pending.token, + pending.data.len(), + capacity, + pending.resp.status() + ); + } + } else if next < writes.len() { + WAIT_POLLS.fetch_add(1, Ordering::Relaxed); + spin_loop(); + } + } + } } impl VirtIOBlock { @@ -148,12 +319,14 @@ impl VirtIOBlock { let irq_disabled = !crate::hal::local_irqs_enabled(); if current_task().is_none() || irq_disabled { while !self.token_ready(token) { + WAIT_POLLS.fetch_add(1, Ordering::Relaxed); spin_loop(); } return; } // Task context path: park current task and wait for precise token wakeup. + TASK_WAITS.fetch_add(1, Ordering::Relaxed); self.wait_queue .wait_selected_with_reason_or_skip(token, WaitReason::BlockDeviceIo, || { self.token_ready(token) @@ -165,6 +338,18 @@ impl VirtIOBlock { matches!(inner.peek_used(), Some(ready) if ready == token) } + fn wait_owned_used_token(&self, in_flight: &[Box>]) -> u16 { + loop { + if let Some(token) = self.inner.lock().peek_used() { + if in_flight.iter().any(|pending| pending.token == token) { + return token; + } + } + WAIT_POLLS.fetch_add(1, Ordering::Relaxed); + spin_loop(); + } + } + /// Called from external interrupt path for this block device. pub fn handle_irq(&self) { let mut inner = self.inner.lock(); @@ -178,3 +363,62 @@ impl VirtIOBlock { } } } + +fn load(counter: &AtomicUsize) -> usize { + counter.load(Ordering::Relaxed) +} + +pub fn reset_perf_counters() { + READ_OPS.store(0, Ordering::Relaxed); + READ_BYTES.store(0, Ordering::Relaxed); + WRITE_OPS.store(0, Ordering::Relaxed); + WRITE_BYTES.store(0, Ordering::Relaxed); + WAIT_POLLS.store(0, Ordering::Relaxed); + TASK_WAITS.store(0, Ordering::Relaxed); + #[cfg(feature = "io_perf_counters")] + { + WRITE_MANY_CALLS.store(0, Ordering::Relaxed); + WRITE_MANY_REQS.store(0, Ordering::Relaxed); + WRITE_MANY_MAX_INFLIGHT.store(0, Ordering::Relaxed); + WRITE_MANY_QUEUE_FULL_WAITS.store(0, Ordering::Relaxed); + } +} + +pub fn render_perf_counters() -> String { + let mut out = String::new(); + let _ = writeln!(&mut out, "virtio_blk:"); + let _ = writeln!(&mut out, " read_ops {}", load(&READ_OPS)); + let _ = writeln!(&mut out, " read_bytes {}", load(&READ_BYTES)); + let _ = writeln!(&mut out, " write_ops {}", load(&WRITE_OPS)); + let _ = writeln!(&mut out, " write_bytes {}", load(&WRITE_BYTES)); + let _ = writeln!(&mut out, " wait_polls {}", load(&WAIT_POLLS)); + let _ = writeln!(&mut out, " task_waits {}", load(&TASK_WAITS)); + #[cfg(feature = "io_perf_counters")] + { + let _ = writeln!( + &mut out, + " write_many_calls {}", + load(&WRITE_MANY_CALLS) + ); + let _ = writeln!(&mut out, " write_many_reqs {}", load(&WRITE_MANY_REQS)); + let _ = writeln!( + &mut out, + " write_many_max_inflight {}", + load(&WRITE_MANY_MAX_INFLIGHT) + ); + let _ = writeln!( + &mut out, + " write_many_queue_full_waits {}", + load(&WRITE_MANY_QUEUE_FULL_WAITS) + ); + } + out +} + +#[cfg(feature = "io_perf_counters")] +fn update_max_write_in_flight(value: usize) { + WRITE_MANY_MAX_INFLIGHT.fetch_max(value, Ordering::Relaxed); +} + +#[cfg(not(feature = "io_perf_counters"))] +fn update_max_write_in_flight(_value: usize) {} diff --git a/os/src/fs/page_cache.rs b/os/src/fs/page_cache.rs index cc9e866..29b1e69 100644 --- a/os/src/fs/page_cache.rs +++ b/os/src/fs/page_cache.rs @@ -1,7 +1,11 @@ use alloc::collections::{BTreeMap, BTreeSet, VecDeque}; +use alloc::string::String; use alloc::sync::{Arc, Weak}; +use alloc::vec::Vec; use bitflags::bitflags; use core::cmp::{max, min}; +use core::fmt::Write; +use core::sync::atomic::{AtomicUsize, Ordering}; use fs::errno::FS_ERRNO; use fs::Inode; @@ -16,6 +20,19 @@ use crate::syscall::errno::ERRNO; use crate::sync::SpinNoIrqLock; use crate::task::{WaitQueue, WaitReason}; +static READ_PAGE_LOADS: AtomicUsize = AtomicUsize::new(0); +static READ_PAGE_BYTES: AtomicUsize = AtomicUsize::new(0); +static WRITE_MAPPING_CALLS: AtomicUsize = AtomicUsize::new(0); +static WRITE_MAPPING_BYTES: AtomicUsize = AtomicUsize::new(0); +static SYNC_MAPPING_CALLS: AtomicUsize = AtomicUsize::new(0); +static SYNC_RANGE_CALLS: AtomicUsize = AtomicUsize::new(0); +static WRITEBACK_PAGES: AtomicUsize = AtomicUsize::new(0); +static WRITEBACK_BYTES: AtomicUsize = AtomicUsize::new(0); +static WRITEBACK_BATCHES: AtomicUsize = AtomicUsize::new(0); +static WRITEBACK_BATCH_PAGES: AtomicUsize = AtomicUsize::new(0); + +const MAX_WRITEBACK_BATCH_PAGES: usize = 32; + extern "C" { fn ekernel(); } @@ -348,6 +365,59 @@ pub fn sync_all() -> Result<(), ERRNO> { Ok(()) } +fn perf_load(counter: &AtomicUsize) -> usize { + counter.load(Ordering::Relaxed) +} + +pub fn reset_perf_counters() { + READ_PAGE_LOADS.store(0, Ordering::Relaxed); + READ_PAGE_BYTES.store(0, Ordering::Relaxed); + WRITE_MAPPING_CALLS.store(0, Ordering::Relaxed); + WRITE_MAPPING_BYTES.store(0, Ordering::Relaxed); + SYNC_MAPPING_CALLS.store(0, Ordering::Relaxed); + SYNC_RANGE_CALLS.store(0, Ordering::Relaxed); + WRITEBACK_PAGES.store(0, Ordering::Relaxed); + WRITEBACK_BYTES.store(0, Ordering::Relaxed); + WRITEBACK_BATCHES.store(0, Ordering::Relaxed); + WRITEBACK_BATCH_PAGES.store(0, Ordering::Relaxed); +} + +pub fn render_perf_counters() -> String { + let mut out = String::new(); + let _ = writeln!(&mut out, "page_cache:"); + let _ = writeln!(&mut out, " read_page_loads {}", perf_load(&READ_PAGE_LOADS)); + let _ = writeln!(&mut out, " read_page_bytes {}", perf_load(&READ_PAGE_BYTES)); + let _ = writeln!( + &mut out, + " write_mapping_calls {}", + perf_load(&WRITE_MAPPING_CALLS) + ); + let _ = writeln!( + &mut out, + " write_mapping_bytes {}", + perf_load(&WRITE_MAPPING_BYTES) + ); + let _ = writeln!( + &mut out, + " sync_mapping_calls {}", + perf_load(&SYNC_MAPPING_CALLS) + ); + let _ = writeln!(&mut out, " sync_range_calls {}", perf_load(&SYNC_RANGE_CALLS)); + let _ = writeln!(&mut out, " writeback_pages {}", perf_load(&WRITEBACK_PAGES)); + let _ = writeln!(&mut out, " writeback_bytes {}", perf_load(&WRITEBACK_BYTES)); + let _ = writeln!( + &mut out, + " writeback_batches {}", + perf_load(&WRITEBACK_BATCHES) + ); + let _ = writeln!( + &mut out, + " writeback_batch_pages {}", + perf_load(&WRITEBACK_BATCH_PAGES) + ); + out +} + /// 同步某个 inode 指定范围内的脏页。 pub fn sync_inode_range(inode: &Arc, offset: usize, len: usize) -> Result<(), ERRNO> { if len == 0 || !is_inode_page_cacheable(inode) { @@ -500,6 +570,8 @@ fn write_mapping(mapping: &Arc>, offset: usize, buf: if buf.is_empty() { return 0; } + WRITE_MAPPING_CALLS.fetch_add(1, Ordering::Relaxed); + WRITE_MAPPING_BYTES.fetch_add(buf.len(), Ordering::Relaxed); let old_size = mapping.lock().size; let new_size = offset.saturating_add(buf.len()); @@ -825,7 +897,10 @@ fn ensure_page_uptodate( valid_bytes ); // TODO:后续接入通用 truncate 后,需要避免装页与截断并发时把旧数据重新提交回 cache。 - inode.read_at(page_start_off, &mut bytes[..valid_bytes]) + let read = inode.read_at(page_start_off, &mut bytes[..valid_bytes]); + READ_PAGE_LOADS.fetch_add(1, Ordering::Relaxed); + READ_PAGE_BYTES.fetch_add(read, Ordering::Relaxed); + read }; let wait_queue = { @@ -853,14 +928,9 @@ fn mark_page_dirty(mapping: &Arc>, page_guard: &mut C /// 同步单个 mapping 的全部脏页。 fn sync_mapping(mapping: &Arc>) -> Result<(), ERRNO> { - let dirty_pages: alloc::vec::Vec<_> = mapping.lock().dirty_pages.iter().copied().collect(); - for page_idx in dirty_pages { - let page = mapping.lock().pages.get(&page_idx).cloned(); - if let Some(page) = page { - flush_page(mapping, &page)?; - } - } - Ok(()) + SYNC_MAPPING_CALLS.fetch_add(1, Ordering::Relaxed); + let dirty_pages: Vec<_> = mapping.lock().dirty_pages.iter().copied().collect(); + flush_dirty_pages(mapping, &dirty_pages) } /// 同步单个 mapping 中指定范围的脏页。 @@ -869,6 +939,7 @@ fn sync_mapping_range( offset: usize, len: usize, ) -> Result<(), ERRNO> { + SYNC_RANGE_CALLS.fetch_add(1, Ordering::Relaxed); if len == 0 { return Ok(()); } @@ -877,7 +948,7 @@ fn sync_mapping_range( .checked_add(len.saturating_sub(1)) .ok_or(ERRNO::EOVERFLOW)?; let end_idx = file_page_index(end_off); - let dirty_pages: alloc::vec::Vec<_> = { + let dirty_pages: Vec<_> = { let mapping_guard = mapping.lock(); mapping_guard .dirty_pages @@ -885,15 +956,220 @@ fn sync_mapping_range( .copied() .collect() }; - for page_idx in dirty_pages { - let page = mapping.lock().pages.get(&page_idx).cloned(); - if let Some(page) = page { - flush_page(mapping, &page)?; + flush_dirty_pages(mapping, &dirty_pages) +} + +struct WritebackPage { + page_idx: u64, + page: Arc>, +} + +struct WritebackBatch { + pages: Vec, + data: Vec, + owner_inode: Arc, +} + +enum BatchCollectResult { + Batch { + batch: WritebackBatch, + consumed: usize, + }, + Wait(Arc>), + Single(Arc>), + Skip, +} + +fn flush_dirty_pages( + mapping: &Arc>, + dirty_pages: &[u64], +) -> Result<(), ERRNO> { + let mut pos = 0usize; + while pos < dirty_pages.len() { + match collect_writeback_batch(mapping, dirty_pages, pos) { + BatchCollectResult::Batch { batch, consumed } => { + flush_writeback_batch(mapping, batch)?; + pos += consumed.max(1); + } + BatchCollectResult::Wait(page) | BatchCollectResult::Single(page) => { + flush_page(mapping, &page)?; + pos += 1; + } + BatchCollectResult::Skip => { + pos += 1; + } } } Ok(()) } +fn collect_writeback_batch( + mapping: &Arc>, + dirty_pages: &[u64], + start_pos: usize, +) -> BatchCollectResult { + let owner_inode = { + let mapping_guard = mapping.lock(); + mapping_guard + .inode + .upgrade() + .expect("page cache inode disappeared") + }; + + let mut pages = Vec::new(); + let mut data = Vec::new(); + let mut consumed = 0usize; + let mut expected_page_idx = dirty_pages[start_pos]; + + while start_pos + consumed < dirty_pages.len() && pages.len() < MAX_WRITEBACK_BATCH_PAGES { + let page_idx = dirty_pages[start_pos + consumed]; + if page_idx != expected_page_idx { + break; + } + + let Some(page) = mapping.lock().pages.get(&page_idx).cloned() else { + if pages.is_empty() { + return BatchCollectResult::Skip; + } + break; + }; + + let valid_bytes = { + let mut page_guard = page.lock(); + if page_guard.state.contains(CachePageState::WRITEBACK) { + if pages.is_empty() { + return BatchCollectResult::Wait(page.clone()); + } + break; + } + if !page_guard.state.contains(CachePageState::DIRTY) { + if pages.is_empty() { + return BatchCollectResult::Skip; + } + break; + } + if page_guard.valid_bytes == 0 { + if pages.is_empty() { + return BatchCollectResult::Single(page.clone()); + } + break; + } + + page_guard.state.insert(CachePageState::WRITEBACK); + page_guard.pin_count += 1; + let valid_bytes = page_guard.valid_bytes; + let bytes = page_guard.ppn().get_bytes_array(); + data.extend_from_slice(&bytes[..valid_bytes]); + valid_bytes + }; + + pages.push(WritebackPage { + page_idx, + page, + }); + consumed += 1; + + if valid_bytes != PAGE_SIZE { + break; + } + expected_page_idx += 1; + } + + if pages.is_empty() { + BatchCollectResult::Skip + } else { + BatchCollectResult::Batch { + batch: WritebackBatch { + pages, + data, + owner_inode, + }, + consumed, + } + } +} + +fn flush_writeback_batch( + mapping: &Arc>, + batch: WritebackBatch, +) -> Result<(), ERRNO> { + let start_page_idx = batch.pages[0].page_idx; + let expected = batch.data.len(); + let mut write_ok = true; + + WRITEBACK_PAGES.fetch_add(batch.pages.len(), Ordering::Relaxed); + WRITEBACK_BYTES.fetch_add(expected, Ordering::Relaxed); + WRITEBACK_BATCHES.fetch_add(1, Ordering::Relaxed); + WRITEBACK_BATCH_PAGES.fetch_add(batch.pages.len(), Ordering::Relaxed); + + debug!( + "[page_cache] writeback batch: start_page_idx={} pages={} bytes={}", + start_page_idx, + batch.pages.len(), + expected + ); + + match batch + .owner_inode + .write_at_result(page_start(start_page_idx), &batch.data) + { + Ok(written) if written == expected => {} + Ok(written) => { + error!( + "[page_cache] short batch writeback: start_page_idx={} expected={} actual={}", + start_page_idx, + expected, + written + ); + write_ok = false; + } + Err(err) => { + error!( + "[page_cache] batch writeback failed: start_page_idx={} expected={} errno={}", + start_page_idx, + expected, + err as i32 + ); + write_ok = false; + } + } + + for info in batch.pages { + finish_page_writeback(mapping, info.page_idx, &info.page, write_ok); + } + + if write_ok { + Ok(()) + } else { + Err(ERRNO::EIO) + } +} + +fn finish_page_writeback( + mapping: &Arc>, + page_idx: u64, + page: &Arc>, + write_ok: bool, +) { + let wait_queue = { + let mut page_guard = page.lock(); + if page_guard.state.contains(CachePageState::DIRTY) { + if !write_ok || page_guard.map_count > 0 { + // 共享映射仍然存在时先保守地维持脏状态,避免写回后后续写入无法再次通知内核。 + // TODO:后续补齐反向映射后,可在写回前清 PTE 脏位并重新写保护,从而精确清脏。 + mapping.lock().dirty_pages.insert(page_idx); + } else { + page_guard.state.remove(CachePageState::DIRTY); + mapping.lock().dirty_pages.remove(&page_idx); + } + } + page_guard.state.remove(CachePageState::WRITEBACK); + page_guard.pin_count = page_guard.pin_count.saturating_sub(1); + Arc::clone(&page_guard.wait_queue) + }; + wait_queue.wake_all(); +} + /// 将单个脏页写回底层文件。 fn flush_page( mapping: &Arc>, @@ -936,6 +1212,8 @@ fn flush_page( let mut write_ok = true; if valid_bytes != 0 { let bytes = ppn.get_bytes_array(); + WRITEBACK_PAGES.fetch_add(1, Ordering::Relaxed); + WRITEBACK_BYTES.fetch_add(valid_bytes, Ordering::Relaxed); debug!( "[page_cache] writeback page: page_idx={} valid_bytes={}", page_idx, @@ -964,23 +1242,7 @@ fn flush_page( } } - let wait_queue = { - let mut page_guard = page.lock(); - if page_guard.state.contains(CachePageState::DIRTY) { - if !write_ok || page_guard.map_count > 0 { - // 共享映射仍然存在时先保守地维持脏状态,避免写回后后续写入无法再次通知内核。 - // TODO:后续补齐反向映射后,可在写回前清 PTE 脏位并重新写保护,从而精确清脏。 - mapping.lock().dirty_pages.insert(page_idx); - } else { - page_guard.state.remove(CachePageState::DIRTY); - mapping.lock().dirty_pages.remove(&page_idx); - } - } - page_guard.state.remove(CachePageState::WRITEBACK); - page_guard.pin_count = page_guard.pin_count.saturating_sub(1); - Arc::clone(&page_guard.wait_queue) - }; - wait_queue.wake_all(); + finish_page_writeback(mapping, page_idx, page, write_ok); if write_ok { return Ok(()); } diff --git a/os/src/fs/procfs.rs b/os/src/fs/procfs.rs index ad90ac7..eb300ef 100644 --- a/os/src/fs/procfs.rs +++ b/os/src/fs/procfs.rs @@ -17,10 +17,16 @@ use fs::errno::FS_ERRNO; use fs::vfs::{VfsFileType, VfsNode}; use crate::config::{MAX_HARTS, PAGE_SIZE}; +#[cfg(feature = "io_perf_counters")] +use crate::drivers::block as block_drivers; use crate::fs::inode::snapshot_mount_table; +#[cfg(feature = "io_perf_counters")] +use crate::fs::page_cache; use crate::fs::PAGE_CACHE_MANAGER; use crate::keys; use crate::mm::{frame_allocator_stats, MapPermission, VmaKind}; +#[cfg(feature = "net_perf_counters")] +use crate::net; use crate::sched::{list_pids, pid2process}; use crate::signal::{MAX_SIG, SIG_IGN}; use crate::task::{current_process, TaskStatus}; @@ -79,6 +85,24 @@ fn build_mounts() -> String { out } +#[cfg(feature = "io_perf_counters")] +fn reset_io_perf() { + ::fs::block_cache::reset_perf_counters(); + ::fs::ext4::reset_perf_counters(); + block_drivers::reset_perf_counters(); + page_cache::reset_perf_counters(); +} + +#[cfg(feature = "io_perf_counters")] +fn build_io_perf() -> String { + let mut out = String::new(); + out.push_str(&block_drivers::render_perf_counters()); + out.push_str(&::fs::block_cache::render_perf_counters()); + out.push_str(&::fs::ext4::render_perf_counters()); + out.push_str(&page_cache::render_perf_counters()); + out +} + fn parse_proc_u32(buf: &[u8]) -> Result { let text = core::str::from_utf8(buf).map_err(|_| FS_ERRNO::EINVAL)?.trim(); text.parse::().map_err(|_| FS_ERRNO::EINVAL) @@ -605,6 +629,10 @@ impl VfsNode for ProcRootNode { entries.push((String::from("self"), VfsFileType::Symlink)); entries.push((String::from("meminfo"), VfsFileType::Regular)); entries.push((String::from("mounts"), VfsFileType::Regular)); + #[cfg(feature = "io_perf_counters")] + entries.push((String::from("io_perf"), VfsFileType::Regular)); + #[cfg(feature = "net_perf_counters")] + entries.push((String::from("net_perf"), VfsFileType::Regular)); entries.push((String::from("key-users"), VfsFileType::Regular)); entries.push((String::from("sys"), VfsFileType::Directory)); for pid in list_pids() { @@ -618,6 +646,10 @@ impl VfsNode for ProcRootNode { "self" => Some(Arc::new(ProcSelfLinkNode::new()) as Arc), "meminfo" => Some(Arc::new(ProcMeminfoNode::new()) as Arc), "mounts" => Some(Arc::new(ProcMountsNode::new()) as Arc), + #[cfg(feature = "io_perf_counters")] + "io_perf" => Some(Arc::new(ProcIoPerfNode::new()) as Arc), + #[cfg(feature = "net_perf_counters")] + "net_perf" => Some(Arc::new(ProcNetPerfNode::new()) as Arc), "key-users" => Some(Arc::new(ProcKeyUsersNode::new()) as Arc), "sys" => Some(Arc::new(ProcStaticDirNode::new(ProcStaticDirKind::Sys)) as Arc), _ => { @@ -1159,6 +1191,148 @@ impl VfsNode for ProcMountsNode { } } +/// `/proc/io_perf` node. +#[derive(Default, Debug)] +#[cfg(feature = "io_perf_counters")] +pub struct ProcIoPerfNode; + +#[cfg(feature = "io_perf_counters")] +impl ProcIoPerfNode { + /// Create a new `/proc/io_perf` node. + pub fn new() -> Self { + Self + } +} + +#[cfg(feature = "io_perf_counters")] +impl VfsNode for ProcIoPerfNode { + fn as_any(&self) -> &dyn Any { + self + } + + fn file_type(&self) -> VfsFileType { + VfsFileType::Regular + } + + fn size(&self) -> usize { + build_io_perf().len() + } + + fn ls(&self) -> Vec<(String, VfsFileType)> { + Vec::new() + } + + fn find(&self, _name: &str) -> Option> { + None + } + + fn create(&self, _name: &str) -> Option> { + None + } + + fn mkdir(&self, _name: &str) -> Option> { + None + } + + fn clear(&self) { + reset_io_perf(); + } + + fn truncate(&self, _new_size: usize) -> Result<(), FS_ERRNO> { + reset_io_perf(); + Ok(()) + } + + fn read_at(&self, offset: usize, buf: &mut [u8]) -> usize { + read_string_at(build_io_perf(), offset, buf) + } + + fn write_at(&self, _offset: usize, buf: &[u8]) -> usize { + reset_io_perf(); + buf.len() + } + + fn statfs(&self) -> Result { + Ok(crate::fs::empty_statfs( + fs::STATFS_MAGIC_PROC, + crate::config::PAGE_SIZE as u64, + 0x9fa0, + 255, + )) + } +} + +/// `/proc/net_perf` node. +#[derive(Default, Debug)] +#[cfg(feature = "net_perf_counters")] +pub struct ProcNetPerfNode; + +#[cfg(feature = "net_perf_counters")] +impl ProcNetPerfNode { + /// Create a new `/proc/net_perf` node. + pub fn new() -> Self { + Self + } +} + +#[cfg(feature = "net_perf_counters")] +impl VfsNode for ProcNetPerfNode { + fn as_any(&self) -> &dyn Any { + self + } + + fn file_type(&self) -> VfsFileType { + VfsFileType::Regular + } + + fn size(&self) -> usize { + net::render_perf_counters().len() + } + + fn ls(&self) -> Vec<(String, VfsFileType)> { + Vec::new() + } + + fn find(&self, _name: &str) -> Option> { + None + } + + fn create(&self, _name: &str) -> Option> { + None + } + + fn mkdir(&self, _name: &str) -> Option> { + None + } + + fn clear(&self) { + net::reset_perf_counters(); + } + + fn truncate(&self, _new_size: usize) -> Result<(), FS_ERRNO> { + net::reset_perf_counters(); + Ok(()) + } + + fn read_at(&self, offset: usize, buf: &mut [u8]) -> usize { + read_string_at(net::render_perf_counters(), offset, buf) + } + + fn write_at(&self, _offset: usize, buf: &[u8]) -> usize { + net::reset_perf_counters(); + buf.len() + } + + fn statfs(&self) -> Result { + Ok(crate::fs::empty_statfs( + fs::STATFS_MAGIC_PROC, + crate::config::PAGE_SIZE as u64, + 0x9fa0, + 255, + )) + } +} + /// `/proc/self` symlink node. #[derive(Default, Debug)] pub struct ProcSelfLinkNode; diff --git a/os/src/main.rs b/os/src/main.rs index b6a6f00..ac951c3 100644 --- a/os/src/main.rs +++ b/os/src/main.rs @@ -25,7 +25,7 @@ #![feature(panic_info_message)] #![feature(alloc_error_handler)] - + #[macro_use] extern crate log; diff --git a/os/src/net/loopback.rs b/os/src/net/loopback.rs index 72a0cd9..fe909ce 100644 --- a/os/src/net/loopback.rs +++ b/os/src/net/loopback.rs @@ -42,6 +42,11 @@ impl Device for Loopback { fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { self.queue.pop_front().map(|buffer| { + debug!( + "loopback receive: remaining_len={} frame_len={}", + self.queue.len(), + buffer.len() + ); let rx = RxToken { buffer }; let tx = TxToken { queue: &mut self.queue }; (rx, tx) diff --git a/os/src/net/mod.rs b/os/src/net/mod.rs index 743f79f..a2213c8 100644 --- a/os/src/net/mod.rs +++ b/os/src/net/mod.rs @@ -15,15 +15,21 @@ mod udp; mod unix_socket; use alloc::{boxed::Box, sync::Arc, vec, vec::Vec}; +#[cfg(feature = "net_perf_counters")] +use alloc::string::String; +#[cfg(feature = "net_perf_counters")] +use core::fmt::Write; use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +#[cfg(feature = "net_perf_counters")] +use core::sync::atomic::AtomicUsize; use lazy_static::lazy_static; use smoltcp::{ iface::{Config, Interface, SocketSet}, - phy::{Device, DeviceCapabilities, Medium, RxToken, TxToken}, + phy::{Device, DeviceCapabilities, Medium, PacketMeta, RxToken, TxToken}, socket::{tcp as tcp_socket, udp as udp_socket}, time::Instant, - wire::{EthernetAddress, HardwareAddress, IpAddress, IpCidr, IpEndpoint, Ipv4Address}, + wire::{EthernetAddress, HardwareAddress, IpAddress, IpCidr, IpEndpoint, Ipv4Address, Ipv6Address}, }; use crate::{ @@ -69,17 +75,23 @@ pub use unix_socket::{ const RX_BUF_LEN: usize = 32 * 1024; const MAX_SOCKETS: usize = 256; -const UDP_RX_META: usize = 512; +const UDP_RX_META: usize = 2048; const UDP_TX_META: usize = 512; -const UDP_BUF: usize = 64 * 1024; -const TCP_RX_BUF: usize = 128 * 1024; -const TCP_TX_BUF: usize = 128 * 1024; +const UDP_RX_BUF: usize = 512 * 1024; +const UDP_TX_BUF: usize = 64 * 1024; +const TCP_RX_BUF: usize = 512 * 1024; +const TCP_TX_BUF: usize = 512 * 1024; const EPHEMERAL_PORT_START: u16 = 49152; const EPHEMERAL_PORT_END: u16 = 65535; -const MAX_IMMEDIATE_POLLS: usize = 4; +const MAX_IMMEDIATE_POLLS: usize = 64; +const MAX_SOCKET_IMMEDIATE_POLLS: usize = 8; +const MAX_SOCKET_CATCHUP_POLLS: usize = 32; +const MAX_PASSIVE_LISTEN_SOCKETS: usize = 16; const NO_POLL_DEADLINE_US: u64 = u64::MAX; +const IPV6_LOOPBACK_SOLICITED_NODE: [u8; 16] = + [0xff, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff, 0x00, 0x00, 0x01]; // Kernel UDP echo feature (for quick network stack testing). const ENABLE_KERNEL_UDP_ECHO: bool = true; @@ -96,6 +108,318 @@ pub(crate) static NEED_POLL: AtomicBool = AtomicBool::new(false); /// `u64::MAX` means no timer-driven deadline currently exists. pub(crate) static NEXT_POLL_DEADLINE_US: AtomicU64 = AtomicU64::new(NO_POLL_DEADLINE_US); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_SOCKET_WORK_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_SOCKET_WORK_DEEP: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_SOCKET_WORK_LIGHT: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_SOCKET_WORK_CATCHUP: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_SOCKET_RECV_WORK: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_SOCKET_WORK_ACTIVE_SUM: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_SOCKET_WORK_ACTIVE_MAX: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_ONCE_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_POLL_BUDGET_EXHAUSTED: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_LOOPBACK_TX_FRAMES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_LOOPBACK_TX_BYTES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_LOOPBACK_RX_FRAMES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_LOOPBACK_RX_BYTES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_LOOPBACK_MAX_QUEUE: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_VIRTIO_TX_FRAMES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_VIRTIO_TX_BYTES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_VIRTIO_RX_FRAMES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_VIRTIO_RX_BYTES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_UDP_DIRECT_PKTS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_UDP_DIRECT_BYTES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_UDP_DIRECT_DROPS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_UDP_USER_SEND_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_UDP_USER_SEND_BYTES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_UDP_USER_RECV_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_UDP_USER_RECV_BYTES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_TCP_USER_SEND_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_TCP_USER_SEND_BYTES: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_TCP_USER_RECV_CALLS: AtomicUsize = AtomicUsize::new(0); +#[cfg(feature = "net_perf_counters")] +static PERF_TCP_USER_RECV_BYTES: AtomicUsize = AtomicUsize::new(0); + +#[cfg(feature = "net_perf_counters")] +#[inline] +fn perf_load(counter: &AtomicUsize) -> usize { + counter.load(Ordering::Relaxed) +} + +#[cfg(feature = "net_perf_counters")] +#[inline] +fn perf_inc(counter: &AtomicUsize) { + counter.fetch_add(1, Ordering::Relaxed); +} + +#[cfg(feature = "net_perf_counters")] +#[inline] +fn perf_add(counter: &AtomicUsize, value: usize) { + counter.fetch_add(value, Ordering::Relaxed); +} + +#[cfg(feature = "net_perf_counters")] +#[inline] +fn perf_update_max(counter: &AtomicUsize, value: usize) { + let mut current = counter.load(Ordering::Relaxed); + while value > current { + match counter.compare_exchange_weak(current, value, Ordering::Relaxed, Ordering::Relaxed) { + Ok(_) => return, + Err(next) => current = next, + } + } +} + +#[cfg(feature = "net_perf_counters")] +#[inline] +pub(crate) fn perf_tcp_user_send(bytes: usize) { + perf_inc(&PERF_TCP_USER_SEND_CALLS); + perf_add(&PERF_TCP_USER_SEND_BYTES, bytes); +} + +#[cfg(not(feature = "net_perf_counters"))] +#[inline] +pub(crate) fn perf_tcp_user_send(_bytes: usize) {} + +#[cfg(feature = "net_perf_counters")] +#[inline] +pub(crate) fn perf_tcp_user_recv(bytes: usize) { + perf_inc(&PERF_TCP_USER_RECV_CALLS); + perf_add(&PERF_TCP_USER_RECV_BYTES, bytes); +} + +#[cfg(not(feature = "net_perf_counters"))] +#[inline] +pub(crate) fn perf_tcp_user_recv(_bytes: usize) {} + +#[cfg(feature = "net_perf_counters")] +#[inline] +pub(crate) fn perf_udp_user_send(bytes: usize) { + perf_inc(&PERF_UDP_USER_SEND_CALLS); + perf_add(&PERF_UDP_USER_SEND_BYTES, bytes); +} + +#[cfg(not(feature = "net_perf_counters"))] +#[inline] +pub(crate) fn perf_udp_user_send(_bytes: usize) {} + +#[cfg(feature = "net_perf_counters")] +#[inline] +pub(crate) fn perf_udp_user_recv(bytes: usize) { + perf_inc(&PERF_UDP_USER_RECV_CALLS); + perf_add(&PERF_UDP_USER_RECV_BYTES, bytes); +} + +#[cfg(not(feature = "net_perf_counters"))] +#[inline] +pub(crate) fn perf_udp_user_recv(_bytes: usize) {} + +#[cfg(feature = "net_perf_counters")] +pub(crate) fn reset_perf_counters() { + for counter in [ + &PERF_POLL_CALLS, + &PERF_POLL_SOCKET_WORK_CALLS, + &PERF_POLL_SOCKET_WORK_DEEP, + &PERF_POLL_SOCKET_WORK_LIGHT, + &PERF_POLL_SOCKET_WORK_CATCHUP, + &PERF_POLL_SOCKET_RECV_WORK, + &PERF_POLL_SOCKET_WORK_ACTIVE_SUM, + &PERF_POLL_SOCKET_WORK_ACTIVE_MAX, + &PERF_POLL_ONCE_CALLS, + &PERF_POLL_BUDGET_EXHAUSTED, + &PERF_LOOPBACK_TX_FRAMES, + &PERF_LOOPBACK_TX_BYTES, + &PERF_LOOPBACK_RX_FRAMES, + &PERF_LOOPBACK_RX_BYTES, + &PERF_LOOPBACK_MAX_QUEUE, + &PERF_VIRTIO_TX_FRAMES, + &PERF_VIRTIO_TX_BYTES, + &PERF_VIRTIO_RX_FRAMES, + &PERF_VIRTIO_RX_BYTES, + &PERF_UDP_DIRECT_PKTS, + &PERF_UDP_DIRECT_BYTES, + &PERF_UDP_DIRECT_DROPS, + &PERF_UDP_USER_SEND_CALLS, + &PERF_UDP_USER_SEND_BYTES, + &PERF_UDP_USER_RECV_CALLS, + &PERF_UDP_USER_RECV_BYTES, + &PERF_TCP_USER_SEND_CALLS, + &PERF_TCP_USER_SEND_BYTES, + &PERF_TCP_USER_RECV_CALLS, + &PERF_TCP_USER_RECV_BYTES, + ] { + counter.store(0, Ordering::Relaxed); + } +} + +#[cfg(feature = "net_perf_counters")] +pub(crate) fn render_perf_counters() -> String { + let mut out = String::new(); + let _ = writeln!(&mut out, "net:"); + let _ = writeln!(&mut out, " poll_calls {}", perf_load(&PERF_POLL_CALLS)); + let _ = writeln!( + &mut out, + " poll_socket_work_calls {}", + perf_load(&PERF_POLL_SOCKET_WORK_CALLS) + ); + let _ = writeln!( + &mut out, + " poll_socket_work_deep {}", + perf_load(&PERF_POLL_SOCKET_WORK_DEEP) + ); + let _ = writeln!( + &mut out, + " poll_socket_work_light {}", + perf_load(&PERF_POLL_SOCKET_WORK_LIGHT) + ); + let _ = writeln!( + &mut out, + " poll_socket_work_catchup {}", + perf_load(&PERF_POLL_SOCKET_WORK_CATCHUP) + ); + let _ = writeln!( + &mut out, + " poll_socket_recv_work {}", + perf_load(&PERF_POLL_SOCKET_RECV_WORK) + ); + let _ = writeln!( + &mut out, + " poll_socket_work_active_sum {}", + perf_load(&PERF_POLL_SOCKET_WORK_ACTIVE_SUM) + ); + let _ = writeln!( + &mut out, + " poll_socket_work_active_max {}", + perf_load(&PERF_POLL_SOCKET_WORK_ACTIVE_MAX) + ); + let _ = writeln!(&mut out, " poll_once_calls {}", perf_load(&PERF_POLL_ONCE_CALLS)); + let _ = writeln!( + &mut out, + " poll_budget_exhausted {}", + perf_load(&PERF_POLL_BUDGET_EXHAUSTED) + ); + let _ = writeln!(&mut out, "loopback:"); + let _ = writeln!(&mut out, " tx_frames {}", perf_load(&PERF_LOOPBACK_TX_FRAMES)); + let _ = writeln!(&mut out, " tx_bytes {}", perf_load(&PERF_LOOPBACK_TX_BYTES)); + let _ = writeln!(&mut out, " rx_frames {}", perf_load(&PERF_LOOPBACK_RX_FRAMES)); + let _ = writeln!(&mut out, " rx_bytes {}", perf_load(&PERF_LOOPBACK_RX_BYTES)); + let _ = writeln!(&mut out, " max_queue_len {}", perf_load(&PERF_LOOPBACK_MAX_QUEUE)); + let _ = writeln!(&mut out, "virtio:"); + let _ = writeln!(&mut out, " tx_frames {}", perf_load(&PERF_VIRTIO_TX_FRAMES)); + let _ = writeln!(&mut out, " tx_bytes {}", perf_load(&PERF_VIRTIO_TX_BYTES)); + let _ = writeln!(&mut out, " rx_frames {}", perf_load(&PERF_VIRTIO_RX_FRAMES)); + let _ = writeln!(&mut out, " rx_bytes {}", perf_load(&PERF_VIRTIO_RX_BYTES)); + let _ = writeln!(&mut out, "udp:"); + let _ = writeln!(&mut out, " direct_packets {}", perf_load(&PERF_UDP_DIRECT_PKTS)); + let _ = writeln!(&mut out, " direct_bytes {}", perf_load(&PERF_UDP_DIRECT_BYTES)); + let _ = writeln!(&mut out, " direct_drops {}", perf_load(&PERF_UDP_DIRECT_DROPS)); + let _ = writeln!(&mut out, " user_send_calls {}", perf_load(&PERF_UDP_USER_SEND_CALLS)); + let _ = writeln!(&mut out, " user_send_bytes {}", perf_load(&PERF_UDP_USER_SEND_BYTES)); + let _ = writeln!(&mut out, " user_recv_calls {}", perf_load(&PERF_UDP_USER_RECV_CALLS)); + let _ = writeln!(&mut out, " user_recv_bytes {}", perf_load(&PERF_UDP_USER_RECV_BYTES)); + let _ = writeln!(&mut out, "tcp:"); + let _ = writeln!(&mut out, " user_send_calls {}", perf_load(&PERF_TCP_USER_SEND_CALLS)); + let _ = writeln!(&mut out, " user_send_bytes {}", perf_load(&PERF_TCP_USER_SEND_BYTES)); + let _ = writeln!(&mut out, " user_recv_calls {}", perf_load(&PERF_TCP_USER_RECV_CALLS)); + let _ = writeln!(&mut out, " user_recv_bytes {}", perf_load(&PERF_TCP_USER_RECV_BYTES)); + render_tcp_state_snapshot(&mut out); + out +} + +#[cfg(feature = "net_perf_counters")] +fn render_tcp_state_snapshot(out: &mut String) { + let mut guard = NET_STACK.lock(); + let Some(stack) = guard.as_mut() else { + let _ = writeln!(out, "tcp_state_current:"); + let _ = writeln!(out, " unavailable 1"); + return; + }; + + let mut total = 0usize; + let mut closed = 0usize; + let mut listen = 0usize; + let mut syn_sent = 0usize; + let mut syn_received = 0usize; + let mut established = 0usize; + let mut fin_wait1 = 0usize; + let mut fin_wait2 = 0usize; + let mut close_wait = 0usize; + let mut closing = 0usize; + let mut last_ack = 0usize; + let mut time_wait = 0usize; + let mut listener_owned = 0usize; + let mut orphaned = 0usize; + + for st in stack.tcp_states.iter() { + total += 1; + if st.is_listener_owned() { + listener_owned += 1; + } + if st.orphaned.load(Ordering::Relaxed) { + orphaned += 1; + } + let socket = stack.sockets.get_mut::(st.handle); + match socket.state() { + tcp_socket::State::Closed => closed += 1, + tcp_socket::State::Listen => listen += 1, + tcp_socket::State::SynSent => syn_sent += 1, + tcp_socket::State::SynReceived => syn_received += 1, + tcp_socket::State::Established => established += 1, + tcp_socket::State::FinWait1 => fin_wait1 += 1, + tcp_socket::State::FinWait2 => fin_wait2 += 1, + tcp_socket::State::CloseWait => close_wait += 1, + tcp_socket::State::Closing => closing += 1, + tcp_socket::State::LastAck => last_ack += 1, + tcp_socket::State::TimeWait => time_wait += 1, + } + } + + let _ = writeln!(out, "tcp_state_current:"); + let _ = writeln!(out, " total {}", total); + let _ = writeln!(out, " closed {}", closed); + let _ = writeln!(out, " listen {}", listen); + let _ = writeln!(out, " syn_sent {}", syn_sent); + let _ = writeln!(out, " syn_received {}", syn_received); + let _ = writeln!(out, " established {}", established); + let _ = writeln!(out, " fin_wait1 {}", fin_wait1); + let _ = writeln!(out, " fin_wait2 {}", fin_wait2); + let _ = writeln!(out, " close_wait {}", close_wait); + let _ = writeln!(out, " closing {}", closing); + let _ = writeln!(out, " last_ack {}", last_ack); + let _ = writeln!(out, " time_wait {}", time_wait); + let _ = writeln!(out, " listener_owned {}", listener_owned); + let _ = writeln!(out, " orphaned {}", orphaned); +} + /// Userspace-visible IPv4 socket address layout. /// /// `sin_port` and `sin_addr` are in network byte order. @@ -138,6 +462,8 @@ pub fn notify_irq() { /// Call this from a safe context (e.g. timer interrupt path or scheduler tick). pub fn poll() { // print!("p"); + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_POLL_CALLS); let now_us = get_time_us() as u64; let need_immediate = NEED_POLL.swap(false, Ordering::AcqRel); let deadline_us = NEXT_POLL_DEADLINE_US.load(Ordering::Acquire); @@ -177,23 +503,36 @@ impl NetStack { cfg.random_seed = 0x5A5A_1234; let mut iface = Interface::new(cfg, &mut device, now); - // Configure both external and loopback addresses on the same interface + // Configure both external and loopback addresses on the same interface. + // Rebuild the list explicitly so IPv6 localhost cannot be dropped silently. iface.update_ip_addrs(|addrs| { - // External network (QEMU user networking) - let external = IpCidr::new(IpAddress::Ipv4(Ipv4Address::new(10, 0, 2, 15)), 24); - if addrs.iter().all(|a| *a != external) { - let _ = addrs.push(external); - } + addrs.clear(); - // Loopback address - let loopback = IpCidr::new(IpAddress::Ipv4(Ipv4Address::new(127, 0, 0, 1)), 8); - if addrs.iter().all(|a| *a != loopback) { - let _ = addrs.push(loopback); - } + // External network (QEMU user networking) + addrs + .push(IpCidr::new( + IpAddress::Ipv4(Ipv4Address::new(10, 0, 2, 15)), + 24, + )) + .expect("failed to configure external IPv4 address"); + + // IPv4 loopback + addrs + .push(IpCidr::new( + IpAddress::Ipv4(Ipv4Address::new(127, 0, 0, 1)), + 8, + )) + .expect("failed to configure IPv4 loopback address"); + + // IPv6 loopback + addrs + .push(IpCidr::new(IpAddress::Ipv6(Ipv6Address::LOCALHOST), 128)) + .expect("failed to configure IPv6 loopback address"); }); let _ = iface .routes_mut() .add_default_ipv4_route(Ipv4Address::new(10, 0, 2, 2)); + info!("[kernel] net: iface addresses = {:?}", iface.ip_addrs()); let storage_vec: Vec> = (0..MAX_SOCKETS).map(|_| smoltcp::iface::SocketStorage::EMPTY).collect(); @@ -233,7 +572,66 @@ impl NetStack { } fn poll(&mut self) { - for _ in 0..MAX_IMMEDIATE_POLLS { + self.poll_with_budget(MAX_IMMEDIATE_POLLS); + } + + pub(crate) fn poll_socket_work_for(&mut self, handle: smoltcp::iface::SocketHandle) { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_POLL_SOCKET_WORK_CALLS); + let active = self.active_tcp_socket_count(); + #[cfg(feature = "net_perf_counters")] + perf_add(&PERF_POLL_SOCKET_WORK_ACTIVE_SUM, active); + #[cfg(feature = "net_perf_counters")] + perf_update_max(&PERF_POLL_SOCKET_WORK_ACTIVE_MAX, active); + if active <= 2 { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_POLL_SOCKET_WORK_DEEP); + self.poll_with_budget(MAX_IMMEDIATE_POLLS); + return; + } else { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_POLL_SOCKET_WORK_LIGHT); + let queued_before = self + .sockets + .get_mut::(handle) + .send_queue(); + self.poll_with_budget(MAX_SOCKET_IMMEDIATE_POLLS); + let socket = self.sockets.get_mut::(handle); + let queued_after = socket.send_queue(); + if queued_after > 0 + && (queued_after >= queued_before.saturating_sub(queued_before / 4) + || queued_after >= TCP_TX_BUF / 8 + || !socket.can_send()) + { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_POLL_SOCKET_WORK_CATCHUP); + self.poll_with_budget(MAX_SOCKET_CATCHUP_POLLS); + } + } + } + + pub(crate) fn poll_socket_recv_work(&mut self) { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_POLL_SOCKET_RECV_WORK); + self.poll_with_budget(MAX_SOCKET_IMMEDIATE_POLLS); + } + + fn active_tcp_socket_count(&mut self) -> usize { + let mut count = 0usize; + for st in self.tcp_states.iter() { + if st.is_listener_owned() { + continue; + } + let socket = self.sockets.get_mut::(st.handle); + if matches!(socket.state(), tcp_socket::State::Established) { + count += 1; + } + } + count + } + + fn poll_with_budget(&mut self, budget: usize) { + for _ in 0..budget { self.poll_once(); // Loopback packets and newly queued socket work often need another @@ -246,14 +644,28 @@ impl NetStack { } if !self.device.loopback.queue.is_empty() { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_POLL_BUDGET_EXHAUSTED); NEED_POLL.store(true, Ordering::Release); } } fn poll_once(&mut self) { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_POLL_ONCE_CALLS); let ts = now(); + if !self.device.loopback.queue.is_empty() { + debug!( + "NetStack::poll_once entering with loopback queue len={}", + self.device.loopback.queue.len() + ); + } let poll_result = self.iface.poll(ts, &mut self.device, &mut self.sockets); - trace!("NetStack::poll result={:?}, loopback queue len after={}", poll_result, self.device.loopback.queue.len()); + debug!( + "NetStack::poll result={:?}, loopback queue len after={}", + poll_result, + self.device.loopback.queue.len() + ); for st in self.udp_states.iter() { let mut ready = 0u16; @@ -300,6 +712,18 @@ impl NetStack { { let socket = self.sockets.get_mut::(st.handle); let state = socket.state(); + if let Some(prev) = st.observe_state_change(state) { + debug!( + "tcp socket {:?} state {} -> {} listener_owned={} open={} local={:?} remote={:?}", + st.handle, + tcp::tcp_state_name_repr(prev), + tcp::tcp_state_name(state), + st.is_listener_owned(), + socket.is_open(), + socket.local_endpoint(), + socket.remote_endpoint() + ); + } if st.is_listener_owned() { listener_owned = true; listener_source_id = tcp::queue_listener_connection_if_ready(st, state); @@ -376,8 +800,8 @@ impl NetStack { ) -> (smoltcp::iface::SocketHandle, Arc) { let rx_meta = vec![udp_socket::PacketMetadata::EMPTY; UDP_RX_META]; let tx_meta = vec![udp_socket::PacketMetadata::EMPTY; UDP_TX_META]; - let rx_buf = vec![0u8; UDP_BUF]; - let tx_buf = vec![0u8; UDP_BUF]; + let rx_buf = vec![0u8; UDP_RX_BUF]; + let tx_buf = vec![0u8; UDP_TX_BUF]; let udp = udp_socket::Socket::new( udp_socket::PacketBuffer::new(rx_meta, rx_buf), udp_socket::PacketBuffer::new(tx_meta, tx_buf), @@ -394,12 +818,87 @@ impl NetStack { self.udp_states.retain(|s| s.handle != handle); } + pub(crate) fn deliver_udp_loopback( + &mut self, + source_handle: smoltcp::iface::SocketHandle, + dst: IpEndpoint, + payload: &[u8], + ) -> bool { + if !is_loopback_ip(dst.addr) { + return false; + } + + let source = self.sockets.get_mut::(source_handle); + let source_endpoint = source.endpoint(); + let source_addr = source_endpoint + .addr + .unwrap_or_else(|| loopback_addr_for(dst.addr)); + let remote = IpEndpoint::new(source_addr, source_endpoint.port); + let metadata = udp_socket::UdpMetadata { + endpoint: remote, + local_address: Some(dst.addr), + meta: PacketMeta::default(), + }; + + let mut best: Option<(u8, Arc)> = None; + for st in self.udp_states.iter() { + let socket = self.sockets.get_mut::(st.handle); + let endpoint = socket.endpoint(); + if endpoint.port != dst.port { + continue; + } + + let addr_matches = match endpoint.addr { + Some(addr) => addr == dst.addr, + None => true, + }; + if !addr_matches { + continue; + } + + let priority = match socket.remote_endpoint() { + Some(peer) if peer == remote => 3, + Some(_) => continue, + None if endpoint.addr.is_some() => 2, + None => 1, + }; + + if best + .as_ref() + .map(|(best_priority, _)| priority > *best_priority) + .unwrap_or(true) + { + best = Some((priority, Arc::clone(st))); + } + } + + let Some((_, target)) = best else { + return false; + }; + + let socket = self.sockets.get_mut::(target.handle); + if socket.inject_recv_slice(payload, metadata).is_ok() { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_UDP_DIRECT_PKTS); + #[cfg(feature = "net_perf_counters")] + perf_add(&PERF_UDP_DIRECT_BYTES, payload.len()); + target.read_wait.wake_one(); + notify_poll_source(target.source_id(), POLLIN); + } else { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_UDP_DIRECT_DROPS); + } + true + } + pub(crate) fn create_tcp_socket( &mut self, ) -> (smoltcp::iface::SocketHandle, Arc) { let rx = tcp_socket::SocketBuffer::new(vec![0u8; TCP_RX_BUF]); let tx = tcp_socket::SocketBuffer::new(vec![0u8; TCP_TX_BUF]); - let tcp = tcp_socket::Socket::new(rx, tx); + let mut tcp = tcp_socket::Socket::new(rx, tx); + tcp.set_ack_delay(None); + tcp.set_nagle_enabled(false); let handle = self.sockets.add(tcp); let st = Arc::new(TcpSocketState::new(handle)); self.tcp_states.push(Arc::clone(&st)); @@ -422,6 +921,31 @@ fn now() -> Instant { Instant::from_micros(get_time_us() as i64) } +#[inline] +fn is_loopback_ip(addr: IpAddress) -> bool { + match addr { + IpAddress::Ipv4(addr) => addr.is_loopback(), + IpAddress::Ipv6(addr) => addr.is_loopback(), + } +} + +#[inline] +fn loopback_addr_for(addr: IpAddress) -> IpAddress { + match addr { + IpAddress::Ipv4(_) => IpAddress::Ipv4(Ipv4Address::new(127, 0, 0, 1)), + IpAddress::Ipv6(_) => IpAddress::Ipv6(Ipv6Address::LOCALHOST), + } +} + +fn read_ipv6_addr(bytes: &[u8]) -> Option { + if bytes.len() < 16 { + return None; + } + let mut octets = [0u8; 16]; + octets.copy_from_slice(&bytes[..16]); + Some(Ipv6Address::from(octets)) +} + /// Multi-device that routes packets between VirtIO (external) and Loopback (local). struct MultiDevice { virtio: VirtioSmoltcpDevice, @@ -498,8 +1022,20 @@ impl RxToken for MultiRxToken { F: FnOnce(&[u8]) -> R, { match self { - MultiRxToken::Virtio(token) => token.consume(f), - MultiRxToken::Loopback(token) => token.consume(f), + MultiRxToken::Virtio(token) => token.consume(|frame| { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_VIRTIO_RX_FRAMES); + #[cfg(feature = "net_perf_counters")] + perf_add(&PERF_VIRTIO_RX_BYTES, frame.len()); + f(frame) + }), + MultiRxToken::Loopback(token) => token.consume(|frame| { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_LOOPBACK_RX_FRAMES); + #[cfg(feature = "net_perf_counters")] + perf_add(&PERF_LOOPBACK_RX_BYTES, frame.len()); + f(frame) + }), } } } @@ -544,23 +1080,46 @@ impl<'a> TxToken for MultiTxToken<'a> { false } } + 0x86DD => { + if buf.len() >= 54 { + let dst = &buf[38..54]; + dst == Ipv6Address::LOCALHOST.octets() + || dst == IPV6_LOOPBACK_SOLICITED_NODE + } else { + false + } + } _ => false, } } else { false }; - trace!("Consume token of len {}, loopback: {}, buf[12] = {:x}, buf[13] = {:x}, buf[30] = {:x}", len, is_loopback, buf[12], buf[13], buf[30]); if is_loopback { // Directly push to loopback queue (our custom Loopback has a public queue field) - trace!("Pushing to loopback queue, current len={}", self.loopback.queue.len()); + debug!( + "net tx routed to loopback queue: before_len={} frame_len={}", + self.loopback.queue.len(), + len + ); self.loopback.queue.push_back(buf); - trace!("Loopback queue len after push={}", self.loopback.queue.len()); + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_LOOPBACK_TX_FRAMES); + #[cfg(feature = "net_perf_counters")] + perf_add(&PERF_LOOPBACK_TX_BYTES, len); + #[cfg(feature = "net_perf_counters")] + perf_update_max(&PERF_LOOPBACK_MAX_QUEUE, self.loopback.queue.len()); + debug!("net tx routed to loopback queue: after_len={}", self.loopback.queue.len()); } else { // Send to VirtIO using the standard path match self.virtio.dev.try_send(&buf) { - Ok(true) => {} + Ok(true) => { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_VIRTIO_TX_FRAMES); + #[cfg(feature = "net_perf_counters")] + perf_add(&PERF_VIRTIO_TX_BYTES, len); + } Ok(false) => { trace!("net: tx queue busy, drop one frame"); } @@ -657,7 +1216,12 @@ impl TxToken for VirtioTxToken { let ret = f(&mut buf); match self.dev.try_send(&buf) { - Ok(true) => {} + Ok(true) => { + #[cfg(feature = "net_perf_counters")] + perf_inc(&PERF_VIRTIO_TX_FRAMES); + #[cfg(feature = "net_perf_counters")] + perf_add(&PERF_VIRTIO_TX_BYTES, len); + } Ok(false) => { trace!("net: tx queue busy, drop one frame"); } diff --git a/os/src/net/tcp.rs b/os/src/net/tcp.rs index 507c4e1..3c1061b 100644 --- a/os/src/net/tcp.rs +++ b/os/src/net/tcp.rs @@ -9,7 +9,7 @@ use core::any::Any; use core::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use smoltcp::socket::tcp as tcp_socket; -use smoltcp::wire::{IpAddress, IpEndpoint, IpListenEndpoint, Ipv4Address}; +use smoltcp::wire::{IpAddress, IpEndpoint, IpListenEndpoint, Ipv4Address, Ipv6Address}; use crate::fs::{File, Stat, StatMode}; use crate::mm::UserBuffer; @@ -25,6 +25,31 @@ use crate::timer::{add_timer_with_socket_tag, get_time_ns}; const SOMAXCONN: usize = 128; +#[inline] +fn unspecified_addr_for_family(family: i32) -> IpAddress { + if family == super::AF_INET6 as i32 { + IpAddress::Ipv6(Ipv6Address::UNSPECIFIED) + } else { + IpAddress::Ipv4(Ipv4Address::new(0, 0, 0, 0)) + } +} + +#[inline] +fn stack_listen_addr_for_family(family: i32, addr: Option) -> Option { + match (family, addr) { + (x, None) if x == super::AF_INET6 as i32 => Some(IpAddress::Ipv6(Ipv6Address::LOCALHOST)), + (_, addr) => addr, + } +} + +#[inline] +fn ipv4_loopback_listen_endpoint(port: u16) -> IpListenEndpoint { + IpListenEndpoint { + addr: Some(IpAddress::Ipv4(Ipv4Address::new(127, 0, 0, 1))), + port, + } +} + #[inline] fn normalize_backlog(backlog: usize) -> usize { backlog.clamp(1, SOMAXCONN) @@ -41,20 +66,24 @@ fn listen_endpoint_from_bind(ep: IpEndpoint) -> IpListenEndpoint { } pub(crate) struct TcpListenerShared { + family: i32, addr: SpinNoIrqLock>, port: AtomicUsize, backlog: AtomicUsize, + dual_stack_v4: AtomicBool, pending: SpinNoIrqLock>>, passive: SpinNoIrqLock>>, accept_wait: WaitQueue, } impl TcpListenerShared { - fn new(endpoint: IpListenEndpoint, backlog: usize) -> Self { + fn new(family: i32, endpoint: IpListenEndpoint, backlog: usize) -> Self { Self { + family, addr: SpinNoIrqLock::new(endpoint.addr), port: AtomicUsize::new(endpoint.port as usize), backlog: AtomicUsize::new(backlog), + dual_stack_v4: AtomicBool::new(false), pending: SpinNoIrqLock::new(VecDeque::new()), passive: SpinNoIrqLock::new(Vec::new()), accept_wait: WaitQueue::new(), @@ -85,6 +114,31 @@ impl TcpListenerShared { } } + fn stack_endpoint(&self) -> IpListenEndpoint { + let endpoint = self.endpoint(); + IpListenEndpoint { + addr: stack_listen_addr_for_family(self.family, endpoint.addr), + port: endpoint.port, + } + } + + fn set_dual_stack_v4(&self, enabled: bool) { + self.dual_stack_v4.store(enabled, Ordering::Release); + } + + fn stack_endpoints(&self) -> Vec { + let mut endpoints = Vec::new(); + let primary = self.stack_endpoint(); + endpoints.push(primary); + if self.dual_stack_v4.load(Ordering::Acquire) && self.family == super::AF_INET6 as i32 { + let base = self.endpoint(); + if base.addr.is_none() { + endpoints.push(ipv4_loopback_listen_endpoint(base.port)); + } + } + endpoints + } + fn backlog(&self) -> usize { self.backlog.load(Ordering::Acquire) } @@ -167,7 +221,9 @@ pub(crate) struct TcpSocketState { pub(crate) write_wait: WaitQueue, pub(crate) orphaned: AtomicBool, listener: SpinNoIrqLock>>, + listener_endpoint: SpinNoIrqLock>, queued_for_accept: AtomicBool, + last_state: AtomicUsize, } impl TcpSocketState { @@ -178,7 +234,9 @@ impl TcpSocketState { write_wait: WaitQueue::new(), orphaned: AtomicBool::new(false), listener: SpinNoIrqLock::new(None), + listener_endpoint: SpinNoIrqLock::new(None), queued_for_accept: AtomicBool::new(false), + last_state: AtomicUsize::new(usize::MAX), } } @@ -197,6 +255,15 @@ impl TcpSocketState { fn clear_listener(&self) { *self.listener.lock() = None; + *self.listener_endpoint.lock() = None; + } + + fn listener_endpoint(&self) -> Option { + *self.listener_endpoint.lock() + } + + fn set_listener_endpoint(&self, endpoint: IpListenEndpoint) { + *self.listener_endpoint.lock() = Some(endpoint); } pub(crate) fn is_listener_owned(&self) -> bool { @@ -212,6 +279,52 @@ impl TcpSocketState { fn clear_queued_for_accept(&self) { self.queued_for_accept.store(false, Ordering::Release); } + + pub(crate) fn observe_state_change(&self, state: tcp_socket::State) -> Option { + let next = state as usize; + let prev = self.last_state.swap(next, Ordering::AcqRel); + if prev == next { + None + } else { + Some(prev) + } + } +} + +pub(crate) fn tcp_state_name(state: tcp_socket::State) -> &'static str { + match state { + tcp_socket::State::Closed => "CLOSED", + tcp_socket::State::Listen => "LISTEN", + tcp_socket::State::SynSent => "SYN-SENT", + tcp_socket::State::SynReceived => "SYN-RECEIVED", + tcp_socket::State::Established => "ESTABLISHED", + tcp_socket::State::FinWait1 => "FIN-WAIT-1", + tcp_socket::State::FinWait2 => "FIN-WAIT-2", + tcp_socket::State::CloseWait => "CLOSE-WAIT", + tcp_socket::State::Closing => "CLOSING", + tcp_socket::State::LastAck => "LAST-ACK", + tcp_socket::State::TimeWait => "TIME-WAIT", + } +} + +pub(crate) fn tcp_state_name_repr(state: usize) -> &'static str { + if state == usize::MAX { + return ""; + } + match state { + x if x == tcp_socket::State::Closed as usize => "CLOSED", + x if x == tcp_socket::State::Listen as usize => "LISTEN", + x if x == tcp_socket::State::SynSent as usize => "SYN-SENT", + x if x == tcp_socket::State::SynReceived as usize => "SYN-RECEIVED", + x if x == tcp_socket::State::Established as usize => "ESTABLISHED", + x if x == tcp_socket::State::FinWait1 as usize => "FIN-WAIT-1", + x if x == tcp_socket::State::FinWait2 as usize => "FIN-WAIT-2", + x if x == tcp_socket::State::CloseWait as usize => "CLOSE-WAIT", + x if x == tcp_socket::State::Closing as usize => "CLOSING", + x if x == tcp_socket::State::LastAck as usize => "LAST-ACK", + x if x == tcp_socket::State::TimeWait as usize => "TIME-WAIT", + _ => "", + } } /// Called by net poll path to move one passive listener socket into the listener pending queue @@ -228,6 +341,11 @@ pub(crate) fn queue_listener_connection_if_ready( return None; } + debug!( + "Tcp listener queued established connection: handle={:?} state={}", + st.handle, + tcp_state_name(state) + ); listener.remove_passive(st.handle); listener.push_pending(Arc::clone(st)); st.clear_listener(); @@ -236,10 +354,12 @@ pub(crate) fn queue_listener_connection_if_ready( } pub(crate) struct TcpSocketFile { + family: i32, st: SpinNoIrqLock>, bound_endpoint: SpinNoIrqLock>, listening: AtomicBool, listener: SpinNoIrqLock>>, + ipv6_only: AtomicBool, recv_timeout_ns: AtomicU64, send_timeout_ns: AtomicU64, /// IPv4 multicast groups this socket has joined (per-socket membership). @@ -250,18 +370,28 @@ pub(crate) struct TcpSocketFile { } impl TcpSocketFile { - fn new(st: Arc) -> Self { + fn new(st: Arc, family: i32) -> Self { Self { + family, st: SpinNoIrqLock::new(st), bound_endpoint: SpinNoIrqLock::new(None), listening: AtomicBool::new(false), listener: SpinNoIrqLock::new(None), + ipv6_only: AtomicBool::new(false), recv_timeout_ns: AtomicU64::new(0), send_timeout_ns: AtomicU64::new(0), mcast_groups: SpinNoIrqLock::new(Vec::new()), } } + pub(crate) fn set_ipv6_only(&self, enabled: bool) { + self.ipv6_only.store(enabled, Ordering::Release); + } + + pub(crate) fn ipv6_only(&self) -> bool { + self.ipv6_only.load(Ordering::Acquire) + } + /// Join an IPv4 multicast group. Returns `false` if the socket was already /// a member of `addr` (caller maps this to `EADDRINUSE`). pub(crate) fn join_mcast_group(&self, addr: Ipv4Address) -> bool { @@ -324,9 +454,7 @@ impl TcpSocketFile { } let bound = *self.bound_endpoint.lock(); bound.map(|bound| { - let addr = bound - .addr - .unwrap_or(IpAddress::Ipv4(Ipv4Address::new(0, 0, 0, 0))); + let addr = bound.addr.unwrap_or(unspecified_addr_for_family(self.family)); IpEndpoint::new(addr, bound.port) }) } @@ -348,6 +476,28 @@ impl TcpSocketFile { self.listener.lock().as_ref().map(Arc::clone) } + fn should_dual_stack_with_ipv4(&self, endpoint: IpListenEndpoint) -> bool { + self.family == super::AF_INET6 as i32 && !self.ipv6_only() && endpoint.addr.is_none() + } + + fn choose_refill_endpoint(&self, listener: &Arc) -> IpListenEndpoint { + let endpoints = listener.stack_endpoints(); + let passive = listener.passive.lock(); + let mut best = endpoints[0]; + let mut best_count = usize::MAX; + for endpoint in endpoints { + let count = passive + .iter() + .filter(|st| st.listener_endpoint() == Some(endpoint)) + .count(); + if count < best_count { + best = endpoint; + best_count = count; + } + } + best + } + fn trim_listener_slots(&self, listener: &Arc) -> Result<(), ERRNO> { let target = listener.backlog(); let mut to_close = Vec::new(); @@ -381,17 +531,28 @@ impl TcpSocketFile { } fn refill_listener_slots(&self, listener: &Arc) -> Result<(), ERRNO> { - let target = listener.backlog(); + let target = listener.backlog().min(super::MAX_PASSIVE_LISTEN_SOCKETS); let mut guard = NET_STACK.lock(); let stack = guard.as_mut().ok_or(ERRNO::ENETDOWN)?; while listener.slot_count() < target { let (_h, st) = stack.create_tcp_socket(); + let listen_endpoint = self.choose_refill_endpoint(listener); { let socket = stack.sockets.get_mut::(st.handle); - socket.listen(listener.endpoint()).map_err(|_| ERRNO::EIO)?; + socket + .listen(listen_endpoint) + .map_err(|_| ERRNO::EIO)?; } + debug!( + "Tcp listener refill: passive_handle={:?} endpoint={:?} slots={}/{}", + st.handle, + listen_endpoint, + listener.slot_count() + 1, + target + ); st.set_listener(Some(Arc::downgrade(listener))); + st.set_listener_endpoint(listen_endpoint); listener.push_passive(Arc::clone(&st)); } @@ -412,13 +573,26 @@ impl TcpSocketFile { ep.port }; *self.bound_endpoint.lock() = Some(listen_endpoint_from_bind(IpEndpoint::new(ep.addr, port))); + debug!( + "Tcp bind: requested={} effective={:?}", + ep, + *self.bound_endpoint.lock() + ); Ok(()) } pub(crate) fn listen(&self, backlog: usize) -> Result<(), ERRNO> { let endpoint = (*self.bound_endpoint.lock()).ok_or(ERRNO::EINVAL)?; - info!("Tcp listen: endpoint={:?} backlog={}", endpoint, backlog); + info!( + "Tcp listen: endpoint={:?} stack_endpoint={:?} backlog={}", + endpoint, + IpListenEndpoint { + addr: stack_listen_addr_for_family(self.family, endpoint.addr), + port: endpoint.port, + }, + backlog + ); let backlog = normalize_backlog(backlog); let was_listening = self.listening.load(Ordering::Acquire); @@ -428,13 +602,14 @@ impl TcpSocketFile { match guard.as_ref() { Some(ls) => Arc::clone(ls), None => { - let ls = Arc::new(TcpListenerShared::new(endpoint, backlog)); + let ls = Arc::new(TcpListenerShared::new(self.family, endpoint, backlog)); *guard = Some(Arc::clone(&ls)); ls } } }; + listener.set_dual_stack_v4(self.should_dual_stack_with_ipv4(endpoint)); listener.set_endpoint(endpoint); listener.set_backlog(backlog); @@ -443,9 +618,16 @@ impl TcpSocketFile { let mut guard = NET_STACK.lock(); let stack = guard.as_mut().ok_or(ERRNO::ENETDOWN)?; if !listener.contains_passive(st.handle) && !st.is_listener_owned() { + let listen_endpoint = listener.stack_endpoints()[0]; let socket = stack.sockets.get_mut::(st.handle); - if socket.listen(endpoint).is_ok() { + if socket.listen(listen_endpoint).is_ok() { + debug!( + "Tcp listen armed primary passive socket: handle={:?} endpoint={:?}", + st.handle, + listen_endpoint + ); st.set_listener(Some(Arc::downgrade(&listener))); + st.set_listener_endpoint(listen_endpoint); listener.push_passive(Arc::clone(&st)); } else if !was_listening { return Err(ERRNO::EADDRINUSE); @@ -474,21 +656,23 @@ impl TcpSocketFile { } if let Some(st) = listener.pop_pending() { st.clear_queued_for_accept(); + debug!("Tcp accept: popped pending handle={:?}", st.handle); let mut was_closed = false; - let peer = { + let (peer, local) = { let mut guard = NET_STACK.lock(); let stack = guard.as_mut().ok_or(ERRNO::ENETDOWN)?; let socket = stack.sockets.get_mut::(st.handle); if matches!(socket.state(), tcp_socket::State::Closed | tcp_socket::State::TimeWait) { was_closed = true; - None + (None, None) } else { - socket.remote_endpoint() + (socket.remote_endpoint(), socket.local_endpoint()) } }; if was_closed { + debug!("Tcp accept: pending handle={:?} was already closed", st.handle); st.orphaned.store(true, Ordering::Release); continue; } @@ -496,10 +680,12 @@ impl TcpSocketFile { self.refill_listener_slots(&listener)?; let accepted = Arc::new(TcpSocketFile { + family: self.family, st: SpinNoIrqLock::new(Arc::clone(&st)), - bound_endpoint: SpinNoIrqLock::new(Some(listener.endpoint())), + bound_endpoint: SpinNoIrqLock::new(local.map(listen_endpoint_from_bind)), listening: AtomicBool::new(false), listener: SpinNoIrqLock::new(None), + ipv6_only: AtomicBool::new(self.ipv6_only()), recv_timeout_ns: AtomicU64::new(self.recv_timeout_ns()), send_timeout_ns: AtomicU64::new(self.send_timeout_ns()), // A freshly accepted socket must NOT inherit the listening @@ -507,6 +693,7 @@ impl TcpSocketFile { mcast_groups: SpinNoIrqLock::new(Vec::new()), }); + debug!("Tcp accept: accepted handle={:?} peer={:?}", st.handle, peer); NEED_POLL.store(true, Ordering::Release); return Ok((accepted, peer)); } @@ -553,9 +740,22 @@ impl TcpSocketFile { let st = self.state(); let socket = stack.sockets.get_mut::(st.handle); + debug!( + "Tcp connect attempt: handle={:?} local={:?} remote={} iface_addrs={:?} chosen_src={:?}", + st.handle, + local_endpoint, + ep, + stack.iface.ip_addrs(), + stack.iface.get_source_address(&ep.addr) + ); socket .connect(stack.iface.context(), ep, local_endpoint) .map_err(|_| ERRNO::EADDRINUSE)?; + debug!( + "Tcp connect submitted: handle={:?} state={}", + st.handle, + tcp_state_name(socket.state()) + ); stack.poll(); } @@ -572,8 +772,24 @@ impl TcpSocketFile { let stack = guard.as_mut().ok_or(ERRNO::ENETDOWN)?; let socket = stack.sockets.get_mut::(st.handle); match socket.state() { - tcp_socket::State::Established | tcp_socket::State::CloseWait => return Ok(()), + tcp_socket::State::Established | tcp_socket::State::CloseWait => { + debug!( + "Tcp connect complete: handle={:?} local={:?} remote={:?} state={}", + st.handle, + socket.local_endpoint(), + socket.remote_endpoint(), + tcp_state_name(socket.state()) + ); + return Ok(()); + } tcp_socket::State::Closed | tcp_socket::State::TimeWait => { + warn!( + "Tcp connect refused: handle={:?} local={:?} remote={:?} state={}", + st.handle, + socket.local_endpoint(), + socket.remote_endpoint(), + tcp_state_name(socket.state()) + ); return Err(ERRNO::ECONNREFUSED) } _ => {} @@ -656,6 +872,9 @@ impl TcpSocketFile { socket_wait_mark_ready(handle); cleanup_socket_wait(handle); } + crate::net::perf_tcp_user_recv(total); + stack.poll_socket_recv_work(); + NEED_POLL.store(true, Ordering::Release); return Ok(total); } if !socket.may_recv() { @@ -773,7 +992,8 @@ impl TcpSocketFile { } } if total > 0 { - stack.poll(); + crate::net::perf_tcp_user_send(total); + stack.poll_socket_work_for(st.handle); NEED_POLL.store(true, Ordering::Release); if let Some(handle) = timeout_handle.take() { socket_wait_mark_ready(handle); @@ -934,6 +1154,9 @@ impl File for TcpSocketFile { socket_wait_mark_ready(handle); cleanup_socket_wait(handle); } + crate::net::perf_tcp_user_recv(n); + stack.poll_socket_recv_work(); + NEED_POLL.store(true, Ordering::Release); return Ok(n); } if !socket.may_recv() { @@ -1040,7 +1263,8 @@ impl File for TcpSocketFile { } return Err(ERRNO::EIO); } - stack.poll(); + crate::net::perf_tcp_user_send(n); + stack.poll_socket_work_for(st.handle); NEED_POLL.store(true, Ordering::Release); if let Some(handle) = timeout_handle.take() { socket_wait_mark_ready(handle); @@ -1258,9 +1482,9 @@ impl Drop for TcpSocketFile { } } -pub(crate) fn create_tcp_socket_file() -> Option> { +pub(crate) fn create_tcp_socket_file(family: i32) -> Option> { let mut guard = NET_STACK.lock(); let stack = guard.as_mut()?; let (_handle, st) = stack.create_tcp_socket(); - Some(Arc::new(TcpSocketFile::new(st))) + Some(Arc::new(TcpSocketFile::new(st, family))) } diff --git a/os/src/net/udp.rs b/os/src/net/udp.rs index 6e642ca..144a75d 100644 --- a/os/src/net/udp.rs +++ b/os/src/net/udp.rs @@ -3,11 +3,11 @@ use alloc::{sync::Arc, vec::Vec}; use core::any::Any; use core::cmp::min; -use core::sync::atomic::{AtomicU64, Ordering}; +use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use smoltcp::socket::udp as udp_socket; use smoltcp::socket::udp::SendError; -use smoltcp::wire::{IpAddress, IpEndpoint, IpListenEndpoint, Ipv4Address}; +use smoltcp::wire::{IpAddress, IpEndpoint, IpListenEndpoint, Ipv4Address, Ipv6Address}; use crate::fs::{File, Stat, StatMode}; use crate::mm::UserBuffer; @@ -21,6 +21,8 @@ use crate::syscall::errno::ERRNO; use crate::task::{current_task, WaitQueue, WaitReason}; use crate::timer::{add_timer_with_socket_tag, get_time_ns}; +const AF_INET_FAMILY: i32 = 2; + #[inline] fn listen_endpoint_from_bind(ep: IpEndpoint) -> IpListenEndpoint { let addr = if ep.addr.is_unspecified() { @@ -31,12 +33,22 @@ fn listen_endpoint_from_bind(ep: IpEndpoint) -> IpListenEndpoint { IpListenEndpoint { addr, port: ep.port } } +#[inline] +fn unspecified_addr_for_family(family: i32) -> IpAddress { + if family == super::AF_INET6 as i32 { + IpAddress::Ipv6(Ipv6Address::UNSPECIFIED) + } else { + IpAddress::Ipv4(Ipv4Address::new(0, 0, 0, 0)) + } +} + #[inline] fn loopback_source_addr_for_peer(peer: IpEndpoint) -> Option { match peer.addr { IpAddress::Ipv4(v4) if v4.is_loopback() => { Some(IpAddress::Ipv4(Ipv4Address::new(127, 0, 0, 1))) } + IpAddress::Ipv6(v6) if v6.is_loopback() => Some(IpAddress::Ipv6(Ipv6Address::LOCALHOST)), _ => None, } } @@ -62,28 +74,42 @@ impl UdpSocketState { } pub(crate) struct UdpSocketFile { + family: i32, st: Arc, + bound_endpoint: SpinNoIrqLock>, connected: SpinNoIrqLock>, + ipv6_only: AtomicBool, recv_timeout_ns: AtomicU64, send_timeout_ns: AtomicU64, } impl UdpSocketFile { - fn new(st: Arc) -> Self { + fn new(st: Arc, family: i32) -> Self { Self { + family, st, + bound_endpoint: SpinNoIrqLock::new(None), connected: SpinNoIrqLock::new(None), + ipv6_only: AtomicBool::new(false), recv_timeout_ns: AtomicU64::new(0), send_timeout_ns: AtomicU64::new(0), } } + pub(crate) fn set_ipv6_only(&self, enabled: bool) { + self.ipv6_only.store(enabled, Ordering::Release); + } + + pub(crate) fn ipv6_only(&self) -> bool { + self.ipv6_only.load(Ordering::Acquire) + } + pub(crate) fn recv_buffer_size(&self) -> usize { - super::UDP_BUF + super::UDP_RX_BUF } pub(crate) fn send_buffer_size(&self) -> usize { - super::UDP_BUF + super::UDP_TX_BUF } pub(crate) fn set_recv_timeout_ns(&self, timeout_ns: u64) { @@ -104,13 +130,28 @@ impl UdpSocketFile { /// Return the local (bound) endpoint of this UDP socket. If not bound, returns None. pub(crate) fn local_endpoint(&self) -> Option { + if self.connected.lock().is_some() { + let mut guard = crate::net::NET_STACK.lock(); + let stack = guard.as_mut()?; + let socket = stack.sockets.get_mut::(self.st.handle); + let listen = socket.endpoint(); + let addr = listen.addr.unwrap_or(unspecified_addr_for_family(self.family)); + return Some(IpEndpoint::new(addr, listen.port)); + } + + if let Some(bound) = *self.bound_endpoint.lock() { + let addr = bound.addr.unwrap_or(unspecified_addr_for_family(self.family)); + return Some(IpEndpoint::new(addr, bound.port)); + } + let mut guard = crate::net::NET_STACK.lock(); let stack = guard.as_mut()?; let socket = stack.sockets.get_mut::(self.st.handle); let listen = socket.endpoint(); - let addr = listen - .addr - .unwrap_or(IpAddress::Ipv4(Ipv4Address::new(0, 0, 0, 0))); + if listen.port == 0 { + return None; + } + let addr = listen.addr.unwrap_or(unspecified_addr_for_family(self.family)); Some(IpEndpoint::new(addr, listen.port)) } @@ -128,10 +169,33 @@ impl UdpSocketFile { ep.port }; let bind_ep = listen_endpoint_from_bind(IpEndpoint::new(ep.addr, port)); - debug!("UDP socket {:?} binding to {:?}", self.st.handle, bind_ep); + let stack_ep = if bind_ep.addr.is_some() { + bind_ep + } else if self.family == AF_INET_FAMILY { + IpListenEndpoint { + addr: Some(IpAddress::Ipv4(Ipv4Address::new(127, 0, 0, 1))), + port: bind_ep.port, + } + } else if self.family == super::AF_INET6 as i32 && self.ipv6_only() { + IpListenEndpoint { + addr: Some(IpAddress::Ipv6(Ipv6Address::LOCALHOST)), + port: bind_ep.port, + } + } else { + // AF_INET6 wildcard with IPV6_V6ONLY=0 keeps addr=None so one UDP + // socket can accept both IPv6 and IPv4 loopback datagrams. + bind_ep + }; + debug!( + "UDP socket {:?} binding to {:?} stack_ep={:?}", + self.st.handle, + bind_ep, + stack_ep + ); let socket = stack.sockets.get_mut::(self.st.handle); - match socket.bind(bind_ep) { + match socket.bind(stack_ep) { Ok(()) => { + *self.bound_endpoint.lock() = Some(bind_ep); debug!("UDP socket {:?} bind succeeded", self.st.handle); Ok(()) } @@ -194,9 +258,17 @@ impl UdpSocketFile { let mut guard = NET_STACK.lock(); let stack = guard.as_mut().ok_or(ERRNO::ENETDOWN)?; self.ensure_bound_for_send_locked(stack, ep)?; - let socket = stack.sockets.get_mut::(self.st.handle); + if ep.port != 0 && stack.deliver_udp_loopback(self.st.handle, ep, data) { + if let Some(handle) = timeout_handle.take() { + socket_wait_mark_ready(handle); + cleanup_socket_wait(handle); + } + crate::net::perf_udp_user_send(data.len()); + return Ok(data.len()); + } // Check if socket can send + let socket = stack.sockets.get_mut::(self.st.handle); let can_send = socket.can_send(); if can_send { @@ -214,6 +286,7 @@ impl UdpSocketFile { socket_wait_mark_ready(handle); cleanup_socket_wait(handle); } + crate::net::perf_udp_user_send(data.len()); return Ok(data.len()); } Err(e) => { @@ -335,6 +408,7 @@ impl UdpSocketFile { socket_wait_mark_ready(handle); cleanup_socket_wait(handle); } + crate::net::perf_udp_user_recv(off); return Ok((off, meta.endpoint)); } } @@ -462,6 +536,7 @@ impl File for UdpSocketFile { socket_wait_mark_ready(handle); cleanup_socket_wait(handle); } + crate::net::perf_udp_user_recv(n); return Ok(n); } } @@ -591,9 +666,9 @@ impl Drop for UdpSocketFile { } } -pub(crate) fn create_udp_socket_file() -> Option> { +pub(crate) fn create_udp_socket_file(family: i32) -> Option> { let mut guard = NET_STACK.lock(); let stack = guard.as_mut()?; let (_handle, st) = stack.create_udp_socket(); - Some(Arc::new(UdpSocketFile::new(st))) + Some(Arc::new(UdpSocketFile::new(st, family))) } diff --git a/os/src/syscall/net.rs b/os/src/syscall/net.rs index 0b56422..a657a37 100644 --- a/os/src/syscall/net.rs +++ b/os/src/syscall/net.rs @@ -1,7 +1,7 @@ use alloc::{sync::Arc, vec::Vec}; use strum_macros::FromRepr; use core::{mem::size_of, slice}; -use smoltcp::wire::{IpAddress, IpEndpoint, Ipv4Address}; +use smoltcp::wire::{IpAddress, IpEndpoint, Ipv4Address, Ipv6Address}; use crate::syscall::times::TimeVal; use crate::fs::{ make_pipe, AccessMode, File, FileDescription, FileStatusFlags, SocketSpec, @@ -48,8 +48,7 @@ const IPPROTO_TCP: i32 = 6; const IPPROTO_UDP: i32 = 17; const IPPROTO_SCTP: i32 = 132; const IPPROTO_UDPLITE: i32 = 136; -const IPV6_ANY: [u8; 16] = [0; 16]; -const IPV6_LOOPBACK: [u8; 16] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]; +const IPV6_V6ONLY: i32 = 26; // IP-level (SOL_IP) multicast group membership options. These use a // `struct group_req { __u32 gr_interface; struct sockaddr_storage gr_group; }` @@ -570,6 +569,15 @@ fn sockaddr_to_endpoint(addr: &SockAddrIn) -> Result { Ok(IpEndpoint::new(IpAddress::Ipv4(ip), port)) } +#[inline] +fn unspecified_endpoint_for_family(family: i32) -> IpEndpoint { + if family == AF_INET6 as i32 { + IpEndpoint::new(IpAddress::Ipv6(Ipv6Address::UNSPECIFIED), 0) + } else { + IpEndpoint::new(IpAddress::Ipv4(Ipv4Address::new(0, 0, 0, 0)), 0) + } +} + fn endpoint_to_sockaddr(ep: IpEndpoint) -> SockAddrIn { let (sin_addr, sin_port) = match ep.addr { IpAddress::Ipv4(v4) => { @@ -580,6 +588,7 @@ fn endpoint_to_sockaddr(ep: IpEndpoint) -> SockAddrIn { // and big endian hosts. (u32::from_ne_bytes([b[0], b[1], b[2], b[3]]), ep.port.to_be()) } + IpAddress::Ipv6(_) => (u32::from_ne_bytes([0, 0, 0, 0]), ep.port.to_be()), }; SockAddrIn { sin_family: AF_INET, @@ -589,22 +598,42 @@ fn endpoint_to_sockaddr(ep: IpEndpoint) -> SockAddrIn { } } +#[inline] +fn ipv4_mapped_ipv6(v4: Ipv4Address) -> [u8; 16] { + let octets = v4.octets(); + [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, octets[0], octets[1], octets[2], octets[3], + ] +} + +#[inline] +fn ipv6_mapped_to_ipv4(bytes: [u8; 16]) -> Option { + if bytes[..10] == [0; 10] && bytes[10] == 0xff && bytes[11] == 0xff { + Some(Ipv4Address::new(bytes[12], bytes[13], bytes[14], bytes[15])) + } else { + None + } +} + fn sockaddr_in6_to_endpoint(addr: &SockAddrIn6) -> Result { if addr.sin6_family != AF_INET6 { return Err(ERRNO::EAFNOSUPPORT); } - let ip = match addr.sin6_addr { - IPV6_ANY => Ipv4Address::new(0, 0, 0, 0), - IPV6_LOOPBACK => Ipv4Address::new(127, 0, 0, 1), - _ => return Err(ERRNO::EADDRNOTAVAIL), - }; - Ok(IpEndpoint::new(IpAddress::Ipv4(ip), u16::from_be(addr.sin6_port))) + if addr.sin6_scope_id != 0 { + return Err(ERRNO::EOPNOTSUPP); + } + let port = u16::from_be(addr.sin6_port); + if let Some(v4) = ipv6_mapped_to_ipv4(addr.sin6_addr) { + return Ok(IpEndpoint::new(IpAddress::Ipv4(v4), port)); + } + let ip = Ipv6Address::from(addr.sin6_addr); + Ok(IpEndpoint::new(IpAddress::Ipv6(ip), port)) } fn endpoint_to_sockaddr_in6(ep: IpEndpoint) -> SockAddrIn6 { let addr = match ep.addr { - IpAddress::Ipv4(v4) if v4.octets() == [0, 0, 0, 0] => IPV6_ANY, - IpAddress::Ipv4(_) => IPV6_LOOPBACK, + IpAddress::Ipv4(v4) => ipv4_mapped_ipv6(v4), + IpAddress::Ipv6(v6) => v6.octets(), }; SockAddrIn6 { sin6_family: AF_INET6, @@ -858,12 +887,14 @@ fn sockaddr_un_bytes(addr: &[u8]) -> Vec { out } -fn is_local_ipv4_bind_addr(addr: IpAddress) -> bool { - match addr { - IpAddress::Ipv4(v4) => { +fn is_local_bind_addr(spec: SocketSpec, addr: IpAddress) -> bool { + match (spec.family, addr) { + (x, IpAddress::Ipv4(v4)) if x == AF_INET as i32 => { let octets = v4.octets(); octets == [0, 0, 0, 0] || octets[0] == 127 } + (x, IpAddress::Ipv6(v6)) if x == AF_INET6 as i32 => v6.is_unspecified() || v6.is_loopback(), + _ => false, } } @@ -1134,7 +1165,7 @@ pub fn sys_socket(domain: i32, socket_type: i32, protocol: i32) -> isize { let (file, spec): (Arc, SocketSpec) = match domain { x if x == AF_INET as i32 => match base_type { SOCK_DGRAM => ( - create_udp_socket_file() + create_udp_socket_file(domain) .map(|f| f as Arc) .ok_or(ERRNO::ENETDOWN)?, SocketSpec { @@ -1144,7 +1175,7 @@ pub fn sys_socket(domain: i32, socket_type: i32, protocol: i32) -> isize { }, ), SOCK_STREAM => ( - create_tcp_socket_file() + create_tcp_socket_file(domain) .map(|f| f as Arc) .ok_or(ERRNO::ENETDOWN)?, SocketSpec { @@ -1161,7 +1192,7 @@ pub fn sys_socket(domain: i32, socket_type: i32, protocol: i32) -> isize { return Err(ERRNO::EPROTONOSUPPORT); } ( - create_udp_socket_file() + create_udp_socket_file(domain) .map(|f| f as Arc) .ok_or(ERRNO::ENETDOWN)?, SocketSpec { @@ -1176,7 +1207,7 @@ pub fn sys_socket(domain: i32, socket_type: i32, protocol: i32) -> isize { return Err(ERRNO::EPROTONOSUPPORT); } ( - create_tcp_socket_file() + create_tcp_socket_file(domain) .map(|f| f as Arc) .ok_or(ERRNO::ENETDOWN)?, SocketSpec { @@ -1377,7 +1408,7 @@ pub fn sys_bind(fd: i32, addr: *const SockAddrIn, addrlen: i32) -> isize { if ep.port < 1024 && ep.port != 0 && current_process().geteuid() != 0 { return Err(ERRNO::EACCES); } - if !is_local_ipv4_bind_addr(ep.addr) { + if !is_local_bind_addr(spec, ep.addr) { return Err(ERRNO::EADDRNOTAVAIL); } match socket_backend(fd)? { @@ -1509,7 +1540,7 @@ pub fn sys_getsockname(fd: i32, addr: *mut SockAddrIn, addrlen: *mut i32) -> isi with_udp_socket(fd, |udp| { let ep = udp .local_endpoint() - .unwrap_or(IpEndpoint::new(IpAddress::Ipv4(Ipv4Address::new(0, 0, 0, 0)), 0)); + .unwrap_or(unspecified_endpoint_for_family(socket_spec(fd)?.family)); copy_endpoint_to_socket_user(socket_spec(fd)?, addr, addrlen, ep)?; Ok(()) })?; @@ -1518,7 +1549,7 @@ pub fn sys_getsockname(fd: i32, addr: *mut SockAddrIn, addrlen: *mut i32) -> isi with_tcp_socket(fd, |tcp| { let ep = tcp .local_endpoint() - .unwrap_or(IpEndpoint::new(IpAddress::Ipv4(Ipv4Address::new(0, 0, 0, 0)), 0)); + .unwrap_or(unspecified_endpoint_for_family(socket_spec(fd)?.family)); copy_endpoint_to_socket_user(socket_spec(fd)?, addr, addrlen, ep)?; Ok(()) })?; @@ -1730,7 +1761,7 @@ pub fn sys_recvfrom( SocketBackendKind::Tcp => { let n = with_tcp_socket(fd, |tcp| tcp.recv_into_user_buffer(&mut ubuf))?; let ep = if addr.is_null() { - IpEndpoint::new(IpAddress::Ipv4(Ipv4Address::new(0, 0, 0, 0)), 0) + unspecified_endpoint_for_family(socket_spec(fd)?.family) } else { with_tcp_socket(fd, |tcp| Ok(tcp.remote_endpoint()))? .ok_or(ERRNO::ENOTCONN)? @@ -1741,7 +1772,7 @@ pub fn sys_recvfrom( with_netlink_route_socket(fd, |netlink| { netlink.recv_into_user_buffer(&mut ubuf, (flags & MSG_PEEK) != 0) })?, - IpEndpoint::new(IpAddress::Ipv4(Ipv4Address::new(0, 0, 0, 0)), 0), + unspecified_endpoint_for_family(AF_INET as i32), ), SocketBackendKind::Packet => { let (n, sockaddr) = @@ -1991,11 +2022,35 @@ pub fn sys_setsockopt(fd: i32, level: i32, optname: i32, optval: *const u8, optl } }, Some(SocketLevel::IpProtoIpv6) => { - if spec.family != AF_INET6 as i32 || spec.socket_type != SOCK_RAW { + if spec.family != AF_INET6 as i32 { return Err(ERRNO::ENOPROTOOPT); } match optname { + IPV6_V6ONLY => { + let token = current_user_token(); + let enabled = read_sockopt_i32(token, optval, optlen)? != 0; + match backend { + SocketBackendKind::Udp => { + with_udp_socket(fd, |udp| { + udp.set_ipv6_only(enabled); + Ok(()) + })?; + } + SocketBackendKind::Tcp => { + with_tcp_socket(fd, |tcp| { + tcp.set_ipv6_only(enabled); + Ok(()) + })?; + } + SocketBackendKind::RawIpv6 => {} + _ => return Err(ERRNO::ENOPROTOOPT), + } + Ok(0) + } IPV6_CHECKSUM => { + if backend != SocketBackendKind::RawIpv6 { + return Err(ERRNO::ENOPROTOOPT); + } let token = current_user_token(); let offset = read_sockopt_i32(token, optval, optlen)?; with_raw_ipv6_socket(fd, |raw| raw.set_checksum_offset(offset))?; @@ -2005,6 +2060,9 @@ pub fn sys_setsockopt(fd: i32, level: i32, optname: i32, optval: *const u8, optl | IPV6_RECVDSTOPTS | IPV6_RECVTCLASS | IPV6_2292PKTINFO | IPV6_2292HOPLIMIT | IPV6_2292RTHDR | IPV6_2292HOPOPTS | IPV6_2292DSTOPTS => { + if backend != SocketBackendKind::RawIpv6 { + return Err(ERRNO::ENOPROTOOPT); + } let token = current_user_token(); let enabled = read_sockopt_i32(token, optval, optlen)? != 0; with_raw_ipv6_socket(fd, |raw| raw.set_bool_option(optname, enabled))?; @@ -2232,11 +2290,38 @@ pub fn sys_getsockopt(fd: i32, level: i32, optname: i32, optval: *mut u8, optlen } }, Some(SocketLevel::IpProtoIpv6) => { - if spec.family != AF_INET6 as i32 || spec.socket_type != SOCK_RAW { + if spec.family != AF_INET6 as i32 { return Err(ERRNO::ENOPROTOOPT); } match optname { + IPV6_V6ONLY => { + let value = match backend { + SocketBackendKind::Udp => { + let mut value = 0i32; + with_udp_socket(fd, |udp| { + value = if udp.ipv6_only() { 1 } else { 0 }; + Ok(()) + })?; + value + } + SocketBackendKind::Tcp => { + let mut value = 0i32; + with_tcp_socket(fd, |tcp| { + value = if tcp.ipv6_only() { 1 } else { 0 }; + Ok(()) + })?; + value + } + SocketBackendKind::RawIpv6 => 1, + _ => return Err(ERRNO::ENOPROTOOPT), + }; + write_getsockopt_i32(token, optval, optlen, value)?; + Ok(0) + } IPV6_CHECKSUM => { + if backend != SocketBackendKind::RawIpv6 { + return Err(ERRNO::ENOPROTOOPT); + } let value = with_raw_ipv6_socket(fd, |raw| Ok(raw.checksum_offset()))?; write_getsockopt_i32(token, optval, optlen, value)?; Ok(0) @@ -2245,6 +2330,9 @@ pub fn sys_getsockopt(fd: i32, level: i32, optname: i32, optval: *mut u8, optlen | IPV6_RECVDSTOPTS | IPV6_RECVTCLASS | IPV6_2292PKTINFO | IPV6_2292HOPLIMIT | IPV6_2292RTHDR | IPV6_2292HOPOPTS | IPV6_2292DSTOPTS => { + if backend != SocketBackendKind::RawIpv6 { + return Err(ERRNO::ENOPROTOOPT); + } let value = with_raw_ipv6_socket(fd, |raw| raw.get_bool_option(optname))?; write_getsockopt_i32(token, optval, optlen, value)?; Ok(0) diff --git a/user/src/bin/disk_perf.rs b/user/src/bin/disk_perf.rs new file mode 100644 index 0000000..e1eba70 --- /dev/null +++ b/user/src/bin/disk_perf.rs @@ -0,0 +1,364 @@ +#![no_std] +#![no_main] + +extern crate alloc; + +#[macro_use] +extern crate user_lib; + +use alloc::format; +use alloc::string::String; +use core::cmp::min; +use user_lib::{ + close, fsync, ftruncate, get_time, getpid, open, pread64, pwrite64, read, sync, unlink, write, + OpenFlags, +}; + +const SEQ_BLOCK: usize = 16 * 1024; +const RAND_BLOCK: usize = 4 * 1024; + +struct Case { + name: &'static str, + size: usize, +} + +const CASES: &[Case] = &[ + Case { + name: "small", + size: 64 * 1024, + }, + Case { + name: "medium", + size: 1024 * 1024, + }, + Case { + name: "large", + size: 8 * 1024 * 1024, + }, + Case { + name: "xlarge", + size: 32 * 1024 * 1024, + }, +]; + +#[derive(Clone, Copy)] +struct Report { + bytes: usize, + ops: usize, + ms: isize, + checksum: u64, +} + +fn elapsed_ms(start: isize) -> isize { + let end = get_time(); + if end > start { + end - start + } else { + 1 + } +} + +fn fill_pattern(buf: &mut [u8], seed: u8) { + for (i, byte) in buf.iter_mut().enumerate() { + *byte = seed.wrapping_add((i as u8).wrapping_mul(37)); + } +} + +fn checksum(buf: &[u8]) -> u64 { + let mut sum = 0u64; + for &byte in buf { + sum = sum.wrapping_add(byte as u64); + } + sum +} + +fn next_rand(state: &mut u64) -> u64 { + *state = state + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + *state +} + +fn print_rate(case: &Case, op: &str, report: Report) { + let ms = if report.ms <= 0 { 1 } else { report.ms as u64 }; + let bytes = report.bytes as u64; + let mbps_x100 = bytes.saturating_mul(100_000) / (1024 * 1024) / ms; + let iops = (report.ops as u64).saturating_mul(1000) / ms; + + println!( + "{:>6} {:>11}: {:>8} KiB, {:>5} ops, {:>5} ms, {:>5}.{:02} MiB/s, {:>6} ops/s, sum={}", + case.name, + op, + report.bytes / 1024, + report.ops, + report.ms, + mbps_x100 / 100, + mbps_x100 % 100, + iops, + report.checksum + ); +} + +fn make_path(base: &str, case: &Case) -> String { + if base.as_bytes().last() == Some(&b'/') { + format!("{}disk_perf_{}_{}.dat", base, getpid(), case.name) + } else { + format!("{}/disk_perf_{}_{}.dat", base, getpid(), case.name) + } +} + +fn write_all(fd: usize, buf: &[u8]) -> bool { + let mut done = 0usize; + while done < buf.len() { + let n = write(fd, &buf[done..]); + if n <= 0 { + println!("disk_perf: write failed: {}", n); + return false; + } + done += n as usize; + } + true +} + +fn read_all(fd: usize, buf: &mut [u8]) -> bool { + let mut done = 0usize; + while done < buf.len() { + let n = read(fd, &mut buf[done..]); + if n <= 0 { + println!("disk_perf: read failed: {}", n); + return false; + } + done += n as usize; + } + true +} + +fn pwrite_all(fd: usize, buf: &[u8], offset: usize) -> bool { + let mut done = 0usize; + while done < buf.len() { + let n = pwrite64(fd, &buf[done..], offset + done); + if n <= 0 { + println!("disk_perf: pwrite64 failed: {}", n); + return false; + } + done += n as usize; + } + true +} + +fn pread_all(fd: usize, buf: &mut [u8], offset: usize) -> bool { + let mut done = 0usize; + while done < buf.len() { + let n = pread64(fd, &mut buf[done..], offset + done); + if n <= 0 { + println!("disk_perf: pread64 failed: {}", n); + return false; + } + done += n as usize; + } + true +} + +fn bench_seq_write(path: &str, case: &Case, buf: &mut [u8]) -> Option { + let fd = open(path, OpenFlags::CREATE | OpenFlags::TRUNC | OpenFlags::WRONLY); + if fd < 0 { + println!("disk_perf: open write {} failed: {}", path, fd); + return None; + } + let fd = fd as usize; + let start = get_time(); + let mut left = case.size; + let mut bytes = 0usize; + let mut ops = 0usize; + let mut sum = 0u64; + + while left > 0 { + let n = min(left, SEQ_BLOCK); + fill_pattern(&mut buf[..n], ops as u8); + if !write_all(fd, &buf[..n]) { + let _ = close(fd); + return None; + } + sum = sum.wrapping_add(checksum(&buf[..n])); + bytes += n; + ops += 1; + left -= n; + } + let sync_ret = fsync(fd); + let ms = elapsed_ms(start); + let _ = close(fd); + if sync_ret < 0 { + println!("disk_perf: fsync after sequential write failed: {}", sync_ret); + return None; + } + Some(Report { + bytes, + ops, + ms, + checksum: sum, + }) +} + +fn bench_seq_read(path: &str, case: &Case, buf: &mut [u8]) -> Option { + let fd = open(path, OpenFlags::RDONLY); + if fd < 0 { + println!("disk_perf: open read {} failed: {}", path, fd); + return None; + } + let fd = fd as usize; + let start = get_time(); + let mut left = case.size; + let mut bytes = 0usize; + let mut ops = 0usize; + let mut sum = 0u64; + + while left > 0 { + let n = min(left, SEQ_BLOCK); + if !read_all(fd, &mut buf[..n]) { + let _ = close(fd); + return None; + } + sum = sum.wrapping_add(checksum(&buf[..n])); + bytes += n; + ops += 1; + left -= n; + } + let ms = elapsed_ms(start); + let _ = close(fd); + Some(Report { + bytes, + ops, + ms, + checksum: sum, + }) +} + +fn bench_rand_write(path: &str, case: &Case, buf: &mut [u8]) -> Option { + let fd = open(path, OpenFlags::RDWR); + if fd < 0 { + println!("disk_perf: open random write {} failed: {}", path, fd); + return None; + } + let fd = fd as usize; + if ftruncate(fd, case.size as isize) < 0 { + println!("disk_perf: ftruncate failed"); + let _ = close(fd); + return None; + } + + let blocks = case.size / RAND_BLOCK; + let mut state = 0x9e37_79b9_7f4a_7c15u64 ^ case.size as u64; + let start = get_time(); + let mut sum = 0u64; + + for op in 0..blocks { + let block = (next_rand(&mut state) as usize) % blocks; + let offset = block * RAND_BLOCK; + fill_pattern(&mut buf[..RAND_BLOCK], (op ^ block) as u8); + if !pwrite_all(fd, &buf[..RAND_BLOCK], offset) { + let _ = close(fd); + return None; + } + sum = sum.wrapping_add(checksum(&buf[..RAND_BLOCK])); + } + let sync_ret = fsync(fd); + let ms = elapsed_ms(start); + let _ = close(fd); + if sync_ret < 0 { + println!("disk_perf: fsync after random write failed: {}", sync_ret); + return None; + } + Some(Report { + bytes: blocks * RAND_BLOCK, + ops: blocks, + ms, + checksum: sum, + }) +} + +fn bench_rand_read(path: &str, case: &Case, buf: &mut [u8]) -> Option { + let fd = open(path, OpenFlags::RDONLY); + if fd < 0 { + println!("disk_perf: open random read {} failed: {}", path, fd); + return None; + } + let fd = fd as usize; + let blocks = case.size / RAND_BLOCK; + let mut state = 0x243f_6a88_85a3_08d3u64 ^ case.size as u64; + let start = get_time(); + let mut sum = 0u64; + + for _ in 0..blocks { + let block = (next_rand(&mut state) as usize) % blocks; + let offset = block * RAND_BLOCK; + if !pread_all(fd, &mut buf[..RAND_BLOCK], offset) { + let _ = close(fd); + return None; + } + sum = sum.wrapping_add(checksum(&buf[..RAND_BLOCK])); + } + let ms = elapsed_ms(start); + let _ = close(fd); + Some(Report { + bytes: blocks * RAND_BLOCK, + ops: blocks, + ms, + checksum: sum, + }) +} + +#[no_mangle] +pub fn main(argc: usize, argv: &[&str]) -> i32 { + let base = if argc > 1 { argv[1] } else { "." }; + let mut buf = [0u8; SEQ_BLOCK]; + + println!( + "disk_perf: base={}, seq_block={} KiB, rand_block={} KiB", + base, + SEQ_BLOCK / 1024, + RAND_BLOCK / 1024 + ); + println!("disk_perf: write numbers include fsync; read numbers may benefit from cache"); + + for case in CASES { + let path = make_path(base, case); + let _ = unlink(&path); + + if let Some(report) = bench_seq_write(&path, case, &mut buf) { + print_rate(case, "seq write", report); + } else { + let _ = unlink(&path); + return -1; + } + + let _ = sync(); + + if let Some(report) = bench_seq_read(&path, case, &mut buf) { + print_rate(case, "seq read", report); + } else { + let _ = unlink(&path); + return -1; + } + + if let Some(report) = bench_rand_write(&path, case, &mut buf) { + print_rate(case, "rand write", report); + } else { + let _ = unlink(&path); + return -1; + } + + let _ = sync(); + + if let Some(report) = bench_rand_read(&path, case, &mut buf) { + print_rate(case, "rand read", report); + } else { + let _ = unlink(&path); + return -1; + } + + let _ = unlink(&path); + } + + println!("disk_perf: done"); + 0 +} diff --git a/user/src/lib.rs b/user/src/lib.rs index b91b06c..3508219 100644 --- a/user/src/lib.rs +++ b/user/src/lib.rs @@ -347,6 +347,34 @@ pub fn write(fd: usize, buf: &[u8]) -> isize { sys_write(fd, buf) } +pub const SEEK_SET: usize = 0; +pub const SEEK_CUR: usize = 1; +pub const SEEK_END: usize = 2; + +pub fn lseek(fd: usize, offset: isize, whence: usize) -> isize { + sys_lseek(fd, offset, whence) +} + +pub fn pread64(fd: usize, buf: &mut [u8], offset: usize) -> isize { + sys_pread64(fd, buf, offset) +} + +pub fn pwrite64(fd: usize, buf: &[u8], offset: usize) -> isize { + sys_pwrite64(fd, buf, offset) +} + +pub fn sync() -> isize { + sys_sync() +} + +pub fn fsync(fd: usize) -> isize { + sys_fsync(fd) +} + +pub fn fdatasync(fd: usize) -> isize { + sys_fdatasync(fd) +} + pub fn link(old_path: &str, new_path: &str) -> isize { let old_path = to_cstring(old_path); let new_path = to_cstring(new_path); diff --git a/user/src/syscall.rs b/user/src/syscall.rs index 9288392..08f2474 100644 --- a/user/src/syscall.rs +++ b/user/src/syscall.rs @@ -18,11 +18,17 @@ pub const SYSCALL_OPENAT: usize = 56; pub const SYSCALL_CLOSE: usize = 57; pub const SYSCALL_PIPE: usize = 59; pub const SYSCALL_GETDENTS64: usize = 61; +pub const SYSCALL_LSEEK: usize = 62; pub const SYSCALL_READ: usize = 63; pub const SYSCALL_WRITE: usize = 64; +pub const SYSCALL_PREAD64: usize = 67; +pub const SYSCALL_PWRITE64: usize = 68; pub const SYSCALL_READLINKAT: usize = 78; pub const SYSCALL_NEWFSTATAT: usize = 79; pub const SYSCALL_FSTAT: usize = 80; +pub const SYSCALL_SYNC: usize = 81; +pub const SYSCALL_FSYNC: usize = 82; +pub const SYSCALL_FDATASYNC: usize = 83; pub const SYSCALL_EXIT: usize = 93; pub const SYSCALL_SLEEP: usize = 101; pub const SYSCALL_GETITIMER: usize = 102; @@ -203,6 +209,43 @@ pub fn sys_write(fd: usize, buffer: &[u8]) -> isize { syscall(SYSCALL_WRITE, [fd, buffer.as_ptr() as usize, buffer.len()]) } +pub fn sys_lseek(fd: usize, offset: isize, whence: usize) -> isize { + syscall(SYSCALL_LSEEK, [fd, offset as usize, whence]) +} + +pub fn sys_pread64(fd: usize, buffer: &mut [u8], offset: usize) -> isize { + syscall6( + SYSCALL_PREAD64, + [ + fd, + buffer.as_mut_ptr() as usize, + buffer.len(), + offset, + 0, + 0, + ], + ) +} + +pub fn sys_pwrite64(fd: usize, buffer: &[u8], offset: usize) -> isize { + syscall6( + SYSCALL_PWRITE64, + [fd, buffer.as_ptr() as usize, buffer.len(), offset, 0, 0], + ) +} + +pub fn sys_sync() -> isize { + syscall(SYSCALL_SYNC, [0, 0, 0]) +} + +pub fn sys_fsync(fd: usize) -> isize { + syscall(SYSCALL_FSYNC, [fd, 0, 0]) +} + +pub fn sys_fdatasync(fd: usize) -> isize { + syscall(SYSCALL_FDATASYNC, [fd, 0, 0]) +} + pub fn sys_linkat( old_dirfd: usize, old_path: &str, diff --git a/vendor/smoltcp/src/iface/interface/ipv6.rs b/vendor/smoltcp/src/iface/interface/ipv6.rs index 1bf6066..aca3982 100644 --- a/vendor/smoltcp/src/iface/interface/ipv6.rs +++ b/vendor/smoltcp/src/iface/interface/ipv6.rs @@ -150,11 +150,10 @@ impl InterfaceInner { pub fn has_solicited_node(&self, addr: Ipv6Address) -> bool { self.ip_addrs.iter().any(|cidr| { match *cidr { - IpCidr::Ipv6(cidr) if cidr.address() != Ipv6Address::LOCALHOST => { - // Take the lower order 24 bits of the IPv6 address and - // append those bits to FF02:0:0:0:0:1:FF00::/104. - addr.octets()[14..] == cidr.address().octets()[14..] - } + // Exact solicited-node matching keeps localhost (::1) working + // on Ethernet loopback and avoids false positives from only + // comparing a suffix of the address. + IpCidr::Ipv6(cidr) => addr == cidr.address().solicited_node(), _ => false, } }) diff --git a/vendor/smoltcp/src/socket/udp.rs b/vendor/smoltcp/src/socket/udp.rs index 824bad8..4cfc8a6 100644 --- a/vendor/smoltcp/src/socket/udp.rs +++ b/vendor/smoltcp/src/socket/udp.rs @@ -442,6 +442,23 @@ impl<'a> Socket<'a> { Ok(()) } + /// Enqueue a received packet directly into the receive buffer. + /// + /// This is useful for host-side loopback paths that have already performed + /// address and port selection and do not need to synthesize a full IP/UDP + /// frame just to feed it back into the socket layer. + pub fn inject_recv_slice( + &mut self, + data: &[u8], + meta: impl Into, + ) -> Result<(), SendError> { + self.rx_buffer + .enqueue(data.len(), meta.into()) + .map_err(|_| SendError::BufferFull)? + .copy_from_slice(data); + Ok(()) + } + /// Dequeue a packet received from a remote endpoint, and return the endpoint as well /// as a pointer to the payload. ///