Skip to content

Commit

Permalink
perf: add read(b,o,l) to BlobInputStream (pgjdbc#2376)
Browse files Browse the repository at this point in the history
* perf: add read(b,o,l) to BlobInputStream

* Modernize tests
  • Loading branch information
davecramer authored Jan 3, 2022
1 parent 6203cbe commit 8825a38
Show file tree
Hide file tree
Showing 4 changed files with 500 additions and 318 deletions.
101 changes: 78 additions & 23 deletions pgjdbc/src/main/java/org/postgresql/largeobject/BlobInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class BlobInputStream extends InputStream {
/**
* The absolute position.
*/
private long apos;
private long absolutePosition;

/**
* Buffer used to improve performance.
Expand All @@ -33,17 +33,17 @@ public class BlobInputStream extends InputStream {
/**
* Position within buffer.
*/
private int bpos;
private int bufferPosition;

/**
* The buffer size.
*/
private int bsize;
private final int bufferSize;

/**
* The mark position.
*/
private long mpos = 0;
private long markPosition = 0;

/**
* The limit.
Expand Down Expand Up @@ -74,9 +74,9 @@ public BlobInputStream(LargeObject lo, int bsize) {
public BlobInputStream(LargeObject lo, int bsize, long limit) {
this.lo = lo;
buffer = null;
bpos = 0;
apos = 0;
this.bsize = bsize;
bufferPosition = 0;
absolutePosition = 0;
this.bufferSize = bsize;
this.limit = limit;
}

Expand All @@ -86,33 +86,88 @@ public BlobInputStream(LargeObject lo, int bsize, long limit) {
public int read() throws java.io.IOException {
LargeObject lo = getLo();
try {
if (limit > 0 && apos >= limit) {
if (limit > 0 && absolutePosition >= limit) {
return -1;
}
if (buffer == null || bpos >= buffer.length) {
buffer = lo.read(bsize);
bpos = 0;
// read more in if necessary
if (buffer == null || bufferPosition >= buffer.length) {
buffer = lo.read(bufferSize);
bufferPosition = 0;
}

// Handle EOF
if (buffer == null || bpos >= buffer.length) {
if ( buffer == null || bufferPosition >= buffer.length) {
return -1;
}

int ret = (buffer[bpos] & 0x7F);
if ((buffer[bpos] & 0x80) == 0x80) {
ret |= 0x80;
}
int ret = (buffer[bufferPosition] & 0xFF);

bpos++;
apos++;
bufferPosition++;
absolutePosition++;

return ret;
} catch (SQLException se) {
throw new IOException(se.toString());
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int bytesCopied = 0;
LargeObject lo = getLo();

/* check to make sure we aren't at the limit
* funny to test for 0, but I guess someone could create a blob
* with a limit of zero
*/
if ( limit >= 0 && absolutePosition >= limit ) {
return -1;
}

/* check to make sure we are not going to read past the limit */
if ( limit >= 0 && len > limit - absolutePosition ) {
len = (int)(limit - absolutePosition);
}

try {
// have we read anything into the buffer
if ( buffer != null ) {
// now figure out how much data is in the buffer
int bytesInBuffer = buffer.length - bufferPosition;
// figure out how many bytes the user wants
int bytesToCopy = len > bytesInBuffer ? bytesInBuffer : len;
// copy them in
System.arraycopy(buffer, bufferPosition, b, off, bytesToCopy);
// move the buffer position
bufferPosition += bytesToCopy;
// position in the blob
absolutePosition += bytesToCopy;
// increment offset
off += bytesToCopy;
// decrement the length
len -= bytesToCopy;
bytesCopied = bytesToCopy;
}

if (len > 0 ) {
bytesCopied += lo.read(b, off, len);
buffer = null;
bufferPosition = 0;
absolutePosition += bytesCopied;
/*
if there is a limit on the size of the blob then we could have read to the limit
so bytesCopied will be non-zero but we will have read nothing
*/
if ( bytesCopied == 0 && (buffer == null) ) {
return -1;
}
}
} catch (SQLException ex ) {
throw new IOException(ex.getCause());
}
return bytesCopied;
}

/**
* <p>Closes this input stream and releases any system resources associated with the stream.</p>
*
Expand Down Expand Up @@ -153,7 +208,7 @@ public void close() throws IOException {
* @see java.io.InputStream#reset()
*/
public synchronized void mark(int readlimit) {
mpos = apos;
markPosition = absolutePosition;
}

/**
Expand All @@ -166,13 +221,13 @@ public synchronized void mark(int readlimit) {
public synchronized void reset() throws IOException {
LargeObject lo = getLo();
try {
if (mpos <= Integer.MAX_VALUE) {
lo.seek((int)mpos);
if (markPosition <= Integer.MAX_VALUE) {
lo.seek((int)markPosition);
} else {
lo.seek64(mpos, LargeObject.SEEK_SET);
lo.seek64(markPosition, LargeObject.SEEK_SET);
}
buffer = null;
apos = mpos;
absolutePosition = markPosition;
} catch (SQLException se) {
throw new IOException(se.toString());
}
Expand Down
14 changes: 14 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/largeobject/LargeObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,20 @@ public InputStream getInputStream(long limit) throws SQLException {
return new BlobInputStream(this, 4096, limit);
}

/**
* Returns an {@link InputStream} from this object, that will limit the amount of data that is
* visible.
* Added mostly for testing
*
* @param maxSize internal buffer size
* @param limit maximum number of bytes the resulting stream will serve
* @return {@link InputStream} from this object
* @throws SQLException if a database-access error occurs.
*/
public InputStream getInputStream(int maxSize, long limit) throws SQLException {
return new BlobInputStream(this, maxSize, limit);
}

/**
* <p>Returns an {@link OutputStream} to this object.</p>
*
Expand Down
Loading

0 comments on commit 8825a38

Please sign in to comment.