PPL command implementation for appendCol #990

Open · wants to merge 40 commits into base: main
Conversation

@andy-k-improving andy-k-improving commented Dec 11, 2024

Description

Introduce the new PPL command appendCol, which aims to aggregate results from multiple searches into a single comprehensive table for the user to view.

This is accomplished by reading both the main-search and the sub-search as plan nodes, then transforming them into the following SQL: a _row_number_ column is added to capture each dataset's natural order, and the main and sub-search are joined on that column.

SELECT t1.*, t2.*
FROM (
    SELECT *, row_number() OVER (ORDER BY '1') AS row_org
    FROM employees) AS t1
LEFT JOIN (
    SELECT *, row_number() OVER (ORDER BY '1') AS row_app
    FROM employees) AS t2
ON t1.row_org = t2.row_app;
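The effect of this generated SQL can be illustrated outside Spark with a small sketch (the class and method names below are ours, purely for illustration, not part of the PR): every main-search row is kept, the sub-search row at the same natural-order position is appended, and null fills the gap when the sub-search has fewer rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AppendColSketch {

    // Append sub-search rows to main-search rows by natural order, mirroring
    // the generated LEFT JOIN on row_number(): every main row is kept, and
    // null fills the sub-search columns once the sub-search runs out of rows.
    static List<List<Object>> appendByRowOrder(List<List<Object>> main,
                                               List<List<Object>> sub) {
        int subWidth = sub.isEmpty() ? 0 : sub.get(0).size();
        List<List<Object>> out = new ArrayList<>();
        for (int i = 0; i < main.size(); i++) {
            List<Object> row = new ArrayList<>(main.get(i));
            if (i < sub.size()) {
                row.addAll(sub.get(i));            // row numbers match
            } else {
                for (int j = 0; j < subWidth; j++) {
                    row.add(null);                 // unmatched: LEFT JOIN null fill
                }
            }
            out.add(row);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> main = Arrays.asList(
                Arrays.<Object>asList("Lisa", 10000),
                Arrays.<Object>asList("Fred", 21000));
        // A global aggregation such as `stats count()` yields a single row.
        List<List<Object>> sub = Arrays.asList(Arrays.<Object>asList(9));
        System.out.println(appendByRowOrder(main, sub));
        // prints [[Lisa, 10000, 9], [Fred, 21000, null]]
    }
}
```

This is why only the first row of the examples below carries the aggregated value while the remaining rows show NULL.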

Related Issues

Resolves: #956

Check List

  • Updated documentation (docs/ppl-lang/README.md)
  • Implemented unit tests
  • Implemented tests for combination with other commands
  • New added source code should include a copyright header
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Test plan:

# Produce the artifact
sbt clean sparkPPLCosmetic/publishM2

# Start Spark with the plugin
bin/spark-sql --jars "/ABSOLUTE_PATH_TO_ARTIFACT/opensearch-spark-ppl_2.12-0.6.0-SNAPSHOT.jar" \
--conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkExtensions"  \
--conf "spark.sql.catalog.dev=org.apache.spark.opensearch.catalog.OpenSearchCatalog" \
--conf "spark.hadoop.hive.cli.print.header=true"

# Insert test table and data
CREATE TABLE employees (name STRING, dept STRING, salary INT, age INT, con STRING);

INSERT INTO employees VALUES ("Lisa", "Sales------", 10000, 35, 'test');
INSERT INTO employees VALUES ("Evan", "Sales------", 32000, 38, 'test');
INSERT INTO employees VALUES ("Fred", "Engineering", 21000, 28, 'test');
INSERT INTO employees VALUES ("Alex", "Sales", 30000, 33, 'test');
INSERT INTO employees VALUES ("Tom", "Engineering", 23000, 33, 'test');
INSERT INTO employees VALUES ("Jane", "Marketing", 29000, 28, 'test');
INSERT INTO employees VALUES ("Jeff", "Marketing", 35000, 38, 'test');
INSERT INTO employees VALUES ("Paul", "Engineering", 29000, 23, 'test');
INSERT INTO employees VALUES ("Chloe", "Engineering", 23000, 25, 'test');

# Append one sub-search:

source=employees | FIELDS name, dept, salary | APPENDCOL  [ stats count() as event_count];

name	dept	salary	event_count
Lisa	Sales------	10000	9
Fred	Engineering	21000	NULL
Paul	Engineering	29000	NULL
Evan	Sales------	32000	NULL
Chloe	Engineering	23000	NULL
Tom	Engineering	23000	NULL
Alex	Sales	30000	NULL
Jane	Marketing	29000	NULL
Jeff	Marketing	35000	NULL


# Append multiple sub-searches:

source=employees | FIELDS name, dept, salary | APPENDCOL  [ stats count() as event_count] | APPENDCOL [stats avg(age) as avg_age];

name	dept	salary	event_count	avg_age
Lisa	Sales------	10000	9	31.22222222222222
Fred	Engineering	21000	NULL	NULL
Paul	Engineering	29000	NULL	NULL
Evan	Sales------	32000	NULL	NULL
Chloe	Engineering	23000	NULL	NULL
Tom	Engineering	23000	NULL	NULL
Alex	Sales	30000	NULL	NULL
Jane	Marketing	29000	NULL	NULL
Jeff	Marketing	35000	NULL	NULL



# With the override option (the `salary` column from the main-search is dropped and replaced by the `salary` column from the sub-search)

source=employees | FIELDS name, dept, salary | APPENDCOL OVERRIDE=true [stats avg(salary) as salary];

name	dept	salary
Lisa	Sales------	25777.777777777777
Fred	Engineering	NULL
Paul	Engineering	NULL
Evan	Sales------	NULL
Chloe	Engineering	NULL
Tom	Engineering	NULL
Alex	Sales	NULL
Jane	Marketing	NULL
Jeff	Marketing	NULL
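The OVERRIDE=true behaviour above reduces to a column-level rule: any column the sub-search projects that already exists in the main-search is dropped from the main-search before the join, so the sub-search's version survives. A minimal sketch of that rule, with hypothetical names not taken from the PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OverrideSketch {

    // With OVERRIDE=true, the main-search must drop every column that the
    // sub-search also projects, so the sub-search's version survives the join.
    static List<String> columnsToDrop(List<String> mainCols, List<String> subCols) {
        List<String> drop = new ArrayList<>();
        for (String col : subCols) {
            if (mainCols.contains(col)) {
                drop.add(col);      // conflicting column: sub-search wins
            }
        }
        return drop;
    }

    public static void main(String[] args) {
        // From the example above: the main search projects name, dept, salary;
        // the sub-search projects avg(salary) as salary.
        System.out.println(columnsToDrop(
                Arrays.asList("name", "dept", "salary"),
                Arrays.asList("salary")));
        // prints [salary]
    }
}
```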


Signed-off-by: Andy Kwok <[email protected]>
@andy-k-improving andy-k-improving changed the title DRAFT: PPL command appendCol implementaion PPL command implementation for appendCol Dec 16, 2024
@andy-k-improving andy-k-improving marked this pull request as ready for review December 16, 2024 22:45
static List<Expression> getoverridedlist(LogicalPlan lp, String tableName) {
    // When the override option is present, extract the fields to project from the sub-search,
    // then apply dfDropColumns on the main-search to avoid duplicate fields.
    final SparkSession sparkSession = SparkSession.getActiveSession().get();
Member
question to @LantaoJin : is there a more generic way to extract the table's attributes list ?

Contributor Author

@LantaoJin Would you mind pointing me to the specific doc for this purpose? I have looked up the implementation and the doc for eval, but all the examples seem to rely on the user supplying the names of the columns to override.

However, in this particular case (AppendCol), I need to first figure out the exact fields the sub-query will project (including resolving select * to actual column names), and only then can I take that list and apply dropDFColumn() on the main query.

Member
@LantaoJin LantaoJin Dec 19, 2024

However in this particular case (AppendCol), I need to first figure out the exact fields which sub-query will project (Including resolve select * to actual columns name).

I see, but IMO we don't need to figure out all the column names (overridedList) as long as we restrict the select * case:

  1. For cases with fields or aliases, use the fields or aliases directly, for example
source=employees | FIELDS name, dept, age | APPENDCOL OVERRIDE=true [ ... | FIELDS foo, bar, age ];

Adding foo, bar, age to fieldsToRemove.

source=employees | FIELDS name, dept, age | APPENDCOL OVERRIDE=true [ stats avg(age) as age, count(name) as cnt ];

Adding age, cnt to fieldsToRemove. (Note: it is harmless to add a non-conflicting field to DataFrameDropColumns.)

  2. For cases with allFields, throw an exception: APPENDCOL should specify the output fields:
source=employees | FIELDS name, dept, age | APPENDCOL OVERRIDE=true [ where age > 0 ];


LantaoJin commented Dec 18, 2024

Two high level questions:

  1. appendCol command syntax is
    APPENDCOL <override=?> [sub-search]...

And the sub-search syntax is

subSearch
: searchCommand (PIPE commands)*
;

It seems this PR doesn't follow the sub-search syntax.
I'd prefer to follow the current sub-search syntax, so that we could combine columns from different tables, for example:
source=employees | FIELDS name, dept, salary | APPENDCOL [ search source = company | stats count() as event_count ]
But if this is intentional (appendcol only works on one table), I am okay with the current syntax.

  2. Why is the result of the query source=employees | FIELDS name, dept, salary | APPENDCOL [ stats count() as event_count]
name	dept	salary	event_count
Lisa	Sales------	10000	9
Fred	Engineering	21000	NULL
Paul	Engineering	29000	NULL
Evan	Sales------	32000	NULL
Chloe	Engineering	23000	NULL
Tom	Engineering	23000	NULL
Alex	Sales	30000	NULL
Jane	Marketing	29000	NULL
Jeff	Marketing	35000	NULL 

instead of

name	dept	salary	event_count
Lisa	Sales------	10000	9
Fred	Engineering	21000	9
Paul	Engineering	29000	9
Evan	Sales------	32000	9
Chloe	Engineering	23000	9
Tom	Engineering	23000	9
Alex	Sales	30000	9
Jane	Marketing	29000	9
Jeff	Marketing	35000	9

PS, what is the expected result of query source=employees | stats sum(salary) as total_salary by dept | appendcol [ stats avg(age) as avg_age by dept ]?

Comment on lines +291 to +295
LogicalPlan joinedQuery = join(
    mainSearchWithRowNumber, subSearchWithRowNumber,
    Join.JoinType.LEFT,
    Optional.of(new EqualTo(t1Attr, t2Attr)),
    new Join.JoinHint());
Member

Now I know why you got NULL in the example.
I think this is not what we expect. How about using an inner join and a cross join together?
For example:

  1. With the same group-by key, convert it to a join key for an inner join:
source=employees | stats sum(salary) as total_salary by dept | appendcol [  stats avg(age) as avg_age by dept ]
  2. Without a group-by key, use a cross join:
source=employees | stats sum(salary) as total_salary by dept | appendcol [  stats avg(age) as avg_age ]

The implementation is equivalent to:

def appendCols(mainSearchDF: DataFrame, subSearchDF: DataFrame, joinKey: Option[String] = None): DataFrame = {
  joinKey match {
    case Some(key) =>
      // If a join key is provided, perform a join
      mainSearchDF.join(subSearchDF, Seq(key), "inner")
    case None =>
      // If no join key is provided, assume a global aggregation and use a cross join
      mainSearchDF.crossJoin(subSearchDF)
  }
}

Member

@LantaoJin thanks for the review and feedback!
AFAIK appendcol is intended for a single index.

Contributor Author

@YANG-DB, @LantaoJin this comment seems to contradict what we discussed on #956 and the Splunk doc; would it be possible to clarify?

Join condition

Referring to the prior discussion on the GitHub issue and the Splunk doc, I don't think that is the case: the user won't be asked to enter either the join column or the join condition, so it's up to the code logic to generate the natural order for the DataFrame and use it to join. Can I confirm this?

I'm asking this because the above comment seems to indicate the user will provide the name of a column for the join (either inner or cross), but that is not the case for the method signature or grammar of appendCol().

Expected result

According to the Splunk doc:

Appends the fields of the [subsearch](https://docs.splunk.com/Splexicon:Subsearch) results with the input search results. 
All fields of the subsearch are combined into the current results, 
with the exception of [internal fields](https://docs.splunk.com/Splexicon:Internalfield). 
For example, the first subsearch result is merged with the first main result, 
the second subsearch result is merged with the second main result, and so on.

Both the main and the sub-search are simply joined row by row from top to bottom, 1:1, without an explicit join condition; when the row counts of the two DataFrames differ, null is used to fill the gap. This seems to align with what is described in the GitHub issue.

From Github issue:

Behavior
The new column(s) would be aligned with the rows of the original dataset based on their order of appearance.
Each appended column must produce the same number of rows as the base dataset to ensure proper alignment. 
Any discrepancies in row counts could result in null values for mismatched rows.

Hence I don't think either a cross or an inner join will do the same, as rows would be truncated in both scenarios.
And I reckon this is what makes appendCol a standalone implementation; otherwise users could simply achieve it with the existing subQuery or LookUp commands.

Member
@LantaoJin LantaoJin Dec 19, 2024

Let me clarify it.

I'm asking this because the above comment seems to indicate the user will provide the name of a column for the join (either inner or cross), but that is not the case for the method signature or grammar of appendCol().

No, my suggestion doesn't ask the user to provide a column name for the join. My point is that the result of the current implementation seems incorrect, IMO.
Query

source=employees | FIELDS name, dept, salary | APPENDCOL  [ stats count() as event_count]

should produce

name	dept	salary	event_count
Lisa	Sales------	10000	9
Fred	Engineering	21000	9
Paul	Engineering	29000	9
Evan	Sales------	32000	9
Chloe	Engineering	23000	9
Tom	Engineering	23000	9
Alex	Sales	30000	9
Jane	Marketing	29000	9
Jeff	Marketing	35000	9

rather than

name	dept	salary	event_count
Lisa	Sales------	10000	9
Fred	Engineering	21000	NULL
Paul	Engineering	29000	NULL
Evan	Sales------	32000	NULL
Chloe	Engineering	23000	NULL
Tom	Engineering	23000	NULL
Alex	Sales	30000	NULL
Jane	Marketing	29000	NULL
Jeff	Marketing	35000	NULL

Similarly, here are two examples.
Q1: output row counts don't match:

source=employees | stats sum(salary) as total_salary by dept | appendcol [  stats count() as cnt ]

should produce

dept	total_salary	cnt
Sales	72000	9
Engineering	96000	9
Marketing	64000	9

rather than

dept	total_salary	cnt
Sales	72000	9
Engineering	96000	NULL
Marketing	64000	NULL

Q2: output row counts match:

source=employees | stats sum(salary) as total_salary by dept | appendcol [  stats count() as cnt by cnt_dept ]

should produce

dept	total_salary	cnt_dept
Sales	72000	3
Engineering	96000	4
Marketing	64000	2

What are the outputs of the above queries in your implementation?

For Q1, my suggestion is to implement it with a crossJoin (no join key required).
For Q2, my suggestion is an innerJoin with the group-by keys as the join keys.

Contributor Author
> Two high level questions: […]

Yep, the sub-search under appendcol is restricted to the same dataSource as the main PPL command. As for the expected result, we can probably discuss it on the other thread, in order to centralise the convo.

Development

Successfully merging this pull request may close these issues.

[PPL-Lang]add appendcol ppl command
3 participants