Making ExternalSpillableMap generic for any datatype #350

Merged
@vinothchandar merged 1 commit into apache:master on Mar 28, 2018

Conversation

@n3nash (Contributor) commented Mar 15, 2018:

- Introduced the concept of converters to be able to serde a generic datatype for the SpillableMap (see the converter sketch below)
- Fixed/Added configs to Hoodie Configs
- Changed HoodieMergeHandle to start using SpillableMap
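A minimal sketch of the converter contract implied by these changes, assembled from method names that appear in the review snippets below (getBytes, sizeEstimate); getData, the deserialization side, is an assumed name:

```java
// Sketch only: the PR's actual interface may differ in naming and shape.
public interface Converter<T> {

  // Serialize the object so the SpillableMap can spill it to disk.
  byte[] getBytes(T t);

  // Deserialize a spilled entry back into its original type (assumed method name).
  T getData(byte[] bytes);

  // Estimate the in-heap size of the object, used to decide when to spill.
  long sizeEstimate(T t);
}
```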

@@ -457,6 +468,11 @@ public Builder withFinalizeWriteParallelism(int parallelism) {
return this;
}

public Builder withMaxMemoryPerPartitionMerge(long maxMemoryPerPartitionMerge) {
n3nash (Contributor, Author):

Calculate this based on executor memory and memory fraction.

@@ -64,6 +64,9 @@
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "false";
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
private static final String MAX_SIZE_IN_MEMORY_FOR_MERGE_IN_BYTES_PROP = "hoodie.merge.spill.threshold";
// Default max memory size during hash-merge, excess spills to disk
private static final String DEFAULT_MAX_SIZE_IN_MEMORY_FOR_MERGE_IN_BYTES = String.valueOf(1024*1024*1024L); //1GB
n3nash (Contributor, Author):

Make it a fraction instead of an absolute value.

@@ -156,7 +159,8 @@ public R put(T key, R value) {
if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) {
// Naive approach for now
if (estimatedPayloadSize == 0) {
this.estimatedPayloadSize = SpillableMapUtils.computePayloadSize(value, schema);
this.estimatedPayloadSize = SpillableMapUtils.computePayloadSize(key, this.keyConverter) +
n3nash (Contributor, Author):

Use SizeEstimator from Spark.

@n3nash force-pushed the generic_spillable_map branch 10 times, most recently from e47bffe to 60a10db on March 16, 2018 20:53
@n3nash changed the title from "(WIP) Making ExternalSpillableMap generic for any datatype" to "Making ExternalSpillableMap generic for any datatype" on Mar 16, 2018
@n3nash (Contributor, Author) commented Mar 16, 2018:

@vinothchandar please take a pass at this when you get a chance.

@vinothchandar self-requested a review on March 19, 2018 03:15
@vinothchandar (Member) left a comment:

Need to review tests & API usage more in detail. Will do in next pass.

*/
final public class DiskBasedMap<T,R> implements Map<T,R> {
final public class DiskBasedMap<T, R> implements Map<T, R> {

// Stores the key and corresponding value's latest metadata spilled to disk
final private Map<T, ValueMetadata> inMemoryMetadataOfSpilledData;
vinothchandar (Member):

rename to simply valueMetadataMap? Spilling vs in-memory is only at the ExternalSpillableMap level, correct? This whole class is just disk-based, i.e. spilling.

n3nash (Contributor, Author):

done

* writes.
* @param <T>
* @param <R>
* An external map that spills content to disk when there is insufficient space for it to grow. <p>
vinothchandar (Member):

is this all resulting from auto-formatting on save?

n3nash (Contributor, Author):

It's resulting from the fact that earlier check-ins were not following Google style; now every check-in does.

vinothchandar (Member):

okay.. the checkstyle stuff will hopefully fix this for good.

currentInMemoryMapSize += this.estimatedPayloadSize;
}
inMemoryMap.put(key, value);
} else {
diskBasedMap.put(key, value);
}
return value;
} catch(IOException io) {
throw new HoodieIOException("Unable to estimate size of payload", io);
} catch (Exception io) {
vinothchandar (Member):

Exception e

n3nash (Contributor, Author):

removed exception

throw new UncheckedIOException(io);
}
}
}

@Test
public void testSizeEstimator() throws IOException, URISyntaxException {
vinothchandar (Member):

Have you done a micro-benchmark for the sizeEstimator to see whether it becomes a bottleneck when called in a loop?

n3nash (Contributor, Author):

It's not called in a loop, it's just called once.

vinothchandar (Member):

okay sg.

@@ -23,7 +23,9 @@ summary: "Here we list all possible configurations and what they mean"
<span style="color:grey">Should HoodieWriteClient autoCommit after insert and upsert. The client can choose to turn off auto-commit and commit on a "defined success condition"</span>
- [withAssumeDatePartitioning](#withAssumeDatePartitioning) (assumeDatePartitioning = false) <br/>
<span style="color:grey">Should HoodieWriteClient assume the data is partitioned by dates, i.e three levels from base path. This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually </span>

- [withMaxMemoryFractionPerPartitionMerge](#withMaxMemoryFractionPerPartitionMerge) (maxMemoryFractionPerPartitionMerge = 0.6) <br/>
<span style="color:grey">This fraction is multiplied with the spark.memory.fraction to get a final fraction of heap space to use during merge </span>
vinothchandar (Member):

We should also document how this interplays with spark.memory.storageFraction i.e this + that <= 1.0, otherwise things will OOM

n3nash (Contributor, Author):

Actually, this is just defining what fraction of spark.memory.fraction is used for the in-memory map in MergeHandle. So if the client uses Spark configs incorrectly, Spark itself will cause issues before it hits the MergeHandle code..

vinothchandar (Member):

I still think it's worth documenting..
http://spark.apache.org/docs/2.1.1/tuning.html#memory-management-overview

Usually, spark.memory.fraction - spark.memory.storageFraction is left for internal data structures of Spark. Heap - spark.memory.fraction is left for user data structures - in this case the Hoodie ExternalSpillableMap..

I don't actually follow:

  • Why we multiply this fraction against the Spark fraction instead of total heap space. By definition, we cannot dip into the Spark fraction.
  • How the Spark app can be stable if the Spark fraction is 0.6 and this also takes 0.6 * 0.6 of the heap, which brings total usage to 0.6 + 0.36 = 0.96, or a 96% full heap, which will keep GCing back to back..

n3nash (Contributor, Author):

The definitions in this link: https://spark.apache.org/docs/latest/configuration.html and the one you pasted (which is the description link in the above link) are a little misleading. I think there was an oversight on my part in choosing to multiply with spark.memory.fraction. Instead, I should multiply with 1 - spark.memory.fraction. It's clearer here: https://0x0fff.com/spark-memory-management/.
I don't want to rely on heap space to calculate the spillable map size since the heap is actually not the real user memory left by the Spark memory model.
So my thought process is: executor.memory = heap. As you increase executor.memory you increase the heap, and we want our spillable map memory size to grow accordingly. Hence:
user.available.memory = executor.memory * (1 - spark.memory.fraction)
spillable.available.memory = user.available.memory * merge.memory.fraction
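A rough Java sketch of the sizing described above (illustrative only; the PR's actual implementation reads these values from Spark and Hoodie configs):

```java
// Sketch of spillable.available.memory = executor.memory * (1 - spark.memory.fraction) * merge.memory.fraction.
// executorMemoryBytes corresponds to spark.executor.memory, sparkMemoryFraction to
// spark.memory.fraction (Spark's default is 0.6), mergeMemoryFraction to the Hoodie merge fraction.
public static long maxMemoryForMerge(long executorMemoryBytes,
    double sparkMemoryFraction, double mergeMemoryFraction) {
  // Heap left over for user data structures outside Spark's unified memory region.
  long userAvailableMemory = (long) (executorMemoryBytes * (1 - sparkMemoryFraction));
  // Portion of user memory handed to the ExternalSpillableMap during merge.
  return (long) (userAvailableMemory * mergeMemoryFraction);
}
```

For example, with a 4 GB executor, spark.memory.fraction = 0.6 and a merge fraction of 0.6, the map gets roughly 4 GB * 0.4 * 0.6 ≈ 983 MB before it starts spilling to disk.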

@vinothchandar (Member):

> spillable.available.memory = user.available.memory * merge.memory.fraction

sure, this is what I was getting at in the comments above. This should be okay.

vinothchandar (Member):

@n3nash let's reflect this discussion in the docs above? Multiplying by the Spark fraction is definitely misleading.

n3nash (Contributor, Author):

done


@Override
public byte[] getBytes(String s) {
return s.getBytes();
vinothchandar (Member):

need to pay attention to the charset for consistency.. please have it use UTF-8 explicitly..

n3nash (Contributor, Author):

good catch, done.
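A minimal sketch of the charset-pinned version discussed here (the class name and the getData counterpart are illustrative, not necessarily the PR's):

```java
import java.nio.charset.StandardCharsets;

// Illustrative string key converter; pins UTF-8 so spilled keys decode
// identically regardless of the JVM's default charset.
public class StringKeyConverter {

  public byte[] getBytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public String getData(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```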

/**
* A default converter implementation for HoodieRecord
*/
public class DefaultValueConverter<V> implements
vinothchandar (Member):

rename to HoodieRecordConvertor ?

n3nash (Contributor, Author):

done

Field[] fields = clazz.getDeclaredFields();
Optional<Field> fieldWithSchema = Arrays.stream(fields)
.filter(field -> {
if (field.getType() == Schema.class) {
vinothchandar (Member):

can we merge these if statements?

n3nash (Contributor, Author):

done

// TODO : Find a generic way to figure out the true size of the record in any scenario
long sizeOfRecord = ObjectSizeCalculator.getObjectSize(hoodieRecord);
long sizeOfSchema = ObjectSizeCalculator.getObjectSize(schema);
log.info("SizeOfRecord => " + sizeOfRecord);
vinothchandar (Member):

single log statement.. also should this be info?

n3nash (Contributor, Author):

Yeah, INFO seems fine, it's only logged once right now since the size estimate is done once.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

public class TestAvroPayload implements HoodieRecordPayload {
vinothchandar (Member):

can we extend or reuse an existing Payload class?

n3nash (Contributor, Author):

Unfortunately not, this is a special payload implementation which holds only bytes.

vinothchandar (Member):

rename : AvroBinaryTestPayload

n3nash (Contributor, Author):

done

@vinothchandar (Member) left a comment:

Looked at API changes.. LGTM

@n3nash force-pushed the generic_spillable_map branch from 60a10db to 27bf9a1 on March 21, 2018 00:14
@n3nash (Contributor, Author) commented Mar 21, 2018:

@vinothchandar I've addressed your comments and left some more comments. Please take a pass. I'm going to do some microbenchmarking on the sizeEstimator in the meantime and report back

@n3nash force-pushed the generic_spillable_map branch from 27bf9a1 to 7128aeb on March 21, 2018 06:21
@n3nash (Contributor, Author) commented Mar 22, 2018:

The sizeEstimator, which is using the twitter ObjectSizeCalculator, shows a min of 95 ms and a max of 180 ms, as below:

18/03/22 07:18:49 INFO collection.ExternalSpillableMap: Estimated Payload size => 1049008 Time taken 126
18/03/22 07:18:49 INFO collection.ExternalSpillableMap: Estimated Payload size => 1037889 Time taken 180
18/03/22 07:18:49 INFO collection.ExternalSpillableMap: Estimated Payload size => 1038006 Time taken 95

For now, I will live with calculating the payload size once at the beginning; if that shows problems, I will invest in a config-based every-N-records size estimate.

@n3nash force-pushed the generic_spillable_map branch from 7128aeb to 9955e5c on March 22, 2018 08:05
@n3nash (Contributor, Author) commented Mar 22, 2018:

@vinothchandar I did a little more testing and realized it's really difficult to come up with a concrete object size for an entry in the hashmap in the JVM, since objects can be shared etc. I'm thinking whether the spillableMap should be made config-based for now? Need to spend more time to understand how to do object sizing so that records don't spill to disk when they don't need to. WDYT?

@vinothchandar (Member):
> I'm thinking if spillableMap should be made config based for now

Meaning, you want to control whether we use the spillable map or a totally in-memory hashmap?

> I did a little more testing and realized it's really difficult to come up with a concrete object size for an entry in the hashmap in the JVM since objects can be shared etc.

Can be especially hard if we hold avro records with a shared schema object... Is this a new problem with the twitter estimator? We were using Spark's estimator in other places, right..

Can you provide me more context into what's going wrong?

@n3nash (Contributor, Author) commented Mar 23, 2018:

All sizes in bytes.
Yeah, so say we have a Map[HoodieRecords]. In most scenarios, what ends up happening is that a HoodieRecord holds a payload which might in turn hold a GenericRecord. The size of such a HoodieRecord is 1049008. When I drill into the HoodieRecord, it turns out that the schema is around 1000000 and the rest is 49008. If you estimate the size of the HoodieRecord, we would assume it to be 1049008. But actually, the schema object is shared amongst all payloads of all HoodieRecords, so ideally the real payload size is 49008.

I've been able to address this issue in the HoodieRecordConverter by performing some Class field look-ups (you might have noticed this during your review) and subtracting the sizeOfSchema from the sizeOfRecord to get the actual HoodieRecord size in the heap, which is 49008. So all good up till now, but still a little hacky.

During some more offline tests, I realized that for a more complex payload which holds nested schemas, I cannot do any such optimizations and hence end up over-estimating the size of the record. The downside of overestimating is that we spill to disk much sooner/more often than we would if we were using a regular HashMap<>.
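A rough sketch of the "subtract the shared schema" sizing described above, pieced together from the diff snippets in this review (reflection lookup of a Schema field, twitter ObjectSizeCalculator); the exact PR code differs:

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Optional;

import org.apache.avro.Schema;
import com.twitter.common.objectsize.ObjectSizeCalculator;

public class RecordSizeEstimator {

  // Estimate the per-record heap footprint, discounting the Avro schema that is
  // shared across all records (otherwise each record looks like ~1 MB instead of ~49 KB).
  public static long estimateRecordSize(Object record, Object payload) throws IllegalAccessException {
    long sizeOfRecord = ObjectSizeCalculator.getObjectSize(record);
    // Look for a Schema field declared on the payload class.
    Field[] fields = payload.getClass().getDeclaredFields();
    Optional<Field> fieldWithSchema = Arrays.stream(fields)
        .filter(field -> field.getType() == Schema.class)
        .findFirst();
    if (fieldWithSchema.isPresent()) {
      fieldWithSchema.get().setAccessible(true);
      Object schema = fieldWithSchema.get().get(payload);
      long sizeOfSchema = ObjectSizeCalculator.getObjectSize(schema);
      return sizeOfRecord - sizeOfSchema;
    }
    return sizeOfRecord;
  }
}
```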


@n3nash force-pushed the generic_spillable_map branch from 9955e5c to 47583a0 on March 23, 2018 04:16
@vinothchandar (Member):
I think holding it serialized could address some of these concerns? Taking a page out of Spark's book, this is why serialized in-memory storage is better/more compact at more CPU cost.. is that an option?

return this;
}

// Dynamic calculation of maxMemory to use for merge
vinothchandar (Member):

java doc style comments

n3nash (Contributor, Author):

done


// This fraction is multiplied with the spark.memory.fraction to get a final fraction of heap space to use during merge
// This makes it easier to scale this value as one increases the spark.executor.memory
public static final String MAX_MEMORY_FRACTION_FOR_MERGE_PROP = "hoodie.merge.memory.fraction";
vinothchandar (Member):

rename to hoodie.memory.merge.fraction in sync with how other configs are named?

n3nash (Contributor, Author):

done

// This fraction is multiplied with the spark.memory.fraction to get a final fraction of heap space to use during merge
// This makes it easier to scale this value as one increases the spark.executor.memory
public static final String MAX_MEMORY_FRACTION_FOR_MERGE_PROP = "hoodie.merge.memory.fraction";
public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.max.merge.memory";
vinothchandar (Member):

same here.. different name..

n3nash (Contributor, Author):

done

@n3nash force-pushed the generic_spillable_map branch 2 times, most recently from 5fb6a9d to 5011a19 on March 25, 2018 18:14
@n3nash (Contributor, Author) commented Mar 25, 2018:

@vinothchandar Found a better (and simpler) way to accurately size an entry in a map that works well for any type of payload. I've incorporated that and your comments, please take a pass.

@n3nash force-pushed the generic_spillable_map branch from 5011a19 to 4933211 on March 25, 2018 18:16
keyConverter.sizeEstimate(key) + valueConverter.sizeEstimate(value);
log.info("Estimated Payload size => " + estimatedPayloadSize);
}
else if(shouldEstimatePayloadSize &&
n3nash (Contributor, Author):

@vinothchandar Simple handling of sizes.

vinothchandar (Member):

how does estimating the size of the entire map alleviate the issues you mentioned before?

@n3nash (Contributor, Author) commented Mar 26, 2018:

The inherent problem lies in the fact that an entry in the Map by itself is not a good gauge of how large the record is in the heap (as discussed before, mainly due to shared objects). What is essentially required is to estimate the size of a record given N records collectively; that way the size of the shared object is amortized over N records and we get as close to the actual payload size as possible. Here, estimating the size of the map does that for us. It's still not dead accurate but comes very close.

I chose the number 100 after trying with a couple of different payloads, nested and non-nested with shared objects, and I saw the size of the shared object get amortized. We start by estimating the payload size for one record first and use that till we reach 100 records. We do this since we don't want to OOM given a small memory setting and a large record. Then we update the payload estimate by calculating the size of the HashMap / N. We could choose a higher number, say 1000, at which to re-estimate; this might give us better amortization, but the fear is that because of overestimation we may already start spilling to disk by then.
The other option is to keep updating the estimate logarithmically, but my fear is that for a huge HashMap, I'm not sure how the ObjectSizeEstimator performs in terms of CPU and memory usage.
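In code, a sketch of that amortized re-estimation (field and constant names follow the diff snippets in this review; the exact PR logic may differ):

```java
// Conceptually called from ExternalSpillableMap.put() before inserting the entry.
private void updatePayloadSizeEstimate(T key, R value) {
  if (estimatedPayloadSize == 0) {
    // Seed the estimate from the very first record, so a large record on a small
    // memory budget cannot blow past the limit before the first re-estimation point.
    estimatedPayloadSize = keyConverter.sizeEstimate(key) + valueConverter.sizeEstimate(value);
  } else if (shouldEstimatePayloadSize
      && inMemoryMap.size() == NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE) {
    // Size the whole in-memory map once and divide by the entry count; objects shared
    // across records (e.g. a common Avro schema) are amortized over all N entries.
    estimatedPayloadSize = ObjectSizeCalculator.getObjectSize(inMemoryMap) / inMemoryMap.size();
    currentInMemoryMapSize = inMemoryMap.size() * estimatedPayloadSize;
    shouldEstimatePayloadSize = false;
  }
}
```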

vinothchandar (Member):

let's do a follow-on task to fix this more systematically.. if 100 works for now, maybe we can keep it as it is.

IIUC you are saying by estimating at a higher point in the object tree (graph), the sizing is more accurate? If so, then that makes sense..

n3nash (Contributor, Author):

Yes, that's what I'm saying.

// Value converter to convert value type to bytes
final private Converter<R> valueConverter;
// Find the actual estimated payload size after inserting N records
final private static int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
vinothchandar (Member):

nit: can you move static member to the top before any instance variables.

n3nash (Contributor, Author):

done.

return value;
} catch(IOException io) {
throw new HoodieIOException("Unable to estimate size of payload", io);
if (!inMemoryMap.containsKey(key)) {
vinothchandar (Member):

I think we are assuming records are of the same size here and updates need not adjust for size.. worth leaving a TODO here to revisit..

n3nash (Contributor, Author):

Yes, we are assuming that, done.

// Find the actual estimated payload size after inserting N records
final private static int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
// Flag to determine whether to stop re-estimating payload size
private boolean shouldEstimatePayloadSize = true;
vinothchandar (Member):

Should we eliminate this flag and just re-estimate the hashmap value every X records or so, continuously? It can be a non-linear probe as well.. 100, 1000, 10000, 100000 to amortize the cost?
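Not what the PR implements, but a sketch of the non-linear probe being suggested here, re-estimating whenever the map grows to 100, 1000, 10000, ... entries:

```java
// Hypothetical continuous re-estimation with an exponentially backed-off probe.
private long nextEstimationPoint = 100;

private void maybeReestimatePayloadSize() {
  if (inMemoryMap.size() == nextEstimationPoint) {
    estimatedPayloadSize = ObjectSizeCalculator.getObjectSize(inMemoryMap) / inMemoryMap.size();
    currentInMemoryMapSize = inMemoryMap.size() * estimatedPayloadSize;
    // Back off by 10x so the cost of sizing the whole map stays amortized.
    nextEstimationPoint *= 10;
  }
}
```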

n3nash (Contributor, Author):

Yeah, I thought of that but I'm unsure of the performance of the ObjectSizeEstimator for such large objects, say a HashMap with 100K entries. Do you think we should performance test that now and do this continuously ?

@n3nash force-pushed the generic_spillable_map branch 2 times, most recently from 7797177 to 3c5d672 on March 27, 2018 05:09
@vinothchandar (Member):
@n3nash we can move it back to IOHandle if that eases things. Maybe a static helper closer to the Map itself, since this math can be reused for compaction too? My original comment was about not having a lot of getters/setters for these Spark props, which the localizing of variables addressed.

	- Introduced concept of converters to be able to serde generic datatype for SpillableMap
	- Fixed/Added configs to Hoodie Configs
	- Changed HoodieMergeHandle to start using SpillableMap
@n3nash force-pushed the generic_spillable_map branch from 3c5d672 to 588b5ee on March 28, 2018 01:46
@n3nash (Contributor, Author) commented Mar 28, 2018:

@vinothchandar Revised the PR. Unfortunately, I cannot get the Spark defaults since they are hard-coded. I've added links in the comments to the places where they are hard-coded in the Spark code. If there are more changes you propose, can we merge this and create a follow-up ticket which I can take up in the following week?

@vinothchandar merged commit 987f5d6 into apache:master on Mar 28, 2018
vinishjail97 pushed a commit to vinishjail97/hudi that referenced this pull request Dec 15, 2023