-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ALS-4461] Allow incremental vcf loading #73
Changes from 37 commits
6287251
3537737
620a5af
c0ad4a4
5afa1fe
dbf3a2f
01efbf0
68e849e
a141f92
160bc87
ee6ee2f
df366d0
5e3a93e
7433fde
f508062
8da85d0
295f52e
36904a8
9e22c10
747b0a1
c7afde5
5e56f09
a2749e7
7de09ff
15fe3bf
52cc4a0
88ea1c2
df03002
b0d13ea
28b2672
9bc9e21
b22cc1b
15c9f00
0a5fde5
abb2659
93c5a17
3bb4fe0
c2a0f38
ed9a5d4
f6ea975
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -1,44 +1,52 @@ | ||||||||||
package edu.harvard.hms.dbmi.avillach.hpds.storage; | ||||||||||
|
||||||||||
import java.io.ByteArrayInputStream; | ||||||||||
import java.io.File; | ||||||||||
import java.io.FileNotFoundException; | ||||||||||
import java.io.IOException; | ||||||||||
import java.io.ObjectInputStream; | ||||||||||
import java.io.ObjectOutputStream; | ||||||||||
import java.io.RandomAccessFile; | ||||||||||
import java.io.Serializable; | ||||||||||
import java.io.*; | ||||||||||
import java.util.Set; | ||||||||||
import java.util.concurrent.ConcurrentHashMap; | ||||||||||
import java.util.function.Function; | ||||||||||
import java.util.zip.GZIPInputStream; | ||||||||||
import java.util.zip.GZIPOutputStream; | ||||||||||
|
||||||||||
import org.apache.commons.io.output.ByteArrayOutputStream; | ||||||||||
|
||||||||||
public class FileBackedByteIndexedStorage <K, V extends Serializable> implements Serializable { | ||||||||||
public abstract class FileBackedByteIndexedStorage <K, V extends Serializable> implements Serializable { | ||||||||||
private static final long serialVersionUID = -7297090745384302635L; | ||||||||||
private transient RandomAccessFile storage; | ||||||||||
private ConcurrentHashMap<K, Long[]> index; | ||||||||||
private File storageFile; | ||||||||||
private boolean completed = false; | ||||||||||
private Long maxStorageSize; //leave this in to not break serialization | ||||||||||
protected transient RandomAccessFile storage; | ||||||||||
protected ConcurrentHashMap<K, Long[]> index; | ||||||||||
protected File storageFile; | ||||||||||
protected boolean completed = false; | ||||||||||
|
||||||||||
|
||||||||||
/**
 * Creates an empty storage whose records live in {@code storageFile}.
 * The file is opened immediately in read/write mode.
 *
 * @param keyClass    key type token; not read by this constructor
 * @param valueClass  value type token; not read by this constructor
 * @param storageFile file that backs the stored records
 * @throws FileNotFoundException if {@code storageFile} cannot be opened for reading and writing
 */
public FileBackedByteIndexedStorage(Class<K> keyClass, Class<V> valueClass, File storageFile) throws FileNotFoundException {
    this.storageFile = storageFile;
    this.index = new ConcurrentHashMap<>();
    this.storage = new RandomAccessFile(storageFile, "rw");
}
|
||||||||||
public void updateStorageDirectory(File storageDirectory) { | ||||||||||
if (!storageDirectory.isDirectory()) { | ||||||||||
throw new IllegalArgumentException("storageDirectory is not a directory"); | ||||||||||
} | ||||||||||
String currentStoreageFilename = storageFile.getName(); | ||||||||||
storageFile = new File(storageDirectory, currentStoreageFilename); | ||||||||||
} | ||||||||||
|
||||||||||
public Set<K> keys(){ | ||||||||||
return index.keySet(); | ||||||||||
} | ||||||||||
|
||||||||||
public void put(K key, V value) throws IOException { | ||||||||||
public void put(K key, V value) { | ||||||||||
if(completed) { | ||||||||||
throw new RuntimeException("A completed FileBackedByteIndexedStorage cannot be modified."); | ||||||||||
} | ||||||||||
Long[] recordIndex = store(value); | ||||||||||
Long[] recordIndex; | ||||||||||
try (ByteArrayOutputStream out = writeObject(value)) { | ||||||||||
recordIndex = new Long[2]; | ||||||||||
synchronized (storage) { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can get some really difficult to debug concurrency problems here if a thread calls There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Really, shouldn't the storage file name be immutable within the lifetime of this object? That would address my locking concerns. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. This code was just moved from somewhere else. I did not introduce it and am very hesitant to actually change it. I will think about this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did misunderstand your second comment originally -- the reason The creating, saving, loading, and actual usage of this class by HPDS is somewhat jumbled and unsafe right now, I agree. |
||||||||||
storage.seek(storage.length()); | ||||||||||
recordIndex[0] = storage.getFilePointer(); | ||||||||||
storage.write(out.toByteArray()); | ||||||||||
recordIndex[1] = storage.getFilePointer() - recordIndex[0]; | ||||||||||
} | ||||||||||
} catch (IOException e) { | ||||||||||
throw new UncheckedIOException(e); | ||||||||||
} | ||||||||||
index.put(key, recordIndex); | ||||||||||
} | ||||||||||
|
||||||||||
|
@@ -63,60 +71,43 @@ public void complete() { | |||||||||
this.completed = true; | ||||||||||
} | ||||||||||
|
||||||||||
public boolean isComplete() { | ||||||||||
return this.completed; | ||||||||||
} | ||||||||||
|
||||||||||
private Long[] store(V value) throws IOException { | ||||||||||
|
||||||||||
ByteArrayOutputStream out = new ByteArrayOutputStream(); | ||||||||||
ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(out)); | ||||||||||
oos.writeObject(value); | ||||||||||
oos.flush(); | ||||||||||
oos.close(); | ||||||||||
|
||||||||||
Long[] recordIndex = new Long[2]; | ||||||||||
synchronized(storage) { | ||||||||||
storage.seek(storage.length()); | ||||||||||
recordIndex[0] = storage.getFilePointer(); | ||||||||||
storage.write(out.toByteArray()); | ||||||||||
recordIndex[1] = storage.getFilePointer() - recordIndex[0]; | ||||||||||
// maxStorageSize = storage.getFilePointer(); | ||||||||||
} | ||||||||||
return recordIndex; | ||||||||||
} | ||||||||||
|
||||||||||
public V get(K key) throws IOException { | ||||||||||
if(this.storage==null) { | ||||||||||
synchronized(this) { | ||||||||||
this.open(); | ||||||||||
} | ||||||||||
} | ||||||||||
Long[] offsetsInStorage = index.get(key); | ||||||||||
if(offsetsInStorage != null) { | ||||||||||
Long offsetInStorage = index.get(key)[0]; | ||||||||||
int offsetLength = index.get(key)[1].intValue(); | ||||||||||
if(offsetInStorage != null && offsetLength>0) { | ||||||||||
byte[] buffer = new byte[offsetLength]; | ||||||||||
synchronized(storage) { | ||||||||||
storage.seek(offsetInStorage); | ||||||||||
storage.readFully(buffer); | ||||||||||
public V get(K key) { | ||||||||||
try { | ||||||||||
if(this.storage==null) { | ||||||||||
synchronized(this) { | ||||||||||
this.open(); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know you didn't write this, but I'm also worried that this is not as safe as it seems.
99% sure the synchronized block has to include the null check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with this. I made the change and added a todo comment to make this class immutable and remove the need for this check |
||||||||||
} | ||||||||||
ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(buffer))); | ||||||||||
|
||||||||||
try { | ||||||||||
V readObject = (V) in.readObject(); | ||||||||||
} | ||||||||||
Long[] offsetsInStorage = index.get(key); | ||||||||||
if(offsetsInStorage != null) { | ||||||||||
Long offsetInStorage = index.get(key)[0]; | ||||||||||
int offsetLength = index.get(key)[1].intValue(); | ||||||||||
Comment on lines
+84
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
if(offsetInStorage != null && offsetLength>0) { | ||||||||||
byte[] buffer = new byte[offsetLength]; | ||||||||||
synchronized(storage) { | ||||||||||
storage.seek(offsetInStorage); | ||||||||||
storage.readFully(buffer); | ||||||||||
} | ||||||||||
V readObject = readObject(buffer); | ||||||||||
return readObject; | ||||||||||
} catch (ClassNotFoundException e) { | ||||||||||
throw new RuntimeException("This should never happen."); | ||||||||||
} finally { | ||||||||||
in.close(); | ||||||||||
}else { | ||||||||||
return null; | ||||||||||
} | ||||||||||
}else { | ||||||||||
} else { | ||||||||||
return null; | ||||||||||
} | ||||||||||
} else { | ||||||||||
return null; | ||||||||||
} catch (IOException e) { | ||||||||||
throw new UncheckedIOException(e); | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
protected abstract V readObject(byte[] buffer); | ||||||||||
|
||||||||||
protected abstract ByteArrayOutputStream writeObject(V value) throws IOException; | ||||||||||
|
||||||||||
public V getOrELse(K key, V defaultValue) { | ||||||||||
V result = get(key); | ||||||||||
return result == null ? defaultValue : result; | ||||||||||
} | ||||||||||
|
||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package edu.harvard.hms.dbmi.avillach.hpds.storage; | ||
|
||
import java.io.*; | ||
import java.util.zip.GZIPInputStream; | ||
import java.util.zip.GZIPOutputStream; | ||
|
||
public class FileBackedJavaIndexedStorage <K, V extends Serializable> extends FileBackedByteIndexedStorage<K, V> { | ||
public FileBackedJavaIndexedStorage(Class<K> keyClass, Class<V> valueClass, File storageFile) throws FileNotFoundException { | ||
super(keyClass, valueClass, storageFile); | ||
} | ||
|
||
protected ByteArrayOutputStream writeObject(V value) throws IOException { | ||
ByteArrayOutputStream out = new ByteArrayOutputStream(); | ||
ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(out)); | ||
oos.writeObject(value); | ||
oos.flush(); | ||
oos.close(); | ||
return out; | ||
} | ||
|
||
@Override | ||
protected V readObject(byte[] buffer) { | ||
try (ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(buffer)));) { | ||
V readObject = (V) in.readObject(); | ||
return readObject; | ||
} catch (IOException e) { | ||
throw new UncheckedIOException(e); | ||
} catch (ClassNotFoundException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package edu.harvard.hms.dbmi.avillach.hpds.storage; | ||
|
||
import org.codehaus.jackson.map.ObjectMapper; | ||
import org.codehaus.jackson.type.TypeReference; | ||
|
||
import java.io.*; | ||
import java.util.zip.GZIPInputStream; | ||
import java.util.zip.GZIPOutputStream; | ||
|
||
public abstract class FileBackedJsonIndexStorage <K, V extends Serializable> extends FileBackedByteIndexedStorage<K, V> { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You're starting to create a pretty involved inheritance hierarchy. In my experience, these get difficult to read. We aren't in Java 17 yet, so you don't have sealed classes, which would help a lot. That said, you could approximate the concept of contained (bounded?) inheritance by putting your two implementing classes in this file. Example: https://gist.github.com/Luke-Sikina/70d3fc83f34610623ea052d0ef04b5d8 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh, I see. The implementing classes are in another package? Oof. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. My most recent changes to this have actually completely decoupled reading/writing from the rest of the logic in this class, so I think it would be pretty easy to get rid of the inheritance and introduce a dependency on an "objectMapper". Maybe... |
||
private static final long serialVersionUID = -1086729119489479152L; | ||
|
||
protected transient ObjectMapper objectMapper = new ObjectMapper(); | ||
|
||
/**
 * Creates a JSON-encoded storage backed by {@code storageFile}.
 * No key or value class tokens are supplied; {@code null} is passed for both.
 *
 * @param storageFile file that backs the stored records
 * @throws FileNotFoundException if the superclass cannot open {@code storageFile}
 */
public FileBackedJsonIndexStorage(File storageFile) throws FileNotFoundException {
    super(null, null, storageFile);
}
|
||
protected ByteArrayOutputStream writeObject(V value) throws IOException { | ||
ByteArrayOutputStream out = new ByteArrayOutputStream(); | ||
objectMapper.writeValue(new GZIPOutputStream(out), value); | ||
return out; | ||
} | ||
|
||
protected V readObject(byte[] buffer) { | ||
try { | ||
return objectMapper.readValue(new GZIPInputStream(new ByteArrayInputStream(buffer)), getTypeReference()); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
// Required to populate the objectMapper on deserialization | ||
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { | ||
in.defaultReadObject(); | ||
objectMapper = new ObjectMapper(); | ||
} | ||
|
||
public abstract TypeReference<V> getTypeReference(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wouldn't be a ton of work to make this implement `Map<K, V>`. As is, you're approximating a lot of methods from that interface while missing small details that make this code hard to reuse. You could just crib from Java's `UnmodifiableMap`.