-
Notifications
You must be signed in to change notification settings - Fork 291
/
Copy pathTestAuditlogImpl.java
162 lines (139 loc) · 6.51 KB
/
TestAuditlogImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.security.auditlog.integration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.auditlog.sink.AuditLogSink;
/**
 * Audit log sink for tests that captures every stored {@link AuditMessage} in static storage
 * so that a test can run an action and then assert on the messages it produced.
 *
 * <p>All capture state is {@code static}, i.e. shared by every instance of this sink in the JVM.
 * For that reason every read or write of the capture state is guarded by one static lock rather
 * than by the monitor of an individual sink instance.
 *
 * <p>The preferred entry points are {@link #doThenWaitForMessages(Runnable, int)} and
 * {@link #doThenWaitForMessage(Runnable)}; the public static fields are kept only for older tests.
 */
public class TestAuditlogImpl extends AuditLogSink {

    /**
     * The list that currently receives stored messages.
     *
     * @deprecated Use the results of {@code doThenWaitForMessages(...)} instead
     */
    @Deprecated
    public static List<AuditMessage> messages = new ArrayList<AuditMessage>(100);

    /**
     * Pretty-printed form of every message stored in the current capture window.
     *
     * @deprecated Check messages individually instead of searching this string
     */
    @Deprecated
    public static StringBuffer sb = new StringBuffer();

    /** Guards the latch, the active message list and {@link #sb}, which are shared by all instances. */
    private static final Object STORAGE_LOCK = new Object();

    /** How long to wait for the expected messages once the action has run. */
    private static final int MAX_SECONDS_TO_WAIT_FOR_MESSAGES = 1;

    /** Grace period after the expected messages arrived during which extra messages count as "missed". */
    private static final long MISSED_MESSAGE_GRACE_MILLIS = 100;

    private static final AtomicReference<CountDownLatch> countDownRef = new AtomicReference<>();
    private static final AtomicReference<List<AuditMessage>> messagesRef = new AtomicReference<>();

    /**
     * Creates the sink.
     *
     * @param name           sink name, forwarded to the parent
     * @param settings       settings, forwarded to the parent
     * @param settingsPrefix not forwarded; {@code null} is passed to the parent instead
     * @param fallbackSink   fallback sink, forwarded to the parent
     */
    public TestAuditlogImpl(String name, Settings settings, String settingsPrefix, AuditLogSink fallbackSink) {
        super(name, settings, null, fallbackSink);
    }

    /**
     * Records a message in the active capture window and counts down the window's latch.
     * Messages that arrive before any capture window has been opened are dropped.
     *
     * @param msg the audit message to record
     * @return always {@code true}
     */
    public boolean doStore(AuditMessage msg) {
        // The capture state is static, so an instance-level monitor would not stop two sink
        // instances from mutating the same list; use the shared static lock instead.
        synchronized (STORAGE_LOCK) {
            // Read each reference once so the list and the latch belong to the same window.
            final List<AuditMessage> target = messagesRef.get();
            final CountDownLatch latch = countDownRef.get();
            if (target == null || latch == null) {
                // Ignore any messages that are sent before TestAuditlogImpl is waiting.
                return true;
            }
            sb.append(msg.toPrettyString() + System.lineSeparator());
            target.add(msg);
            latch.countDown();
        }
        return true;
    }

    /**
     * Discards anything captured so far by running an empty capture window.
     *
     * @deprecated Unneeded after switching to {@code doThenWaitForMessages(...)} as data is automatically flushed
     */
    @Deprecated
    public static synchronized void clear() {
        doThenWaitForMessages(() -> {}, 0);
    }

