1. 缘起 🔗
今天逛V2EX发现一个比较有意思的帖子“多线程分段下载文件时,为什么不下载到同一个大文件中?而是要分别下载到单独的文件然后再合并。 - V2EX”,楼主认为一些多线程HTTP文件下载器把文件下载到多个小文件后再合并是没有必要的,我深以为然。
2. 多线程下载器的设计 🔗
2.1 整体思路 🔗
首先,我们需要知道待下载文件的总大小。可以先发送一个HTTP HEAD请求,从响应头Content-Length中得到文件的字节数,该响应头的语法如下:Content-Length: <length>
我们可以通过HTTP的Range请求头指定服务器返回待下载文件的某个分段,这也是HTTP断点续传的原理。如果服务器支持的话,会返回206 Partial Content响应并携带该分块的数据,否则返回200 OK响应及整个文件数据。
Range请求头的语法如下:
Range: <unit>=<range-start>-
Range: <unit>=<range-start>-<range-end>
Range: <unit>=<range-start>-<range-end>, <range-start>-<range-end>
Range: <unit>=<range-start>-<range-end>, <range-start>-<range-end>, <range-start>-<range-end>
Range: <unit>=-<suffix-length>
2.2 并发安全的文件写入 🔗
假设文件的总大小为SIZE字节,并且我们启动了worker个worker,那么每个worker的下载大小为SIZE / worker字节(除不尽时,余下的部分由最后一个worker负责)。
那我们多个worker并发去写同一个文件会不会有并发安全问题呢?如果我们只打开文件一次,一个文件对象在多个线程共享,那么肯定会有并发安全问题,因为多个线程操作同一个文件对象,共享的是同样的文件描述符(File Descriptor, FD)以及同一个文件偏移量,所以可能导致文件内容被相互覆盖。如果只有一个文件对象,我们必须用一个互斥锁同步对文件的读写,每个线程对文件的一次写入都需要获得锁之后,再通过lseek(2)把偏移量移动到自己负责的位置,然后才能写入。
好在现代操作系统允许在一个进程内多次打开同一个文件,得到多个具有独立偏移量的文件描述符。例如通过man 2 open,我们可以看到Linux的man page写了这么一段话:
Each open() of a file creates a new open file description; thus, there may be multiple open file descriptions corresponding to a file inode.
2.3 稀疏文件的额外收益 🔗
大多数UNIX文件系统都支持稀疏文件(Sparse File),当我们将文件描述符的偏移量移动到文件尾部之后再执行一次写入,那么从程序员的视角上看之前文件的尾部到写入的字节之间多出了一段以0填充的“空洞”(hole),而文件系统通常不会为这段空洞实际分配磁盘块。
3. 多线程下载器的实现 🔗
我用Go写了一个简单的命令行程序,默认创建的并发worker的数量是逻辑CPU的个数,可以通过flag -n指定worker的数量。
package main
import (
// Workflow of the downloader:
// 1. Use the HEAD method to get the file size (HTTP response header Content-Length)
// 2. Create a temporary file to store the downloaded content
// 3. Calculate the range of each slice for each worker
// 4. Send a GET request with a Range header to download the slice
// 5. Write the downloaded content to the temporary file
// Note: each worker opens the temporary file itself, so every worker has its own
// file offset and the writes need no synchronization.
// TODO: Add handling logic for the following cases:
// - The server sends a 429 Too Many Requests status code, slow down the speed
// - Custom rate limiter to control the download speed
// - Add a timeout for the HTTP request
// - Add a retry mechanism for the failed HTTP request
// - Add a checksum to verify the downloaded content
// HTTP status code that may occur:
// 200 OK: we can only download the complete file
// 206 Partial Content: we can download the file slice separately
// 429 Too Many Requests: we need to slow down the download speed
// ProgressBarHint is the label handed to the progress bar when it is created.
const ProgressBarHint = "Downloading task: "
var (
	// ErrDownloadingNotCompleted is reported when a task ends while at least
	// one slice is still missing.
	ErrDownloadingNotCompleted = errors.New("downloading not completed")

	// numWorkers holds how many workers download concurrently. It is bound
	// to the -n command line flag.
	numWorkers int
)
// DownloadingTask describes one HTTP file download that is split into
// slices, each of which is fetched by its own worker.
type DownloadingTask struct {
	// Url is the address the file is fetched from.
	Url string
	// FileName is the name the finished download is saved under.
	FileName string
	// TempFileName is the file that receives the data while the download
	// is still in progress.
	TempFileName string
	// Length is the size of the whole file in bytes; -1 stands for an
	// unknown size.
	Length int64
	// NumWorker is how many workers download the file concurrently.
	NumWorker int
	// CompleteStatus records, per worker, whether its slice has been
	// downloaded successfully.
	CompleteStatus []bool
}
func NewDownloadingTask(fileURL string, numWorker int) (*DownloadingTask, error) {
// fetch the content length of the file
resp, err := http.Head(fileURL)
if err != nil {
return nil, err
defer resp.Body.Close()
contentLength, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
if err != nil {
numWorker = 1
contentLength = -1
// create a tmp file to store the downloaded content
file, err := os.CreateTemp(".", "multi-downloader-*")
if err != nil {
return nil, err
defer file.Close()
// create a downloading task and start downloading
return &DownloadingTask{
Url: fileURL,
Length: contentLength,
NumWorker: numWorker,
CompleteStatus: make([]bool, numWorker),
TempFileName: file.Name(),
FileName: parseFileName(fileURL),
}, nil
// IsComplete checks if all the workers have finished downloading
func (t *DownloadingTask) IsComplete() bool {
for _, status := range t.CompleteStatus {
if !status {
return false
return true
// Start starts the downloading task by assigning the downloading task to all the workers.
func (t *DownloadingTask) Start() error {
// use a channel to receive the downloading status for each slice
statusStream := make(chan TaskSliceStatus)
pb := progressbar.DefaultBytes(t.Length, ProgressBarHint) // use progress bar to show the downloading progress
for i := 0; i < t.NumWorker; i++ {
t.startSlice(i, statusStream, pb)
// collect the downloading status from all the workers
for i := 0; i < t.NumWorker; i++ {
status := <-statusStream
if status.Err == nil {
t.CompleteStatus[status.WorkerId] = true
} else {
fmt.Printf("failed to download slice %d: %s\n", status.WorkerId, status.Err.Error())
if t.IsComplete() {
return os.Rename(t.TempFileName, t.FileName)
return ErrDownloadingNotCompleted
// Resume resumes downloading unfinished slices of the task
func (t *DownloadingTask) Resume() error {
statusStream := make(chan TaskSliceStatus)
pb := progressbar.DefaultBytes(-1, ProgressBarHint) // use progress bar to show the downloading progress
countMissingSlice := 0
for i := 0; i < t.NumWorker; i++ {
if !t.CompleteStatus[i] {
t.startSlice(i, statusStream, pb)
for i := 0; i < countMissingSlice; i++ {
status := <-statusStream
if status.Err == nil {
t.CompleteStatus[status.WorkerId] = true
} else {
fmt.Printf("failed to download slice %d: %s\n", status.WorkerId, status.Err.Error())
if t.IsComplete() {
return nil
return ErrDownloadingNotCompleted
func (t *DownloadingTask) startSlice(sliceNumber int, statusStream chan<- TaskSliceStatus, pb io.Writer) {
go func(workerId int) {
statusStream <- TaskSliceStatus{
WorkerId: workerId,
Err: downloadSlice(t, workerId, pb),
// TaskSliceStatus is the result a worker reports after handling its slice.
type TaskSliceStatus struct {
	// WorkerId identifies the worker (and thus the slice) the result is for.
	WorkerId int
	// Err is nil when the slice was downloaded successfully.
	Err error
}
func downloadSlice(task *DownloadingTask, workerId int, pb io.Writer) error {
// open the file in each worker, so we don't have to synchronize the file access
// each worker's file descriptor has its own offset
file, err := os.OpenFile(task.TempFileName, os.O_WRONLY, 0644)
if err != nil {
return err
defer file.Close()
// calculate the range of the slice
sliceLen := task.Length / int64(task.NumWorker)
start := sliceLen * int64(workerId)
end := start + sliceLen - 1
if workerId == task.NumWorker-1 {
end = task.Length - 1
if _, err = file.Seek(start, io.SeekStart); err != nil {
return err
w := io.MultiWriter(file, pb)
req, err := http.NewRequest("GET", task.Url, nil)
if err != nil {
return err
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36 Edg/")
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
defer resp.Body.Close()
if resp.StatusCode != http.StatusPartialContent && len(task.CompleteStatus) > 1 {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
_, err = io.Copy(w, resp.Body)
return err
// parseFileName derives a local file name from fileURL: the query string and
// fragment are dropped, trailing slashes are ignored, and the last path
// segment is returned. When nothing is left (for example for an empty URL),
// "download" is returned so that the caller always gets a usable name.
func parseFileName(fileURL string) string {
	const fallbackName = "download"

	// Everything from the first '?' or '#' on is not part of the path.
	if idx := strings.IndexAny(fileURL, "?#"); idx != -1 {
		fileURL = fileURL[:idx]
	}
	// A URL such as ".../dir/" would otherwise yield an empty name.
	fileURL = strings.TrimRight(fileURL, "/")
	if idx := strings.LastIndex(fileURL, "/"); idx != -1 {
		fileURL = fileURL[idx+1:]
	}
	if fileURL == "" {
		return fallbackName
	}
	return fileURL
}
func init() {
flag.IntVar(&numWorkers, "n", runtime.NumCPU(), "number of multi-thread workers")
if len(os.Args) < 2 {
fmt.Println("Usage: " + os.Args[0] + " <url>")
func main() {
task, err := NewDownloadingTask(flag.Args()[0], numWorkers)
if err != nil {
fmt.Println("failed to initialize a downloading task:", err.Error())
fmt.Printf("Using %d workers to download the file\n", task.NumWorker)
if err := task.Start(); err != nil {
fmt.Println("failed to download the file:", err.Error())
if err = task.Resume(); err != nil {
fmt.Println("failed to resume the downloading task:", err.Error())
4. 参考文献 🔗
- open(2) - Linux manual page (man7.org)
- Linux-UNIX系统编程手册