Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaner version of #684 #702

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 43 additions & 61 deletions src/main/java/org/influxdb/BatchOptions.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package org.influxdb;

import org.influxdb.dto.Point;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.influxdb.dto.Point;

/**
* BatchOptions are used to configure batching of individual data point writes
* into InfluxDB. See {@link InfluxDB#enableBatch(BatchOptions)}
* BatchOptions are used to configure batching of individual data point writes into InfluxDB. See
* {@link InfluxDB#enableBatch(BatchOptions)}
*/
public final class BatchOptions implements Cloneable {

Expand All @@ -22,11 +21,9 @@ public final class BatchOptions implements Cloneable {
public static final TimeUnit DEFAULT_PRECISION = TimeUnit.NANOSECONDS;
public static final boolean DEFAULT_DROP_ACTIONS_ON_QUEUE_EXHAUSTION = false;


/**
* Default batch options. This class is immutable, each configuration
* is built by taking the DEFAULTS and setting specific configuration
* properties.
* Default batch options. This class is immutable, each configuration is built by taking the
* DEFAULTS and setting specific configuration properties.
*/
public static final BatchOptions DEFAULTS = new BatchOptions();

Expand All @@ -36,17 +33,14 @@ public final class BatchOptions implements Cloneable {
private int bufferLimit = DEFAULT_BUFFER_LIMIT;
private TimeUnit precision = DEFAULT_PRECISION;
private boolean dropActionsOnQueueExhaustion = DEFAULT_DROP_ACTIONS_ON_QUEUE_EXHAUSTION;
private Consumer<Point> droppedActionHandler = (point) -> {
};
private Consumer<Point> droppedActionHandler = (point) -> { };

private ThreadFactory threadFactory = Executors.defaultThreadFactory();
BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (points, throwable) -> {
};
BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (points, throwable) -> { };

private InfluxDB.ConsistencyLevel consistency = InfluxDB.ConsistencyLevel.ONE;

private BatchOptions() {
}
private BatchOptions() { }

/**
* @param actions the number of actions to collect
Expand All @@ -69,9 +63,9 @@ public BatchOptions flushDuration(final int flushDuration) {
}

/**
* Jitters the batch flush interval by a random amount. This is primarily to avoid
* large write spikes for users running a large number of client instances.
* ie, a jitter of 5s and flush duration 10s means flushes will happen every 10-15s.
* Jitters the batch flush interval by a random amount. This is primarily to avoid large write
* spikes for users running a large number of client instances. ie, a jitter of 5s and flush
* duration 10s means flushes will happen every 10-15s.
*
* @param jitterDuration (milliseconds)
* @return the BatchOptions instance to be able to use it in a fluent manner.
Expand All @@ -83,11 +77,12 @@ public BatchOptions jitterDuration(final int jitterDuration) {
}

/**
* The client maintains a buffer for failed writes so that the writes will be retried later on. This may
* help to overcome temporary network problems or InfluxDB load spikes.
* When the buffer is full and new points are written, oldest entries in the buffer are lost.
* The client maintains a buffer for failed writes so that the writes will be retried later on.
* This may help to overcome temporary network problems or InfluxDB load spikes. When the buffer
* is full and new points are written, oldest entries in the buffer are lost.
*
* To disable this feature set buffer limit to a value smaller than {@link BatchOptions#getActions}
* <p>To disable this feature set buffer limit to a value smaller than {@link
* BatchOptions#getActions}
*
* @param bufferLimit maximum number of points stored in the retry buffer
* @return the BatchOptions instance to be able to use it in a fluent manner.
Expand All @@ -112,15 +107,16 @@ public BatchOptions threadFactory(final ThreadFactory threadFactory) {
* @param exceptionHandler a consumer function to handle asynchronous errors
* @return the BatchOptions instance to be able to use it in a fluent manner.
*/
public BatchOptions exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
public BatchOptions exceptionHandler(
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
BatchOptions clone = getClone();
clone.exceptionHandler = exceptionHandler;
return clone;
}

/**
* @param consistency cluster consistency setting (how many nodes have to store data points
* to treat a write as a success)
* @param consistency cluster consistency setting (how many nodes have to store data points to
* treat a write as a success)
* @return the BatchOptions instance to be able to use it in a fluent manner.
*/
public BatchOptions consistency(final InfluxDB.ConsistencyLevel consistency) {
Expand All @@ -130,7 +126,9 @@ public BatchOptions consistency(final InfluxDB.ConsistencyLevel consistency) {
}

/**
* Set the time precision to use for the whole batch. If unspecified, will default to {@link TimeUnit#NANOSECONDS}.
* Set the time precision to use for the whole batch. If unspecified, will default to {@link
* TimeUnit#NANOSECONDS}.
*
* @param precision sets the precision to use
* @return the BatchOptions instance to be able to use it in a fluent manner.
*/
Expand All @@ -141,10 +139,11 @@ public BatchOptions precision(final TimeUnit precision) {
}

/**
* Set to define the behaviour when the action queue exhausts. If unspecified, will default to false which means
* that the {@link InfluxDB#write(Point)} will be blocked till the space in the queue is created.
* true means that the newer actions being written to the queue will be dropped and
* {@link BatchOptions#droppedActionHandler} will be called.
* Set to define the behaviour when the action queue exhausts. If unspecified, will default to
* false which means that the {@link InfluxDB#write(Point)} will be blocked till the space in the
* queue is created. true means that the newer actions being written to the queue will be dropped
* and {@link BatchOptions#droppedActionHandler} will be called.
*
* @param dropActionsOnQueueExhaustion sets the behavior
* @return the BatchOptions instance to be able to use it in a fluent manner.
*/
Expand All @@ -155,8 +154,9 @@ public BatchOptions dropActionsOnQueueExhaustion(final boolean dropActionsOnQueu
}

/**
* Handler to handle dropped actions due to queue actions. This is only valid when
* {@link BatchOptions#dropActionsOnQueueExhaustion} is set to true.
* Handler to handle dropped actions due to queue actions. This is only valid when {@link
* BatchOptions#dropActionsOnQueueExhaustion} is set to true.
*
* @param droppedActionHandler to handle action drops on action queue exhaustion.
* @return the BatchOptions instance to be able to use it in a fluent manner.
*/
Expand All @@ -166,75 +166,58 @@ public BatchOptions droppedActionHandler(final Consumer<Point> droppedActionHand
return clone;
}


/**
* @return actions the number of actions to collect
*/
/** @return actions the number of actions to collect */
public int getActions() {
return actions;
}

/**
* @return flushDuration the time to wait at most (milliseconds).
*/
/** @return flushDuration the time to wait at most (milliseconds). */
public int getFlushDuration() {
return flushDuration;
}

/**
* @return batch flush interval jitter value (milliseconds)
*/
/** @return batch flush interval jitter value (milliseconds) */
public int getJitterDuration() {
return jitterDuration;
}

/**
* @return Maximum number of points stored in the retry buffer, see {@link BatchOptions#bufferLimit(int)}
* @return Maximum number of points stored in the retry buffer, see {@link
* BatchOptions#bufferLimit(int)}
*/
public int getBufferLimit() {
return bufferLimit;
}

/**
* @return a ThreadFactory instance to be used
*/
/** @return a ThreadFactory instance to be used */
public ThreadFactory getThreadFactory() {
return threadFactory;
}

/**
* @return a consumer function to handle asynchronous errors
*/
/** @return a consumer function to handle asynchronous errors */
public BiConsumer<Iterable<Point>, Throwable> getExceptionHandler() {
return exceptionHandler;
}

/**
* @return cluster consistency setting (how many nodes have to store data points
* to treat a write as a success)
* @return cluster consistency setting (how many nodes have to store data points to treat a write
* as a success)
*/
public InfluxDB.ConsistencyLevel getConsistency() {
return consistency;
}

/**
* @return the time precision
*/
/** @return the time precision */
public TimeUnit getPrecision() {
return precision;
}


/**
* @return a boolean determining whether to drop actions on action queue exhaustion.
*/
/** @return a boolean determining whether to drop actions on action queue exhaustion. */
public boolean isDropActionsOnQueueExhaustion() {
return dropActionsOnQueueExhaustion;
}

/**
* @return a consumer function to handle actions drops on action queue exhaustion.
*/
/** @return a consumer function to handle actions drops on action queue exhaustion. */
public Consumer<Point> getDroppedActionHandler() {
return droppedActionHandler;
}
Expand All @@ -246,5 +229,4 @@ private BatchOptions getClone() {
throw new RuntimeException(e);
}
}

}
4 changes: 1 addition & 3 deletions src/main/java/org/influxdb/BuilderException.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
*/
public class BuilderException extends RuntimeException {

/**
* Generated serial version UID.
*/
/** Generated serial version UID. */
private static final long serialVersionUID = 4178882805281378918L;

public BuilderException(final String message) {
Expand Down
Loading