    /**
     * Perform an action and then wait until the expected number of messages have been found.
     *
     * <p>After the expected messages arrived (and unless {@code expectedCount} is 0) the method
     * waits a short grace period and fails if further messages show up in that time.
     *
     * @param action        the action expected to produce audit messages
     * @param expectedCount the exact number of messages the action must produce
     * @return a copy of the messages captured while the action ran
     * @throws MessagesNotFoundException if the expected number of messages did not arrive in time
     * @throws RuntimeException          if additional messages arrive during the grace period, or
     *                                   if the waiting thread is interrupted (the interrupt status
     *                                   is restored before throwing)
     */
    public static List<AuditMessage> doThenWaitForMessages(final Runnable action, final int expectedCount) {
        final List<AuditMessage> missedMessages = new ArrayList<>();
        final List<AuditMessage> captured = new ArrayList<>();
        final CountDownLatch latch = resetAuditStorage(expectedCount, captured);

        final boolean foundAll;
        try {
            action.run();
            foundAll = latch.await(MAX_SECONDS_TO_WAIT_FOR_MESSAGES, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unexpected exception", e);
        }

        // After the wait, prevent any new messages from being received into `captured`.
        // resetAuditStorage takes STORAGE_LOCK, so once it returns no writer can still be
        // touching `captured` and everything written to it is visible to this thread.
        resetAuditStorage(0, missedMessages);
        if (!foundAll || captured.size() != expectedCount) {
            throw new MessagesNotFoundException(expectedCount, captured);
        }

        // Do not check for missed messages if no messages were expected
        if (expectedCount != 0) {
            try {
                Thread.sleep(MISSED_MESSAGE_GRACE_MILLIS);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unexpected exception", e);
            }

            // Snapshot under the lock: `missedMessages` is still the active list, so another
            // thread may be appending to it right now.
            final List<AuditMessage> missedSnapshot;
            synchronized (STORAGE_LOCK) {
                missedSnapshot = new ArrayList<>(missedMessages);
            }
            if (!missedSnapshot.isEmpty()) {
                final String missedMessagesErrorMessage = new StringBuilder().append("Audit messages were missed! ")
                    .append("Found " + (missedSnapshot.size()) + " messages.")
                    .append("Messages found during this time: \n\n")
                    .append(missedSnapshot.stream().map(AuditMessage::toString).collect(Collectors.joining("\n")))
                    .toString();
                // Thrown outside any try/catch so the diagnostic text is the top-level message
                // instead of being wrapped in a generic "Unexpected exception".
                throw new RuntimeException(missedMessagesErrorMessage);
            }
        }

        // Next usage of this class might be using raw stringbuilder / list so reset before that test might run
        resetAuditStorage(0, new ArrayList<>());
        return new ArrayList<>(captured);
    }

    /**
     * Resets all of the mechanics for fresh messages to be captured
     *
     * @param expectedMessageCount The number of messages before the latch is signalled, indicating all messages have been received
     * @param messages Where messages will be stored after being received
     * @return the latch that reaches zero once {@code expectedMessageCount} messages were stored
     */
    private static CountDownLatch resetAuditStorage(int expectedMessageCount, List<AuditMessage> messages) {
        // Built before taking the lock so an invalid (negative) count fails without touching state.
        final CountDownLatch latch = new CountDownLatch(expectedMessageCount);
        // Swap everything atomically with respect to doStore so a message can never be added to
        // one window's list while counting down another window's latch.
        synchronized (STORAGE_LOCK) {
            countDownRef.set(latch);
            messagesRef.set(messages);
            TestAuditlogImpl.sb = new StringBuffer();
            TestAuditlogImpl.messages = messages;
        }
        return latch;
    }

    /**
     * Perform an action and then wait until a single message has been found.
     *
     * @param action the action expected to produce exactly one audit message
     * @return the single captured message
     */
    public static AuditMessage doThenWaitForMessage(final Runnable action) {
        return doThenWaitForMessages(action, 1).get(0);
    }

    /**
     * Always reports {@code true}.
     * NOTE(review): the effect of this flag is defined by {@code AuditLogSink}, which is not
     * visible here - confirm its meaning against the parent class.
     */
    @Override
    public boolean isHandlingBackpressure() {
        return true;
    }

    /**
     * Thrown when a capture window did not yield exactly the expected number of audit messages.
     */
    public static class MessagesNotFoundException extends RuntimeException {
        private final int expectedCount;
        private final int missingCount;
        private final List<AuditMessage> foundMessages;

        /**
         * @param expectedCount number of messages that were expected
         * @param foundMessages messages that were actually captured; copied so later changes to
         *                      the caller's list do not alter this exception
         */
        public MessagesNotFoundException(final int expectedCount, List<AuditMessage> foundMessages) {
            super(MessagesNotFoundException.createDetailMessage(expectedCount, foundMessages));
            this.expectedCount = expectedCount;
            this.missingCount = expectedCount - foundMessages.size();
            this.foundMessages = new ArrayList<>(foundMessages);
        }

        /** @return the number of messages that were expected */
        public int getExpectedCount() {
            return expectedCount;
        }

        /** @return expected count minus found count; negative when more messages than expected were found */
        public int getMissingCount() {
            return missingCount;
        }

        /** @return the messages that were captured before the failure was detected */
        public List<AuditMessage> getFoundMessages() {
            return foundMessages;
        }

        /** Builds the exception text: expected count, missing count, and one found message per line. */
        private static String createDetailMessage(final int expectedCount, final List<AuditMessage> foundMessages) {
            return new StringBuilder().append("Did not receive all " + expectedCount + " audit messages after a short wait. ")
                .append("Missing " + (expectedCount - foundMessages.size()) + " messages.")
                .append("Messages found during this time: \n\n")
                .append(foundMessages.stream().map(AuditMessage::toString).collect(Collectors.joining("\n")))
                .toString();
        }
    }
}