-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathWordCountParallel.hs
73 lines (64 loc) · 2.7 KB
/
WordCountParallel.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
-- To run this program:
--
-- cabal run --flag fusion-plugin WordCountParallel test-data.txt
--
import Data.Char (chr)
import Data.Function ((&))
import Data.Word (Word8)
import GHC.Conc (numCapabilities)
import System.Environment (getArgs)
import System.Exit (die)
import Streamly.Data.Array (Array)
import WordCount (count, Counts(..), isSpace)
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.FileSystem.File as File (readChunks)
import qualified Streamly.Unicode.Stream as Stream
-- | Compute the line, word and character counts of a single chunk.
--
-- The bytes of the array are streamed, decoded as Latin-1 characters and
-- folded with 'count', starting from @Counts 0 0 0 True@.
countArray :: Array Word8 -> IO Counts
countArray arr = Stream.fold tally chars
  where
    -- Stream IO Char: the chunk's bytes decoded one character per byte.
    chars = Stream.decodeLatin1 (Array.read arr)
    -- Strict left fold that feeds every character to 'count'.
    tally = Fold.foldl' count (Counts 0 0 0 True)
-- | Counts for one chunk, paired with a flag telling whether the chunk's
-- first character is a space.
--
-- The flag is what lets two neighbouring chunks be merged: to decide whether
-- a word runs across the chunk boundary we must know if the following chunk
-- opens with a space or with a non-space character.
--
-- An empty chunk yields @(False, Counts 0 0 0 True)@ without running the
-- counting fold.
{-# NOINLINE partialCounts #-}
partialCounts :: Array Word8 -> IO (Bool, Counts)
partialCounts arr =
    case Array.getIndex 0 arr of
        -- No byte at index 0: the chunk is empty.
        Nothing -> pure (False, Counts 0 0 0 True)
        Just firstByte -> do
            let startsWithSpace = isSpace (chr (fromIntegral firstByte))
            chunkCounts <- countArray arr
            pure (startsWithSpace, chunkCounts)
-- | Combine the partial counts of two consecutive chunks, left chunk first.
--
-- Each argument pairs a "first character is a space" flag with the chunk's
-- 'Counts'. Line and character counts are simply added. Word counts are
-- added too, minus one when the left chunk does not end on a space and the
-- right chunk does not start with one: the word straddling the boundary was
-- counted once on each side.
--
-- NOTE(review): 'Counts' is declared in the WordCount module; this function
-- treats its last field as "the chunk ended on a space" and its third field
-- as the character count -- confirm against that definition.
--
-- A right-hand chunk with a character count of 0 is empty and is skipped.
-- 'partialCounts' encodes an empty chunk as @(False, Counts 0 0 0 True)@;
-- merging that naively would subtract a word whenever the left chunk ends
-- mid-word and would also overwrite the left chunk's trailing-space flag.
--
-- The result keeps the left chunk's leading flag and, for a non-empty right
-- chunk, the right chunk's trailing flag.
addCounts :: (Bool, Counts) -> (Bool, Counts) -> (Bool, Counts)
addCounts left@(sp1, Counts l1 w1 c1 ws1) (sp2, Counts l2 w2 c2 ws2)
    | c2 == 0 = left
    | otherwise = (sp1, Counts (l1 + l2) wcount (c1 + c2) ws2)
  where
    wcount
        -- No space on either side of the boundary: one word was split in two.
        | not ws1 && not sp2 = w1 + w2 - 1
        | otherwise = w1 + w2
-- | Count a whole file: split it into chunks, count every chunk with
-- 'partialCounts' and merge the per-chunk results with 'addCounts'.
--
-- The returned flag is the leading-space flag carried by the fold's
-- accumulator; the 'Counts' value holds the totals for the file.
wc :: String -> IO (Bool, Counts)
wc file = Stream.fold mergeAll countedChunks
  where
    -- Concurrency settings: at most 'numCapabilities' threads, with ordered
    -- output enabled. 'addCounts' merges neighbouring chunks, so the results
    -- must arrive in input order.
    settings = Stream.maxThreads numCapabilities . Stream.ordered True

    -- Stream IO (Bool, Counts): one partial result per chunk of the file.
    countedChunks =
        Stream.parMapM settings partialCounts (File.readChunks file)

    -- Left fold of 'addCounts' over the partial results.
    mergeAll = Fold.foldl' addCounts (False, Counts 0 0 0 True)
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
-- | Entry point: print the line, word and character counts of the file named
-- by the first command-line argument, followed by the file name.
--
-- Arguments after the first are ignored. When no argument is given the
-- program exits with a usage message rather than failing inside 'head'.
main :: IO ()
main = do
    args <- getArgs
    case args of
        [] -> die "Usage: WordCountParallel FILE"
        name : _ -> do
            (_, Counts l w c _) <- wc name
            -- Space-separated, same layout as before: "lines words chars file".
            putStrLn $ unwords [show l, show w, show c, name]