Skip to content

Commit

Permalink
Use "Connection: close" header in assistive downloads (indyjo/bitwrk#189
Browse files Browse the repository at this point in the history
)
  • Loading branch information
indyjo committed Sep 7, 2019
1 parent 02a94f2 commit eae3bd9
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 9 deletions.
2 changes: 1 addition & 1 deletion remotesync/httpsync/cmd/synctest/synctest.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func loadFile(storage cafs.FileStorage, path string) (err error) {
handler := httpsync.NewFileHandlerFromFile(file, rand.Perm(256))
fileHandlers[file.Key().String()] = handler

path = fmt.Sprintf("/file/%v", file.Key())
path = fmt.Sprintf("/file/%v", file.Key().String()[:16])
http.Handle(path, handler)
log.Printf(" serving under %v", path)
return
Expand Down
17 changes: 16 additions & 1 deletion remotesync/httpsync/httpsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,31 @@ func (handler *FileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}

// Require a Connection: close header that will trick Go's HTTP server into allowing bi-directional streams.
if r.Header.Get("Connection") != "close" {
http.Error(w, "Connection: close required", http.StatusBadRequest)
return
}

chunks, err := handler.source.GetChunks()
if err != nil {
handler.log.Printf("GetChunks() failed: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer chunks.Dispose()

w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()

cb := func(bytesToTransfer, bytesTransferred int64) {
handler.log.Printf(" skipped: %v transferred: %v", -bytesToTransfer, bytesTransferred)
}
handler.log.Printf("Calling WriteChunkData")
defer handler.log.Printf("WriteChunkData finished")
err = remotesync.WriteChunkData(chunks, 0, bufio.NewReader(r.Body), handler.syncinfo.Perm, w, cb)
err = remotesync.WriteChunkData(chunks, 0, bufio.NewReader(r.Body), handler.syncinfo.Perm,
remotesync.SimpleFlushWriter{w, w.(http.Flusher)}, cb)
if err != nil {
handler.log.Printf("Error in WriteChunkData: %v", err)
return
Expand Down Expand Up @@ -147,6 +159,9 @@ func SyncFrom(ctx context.Context, storage cafs.FileStorage, client *http.Client
// Enable cancelation
req = req.WithContext(ctx)

// Trick Go's HTTP server implementation into allowing bi-directional data flow
req.Header.Set("Connection", "close")

go func() {
if err := builder.WriteWishList(nopFlushWriter{pw}); err != nil {
_ = pw.CloseWithError(fmt.Errorf("error in WriteWishList: %v", err))
Expand Down
1 change: 1 addition & 0 deletions remotesync/httpsync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (s *syncInfoChunks) Dispose() {
close(s.done)
}

// An implementation of FlushWriter whose Flush() function is a nop.
type nopFlushWriter struct {
w io.Writer
}
Expand Down
6 changes: 0 additions & 6 deletions remotesync/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ import (
var ErrDisposed = errors.New("disposed")
var ErrUnexpectedChunk = errors.New("unexpected chunk")

// Interface FlushWriter acts like an io.Writer with an additional Flush method.
type FlushWriter interface {
io.Writer
Flush()
}

// Used by receiver to memorize information about a chunk in the time window between
// putting it into the wishlist and receiving the actual chunk data.
type memo struct {
Expand Down
3 changes: 2 additions & 1 deletion remotesync/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func forEachChunk(chunks Chunks, r io.ByteReader, perm shuffle.Permutation, f fu
// Writes a stream of chunk length / data pairs, permuted by a shuffler corresponding to `perm`,
// into an io.Writer, based on the chunks of a file and a matching permuted wishlist of requested chunks,
// read from `r`.
func WriteChunkData(chunks Chunks, bytesToTransfer int64, r io.ByteReader, perm shuffle.Permutation, w io.Writer, cb TransferStatusCallback) error {
func WriteChunkData(chunks Chunks, bytesToTransfer int64, r io.ByteReader, perm shuffle.Permutation, w FlushWriter, cb TransferStatusCallback) error {
if LoggingEnabled {
log.Printf("Sender: Begin WriteChunkData")
defer log.Printf("Sender: End WriteChunkData")
Expand All @@ -162,6 +162,7 @@ func WriteChunkData(chunks Chunks, bytesToTransfer int64, r io.ByteReader, perm
_ = r.Close()
return err
} else {
w.Flush()
bytesTransferred += n
}
if err := r.Close(); err != nil {
Expand Down
21 changes: 21 additions & 0 deletions remotesync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,29 @@ import (
"github.com/indyjo/cafs"
"github.com/indyjo/cafs/chunking"
"io"
"net/http"
)

// Interface FlushWriter acts like an io.Writer with an additional Flush method.
type FlushWriter interface {
io.Writer
Flush()
}

// Struct SimpleFlushWriter implements FlushWriter using a Writer and a Flusher.
type SimpleFlushWriter struct {
W io.Writer
F http.Flusher
}

func (s SimpleFlushWriter) Write(p []byte) (n int, err error) {
return s.W.Write(p)
}

func (s SimpleFlushWriter) Flush() {
s.F.Flush()
}

// The key pertaining to the SHA256 of an empty string is used to represent placeholders
// for empty slots generated by shuffled transmissions.
var emptyKey = *cafs.MustParseKey("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
Expand Down

0 comments on commit eae3bd9

Please sign in to comment.