Skip to content

Commit

Permalink
Merge branch 'master' into version
Browse files Browse the repository at this point in the history
# Conflicts:
#	jdbc/spring-data-jpa/pom.xml
  • Loading branch information
KirillKurdyukov committed Nov 18, 2024
2 parents e36ebc9 + c49e92b commit a71c357
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 143 deletions.
2 changes: 1 addition & 1 deletion jdbc/spring-data-jpa/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<properties>
<maven.compiler.release>17</maven.compiler.release>
<kotlin.version>1.9.22</kotlin.version>
<hibernate.ydb.dialect.version>1.1.0</hibernate.ydb.dialect.version>
<hibernate.ydb.dialect.version>1.4.0</hibernate.ydb.dialect.version>
<spring.boot.version>3.2.1</spring.boot.version>
</properties>
<dependencies>
Expand Down
15 changes: 13 additions & 2 deletions ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
package tech.ydb.examples;

import java.time.Duration;

import tech.ydb.auth.iam.CloudAuthHelper;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.scheme.SchemeClient;
import tech.ydb.scheme.description.DescribePathResult;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.SupportedCodecs;
import tech.ydb.topic.settings.CreateTopicSettings;


/**
* @author Sergey Polovko
* @author Nikolay Perfilov
*/
public abstract class SimpleExample {
protected static final String TOPIC_NAME = "test-topic";
protected static final String CONSUMER_NAME = "test-consumer";
protected static final String TOPIC_NAME = System.getenv("YDB_TOPIC_NAME");
protected static final String CONSUMER_NAME = System.getenv("YDB_CONSUMER_NAME");

protected void doMain(String[] args) {
if (args.length > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ public void onPartitionSessionClosed(PartitionSessionClosedEvent event) {
@Override
public void onReaderClosed(ReaderClosedEvent event) {
logger.info("Reader is closed.");
if (!messageReceivedFuture.isDone()) {
messageReceivedFuture.complete(null);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.SyncReader;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.examples.topic;

import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -109,6 +110,9 @@ protected void run(GrpcTransport transport, String pathPrefix) {
}
logger.info("Received a signal to stop writing");

// Wait for all writes to receive a WriteAck before shutting down writer
writer.flush();

try {
writer.shutdown(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException exception) {
Expand Down Expand Up @@ -240,7 +244,7 @@ private class Handler extends AbstractReadEventHandler {
public void onMessages(DataReceivedEvent event) {
for (tech.ydb.topic.read.Message message : event.getMessages()) {
messagesReceived.incrementAndGet();
if (logger.isTraceEnabled()) {
if (logger.isDebugEnabled()) {
StringBuilder str = new StringBuilder("Message received");
str.append("\n");
str.append(" offset: ").append(message.getOffset()).append("\n")
Expand All @@ -251,13 +255,22 @@ public void onMessages(DataReceivedEvent event) {
.append(" writtenAt: ").append(message.getWrittenAt()).append("\n")
.append(" partitionSession: ").append(message.getPartitionSession().getId()).append("\n")
.append(" partitionId: ").append(message.getPartitionSession().getPartitionId())
.append("\n")
.append(" metadataItems: ")
.append("\n");
message.getMetadataItems().forEach(item -> str
.append(" key: \"")
.append(item.getKey())
.append("\", value: \"")
.append(new String(item.getValue(), StandardCharsets.UTF_8))
.append("\"")
.append("\n"));
if (!message.getWriteSessionMeta().isEmpty()) {
str.append(" writeSessionMeta:\n");
message.getWriteSessionMeta().forEach((key, value) ->
str.append(" ").append(key).append(": ").append(value).append("\n"));
}
logger.trace(str.toString());
logger.debug(str.toString());
} else {
logger.debug("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
}
Expand Down
38 changes: 31 additions & 7 deletions ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.examples.topic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -23,6 +24,8 @@
*/
public class WriteAsync extends SimpleExample {
private static final Logger logger = LoggerFactory.getLogger(WriteAsync.class);
private static final int MESSAGES_COUNT = 5;
private static final int WAIT_TIMEOUT_SECONDS = 60;

@Override
protected void run(GrpcTransport transport, String pathPrefix) {
Expand Down Expand Up @@ -52,14 +55,17 @@ protected void run(GrpcTransport transport, String pathPrefix) {
return null;
});

for (int i = 1; i <= 5; i++) {
// A latch to wait for all writes to receive a WriteAck before shutting down writer
CountDownLatch writesInProgress = new CountDownLatch(MESSAGES_COUNT);

for (int i = 1; i <= MESSAGES_COUNT; i++) {
final int index = i;
try {
String messageString = "message" + i;
// Blocks until the message is put into sending buffer
writer.send(Message.of(messageString.getBytes())).whenComplete((result, ex) -> {
if (ex != null) {
logger.error("Exception while sending message {}: ", index, ex);
logger.error("Exception while sending a message {}: ", index, ex);
} else {
logger.info("Message {} ack received", index);

Expand All @@ -76,20 +82,38 @@ protected void run(GrpcTransport transport, String pathPrefix) {
break;
}
}
writesInProgress.countDown();
});
} catch (QueueOverflowException exception) {
logger.error("Queue overflow exception while sending message{}: ", index, exception);
// Send queue is full. Need retry with backoff or skip
logger.error("Queue overflow exception while sending a message{}: ", index, exception);
// Send queue is full. Need to retry with backoff or skip
writesInProgress.countDown();
}

logger.info("Message {} is sent", index);
}

long timeoutSeconds = 10;
try {
writer.shutdown().get(timeoutSeconds, TimeUnit.SECONDS);
while (!writesInProgress.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
logger.error("Writes are not finished in {} seconds", WAIT_TIMEOUT_SECONDS);
}
} catch (InterruptedException exception) {
logger.error("Waiting for writes to finish was interrupted: ", exception);
}

try {
if (!writesInProgress.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
logger.error("Writes are not finished in {} seconds", WAIT_TIMEOUT_SECONDS);
}
} catch (InterruptedException exception) {
logger.error("Waiting for writes to finish was interrupted: ", exception);
}

try {
writer.shutdown().get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException exception) {
logger.error("Timeout exception during writer termination ({} seconds): ", timeoutSeconds, exception);
logger.error("Timeout exception during writer termination ({} seconds): ", WAIT_TIMEOUT_SECONDS,
exception);
} catch (ExecutionException exception) {
logger.error("Execution exception during writer termination: ", exception);
} catch (InterruptedException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ protected void run(GrpcTransport transport, String pathPrefix) {
}
}

// Wait for all writes to receive a WriteAck before shutting down writer
writer.flush();
logger.info("Flush finished");

long shutdownTimeoutSeconds = 10;
try {
writer.shutdown(shutdownTimeoutSeconds, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import tech.ydb.table.impl.PooledTableClient;
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.DecompressionException;
Expand All @@ -42,7 +40,7 @@
public class TransactionReadAsync extends SimpleExample {
private static final Logger logger = LoggerFactory.getLogger(TransactionReadAsync.class);
private static final long MAX_MEMORY_USAGE_BYTES = 500 * 1024 * 1024; // 500 Mb
private static final int MESSAGES_COUNT = 5;
private static final int MESSAGES_COUNT = 1;

private final CompletableFuture<Void> messageReceivedFuture = new CompletableFuture<>();
private TableClient tableClient;
Expand Down Expand Up @@ -135,11 +133,13 @@ public void onMessages(DataReceivedEvent event) {
// creating session and transaction
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
logger.error("Couldn't get session from pool: {}", sessionResult);
logger.error("Couldn't get a session from the pool: {}", sessionResult);
return; // retry or shutdown
}
Session session = sessionResult.getValue();
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
TableTransaction transaction = session.beginTransaction(TxMode.SERIALIZABLE_RW)
.join()
.getValue();

// do something else in transaction
transaction.executeDataQuery("SELECT 1").join();
Expand Down Expand Up @@ -199,6 +199,7 @@ public void onPartitionSessionClosed(PartitionSessionClosedEvent event) {
@Override
public void onReaderClosed(ReaderClosedEvent event) {
logger.info("Reader is closed.");
messageReceivedFuture.complete(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
Expand Down Expand Up @@ -49,37 +47,34 @@ protected void run(GrpcTransport transport, String pathPrefix) {
reader.init();

try {
// Reading 5 messages
for (int i = 0; i < 5; i++) {
// creating session and transaction
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
logger.error("Couldn't get session from pool: {}", sessionResult);
return; // retry or shutdown
}
Session session = sessionResult.getValue();
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);

// do something else in transaction
transaction.executeDataQuery("SELECT 1").join();
// analyzeQueryResultIfNeeded();
// creating session and transaction
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
logger.error("Couldn't a get session from the pool: {}", sessionResult);
return; // retry or shutdown
}
Session session = sessionResult.getValue();
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);

//Session session
Message message = reader.receive(ReceiveSettings.newBuilder()
.setTransaction(transaction)
.build());
byte[] messageData;
try {
messageData = message.getData();
} catch (DecompressionException e) {
logger.warn("Decompression exception while receiving a message: ", e);
messageData = e.getRawData();
}
logger.info("Message received: {}", new String(messageData, StandardCharsets.UTF_8));
// do something else in transaction
transaction.executeDataQuery("SELECT 1").join();
// analyzeQueryResultIfNeeded();

transaction.commit().join();
// analyze commit status
//Session session
Message message = reader.receive(ReceiveSettings.newBuilder()
.setTransaction(transaction)
.build());
byte[] messageData;
try {
messageData = message.getData();
} catch (DecompressionException e) {
logger.warn("Decompression exception while receiving a message: ", e);
messageData = e.getRawData();
}
logger.info("Message received: {}", new String(messageData, StandardCharsets.UTF_8));

transaction.commit().join();
// analyze commit status
} catch (InterruptedException exception) {
logger.error("Interrupted exception while waiting for message: ", exception);
}
Expand Down
Loading

0 comments on commit a71c357

Please sign in to comment.