Skip to content

Commit

Permalink
#385: Add teragrep set|get config (#394)
Browse files Browse the repository at this point in the history
* pth_03 update to 9.2.0, initial tg set config version.

* tg set config mismatched types test

* apply spotless

* initial TeragrepGetConfigStep

* add tg get config with test

* applied spotless

* set TeragrepGet/SetConfigStep as final class, improved GeneratedDatasource exception message, cleaned up debug ds.show(), removed unnecessary visitor function code from visitT_setParameter()
  • Loading branch information
eemhu authored Nov 7, 2024
1 parent 17f2ce6 commit 7929a08
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<teragrep.dpf_03.version>11.0.1</teragrep.dpf_03.version>
<teragrep.jpr_01.version>3.1.1</teragrep.jpr_01.version>
<teragrep.jue_01.version>0.4.3</teragrep.jue_01.version>
<teragrep.pth_03.version>9.1.0</teragrep.pth_03.version>
<teragrep.pth_03.version>9.2.0</teragrep.pth_03.version>
<teragrep.pth_06.version>3.2.2</teragrep.pth_06.version>
<teragrep.rlp_01.version>4.0.1</teragrep.rlp_01.version>
<teragrep.rlp_03.version>1.7.6</teragrep.rlp_03.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ public Node visitTeragrepTransformation(DPLParser.TeragrepTransformationContext
* @return CatalystNode
*/
private Node teragrepTransformationEmitCatalyst(DPLParser.TeragrepTransformationContext ctx) {

LOGGER.info("TeragrepTransformation Emit Catalyst");

// get zeppelin config
zplnConfig = catCtx.getConfig();
if (zplnConfig != null) {
Expand Down Expand Up @@ -164,11 +161,27 @@ public Node visitT_getParameter(DPLParser.T_getParameterContext ctx) {
else if (ctx.t_getArchiveSummaryParameter() != null) {
return visit(ctx.t_getArchiveSummaryParameter());
}
else if (ctx.COMMAND_TERAGREP_MODE_CONFIG() != null) {
return new StepNode(new TeragrepGetConfigStep(catCtx));
}
else {
throw new IllegalArgumentException("Unsupported teragrep command: " + ctx.getText());
}
}

@Override
public Node visitT_setParameter(DPLParser.T_setParameterContext ctx) {
return visitChildren(ctx);
}

@Override
public Node visitT_setConfigParameter(DPLParser.T_setConfigParameterContext ctx) {
final String key = new UnquotedText(new TextString(ctx.t_configKeyParameter().stringType().getText())).read();
final String value = new UnquotedText(new TextString(ctx.t_configValueParameter().stringType().getText()))
.read();
return new StepNode(new TeragrepSetConfigStep(catCtx, key, value));
}

@Override
public Node visitT_getArchiveSummaryParameter(DPLParser.T_getArchiveSummaryParameterContext ctx) {
// archive summary
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.steps.teragrep;

import com.teragrep.pth10.ast.DPLParserCatalystContext;
import com.teragrep.pth10.datasources.GeneratedDatasource;
import com.teragrep.pth10.steps.AbstractStep;
import com.typesafe.config.ConfigValue;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public final class TeragrepGetConfigStep extends AbstractStep {

private final DPLParserCatalystContext catCtx;

public TeragrepGetConfigStep(DPLParserCatalystContext catCtx) {
this.catCtx = catCtx;
}

@Override
public Dataset<Row> get(Dataset<Row> dataset) throws StreamingQueryException {
List<String> configs = new ArrayList<>();

for (Map.Entry<String, ConfigValue> entry : catCtx.getConfig().entrySet()) {
configs.add(entry.getKey().concat(" = ").concat(entry.getValue().unwrapped().toString()));
}

GeneratedDatasource datasource = new GeneratedDatasource(catCtx);
try {
dataset = datasource.constructStream(configs, "teragrep get config");
}
catch (InterruptedException | UnknownHostException e) {
throw new RuntimeException("Unable to construct 'teragrep get config' dataset", e);
}

return dataset;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.steps.teragrep;

import com.teragrep.pth10.ast.DPLParserCatalystContext;
import com.teragrep.pth10.steps.AbstractStep;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import com.typesafe.config.ConfigValueFactory;
import com.typesafe.config.ConfigValueType;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TeragrepSetConfigStep extends AbstractStep {

private final Logger LOGGER = LoggerFactory.getLogger(TeragrepSetConfigStep.class);
private final DPLParserCatalystContext catCtx;
private final String key;
private final String value;

public TeragrepSetConfigStep(final DPLParserCatalystContext catCtx, final String key, final String value) {
this.catCtx = catCtx;
this.key = key;
this.value = value;
}

@Override
public Dataset<Row> get(Dataset<Row> dataset) throws StreamingQueryException {
if (catCtx == null) {
throw new IllegalStateException("DPLParserCatalystContext not set");
}

final Config config = catCtx.getConfig();
if (config == null || config.isEmpty()) {
throw new IllegalArgumentException("Config is null or empty");
}

if (!config.hasPath(key)) {
throw new IllegalArgumentException("Config key " + key + " not found");
}

final ConfigValue oldValue = config.getValue(key);
final Config newConfig;
if (oldValue.valueType().equals(ConfigValueType.BOOLEAN)) {
newConfig = config.withValue(key, ConfigValueFactory.fromAnyRef(Boolean.parseBoolean(value)));
}
else if (oldValue.valueType().equals(ConfigValueType.NUMBER)) {
// getInt(), getLong() will work without decimals
newConfig = config.withValue(key, ConfigValueFactory.fromAnyRef(Double.parseDouble(value)));
}
else if (oldValue.valueType().equals(ConfigValueType.STRING)) {
newConfig = config.withValue(key, ConfigValueFactory.fromAnyRef(value));
}
else {
throw new IllegalArgumentException("Unknown config value type: " + oldValue.valueType());
}

LOGGER.info("Set configuration <[{}]> to new value <[{}]>", key, value);

catCtx.setConfig(newConfig);

return dataset;
}
}
91 changes: 91 additions & 0 deletions src/test/java/com/teragrep/pth10/TeragrepTransformationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
*/
package com.teragrep.pth10;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
Expand Down Expand Up @@ -819,4 +822,92 @@ public void tgForEachBatchTest() {
Assertions.assertEquals(5, ds.count());
});
}

@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
)
public void tgSetConfigStringTest() {
Config fakeConfig = ConfigFactory
.defaultApplication()
.withValue("dpl.pth_00.dummy.value", ConfigValueFactory.fromAnyRef("oldValue"));
streamingTestUtil.getCtx().setConfig(fakeConfig);
Assertions.assertEquals("oldValue", streamingTestUtil.getCtx().getConfig().getString("dpl.pth_00.dummy.value"));
streamingTestUtil
.performDPLTest("index=index_A | teragrep set config dpl.pth_00.dummy.value newValue", testFile, ds -> {
Assertions
.assertEquals(
"newValue",
streamingTestUtil.getCtx().getConfig().getString("dpl.pth_00.dummy.value")
);
});
}

@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
)
public void tgSetConfigLongTest() {
Config fakeConfig = ConfigFactory
.defaultApplication()
.withValue("dpl.pth_00.dummy.value", ConfigValueFactory.fromAnyRef(12345));
streamingTestUtil.getCtx().setConfig(fakeConfig);
Assertions.assertEquals(12345L, streamingTestUtil.getCtx().getConfig().getLong("dpl.pth_00.dummy.value"));
streamingTestUtil
.performDPLTest("index=index_A | teragrep set config dpl.pth_00.dummy.value 99999", testFile, ds -> {
Assertions
.assertEquals(
99999L, streamingTestUtil.getCtx().getConfig().getLong("dpl.pth_00.dummy.value")
);
});
}

@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
)
public void tgSetConfigMismatchedTypesTest() {
Config fakeConfig = ConfigFactory
.defaultApplication()
.withValue("dpl.pth_00.dummy.value", ConfigValueFactory.fromAnyRef(12345));
streamingTestUtil.getCtx().setConfig(fakeConfig);
Assertions.assertEquals(12345L, streamingTestUtil.getCtx().getConfig().getLong("dpl.pth_00.dummy.value"));
Throwable t = streamingTestUtil
.performThrowingDPLTest(
NumberFormatException.class,
"index=index_A | teragrep set config dpl.pth_00.dummy.value stringValue", testFile, ds -> {
}
);

Assertions.assertEquals("For input string: \"stringValue\"", t.getMessage());
}

@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
)
public void tgGetConfigTest() {
Config fakeConfig = ConfigFactory
.defaultApplication()
.withValue("dpl.pth_00.dummy.value", ConfigValueFactory.fromAnyRef(12345))
.withValue("dpl.pth_00.another.dummy.value", ConfigValueFactory.fromAnyRef("string_here"));
streamingTestUtil.getCtx().setConfig(fakeConfig);

streamingTestUtil.performDPLTest("index=index_A | teragrep get config", testFile, ds -> {
List<String> configs = ds
.select("_raw")
.collectAsList()
.stream()
.map(r -> r.getAs(0).toString())
.collect(Collectors.toList());
Assertions.assertEquals(2, configs.size());
Assertions.assertTrue(configs.contains("dpl.pth_00.another.dummy.value = string_here"));
Assertions.assertTrue(configs.contains("dpl.pth_00.dummy.value = 12345"));
});
}

}

0 comments on commit 7929a08

Please sign in to comment.