diff --git a/src/coprocessor/endpoint.rs b/src/coprocessor/endpoint.rs index 8f5a98186b9..e78e98c6344 100644 --- a/src/coprocessor/endpoint.rs +++ b/src/coprocessor/endpoint.rs @@ -44,10 +44,10 @@ use pd::PdTask; use super::codec::mysql; use super::codec::datum::Datum; use super::dag::DAGContext; -use super::dag::executor::ExecutorMetrics; use super::statistics::analyze::AnalyzeContext; use super::metrics::*; -use super::local_metrics::*; +use super::local_metrics::{BasicLocalMetrics, ExecLocalMetrics}; +use super::dag::executor::ExecutorMetrics; use super::{Error, Result}; pub const REQ_TYPE_DAG: i64 = 103; @@ -869,12 +869,14 @@ mod tests { use tipb::expression::Expr; use tipb::executor::Executor; - use util::worker::{Builder as WorkerBuilder, FutureWorker}; + use util::config::ReadableDuration; use util::time::Instant; + use util::worker::{Builder as WorkerBuilder, FutureWorker}; #[test] fn test_get_reg_scan_tag() { let mut ctx = ReqContext { + start_time: Instant::now_coarse(), deadline: Instant::now_coarse(), isolation_level: IsolationLevel::RC, fill_cache: true, @@ -890,6 +892,7 @@ mod tests { let mut worker = WorkerBuilder::new("test-endpoint").batch_size(30).create(); let engine = engine::new_local_engine(TEMP_DIR, &[]).unwrap(); let mut cfg = Config::default(); + cfg.end_point_request_max_handle_duration = ReadableDuration::secs(0); cfg.end_point_concurrency = 1; let pd_worker = FutureWorker::new("test-pd-worker"); let end_point = Host::new(engine, worker.scheduler(), &cfg, pd_worker.scheduler()); @@ -899,14 +902,13 @@ mod tests { req.set_tp(REQ_TYPE_DAG); let (tx, rx) = mpsc::channel(); let on_resp = OnResponse::Unary(box move |msg| tx.send(msg).unwrap()); - let mut task = - RequestTask::new(req, on_resp, 1000, super::DEFAULT_REQUEST_MAX_HANDLE_SECS).unwrap(); - task.ctx.deadline -= Duration::from_secs(super::DEFAULT_REQUEST_MAX_HANDLE_SECS); + let task = RequestTask::new(req, on_resp, 1000).unwrap(); worker.schedule(Task::Request(task)).unwrap(); let resp = rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert!(!resp.get_other_error().is_empty()); assert_eq!(resp.get_other_error(), super::OUTDATED_ERROR_MSG); + worker.stop(); } #[test] @@ -936,8 +938,7 @@ mod tests { let _ = tx.send(msg); // To avoid panic if rx is closed. }); - let task = RequestTask::new(req, on_resp, 1000, super::DEFAULT_REQUEST_MAX_HANDLE_SECS) - .unwrap(); + let task = RequestTask::new(req, on_resp, 1000).unwrap(); worker.schedule(Task::Request(task)).unwrap(); } for _ in 0..120 { @@ -967,12 +968,7 @@ mod tests { req.set_tp(REQ_TYPE_DAG); req.set_data(dag.write_to_bytes().unwrap()); - let err = RequestTask::new( - req, - OnResponse::Unary(box |_| ()), - 5, - super::DEFAULT_REQUEST_MAX_HANDLE_SECS, - ).unwrap_err(); + let err = RequestTask::new(req, OnResponse::Unary(box |_| ()), 5).unwrap_err(); let s = format!("{:?}", err); assert!( s.contains("Recursion"), diff --git a/tests/coprocessor/test_select.rs b/tests/coprocessor/test_select.rs index 78d587eb92a..7df00b89916 100644 --- a/tests/coprocessor/test_select.rs +++ b/tests/coprocessor/test_select.rs @@ -1506,7 +1506,7 @@ where } f.wait().unwrap(); }; - let req = RequestTask::new(req, OnResponse::Streaming(callback), 100, 60).unwrap(); + let req = RequestTask::new(req, OnResponse::Streaming(callback), 100).unwrap(); end_point.schedule(EndPointTask::Request(req)).unwrap(); let (tx, rx) = mpsc::channel();