From 5b7895e7be582c85645317617c33f5aa7e1e959d Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Tue, 15 Mar 2022 16:49:45 +0100 Subject: [PATCH] fix(sut): make the zero-copy variant work by blocking cleaning --- src/sut/dumblog/src/Dumblog/ZeroCopy/Main.hs | 18 ++++++++++++++---- src/sut/dumblog/src/Dumblog/ZeroCopy/Worker.hs | 4 ---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/sut/dumblog/src/Dumblog/ZeroCopy/Main.hs b/src/sut/dumblog/src/Dumblog/ZeroCopy/Main.hs index a697ffa0..012791b2 100644 --- a/src/sut/dumblog/src/Dumblog/ZeroCopy/Main.hs +++ b/src/sut/dumblog/src/Dumblog/ZeroCopy/Main.hs @@ -4,7 +4,8 @@ import Control.Concurrent (MVar) import Control.Concurrent.Async (link, withAsync) import Journal (allocateJournal, defaultOptions, startJournal) import Journal.Internal.Logger (nullLogger) -import Journal.Types (oLogger, oTermBufferLength) +import Journal.Types + (Subscriber(Sub2), oLogger, oMaxSubscriber, oTermBufferLength) import Dumblog.ZeroCopy.HttpServer import Dumblog.ZeroCopy.State @@ -14,9 +15,18 @@ import Dumblog.ZeroCopy.Worker zeroCopyDumblog :: Int -> Int -> FilePath -> Maybe (MVar ()) -> IO () zeroCopyDumblog capacity port fp mReady = do - let opts = defaultOptions { oTermBufferLength = capacity - , oLogger = nullLogger - } + let opts = defaultOptions + { oTermBufferLength = capacity + , oLogger = nullLogger + -- NOTE: A second subscriber whose bytes consumed is never + -- incremented is used to avoid cleaning up the journal, + -- otherwise we can have reads that point to a cleaned up part of + -- the journal. A better longer term solution would be to have + -- the compaction phase of the journal store the data somewhere + -- more long term and update the in-memory location to point to + -- that instead and allow the journal to be cleaned up. + , oMaxSubscriber = Sub2 + } allocateJournal fp opts jour <- startJournal fp opts state <- initState 40000 fp diff --git a/src/sut/dumblog/src/Dumblog/ZeroCopy/Worker.hs b/src/sut/dumblog/src/Dumblog/ZeroCopy/Worker.hs index aad2be0b..ad2005d5 100644 --- a/src/sut/dumblog/src/Dumblog/ZeroCopy/Worker.hs +++ b/src/sut/dumblog/src/Dumblog/ZeroCopy/Worker.hs @@ -44,10 +44,6 @@ worker jour = go go state' Just (Read ix) -> do conn <- mkSocket (CInt fd) - -- XXX: We need to not do `cleanBufferTo` otherwise it will clean - -- away the journal before can reply to it. I guess we could also - -- try to have subscriber `Sub2` never advance its consumed bytes - -- counter to disable cleaning... readSendfile state conn ix go state Nothing -> go state