Skip to content

Commit

Permalink
move table and regex option to dpl (#414)
Browse files Browse the repository at this point in the history
* move table and regex option to dpl

* fix equals with Pattern class

* add testWriteFilterTypesToDatabase

* refactoring and rebase

* remove unused properties from tests and refactor some names, use fields parameter to test non-equality

* apply spotless

* use interface and objects to get values from parser context

* add bloom option mode parameter and move interface to ast/

* assert equals exception message, remove default value constructor from bloom filter table, secondary constructor for Pattern.compile

---------

Co-authored-by: Mikko Kortelainen <[email protected]>
  • Loading branch information
elliVM and kortemik authored Dec 3, 2024
1 parent 5528069 commit ec971ed
Show file tree
Hide file tree
Showing 21 changed files with 990 additions and 327 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<teragrep.dpf_03.version>11.0.1</teragrep.dpf_03.version>
<teragrep.jpr_01.version>3.1.1</teragrep.jpr_01.version>
<teragrep.jue_01.version>0.4.3</teragrep.jue_01.version>
<teragrep.pth_03.version>9.2.0</teragrep.pth_03.version>
<teragrep.pth_03.version>9.3.0</teragrep.pth_03.version>
<teragrep.pth_06.version>3.3.3</teragrep.pth_06.version>
<teragrep.rlp_01.version>4.0.1</teragrep.rlp_01.version>
<teragrep.rlp_03.version>9.0.0</teragrep.rlp_03.version>
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/com/teragrep/pth10/ast/ContextValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.ast;

/**
 * Interface to get a value from the parser context.
 * <p>
 * An implementation exposes exactly one value through {@link #value()}; how that value is obtained is left to the
 * implementing class.
 *
 * @param <T> type of the value returned by {@link #value()}
 */
public interface ContextValue<T> {

/**
 * Returns the value this object provides.
 *
 * @return the value of type {@code T}
 */
T value();

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@
import com.teragrep.pth10.ast.bo.Token;
import com.teragrep.pth10.ast.commands.logicalstatement.LogicalStatementCatalyst;
import com.teragrep.pth10.ast.commands.logicalstatement.LogicalStatementXML;
import com.teragrep.pth10.ast.commands.transformstatement.teragrep.ModeFromBloomContext;
import com.teragrep.pth10.ast.commands.transformstatement.teragrep.EstimateColumnFromBloomContext;
import com.teragrep.pth10.ast.commands.transformstatement.teragrep.InputColumnFromBloomContext;
import com.teragrep.pth10.ast.commands.transformstatement.teragrep.OutputColumnFromBloomContext;
import com.teragrep.pth10.ast.commands.transformstatement.teragrep.RegexValueFromBloomContext;
import com.teragrep.pth10.ast.commands.transformstatement.teragrep.TableNameFromBloomContext;
import com.teragrep.pth10.ast.ContextValue;
import com.teragrep.pth10.steps.AbstractStep;
import com.teragrep.pth10.steps.teragrep.*;
import com.teragrep.pth10.steps.teragrep.AbstractTokenizerStep;
Expand Down Expand Up @@ -463,69 +470,58 @@ public Node visitT_kafkaSaveModeParameter(DPLParser.T_kafkaSaveModeParameterCont

// exec bloom (create|update|estimate)
@Override
public Node visitT_bloomModeParameter(DPLParser.T_bloomModeParameterContext ctx) {
TeragrepBloomStep.BloomMode mode = TeragrepBloomStep.BloomMode.DEFAULT;
String inputCol = null;
String outputCol = null;
String estimateCol = null;
if (ctx.t_bloomOptionParameter() != null) {
if (ctx.t_bloomOptionParameter().COMMAND_TERAGREP_MODE_CREATE() != null) {
// bloom create
mode = TeragrepBloomStep.BloomMode.CREATE;
}
else if (ctx.t_bloomOptionParameter().COMMAND_TERAGREP_MODE_UPDATE() != null) {
// bloom update
mode = TeragrepBloomStep.BloomMode.UPDATE;
}
else if (ctx.t_bloomOptionParameter().COMMAND_TERAGREP_MODE_ESTIMATE() != null) {
// bloom estimate
mode = TeragrepBloomStep.BloomMode.ESTIMATE;
}

if (ctx.t_bloomOptionParameter().t_inputParameter() != null) {
inputCol = new UnquotedText(
new TextString(ctx.t_bloomOptionParameter().t_inputParameter().fieldType().getText())
).read();
}
else {
inputCol = "tokens";
}

if (ctx.t_bloomOptionParameter().t_outputParameter() != null) {
outputCol = new UnquotedText(
new TextString(ctx.t_bloomOptionParameter().t_outputParameter().fieldType().getText())
).read();
}
else {
outputCol = String.format("estimate(%s)", inputCol);
}

if (ctx.t_bloomOptionParameter().t_estimatesParameter() != null) {
estimateCol = new UnquotedText(
new TextString(ctx.t_bloomOptionParameter().t_estimatesParameter().fieldType().getText())
).read();
}
else {
estimateCol = String.format("estimate(%s)", inputCol);
}
public Node visitT_bloomModeParameter(final DPLParser.T_bloomModeParameterContext ctx) {
    // When the bloom option sub-context is present, the result is whatever visiting it produces.
    if (ctx.t_bloomOptionParameter() != null) {
        return visit(ctx.t_bloomOptionParameter());
    }
    // No option sub-context: there is nothing to visit, so reject the input.
    throw new IllegalArgumentException("Bloom option parameter in '| teragrep exec bloom' was null");
}

TeragrepBloomStep bloomStep = new TeragrepBloomStep(this.zplnConfig, mode, inputCol, outputCol, estimateCol);

if (mode == TeragrepBloomStep.BloomMode.CREATE || mode == TeragrepBloomStep.BloomMode.UPDATE) {
// create aggregate step to run before bloom create and bloom update
TeragrepBloomStep aggregateStep = new TeragrepBloomStep(
this.zplnConfig,
/**
 * Builds the step node(s) for a bloom option parameter context.
 * <p>
 * The mode, input column, output column, estimate column, table name and regex are each obtained from the context
 * through a {@code ContextValue} object. The output and estimate column objects are additionally given the resolved
 * input column value.
 * <p>
 * When the mode is {@code CREATE} or {@code UPDATE}, the result is a {@code StepListNode} holding an
 * {@code AGGREGATE} step followed by a step for the requested mode; both are constructed with the table name and
 * regex values. For any other mode the result is a {@code StepNode} wrapping a single step constructed without
 * table name and regex.
 * <p>
 * NOTE(review): this listing appears to interleave pre-change lines (bare {@code inputCol}/{@code outputCol}/
 * {@code estimateCol} arguments and two early {@code return} statements) with the new body - confirm against the
 * actual source file.
 *
 * @param ctx bloom option parameter context
 * @return node containing the bloom step(s) described above
 */
@Override
public Node visitT_bloomOptionParameter(final DPLParser.T_bloomOptionParameterContext ctx) {
// values from context
final ContextValue<TeragrepBloomStep.BloomMode> mode = new ModeFromBloomContext(ctx);
final ContextValue<String> inputCol = new InputColumnFromBloomContext(ctx);
final ContextValue<String> outputCol = new OutputColumnFromBloomContext(ctx, inputCol.value());
final ContextValue<String> estimateCol = new EstimateColumnFromBloomContext(ctx, inputCol.value());
final ContextValue<String> tableName = new TableNameFromBloomContext(ctx);
final ContextValue<String> regex = new RegexValueFromBloomContext(ctx);

final Node rv;
if (mode.value() == TeragrepBloomStep.BloomMode.CREATE || mode.value() == TeragrepBloomStep.BloomMode.UPDATE) {
// create an aggregate step to run before bloom create and bloom update
final TeragrepBloomStep aggregateStep = new TeragrepBloomStep(
zplnConfig,
TeragrepBloomStep.BloomMode.AGGREGATE,
inputCol,
outputCol,
estimateCol
tableName.value(),
regex.value(),
inputCol.value(),
outputCol.value(),
estimateCol.value()
);

return new StepListNode(Arrays.asList(aggregateStep, bloomStep));
// Create a step with table and regex parameters needed (create|update)
final TeragrepBloomStep bloomStepWithRegexAndTable = new TeragrepBloomStep(
zplnConfig,
mode.value(),
tableName.value(),
regex.value(),
inputCol.value(),
outputCol.value(),
estimateCol.value()
);
rv = new StepListNode(Arrays.asList(aggregateStep, bloomStepWithRegexAndTable));
} else {
final TeragrepBloomStep bloomStep = new TeragrepBloomStep(
this.zplnConfig,
mode.value(),
inputCol.value(),
outputCol.value(),
estimateCol.value()
);
rv = new StepNode(bloomStep);
}

return new StepNode(bloomStep);
return rv;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.ast.commands.transformstatement.teragrep;

import com.teragrep.pth10.ast.ContextValue;
import com.teragrep.pth10.ast.TextString;
import com.teragrep.pth10.ast.UnquotedText;
import com.teragrep.pth_03.antlr.DPLParser;

/**
 * Provides the estimate column name for a bloom option parameter context.
 * <p>
 * If the context carries an estimates parameter, its field text is read through {@code UnquotedText}; otherwise the
 * name defaults to {@code estimate(<inputCol>)}.
 */
public final class EstimateColumnFromBloomContext implements ContextValue<String> {

    private final DPLParser.T_bloomOptionParameterContext ctx;
    private final String inputCol;

    /**
     * @param ctx      bloom option parameter context the estimates parameter is looked up from
     * @param inputCol input column name, used only to build the default estimate column name
     */
    public EstimateColumnFromBloomContext(final DPLParser.T_bloomOptionParameterContext ctx, final String inputCol) {
        this.ctx = ctx;
        this.inputCol = inputCol;
    }

    /**
     * Resolves the estimate column name.
     *
     * @return the estimates parameter's field text as read by {@code UnquotedText}, or
     *         {@code estimate(<inputCol>)} when the context has no estimates parameter
     */
    public String value() {
        if (ctx.t_estimatesParameter() == null) {
            // no explicit estimates column: derive the default name from the input column
            return String.format("estimate(%s)", inputCol);
        }
        final String fieldText = ctx.t_estimatesParameter().fieldType().getText();
        return new UnquotedText(new TextString(fieldText)).read();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.ast.commands.transformstatement.teragrep;

import com.teragrep.pth10.ast.ContextValue;
import com.teragrep.pth10.ast.TextString;
import com.teragrep.pth10.ast.UnquotedText;
import com.teragrep.pth_03.antlr.DPLParser;

/**
 * Provides the input column name for a bloom option parameter context.
 * <p>
 * If the context carries an input parameter, its field text is read through {@code UnquotedText}; otherwise the
 * name defaults to {@code tokens}.
 */
public final class InputColumnFromBloomContext implements ContextValue<String> {

    private final DPLParser.T_bloomOptionParameterContext ctx;

    /**
     * @param ctx bloom option parameter context the input parameter is looked up from
     */
    public InputColumnFromBloomContext(DPLParser.T_bloomOptionParameterContext ctx) {
        this.ctx = ctx;
    }

    /**
     * Resolves the input column name.
     *
     * @return the input parameter's field text as read by {@code UnquotedText}, or {@code "tokens"} when the
     *         context has no input parameter
     */
    public String value() {
        if (ctx.t_inputParameter() == null) {
            // no explicit input column: fall back to the default column name
            return "tokens";
        }
        final String fieldText = ctx.t_inputParameter().fieldType().getText();
        return new UnquotedText(new TextString(fieldText)).read();
    }
}
Loading

0 comments on commit ec971ed

Please sign in to comment.