Skip to content

Commit

Permalink
Improve JdbcConsumer exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
aiguofer committed Dec 22, 2023
1 parent 87971df commit 2542d45
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.arrow.adapter.jdbc.consumer.CompositeJdbcConsumer;
import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumerException;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
Expand Down Expand Up @@ -114,7 +115,11 @@ private void consumeData(VectorSchemaRoot root) {
root.setRowCount(readRowCount);
} catch (Throwable e) {
compositeConsumer.close();
throw new RuntimeException("Error occurred while consuming data.", e);
if (e instanceof JdbcConsumerException) {
throw (JdbcConsumerException) e;
} else {
throw new RuntimeException("Error occurred while consuming data.", e);
}
}
}

Expand Down Expand Up @@ -178,7 +183,11 @@ public VectorSchemaRoot next() {
return ret;
} catch (Exception e) {
close();
throw new RuntimeException("Error occurred while getting next schema root.", e);
if (e instanceof JdbcConsumerException) {
throw (JdbcConsumerException) e;
} else {
throw new RuntimeException("Error occurred while getting next schema root.", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;

/**
* Composite consumer which hold all consumers.
Expand All @@ -43,7 +45,18 @@ public CompositeJdbcConsumer(JdbcConsumer[] consumers) {
@Override
public void consume(ResultSet rs) throws SQLException, IOException {
for (int i = 0; i < consumers.length; i++) {
consumers[i].consume(rs);
try {
consumers[i].consume(rs);
} catch (Exception e) {
if (consumers[i] instanceof BaseConsumer) {
BaseConsumer consumer = (BaseConsumer) consumers[i];
JdbcFieldInfo fieldInfo = new JdbcFieldInfo(rs.getMetaData(), consumer.columnIndexInResultSet);
ArrowType arrowType = consumer.vector.getMinorType().getType();
throw new JdbcConsumerException("Exception while consuming JDBC value", e, fieldInfo, arrowType);
} else {
throw e;
}
}
}
}

Expand Down Expand Up @@ -74,3 +87,26 @@ public void resetVectorSchemaRoot(VectorSchemaRoot root) {
}
}
}

/**
* Exception while consuming JDBC data. This exception stores the JdbcFieldInfo for the column and the
* ArrowType for the corresponding vector for easier debugging.
*/
public class JdbcConsumerException extends RuntimeException {
final JdbcFieldInfo fieldInfo;
final ArrowType arrowType;

public JdbcConsumerException(String message, Throwable cause, JdbcFieldInfo fieldInfo, ArrowType arrowType) {
super(message, cause);
this.fieldInfo = fieldInfo;
this.arrowType = arrowType;
}

public ArrowType getArrowType() {
return this.arrowType;
}

public JdbcFieldInfo getFieldInfo() {
return this.fieldInfo;
}
}

0 comments on commit 2542d45

Please sign in to comment.