SAMZA-1181: Fix AppMaster hang after submitting jobs to Yarn
Currently, when a job is submitted to Yarn, it hangs after the AppMaster is created. The log shows that it hangs while bootstrapping from the coordinator stream. Further debugging shows that the job hangs during the second bootstrap, while reading locality data through the LocalityManager. The sequence is the following:
1. JobModelManager creates the CoordinatorStreamConsumer and bootstraps it.
2. LocalityManager writes locality info into the coordinator stream.
3. JobModelManager closes the CoordinatorStreamConsumer.
4. Later, LocalityManager bootstraps the CoordinatorStreamConsumer again.
Step 3 is the problem: since the CoordinatorStreamConsumer is still held by the LocalityManager, it must not be closed prematurely. Step 3 was introduced in SAMZA-1154 as part of refactoring JobModelManager for the task REST endpoint. To fix this issue, we revert that part of the change, as illustrated by the sketch below.
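
The ordering constraint can be illustrated with a minimal, self-contained Scala sketch. SharedBootstrapConsumer below is an invented stand-in used only for illustration; it is not the real CoordinatorStreamSystemConsumer API.

// Minimal sketch of the lifecycle constraint: a consumer shared with LocalityManager
// must not be stopped until every component that bootstraps from it is done.
class SharedBootstrapConsumer {
  private var stopped = false

  def bootstrap(): Unit = {
    // Re-reading the coordinator stream only works while the consumer is still running.
    if (stopped) throw new IllegalStateException("cannot bootstrap a stopped consumer")
  }

  def stop(): Unit = { stopped = true }
}

object LifecycleSketch extends App {
  val consumer = new SharedBootstrapConsumer
  consumer.bootstrap()  // step 1: JobModelManager bootstraps the shared consumer
                        // step 2: LocalityManager writes locality info (via the producer, not shown)
  // consumer.stop()    // step 3: the premature stop removed by this patch
  consumer.bootstrap()  // step 4: LocalityManager bootstraps again; safe only because step 3 is gone
  consumer.stop()       // stop once no component needs the consumer any more
}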

Author: Shanthoosh Venkataraman <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes apache#104 from shanthoosh/master
shanthoosh authored and Xinyu Liu committed Mar 31, 2017
1 parent 71004e1 commit 888e061
Showing 1 changed file with 56 additions and 67 deletions.
@@ -81,76 +81,65 @@ object JobModelManager extends Logging {
    * from the coordinator stream, and instantiate a JobModelManager.
    */
   def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = {
-    var coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = null
-    var coordinatorSystemProducer: CoordinatorStreamSystemProducer = null
-    try {
-      val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
-      coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
-      coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
-      info("Registering coordinator system stream consumer.")
-      coordinatorSystemConsumer.register
-      debug("Starting coordinator system stream consumer.")
-      coordinatorSystemConsumer.start
-      debug("Bootstrapping coordinator system stream consumer.")
-      coordinatorSystemConsumer.bootstrap
-      info("Registering coordinator system stream producer.")
-      coordinatorSystemProducer.register(SOURCE)
-
-      val config = coordinatorSystemConsumer.getConfig
-      info("Got config: %s" format config)
-      val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE)
-      changelogManager.start()
-      val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
-      // We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
-      // TODO: This code will go away with refactoring - SAMZA-678
-
-      localityManager.start()
-
-      // Map the name of each system to the corresponding SystemAdmin
-      val systemAdmins = getSystemAdmins(config)
-
-      val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
-      var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
-      if (config.getMonitorPartitionChange) {
-        val extendedSystemAdmins = systemAdmins.filter{
-          case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
-        }
-        val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.contains(systemStream.getSystem))
-        if (inputStreamsToMonitor.nonEmpty) {
-          streamPartitionCountMonitor = new StreamPartitionCountMonitor(
-            inputStreamsToMonitor.asJava,
-            streamMetadataCache,
-            metricsRegistryMap,
-            config.getMonitorPartitionChangeFrequency)
-        }
-      }
-      val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping()
-      val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, streamPartitionCountMonitor, null)
-      val jobModel = jobModelManager.jobModel
-      // Save the changelog mapping back to the ChangelogPartitionmanager
-      // newChangelogPartitionMapping is the merging of all current task:changelog
-      // assignments with whatever we had before (previousChangelogPartitionMapping).
-      // We must persist legacy changelog assignments so that
-      // maxChangelogPartitionId always has the absolute max, not the current
-      // max (in case the task with the highest changelog partition mapping
-      // disappears.
-      val newChangelogPartitionMapping = jobModel.getContainers.asScala.flatMap(_._2.getTasks.asScala).map{case (taskName,taskModel) => {
-        taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
-      }}.toMap ++ previousChangelogPartitionMapping.asScala
-      info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping)
-      changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava)
-
-      createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions)
-
-      jobModelManager
-    } finally {
-      if (coordinatorSystemConsumer != null) {
-        coordinatorSystemConsumer.stop()
-      }
-      if (coordinatorSystemProducer != null) {
-        coordinatorSystemProducer.stop()
-      }
-    }
+    val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
+    val coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
+    val coordinatorSystemProducer: CoordinatorStreamSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
+    info("Registering coordinator system stream consumer.")
+    coordinatorSystemConsumer.register
+    debug("Starting coordinator system stream consumer.")
+    coordinatorSystemConsumer.start
+    debug("Bootstrapping coordinator system stream consumer.")
+    coordinatorSystemConsumer.bootstrap
+    info("Registering coordinator system stream producer.")
+    coordinatorSystemProducer.register(SOURCE)
+
+    val config = coordinatorSystemConsumer.getConfig
+    info("Got config: %s" format config)
+    val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE)
+    changelogManager.start()
+    val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
+    // We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
+    // TODO: This code will go away with refactoring - SAMZA-678
+
+    localityManager.start()
+
+    // Map the name of each system to the corresponding SystemAdmin
+    val systemAdmins = getSystemAdmins(config)
+
+    val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
+    var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
+    if (config.getMonitorPartitionChange) {
+      val extendedSystemAdmins = systemAdmins.filter{
+        case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
+      }
+      val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.contains(systemStream.getSystem))
+      if (inputStreamsToMonitor.nonEmpty) {
+        streamPartitionCountMonitor = new StreamPartitionCountMonitor(
+          inputStreamsToMonitor.asJava,
+          streamMetadataCache,
+          metricsRegistryMap,
+          config.getMonitorPartitionChangeFrequency)
+      }
+    }
+    val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping()
+    val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, streamPartitionCountMonitor, null)
+    val jobModel = jobModelManager.jobModel
+    // Save the changelog mapping back to the ChangelogPartitionmanager
+    // newChangelogPartitionMapping is the merging of all current task:changelog
+    // assignments with whatever we had before (previousChangelogPartitionMapping).
+    // We must persist legacy changelog assignments so that
+    // maxChangelogPartitionId always has the absolute max, not the current
+    // max (in case the task with the highest changelog partition mapping
+    // disappears.
+    val newChangelogPartitionMapping = jobModel.getContainers.asScala.flatMap(_._2.getTasks.asScala).map{case (taskName,taskModel) => {
+      taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
+    }}.toMap ++ previousChangelogPartitionMapping.asScala
+    info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping)
+    changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava)
+
+    createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions)
+
+    jobModelManager
   }
 
   def apply(coordinatorSystemConfig: Config): JobModelManager = apply(coordinatorSystemConfig, new MetricsRegistryMap())
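
As an aside on the changelog-mapping merge in the diff above, the snippet below is a standalone sketch of its semantics with invented task names and partition ids. Because the previous mapping is the right-hand operand of ++, previously persisted assignments win on key conflicts and legacy entries survive, which is what keeps maxChangelogPartitionId at the absolute maximum.

// Standalone sketch of the changelog-partition merge semantics (illustrative data only).
object ChangelogMergeSketch extends App {
  // Mapping read back from the coordinator stream; task-9 has since disappeared.
  val previousMapping = Map("task-0" -> 0, "task-1" -> 1, "task-9" -> 9)
  // Assignments derived from the current job model.
  val currentMapping = Map("task-0" -> 0, "task-1" -> 1)

  // The right-hand operand of ++ wins on key conflicts, so legacy entries are preserved.
  val merged = currentMapping ++ previousMapping

  println(merged)             // Map(task-0 -> 0, task-1 -> 1, task-9 -> 9)
  println(merged.values.max)  // 9: the absolute max, not the current max (1)
}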
