Skip to content

Commit

Permalink
[rrd4j] Write asynchronously to database (openhab#14913)
Browse files Browse the repository at this point in the history
* [rrd4j] Write asynchronously to database
* add timestamps to log message

---------

Signed-off-by: Jan N. Klug <[email protected]>
Signed-off-by: Matt Myers <[email protected]>
  • Loading branch information
J-N-K authored and matchews committed Aug 9, 2023
1 parent ea3aa0f commit 3b5e154
Showing 1 changed file with 51 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,52 @@ public String getLabel(@Nullable Locale locale) {
}

@Override
public synchronized void store(final Item item, @Nullable final String alias) {
public void store(final Item item, @Nullable final String alias) {
if (!isSupportedItemType(item)) {
logger.trace("Ignoring item '{}' since its type {} is not supported", item.getName(), item.getType());
return;
}
final String name = alias == null ? item.getName() : alias;

Double value;

if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
NumberItem nItem = (NumberItem) item;
QuantityType<?> qState = (QuantityType<?>) item.getState();
Unit<? extends Quantity<?>> unit = nItem.getUnit();
if (unit != null) {
QuantityType<?> convertedState = qState.toUnit(unit);
if (convertedState != null) {
value = convertedState.doubleValue();
} else {
value = null;
logger.warn(
"Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
qState, unit);
}
} else {
value = qState.doubleValue();
}
} else {
DecimalType state = item.getStateAs(DecimalType.class);
if (state != null) {
value = state.toBigDecimal().doubleValue();
} else {
value = null;
}
}

if (value == null) {
// we could not convert the value
return;
}

long now = System.currentTimeMillis() / 1000;

scheduler.schedule(() -> internalStore(name, value, now, true), 0, TimeUnit.SECONDS);
}

private synchronized void internalStore(String name, double value, long now, boolean retry) {
RrdDb db = null;
try {
db = getDB(name);
Expand All @@ -163,7 +202,6 @@ public synchronized void store(final Item item, @Nullable final String alias) {
}

ConsolFun function = getConsolidationFunction(db);
long now = System.currentTimeMillis() / 1000;
if (function != ConsolFun.AVERAGE) {
try {
// we store the last value again, so that the value change
Expand All @@ -177,7 +215,8 @@ public synchronized void store(final Item item, @Nullable final String alias) {
sample.setTime(now - 1);
sample.setValue(DATASOURCE_STATE, lastValue);
sample.update();
logger.debug("Stored '{}' as value '{}' in rrd4j database (again)", name, lastValue);
logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database (again)", name,
lastValue, now - 1);
}
}
} catch (IOException e) {
Expand All @@ -187,50 +226,24 @@ public synchronized void store(final Item item, @Nullable final String alias) {
try {
Sample sample = db.createSample();
sample.setTime(now);

Double value = null;

if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
NumberItem nItem = (NumberItem) item;
QuantityType<?> qState = (QuantityType<?>) item.getState();
Unit<? extends Quantity<?>> unit = nItem.getUnit();
if (unit != null) {
QuantityType<?> convertedState = qState.toUnit(unit);
if (convertedState != null) {
value = convertedState.doubleValue();
} else {
logger.warn(
"Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
qState, unit);
}
} else {
value = qState.doubleValue();
}
} else {
DecimalType state = item.getStateAs(DecimalType.class);
if (state != null) {
value = state.toBigDecimal().doubleValue();
}
}
if (value != null) {
if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
// adjusted by stepsize
value = value * db.getRrdDef().getStep();
}
sample.setValue(DATASOURCE_STATE, value);
sample.update();
logger.debug("Stored '{}' as value '{}' in rrd4j database", name, value);
double storeValue = value;
if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
// adjusted by stepsize
storeValue = value * db.getRrdDef().getStep();
}
sample.setValue(DATASOURCE_STATE, storeValue);
sample.update();
logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, now);
} catch (IllegalArgumentException e) {
String message = e.getMessage();
if (message != null && message.contains("at least one second step is required")) {
if (message != null && message.contains("at least one second step is required") && retry) {
// we try to store the value one second later
ScheduledFuture<?> job = scheduledJobs.get(name);
if (job != null) {
job.cancel(true);
scheduledJobs.remove(name);
}
job = scheduler.schedule(() -> store(item, name), 1, TimeUnit.SECONDS);
job = scheduler.schedule(() -> internalStore(name, value, now + 1, false), 1, TimeUnit.SECONDS);
scheduledJobs.put(name, job);
} else {
logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
Expand Down

0 comments on commit 3b5e154

Please sign in to comment.