Skip to content

Commit

Permalink
Factorgen: Ensure that tables are sorted before sampling them
Browse files Browse the repository at this point in the history
This change makes the factor tables people2Hops, people4Hops, and messageIds
deterministic regardless of the degree of parallelism
  • Loading branch information
szarnyasg committed Nov 3, 2022
1 parent 4439c68 commit 0547dbb
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ object FactorGenerationStage extends DatagenStage with Logging {
date_trunc("day", $"creationDate").as("creationDay"),
date_trunc("day", $"deletionDate").as("deletionDay"),
$"MessageId")
.orderBy($"MessageId")

val sampleSize = 20000.0
val count = messages.count()
val sampleFraction = Math.min(sampleSize / count, 1.0)
Expand Down Expand Up @@ -407,6 +409,7 @@ object FactorGenerationStage extends DatagenStage with Logging {
log.info(s"Factor people4Hops: using ${sampleSize} samples (${sampleFraction * 100}%)")

peopleInChina
.orderBy($"Person.id")
.sample(sampleFraction, 42)
.join(relations.alias("knows"), $"Person.id" === $"knows.Person1Id")
.select($"knows.Person1Id".alias("Person1Id"), $"knows.Person2Id".alias("Person2Id"))
Expand All @@ -428,6 +431,7 @@ object FactorGenerationStage extends DatagenStage with Logging {
$"Person2.creationDate".as("Person2CreationDate"),
$"Person2.deletionDate".as("Person2DeletionDate")
)
.orderBy($"Person1Id", $"Person2Id")

val sampleFractionPersonPairs = Math.min(10000.0 / personPairs.count(), 1.0)
personPairs.sample(sampleFractionPersonPairs, 42)
Expand Down Expand Up @@ -455,6 +459,7 @@ object FactorGenerationStage extends DatagenStage with Logging {
log.info(s"Factor people4Hops: using ${sampleSize} samples (${sampleFraction * 100}%)")

peopleInChina
.orderBy($"Person.id")
.sample(sampleFraction, 42)
.join(relations.alias("knows"), $"Person.id" === $"knows.Person1Id")
.select($"knows.Person1Id".alias("Person1Id"), $"knows.Person2Id".alias("Person2Id"))
Expand All @@ -476,6 +481,7 @@ object FactorGenerationStage extends DatagenStage with Logging {
$"Person2.creationDate".as("Person2CreationDate"),
$"Person2.deletionDate".as("Person2DeletionDate")
)
.orderBy($"Person1Id", $"Person2Id")

val sampleFractionPersonPairs = Math.min(10000.0 / personPairs.count(), 1.0)
personPairs.sample(sampleFractionPersonPairs, 42)
Expand Down

0 comments on commit 0547dbb

Please sign in to comment.