From 7d7212ab3b445e0d1c5cafe2cfd0c147f7338c84 Mon Sep 17 00:00:00 2001 From: JP Martin Date: Wed, 4 May 2016 15:43:04 -0700 Subject: [PATCH 1/4] Add ParallelCountBytes Made it and CountBytes compute an MD5, so I could check that they match (they do). --- gcloud-java-examples/pom.xml | 4 + .../google/cloud/examples/nio/CountBytes.java | 6 + .../examples/nio/ParallelCountBytes.java | 174 ++++++++++++++++++ 3 files changed, 184 insertions(+) create mode 100644 gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java diff --git a/gcloud-java-examples/pom.xml b/gcloud-java-examples/pom.xml index 642f8fb0fff0..a0da75815fa7 100644 --- a/gcloud-java-examples/pom.xml +++ b/gcloud-java-examples/pom.xml @@ -67,6 +67,10 @@ com.google.cloud.examples.nio.CountBytes CountBytes + + com.google.cloud.examples.nio.ParallelCountBytes + ParallelCountBytes + com.google.cloud.examples.resourcemanager.ResourceManagerExample diff --git a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java index 50ad77d72f24..184df901f714 100644 --- a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java +++ b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java @@ -18,6 +18,7 @@ import com.google.common.base.Stopwatch; +import javax.xml.bind.annotation.adapters.HexBinaryAdapter; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; @@ -25,6 +26,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.MessageDigest; import java.util.concurrent.TimeUnit; /** @@ -72,8 +74,10 @@ private static void countFile(String fname) { SeekableByteChannel chan = Files.newByteChannel(path); long total = 0; int readCalls = 0; + MessageDigest md = MessageDigest.getInstance("MD5"); while (chan.read(buf) > 0) { readCalls++; + md.update(buf.array(), 0, buf.position()); total += buf.position(); buf.flip(); } @@ -81,6 +85,8 @@ private static void countFile(String fname) { long elapsed = sw.elapsed(TimeUnit.SECONDS); System.out.println("Read all " + total + " bytes in " + elapsed + "s. " + "(" + readCalls +" calls to chan.read)"); + String hex = (new HexBinaryAdapter()).marshal(md.digest()); + System.out.println("The MD5 is: 0x" + hex); if (total != size) { System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " + "yet the file size is listed at " + size + " bytes."); diff --git a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java new file mode 100644 index 000000000000..388af029e905 --- /dev/null +++ b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java @@ -0,0 +1,174 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.examples.nio; + +import com.google.common.base.Stopwatch; + +import javax.xml.bind.annotation.adapters.HexBinaryAdapter; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.MessageDigest; +import java.util.concurrent.TimeUnit; + +/** + * ParallelCountBytes will read through the whole file given as input. + * + *

This example shows how to go through all the contents of a file, + * in order, using multithreaded NIO reads.It also reports how long it took. + * + *

See the README for compilation instructions. Run this code with + * {@code target/appassembler/bin/ParallelCountBytes } + */ +public class ParallelCountBytes { + + private class BufWithLock { + public Object lock; + public ByteBuffer buf; + public boolean full; + public Thread t; + + public BufWithLock(int size) { + this.buf = ByteBuffer.allocate(size); + this.lock = new Object(); + } + } + + /** + * See the class documentation. + */ + public static void main(String[] args) throws IOException { + new ParallelCountBytes().start(args); + } + + public void start(String[] args) throws IOException { + if (args.length == 0 || args[0].equals("--help")) { + help(); + return; + } + for (String a : args) { + countFile(a); + } + } + + private void stridedRead(SeekableByteChannel chan, int blockSize, int firstBlock, int stride, BufWithLock output) { + try { + // stagger the threads a little bit. + Thread.sleep(250 * firstBlock); + long pos = firstBlock * blockSize; + synchronized(output.lock) { + while (true) { + if (pos > chan.size()) { + break; + } + chan.position(pos); + // read until buffer is full, or EOF + while (chan.read(output.buf) > 0) {}; + output.full = true; + output.lock.notifyAll(); + if (output.buf.hasRemaining()) { + break; + } + // wait for main thread to process it + while (output.full) { + output.lock.wait(); + } + output.buf.flip(); + pos += stride * blockSize; + } + } + } catch (InterruptedException | IOException o) { + // this simple example doesn't handle errors, sorry. + } + } + + /** + * Print the length of the indicated file. + * + *

This uses the normal Java NIO Api, so it can take advantage of any installed + * NIO Filesystem provider without any extra effort. + */ + private void countFile(String fname) throws IOException{ + // large buffers pay off + final int bufSize = 50 * 1024 * 1024; + try { + Path path = Paths.get(new URI(fname)); + long size = Files.size(path); + System.out.println(fname + ": " + size + " bytes."); + ByteBuffer buf = ByteBuffer.allocate(bufSize); + int nBlocks = (int)Math.ceil( size / (double)bufSize); + int nThreads = nBlocks; + if (nThreads > 4) nThreads = 4; + System.out.println("Reading the whole file using " + nThreads + " threads..."); + Stopwatch sw = Stopwatch.createStarted(); + final BufWithLock[] bufs = new BufWithLock[nThreads]; + for (int i = 0; i < nThreads; i++) { + bufs[i] = new BufWithLock(bufSize); + final SeekableByteChannel chan = Files.newByteChannel(path); + final int finalNThreads = nThreads; + final int finalI = i; + bufs[i].t = new Thread(new Runnable() { + @Override + public void run() { + stridedRead(chan, bufSize, finalI, finalNThreads, bufs[finalI]); + } + }); + bufs[i].t.start(); + } + + long total = 0; + MessageDigest md = MessageDigest.getInstance("MD5"); + for (int block = 0; block < nBlocks; block++) { + BufWithLock bwl = bufs[block % bufs.length]; + synchronized (bwl.lock) { + while (!bwl.full) { + bwl.lock.wait(); + } + md.update(bwl.buf.array(), 0, bwl.buf.position()); + total += bwl.buf.position(); + bwl.full = false; + bwl.lock.notifyAll(); + } + } + + long elapsed = sw.elapsed(TimeUnit.SECONDS); + System.out.println("Read all " + total + " bytes in " + elapsed + "s. "); + String hex = (new HexBinaryAdapter()).marshal(md.digest()); + System.out.println("The MD5 is: 0x" + hex); + if (total != size) { + System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " + + "yet the file size is listed at " + size + " bytes."); + } + } catch (Exception ex) { + System.out.println(fname + ": " + ex.toString()); + } + } + + private static void help() { + String[] help = + {"The argument is a ", + "and we show the length of that file." + }; + for (String s : help) { + System.out.println(s); + } + } +} From 966679ab22cf8d6d2e292ce5be146a92d3d9f490 Mon Sep 17 00:00:00 2001 From: JP Martin Date: Thu, 5 May 2016 11:18:18 -0700 Subject: [PATCH 2/4] Refactor to use ExecutorService and BaseEncoding --- .../google/cloud/examples/nio/CountBytes.java | 6 +- .../examples/nio/ParallelCountBytes.java | 126 ++++++++---------- 2 files changed, 59 insertions(+), 73 deletions(-) diff --git a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java index 184df901f714..a3f9779a0790 100644 --- a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java +++ b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java @@ -17,8 +17,8 @@ package com.google.cloud.examples.nio; import com.google.common.base.Stopwatch; +import com.google.common.io.BaseEncoding; -import javax.xml.bind.annotation.adapters.HexBinaryAdapter; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; @@ -35,7 +35,7 @@ *

This example shows how to read a file size using NIO. * File.size returns the size of the file as saved in Storage metadata. * This class also shows how to read all of the file's contents using NIO, - * and reports how long it took. + * computes a MD5 hash, and reports how long it took. * *

See the README for compilation instructions. Run this code with * {@code target/appassembler/bin/CountBytes } @@ -85,7 +85,7 @@ private static void countFile(String fname) { long elapsed = sw.elapsed(TimeUnit.SECONDS); System.out.println("Read all " + total + " bytes in " + elapsed + "s. " + "(" + readCalls +" calls to chan.read)"); - String hex = (new HexBinaryAdapter()).marshal(md.digest()); + String hex = String.valueOf(BaseEncoding.base16().encode(md.digest())); System.out.println("The MD5 is: 0x" + hex); if (total != size) { System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " + diff --git a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java index 388af029e905..825f60ad0ee8 100644 --- a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java +++ b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java @@ -17,8 +17,8 @@ package com.google.cloud.examples.nio; import com.google.common.base.Stopwatch; +import com.google.common.io.BaseEncoding; -import javax.xml.bind.annotation.adapters.HexBinaryAdapter; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; @@ -27,28 +27,58 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.security.MessageDigest; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * ParallelCountBytes will read through the whole file given as input. * *

This example shows how to go through all the contents of a file, - * in order, using multithreaded NIO reads.It also reports how long it took. + * in order, using multithreaded NIO reads. + * It prints a MD5 hash and reports how long it took. * *

See the README for compilation instructions. Run this code with * {@code target/appassembler/bin/ParallelCountBytes } */ public class ParallelCountBytes { - private class BufWithLock { - public Object lock; - public ByteBuffer buf; - public boolean full; - public Thread t; + /** + * WorkUnit holds a buffer and the instructions for what to put in it. + */ + private class WorkUnit implements Callable { + public final ByteBuffer buf; + final SeekableByteChannel chan; + final int blockSize; + int blockIndex; - public BufWithLock(int size) { - this.buf = ByteBuffer.allocate(size); - this.lock = new Object(); + public WorkUnit(SeekableByteChannel chan, int blockSize, int blockIndex) { + this.chan = chan; + this.buf = ByteBuffer.allocate(blockSize); + this.blockSize = blockSize; + this.blockIndex = blockIndex; + } + + @Override + public WorkUnit call() throws IOException { + int pos = blockSize * blockIndex; + if (pos > chan.size()) { + return this; + } + chan.position(pos); + // read until buffer is full, or EOF + while (chan.read(buf) > 0) {}; + return this; + } + + public WorkUnit resetForIndex(int blockIndex) { + this.blockIndex = blockIndex; + buf.flip(); + return this; } } @@ -69,37 +99,6 @@ public void start(String[] args) throws IOException { } } - private void stridedRead(SeekableByteChannel chan, int blockSize, int firstBlock, int stride, BufWithLock output) { - try { - // stagger the threads a little bit. - Thread.sleep(250 * firstBlock); - long pos = firstBlock * blockSize; - synchronized(output.lock) { - while (true) { - if (pos > chan.size()) { - break; - } - chan.position(pos); - // read until buffer is full, or EOF - while (chan.read(output.buf) > 0) {}; - output.full = true; - output.lock.notifyAll(); - if (output.buf.hasRemaining()) { - break; - } - // wait for main thread to process it - while (output.full) { - output.lock.wait(); - } - output.buf.flip(); - pos += stride * blockSize; - } - } - } catch (InterruptedException | IOException o) { - // this simple example doesn't handle errors, sorry. - } - } - /** * Print the length of the indicated file. * @@ -109,49 +108,36 @@ private void stridedRead(SeekableByteChannel chan, int blockSize, int firstBlock private void countFile(String fname) throws IOException{ // large buffers pay off final int bufSize = 50 * 1024 * 1024; + Queue> work = new ArrayDeque<>(); try { Path path = Paths.get(new URI(fname)); long size = Files.size(path); System.out.println(fname + ": " + size + " bytes."); - ByteBuffer buf = ByteBuffer.allocate(bufSize); - int nBlocks = (int)Math.ceil( size / (double)bufSize); - int nThreads = nBlocks; + int nThreads = (int) Math.ceil(size / (double) bufSize); if (nThreads > 4) nThreads = 4; System.out.println("Reading the whole file using " + nThreads + " threads..."); Stopwatch sw = Stopwatch.createStarted(); - final BufWithLock[] bufs = new BufWithLock[nThreads]; - for (int i = 0; i < nThreads; i++) { - bufs[i] = new BufWithLock(bufSize); - final SeekableByteChannel chan = Files.newByteChannel(path); - final int finalNThreads = nThreads; - final int finalI = i; - bufs[i].t = new Thread(new Runnable() { - @Override - public void run() { - stridedRead(chan, bufSize, finalI, finalNThreads, bufs[finalI]); - } - }); - bufs[i].t.start(); - } - long total = 0; MessageDigest md = MessageDigest.getInstance("MD5"); - for (int block = 0; block < nBlocks; block++) { - BufWithLock bwl = bufs[block % bufs.length]; - synchronized (bwl.lock) { - while (!bwl.full) { - bwl.lock.wait(); - } - md.update(bwl.buf.array(), 0, bwl.buf.position()); - total += bwl.buf.position(); - bwl.full = false; - bwl.lock.notifyAll(); + + ExecutorService exec = Executors.newFixedThreadPool(nThreads); + int blockIndex; + for (blockIndex = 0; blockIndex < nThreads; blockIndex++) { + work.add(exec.submit(new WorkUnit(Files.newByteChannel(path), bufSize, blockIndex))); + } + while (true) { + WorkUnit full = work.remove().get(); + md.update(full.buf.array(), 0, full.buf.position()); + total += full.buf.position(); + if (full.buf.hasRemaining()) { + break; } + work.add(exec.submit(full.resetForIndex(blockIndex++))); } long elapsed = sw.elapsed(TimeUnit.SECONDS); System.out.println("Read all " + total + " bytes in " + elapsed + "s. "); - String hex = (new HexBinaryAdapter()).marshal(md.digest()); + String hex = String.valueOf(BaseEncoding.base16().encode(md.digest())); System.out.println("The MD5 is: 0x" + hex); if (total != size) { System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " + From 03dab3ef727effa69b270eb837b245aba7af7f14 Mon Sep 17 00:00:00 2001 From: JP Martin Date: Thu, 5 May 2016 14:13:35 -0700 Subject: [PATCH 3/4] Add shutdown --- .../java/com/google/cloud/examples/nio/ParallelCountBytes.java | 1 + 1 file changed, 1 insertion(+) diff --git a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java index 825f60ad0ee8..67620880bb6d 100644 --- a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java +++ b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java @@ -134,6 +134,7 @@ private void countFile(String fname) throws IOException{ } work.add(exec.submit(full.resetForIndex(blockIndex++))); } + exec.shutdown(); long elapsed = sw.elapsed(TimeUnit.SECONDS); System.out.println("Read all " + total + " bytes in " + elapsed + "s. "); From c1b639757167cd76854da0de0d46be515b53e256 Mon Sep 17 00:00:00 2001 From: JP Martin Date: Fri, 6 May 2016 10:01:49 -0700 Subject: [PATCH 4/4] Close channels Plus a little bit of cleanup. --- .../examples/nio/ParallelCountBytes.java | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java index 67620880bb6d..f63d5d1cba6d 100644 --- a/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java +++ b/gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/ParallelCountBytes.java @@ -19,6 +19,7 @@ import com.google.common.base.Stopwatch; import com.google.common.io.BaseEncoding; +import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; @@ -49,8 +50,16 @@ public class ParallelCountBytes { /** * WorkUnit holds a buffer and the instructions for what to put in it. + * + *

Use it like this: + *

    + *
  1. call() + *
  2. the data is now in buf, you can access it directly + *
  3. if need more, call resetForIndex(...) and go back to the top. + *
  4. else, call close() + *
*/ - private class WorkUnit implements Callable { + private static class WorkUnit implements Callable, Closeable { public final ByteBuffer buf; final SeekableByteChannel chan; final int blockSize; @@ -70,7 +79,7 @@ public WorkUnit call() throws IOException { return this; } chan.position(pos); - // read until buffer is full, or EOF + // read until buffer it is full, or EOF while (chan.read(buf) > 0) {}; return this; } @@ -80,16 +89,16 @@ public WorkUnit resetForIndex(int blockIndex) { buf.flip(); return this; } + + public void close() throws IOException { + chan.close(); + } } /** * See the class documentation. */ - public static void main(String[] args) throws IOException { - new ParallelCountBytes().start(args); - } - - public void start(String[] args) throws IOException { + public static void main(String[] args) throws Exception { if (args.length == 0 || args[0].equals("--help")) { help(); return; @@ -100,16 +109,15 @@ public void start(String[] args) throws IOException { } /** - * Print the length of the indicated file. + * Print the length and MD5 of the indicated file. * *

This uses the normal Java NIO Api, so it can take advantage of any installed * NIO Filesystem provider without any extra effort. */ - private void countFile(String fname) throws IOException{ + private static void countFile(String fname) throws Exception { // large buffers pay off final int bufSize = 50 * 1024 * 1024; Queue> work = new ArrayDeque<>(); - try { Path path = Paths.get(new URI(fname)); long size = Files.size(path); System.out.println(fname + ": " + size + " bytes."); @@ -125,14 +133,15 @@ private void countFile(String fname) throws IOException{ for (blockIndex = 0; blockIndex < nThreads; blockIndex++) { work.add(exec.submit(new WorkUnit(Files.newByteChannel(path), bufSize, blockIndex))); } - while (true) { + while (!work.isEmpty()) { WorkUnit full = work.remove().get(); md.update(full.buf.array(), 0, full.buf.position()); total += full.buf.position(); if (full.buf.hasRemaining()) { - break; + full.close(); + } else { + work.add(exec.submit(full.resetForIndex(blockIndex++))); } - work.add(exec.submit(full.resetForIndex(blockIndex++))); } exec.shutdown(); @@ -141,12 +150,9 @@ private void countFile(String fname) throws IOException{ String hex = String.valueOf(BaseEncoding.base16().encode(md.digest())); System.out.println("The MD5 is: 0x" + hex); if (total != size) { - System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " + - "yet the file size is listed at " + size + " bytes."); + System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " + + "yet the file size is listed at " + size + " bytes."); } - } catch (Exception ex) { - System.out.println(fname + ": " + ex.toString()); - } } private static void help() {