Skip to content

Commit

Permalink
SAMZA-1229; Disk space monitor should only count data in use by the c…
Browse files Browse the repository at this point in the history
…ontainer

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes apache#134 from prateekm/disk-space-monitor
  • Loading branch information
prateekm authored and jagadish-northguard committed Apr 24, 2017
1 parent c91da78 commit 0feb5c2
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ object SamzaContainer extends Logging {
info("Got default storage engine base directory: %s" format defaultStoreBaseDir)

val storeWatchPaths = new util.HashSet[Path]()
storeWatchPaths.add(defaultStoreBaseDir.toPath)

val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => {
debug("Setting up task instance: %s" format taskModel)
Expand Down Expand Up @@ -455,8 +454,6 @@ object SamzaContainer extends Logging {
loggedStorageBaseDir = defaultStoreBaseDir
}

storeWatchPaths.add(loggedStorageBaseDir.toPath)

info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)

val taskStores = storageEngineFactories
Expand All @@ -467,25 +464,30 @@ object SamzaContainer extends Logging {
} else {
null
}

val keySerde = config.getStorageKeySerde(storeName) match {
case Some(keySerde) => serdes.getOrElse(keySerde,
throw new SamzaException("StorageKeySerde: No class defined for serde: %s." format keySerde))
case _ => null
}

val msgSerde = config.getStorageMsgSerde(storeName) match {
case Some(msgSerde) => serdes.getOrElse(msgSerde,
throw new SamzaException("StorageMsgSerde: No class defined for serde: %s." format msgSerde))
case _ => null
}
val storeBaseDir = if(changeLogSystemStreamPartition != null) {

val storeDir = if (changeLogSystemStreamPartition != null) {
TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
}
else {
} else {
TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName)
}

storeWatchPaths.add(storeDir.toPath)

val storageEngine = storageEngineFactory.getStorageEngine(
storeName,
storeBaseDir,
storeDir,
keySerde,
msgSerde,
collector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,24 +93,24 @@ class TaskStorageManager(
debug("Cleaning base directories for stores.")

taskStores.keys.foreach(storeName => {
val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
info("Got default storage partition directory as %s" format storagePartitionDir.toPath.toString)
val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
info("Got default storage partition directory as %s" format storePartitionDir.toPath.toString)

if(storagePartitionDir.exists()) {
info("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString)
Util.rm(storagePartitionDir)
if(storePartitionDir.exists()) {
info("Deleting default storage partition directory %s" format storePartitionDir.toPath.toString)
Util.rm(storePartitionDir)
}

val loggedStoreDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
info("Got logged storage partition directory as %s" format loggedStoreDir.toPath.toString)
val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
info("Got logged storage partition directory as %s" format loggedStorePartitionDir.toPath.toString)

// Delete the logged store if it is not valid.
if (!isLoggedStoreValid(storeName, loggedStoreDir)) {
info("Deleting logged storage partition directory %s." format loggedStoreDir.toPath.toString)
Util.rm(loggedStoreDir)
if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
info("Deleting logged storage partition directory %s." format loggedStorePartitionDir.toPath.toString)
Util.rm(loggedStorePartitionDir)
} else {
val offset = readOffsetFile(loggedStoreDir)
info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStoreDir))
val offset = readOffsetFile(loggedStorePartitionDir)
info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir))
fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
}
})
Expand Down Expand Up @@ -182,13 +182,13 @@ class TaskStorageManager(
taskStores.foreach {
case (storeName, storageEngine) =>
if (storageEngine.getStoreProperties.isLoggedStore) {
val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
info("Using logged storage partition directory: %s for store: %s." format(loggedStoragePartitionDir.toPath.toString, storeName))
if (!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs()
val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
info("Using logged storage partition directory: %s for store: %s." format(loggedStorePartitionDir.toPath.toString, storeName))
if (!loggedStorePartitionDir.exists()) loggedStorePartitionDir.mkdirs()
} else {
val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
info("Using storage partition directory: %s for store: %s." format(storagePartitionDir.toPath.toString, storeName))
storagePartitionDir.mkdirs()
val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
info("Using storage partition directory: %s for store: %s." format(storePartitionDir.toPath.toString, storeName))
storePartitionDir.mkdirs()
}
}
}
Expand Down Expand Up @@ -322,7 +322,8 @@ class TaskStorageManager(
}
debug("Got offset %s for store %s" format(newestOffset, storeName))

val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName), offsetFileName)
val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
val offsetFile = new File(loggedStorePartitionDir, offsetFileName)
if (newestOffset != null) {
debug("Storing offset for store in OFFSET file ")
Util.writeDataToFile(offsetFile, newestOffset)
Expand Down

0 comments on commit 0feb5c2

Please sign in to comment.