Skip to content

Commit

Permalink
For now, use local cluster
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Nov 21, 2022
1 parent 1e9d5e4 commit 0647d5b
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 29 deletions.
83 changes: 83 additions & 0 deletions .github/workflows/docker-security-test-workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
name: Docker Security Test Workflow
on:
pull_request:
branches:
- "*"
push:
branches:
- "*"

jobs:
test:
# This job runs on Linux
runs-on: ubuntu-latest
steps:
- name: Set Up JDK
uses: actions/setup-java@v1
with:
java-version: 17
- name: Checkout Branch
uses: actions/checkout@v2
- name: Build Index Management
run: ./gradlew assemble -Dbuild.snapshot=false
- name: Pull and Run Docker
run: |
plugin=`basename $(ls build/distributions/*.zip)`
list_of_files=`ls`
list_of_all_files=`ls build/distributions/`
version=`echo $plugin|awk -F- '{print $4}'| cut -d. -f 1-3`
plugin_version=`echo $plugin|awk -F- '{print $4}'| cut -d. -f 1-4`
qualifier=`echo $plugin|awk -F- '{print $4}'| cut -d. -f 1-1`
candidate_version=`echo $plugin|awk -F- '{print $5}'| cut -d. -f 1-1`
if qualifier
then
docker_version=$version-$qualifier
else
docker_version=$version
fi
[[ -z $candidate_version ]] && candidate_version=$qualifier && qualifier=""
echo plugin version plugin_version qualifier candidate_version docker_version
echo "($plugin) ($version) ($plugin_version) ($qualifier) ($candidate_version) ($docker_version)"
echo $ls $list_of_all_files
if docker pull opensearchstaging/opensearch:$docker_version
then
echo "FROM opensearchstaging/opensearch:$docker_version" >> Dockerfile
echo "RUN if [ -d /usr/share/opensearch/plugins/opensearch-index-management ]; then /usr/share/opensearch/bin/opensearch-plugin remove opensearch-index-management; fi" >> Dockerfile
echo "ADD build/distributions/$plugin /tmp/" >> Dockerfile
echo "RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/$plugin" >> Dockerfile
echo "RUN echo 'path.repo: ["/usr/share/opensearch/data/repo"]' >> /usr/share/opensearch/config/opensearch.yml" >> Dockerfile
docker build -t opensearch-index-management:test .
echo "imagePresent=true" >> $GITHUB_ENV
else
echo "imagePresent=false" >> $GITHUB_ENV
fi
- name: Run Docker Image
if: env.imagePresent == 'true'
run: |
cd ..
docker run -p 9200:9200 -d -p 9600:9600 -e "discovery.type=single-node" opensearch-index-management:test
sleep 120
- name: Run Index Management Test for security enabled test cases
if: env.imagePresent == 'true'
run: |
cluster_running=`curl -XGET https://localhost:9200/_cat/plugins -u admin:admin --insecure`
echo $cluster_running
security=`curl -XGET https://localhost:9200/_cat/plugins -u admin:admin --insecure |grep opensearch-security|wc -l`
echo $security
if [ $security -gt 0 ]
then
echo "Security plugin is available"
./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=docker-cluster -Dsecurity=true -Dhttps=true -Duser=admin -Dpassword=admin
else
echo "Security plugin is NOT available skipping this run as tests without security have already been run"
fi
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
with:
name: logs
path: build/testclusters/integTest-*/logs/*
9 changes: 7 additions & 2 deletions .github/workflows/security-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ jobs:
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
- name: Run integration tests with security plugin
run: ./gradlew integTest -Dsecurity=true -Dhttps=true
- name: Start cluster with security plugin
run: |
./gradlew run -Dsecurity=true &
sleep 120
- name: Run integration tests
run: |
./gradlew integTestRemote -Dsecurity=true -Dhttps=true -Dtests.rest.cluster="localhost:9200" -Dtests.cluster="localhost:9200" -Dtests.clustername="integTest" -Duser=admin -Dpassword=admin
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
Expand Down
127 changes: 119 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@


import org.opensearch.gradle.testclusters.OpenSearchCluster
import org.opensearch.gradle.testclusters.TestClusterConfiguration
import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask

import javax.net.ssl.HostnameVerifier
import javax.net.ssl.HttpsURLConnection
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLSession
import javax.net.ssl.TrustManager
import javax.net.ssl.X509TrustManager
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.GeneralSecurityException
import java.security.cert.X509Certificate
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit
import java.util.function.Predicate
import org.opensearch.gradle.http.WaitForHttpResource
import java.util.stream.Collectors
import java.util.concurrent.TimeUnit


buildscript {
Expand Down Expand Up @@ -301,7 +308,7 @@ afterEvaluate {
node.setting("plugins.security.ssl.http.pemtrustedcas_filepath", "root-ca.pem")
node.setting("plugins.security.allow_unsafe_democertificates", "true")
node.setting("plugins.security.allow_default_init_securityindex", "true")
node.setting("plugins.security.authcz.admin_dn", "\n - CN=kirk,OU=client,O=client,L=test, C=de")
node.setting("plugins.security.authcz.admin_dn", "\n - CN=kirk,OU=client,O=client,L=test,C=de")
node.setting("plugins.security.audit.type", "internal_elasticsearch")
node.setting("plugins.security.enable_snapshot_restore_privilege", "true")
node.setting("plugins.security.check_snapshot_restore_write_privileges", "true")
Expand Down Expand Up @@ -405,22 +412,125 @@ testClusters.integTest {
setting 'path.repo', repo.absolutePath
}

// Re-write WaitForHttpResource with updated code to support security plugin use case
class WaitForClusterYellow {

private URL url
private String username
private String password
Set<Integer> validResponseCodes = Collections.singleton(200)

WaitForClusterYellow(String protocol, String host, int numberOfNodes) throws MalformedURLException {
this(new URL(protocol + "://" + host + "/_cluster/health?wait_for_nodes=>=" + numberOfNodes + "&wait_for_status=yellow"))
}

WaitForClusterYellow(URL url) {
this.url = url
}

boolean wait(int durationInMs) throws GeneralSecurityException, InterruptedException, IOException {
final long waitUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationInMs)
final long sleep = 100

IOException failure = null
while (true) {
try {
checkResource()
return true
} catch (IOException e) {
failure = e
}
if (System.nanoTime() < waitUntil) {
Thread.sleep(sleep)
} else {
throw failure
}
}
}

void setUsername(String username) {
this.username = username
}

void setPassword(String password) {
this.password = password
}

void checkResource() throws IOException {
final HttpURLConnection connection = buildConnection()
connection.connect()
final Integer response = connection.getResponseCode()
if (validResponseCodes.contains(response)) {
return
} else {
throw new IOException(response + " " + connection.getResponseMessage())
}
}

HttpURLConnection buildConnection() throws IOException {
final HttpURLConnection connection = (HttpURLConnection) this.@url.openConnection()

if (connection instanceof HttpsURLConnection) {
TrustManager[] trustAllCerts = [new X509TrustManager() {
X509Certificate[] getAcceptedIssuers() {
return null
}

void checkClientTrusted(X509Certificate[] certs, String authType) {
}

void checkServerTrusted(X509Certificate[] certs, String authType) {
}
}
] as TrustManager[]
SSLContext sc = SSLContext.getInstance("SSL")
sc.init(null, trustAllCerts, new java.security.SecureRandom())
connection.setSSLSocketFactory(sc.getSocketFactory())
// Create all-trusting host name verifier
HostnameVerifier allHostsValid = new HostnameVerifier() {
boolean verify(String hostname, SSLSession session) {
return true
}
}
// Install the all-trusting host verifier
connection.setHostnameVerifier(allHostsValid)
}

configureBasicAuth(connection)
connection.setRequestMethod("GET")
return connection
}

void configureBasicAuth(HttpURLConnection connection) {
if (username != null) {
if (password == null) {
throw new IllegalStateException("Basic Auth user [" + username + "] has been set, but no password has been configured")
}
connection.setRequestProperty(
"Authorization",
"Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8))
)
}
}

}

def waitForClusterSetup(OpenSearchCluster cluster, Boolean securityEnabled) {
cluster.@waitConditions.clear()
String unicastUris = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining("\n"))
cluster.nodes.forEach {node ->
try {
Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8));
Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8))
} catch (IOException e) {
throw new java.io.UncheckedIOException("Failed to write configuation files for " + this, e);
throw new java.io.UncheckedIOException("Failed to write configuation files for " + this, e)
}
}

Predicate pred = {
String protocol = securityEnabled ? "https" : "http"
WaitForHttpResource wait = new WaitForHttpResource(protocol, cluster.getFirstNode().getHttpSocketURI(), cluster.nodes.size())
WaitForClusterYellow wait = new WaitForClusterYellow(protocol, cluster.getFirstNode().getHttpSocketURI(), cluster.nodes.size())
wait.setUsername(System.getProperty("user", "admin"))
wait.setPassword(System.getProperty("password", "admin"))
return wait.wait(500)
Expand Down Expand Up @@ -451,7 +561,7 @@ integTest {
}
}

// The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable
// The -Dcluster.debug option makes the cluster debuggable, this makes the tests debuggable
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000'
}
Expand Down Expand Up @@ -490,6 +600,7 @@ task integTestRemote(type: RestIntegTestTask) {
systemProperty "https", System.getProperty("https")
systemProperty "user", System.getProperty("user")
systemProperty "password", System.getProperty("password")
systemProperty 'buildDir', buildDir.path

if (System.getProperty("tests.rest.bwcsuite") == null) {
filter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class TransportIndexRollupAction @Inject constructor(

private fun validateTargetIndexName(): Boolean {
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
return targetIndexResolvedName.contains("*") == false && targetIndexResolvedName.contains("?") == false
return !targetIndexResolvedName.contains("*") && !targetIndexResolvedName.contains("?")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ abstract class ODFERestTestCase : OpenSearchRestTestCase() {
// create adminDN (super-admin) client
val uri = javaClass.classLoader.getResource("security/sample.pem")?.toURI()
val configPath = PathUtils.get(uri).parent.toAbsolutePath()
SecureRestClientBuilder(settings, configPath, hosts).setSocketTimeout(5000).build()
// TODO once common utils is updated in maven, we can use this method to define hosts
// SecureRestClientBuilder(settings, configPath, hosts).setSocketTimeout(5000).build()
SecureRestClientBuilder(settings, configPath).setSocketTimeout(5000).build()
}
false -> {
// create client with passed user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,14 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
insertSampleData(index = firstIndex, docCount = 20, delay = 0, jsonString = "{ \"test_field\": \"${OpenSearchTestCase.randomAlphaOfLength(7000)}\" }", routing = "custom_routing")
flush(firstIndex, true)
forceMerge(firstIndex, "1")
val primaryShards = (cat("shards/$firstIndex?format=json&bytes=b") as List<Map<String, Any>>).filter { it["prirep"] == "p" }
// TODO seeing flakyness of multiple shards over 100kb, log out shards to further debug
logger.info("cat shards result: $primaryShards")
val primaryShardsOver100KB = primaryShards.filter { (it["store"] as String).toInt() > 100000 }
assertTrue("Found multiple shards over 100kb", primaryShardsOver100KB.size == 1)
val primaryShards = waitFor {
val primaryShards = (cat("shards/$firstIndex?format=json&bytes=b") as List<Map<String, Any>>).filter { it["prirep"] == "p" }
// TODO seeing flakyness of multiple shards over 100kb, log out shards to further debug
logger.info("cat shards result: $primaryShards")
val primaryShardsOver100KB = primaryShards.filter { (it["store"] as String).toInt() > 100000 }
assertTrue("Shard over 100kb is not exactly 1", primaryShardsOver100KB.size == 1)
primaryShards
}
primaryShardSizeBytes = primaryShards.maxOf { (it["store"] as String).toInt() }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import java.io.InputStreamReader
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.Locale

class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT)

@After
fun clearIndicesAfterEachTest() {
Expand All @@ -34,7 +36,7 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
fun `test index time analyzer`() {
val buildDir = System.getProperty("buildDir")
val numNodes = System.getProperty("cluster.number_of_nodes", "1").toInt()
val indexName = "testindex"
val indexName = "${testIndexName}_index_1"

for (i in 0 until numNodes) {
writeToFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt", "hello, hola")
Expand Down Expand Up @@ -80,7 +82,7 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
fun `test search time analyzer`() {
val buildDir = System.getProperty("buildDir")
val numNodes = System.getProperty("cluster.number_of_nodes", "1").toInt()
val indexName = "testindex"
val indexName = "${testIndexName}_index_2"

for (i in 0 until numNodes) {
writeToFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt", "hello, hola")
Expand Down Expand Up @@ -124,7 +126,7 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
}

fun `test alias`() {
val indexName = "testindex"
val indexName = "${testIndexName}_index_3"
val numNodes = System.getProperty("cluster.number_of_nodes", "1").toInt()
val buildDir = System.getProperty("buildDir")
val aliasName = "test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
fun setDebugLogLevel() {
client().makeRequest(
"PUT", "_cluster/settings",
StringEntity("""{"transient":{"logger.org.opensearch.indexmanagement.rollup":"DEBUG"}}""", APPLICATION_JSON)
StringEntity(
"""
{
"transient": {
"logger.org.opensearch.indexmanagement.rollup":"DEBUG",
"logger.org.opensearch.jobscheduler":"DEBUG"
}
}
""".trimIndent(),
APPLICATION_JSON
)
)
}

Expand Down Expand Up @@ -233,10 +243,16 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
}
val intervalSchedule = (update.jobSchedule as IntervalSchedule)
val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis()
val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis
val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis)
val waitForActiveShards = if (isMultiNode) "all" else "1"
// TODO flaky: Add this log to confirm this update is missed by job scheduler
// This miss is because shard remove, job scheduler deschedule on the original node and reschedule on another node
// However the shard comes back, and job scheduler deschedule on the another node and reschedule on the original node
// During this period, this update got missed
// Since from the log, this happens very fast (within 0.1~0.2s), the above cluster explain may not have the granularity to catch this.
logger.info("Update rollup start time to $startTimeMillis")
val response = client().makeRequest(
"POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards",
"POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards&refresh=true",
StringEntity(
"{\"doc\":{\"rollup\":{\"schedule\":{\"interval\":{\"start_time\":" +
"\"$startTimeMillis\"}}}}}",
Expand Down
Loading

0 comments on commit 0647d5b

Please sign in to comment.