Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚧 Reintegration OSMSource, OSHDBUpdate, OSHDBTools #502

Draft
wants to merge 31 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data/replication/state.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#Wed Mar 15 18:02:09 UTC 2023
sequenceNumber=92075
timestamp=2023-03-15T18\:00\:00Z
Binary file added data/sample.pbf
Binary file not shown.
Binary file modified data/test-data.mv.db
Binary file not shown.
76 changes: 76 additions & 0 deletions oshdb-ignite/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.heigit.ohsome</groupId>
    <artifactId>oshdb-parent</artifactId>
    <version>1.2.0-SNAPSHOT</version>
  </parent>

  <artifactId>oshdb-ignite</artifactId>
  <!-- Human-readable module name, added for consistency with the sibling
       oshdb-rocksdb module which already declares one. -->
  <name>OSHDB Ignite Module</name>

  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencyManagement>
    <dependencies>
      <!-- Import the Reactor BOM so reactor-core below can omit its version. -->
      <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-bom</artifactId>
        <version>2022.0.3</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>

    <dependency>
      <groupId>${project.groupId}</groupId>
      <artifactId>oshdb-api-ignite</artifactId>
      <version>${project.version}</version>
    </dependency>

    <!-- Console progress bars (see progress.ProgressUtil / SysOutProgressConsumer). -->
    <dependency>
      <groupId>me.tongfei</groupId>
      <artifactId>progressbar</artifactId>
      <version>0.9.5</version>
    </dependency>

    <!-- Version managed by the imported reactor-bom above. -->
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.zaxxer/HikariCP -->
    <dependency>
      <groupId>com.zaxxer</groupId>
      <artifactId>HikariCP</artifactId>
      <version>5.0.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.duckdb/duckdb_jdbc -->
    <dependency>
      <groupId>org.duckdb</groupId>
      <artifactId>duckdb_jdbc</artifactId>
      <version>0.7.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-vector -->
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-vector</artifactId>
      <version>11.0.0</version>
    </dependency>

  </dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.heigit.ohsome.oshdb.ignite.progress;

import java.time.Duration;
import me.tongfei.progressbar.ProgressBar;
import me.tongfei.progressbar.ProgressBarBuilder;

/**
 * Factory helpers for console {@link ProgressBar} instances used by the Ignite import tooling.
 */
public class ProgressUtil {

  private ProgressUtil() {
    // utility class
  }

