Skip to content

Commit

Permalink
Add workaround for input event problems in MongoInputStatusService (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bernd authored Apr 29, 2020
1 parent 7489ce0 commit 993554b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.bson.types.ObjectId;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.graylog2.database.NotFoundException;
import org.graylog2.inputs.InputService;
import org.graylog2.rest.models.system.inputs.responses.InputDeleted;
import org.mongojack.JacksonDBCollection;
import org.mongojack.WriteResult;
Expand All @@ -45,11 +47,14 @@ public class MongoInputStatusService implements InputStatusService {
public static final String COLLECTION_NAME = "input_status";

private final JacksonDBCollection<InputStatusRecord, ObjectId> statusCollection;
private final InputService inputService;

@Inject
public MongoInputStatusService(MongoConnection mongoConnection,
MongoJackObjectMapperProvider objectMapperProvider,
EventBus eventBus) {
MongoJackObjectMapperProvider objectMapperProvider,
InputService inputService,
EventBus eventBus) {
this.inputService = inputService;
DB mongoDatabase = mongoConnection.getDatabase();
DBCollection collection = mongoDatabase.getCollection(COLLECTION_NAME);

Expand All @@ -69,13 +74,13 @@ public Optional<InputStatusRecord> get(final String inputId) {

@Override
public InputStatusRecord save(InputStatusRecord statusRecord) {
WriteResult save = statusCollection.save(statusRecord);
return (InputStatusRecord) save.getSavedObject();
final WriteResult<InputStatusRecord, ObjectId> save = statusCollection.save(statusRecord);
return save.getSavedObject();
}

@Override
public int delete(String inputId) {
WriteResult delete = statusCollection.removeById(new ObjectId(inputId));
final WriteResult<InputStatusRecord, ObjectId> delete = statusCollection.removeById(new ObjectId(inputId));
return delete.getN();
}

Expand All @@ -90,7 +95,17 @@ public int delete(String inputId) {
@Subscribe
public void handleInputDeleted(InputDeleted event) {
LOG.debug("Input Deleted event received for Input [{}]", event.id());
// TODO: Pending issue #7812
// delete(event.id());

// The input system is currently sending an "InputDeleted" event when an input gets deleted AND when an
// input gets stopped. Check the database if the input was only stopped or actually deleted.
// TODO: Remove this workaround once https://github.com/Graylog2/graylog2-server/issues/7812 is fixed
try {
inputService.find(event.id());
// The input still exists so it only has been stopped. Don't do anything.
} catch (NotFoundException e) {
// The input is actually gone (deleted) so we can remove the state.
LOG.debug("Deleting state for input <{}> from database", event.id());
delete(event.id());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.graylog.testing.mongodb.MongoDBFixtures;
import org.graylog.testing.mongodb.MongoDBInstance;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.NotFoundException;
import org.graylog2.inputs.InputService;
import org.graylog2.rest.models.system.inputs.responses.InputDeleted;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.junit.Before;
Expand All @@ -38,6 +40,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;


public class MongoInputStatusServiceTest {
Expand All @@ -52,6 +55,8 @@ public class MongoInputStatusServiceTest {

@Mock
EventBus mockEventBus;
@Mock
private InputService inputService;

private JacksonDBCollection<InputStatusRecord, ObjectId> db;

Expand All @@ -60,7 +65,7 @@ public void setUp() {
final ObjectMapper objectMapper = new ObjectMapperProvider().get();
final MongoJackObjectMapperProvider mapperProvider = new MongoJackObjectMapperProvider(objectMapper);

cut = new MongoInputStatusService(mongodb.mongoConnection(), mapperProvider, mockEventBus);
cut = new MongoInputStatusService(mongodb.mongoConnection(), mapperProvider, inputService, mockEventBus);

db = JacksonDBCollection.wrap(mongodb.mongoConnection().getDatabase().getCollection(MongoInputStatusService.COLLECTION_NAME),
InputStatusRecord.class,
Expand Down Expand Up @@ -128,6 +133,7 @@ public void get_ReturnsRecord_OnlyAfterRecordSaved() {
InputStatusRecord savedRecord = cut.save(InputStatusRecord.builder()
.inputId("54e3deadbeefdeadbeef8888")
.inputStateData(new InputStateData() {
@Override
public String type() {
return "test_type_8888";
}
Expand Down Expand Up @@ -182,24 +188,37 @@ public void delete_ReturnsZero_WhenDeletingNonExistantRecord() {

@Test
@MongoDBFixtures("input-status.json")
public void handleDeleteEvent_DoesNothing() {
/*
Currently, the InputDeleted event is propagated both when an input is stopped and when an input is deleted. We
would like to clean up the DB when an input is deleted, but not when it is stopped. This method is in place to
be used once there are separate events for input deleted and input stopped. For now, it should do nothing.
*/

cut.handleInputDeleted(new InputDeleted(){
public void handleDeleteEvent_WhenStoppingInputDoesNothing() throws Exception {
final String deletedInput = "54e3deadbeefdeadbeef0001";
final InputDeleted inputDeletedEvent = new InputDeleted() {
@Override
public String id() {
return "54e3deadbeefdeadbeef0001";
return deletedInput;
}
});
};

cut.handleInputDeleted(inputDeletedEvent);
// The record should not be removed from the DB
Optional<InputStatusRecord> optDbRecord = cut.get("54e3deadbeefdeadbeef0001");
assertThat(cut.get(deletedInput).isPresent(), is(true));
}

assertThat(optDbRecord, notNullValue());
assertThat(optDbRecord.isPresent(), is(true));
@Test
@MongoDBFixtures("input-status.json")
public void handleDeleteEvent_WhenDeletingInputRemovesState() throws Exception {
final String deletedInput = "54e3deadbeefdeadbeef0001";
final InputDeleted inputDeletedEvent = new InputDeleted() {
@Override
public String id() {
return deletedInput;
}
};

// Simulate that the input has actually been deleted
// TODO: This will change once we fix https://github.com/Graylog2/graylog2-server/issues/7812
when(inputService.find(deletedInput)).thenThrow(new NotFoundException());

cut.handleInputDeleted(inputDeletedEvent);
// The record should be removed from the DB
assertThat(cut.get(deletedInput).isPresent(), is(false));
}
}

0 comments on commit 993554b

Please sign in to comment.