跳到主要内容

Rust tokio 异步文件下载

http range分片下载知识

http range

HTTP 允许通过 `Range: bytes=0-1023` 请求头分片下载服务器资源。bytes 区间 0-1023 是闭区间,即 [0, 1023],共 1023-0+1 = 1024 个字节。

http head 知识点

HTTP HEAD 方法 请求资源的头部信息, 并且这些头部与 HTTP GET 方法请求时返回的一致. 该请求方法的一个使用场景是在下载一个大文件前先获取其大小再决定是否要下载, 以此可以节约带宽资源. 使用http head 可以先查看下载文件是否支持分片下载

curl -I http://i.imgur.com/z4d4kWk.jpg

HTTP/1.1 200 OK
...
Accept-Ranges: bytes
Content-Length: 146515

如果请求一个资源时, HTTP 响应中出现如上所示的 Accept-Ranges, 且其值不是 none, 那么服务器支持范围请求。注:有的响应虽然 Accept-Ranges 的值为 bytes, 但 Content-Length 为 0, 这种情况不支持分片下载。

rust tokio reqwest 文件下载实现

首先检查是否支持分片

/// Sends a `HEAD` request to `cfg.url` and reports whether the resource can be
/// downloaded in byte ranges.
///
/// Returns a `RangeInfo` carrying the advertised `Content-Length` when the
/// response has an `Accept-Ranges` header containing `bytes` and a non-zero length.
///
/// # Errors
///
/// * the request itself fails, or the response status is not a success;
/// * `Accept-Ranges` is missing, unreadable as text, or does not contain `bytes`;
/// * `Content-Length` is missing, unreadable as text, or zero;
/// * `Content-Length` is present but is not a valid `usize`.
async fn check_request_range(cfg: &Config) -> Result<RangeInfo, CustomeErrors> {
    let resp = reqwest::Client::new().head(&cfg.url).send().await?;
    if !resp.status().is_success() {
        return Err(CustomeErrors::CustomError("range请求解析失败!".to_string()));
    }

    let headers = resp.headers();

    // A header that is absent or not readable as text counts as "no range support".
    let supports_ranges = headers
        .get(ACCEPT_RANGES)
        .and_then(|value| value.to_str().ok())
        .map_or(false, |value| value.contains("bytes"));
    if !supports_ranges {
        return Err(CustomeErrors::CustomError("不支持分片下载".to_string()));
    }

    // A missing or unreadable length falls back to "0", which is rejected below.
    let raw_length = headers
        .get(CONTENT_LENGTH)
        .and_then(|value| value.to_str().ok())
        .unwrap_or("0");
    let length = raw_length.parse::<usize>()?;
    if length == 0 {
        return Err(CustomeErrors::CustomError("不支持分片下载".to_string()));
    }

    Ok(RangeInfo {
        length,
        is_support: true,
    })
}

下载逻辑

 /// Downloads the file in parallel byte ranges and merges the parts.
 ///
 /// The file is split into `task_num` ranges of `file_length / task_num` bytes each.
 /// When the length is not evenly divisible, one extra range with index `task_num`
 /// covers the remaining bytes. Each range is handed to `ParallelDownloader::down`
 /// on its own tokio task; once every task has succeeded, `combine_files` is called.
 ///
 /// # Errors
 ///
 /// * `task_num` is zero, or the file is too small to give every task at least one byte;
 /// * any download task panics, is cancelled, returns an error, or returns `false`;
 /// * `combine_files` fails.
 pub async fn download(&self) -> Result<String, DownLoaderError> {
     let task_num = self.task_num;
     let file_length = self.file_length;

     // Guard the division below instead of panicking.
     if task_num == 0 {
         return Err(DownLoaderError::CustomError("任务数必须大于0".to_string()));
     }
     let chunk_size = file_length / task_num;
     // A zero-sized chunk would underflow `(index + 1) * chunk_size - 1`.
     if chunk_size == 0 {
         return Err(DownLoaderError::CustomError(
             "文件大小小于任务数,无法分片".to_string(),
         ));
     }

     // (index, first byte, last byte) -- both offsets are inclusive, as in the
     // HTTP `Range` header.
     let mut ranges: Vec<(usize, usize, usize)> = (0..task_num)
         .map(|index| (index, index * chunk_size, (index + 1) * chunk_size - 1))
         .collect();
     if file_length % task_num != 0 {
         // Remainder part: the last valid offset is `file_length - 1`, not `file_length`.
         ranges.push((task_num, task_num * chunk_size, file_length - 1));
     }

     let tasks: Vec<_> = ranges
         .into_iter()
         .map(|range| tokio::spawn(ParallelDownloader::down(self.url.clone(), range)))
         .collect();
     let results = join_all(tasks).await;

     // A join error, a download error, or an `Ok(false)` all count as failure.
     let all_succeeded = results
         .iter()
         .all(|joined| matches!(joined, Ok(Ok(true))));
     if !all_succeeded {
         return Err(DownLoaderError::CustomError("下载失败!".to_string()));
     }

     // Propagate merge failures rather than reporting success over a broken file.
     let is_combine = self.combine_files().await?;
     println!("{:#?}", is_combine);
     Ok("下载完成!".to_string())
 }
//range(index,range-start,range-end)
pub async fn down(url: String, range: (usize, usize, usize)) -> Result<bool, DownLoaderError> {
let client = Client::new();
let range_str = format!("bytes={}-{}", range.1, range.2);
let mut stream = client
.get(url)
.header(RANGE, range_str)
.send()
.await?
.bytes_stream();

fs::create_dir_all(TMP_DIR)?;
let tmp = format!("{}/{}.tmp", TMP_DIR, range.0);
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(tmp.clone())?;
let bytes_to_download = range.2 - range.1 + 1;
let mut file_lenth = 0_usize;
while let Some(chunk) = stream.next().await {
let chuck = chunk?;
let size = chuck.len();
file.write_all(&chuck)?;
file.flush()?;
file_lenth += size;
}
// println!("bytes_to_download大小:{}", bytes_to_download);
// println!("file_lenth大小:{}", file_lenth);
if bytes_to_download - file_lenth > 1 {
println!("bytes_to_download大小:{}", bytes_to_download);
println!("file_lenth大小:{}", file_lenth);
return Err(DownLoaderError::CustomError("文件缺失".to_string()));
}
println!(
"{:#?}",
format!("{}.tmp 已下载(大小:{})", range.0, file_lenth)
);
Ok(true)
}
}

分片合并

  pub async fn combine_files(&self) -> Result<bool, DownLoaderError> {
println!("合并。。。。!");
let file_length = self.file_length;
let mut task_num = self.task_num;
//注:在通过获取的文件大小按任务数`task_num`分割数据大小时,当不可整除时要task_num要+1,避免合并数据遗漏最后一个分片
if file_length % task_num != 0 {
task_num += 1;
}
let filename = self.get_file_name().await?;
println!("{}", filename);
let mut output = BufWriter::new(File::create(filename.trim())?);
for i in 0..task_num {
let mut buf: Vec<u8> = Vec::new();
let mut file = BufReader::new(File::open(format!("{}/{}.tmp", TMP_DIR, i))?);
file.read_to_end(&mut buf)?;
output.write_all(&buf)?;
output.flush()?;
// stdout().flush()?;
}
remove_dir_all(&TMP_DIR)?;
println!("文件合并完成!");
Ok(true)
}

其他

tokio main 多线程配置 #[tokio::main(flavor = "multi_thread", worker_threads = 8)] 有时不配置该项效果更好!