  /**
   * Creates a console progress bar rendered via {@link SysOutProgressConsumer}.
   *
   * @param name task name displayed in front of the bar
   * @param size initial maximum (total number of steps)
   * @param updateInterval minimum time between two console refreshes
   * @return a ready-to-use (started) {@link ProgressBar}
   */
  public static ProgressBar progressBar(String name, int size, Duration updateInterval) {
    // Clamp instead of blindly casting: toMillis() returns a long, and a Duration
    // longer than Integer.MAX_VALUE ms (~24.8 days) would overflow the int cast
    // and produce a negative update interval.
    var intervalMillis = (int) Math.min(updateInterval.toMillis(), Integer.MAX_VALUE);
    return new ProgressBarBuilder()
        .setTaskName(name)
        .setUpdateIntervalMillis(intervalMillis)
        .setInitialMax(size)
        .setConsumer(new SysOutProgressConsumer(60))
        .build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.heigit.ohsome.oshdb.ignite.progress;

import me.tongfei.progressbar.ProgressBarConsumer;

/**
 * A {@link ProgressBarConsumer} that writes each rendered progress line to standard output,
 * truncated by the progress-bar library to a fixed maximum width.
 */
public class SysOutProgressConsumer implements ProgressBarConsumer {

  // Maximum width (in characters) the library may use when rendering a line.
  private final int maxRenderedLength;

  /**
   * @param maxLength maximum rendered line length reported to the progress-bar library
   */
  public SysOutProgressConsumer(int maxLength) {
    this.maxRenderedLength = maxLength;
  }

  @Override
  public int getMaxRenderedLength() {
    return maxRenderedLength;
  }

  @Override
  public void accept(String rendered) {
    System.out.println(rendered);
  }

  @Override
  public void close() {
    // no-op: System.out is not ours to close
  }
}
42 changes: 42 additions & 0 deletions oshdb-rocksdb/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.heigit.ohsome</groupId>
<artifactId>oshdb-parent</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>

<artifactId>oshdb-rocksdb</artifactId>
<name>OSHDB RocksDB Module</name>

<properties>
<!-- RocksDB JNI bindings version; bump here to upgrade the embedded RocksDB. -->
<rocksdb.version>7.9.2</rocksdb.version>
</properties>

<dependencies>

<!-- Store abstractions (BackRef, BackRefType, ...) implemented by this module. -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>oshdb-store</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
</dependency>

<!-- h2.version is inherited from the oshdb-parent POM. -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package org.heigit.ohsome.oshdb.rocksdb;

import static com.google.common.collect.Streams.zip;
import static java.util.Collections.emptySet;
import static java.util.Map.entry;
import static java.util.Optional.ofNullable;
import static org.heigit.ohsome.oshdb.rocksdb.RocksDBUtil.idToKey;
import static org.heigit.ohsome.oshdb.rocksdb.RocksDBUtil.idsToKeys;

import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.heigit.ohsome.oshdb.store.BackRef;
import org.heigit.ohsome.oshdb.store.BackRefType;
import org.rocksdb.Cache;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.StringAppendOperator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

/**
 * RocksDB-backed store of back-references (id -&gt; set of referencing ids) for one
 * {@link BackRefType}. Back-references for a key are stored as concatenated 8-byte big-endian
 * ids joined by a single 0x00 delimiter byte (see the {@link StringAppendOperator} merge
 * operator configured in the constructor).
 */
public class BackRefStore implements AutoCloseable {

  private final BackRefType type;
  private final Options dbOptions;
  private final RocksDB db;

  /**
   * Opens (or creates) the RocksDB database at {@code path}.
   *
   * @param type the kind of back-reference this store holds
   * @param path directory for the RocksDB files; created if missing
   * @param cache shared block cache — NOTE(review): currently unused here; presumably meant to
   *     be wired into the db options (RocksDBUtil.defaultOptions?) — confirm and hook up or drop
   * @throws IOException if the directory cannot be created
   * @throws RocksDBException if the database cannot be opened
   */
  public BackRefStore(BackRefType type, Path path, Cache cache)
      throws IOException, RocksDBException {
    Files.createDirectories(path);
    this.type = type;

    this.dbOptions = RocksDBUtil.defaultOptions();
    // merge() appends a delimiter byte (char 0) plus the new 8-byte id to the existing value
    dbOptions.setMergeOperator(new StringAppendOperator((char) 0));
    dbOptions.unorderedWrite();

    try {
      db = RocksDB.open(dbOptions, path.toString());
    } catch (RocksDBException e) {
      // db is still null here; close() releases dbOptions and tolerates the null db
      close();
      throw e;
    }
  }

  /**
   * Looks up the back-reference sets for the given ids.
   *
   * @param ids ids to look up
   * @return map from each id to its (possibly empty) set of referencing ids
   * @throws RocksDBException on database errors
   */
  public Map<Long, Set<Long>> backRefs(Set<Long> ids) throws RocksDBException {
    var keys = idsToKeys(ids, ids.size());
    try (var ro = new ReadOptions()) {
      // Fix: the ReadOptions was previously opened but never passed to the lookup;
      // use the multiGetAsList overload that actually takes it.
      var backRefIfs = db.multiGetAsList(ro, keys);
      var map = Maps.<Long, Set<Long>>newHashMapWithExpectedSize(ids.size());
      // multiGetAsList returns values in the same order as the requested keys,
      // so zipping ids with results pairs each id with its value (null if absent).
      zip(ids.stream(), backRefIfs.stream(), (id, backRef) -> entry(id, keysToSet(backRef)))
          .forEach(entry -> map.put(entry.getKey(), entry.getValue()));
      return map;
    }
  }

  /**
   * Decodes a stored value (8-byte ids separated by single delimiter bytes) into a sorted set.
   */
  private Set<Long> keysToSet(byte[] backRefIds) {
    if (backRefIds == null) {
      return emptySet();
    }
    var bb = ByteBuffer.wrap(backRefIds);
    var set = new TreeSet<Long>();
    // first id has no leading delimiter; each further id is preceded by one delimiter byte
    set.add(bb.getLong());
    while (bb.hasRemaining()) {
      bb.get(); // delimiter
      set.add(bb.getLong());
    }
    return set;
  }

  public void update(List<BackRef> backRefs) throws RocksDBException {
    throw new UnsupportedOperationException("not yet implemented");
  }

  @Override
  public void close() {
    // db may be null if RocksDB.open failed in the constructor
    ofNullable(db).ifPresent(RocksDB::close);
    dbOptions.close();
  }

  @Override
  public String toString() {
    return "BackRefStore " + type;
  }

  /**
   * Appends {@code backRef} to the back-reference set of each id in {@code ids} via the
   * configured merge operator. WAL is disabled for bulk-load speed — data not yet flushed
   * is lost on a crash.
   *
   * @param backRef the referencing id to record
   * @param ids the ids that are referenced by {@code backRef}
   * @throws RocksDBException on database errors
   */
  public void merge(long backRef, Set<Long> ids) throws RocksDBException {
    var backRefKey = idToKey(backRef);
    try (var wo = new WriteOptions().setDisableWAL(true);
        var wb = new WriteBatch()) {
      for (var id : ids) {
        var key = idToKey(id);
        wb.merge(key, backRefKey);
      }
      db.write(wo, wb);
    }
  }
}
Loading