Skip to content

Commit

Permalink
Merge pull request #423 from ldbc/fix-parallelism-dependent-factor-tables
Browse files Browse the repository at this point in the history

Factorgen: Make people4Hops deterministic regardless of the degree of parallelism
  • Loading branch information
szarnyasg authored Nov 3, 2022
2 parents 4439c68 + 0547dbb commit 2459f4e
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ object FactorGenerationStage extends DatagenStage with Logging {
date_trunc("day", $"creationDate").as("creationDay"),
date_trunc("day", $"deletionDate").as("deletionDay"),
$"MessageId")
.orderBy($"MessageId")

val sampleSize = 20000.0
val count = messages.count()
val sampleFraction = Math.min(sampleSize / count, 1.0)
Expand Down Expand Up @@ -407,6 +409,7 @@ object FactorGenerationStage extends DatagenStage with Logging {
log.info(s"Factor people4Hops: using ${sampleSize} samples (${sampleFraction * 100}%)")

peopleInChina
.orderBy($"Person.id")
.sample(sampleFraction, 42)
.join(relations.alias("knows"), $"Person.id" === $"knows.Person1Id")
.select($"knows.Person1Id".alias("Person1Id"), $"knows.Person2Id".alias("Person2Id"))
Expand All @@ -428,6 +431,7 @@ object FactorGenerationStage extends DatagenStage with Logging {
$"Person2.creationDate".as("Person2CreationDate"),
$"Person2.deletionDate".as("Person2DeletionDate")
)
.orderBy($"Person1Id", $"Person2Id")

val sampleFractionPersonPairs = Math.min(10000.0 / personPairs.count(), 1.0)
personPairs.sample(sampleFractionPersonPairs, 42)
Expand Down Expand Up @@ -455,6 +459,7 @@ object FactorGenerationStage extends DatagenStage with Logging {
log.info(s"Factor people4Hops: using ${sampleSize} samples (${sampleFraction * 100}%)")

peopleInChina
.orderBy($"Person.id")
.sample(sampleFraction, 42)
.join(relations.alias("knows"), $"Person.id" === $"knows.Person1Id")
.select($"knows.Person1Id".alias("Person1Id"), $"knows.Person2Id".alias("Person2Id"))
Expand All @@ -476,6 +481,7 @@ object FactorGenerationStage extends DatagenStage with Logging {
$"Person2.creationDate".as("Person2CreationDate"),
$"Person2.deletionDate".as("Person2DeletionDate")
)
.orderBy($"Person1Id", $"Person2Id")

val sampleFractionPersonPairs = Math.min(10000.0 / personPairs.count(), 1.0)
personPairs.sample(sampleFractionPersonPairs, 42)
Expand Down

0 comments on commit 2459f4e

Please sign in to comment.