-
Notifications
You must be signed in to change notification settings - Fork 42
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- 增加多线程分片下载文件 - 增加下载文件但是进程退出后,重新下载恢复之前的进度
- Loading branch information
Showing
9 changed files
with
332 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,4 +41,4 @@ jobs: | |
- name: Test | ||
run: | | ||
go build . | ||
go test -v . | ||
go test -v ./... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package partial | ||
|
||
import ( | ||
"sync/atomic" | ||
) | ||
|
||
type Chunk struct { | ||
// 切片的顺序 | ||
index int64 | ||
|
||
// 切片内容的在源文件的开始地址 | ||
start int64 | ||
|
||
// 切片内容在源文件的结束地址 | ||
end int64 | ||
|
||
// 切片任务的下载错误 | ||
err error | ||
|
||
// 下载完的切片的具体内容 | ||
buffer []byte | ||
} | ||
|
||
func NewChunk(index, start, end int64) *Chunk { | ||
chunk := &Chunk{ | ||
start: start, | ||
end: end, | ||
index: index, | ||
} | ||
return chunk | ||
} | ||
|
||
func (p *Chunk) SetData(bytes []byte) { | ||
p.buffer = bytes | ||
} | ||
|
||
func (p *Chunk) SetError(err error) { | ||
p.err = err | ||
} | ||
|
||
func (p *Chunk) Error() error { | ||
return p.err | ||
} | ||
|
||
func (p *Chunk) Data() []byte { | ||
return p.buffer | ||
} | ||
|
||
// 切片乱序写入后,将切片顺序读取 | ||
type ChunksSorter struct { | ||
// 已经读取的切片数量 | ||
readCount int64 | ||
|
||
// 切片的所有总数 | ||
chunkCount int64 | ||
|
||
// 线程数,用于阻塞写入 | ||
works int64 | ||
|
||
// 存储切片的缓存区 | ||
chunks []chan *Chunk | ||
} | ||
|
||
func NewChunksSorter(chunkCount int64, works int) *ChunksSorter { | ||
chunks := make([]chan *Chunk, works) | ||
for i := 0; i < len(chunks); i++ { | ||
chunks[i] = make(chan *Chunk) | ||
} | ||
|
||
return &ChunksSorter{ | ||
chunkCount: chunkCount, | ||
works: int64(works), | ||
chunks: chunks, | ||
} | ||
} | ||
|
||
// 将数据写入到缓存区,如果该缓存已满,则会被阻塞 | ||
func (p *ChunksSorter) Write(chunk *Chunk) { | ||
p.chunks[chunk.index%p.works] <- chunk | ||
} | ||
|
||
// 关闭 workId 下的通道 | ||
func (p *ChunksSorter) Close(workId int) { | ||
if (len(p.chunks) - 1) >= workId { | ||
close(p.chunks[workId]) | ||
} | ||
} | ||
|
||
// 顺序读取切片,如果下一个切片没有下载完,则会被阻塞 | ||
func (p *ChunksSorter) Read() *Chunk { | ||
if p.chunkCount == 0 { | ||
return nil | ||
} | ||
i := atomic.AddInt64(&p.readCount, 1) | ||
chunk := <-p.chunks[(i-1)%p.works] | ||
return chunk | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package partial | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"io" | ||
"os" | ||
"sync" | ||
) | ||
|
||
const ChunkSize = 1024 * 1024 * 10 | ||
|
||
type ChunkDownFunc func(start, end int64) ([]byte, error) | ||
|
||
type MultiPartialDownloader struct { | ||
|
||
// 文件路径 | ||
filePath string | ||
|
||
// 最终文件大小 | ||
finalSize int64 | ||
|
||
// 本地文件大小 | ||
localSize int64 | ||
|
||
writer io.Writer | ||
works int | ||
downFunc ChunkDownFunc | ||
} | ||
|
||
func NewMultiPartialDownloader(filePath string, finalSize int64, writer io.Writer, works int, fn ChunkDownFunc) *MultiPartialDownloader { | ||
return &MultiPartialDownloader{ | ||
filePath: filePath, | ||
finalSize: finalSize, | ||
works: works, | ||
writer: writer, | ||
downFunc: fn, | ||
} | ||
} | ||
|
||
func (p *MultiPartialDownloader) Download() error { | ||
fileinfo, err := os.Stat(p.filePath) | ||
|
||
// 如果异常 | ||
// - 文件不存在异常: localSize 默认值 0 | ||
// - 不是文件不存在异常: 报错 | ||
if err != nil && !os.IsNotExist(err) { | ||
return err | ||
} | ||
if err == nil { | ||
p.localSize = fileinfo.Size() | ||
} | ||
|
||
// 计算需要下载的块数 | ||
needDownSize := p.finalSize - p.localSize | ||
chunkCount := needDownSize / ChunkSize | ||
if needDownSize%ChunkSize != 0 { | ||
chunkCount++ | ||
} | ||
|
||
chunksSorter := NewChunksSorter( | ||
chunkCount, | ||
p.works, | ||
) | ||
|
||
// 下载切片任务 | ||
var wg sync.WaitGroup | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer func() { | ||
// 取消切片下载任务,并等待 | ||
cancel() | ||
wg.Wait() | ||
}() | ||
|
||
for i := 0; i < p.works; i++ { | ||
wg.Add(1) | ||
go func(ctx context.Context, workId int) { | ||
defer func() { | ||
// 关闭 workId 下的接收通道 | ||
chunksSorter.Close(workId) | ||
wg.Done() | ||
}() | ||
|
||
// 每个 work 取自己倍数的 chunk | ||
for j := workId; j < int(chunkCount); j += p.works { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
default: | ||
var ( | ||
err error | ||
buffer []byte | ||
) | ||
start := p.localSize + int64(j)*ChunkSize | ||
end := p.localSize + int64(j+1)*ChunkSize | ||
if end > p.finalSize { | ||
end = p.finalSize | ||
} | ||
chunk := NewChunk(int64(j), start, end) | ||
|
||
// 重试三次 | ||
for t := 0; t < 3; t++ { | ||
// ? 由于长度是从1开始,而数据是从0地址开始 | ||
// ? 计算字节时容量会多出开头的一位,所以末尾需要减少一位 | ||
buffer, err = p.downFunc(chunk.start, chunk.end-1) | ||
if err == nil { | ||
break | ||
} | ||
} | ||
chunk.SetData(buffer) | ||
chunk.SetError(err) | ||
chunksSorter.Write(chunk) | ||
|
||
if err != nil { | ||
return | ||
} | ||
} | ||
} | ||
}(ctx, i) | ||
} | ||
|
||
// 将分片顺序写入到文件 | ||
for { | ||
chunk := chunksSorter.Read() | ||
if chunk == nil { | ||
break | ||
} | ||
if chunk.Error() != nil { | ||
return chunk.Error() | ||
} | ||
if len(chunk.Data()) == 0 { | ||
return errors.New("chunk buffer download but size is 0") | ||
} | ||
p.writer.Write(chunk.Data()) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package partial | ||
|
||
import ( | ||
"bytes" | ||
"crypto/md5" | ||
"strings" | ||
"testing" | ||
) | ||
|
||
func TestDownload(t *testing.T) { | ||
var buffer bytes.Buffer | ||
|
||
filedata := []byte(strings.Repeat("hello world", 1024*1024*100)) | ||
download := NewMultiPartialDownloader( | ||
"myTestfile", | ||
int64(len(filedata)), | ||
&buffer, | ||
3, | ||
func(start, end int64) ([]byte, error) { | ||
return filedata[start : end+1], nil | ||
}, | ||
) | ||
|
||
err := download.Download() | ||
if err != nil { | ||
t.Fatal(err.Error()) | ||
} | ||
if md5.Sum(buffer.Bytes()) != md5.Sum(filedata) { | ||
t.Fatal("download file has diff MD5") | ||
} | ||
} |
Oops, something went wrong.