Implement the `queue_rqs` callback for rnull, allowing the block layer
to submit multiple requests in a single call. This improves performance
by reducing per-request overhead and enabling batch processing.

The implementation processes requests from the list one at a time;
successfully processed requests are removed from the list, while
requests that fail to queue are collected and handed back to the block
layer for requeue (see the sketch below).
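For illustration only, a standalone sketch of the requeue hand-off
idiom (with `Vec<u32>` standing in for `RequestList`; the real list's
pop/push_tail ownership semantics are assumed to match the diff):

    fn drain(requests: &mut Vec<u32>) {
        // Collect the requests that could not be queued.
        let mut requeue = Vec::new();
        while let Some(rq) = requests.pop() {
            if rq % 2 != 0 {
                // Simulated queue failure: keep the request so the
                // caller (the block layer) can requeue it.
                requeue.push(rq);
            }
            // Successfully queued requests are consumed here.
        }
        // Hand the failures back through the caller-owned list; the
        // drained original list is dropped.
        drop(core::mem::replace(requests, requeue));
    }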
Signed-off-by: Andreas Hindborg
---
 drivers/block/rnull/disk_storage.rs |  34 +++----
 drivers/block/rnull/rnull.rs        | 180 +++++++++++++++++++++++-------------
 2 files changed, 132 insertions(+), 82 deletions(-)

diff --git a/drivers/block/rnull/disk_storage.rs b/drivers/block/rnull/disk_storage.rs
index b2b5eaa783cdc..d9f2703957fc0 100644
--- a/drivers/block/rnull/disk_storage.rs
+++ b/drivers/block/rnull/disk_storage.rs
@@ -86,7 +86,7 @@ pub(crate) fn discard(
         }
     }
 
-    pub(crate) fn flush(&self, hw_data: &Pin<&SpinLock<HwQueueContext>>) -> Result {
+    pub(crate) fn flush(&self, hw_data: &Pin<&SpinLock<HwQueueContext>>) {
         let mut tree_guard = self.lock();
         let mut hw_data_guard = hw_data.lock();
         let mut access = self.access(&mut tree_guard, &mut hw_data_guard, None);
@@ -129,16 +129,10 @@ fn to_sector(index: usize) -> u64 {
         (index << block::PAGE_SECTORS_SHIFT) as u64
     }
 
-    fn extract_cache_page(&mut self) -> Result<Option<Owned<Page>>> {
+    fn extract_cache_page(&mut self) -> Option<Owned<Page>> {
         let cache_entry = self.cache_guard.find_next_entry_circular(
             self.disk_storage.next_flush_sector.load(ordering::Relaxed) as usize,
-        );
-
-        let cache_entry = if let Some(entry) = cache_entry {
-            entry
-        } else {
-            return Ok(None);
-        };
+        )?;
 
         let index = cache_entry.index();
 
@@ -168,11 +162,14 @@ fn extract_cache_page(&mut self) -> Result<Option<Owned<Page>>> {
                 let mut src = cache_entry;
                 let mut offset = 0;
                 for _ in 0..PAGE_SECTORS {
-                    src.page_mut().get_pin_mut().copy_to_page(
-                        disk_entry.page_mut().get_pin_mut(),
-                        offset,
-                        block::SECTOR_SIZE as usize,
-                    )?;
+                    src.page_mut()
+                        .get_pin_mut()
+                        .copy_to_page(
+                            disk_entry.page_mut().get_pin_mut(),
+                            offset,
+                            block::SECTOR_SIZE as usize,
+                        )
+                        .expect("Write to succeed");
                     offset += block::SECTOR_SIZE as usize;
                 }
                 src.remove()
@@ -182,16 +179,15 @@
             }
         };
 
-        Ok(Some(page))
+        Some(page)
     }
 
-    fn flush(&mut self) -> Result {
+    fn flush(&mut self) {
         if self.disk_storage.cache_size > 0 {
-            while let Some(page) = self.extract_cache_page()? {
+            while let Some(page) = self.extract_cache_page() {
                 drop(page);
             }
         }
-        Ok(())
     }
 
     fn get_or_alloc_cache_page(&mut self, sector: u64) -> Result<&mut NullBlockPage> {
@@ -208,7 +204,7 @@ fn get_or_alloc_cache_page(&mut self, sector: u64) -> Result<&mut NullBlockPage
                 .take()
                 .expect("Expected to have a page available")
         } else {
-            self.extract_cache_page()?
+            self.extract_cache_page()
                 .expect("Expected to find a page in the cache")
         };
         Ok(self
diff --git a/drivers/block/rnull/rnull.rs b/drivers/block/rnull/rnull.rs
index 429819bf042ba..592fbf5790fd2 100644
--- a/drivers/block/rnull/rnull.rs
+++ b/drivers/block/rnull/rnull.rs
@@ -28,7 +28,7 @@
         BadBlocks, //
     },
     bio::Segment,
-    error::BlkResult,
+    error::{BlkError, BlkResult},
     mq::{
         self,
         gen_disk::{
@@ -36,8 +36,10 @@
             GenDisk,
             GenDiskRef, //
         },
+        IdleRequest,
         IoCompletionBatch,
         Operations,
+        RequestList,
         TagSet, //
     },
     SECTOR_SHIFT,
@@ -720,6 +722,104 @@ fn complete_request(&self, rq: Owned<mq::Request<Self>>) {
             }
         }
     }
+
+    #[inline(always)]
+    fn queue_rq_internal(
+        hw_data: Pin<&SpinLock<HwQueueContext>>,
+        this: ArcBorrow<'_, Self>,
+        rq: Owned<mq::Request<Self>>,
+        _is_last: bool,
+    ) -> Result<(), QueueRequestError> {
+        if this.bandwidth_limit != 0 {
+            if !this.bandwidth_timer.active() {
+                drop(this.bandwidth_timer_handle.lock().take());
+                let arc: Arc<_> = this.into();
+                *this.bandwidth_timer_handle.lock() =
+                    Some(arc.start(Self::BANDWIDTH_TIMER_INTERVAL));
+            }
+
+            if this
+                .bandwidth_bytes
+                .fetch_add(u64::from(rq.bytes()), ordering::Relaxed)
+                + u64::from(rq.bytes())
+                > this.bandwidth_limit
+            {
+                rq.queue().stop_hw_queues();
+                if this.bandwidth_bytes.load(ordering::Relaxed) <= this.bandwidth_limit {
+                    rq.queue().start_stopped_hw_queues_async();
+                }
+
+                return Err(QueueRequestError { request: rq });
+            }
+        }
+
+        let mut rq = rq.start();
+
+        if rq.command() == mq::Command::Flush {
+            if this.memory_backed {
+                this.storage.flush(&hw_data);
+            }
+            this.complete_request(rq);
+
+            return Ok(());
+        }
+
+        let status = (|| -> Result {
+            #[cfg(CONFIG_BLK_DEV_ZONED)]
+            if this.zoned.enabled {
+                this.handle_zoned_command(&hw_data, &mut rq)?;
+            } else {
+                this.handle_regular_command(&hw_data, &mut rq)?;
+            }
+
+            #[cfg(not(CONFIG_BLK_DEV_ZONED))]
+            this.handle_regular_command(&hw_data, &mut rq)?;
+
+            Ok(())
+        })();
+
+        if let Err(e) = status {
+            // Do not overwrite existing error. We do not care whether this write fails.
+            let _ = rq
+                .data_ref()
+                .error
+                .cmpxchg(0, e.to_errno(), ordering::Relaxed);
+        }
+
+        if rq.is_poll() {
+            // NOTE: We lack the ability to insert `Owned<T>` into a
+            // `kernel::list::List`, so we use a `RingBuffer` instead. The
+            // drawback of this is that we have to allocate the space for the
+            // ring buffer during drive initialization, and we have to hold the
+            // lock protecting the list until we have processed all the requests
+            // in the list. Change to a linked list when the kernel gets this
+            // ability.
+
+            // NOTE: We are processing requests during submit rather than during
+            // poll. This is different from C driver. C driver does processing
+            // during poll.
+
+            hw_data
+                .lock()
+                .poll_queue
+                .push_head(rq)
+                .expect("Buffer is sized to hold all in flight requests");
+        } else {
+            this.complete_request(rq);
+        }
+
+        Ok(())
+    }
+}
+
+struct QueueRequestError {
+    request: Owned<mq::Request<NullBlkDevice>>,
+}
+
+impl From<QueueRequestError> for BlkError {
+    fn from(_value: QueueRequestError) -> Self {
+        kernel::block::error::code::BLK_STS_IOERR
+    }
 }
 
 impl_has_hr_timer! {
@@ -761,7 +861,7 @@ struct HwQueueContext {
 struct Pdu {
     #[pin]
     timer: HrTimer<Self>,
-    error: Atomic<u32>,
+    error: Atomic<i32>,
 }
 
 impl HrTimerCallback for Pdu {
@@ -802,76 +902,31 @@ fn new_request_data() -> impl PinInit<Pdu> {
         })
     }
 
-    #[inline(always)]
     fn queue_rq(
         hw_data: Pin<&SpinLock<HwQueueContext>>,
         this: ArcBorrow<'_, Self>,
         rq: Owned<mq::Request<Self>>,
-        _is_last: bool,
-        is_poll: bool,
+        is_last: bool,
+        _is_poll: bool,
     ) -> BlkResult {
-        if this.bandwidth_limit != 0 {
-            if !this.bandwidth_timer.active() {
-                drop(this.bandwidth_timer_handle.lock().take());
-                let arc: Arc<_> = this.into();
-                *this.bandwidth_timer_handle.lock() =
-                    Some(arc.start(Self::BANDWIDTH_TIMER_INTERVAL));
-            }
+        Ok(Self::queue_rq_internal(hw_data, this, rq, is_last)?)
+    }
 
-            if this
-                .bandwidth_bytes
-                .fetch_add(u64::from(rq.bytes()), ordering::Relaxed)
-                + u64::from(rq.bytes())
-                > this.bandwidth_limit
+    fn queue_rqs(
+        hw_data: Pin<&SpinLock<HwQueueContext>>,
+        this: ArcBorrow<'_, Self>,
+        requests: &mut RequestList<Self>,
+    ) {
+        let mut requeue = RequestList::new();
+        while let Some(request) = requests.pop() {
+            if let Err(QueueRequestError { request }) =
+                Self::queue_rq_internal(hw_data, this, request, false)
             {
-                rq.queue().stop_hw_queues();
-                if this.bandwidth_bytes.load(ordering::Relaxed) <= this.bandwidth_limit {
-                    rq.queue().start_stopped_hw_queues_async();
-                }
-
-                return Err(kernel::block::error::code::BLK_STS_DEV_RESOURCE);
+                requeue.push_tail(request);
             }
         }
 
-        let mut rq = rq.start();
-
-        if rq.command() == mq::Command::Flush {
-            if this.memory_backed {
-                this.storage.flush(&hw_data)?;
-            }
-            this.complete_request(rq);
-
-            return Ok(());
-        }
-
-        #[cfg(CONFIG_BLK_DEV_ZONED)]
-        if this.zoned.enabled {
-            this.handle_zoned_command(&hw_data, &mut rq)?;
-        } else {
-            this.handle_regular_command(&hw_data, &mut rq)?;
-        }
-
-        #[cfg(not(CONFIG_BLK_DEV_ZONED))]
-        this.handle_regular_command(&hw_data, &mut rq)?;
-
-        if is_poll {
-            // NOTE: We lack the ability to insert `Owned<T>` into a
-            // `kernel::list::List`, so we use a `RingBuffer` instead. The
-            // drawback of this is that we have to allocate the space for the
-            // ring buffer during drive initialization, and we have to hold the
-            // lock protecting the list until we have processed all the requests
-            // in the list. Change to a linked list when the kernel gets this
-            // ability.
-
-            // NOTE: We are processing requests during submit rather than during
-            // poll. This is different from C driver. C driver does processing
-            // during poll.
-
-            hw_data.lock().poll_queue.push_head(rq)?;
-        } else {
-            this.complete_request(rq);
-        }
-        Ok(())
+        drop(core::mem::replace(requests, requeue));
     }
 
     fn commit_rqs(_hw_data: Pin<&SpinLock<HwQueueContext>>, _queue_data: ArcBorrow<'_, Self>) {}
@@ -888,7 +943,6 @@ fn poll(
         let status = rq.data_ref().error.load(ordering::Relaxed);
         rq.data_ref().error.store(0, ordering::Relaxed);
 
-        // TODO: check error handling via status
         if let Err(rq) = batch.add_request(rq, status != 0) {
             Self::end_request(rq);
         }
-- 
2.51.2