OConcurrentModificationException occurs when insert after delete #7940

Closed
weickdev opened this issue Dec 16, 2017 · 20 comments

@weickdev

OrientDB Version: 3.0.0.SNAPSHOT

Java Version: 1.8

OS: CentOS

When I insert a large number of edges, I occasionally get the exception below.

com.orientechnologies.orient.core.exception.OConcurrentModificationException: Cannot UPDATE the record #156:26159 because the version is not the latest. Probably you are updating an old record or it has been modified by another user (db=v3 your=v2)
	DB name="test"
	DB name="test"
	Error Code="3"
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_131]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_131]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_131]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_131]
	at com.orientechnologies.orient.client.binary.OChannelBinaryAsynchClient.throwSerializedException(OChannelBinaryAsynchClient.java:318) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.client.binary.OChannelBinaryAsynchClient.handleStatus(OChannelBinaryAsynchClient.java:275) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.client.binary.OChannelBinaryAsynchClient.beginResponse(OChannelBinaryAsynchClient.java:191) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.client.binary.OChannelBinaryAsynchClient.beginResponse(OChannelBinaryAsynchClient.java:153) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.client.remote.OStorageRemote.beginResponse(OStorageRemote.java:1693) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.client.remote.OStorageRemote.lambda$networkOperationRetryTimeout$2(OStorageRemote.java:226) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.client.remote.OStorageRemote.baseNetworkOperation(OStorageRemote.java:279) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.client.remote.OStorageRemote.networkOperationRetryTimeout(OStorageRemote.java:214) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.client.remote.OStorageRemote.networkOperation(OStorageRemote.java:243) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.client.remote.OStorageRemote.command(OStorageRemote.java:835) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.orientechnologies.orient.core.command.OCommandRequestTextAbstract.execute(OCommandRequestTextAbstract.java:68) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.tinkerpop.blueprints.impls.orient.OrientGraphCommand.execute(OrientGraphCommand.java:49) ~[orientdb-graphdb-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
	at com.haizhi.graph.lab.orientdb.orientdb.runner.HbaseRelationScan.lambda$null$14(HbaseRelationScan.java:354) ~[classes!/:1.0-SNAPSHOT]
	at java.util.ArrayList.forEach(ArrayList.java:1249) ~[na:1.8.0_131]
	at com.haizhi.graph.lab.orientdb.orientdb.runner.HbaseRelationScan.lambda$saveByBatchQueryWithoutCache$15(HbaseRelationScan.java:344) ~[classes!/:1.0-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
	Suppressed: com.orientechnologies.orient.core.exception.OConcurrentModificationException: Cannot UPDATE the record #156:26159 because the version is not the latest. Probably you are updating an old record or it has been modified by another user (db=v3 your=v2)
	DB name="test"
		at com.orientechnologies.orient.core.conflict.OVersionRecordConflictStrategy.checkVersions(OVersionRecordConflictStrategy.java:56) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.conflict.OVersionRecordConflictStrategy.onUpdate(OVersionRecordConflictStrategy.java:43) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage.checkAndIncrementVersion(OAbstractPaginatedStorage.java:4733) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage.doUpdateRecord(OAbstractPaginatedStorage.java:4221) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage.commitEntry(OAbstractPaginatedStorage.java:4793) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage.commit(OAbstractPaginatedStorage.java:1878) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage.commit(OAbstractPaginatedStorage.java:1736) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.db.document.ODatabaseDocumentAbstract.internalCommit(ODatabaseDocumentAbstract.java:3020) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.tx.OTransactionOptimistic.doCommit(OTransactionOptimistic.java:539) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.tx.OTransactionOptimistic.commit(OTransactionOptimistic.java:103) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.db.document.ODatabaseDocumentAbstract.commit(ODatabaseDocumentAbstract.java:2456) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.db.document.ODatabaseDocumentAbstract.commit(ODatabaseDocumentAbstract.java:2419) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.sql.OCommandExecutorSQLCreateEdge.execute(OCommandExecutorSQLCreateEdge.java:225) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.sql.OCommandExecutorSQLDelegate.execute(OCommandExecutorSQLDelegate.java:74) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage.executeCommand(OAbstractPaginatedStorage.java:3658) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage.command(OAbstractPaginatedStorage.java:3595) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.core.command.OCommandRequestTextAbstract.execute(OCommandRequestTextAbstract.java:68) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.server.OConnectionBinaryExecutor.executeCommand(OConnectionBinaryExecutor.java:581) ~[orientdb-server-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.client.remote.message.OCommandRequest.execute(OCommandRequest.java:103) ~[orientdb-client-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.server.network.protocol.binary.ONetworkProtocolBinary.sessionRequest(ONetworkProtocolBinary.java:276) ~[orientdb-server-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.orient.server.network.protocol.binary.ONetworkProtocolBinary.execute(ONetworkProtocolBinary.java:182) ~[orientdb-server-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]
		at com.orientechnologies.common.thread.OSoftThread.run(OSoftThread.java:82) ~[orientdb-core-3.0.0-SNAPSHOT.jar!/:3.0.0-SNAPSHOT]

In this error log, #156:26159 is an edge. Before this bulk edge insert, I had run a delete command to remove all data from this edge collection, but I can't be sure whether the problem is caused by the delete operation.
If I create a new edge collection and re-insert all the data (without running the delete command), it works well.

@andrii0lomakin
Member

It looks like the transaction is still active after the delete, and stale data is causing the CME (OConcurrentModificationException). Assigned to @tglman because of that.
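The stack trace shows this check happening in OVersionRecordConflictStrategy.checkVersions. It rejects a write whose version (your=v2) is older than the stored one (db=v3). The sketch below is a toy, in-memory model of that optimistic version check, not OrientDB's implementation. VersionedStore and StaleVersionException are hypothetical names. It shows how a copy read before someone else's update, such as stale data left over from an earlier transaction, produces this kind of conflict.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for OConcurrentModificationException (hypothetical, for illustration only). */
class StaleVersionException extends RuntimeException {
    StaleVersionException(String rid, int dbVersion, int yourVersion) {
        super("Cannot UPDATE the record " + rid + " because the version is not the latest"
                + " (db=v" + dbVersion + " your=v" + yourVersion + ")");
    }
}

/** In-memory record store that applies an optimistic version check on every update. */
class VersionedStore {
    private final Map<String, Integer> versions = new HashMap<>();

    /** Creates a record at version 1. */
    synchronized int create(String rid) {
        versions.put(rid, 1);
        return 1;
    }

    /** Returns the version a reader would carry in its local copy. */
    synchronized int read(String rid) {
        return versions.get(rid);
    }

    /** Accepts the write only if the caller's copy is current; returns the new version. */
    synchronized int update(String rid, int yourVersion) {
        int dbVersion = versions.get(rid);
        if (dbVersion != yourVersion) {
            // The caller modified an outdated copy: reject it instead of overwriting.
            throw new StaleVersionException(rid, dbVersion, yourVersion);
        }
        versions.put(rid, dbVersion + 1);
        return dbVersion + 1;
    }
}
```

In this model, nothing is locked while a copy is held. The conflict is only detected at write time, so wrapping the read and the write in one optimistic transaction does not by itself prevent it.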

@andrii0lomakin andrii0lomakin added this to the 3.0 milestone Dec 16, 2017
@weickdev
Author

weickdev commented Dec 18, 2017

@Laa I hit this error again today. This time the database is a new one, and no delete operation was run before the edge inserts. Some more details:

  1. I use OrientGraphNoTx
  2. I set massive insert intent: graphNoTx.declareIntent(new OIntentMassiveInsert());
  3. I set -DridBag.embeddedToSbtreeBonsaiThreshold=-1

@blueAnger

I built the source code from the develop branch and got the same exception too, with a new database and a new class. I tried to batch-insert data into an edge class several times, and the same exception occurred repeatedly. There is no pattern as to where or when the exception is thrown; it just appears out of nowhere. When the exception occurs, the server hangs and stops responding, so I have to kill and restart it.

@andrii0lomakin
Member

@blueAnger could you share your source code with us?

@weickdev
Author

weickdev commented Dec 29, 2017

@Laa Below is my code:

private void saveByBatchQueryWithoutCache(List<Map<String, Object>> list) {
        List<Map<String, Object>> copyList = new ArrayList<>();
        copyList.addAll(list);
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            logger.error("main thread is interrupted: {}", e);
        }

        executorService.execute(() -> {
            //step 1: get OrientGraphNoTx
            StringBuffer batchEdgeInsert = new StringBuffer();
            OrientGraphNoTx graphNoTx = orientdbUtil.getOrientGraphNoTx();
            graphNoTx.declareIntent(new OIntentMassiveInsert());

            //step 2: get all the vertex rid by the business key
            //<class, keys>
            Map<String, Set<String>> keysToSearchMap = new HashMap<>();
            copyList.forEach(map -> {
                String from = map.get("_from").toString();
                String fromClass = from.substring(0, from.indexOf("/"));
                String to = map.get("_to").toString();
                String toClass = to.substring(0, to.indexOf("/"));

                Set<String> fromKeys = keysToSearchMap.computeIfAbsent(fromClass, key -> new HashSet<>());
                Set<String> toKeys = keysToSearchMap.computeIfAbsent(toClass, key -> new HashSet<>());
                fromKeys.add(from.substring(from.indexOf("/") + 1));
                toKeys.add(to.substring(to.indexOf("/") + 1));
            });

            //<key, rid>
            Map<String, String> ridMap = new HashMap<>();
            keysToSearchMap.forEach((cls, keys) -> {
                OrientDynaElementIterable iterable = graphNoTx.command(new OCommandSQL(String.format("SELECT FROM %s WHERE _key in ?", cls))).execute(keys);
                iterable.forEach(obj -> {
                    Vertex vertex = (Vertex) obj;
                    ridMap.put(cls + "/" + vertex.getProperty("_key"), vertex.getId().toString());
                });
            });
           
            //step 3: insert the edges
            copyList.forEach(item -> {
                String fromId = ridMap.get(item.get("_from").toString());
                String toId = ridMap.get(item.get("_to").toString());
                batchEdgeInsert.setLength(0);
                batchEdgeInsert.append("CREATE EDGE ").append(tableName)
                        .append(" FROM ").append(fromId)
                        .append(" TO ").append(toId)
                        .append(" CONTENT ").append(JSON.toJSONString(item)).append(";\n ");
                try {
                    graphNoTx.command(new OCommandSQL(batchEdgeInsert.toString())).execute();
                } catch (ORecordDuplicatedException e) {
                    logger.warn("duplicate exception");
                } catch (Exception e) {
                    logger.error("insert error: {}", e);
                }
            });
            semaphore.release();
            graphNoTx.declareIntent(null);
            graphNoTx.shutdown();
        });
    }

I obtain graphNoTx like this:

OrientGraphFactory graphFactory = new OrientGraphFactory(dbUrl, username, password).setupPool(1, 40);
...
public OrientGraphNoTx getOrientGraphNoTx(){
        return graphFactory.getNoTx();
    }

@tglman
Member

tglman commented Jan 2, 2018

Hi @weickdev,

Since OrientDB 3.0 the TinkerPop 2 API has been deprecated. We suggest using the multi-model API (http://orientdb.com/docs/3.0.x/java/Java-MultiModel-API.html) or TinkerPop 3.0 (http://orientdb.com/docs/3.0.x/tinkerpop3/OrientDB-TinkerPop3.html). Some of the problems you are experiencing may be due to the old API.

Regards

@weickdev
Author

weickdev commented Jan 3, 2018

@tglman thanks for your reply, I will try the new API.

@blueAnger

blueAnger commented Jan 4, 2018

Hi @tglman ,
I'm sorry, but the same exception occurs when adding edges concurrently. My code is below; you can compare it with what weickdev posted.

private void saveByBatchQueryWithoutCache(List<Map<String, Object>> list) {
        List<Map<String, Object>> copyList = new ArrayList<>();
        copyList.addAll(list);
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            logger.error("main thread is interrupted: {}", e);
        }

        executorService.execute(() -> {
            //<class, keys>
            Map<String, Set<String>> keysToSearchMap = new HashMap<>();
            copyList.forEach(map -> {
                String from = map.get("_from").toString();
                String fromClass = from.substring(0, from.indexOf("/"));
                String to = map.get("_to").toString();
                String toClass = to.substring(0, to.indexOf("/"));

                Set<String> fromKeys = keysToSearchMap.computeIfAbsent(fromClass, key -> new HashSet<>());
                Set<String> toKeys = keysToSearchMap.computeIfAbsent(toClass, key -> new HashSet<>());
                fromKeys.add(from.substring(from.indexOf("/") + 1));
                toKeys.add(to.substring(to.indexOf("/") + 1));
            });

            ODatabaseSession session = orientdbUtil.getGraphSession();
            session.begin();
//            session.declareIntent(new OIntentMassiveInsert());

            long start = System.currentTimeMillis();
            //<key, rid>
            Map<String, OVertex> ridMap = new HashMap<>();
            keysToSearchMap.forEach((cls, keys) -> {
                OResultSet resultSet = session.query("SELECT FROM ? WHERE _key in ?", cls, keys);
                resultSet.vertexStream().forEach(vertex -> ridMap.put(cls + "/" + vertex.getProperty("_key"), vertex));
                resultSet.close();
            });

            copyList.forEach(item -> {
                OVertex fromVertex = ridMap.get(item.get("_from").toString());
                OVertex toVertex = ridMap.get(item.get("_to").toString());
                long insertStart = System.currentTimeMillis();
                try {
                    OEdge edge = session.newEdge(fromVertex, toVertex, tableName);
                    item.forEach(edge::setProperty);
                    edge.save();
                } catch (Exception e) {
                    logger.error("insert error: {}", e);
                }
                logger.debug("create current edge costs {} ms", System.currentTimeMillis() - insertStart);
            });
            logger.info("create {} item cause {} ms", copyList.size(), System.currentTimeMillis() - start);
            semaphore.release();
            session.commit();
            session.close();
        });
    }

This is how I get the ODatabaseSession:

private ODatabasePool pool;

    @PostConstruct
    private void init()
    {
        pool = new ODatabasePool(dbUrl, username, password);
    }

    public ODatabaseSession getGraphSession() {
        return pool.acquire();
    }

I also found the relevant documentation and have already applied its setting. The JAVA_OPTS in bin/server.sh are as follows:

if [ -z "$JAVA_OPTS_SCRIPT" ] ; then
    JAVA_OPTS_SCRIPT="-Djna.nosys=true -XX:+HeapDumpOnOutOfMemoryError -Djava.awt.headless=true -Dfile.encoding=UTF8 -Drhino.opt.level=9 -DridBag.embeddedToSbtreeBonsaiThreshold=-1 -Dstorage.useWAL=false -Dstorage.diskCache.bufferSize=102400 -XX:+PrintGCDetails"

As you can see, I also use a session transaction in my code so that the reads and writes happen as one atomic operation. I don't know what else to try. I hope this information helps.

Thanks.

@tglman
Member

tglman commented Jan 4, 2018

Hi @blueAnger,

We will check this and let you know.

Regards

@tglman
Member

tglman commented Jan 4, 2018

hi @blueAnger, @weickdev,

I wrote a simple test case similar to yours but couldn't reproduce the problem. Here is my test case:

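import com.orientechnologies.orient.core.db.ODatabasePool;
import com.orientechnologies.orient.core.db.ODatabaseSession;
import com.orientechnologies.orient.core.record.OEdge;
import com.orientechnologies.orient.core.record.OVertex;
import com.orientechnologies.orient.core.sql.executor.OResultSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TestInsert {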
  private ODatabasePool pool;

  @Before
  public void before() {
    pool = new ODatabasePool("remote:localhost/test", "admin", "admin");
  }

  @After
  public void after() {
    pool.close();
  }

  @Test
  public void saveByBatchQueryWithoutCache() {
    String tableName = "E";
    ODatabaseSession insertSession = pool.acquire();
    insertSession.begin();
    Set<String> keyToSearch = new HashSet<>();
    for (int i = 0; i < 20; i++) {
      OVertex v = insertSession.newVertex("one");
      String key = "a" + i;
      keyToSearch.add(key);
      v.setProperty("_key", key);
      insertSession.save(v);
    }
    insertSession.commit();
    insertSession.close();
    for (int z = 0; z < 5; z++) {
      ODatabaseSession session = pool.acquire();
      session.begin();

      long start = System.currentTimeMillis();
      //<key, rid>
      Map<String, OVertex> ridMap = new HashMap<>();
      keyToSearch.forEach((key) -> {
        OResultSet resultSet = session.query("SELECT FROM one WHERE _key = ?", key);
        resultSet.vertexStream().forEach(vertex -> ridMap.put(vertex.getProperty("_key"), vertex));
        resultSet.close();
      });

      for (int i = 0; i < 19; i++) {
        OVertex fromVertex = ridMap.get("a" + i);
        OVertex toVertex = ridMap.get("a" + (i + 1));
        try {
          OEdge edge = session.newEdge(fromVertex, toVertex, tableName);
          edge.setProperty("some", "value");
          edge.save();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
      session.commit();
      session.close();
    }
  }
}

Also, in your previous message you said the server got stuck. If you manage to reproduce that, could you take a heap dump and send it to us?

Regards

@blueAnger

Hi @tglman
I looked at your test code; it seems to add edges sequentially in a single thread. As I mentioned, the exception occurs when adding edges concurrently, so I changed your test code so that you may be able to reproduce the exception.

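import com.orientechnologies.common.util.ORawPair;
import com.orientechnologies.orient.core.db.ODatabasePool;
import com.orientechnologies.orient.core.db.ODatabaseSession;
import com.orientechnologies.orient.core.metadata.schema.OClass;
import com.orientechnologies.orient.core.record.OEdge;
import com.orientechnologies.orient.core.record.OVertex;
import com.orientechnologies.orient.core.sql.executor.OResultSet;
import org.junit.Before;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestInsert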
{
    private ODatabasePool pool;

    private final String TABLE_V1 = "table1";
    private final String TABLE_V2 = "table2";
    private final String TABLE_E1 = "edge1";
    private final int VERTEX_NUM = 50;
    private final int EDGE_NUM = 1000;
    private final int THREAD_NUM = 10;

    @Before
    public void before() {
        pool = new ODatabasePool("remote:localhost/test", "admin", "admin");

        ODatabaseSession session = pool.acquire();
        Arrays.asList(new ORawPair<>(TABLE_V1, "V"), new ORawPair<>(TABLE_V2, "V"), new ORawPair<>(TABLE_E1, "E")).forEach(pair -> {
            try {
                String table = pair.getFirst();
                String superClass = pair.getSecond();
                OClass oClass = session.getClass(table);
                if (oClass != null)
                {
                    session.command(String.format("TRUNCATE CLASS %s UNSAFE", table)).close();
                    session.command(String.format("DROP CLASS %s IF EXISTS UNSAFE", table)).close();
                }
                session.command(String.format("CREATE CLASS %s EXTENDS %s", table, superClass)).close();
                session.command(String.format("CREATE PROPERTY %s.name STRING ", table)).close();
                session.command(String.format("CREATE PROPERTY %s._key STRING ", table)).close();
                session.command(String.format("CREATE INDEX `%s._key` ON `%s` (_key) UNIQUE_HASH_INDEX", table, table)).close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        session.close();
    }

    @Test
    public void addEdgesConcurrently() {
        ODatabaseSession insertSession = pool.acquire();
        insertSession.begin();
        //<cls, keys>
        Map<String, Set<String>> keysToSearchMap = new HashMap<>();
        Arrays.asList(TABLE_V1, TABLE_V2).forEach(table -> {
            for (int i = 0; i < VERTEX_NUM; ++i) {
                OVertex v = insertSession.newVertex(table);
                String key = String.format("%s_key_%d", table, i);
                Set<String> keys = keysToSearchMap.computeIfAbsent(table, cls -> new HashSet<>());
                keys.add(key);
                v.setProperty("_key", key);
                v.setProperty("name", String.format("%s_name_%d", table, i));
                v.save();
            }
        });
        insertSession.commit();
        insertSession.close();

        System.out.println("======inserting vertices finished=======");

        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUM);
        final Random random = new Random(System.currentTimeMillis());
        for(int t = 0; t < THREAD_NUM; ++t)
        {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " start");
                ODatabaseSession session = pool.acquire();
                session.begin();

                //<key, vertex>
                Map<String, OVertex> vertexMap = new HashMap<>();
                keysToSearchMap.forEach((cls, keys) -> {
                    OResultSet resultSet = session.query("SELECT FROM ? WHERE _key in ?", cls, keys);
                    resultSet.vertexStream().forEach(vertex -> vertexMap.put(vertex.getProperty("_key"), vertex));
                    resultSet.close();
                });

                for (int i = 0; i < EDGE_NUM; ++i)
                {
                    int i1 = random.nextInt(VERTEX_NUM);
                    int i2 = random.nextInt(VERTEX_NUM);
                    OVertex fromV1 = vertexMap.get(String.format("%s_key_%d", TABLE_V1, i1));
                    OVertex toV2 = vertexMap.get(String.format("%s_key_%d", TABLE_V2, i2));

                    OEdge edge = session.newEdge(fromV1, toV2, TABLE_E1);
                    edge.setProperty("_key", String.format("%s_key_%d", TABLE_E1, i));
                    edge.setProperty("name", String.format("%s_name_%d", TABLE_E1, i));
                    edge.save();
                }
                session.commit();
                session.close();
                System.out.println(Thread.currentThread().getName() + " finished");
            });
        }
        executorService.shutdown();
        while (!executorService.isTerminated())
        {
            try {
                executorService.awaitTermination(3, TimeUnit.MINUTES);
            } catch (InterruptedException e) {}//ignore
        }
        pool.close();
    }
}

Thanks.

@tglman
Member

tglman commented Jan 5, 2018

hi @blueAnger,

I found the cause of this problem: closing the pool did not roll back any pending transaction. This kept the local cache active, so queries returned old records, and using those old records led to the concurrent modification exception. I fixed this in the latest commit; can you double-check?

I would also suggest checking that all your methods commit correctly.

Regards
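The failure mode tglman describes can be modelled in a few lines. In this hypothetical sketch, a pooled session keeps a local cache of record versions. ToyServer, PooledSession and SessionPool are illustrative names, not OrientDB classes. If the pool hands the session out again without rolling back its pending transaction, later reads return the cached, outdated version, and an optimistic version check on write would then reject it. The rollbackOnRelease flag switches between the two behaviours.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Shared server-side state: record id -> current version. */
class ToyServer {
    final Map<String, Integer> versions = new HashMap<>();
}

/** Toy session with a local cache that survives being returned to the pool. */
class PooledSession {
    private final ToyServer server;
    private final Map<String, Integer> localCache = new HashMap<>();
    boolean txActive;

    PooledSession(ToyServer server) {
        this.server = server;
    }

    void begin() {
        txActive = true;
    }

    /** Serves the cached version if there is one, otherwise loads it from the server. */
    int load(String rid) {
        return localCache.computeIfAbsent(rid, server.versions::get);
    }

    void rollback() {
        txActive = false;
        localCache.clear(); // forget everything read inside the abandoned transaction
    }
}

/** Toy pool; rollbackOnRelease toggles between the buggy and the fixed behaviour. */
class SessionPool {
    private final Deque<PooledSession> idle = new ArrayDeque<>();
    private final ToyServer server;
    private final boolean rollbackOnRelease;

    SessionPool(ToyServer server, boolean rollbackOnRelease) {
        this.server = server;
        this.rollbackOnRelease = rollbackOnRelease;
    }

    PooledSession acquire() {
        PooledSession s = idle.poll();
        return s != null ? s : new PooledSession(server);
    }

    /** Called when a caller closes its session. */
    void release(PooledSession s) {
        if (rollbackOnRelease && s.txActive) {
            s.rollback();
        }
        idle.push(s);
    }
}
```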

@blueAnger

Hi @tglman
The problem remains. I took a look at your commit: you added rollback(true) to the close() method of several pool-like classes. IMHO, that doesn't resolve the concurrent modification problem, since the exception is thrown before pool.close() is invoked.

Let me re-state the problem here:

  • Adding edges concurrently causes OConcurrentModificationException. However, according to the documentation OrientDB uses optimistic locking, so I guess this is expected? Please let me know.
  • When OConcurrentModificationException occurs, the server gets partly stuck: I can no longer query or reconnect to the same database, but other databases are still reachable.

I realized my previous test code had some problems, so please try the following test code instead. Hope it helps.

import com.orientechnologies.common.util.ORawPair;
import com.orientechnologies.orient.core.db.ODatabasePool;
import com.orientechnologies.orient.core.db.ODatabaseSession;
import com.orientechnologies.orient.core.exception.OConcurrentModificationException;
import com.orientechnologies.orient.core.metadata.schema.OClass;
import com.orientechnologies.orient.core.record.OEdge;
import com.orientechnologies.orient.core.record.OVertex;
import com.orientechnologies.orient.core.sql.executor.OResultSet;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by thomas on 18/1/5.
 */
public class TestInsert
{
    private ODatabasePool pool;

    private final String TABLE_V1 = "table1";
    private final String TABLE_V2 = "table2";
    private final String TABLE_E1 = "edge1";
    private final int VERTEX_NUM = 50;
    private final int EDGE_NUM = 1000;
    private final int THREAD_NUM = 10;
    /**
     * min retry wait time (millis) when a concurrent modification occurs
     */
    private final long MIN_WAIT_TIME = 1000;

    /**
     * max retry wait time (millis) when a concurrent modification occurs
     */
    private final long MAX_WAIT_TIME = 3000;

    private final int MAX_RETRY = 3;

    private final Logger LOGGER = LoggerFactory.getLogger(TestInsert.class);

    @Before
    public void before() {
        pool = new ODatabasePool("remote:localhost/test", "root", "admin");

        ODatabaseSession session = pool.acquire();
        Arrays.asList(new ORawPair<>(TABLE_V1, "V"), new ORawPair<>(TABLE_V2, "V"), new ORawPair<>(TABLE_E1, "E")).forEach(pair -> {
            try {
                String table = pair.getFirst();
                String superClass = pair.getSecond();
                OClass oClass = session.getClass(table);
                if (oClass != null)
                {
                    session.command(String.format("TRUNCATE CLASS %s UNSAFE", table)).close();
                    session.command(String.format("DROP CLASS %s IF EXISTS UNSAFE", table)).close();
                }
                session.command(String.format("CREATE CLASS %s EXTENDS %s", table, superClass)).close();
                session.command(String.format("CREATE PROPERTY %s.name STRING ", table)).close();
                session.command(String.format("CREATE PROPERTY %s._key STRING ", table)).close();
                session.command(String.format("CREATE INDEX `%s._key` ON `%s` (_key) UNIQUE_HASH_INDEX", table, table)).close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        session.close();
    }

    @Test
    public void addEdgesConcurrently() {
        ODatabaseSession insertSession = pool.acquire();
        insertSession.begin();
        //<cls, keys>
        Map<String, Set<String>> keysToSearchMap = new HashMap<>();
        Arrays.asList(TABLE_V1, TABLE_V2).forEach(table -> {
            for (int i = 0; i < VERTEX_NUM; ++i) {
                OVertex v = insertSession.newVertex(table);
                String key = String.format("%s_key_%d", table, i);
                Set<String> keys = keysToSearchMap.computeIfAbsent(table, cls -> new HashSet<>());
                keys.add(key);
                v.setProperty("_key", key);
                v.setProperty("name", String.format("%s_name_%d", table, i));
                v.save();
            }
        });
        insertSession.commit();
        insertSession.close();

        LOGGER.info("======inserting vertices finished=======");

        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUM);
        final Random random = new Random(System.currentTimeMillis());
        for(int t = 0; t < THREAD_NUM; ++t)
        {
            final int _t = t;
            executorService.execute(() -> {
                LOGGER.info("=======start==========");
                boolean finished = false;
                int time = 0;
                while (!finished && (++time) <= MAX_RETRY)
                {
                    ODatabaseSession session = null;
                    try {
                        session = pool.acquire();
                        session.begin();

                        //<key, vertex>
                        Map<String, OVertex> vertexMap = new HashMap<>();
                        for (Map.Entry<String, Set<String>> entry : keysToSearchMap.entrySet())
                        {
                            OResultSet resultSet = session.query("SELECT FROM ? WHERE _key in ?", entry.getKey(), entry.getValue());
                            resultSet.vertexStream().forEach(vertex -> vertexMap.put(vertex.getProperty("_key"), vertex));
                            resultSet.close();
                        }

                        for (int i = 0; i < EDGE_NUM; ++i)
                        {
                            int i1 = random.nextInt(VERTEX_NUM);
                            int i2 = random.nextInt(VERTEX_NUM);
                            OVertex fromV1 = vertexMap.get(String.format("%s_key_%d", TABLE_V1, i1));
                            OVertex toV2 = vertexMap.get(String.format("%s_key_%d", TABLE_V2, i2));

                            OEdge edge = session.newEdge(fromV1, toV2, TABLE_E1);
                            edge.setProperty("_key", String.format("%s_key_%d", TABLE_E1, _t * EDGE_NUM + i));
                            edge.setProperty("name", String.format("%s_name_%d", TABLE_E1, _t * EDGE_NUM + i));
                            edge.save();
                        }
                        session.commit();
                        finished = true;
                    } catch (OConcurrentModificationException e) {
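                        // pick a delay uniformly in [MIN_WAIT_TIME, MAX_WAIT_TIME); never negative or below the minimum
                        long waitTime = MIN_WAIT_TIME + (long) (random.nextDouble() * (MAX_WAIT_TIME - MIN_WAIT_TIME));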
                        LOGGER.info("concurrent modification. retry after {} ms", waitTime);
                        finished = false;
                        try {
                            Thread.sleep(waitTime);
                        } catch (InterruptedException ignore) {}
                    } catch (Exception e) {
                        LOGGER.error("", e);
                    } finally {
                        if (session != null) session.close();
                    }
                }
                if(finished) LOGGER.info("======finish=======");
                else LOGGER.info("========stop after {} retries========", MAX_RETRY);
            });
        }
        executorService.shutdown();
        boolean terminated = true;
        try {
            terminated = executorService.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {}//ignore
        if(!terminated)
            executorService.shutdownNow();
        pool.close();
    }
}
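This test retries when it hits OConcurrentModificationException, which is the usual way to handle optimistic-locking conflicts. Below is a generic sketch of that loop. ConflictException is a hypothetical stand-in for OConcurrentModificationException, and Retry is an illustrative helper, not an OrientDB API. Two details matter here. Each attempt must redo its own reads so it never reuses stale records. The random delay should also be drawn uniformly from [min, max), so it never drops below the minimum or goes negative.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

/** Hypothetical stand-in for OConcurrentModificationException. */
class ConflictException extends RuntimeException {
    ConflictException(String message) {
        super(message);
    }
}

final class Retry {
    private Retry() {}

    /** Random delay drawn uniformly from [minMillis, maxMillis). */
    static long backoffMillis(long minMillis, long maxMillis) {
        return ThreadLocalRandom.current().nextLong(minMillis, maxMillis);
    }

    /**
     * Runs one unit of work, retrying on conflict until maxAttempts attempts have failed.
     * The work must redo its own reads each time so it never reuses stale records.
     */
    static <T> T withRetry(Supplier<T> work, int maxAttempts, long minMillis, long maxMillis) {
        for (int attempt = 1; ; attempt++) {
            try {
                return work.get();
            } catch (ConflictException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last conflict
                }
                try {
                    Thread.sleep(backoffMillis(minMillis, maxMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    e.addSuppressed(ie);
                    throw e;
                }
            }
        }
    }
}
```

In real code, the Supplier would acquire a session, begin, query the vertices, create the edges and commit. Retrying only the commit with vertices loaded earlier would likely just hit the same conflict again.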

@tglman
Member

tglman commented Jan 8, 2018

Hi @blueAnger,

I ran your test a few times and couldn't reproduce the problem, so it is quite likely fixed now.

OConcurrentModificationException can still happen if you operate concurrently on the same record, for example by adding two edges to the same record from two different threads.

Not being able to connect to a database any more after an OConcurrentModificationException is a problem. If this happens to you again, it would be great if you could send us a heap dump of the server.

Regards
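Conflicts can still happen when two threads add edges to the same vertex. One client-side mitigation is to make sure two threads never work on the same vertices at the same time. This is an assumption of this sketch, not something the maintainers proposed in this thread. The hypothetical VertexPairLocks below stripes locks by vertex key. It always acquires the two endpoint stripes in ascending order, so threads that touch the same vertices take turns and cannot deadlock. It only coordinates threads inside one JVM. The locks also have to be held until the transaction that creates the edge has committed.

```java
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical helper: serializes units of work that touch the same vertices.
 * Locks are striped by vertex key and always taken in ascending stripe order,
 * so two threads locking the same pair in opposite order cannot deadlock.
 */
final class VertexPairLocks {
    private final ReentrantLock[] stripes;

    VertexPairLocks(int stripeCount) {
        stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    int stripeOf(String vertexKey) {
        return Math.floorMod(vertexKey.hashCode(), stripes.length);
    }

    /**
     * Runs work (for example "acquire a session, create one edge, commit") while holding
     * the stripes of both endpoints, so no other thread using this helper touches them.
     */
    void withBoth(String fromKey, String toKey, Runnable work) {
        int a = stripeOf(fromKey);
        int b = stripeOf(toKey);
        int first = Math.min(a, b);
        int second = Math.max(a, b);
        stripes[first].lock();
        try {
            if (second != first) {
                stripes[second].lock();
            }
            try {
                work.run();
            } finally {
                if (second != first) {
                    stripes[second].unlock();
                }
            }
        } finally {
            stripes[first].unlock();
        }
    }
}
```

Stripes trade precision for memory. Unrelated vertices that hash to the same stripe are occasionally serialized too, which costs some parallelism but never correctness.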

@weickdev
Author

@tglman I compiled the new develop code, and the server no longer hangs. As you said, OConcurrentModificationException can still happen. However, the documentation says:

Consider the case where multiple clients attempt to add edges on the same vertex. OrientDB could throw the OConcurrentModificationException exception. This occurs because collections of edges are kept on vertices, meaning that, every time OrientDB adds or removes an edge, both vertices update and their versions increment. You can avoid this issue by using RIDBAG Bonsai structure, which are never embedded, so the edge never updates the vertices.

To use this configuration at run-time, before launching OrientDB, use this code:
java -DridBag.embeddedToSbtreeBonsaiThreshold=-1

So I set this parameter in server.sh, but the OConcurrentModificationException still occurs. Does this parameter no longer work?
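The mechanism in the quoted documentation can be shown with a toy model. When the edge list is stored inside the vertex record, adding an edge rewrites, and therefore re-versions, both endpoint vertices. When the edge list lives in a separate structure, the vertex records are left alone. ToyVertex and ToyGraph are hypothetical and only mirror the documentation's claim. They say nothing about whether a given OrientDB version actually honours the threshold, which is exactly what this thread questions.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/** Toy vertex record: a version number plus an edge list used only in embedded mode. */
class ToyVertex {
    int version = 1;
    final List<String> embeddedEdges = new ArrayList<>();
}

/** Contrasts edge lists stored inside vertex records with edge lists kept in a separate structure. */
class ToyGraph {
    private final boolean edgesEmbedded;
    private final Map<ToyVertex, List<String>> externalEdges = new IdentityHashMap<>();
    private int nextEdgeId = 0;

    ToyGraph(boolean edgesEmbedded) {
        this.edgesEmbedded = edgesEmbedded;
    }

    void addEdge(ToyVertex from, ToyVertex to) {
        String edgeId = "e" + nextEdgeId++;
        if (edgesEmbedded) {
            // The edge list is part of each vertex record, so both records change and are re-versioned.
            from.embeddedEdges.add(edgeId);
            from.version++;
            to.embeddedEdges.add(edgeId);
            to.version++;
        } else {
            // The edge list lives outside the vertex records, which are left untouched.
            externalEdges.computeIfAbsent(from, v -> new ArrayList<>()).add(edgeId);
            externalEdges.computeIfAbsent(to, v -> new ArrayList<>()).add(edgeId);
        }
    }

    int degree(ToyVertex v) {
        return edgesEmbedded ? v.embeddedEdges.size() : externalEdges.getOrDefault(v, new ArrayList<>()).size();
    }
}
```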

@tglman
Member

tglman commented Jan 10, 2018

Hi @weickdev,

This only applies when running standalone or embedded; the "RIDBAG Bonsai" structure mentioned in the documentation is not supported in distributed mode.

Regards

@weickdev
Author

@tglman But I tested it in standalone mode.

@blueAnger

blueAnger commented Jan 17, 2018

Hi @tglman ,
I found out why your commits didn't help on my server. It turns out that if I set -Dstorage.useWAL=false in bin/server.sh, then when OConcurrentModificationException occurs, the server gets partly stuck and the same database can no longer be connected to. As far as I know, I posted my JAVA_OPTS from bin/server.sh early enough, but it seems nobody paid much attention to them.

Still, as weickdev said, -DridBag.embeddedToSbtreeBonsaiThreshold=-1 doesn't help: OConcurrentModificationException still occurs in standalone mode.

Thanks.

@lvca lvca modified the milestones: 3.0.0, 3.0.x Apr 9, 2018
@royalpinto

Any updates on this?

With the fix for OrientJS queries getting queued up (orientechnologies/orientjs@5acca8b), I am also facing the same OConcurrentModificationException issue.

Is there any workaround for this?

@myukselen

I am also stuck on this issue, using the Python goblin ORM.

Each request is handled by a different OrientGraph instance inside OrientStandardGraph.
Each one has its own local cache, and I can find the same vertex with different versions in these caches.

After adding an edge, the vertex version changes, but the other caches retain the old version.
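Several per-request caches holding different versions of the same vertex can be sketched as follows. RecordStore and RequestCache are hypothetical names, not OrientDB or goblin classes. When a cache modifies a record from its own stale copy, the optimistic check rejects the write. Reloading the record right before modifying it lets the update through. The update can still lose a race to another writer, so a retry is still needed.

```java
import java.util.HashMap;
import java.util.Map;

/** Authoritative store: record id -> current version. */
class RecordStore {
    private final Map<String, Integer> versions = new HashMap<>();

    synchronized void put(String rid, int version) {
        versions.put(rid, version);
    }

    synchronized int currentVersion(String rid) {
        return versions.get(rid);
    }

    /** Optimistic update: accepted only if the caller's version is the current one. */
    synchronized boolean tryUpdate(String rid, int expectedVersion) {
        if (versions.get(rid) != expectedVersion) {
            return false;
        }
        versions.put(rid, expectedVersion + 1);
        return true;
    }
}

/** One cache per request, loosely like one graph instance per request. */
class RequestCache {
    private final RecordStore store;
    private final Map<String, Integer> cached = new HashMap<>();

    RequestCache(RecordStore store) {
        this.store = store;
    }

    /** Returns the version this cache saw first, even if another cache has updated the record since. */
    int cachedVersion(String rid) {
        return cached.computeIfAbsent(rid, store::currentVersion);
    }

    /** Replaces the cached copy with the latest version from the store. */
    int reload(String rid) {
        int latest = store.currentVersion(rid);
        cached.put(rid, latest);
        return latest;
    }

    /** Modifies the record starting from the cached copy; fails if that copy is stale. */
    boolean update(String rid) {
        int version = cachedVersion(rid);
        if (!store.tryUpdate(rid, version)) {
            return false;
        }
        cached.put(rid, version + 1);
        return true;
    }
}
```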
