From b90e29affd3efc40950ae9b04174f7f613691cb1 Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Tue, 25 Jun 2019 15:36:51 -0700 Subject: [PATCH 1/4] check unmatch features --- core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala b/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala index 1bc3bf7d9e..9e5d48c6d8 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala @@ -255,7 +255,7 @@ private[op] trait OpWorkflowCore { val readerInputTypes = reader.get.subReaders.map(_.fullTypeName).toSet val unmatchedFeatures = rawFeatures.filterNot(f => readerInputTypes - .contains(f.originStage.asInstanceOf[FeatureGeneratorStage[_, _ <: FeatureType]].tti.tpe.toString) + .contains(f.originStage.asInstanceOf[FeatureGeneratorStage[_, _ <: FeatureType]].tti.tpe.typeSymbol.fullName) ) require( unmatchedFeatures.isEmpty, From 26913a31c75ed6725f59ee8f5c4ef0b50985abb6 Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Tue, 25 Jun 2019 16:07:47 -0700 Subject: [PATCH 2/4] Reader Fix --- readers/src/main/scala/com/salesforce/op/readers/Reader.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/readers/src/main/scala/com/salesforce/op/readers/Reader.scala b/readers/src/main/scala/com/salesforce/op/readers/Reader.scala index a41c51e612..14b69d3bd6 100644 --- a/readers/src/main/scala/com/salesforce/op/readers/Reader.scala +++ b/readers/src/main/scala/com/salesforce/op/readers/Reader.scala @@ -66,7 +66,8 @@ private[readers] trait ReaderType[T] extends Serializable { * @param opParams contains map of reader type to ReaderParams instances * @return ReaderParams instance if it exists */ - final def getReaderParams(opParams: OpParams): Option[ReaderParams] = opParams.readerParams.get(this.typeName) + final def getReaderParams(opParams: OpParams): Option[ReaderParams] = opParams.readerParams + .get(this.wtt.tpe.toString.split("\\.").last) } From 49825db4b9e76d8ed680bd53b402352975d36dcd Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Wed, 26 Jun 2019 12:07:31 -0700 Subject: [PATCH 3/4] tpe.toString instead --- core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala | 2 +- .../main/scala/com/salesforce/op/readers/JoinedDataReader.scala | 2 +- readers/src/main/scala/com/salesforce/op/readers/Reader.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala b/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala index 9e5d48c6d8..1bc3bf7d9e 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala @@ -255,7 +255,7 @@ private[op] trait OpWorkflowCore { val readerInputTypes = reader.get.subReaders.map(_.fullTypeName).toSet val unmatchedFeatures = rawFeatures.filterNot(f => readerInputTypes - .contains(f.originStage.asInstanceOf[FeatureGeneratorStage[_, _ <: FeatureType]].tti.tpe.typeSymbol.fullName) + .contains(f.originStage.asInstanceOf[FeatureGeneratorStage[_, _ <: FeatureType]].tti.tpe.toString) ) require( unmatchedFeatures.isEmpty, diff --git a/readers/src/main/scala/com/salesforce/op/readers/JoinedDataReader.scala b/readers/src/main/scala/com/salesforce/op/readers/JoinedDataReader.scala index 7b9b8a5ab3..c52164b575 100644 --- a/readers/src/main/scala/com/salesforce/op/readers/JoinedDataReader.scala +++ b/readers/src/main/scala/com/salesforce/op/readers/JoinedDataReader.scala @@ -149,7 +149,7 @@ private[op] abstract class JoinedReader[T, U] )(implicit spark: SparkSession): (DataFrame, Array[String]) = { def getData(r: DataReader[_]): DataFrame = { - val readerFeatures = rawFeatures.filter { f => getGenStage(f).tti.tpe.typeSymbol.fullName == r.fullTypeName } + val readerFeatures = rawFeatures.filter { f => getGenStage(f).tti.tpe.toString == r.fullTypeName } r.generateDataFrame(readerFeatures, opParams) } diff --git a/readers/src/main/scala/com/salesforce/op/readers/Reader.scala b/readers/src/main/scala/com/salesforce/op/readers/Reader.scala index 14b69d3bd6..c869906493 100644 --- a/readers/src/main/scala/com/salesforce/op/readers/Reader.scala +++ b/readers/src/main/scala/com/salesforce/op/readers/Reader.scala @@ -51,7 +51,7 @@ private[readers] trait ReaderType[T] extends Serializable { * * @return full input type name */ - final def fullTypeName: String = wtt.tpe.typeSymbol.fullName + final def fullTypeName: String = wtt.tpe.toString /** * Short reader input type name From 75daedc1a81b9c3cabd02fada8ba7df478c067f5 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Wed, 26 Jun 2019 21:19:37 -0700 Subject: [PATCH 4/4] Update Reader.scala --- readers/src/main/scala/com/salesforce/op/readers/Reader.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/readers/src/main/scala/com/salesforce/op/readers/Reader.scala b/readers/src/main/scala/com/salesforce/op/readers/Reader.scala index c869906493..25fe4ef803 100644 --- a/readers/src/main/scala/com/salesforce/op/readers/Reader.scala +++ b/readers/src/main/scala/com/salesforce/op/readers/Reader.scala @@ -66,8 +66,7 @@ private[readers] trait ReaderType[T] extends Serializable { * @param opParams contains map of reader type to ReaderParams instances * @return ReaderParams instance if it exists */ - final def getReaderParams(opParams: OpParams): Option[ReaderParams] = opParams.readerParams - .get(this.wtt.tpe.toString.split("\\.").last) + final def getReaderParams(opParams: OpParams): Option[ReaderParams] = opParams.readerParams.get(this.typeName) }