Skip to content

Commit

Permalink
fix googleapis#3880 googleapis#2796 disable automatic flush on write …
Browse files Browse the repository at this point in the history
…to avoid hanging and leaking threads as grpc also uses this appender which goes in to deadlock over time.
  • Loading branch information
ajaaym committed Dec 26, 2018
1 parent d18c996 commit 2fc12c3
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,14 @@ public static EntryListOption filter(String filter) {
Synchronicity getWriteSynchronicity();

/**
* Sets flush severity for asynchronous logging writes. Default is ERROR. Logs will be immediately
* written out for entries at or higher than flush severity.
* Sets flush severity for asynchronous logging writes. It is disabled by default, enabled when
* this method is called with not null value. Logs will be immediately written out for entries at
* or higher than flush severity.
*
* Enabling this can cause the leaking and hanging threads, see BUG(2796) BUG(3880). However you
* can explicitly call {@link #flush}.
*
* TODO: Enable this by default once functionality to trigger rpc is available in generated code.
*/
void setFlushSeverity(Severity flushSeverity);

Expand Down Expand Up @@ -664,7 +670,7 @@ ApiFuture<AsyncPage<MonitoredResourceDescriptor>> listMonitoredResourceDescripto
/**
* Flushes any pending asynchronous logging writes. Logs are automatically flushed based on time
* and message count that be configured via {@link com.google.api.gax.batching.BatchingSettings},
* Logs are also flushed if at or above flush severity, see {@link #setFlushSeverity}. Logging
* Logs are also flushed if enabled, at or above flush severity, see {@link #setFlushSeverity}. Logging
* frameworks require support for an explicit flush. See usage in the java.util.logging
* handler{@link LoggingHandler}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,19 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LoggingImpl extends BaseService<LoggingOptions> implements Logging {

private static final int FLUSH_WAIT_TIMEOUT_SECONDS = 6;
private final LoggingRpc rpc;
private final Object writeLock = new Object();
private final Set<ApiFuture<Void>> pendingWrites =
Collections.newSetFromMap(new IdentityHashMap<ApiFuture<Void>, Boolean>());

private volatile Synchronicity writeSynchronicity = Synchronicity.ASYNC;
private volatile Severity flushSeverity = Severity.ERROR;
private volatile Severity flushSeverity = null;
private boolean closed;

private static final Function<Empty, Boolean> EMPTY_TO_BOOLEAN_FUNCTION =
Expand Down Expand Up @@ -553,11 +556,13 @@ public void write(Iterable<LogEntry> logEntries, WriteOption... options) {

try {
writeLogEntries(logEntries, options);
for (LogEntry logEntry : logEntries) {
// flush pending writes if log severity at or above flush severity
if (logEntry.getSeverity().compareTo(flushSeverity) >= 0) {
flush();
break;
if (flushSeverity != null) {
for (LogEntry logEntry : logEntries) {
// flush pending writes if log severity at or above flush severity
if (logEntry.getSeverity().compareTo(flushSeverity) >= 0) {
flush();
break;
}
}
}
} finally {
Expand All @@ -574,8 +579,8 @@ public void flush() {
}

try {
ApiFutures.allAsList(writesToFlush).get();
} catch (InterruptedException | ExecutionException e) {
ApiFutures.allAsList(writesToFlush).get(FLUSH_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,41 @@ public void testWriteLogEntries() {
logging.write(ImmutableList.of(LOG_ENTRY1, LOG_ENTRY2));
}

@Test
public void testWriteLogEntriesDoesnotEnableFlushByDefault() {
WriteLogEntriesRequest request =
WriteLogEntriesRequest.newBuilder()
.addAllEntries(
Iterables.transform(
ImmutableList.of(
LOG_ENTRY1, LOG_ENTRY2.toBuilder().setSeverity(Severity.EMERGENCY).build()),
LogEntry.toPbFunction(PROJECT)))
.build();
ApiFuture<WriteLogEntriesResponse> apiFuture = SettableApiFuture.create();
EasyMock.expect(loggingRpcMock.write(request)).andReturn(apiFuture);
EasyMock.replay(rpcFactoryMock, loggingRpcMock);
logging = options.getService();
logging.write(
ImmutableList.of(
LOG_ENTRY1, LOG_ENTRY2.toBuilder().setSeverity(Severity.EMERGENCY).build()));
}

@Test
public void testWriteLogEntriesWithSeverityFlushEnabled() {
WriteLogEntriesRequest request =
WriteLogEntriesRequest.newBuilder()
.addAllEntries(
Iterables.transform(
ImmutableList.of(LOG_ENTRY1, LOG_ENTRY2), LogEntry.toPbFunction(PROJECT)))
.build();
WriteLogEntriesResponse response = WriteLogEntriesResponse.newBuilder().build();
EasyMock.expect(loggingRpcMock.write(request)).andReturn(ApiFutures.immediateFuture(response));
EasyMock.replay(rpcFactoryMock, loggingRpcMock);
logging = options.getService();
logging.setFlushSeverity(Severity.DEFAULT);
logging.write(ImmutableList.of(LOG_ENTRY1, LOG_ENTRY2));
}

@Test
public void testWriteLogEntriesWithOptions() {
Map<String, String> labels = ImmutableMap.of("key", "value");
Expand Down

0 comments on commit 2fc12c3

Please sign in to comment.