New Scala version. Added CEP (Cardinality Edge Pruning) and WEP (Weighted Edge Pruning), fixed some bugs.
Gaglia88 committed Apr 12, 2019
1 parent 3c003ef commit 5fbf6e8
Showing 125 changed files with 4,355 additions and 0 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Empty file added scala/README.md
37 changes: 37 additions & 0 deletions scala/sparker/build.sbt
@@ -0,0 +1,37 @@
name := "spark_er"
version := "1.0"
scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"

unmanagedBase := baseDirectory.value / "custom_lib"
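// jars dropped into custom_lib/ (such as the bundled serializedLoader.jar) are picked up as
// unmanaged dependencies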


libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-graphx_2.11
libraryDependencies += "org.apache.spark" % "spark-graphx_2.11" % "2.1.0"

libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.0"

libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "2.1.0"

// https://mvnrepository.com/artifact/com.twitter/algebird-core_2.11
//libraryDependencies += "com.twitter" % "algebird-core_2.11" % "0.12.3"

// https://mvnrepository.com/artifact/org.apache.commons/commons-math3
libraryDependencies += "org.apache.commons" % "commons-math3" % "3.6.1"

// https://mvnrepository.com/artifact/commons-codec/commons-codec
libraryDependencies += "commons-codec" % "commons-codec" % "1.11"

// https://mvnrepository.com/artifact/org.jgrapht/jgrapht-core
libraryDependencies += "org.jgrapht" % "jgrapht-core" % "1.0.1"

// https://mvnrepository.com/artifact/org.json/json
libraryDependencies += "org.json" % "json" % "20170516"


//mainClass in Compile := Some("Experiments.Main")
Binary file added scala/sparker/custom_lib/serializedLoader.jar
157 changes: 157 additions & 0 deletions scala/sparker/src/main/scala-2.11/Experiments/Main.scala
@@ -0,0 +1,157 @@
package Experiments

import SparkER.BlockBuildingMethods.{BlockingUtils, LSH, TokenBlocking}
import SparkER.BlockBuildingMethods.LSH.Settings
import SparkER.BlockRefinementMethods.PruningMethods._
import SparkER.BlockRefinementMethods.{BlockFiltering, BlockPurging}
import SparkER.Utilities.Converters
import SparkER.Wrappers.{CSVWrapper, JSONWrapper}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}


/**
 * Test WNP meta-blocking
 *
 * @author Luca Gagliardelli
 * @since 18/12/2018
 **/
object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Main")
      .setMaster("local[*]")
      .set("spark.default.parallelism", "4")

    val sc = new SparkContext(conf)

    /**
     * Loads two datasets
     **/
    val path = "C:\\Users\\gagli\\Desktop\\datasets\\clean\\movies\\"

    val dataset1 = JSONWrapper.loadProfiles(path + "dataset1.json", realIDField = "realProfileID", sourceId = 1)
    val maxIdDataset1 = dataset1.map(_.id).max()

    val dataset2 = JSONWrapper.loadProfiles(path + "dataset2.json", realIDField = "realProfileID", sourceId = 2, startIDFrom = maxIdDataset1 + 1)

    val maxProfileID = dataset2.map(_.id).max()

    val separators = Array(maxIdDataset1)
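    // dataset2's internal IDs start right after dataset1's last one, so all profiles share a
    // single contiguous ID space; `separators` records that boundary so later stages can tell
    // the two sources apart and generate only cross-source comparisons.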

    val profiles = dataset1.union(dataset2)


    //Loads the groundtruth
    val groundtruth = JSONWrapper.loadGroundtruth(path + "groundtruth.json", firstDatasetAttribute = "id1", secondDatasetAttribute = "id2")

    //Converts the ids in the groundtruth to the autogenerated ones
    val realIdIds1 = sc.broadcast(dataset1.map { p =>
      (p.originalID, p.id)
    }.collectAsMap())

    val realIdIds2 = sc.broadcast(dataset2.map { p =>
      (p.originalID, p.id)
    }.collectAsMap())
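    // The originalID -> internal ID maps are broadcast once to the workers instead of being
    // captured and re-shipped with every task that uses them.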

    val newGT: Set[(Long, Long)] = groundtruth.map { g =>
      val first = realIdIds1.value.get(g.firstEntityID)
      val second = realIdIds2.value.get(g.secondEntityID)
      if (first.isDefined && second.isDefined) {
        val f = first.get
        val s = second.get
        if (f < s) {
          (f, s)
        }
        else {
          (s, f)
        }
      }
      else {
        (-1L, -1L)
      }
    }.filter(_._1 >= 0).collect().toSet


    val newGTSize = newGT.size

    val gt = sc.broadcast(newGT)

    //Token blocking
    val blocks = TokenBlocking.createBlocks(profiles, separators)
    val useEntropy = false
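    // Token blocking: every distinct token found in the attribute values acts as a blocking key,
    // so two profiles that share at least one token co-occur in at least one block.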

    //Loose meta-blocking
    /*val clusters = LSHMio.clusterSimilarAttributes(
      profiles = profiles,
      numHashes = 128,
      targetThreshold = 0.3,
      maxFactor = 1.0,
      numBands = -1,
      keysToExclude = Nil,
      computeEntropy = useEntropy,
      separator = Settings.SOURCE_NAME_SEPARATOR
    )
    clusters.foreach(println)
    val useEntropy = true
    val blocks = TokenBlocking.createBlocksCluster(profiles, separators, clusters)
    */

    //Purging
    val blocksPurged = BlockPurging.blockPurging(blocks, 1.015)
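    // Block purging discards the largest, least discriminative blocks; 1.015 is the smoothing
    // factor that tunes how aggressive the cut is.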

    //Filtering
    val profileBlocks = Converters.blocksToProfileBlocks(blocksPurged)
    val profileBlocksFiltered = BlockFiltering.blockFiltering(profileBlocks, 0.8)
    val blocksAfterFiltering = Converters.profilesBlockToBlocks(profileBlocksFiltered, separators)
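    // Block filtering keeps, for each profile, only the 80% smallest blocks it appears in
    // (small blocks are the most discriminative), then converts back to the block-centric view.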


    //Metablocking

    val blockIndexMap = blocksAfterFiltering.map(b => (b.blockID, b.profiles)).collectAsMap()
    val blockIndex = sc.broadcast(blockIndexMap)
    val profileBlocksSizeIndex: Broadcast[scala.collection.Map[Long, Int]] = sc.broadcast(profileBlocksFiltered.map(pb => (pb.profileID, pb.blocks.size)).collectAsMap())
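    // Maps each profile to the number of blocks it appears in; the weighting schemes use this
    // when scoring candidate pairs.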

    val blocksEntropiesMap: Broadcast[scala.collection.Map[Long, Double]] = {
      if (useEntropy) {
        val blocksEntropies = blocks.map(b => (b.blockID, b.entropy)).collectAsMap()
        sc.broadcast(blocksEntropies)
      }
      else {
        null
      }
    }

    val edgesAndCount = WNP.WNP(
      profileBlocksFiltered,
      blockIndex,
      maxProfileID.toInt,
      separators,
      gt,
      PruningUtils.ThresholdTypes.AVG,
      PruningUtils.WeightTypes.CBS,
      profileBlocksSizeIndex,
      useEntropy,
      blocksEntropiesMap,
      2.0,
      PruningUtils.ComparisonTypes.OR
    )
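    // WNP (Weighted Node Pruning): each profile keeps only the neighbours whose edge weight
    // beats its local AVG threshold; with the CBS scheme an edge is weighted by the number of
    // blocks the two profiles share.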

    val numCandidates = edgesAndCount.map(_._1).sum()
    val perfectMatch = edgesAndCount.map(_._2).sum()
    val candidatePairs = edgesAndCount.flatMap(_._3)

    val pc = perfectMatch.toFloat / newGTSize.toFloat
    val pq = perfectMatch.toFloat / numCandidates.toFloat
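    // PC (pairs completeness) is the recall over the ground truth; PQ (pairs quality) is the
    // precision of the retained candidate set.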

println("PC = " + pc)
println("PQ = " + pq)
println("Retained edges " + numCandidates)

}
}
@@ -0,0 +1,42 @@
package SparkER.BlockBuildingMethods

import SparkER.BlockBuildingMethods.LSH.Settings
//import BlockBuildingMethods.LSHTwitter.Settings
import SparkER.DataStructures.{BlockAbstract, KeyValue, Profile}
import org.apache.spark.rdd.RDD

/**
 * Common methods for the different blocking techniques
 * @author Luca Gagliardelli
 * @since 2016/12/07
 */
object BlockingUtils {
  /** Defines the pattern used for tokenization */
  object TokenizerPattern {
    /** Splits tokens on underscores, whitespace, and punctuation */
    val DEFAULT_SPLITTING = "[\\W_]"
  }
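  // e.g. "New_York-City".split(TokenizerPattern.DEFAULT_SPLITTING)
  // yields Array("New", "York", "City")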

  /**
   * Given a tuple (entity ID, [list of entity tokens])
   * produces a list of tuples (token, entityID).
   *
   * @param profileKs couple (entity ID, [list of entity keys])
   **/
  def associateKeysToProfileID(profileKs: (Long, Iterable[String])): Iterable[(String, Long)] = {
    val profileId = profileKs._1
    val keys = profileKs._2
    keys.map(key => (key, profileId))
  }
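  // Example: associateKeysToProfileID((42L, List("john", "smith")))
  // returns List(("john", 42L), ("smith", 42L)).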

  /**
   * Used in the method that calculates the entropy of each block.
   * @param profileKs couple (entity ID, [list of entity tokens])
   * @return a list of (token, (profileID, [hash codes of the token's characters]))
   **/
  def associateKeysToProfileIdEntropy(profileKs: (Long, Iterable[String])): Iterable[(String, (Long, Iterable[Int]))] = {
    val profileId = profileKs._1
    val tokens = profileKs._2
    // note: each token is paired with the hash codes of its own characters, not with the
    // hashes of all the profile's tokens
    tokens.map(token => (token, (profileId, token.map(_.hashCode))))
  }
}