@@ -10,7 +10,10 @@ use axerrno::{AxError, AxResult};
1010use axio:: { Buf , BufMut , Read , Write } ;
1111use axpoll:: { IoEvents , PollSet , Pollable } ;
1212use axsync:: Mutex ;
13- use axtask:: { current, future:: Poller } ;
13+ use axtask:: {
14+ current,
15+ future:: { block_on, poll_io} ,
16+ } ;
1417use linux_raw_sys:: { general:: S_IFIFO , ioctl:: FIONREAD } ;
1518use memory_addr:: PAGE_SIZE_4K ;
1619use ringbuf:: {
@@ -117,28 +120,26 @@ impl FileLike for Pipe {
117120 return Ok ( 0 ) ;
118121 }
119122
120- Poller :: new ( self , IoEvents :: IN )
121- . non_blocking ( self . nonblocking ( ) )
122- . poll ( || {
123- let read = {
124- let cons = self . shared . buffer . lock ( ) ;
125- let ( left, right) = cons. as_slices ( ) ;
126- let mut count = dst. write ( left) ?;
127- if count >= left. len ( ) {
128- count += dst. write ( right) ?;
129- }
130- unsafe { cons. advance_read_index ( count) } ;
131- count
132- } ;
133- if read > 0 {
134- self . shared . poll_tx . wake ( ) ;
135- Ok ( read)
136- } else if self . closed ( ) {
137- Ok ( 0 )
138- } else {
139- Err ( AxError :: WouldBlock )
123+ block_on ( poll_io ( self , IoEvents :: IN , self . nonblocking ( ) , || {
124+ let read = {
125+ let cons = self . shared . buffer . lock ( ) ;
126+ let ( left, right) = cons. as_slices ( ) ;
127+ let mut count = dst. write ( left) ?;
128+ if count >= left. len ( ) {
129+ count += dst. write ( right) ?;
140130 }
141- } )
131+ unsafe { cons. advance_read_index ( count) } ;
132+ count
133+ } ;
134+ if read > 0 {
135+ self . shared . poll_tx . wake ( ) ;
136+ Ok ( read)
137+ } else if self . closed ( ) {
138+ Ok ( 0 )
139+ } else {
140+ Err ( AxError :: WouldBlock )
141+ }
142+ } ) )
142143 }
143144
144145 fn write ( & self , src : & mut SealedBuf ) -> AxResult < usize > {
@@ -151,34 +152,32 @@ impl FileLike for Pipe {
151152 }
152153
153154 let mut total_written = 0 ;
154- let non_blocking = self . nonblocking ( ) ;
155- Poller :: new ( self , IoEvents :: OUT )
156- . non_blocking ( non_blocking)
157- . poll ( || {
158- if self . closed ( ) {
159- raise_pipe ( ) ;
160- return Err ( AxError :: BrokenPipe ) ;
161- }
162155
163- let written = {
164- let mut prod = self . shared . buffer . lock ( ) ;
165- let ( left, right) = prod. vacant_slices_mut ( ) ;
166- let mut count = src. read ( unsafe { left. assume_init_mut ( ) } ) ?;
167- if count >= left. len ( ) {
168- count += src. read ( unsafe { right. assume_init_mut ( ) } ) ?;
169- }
170- unsafe { prod. advance_write_index ( count) } ;
171- count
172- } ;
173- if written > 0 {
174- self . shared . poll_rx . wake ( ) ;
175- total_written += written;
176- if total_written == size || non_blocking {
177- return Ok ( total_written) ;
178- }
156+ block_on ( poll_io ( self , IoEvents :: OUT , self . nonblocking ( ) , || {
157+ if self . closed ( ) {
158+ raise_pipe ( ) ;
159+ return Err ( AxError :: BrokenPipe ) ;
160+ }
161+
162+ let written = {
163+ let mut prod = self . shared . buffer . lock ( ) ;
164+ let ( left, right) = prod. vacant_slices_mut ( ) ;
165+ let mut count = src. read ( unsafe { left. assume_init_mut ( ) } ) ?;
166+ if count >= left. len ( ) {
167+ count += src. read ( unsafe { right. assume_init_mut ( ) } ) ?;
179168 }
180- Err ( AxError :: WouldBlock )
181- } )
169+ unsafe { prod. advance_write_index ( count) } ;
170+ count
171+ } ;
172+ if written > 0 {
173+ self . shared . poll_rx . wake ( ) ;
174+ total_written += written;
175+ if total_written == size || self . nonblocking ( ) {
176+ return Ok ( total_written) ;
177+ }
178+ }
179+ Err ( AxError :: WouldBlock )
180+ } ) )
182181 }
183182
184183 fn stat ( & self ) -> AxResult < Kstat > {
0 commit comments