Skip to content

Commit

Permalink
remove data count
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 committed May 17, 2023
1 parent 8c39c8d commit 769b3d7
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ object Exchange {
val fields = tagConfig.vertexField :: tagConfig.fields
val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
val startTime = System.currentTimeMillis()
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
Expand All @@ -161,10 +159,8 @@ object Exchange {
batchFailure
)
processor.process()
data.get.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(
s"import for tag ${tagConfig.name}: data total count: $count, total time: ${costTime}s")
LOG.info(s"import for tag ${tagConfig.name}, total time: ${costTime}s")
if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
Expand Down Expand Up @@ -195,8 +191,6 @@ object Exchange {
}
val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
val startTime = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")
Expand All @@ -212,10 +206,8 @@ object Exchange {
batchFailure
)
processor.process()
data.get.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(
s"import for edge ${edgeConfig.name}: data total count: $count, total time: ${costTime}s")
LOG.info(s"import for edge ${edgeConfig.name}, total time: ${costTime}s")
if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ object Exchange {
val fields = tagConfig.vertexField :: tagConfig.fields
val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
val startTime = System.currentTimeMillis()
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
Expand All @@ -161,9 +159,8 @@ object Exchange {
batchFailure
)
processor.process()
data.get.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(s"import for tag ${tagConfig.name}, data count: $count, cost time: ${costTime}s")
LOG.info(s"import for tag ${tagConfig.name}, cost time: ${costTime}s")
if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
Expand Down Expand Up @@ -194,8 +191,6 @@ object Exchange {
}
val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
val startTime = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")
Expand All @@ -213,8 +208,7 @@ object Exchange {
processor.process()
data.get.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(
s"import for edge ${edgeConfig.name}, data count: $count, cost time: ${costTime}s")
LOG.info(s"import for edge ${edgeConfig.name}, cost time: ${costTime}s")
if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ object Exchange {
val fields = tagConfig.vertexField :: tagConfig.fields
val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
val startTime = System.currentTimeMillis()
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
Expand All @@ -161,9 +159,8 @@ object Exchange {
batchFailure
)
processor.process()
data.get.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(s"import for tag ${tagConfig.name}, data count: $count, cost time: ${costTime}s")
LOG.info(s"import for tag ${tagConfig.name}, cost time: ${costTime}s")
if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
Expand Down Expand Up @@ -194,8 +191,6 @@ object Exchange {
}
val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
val startTime = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")
Expand All @@ -211,10 +206,8 @@ object Exchange {
batchFailure
)
processor.process()
data.get.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(
s"import for edge ${edgeConfig.name}, data count: $count, cost time: ${costTime}s")
LOG.info(s"import for edge ${edgeConfig.name}, cost time: ${costTime}s")
if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
Expand Down

0 comments on commit 769b3d7

Please sign in to comment.