diff --git a/remotesync/httpsync/cmd/synctest/synctest.go b/remotesync/httpsync/cmd/synctest/synctest.go index 7f9d6d5..8624880 100644 --- a/remotesync/httpsync/cmd/synctest/synctest.go +++ b/remotesync/httpsync/cmd/synctest/synctest.go @@ -102,7 +102,7 @@ func loadFile(storage cafs.FileStorage, path string) (err error) { handler := httpsync.NewFileHandlerFromFile(file, rand.Perm(256)) fileHandlers[file.Key().String()] = handler - path = fmt.Sprintf("/file/%v", file.Key()) + path = fmt.Sprintf("/file/%v", file.Key().String()[:16]) http.Handle(path, handler) log.Printf(" serving under %v", path) return diff --git a/remotesync/httpsync/httpsync.go b/remotesync/httpsync/httpsync.go index f0a9c98..1e158c5 100644 --- a/remotesync/httpsync/httpsync.go +++ b/remotesync/httpsync/httpsync.go @@ -98,6 +98,13 @@ func (handler *FileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return } + + // Require a Connection: close header that will trick Go's HTTP server into allowing bi-directional streams. + if r.Header.Get("Connection") != "close" { + http.Error(w, "Connection: close required", http.StatusBadRequest) + return + } + chunks, err := handler.source.GetChunks() if err != nil { handler.log.Printf("GetChunks() failed: %v", err) @@ -105,12 +112,17 @@ func (handler *FileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } defer chunks.Dispose() + + w.WriteHeader(http.StatusOK) + w.(http.Flusher).Flush() + cb := func(bytesToTransfer, bytesTransferred int64) { handler.log.Printf(" skipped: %v transferred: %v", -bytesToTransfer, bytesTransferred) } handler.log.Printf("Calling WriteChunkData") defer handler.log.Printf("WriteChunkData finished") - err = remotesync.WriteChunkData(chunks, 0, bufio.NewReader(r.Body), handler.syncinfo.Perm, w, cb) + err = remotesync.WriteChunkData(chunks, 0, bufio.NewReader(r.Body), handler.syncinfo.Perm, + remotesync.SimpleFlushWriter{w, w.(http.Flusher)}, cb) if err != nil { handler.log.Printf("Error in WriteChunkData: %v", err) return @@ -147,6 +159,9 @@ func SyncFrom(ctx context.Context, storage cafs.FileStorage, client *http.Client // Enable cancelation req = req.WithContext(ctx) + // Trick Go's HTTP server implementation into allowing bi-directional data flow + req.Header.Set("Connection", "close") + go func() { if err := builder.WriteWishList(nopFlushWriter{pw}); err != nil { _ = pw.CloseWithError(fmt.Errorf("error in WriteWishList: %v", err)) diff --git a/remotesync/httpsync/util.go b/remotesync/httpsync/util.go index e8a0c0a..5b97d67 100644 --- a/remotesync/httpsync/util.go +++ b/remotesync/httpsync/util.go @@ -113,6 +113,7 @@ func (s *syncInfoChunks) Dispose() { close(s.done) } +// An implementation of FlushWriter whose Flush() function is a nop. type nopFlushWriter struct { w io.Writer } diff --git a/remotesync/receive.go b/remotesync/receive.go index bba130c..ca9838e 100644 --- a/remotesync/receive.go +++ b/remotesync/receive.go @@ -30,12 +30,6 @@ import ( var ErrDisposed = errors.New("disposed") var ErrUnexpectedChunk = errors.New("unexpected chunk") -// Interface FlushWriter acts like an io.Writer with an additional Flush method. -type FlushWriter interface { - io.Writer - Flush() -} - // Used by receiver to memorize information about a chunk in the time window between // putting it into the wishlist and receiving the actual chunk data. type memo struct { diff --git a/remotesync/send.go b/remotesync/send.go index 6e02354..ead4344 100644 --- a/remotesync/send.go +++ b/remotesync/send.go @@ -137,7 +137,7 @@ func forEachChunk(chunks Chunks, r io.ByteReader, perm shuffle.Permutation, f fu // Writes a stream of chunk length / data pairs, permuted by a shuffler corresponding to `perm`, // into an io.Writer, based on the chunks of a file and a matching permuted wishlist of requested chunks, // read from `r`. -func WriteChunkData(chunks Chunks, bytesToTransfer int64, r io.ByteReader, perm shuffle.Permutation, w io.Writer, cb TransferStatusCallback) error { +func WriteChunkData(chunks Chunks, bytesToTransfer int64, r io.ByteReader, perm shuffle.Permutation, w FlushWriter, cb TransferStatusCallback) error { if LoggingEnabled { log.Printf("Sender: Begin WriteChunkData") defer log.Printf("Sender: End WriteChunkData") @@ -162,6 +162,7 @@ func WriteChunkData(chunks Chunks, bytesToTransfer int64, r io.ByteReader, perm _ = r.Close() return err } else { + w.Flush() bytesTransferred += n } if err := r.Close(); err != nil { diff --git a/remotesync/util.go b/remotesync/util.go index 4a68e22..fbac7a4 100644 --- a/remotesync/util.go +++ b/remotesync/util.go @@ -23,8 +23,29 @@ import ( "github.com/indyjo/cafs" "github.com/indyjo/cafs/chunking" "io" + "net/http" ) +// Interface FlushWriter acts like an io.Writer with an additional Flush method. +type FlushWriter interface { + io.Writer + Flush() +} + +// Struct SimpleFlushWriter implements FlushWriter using a Writer and a Flusher. +type SimpleFlushWriter struct { + W io.Writer + F http.Flusher +} + +func (s SimpleFlushWriter) Write(p []byte) (n int, err error) { + return s.W.Write(p) +} + +func (s SimpleFlushWriter) Flush() { + s.F.Flush() +} + // The key pertaining to the SHA256 of an empty string is used to represent placeholders // for empty slots generated by shuffled transmissions. var emptyKey = *cafs.MustParseKey("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")