Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions src/coprocessor/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ use pd::PdTask;
use super::codec::mysql;
use super::codec::datum::Datum;
use super::dag::DAGContext;
use super::dag::executor::ExecutorMetrics;
use super::statistics::analyze::AnalyzeContext;
use super::metrics::*;
use super::local_metrics::*;
use super::local_metrics::{BasicLocalMetrics, ExecLocalMetrics};
use super::dag::executor::ExecutorMetrics;
use super::{Error, Result};

pub const REQ_TYPE_DAG: i64 = 103;
Expand Down Expand Up @@ -869,12 +869,14 @@ mod tests {
use tipb::expression::Expr;
use tipb::executor::Executor;

use util::worker::{Builder as WorkerBuilder, FutureWorker};
use util::config::ReadableDuration;
use util::time::Instant;
use util::worker::{Builder as WorkerBuilder, FutureWorker};

#[test]
fn test_get_reg_scan_tag() {
let mut ctx = ReqContext {
start_time: Instant::now_coarse(),
deadline: Instant::now_coarse(),
isolation_level: IsolationLevel::RC,
fill_cache: true,
Expand All @@ -890,6 +892,7 @@ mod tests {
let mut worker = WorkerBuilder::new("test-endpoint").batch_size(30).create();
let engine = engine::new_local_engine(TEMP_DIR, &[]).unwrap();
let mut cfg = Config::default();
cfg.end_point_request_max_handle_duration = ReadableDuration::secs(0);
cfg.end_point_concurrency = 1;
let pd_worker = FutureWorker::new("test-pd-worker");
let end_point = Host::new(engine, worker.scheduler(), &cfg, pd_worker.scheduler());
Expand All @@ -899,14 +902,13 @@ mod tests {
req.set_tp(REQ_TYPE_DAG);
let (tx, rx) = mpsc::channel();
let on_resp = OnResponse::Unary(box move |msg| tx.send(msg).unwrap());
let mut task =
RequestTask::new(req, on_resp, 1000, super::DEFAULT_REQUEST_MAX_HANDLE_SECS).unwrap();
task.ctx.deadline -= Duration::from_secs(super::DEFAULT_REQUEST_MAX_HANDLE_SECS);
let task = RequestTask::new(req, on_resp, 1000).unwrap();

worker.schedule(Task::Request(task)).unwrap();
let resp = rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(!resp.get_other_error().is_empty());
assert_eq!(resp.get_other_error(), super::OUTDATED_ERROR_MSG);
worker.stop();
}

#[test]
Expand Down Expand Up @@ -936,8 +938,7 @@ mod tests {
let _ = tx.send(msg); // To avoid panic if rx is closed.
});

let task = RequestTask::new(req, on_resp, 1000, super::DEFAULT_REQUEST_MAX_HANDLE_SECS)
.unwrap();
let task = RequestTask::new(req, on_resp, 1000).unwrap();
worker.schedule(Task::Request(task)).unwrap();
}
for _ in 0..120 {
Expand Down Expand Up @@ -967,12 +968,7 @@ mod tests {
req.set_tp(REQ_TYPE_DAG);
req.set_data(dag.write_to_bytes().unwrap());

let err = RequestTask::new(
req,
OnResponse::Unary(box |_| ()),
5,
super::DEFAULT_REQUEST_MAX_HANDLE_SECS,
).unwrap_err();
let err = RequestTask::new(req, OnResponse::Unary(box |_| ()), 5).unwrap_err();
let s = format!("{:?}", err);
assert!(
s.contains("Recursion"),
Expand Down
2 changes: 1 addition & 1 deletion tests/coprocessor/test_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,7 @@ where
}
f.wait().unwrap();
};
let req = RequestTask::new(req, OnResponse::Streaming(callback), 100, 60).unwrap();
let req = RequestTask::new(req, OnResponse::Streaming(callback), 100).unwrap();
end_point.schedule(EndPointTask::Request(req)).unwrap();

let (tx, rx) = mpsc::channel();
Expand Down