
Getting jep.JepException: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds] when starting training with the Orca PyTorch Estimator (BigDL backend) #5560

Open
amardeepjaiman opened this issue Aug 28, 2022 · 18 comments

@amardeepjaiman

Hi,
I am trying to run the Fashion MNIST sample code from the BigDL repo on an Azure Databricks Spark cluster. The sample code link is here:
https://github.com/intel-analytics/BigDL/blob/main/python/orca/colab-notebook/examples/fashion_mnist_bigdl.ipynb

Cluster Configuration:

I have 1 Azure Standard D4_v5 driver node and 2 Azure Standard D4_v5 worker nodes set up in my Spark cluster.
Azure Databricks Runtime: 9.1 LTS ML (Scala 2.12, Spark 3.1.2)

The Spark configuration is below:
spark.executorEnv.PYTHONHOME /databricks/python3/lib/python3.8
spark.serializer org.apache.spark.serializer.JavaSerializer
spark.executorEnv.KMP_BLOCKTIME 0
spark.databricks.delta.preview.enabled true
spark.rpc.message.maxSize 2047
spark.executor.cores 3
spark.executor.memory 8g
spark.files.fetchTimeout 100000s
spark.network.timeout 100000s
spark.databricks.conda.condaMagic.enabled true
spark.driver.memory 8g
spark.scheduler.minRegisteredResourcesRatio 1.0
spark.scheduler.maxRegisteredResourcesWaitingTime 60s
spark.executor.heartbeatInterval 1000000
spark.cores.max 6
spark.default.parallelism 1000
spark.executorEnv.OMP_NUM_THREADS 1
spark.driver.cores 3

I create the estimator using:
orca_estimator = Estimator.from_torch(model=net, optimizer=optimizer, loss=criterion, metrics=[Accuracy()], backend="bigdl")

and am getting an exception on the following line:

from bigdl.orca.learn.trigger import EveryEpoch
orca_estimator.fit(data=trainloader, epochs=epochs, validation_data=testloader, checkpoint_trigger=EveryEpoch())

Please find the full stack trace of the error below:

jep.JepException: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds]
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.threadExecute(PythonInterpreter.scala:98)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.createInterpreter(PythonInterpreter.scala:82)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.init(PythonInterpreter.scala:63)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.check(PythonInterpreter.scala:56)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.exec(PythonInterpreter.scala:104)
at com.intel.analytics.bigdl.orca.net.PythonFeatureSet$.$anonfun$loadPythonSet$1(PythonFeatureSet.scala:90)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:868)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:868)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.$anonfun$threadExecute$2(PythonInterpreter.scala:91)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.threadExecute(PythonInterpreter.scala:90)
... 28 more

org.apache.spark.rdd.RDD.count(RDD.scala:1263)
com.intel.analytics.bigdl.orca.net.PythonFeatureSet$.loadPythonSet(PythonFeatureSet.scala:86)
com.intel.analytics.bigdl.orca.net.PythonFeatureSet.<init>(PythonFeatureSet.scala:168)
com.intel.analytics.bigdl.orca.net.PythonFeatureSet$.python(PythonFeatureSet.scala:61)
com.intel.analytics.bigdl.orca.net.python.PythonZooNet.createFeatureSetFromPyTorch(PythonZooNet.scala:283)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
java.lang.Thread.run(Thread.java:748)

Please let me know if someone has already faced this issue in the past.

I am also requesting support from the official BigDL team, as I want to use the BigDL library for distributed deep learning training on a Spark cluster.

Thanks in advance

@yangw1234
Contributor

@qiuxin2012 could you help take a look at this issue?

@hkvision
Contributor

Same issue as #4800?

@qiuxin2012
Contributor

@amardeepjaiman Since the notebook is written for Colab, you must have made a lot of changes. Could you tell us the detailed steps?
Including:

  1. How did you install the conda environment on Azure?
  2. How did you install the related Python dependencies (e.g. torch, torchvision, jep, bigdl)?
  3. How do you execute the BigDL code: command line or notebook?

@amardeepjaiman
Author

I am using the Azure Databricks environment and executing it in a notebook.
To set up the environment I use an init shell script so that the required environment and dependency jar files are available on all the nodes. Please find the content of the shell script below:

#!/bin/bash
export JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64'
export PYTHONPATH='/databricks/python3'
/databricks/python/bin/pip install bigdl-spark3
/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu six cloudpickle argparse tqdm matplotlib tensorboard -f https://download.pytorch.org/whl/torch_stable.html
# jep compiles against the JDK during installation, so JAVA_HOME must point to Java 8 here
JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64' /databricks/python/bin/pip install jep

I am running the BigDL code in a Databricks notebook, not from the command line. Please let me know if you need more information.

@qiuxin2012
Contributor

@PatrickkZ Please try to reproduce the error, or find the right way to run the notebook.

@PatrickkZ
Contributor

Hi @amardeepjaiman, we have reproduced the same error on Databricks. We are looking for a way to solve it and will let you know when we have more information.

@xbinglzh

xbinglzh commented Sep 1, 2022

I am seeing the same error.

@xbinglzh

xbinglzh commented Sep 5, 2022

How long will it take?

@xbinglzh

xbinglzh commented Sep 5, 2022

Layer info: TorchModel[5d5e341e]
jep.JepException: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds]
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.threadExecute(PythonInterpreter.scala:98)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.exec(PythonInterpreter.scala:108)
at com.intel.analytics.bigdl.orca.net.TorchModel.updateOutput(TorchModel.scala:131)
at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:283)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply$mcI$sp(DistriOptimizer.scala:272)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:263)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:263)
at com.intel.analytics.bigdl.dllib.utils.ThreadPool$$anonfun$1$$anon$5.call(ThreadPool.scala:160)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$$anonfun$5.apply(PythonInterpreter.scala:91)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$$anonfun$5.apply(PythonInterpreter.scala:90)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.threadExecute(PythonInterpreter.scala:90)
... 11 more

at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:289)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply$mcI$sp(DistriOptimizer.scala:272)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:263)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:263)
at com.intel.analytics.bigdl.dllib.utils.ThreadPool$$anonfun$1$$anon$5.call(ThreadPool.scala:160)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

@qiuxin2012
Contributor

qiuxin2012 commented Sep 13, 2022

@amardeepjaiman @xbinglzh I failed to get the jep-backend PyTorch Estimator working, but I ran the pyspark-backend PyTorch Estimator successfully. See the example at https://github.com/intel-analytics/BigDL/blob/v2.0.0/python/orca/example/learn/pytorch/fashion_mnist/fashion_mnist.py
You can try it with the following configuration.
Databricks init script:

#!/bin/bash
apt-get install openjdk-8-jdk-headless -qq > /dev/null
python -c "import os;os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'"
update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

export JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64'

/databricks/python/bin/pip install numpy==1.22.3

/databricks/python/bin/pip install bigdl-orca-spark3 tqdm
/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu tensorboard -f https://download.pytorch.org/whl/torch_stable.html
/databricks/python/bin/pip install cloudpickle

cp /databricks/python/lib/python3.8/site-packages/bigdl/share/*/lib/*.jar /databricks/jars

Databricks Spark conf (spark.executor.cores and spark.cores.max should match your cluster; mine is a single 4-core executor):

spark.driver.extraLibraryPath /databricks/python3/lib/
spark.cores.max 4
spark.executor.extraLibraryPath /databricks/python3/lib/
spark.executor.cores 4

You need to delete the argument parser in the notebook and use the following arguments:

    cluster_mode = "spark-submit"
    runtime = "spark"
    address=""
    backend="spark"
    batch_size=4
    epochs=2
    data_dir="./data"
    download=True

@qiuxin2012
Contributor

You can use the code below in your notebook directly:

from __future__ import print_function
import os
import argparse
import numpy as np
import matplotlib.pyplot as plt

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.pytorch import Estimator
from bigdl.orca.learn.metrics import Accuracy
from bigdl.orca.learn.trigger import EveryEpoch


def train_data_creator(config={}, batch_size=4, download=True, data_dir='./data'):
    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5,), (0.5,))])

    trainset = torchvision.datasets.FashionMNIST(root=data_dir,
                                                 download=download,
                                                 train=True,
                                                 transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                              shuffle=True, num_workers=0)
    return trainloader


def validation_data_creator(config={}, batch_size=4, download=True, data_dir='./data'):
    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5,), (0.5,))])
    testset = torchvision.datasets.FashionMNIST(root=data_dir, train=False,
                                                download=download, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                             shuffle=False, num_workers=0)
    return testloader


# helper function to show an image
def matplotlib_imshow(img, one_channel=False):
    if one_channel:
        img = img.mean(dim=0)
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    if one_channel:
        plt.imshow(npimg, cmap="Greys")
    else:
        plt.imshow(np.transpose(npimg, (1, 2, 0)))


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 4 * 4)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


def model_creator(config):
    model = Net()
    return model


def optimizer_creator(model, config):
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    return optimizer


def main():
    cluster_mode = "spark-submit"
    runtime = "spark"
    address=""
    backend="spark"
    batch_size=4
    epochs=2
    data_dir="./data"
    download=True

    if runtime == "ray":
        init_orca_context(runtime=runtime, address=address)
    else:
        if cluster_mode == "local":
            init_orca_context()
        elif cluster_mode.startswith("yarn"):
            init_orca_context(cluster_mode=cluster_mode, cores=4, num_nodes=2)
        elif cluster_mode == "spark-submit":
            init_orca_context(cluster_mode=cluster_mode)

    tensorboard_dir = os.path.join(data_dir, "runs")  # e.g. ./data/runs
    # constant for classes
    classes = ('T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle Boot')

    # plot some random training images
    dataiter = iter(train_data_creator(config={}, batch_size=4,
                                       download=download, data_dir=data_dir))
    images, labels = next(dataiter)

    # create grid of images
    img_grid = torchvision.utils.make_grid(images)

    # show images
    matplotlib_imshow(img_grid, one_channel=True)

    # training loss vs. epochs
    criterion = nn.CrossEntropyLoss()
    if backend == "bigdl":
        train_loader = train_data_creator(config={}, batch_size=4,
                                          download=download, data_dir=data_dir)
        test_loader = validation_data_creator(config={}, batch_size=4,
                                              download=download, data_dir=data_dir)

        net = model_creator(config={})
        optimizer = optimizer_creator(model=net, config={"lr": 0.001})
        orca_estimator = Estimator.from_torch(model=net,
                                              optimizer=optimizer,
                                              loss=criterion,
                                              metrics=[Accuracy()],
                                              backend="bigdl")

        orca_estimator.set_tensorboard(tensorboard_dir, "bigdl")

        orca_estimator.fit(data=train_loader, epochs=epochs, validation_data=test_loader,
                           checkpoint_trigger=EveryEpoch())

        res = orca_estimator.evaluate(data=test_loader)
        print("Accuracy of the network on the test images: %s" % res)
    elif backend in ["ray", "spark"]:
        orca_estimator = Estimator.from_torch(model=model_creator,
                                              optimizer=optimizer_creator,
                                              loss=criterion,
                                              metrics=[Accuracy()],
                                              model_dir=os.getcwd(),
                                              use_tqdm=True,
                                              backend=backend)
        stats = orca_estimator.fit(train_data_creator, epochs=epochs, batch_size=batch_size)

        writer = SummaryWriter(tensorboard_dir)  # TensorBoard writer for the training-loss curve
        for stat in stats:
            writer.add_scalar("training_loss", stat['train_loss'], stat['epoch'])
        print("Train stats: {}".format(stats))
        val_stats = orca_estimator.evaluate(validation_data_creator, batch_size=batch_size)
        print("Validation stats: {}".format(val_stats))
        orca_estimator.shutdown()
    else:
        raise NotImplementedError("Only bigdl, ray, and spark are supported "
                                  "as the backend, but got {}".format(backend))

    stop_orca_context()


main()

@amardeepjaiman
Author

OK, let me try and get back to you.

@amardeepjaiman
Author

Hi @qiuxin2012 ,

I tried to use the init script you shared, but I am getting an init script failure while starting the Databricks cluster. Which Databricks runtime version are you using? Please check the attached error snapshot and cluster configuration.

(two screenshots attached: the init script error and the cluster configuration)

@qiuxin2012
Contributor

qiuxin2012 commented Sep 19, 2022

Mine is 9.1 LTS (includes Spark 3.1.2, Scala 2.12), as shown in the attached screenshot.

@amardeepjaiman
Author

@qiuxin2012

The cluster is up with the init script. When I run the given source code with the Spark backend, training seems to start, but I get the following error about the model save directory in the save_pkl function.

java.io.FileNotFoundException: /databricks/driver/state.pkl

Py4JJavaError Traceback (most recent call last)
in
----> 1 main()

in main()
70 use_tqdm=True,
71 backend=backend)
---> 72 stats = orca_estimator.fit(train_data_creator, epochs=epochs, batch_size=batch_size)
73 print(stats)
74 for stat in stats:

/databricks/python/lib/python3.8/site-packages/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py in fit(self, data, epochs, batch_size, profile, reduce_results, info, feature_cols, label_cols, callbacks)
260 lambda iter: transform_func(iter, init_params, params)).collect()
261
--> 262 self.state_dict = PyTorchPySparkEstimator._get_state_dict_from_remote(self.model_dir)
263 worker_stats = res
264

/databricks/python/lib/python3.8/site-packages/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py in _get_state_dict_from_remote(remote_dir)
278 try:
279 temp_dir = tempfile.mkdtemp()
--> 280 get_remote_file_to_local(os.path.join(remote_dir, "state.pkl"),
281 os.path.join(temp_dir, "state.pkl"),
282 over_write=True)

/databricks/python/lib/python3.8/site-packages/bigdl/dllib/utils/file_utils.py in get_remote_file_to_local(remote_path, local_path, over_write)
144
145 def get_remote_file_to_local(remote_path, local_path, over_write=False):
--> 146 callZooFunc("float", "getRemoteFileToLocal", remote_path, local_path, over_write)
147
148

/databricks/python/lib/python3.8/site-packages/bigdl/dllib/utils/file_utils.py in callZooFunc(bigdl_type, name, *args)
225 if not ("does not exist" in str(e)
226 and "Method {}".format(name) in str(e)):
--> 227 raise e
228 else:
229 return result

/databricks/python/lib/python3.8/site-packages/bigdl/dllib/utils/file_utils.py in callZooFunc(bigdl_type, name, *args)
219 try:
220 api = getattr(jinvoker, name)
--> 221 java_result = api(*args)
222 result = _java2py(gateway, java_result)
223 except Exception as e:

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
115 def deco(*a, **kw):
116 try:
--> 117 return f(*a, **kw)
118 except py4j.protocol.Py4JJavaError as e:
119 converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o372.getRemoteFileToLocal.
: java.io.FileNotFoundException: /databricks/driver/state.pkl
at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:120)
at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:68)
at com.databricks.backend.daemon.data.client.DatabricksFileSystemV1.open(DatabricksFileSystemV1.scala:80)
at com.databricks.backend.daemon.data.client.DatabricksFileSystemV1.open(DatabricksFileSystemV1.scala:89)
at com.databricks.backend.daemon.data.client.DatabricksFileSystem.open(DatabricksFileSystem.scala:88)
at com.intel.analytics.bigdl.dllib.common.zooUtils$.getRemoteFileToLocal(zooUtils.scala:240)
at com.intel.analytics.bigdl.dllib.common.PythonZoo.getRemoteFileToLocal(PythonZoo.scala:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)

It seems the model_dir path we pass in the params is not being picked up on Databricks. I have also tried a shared location on DBFS, both as '/dbfs/FileStore' and 'dbfs:/FileStore', but neither is accepted.
Could you please let me know how you resolved this issue? I am seeing several reported issues regarding the model_dir path on Databricks with the Spark backend.

Thanks,
Amardeep

@amardeepjaiman
Author

Hi @qiuxin2012 ,

I was able to solve the above issue using the latest nightly build of bigdl-spark3 from the BigDL repos. Training now runs with the above configuration, where a minimum of 1 worker (with 4 cores) is assigned, so training runs on a single worker node.
If I change the minimum workers in the Databricks cluster configuration to 2, I have 2 worker nodes with 4 cores each, so I changed the Spark configuration from spark.cores.max 4 to spark.cores.max 8.
Ideally, training should now run distributed across both worker nodes, using all 8 cores, but instead I get an exception. Please find the stack trace below.

org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(5, 1) finished unsuccessfully.

Py4JJavaError Traceback (most recent call last)

in
7 checkpoint_trigger=EveryEpoch())
8 elif backend in ["ray", "spark"]:
----> 9 stats = orca_estimator.fit(train_data_creator, epochs=epochs, batch_size=batch_size)
10 print(stats)
11 #for stat in stats:

/local_disk0/spark-535f5fcb-a5b3-4f65-978d-b2904ffacebc/userFiles-3b1eb76a-8e9c-40ee-85cc-cd7442a8bd6b/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py in fit(self, data, epochs, batch_size, profile, reduce_results, info, feature_cols, label_cols, validation_data, callbacks)
294 return PytorchPysparkWorker(**init_param).train_epochs(**param)
295
--> 296 res = self.workerRDD.barrier().mapPartitions(
297 lambda iter: transform_func(iter, init_params, params)).collect()
298

/databricks/spark/python/pyspark/rdd.py in collect(self)
965 # Default path used in OSS Spark / for non-credential passthrough clusters:
966 with SCCallSiteSync(self.context) as css:
--> 967 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
968 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
969

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
115 def deco(*a, **kw):
116 try:
--> 117 return f(*a, **kw)
118 except py4j.protocol.Py4JJavaError as e:
119 converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(5, 1) finished unsuccessfully.
org.apache.spark.api.python.PythonException: 'RuntimeError: [../third_party/gloo/gloo/transport/tcp/pair.cc:799] connect [127.0.1.1]:46496: Connection refused'. Full traceback below:
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 754, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 744, in process
out_iter = func(split_index, iterator)
File "/databricks/spark/python/pyspark/rdd.py", line 2900, in func
return f(iterator)
File "/local_disk0/spark-535f5fcb-a5b3-4f65-978d-b2904ffacebc/userFiles-3b1eb76a-8e9c-40ee-85cc-cd7442a8bd6b/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py", line 297, in
File "/local_disk0/spark-535f5fcb-a5b3-4f65-978d-b2904ffacebc/userFiles-3b1eb76a-8e9c-40ee-85cc-cd7442a8bd6b/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py", line 294, in transform_func
File "/local_disk0/spark-1f5a35e9-7687-40cd-a547-b12c6fb12535/executor-229da1dd-799c-4cbc-8f41-67678a47db52/spark-c9524750-d4e8-4c08-b4d7-79163cd29f60/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_worker.py", line 95, in init
self.setup_distributed(self.mode, cluster_info, driver_ip, driver_tcp_store_port)
File "/local_disk0/spark-1f5a35e9-7687-40cd-a547-b12c6fb12535/executor-229da1dd-799c-4cbc-8f41-67678a47db52/spark-c9524750-d4e8-4c08-b4d7-79163cd29f60/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_worker.py", line 117, in setup_distributed
self.setup_torch_distribute(tcp_store_host=driver_ip,
File "/local_disk0/spark-1f5a35e9-7687-40cd-a547-b12c6fb12535/executor-229da1dd-799c-4cbc-8f41-67678a47db52/spark-c9524750-d4e8-4c08-b4d7-79163cd29f60/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/torch_runner.py", line 174, in setup_torch_distribute
dist.init_process_group(
File "/databricks/python/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 602, in init_process_group
default_pg = _new_process_group_helper(
File "/databricks/python/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 703, in _new_process_group_helper
pg = ProcessGroupGloo(prefix_store, rank, world_size, timeout=timeout)
RuntimeError: [../third_party/gloo/gloo/transport/tcp/pair.cc:799] connect [127.0.1.1]:46496: Connection refused

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:642)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:830)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:812)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:595)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1038)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:819)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:822)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:678)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2873)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2820)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2814)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2814)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2571)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3022)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3010)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1112)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2494)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1036)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:419)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1034)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:260)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)

@qiuxin2012
Contributor

@amardeepjaiman Sorry for the late response. We have reproduced your new error; I will inform you when we find a solution.
The error happens when the number of executors is >= 2.

@PatrickkZ
Contributor

Hi @amardeepjaiman, you can fix this by adding the environment variable GLOO_SOCKET_IFNAME:

  1. Execute !ifconfig in the notebook to list the network interfaces.

  2. Set GLOO_SOCKET_IFNAME to your first Ethernet interface (mine is eth0).

This works for me when I have 2 workers; a minimal sketch of the idea follows.
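A minimal sketch of the idea (eth0 is an assumption here; use whatever interface name !ifconfig reports in step 1). One quick way to try it is at the top of the notebook, before init_orca_context; if the setting has to reach the executors, the cluster-level environment variable settings in the Databricks configuration are the safer place:

import os

# Assumption: eth0 is the first Ethernet interface reported by !ifconfig.
# GLOO_SOCKET_IFNAME tells Gloo which network interface to bind to, instead
# of the loopback address (127.0.1.1) behind the "Connection refused" error above.
os.environ["GLOO_SOCKET_IFNAME"] = "eth0"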

Here is my init script:

#!/bin/bash
# use the latest version of orca
/databricks/python/bin/pip install --pre --upgrade bigdl-orca-spark3
/databricks/python/bin/pip install tqdm
/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu tensorboard -f https://download.pytorch.org/whl/torch_stable.html
/databricks/python/bin/pip install cloudpickle

cp /databricks/python/lib/python3.8/site-packages/bigdl/share/*/lib/*.jar /databricks/jars

As for the model_dir problem, you can just leave it as None:

elif backend in ["ray", "spark"]:
      orca_estimator = Estimator.from_torch(model=model_creator,
                                          optimizer=optimizer_creator,
                                          loss=criterion,
                                          metrics=[Accuracy()],
                                          model_dir=None,
                                          use_tqdm=True,
                                          backend=backend)

If model_dir is not None, it should be a path starting with /dbfs or dbfs:, but that won't work until this PR is merged, so you can try it later. For now, just leave it as None.
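Once that PR is merged, the relevant argument would change along these lines (the folder name here is hypothetical):

# Hypothetical DBFS folder; per the note above, the path must start with /dbfs or dbfs:
model_dir="/dbfs/FileStore/orca_checkpoints"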

