[FEATURE] Enhance Spark JSON table for semi-structured log analytics #535

dai-chen opened this issue Aug 7, 2024 · 0 comments

Is your feature request related to a problem?

Users face several challenges when working with semi-structured data, particularly JSON-format log data that does not come from standard AWS log sources (e.g., CloudTrail, WAF). The current workflow requires users to create a Spark table before querying or creating a Flint index, which makes it difficult to perform ad-hoc analysis with Spark SQL or advanced analytics with OpenSearch DSL and Dashboards.

The main challenges include:

  1. Strict Schema Definition Required: Users must define a clear schema for tables before querying. Although Spark data sources can infer schemas, this process often requires scanning or sampling the source data, which can be time-consuming and inefficient.

  2. Limitations in Schema Evolution: Although Spark provides an ALTER TABLE statement to modify the schema, this process is manual and comes with limitations, especially for V1 data source tables. This can hinder users' ability to adapt to evolving data structures.

  3. New Columns Inaccessible in Queries: Users cannot access data in columns that are not explicitly defined in the table schema until they alter the table to include the new columns (see the appendix below for a demonstration).

What solution would you like? [TBD]

Solution 1: Schema-less

A schema-less solution would enable users to analyze semi-structured log data without predefined schemas. This approach allows users to run queries directly against the raw JSON data, providing flexibility and ease of use. This is similar to what CloudWatch Logs Insights provides today.

  • Pros
    • No Background Jobs Required: Users can perform ad-hoc queries without the need for background streaming jobs or data preprocessing, streamlining the workflow and reducing setup time. [Compared with Solutions 2 and 4]
    • Increased Flexibility: Users can immediately adapt to changing data structures and explore data without being constrained by a rigid schema. [Compared with Solutions 2 and 4]
  • Cons
    • Lack of Strict Query Validation: The absence of schema validation may lead to runtime errors or unexpected results, as queries could reference non-existent fields or have incorrect data types.
    • Integration Challenges with BI Tools: Many business intelligence tools rely on well-defined schemas for efficient querying and data visualization, which may complicate integration with a schema-less approach.
    • Performance Overhead: Parsing JSON at runtime for each query can introduce performance overhead, particularly for large datasets, as the system must interpret the data structure dynamically for each request. [Compared with Solution 2]

User Experience

Using the PartiQL test dataset shown in the appendix section, users first create a Spark JSON table without a schema by using the new schemaless option:

TBD: How to specify partitioned columns

CREATE TABLE employeesNest
USING json
OPTIONS (
  path = 's3://test/partiql',
  schemaless = true
);

Query using JSON path access and a lateral join in SQL (alternatively, PartiQL could be introduced, e.g., via partiql-scribe):

SELECT
  e.name AS employeeName,
  p.name AS projectName
FROM
 employeesNest AS e
LATERAL VIEW explode(e.projects) AS p
WHERE
  p.name LIKE '%security%';

The table can also be used in a Flint index or materialized view statement, such as:

CREATE INDEX name_and_projects ON employeesNest
( name, projects )
WITH (auto_refresh = true);

High-Level Implementation Overview

Behind the scenes, the Spark table is created with a single column of a JSON user-defined type (UDT) that represents the entire row (this could become a new Flint table format in the future):

TBD: enhance the JSON handling, because the text data source does not support options such as multiLine

CREATE TABLE employeesNest (
  message STRING
)
USING text
OPTIONS (
  path 's3://test/partiql/'
)
...

Queries can then utilize Spark's built-in JSON functions to operate on the JSON column effectively:

SELECT
  get_json_object(e.message, '$.name') AS employeeName,
  get_json_object(p, '$.name') AS projectName
FROM
  employeesNest AS e
LATERAL VIEW explode(
  from_json(get_json_object(e.message, '$.projects'), 'array<string>')
) AS p
WHERE
  get_json_object(p, '$.name') LIKE '%security%';
-------------------------------------
employeeName	projectName
Bob Smith	AWS Redshift security
Bob Smith	AWS Aurora security
Jane Smith	AWS Redshift security

CREATE INDEX name_and_projects ON employeesNest (
  get_json_object(message, '$.name'),
  get_json_object(message, '$.projects')
)
...;

(See the appendix section below for comprehensive details.)

What alternatives have you considered?

Solution 2: Schema-on-Write by Flint

Maintain a Flint covering index behind the table and delegate table schema management to the covering index in OpenSearch.
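
Conceptually, the table schema would be backed by a Flint covering index maintained behind the scenes, for example (a sketch only; the index name and column list are illustrative, reusing the employeesNest dataset from above):

CREATE INDEX employees_all_fields ON employeesNest
( id, name, title, projects )
WITH (
  auto_refresh = true,
  refresh_interval = '1 minute'
);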

  • Cons
    • Inflexibility: This solution may be limiting for users who only wish to run direct queries rather than perform analytics in OpenSearch.
    • User Experience Complexity
      • Lack of Clarity: Users may find it challenging to understand the relationship between the Spark table schema and the covering index mapping.
      • Syntax Complexity: Merging the covering index definition into the CREATE TABLE statement increases complexity, making it harder for users to manage.
CREATE TABLE myglue.default.alb_logs
USING json
LOCATION 's3://test/alb_logs'
OPTIONS (
  index_from = '2024-08-01',
  index_options = (
    auto_refresh = true,
    refresh_interval = '1 minute',
    checkpoint_location = 's3://test/checkpoint'
  )
);

# Refresh for incremental/full
REFRESH TABLE myglue.default.alb_logs;

DROP TABLE myglue.default.alb_logs;

Solution 3: Schema-on-Read

Implement a schema-on-read approach similar to Spark DataFrames, where the schema is inferred at query time. This allows the data to be parsed and planned dynamically based on the structure of the incoming data. For example, similar to the Delta read_files function, we could sample the dataset using the user's query with a LIMIT clause.
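
As a rough illustration, Spark SQL already supports querying files directly with the schema inferred at read time, so a sampled ad-hoc read in this style might look like the following (a sketch; driving inference from a LIMIT-ed sample is an assumption of this proposal):

SELECT name, projects
FROM json.`s3://test/partiql/`
LIMIT 10;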

  • Cons
    • Performance Overhead
      Inferring the schema at runtime may introduce latency, especially for large datasets, which can slow down query execution.

Solution 4: Automatic Schema Management

Establish a dedicated background job to automatically detect and evolve the schema (this could be the same job as the skipping index refresh job). For example, Onehouse describes this kind of managed schema evolution as follows:

... provides a managed service for building and managing your data lakehouse. Onehouse handles schema evolution as data is ingested into the lakehouse, so schema changes in your upstream sources don’t disrupt the delivery of fresh, high-quality data to the lakehouse.
... can automatically detect these new entities and materialize them as analytics-ready tables in your data lakehouse.
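
For illustration, such a background job could compare newly arrived data against the current table schema and issue ALTER TABLE statements to evolve it (a sketch only; how the changes are detected is left open):

-- Detected a new top-level field `age` in newly arrived files
ALTER TABLE employeesNest ADD COLUMNS (age INT);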

Solution 5: Hybrid (Adaptive/Incremental Schema Management)

Implement a hybrid approach that combines the advantages of Schema-less (Solution 1) and Schema-on-Read (Solution 3). In this model, users start with a schema-less table and no upfront schema definition. As queries are executed, the schema inferred from the query results is applied to alter the table, as sketched below.
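
For example, a first query could run against the raw JSON as in Solution 1, and the types inferred from its result could then be applied back to the table (a sketch of the intended flow; the inference and ALTER mechanics are TBD):

-- Ad-hoc query suggests `age` is an integer field
SELECT get_json_object(e.message, '$.age') AS age
FROM employeesNest AS e;

-- Inferred type is then applied to evolve the table schema
ALTER TABLE employeesNest ADD COLUMNS (age INT);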

  • Cons
    • Performance Overhead: Inferring and altering the schema during query execution may introduce performance overhead, especially for larger datasets or complex queries.
    • Dependency on Query Patterns: The effectiveness of this approach depends on the queries executed; if users do not frequently run queries that require schema inference, the benefits may not be fully realized.

Do you have any additional context?

Consider the following example using a PartiQL test dataset:

$ cat hr.employeesNest.1.json
{
  "id": 3, "name": "Bob Smith", "title": null,
  "projects": [
    {"name": "AWS Redshift Spectrum querying"},
    {"name": "AWS Redshift security"},
    {"name": "AWS Aurora security"}
  ]
}
{
  "id": 4, "name": "Susan Smith", "title": "Dev Mgr",
  "projects": []
}
{
  "id": 6, "name": "Jane Smith", "title": "Software Eng 2",
  "projects": [{"name": "AWS Redshift security"}]
}

# Another file with new columns age and size added after table creation
$ cat hr.employeesNest.2.json
{
  "id": 7, "name": "Allen Hank", "title": "Software Eng 3",
  "projects": [{"name": "AWS OpenSearch", "size": 100}],
  "age": 30
}

Here are the limitations identified during testing with Spark SQL:

spark-sql>
         > CREATE EXTERNAL TABLE employeesNest
         > USING json
         > OPTIONS (
         >   path 's3://test/partiql/'
         > );

spark-sql> DESC employeesNest;
id                  	bigint
name                	string
projects            	array<struct<name:string>>
title               	string

spark-sql> SELECT
         >   e.name AS employeeName,
         >   p.name AS projectName
         > FROM
         >   employeesNest AS e
         > LATERAL VIEW explode(e.projects) AS p
         > WHERE
         >   p.name LIKE '%security%';
Bob Smith	AWS Redshift security
Bob Smith	AWS Aurora security
Jane Smith	AWS Redshift security

# Table schema remains the same after hr.employeesNest.2.json is uploaded

# New column age at top level can be added to table schema
spark-sql> ALTER TABLE ds_tables.employeesNest ADD COLUMNS (age INT);

spark-sql> DESC ds_tables.employeesNest;
col_name	data_type	comment
id                  	bigint
name                	string
projects            	array<struct<name:string>>
title               	string
age                 	int

spark-sql> SELECT * FROM ds_tables.employeesNest;
id	name	projects	title	age
3	Bob Smith	[{"name":"AWS Redshift Spectrum querying"},{"name":"AWS Redshift security"},{"name":"AWS Aurora security"}]	NULL	NULL
4	Susan Smith	[]	Dev Mgr	NULL
6	Jane Smith	[{"name":"AWS Redshift security"}]	Software Eng 2	NULL
7	Allen Hank	[{"name":"AWS OpenSearch"}]	Software Eng 3	30

# Failed to add new nested column size
spark-sql> ALTER TABLE ds_tables.employeesNest ADD COLUMNS (projects.size INT);
Error in query: Field name `projects`.`size` is invalid: `projects` is not a struct.; line 1 pos 49
spark-sql> ALTER TABLE ds_tables.employeesNest
         > REPLACE COLUMNS (
         >     id BIGINT,
         >     name STRING,
         >     projects ARRAY<STRUCT<name: STRING, size: INT>>,  -- Update the struct to include the 'size' field
         >     title STRING,
         >     age INT
         > );
Error in query: REPLACE COLUMNS is only supported with v2 tables.

# No way to access the new column size, which is absent from the schema
spark-sql> SELECT to_json(struct(*)) FROM ds_tables.employeesNest;
to_json(struct(id, name, projects, title, age))
{"id":3,"name":"Bob Smith","projects":[{"name":"AWS Redshift Spectrum querying"},{"name":"AWS Redshift security"},{"name":"AWS Aurora security"}]}
{"id":4,"name":"Susan Smith","projects":[],"title":"Dev Mgr"}
{"id":6,"name":"Jane Smith","projects":[{"name":"AWS Redshift security"}],"title":"Software Eng 2"}
{"id":7,"name":"Allen Hank","projects":[{"name":"AWS OpenSearch"}],"title":"Software Eng 3","age":30}