
Delta table expects a "__index_level_0__" column #1698

Closed
ThomasMargnac opened this issue Oct 3, 2023 · 4 comments
Labels
bug Something isn't working

Comments

@ThomasMargnac

Environment

Delta-rs version: 0.10.2

Binding: Python 3.9.17

Environment: local

  • Cloud provider: None
  • OS: macOS 12.7
  • Other: Everything is dockerized, from Python to MinIO

Bug

What happened:

I am trying to pull new data (which contains text) from a delta table in bucket A, apply some transformations to it (removing URLs, removing hashtags, …), and finally load the transformed data into a delta table in bucket B.

The first time I ran this pipeline, it worked perfectly fine. Then I inserted new data into the delta table in bucket A. The second time, the pipeline failed with the following error:

Traceback (most recent call last):
  File "//pipelines/silver_pipeline.py", line 23, in <module>
    main()
  File "//pipelines/silver_pipeline.py", line 18, in main
    load_into_delta_lake(
  File "/etl/load/load_into_silver.py", line 24, in load_into_delta_lake
    write_deltalake(
  File "/usr/local/lib/python3.9/site-packages/deltalake/writer.py", line 180, in write_deltalake
    raise ValueError(
ValueError: Schema of data does not match table schema
Table schema:
x: int64
y: string
z: timestamp[us]
__index_level_0__: int64
-- schema metadata --
pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 1781
Data Schema:
x: int64
y: string
z: timestamp[us]

Apparently, a column named "__index_level_0__" is required, but it is not a column I defined.
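
For context, "__index_level_0__" is the name PyArrow gives a pandas index when it serializes it as a real column. A minimal sketch of that behavior (the DataFrame here is illustrative, not the actual data):

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"x": [1, 2, 3]})

# A default RangeIndex is stored as schema metadata only, with no extra column.
print(pa.Table.from_pandas(df).schema.names)
# ['x']

# Filtering rows leaves a non-trivial integer index, which from_pandas
# materializes as an "__index_level_0__" column by default.
print(pa.Table.from_pandas(df[df["x"] > 1]).schema.names)
# ['x', '__index_level_0__']

# preserve_index=False drops the index in both cases.
print(pa.Table.from_pandas(df[df["x"] > 1], preserve_index=False).schema.names)
# ['x']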

What you expected to happen:

I expected my transformed data to be stored in my delta table (bucket B) without a problem.

How to reproduce it:

Here is my Python script to reproduce it:

from deltalake import DeltaTable
from deltalake.writer import write_deltalake
from datetime import datetime
import pandas as pd
import os

def extract():
    # Getting credentials
    access_id = os.environ.get("MINIO_ID")
    access_secret = os.environ.get("MINIO_SECRET")
    # Defining properties of Data Lakehouse
    storage_options = {
        "AWS_ACCESS_KEY_ID": str(access_id),
        "AWS_SECRET_ACCESS_KEY": str(access_secret),
        "AWS_REGION": "us-east-1",
        "AWS_ENDPOINT_URL": "http://minio:9000",
        "AWS_STORAGE_ALLOW_HTTP": "TRUE",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE"
    }
    s3_endpoint = "s3://bronze/data"
    # Reading data from Delta Lake bronze layer
    now = datetime.now()
    data = DeltaTable(
        table_uri=s3_endpoint,
        storage_options=storage_options
    ).to_pandas()
    data['z'] = pd.to_datetime(
        arg=data['z'],
        format="%Y-%m-%d %H:%M:%S.%f"
    )
    # Filtering data
    last_bronze_check_file = "last_bronze_check.txt"
    last_bronze_check = None
    # Getting last time bronze was pulled
    try:
        with open(last_bronze_check_file, "r") as file:
            last_bronze_check = datetime.strptime(file.read(), "%Y-%m-%d %H:%M:%S.%f")
    except FileNotFoundError as error:
        print(error)
    if last_bronze_check is not None:
        data = data[data['z'] > last_bronze_check].copy()
    # Updating last time data was pulled
    with open(last_bronze_check_file, "w") as file:
        file.write(str(now))
    # Returning data
    return data

def load(
    data: pd.DataFrame
):
    if len(data) > 0:
        # Getting credentials
        access_id = os.environ.get("MINIO_ID")
        access_secret = os.environ.get("MINIO_SECRET")
        # Defining properties of Data Lakehouse
        storage_options = {
            "AWS_ACCESS_KEY_ID": str(access_id),
            "AWS_SECRET_ACCESS_KEY": str(access_secret),
            "AWS_REGION": "us-east-1",
            "AWS_ENDPOINT_URL": "http://minio:9000",
            "AWS_STORAGE_ALLOW_HTTP": "TRUE",
            "AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE"
        }
        # Writing data to Delta Lake
        s3_endpoint = "s3://silver/data"
        write_deltalake(
            table_or_uri=s3_endpoint,
            data=data,
            mode='append',
            storage_options=storage_options
        )

if __name__ == "__main__":
    data = extract()
    load(data)

More details:

  • The main difference between the first run and the second is that on the second run the data has been filtered.
  • As far as I understand, this problem could be related to the Parquet format.
  • I also read Overwrite mode does not work with Azure #939, where the same issue was encountered at one point, but nothing there worked for me.
@ThomasMargnac added the bug label on Oct 3, 2023
@ion-elgreco
Collaborator

ion-elgreco commented Oct 4, 2023

This is because pandas has a lovely index... So the main issue is that on the first write, the index column stayed in the data while writing to Parquet. What PyArrow version are you using?

I can see that pa.Table.from_pandas has a preserve_index parameter.
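
A quick way to check whether the stray index column actually landed in the table on that first write (a sketch; the URI and storage options mirror the repro script above):

import os
from deltalake import DeltaTable

# Same MinIO settings as in the repro script.
storage_options = {
    "AWS_ACCESS_KEY_ID": str(os.environ.get("MINIO_ID")),
    "AWS_SECRET_ACCESS_KEY": str(os.environ.get("MINIO_SECRET")),
    "AWS_REGION": "us-east-1",
    "AWS_ENDPOINT_URL": "http://minio:9000",
    "AWS_STORAGE_ALLOW_HTTP": "TRUE",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE"
}

dt = DeltaTable("s3://silver/data", storage_options=storage_options)
# schema() lists __index_level_0__ if the first write preserved the pandas index.
print(dt.schema())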

@ThomasMargnac
Author

I am using PyArrow 12.0.0.

@titowoche30

titowoche30 commented Oct 4, 2023

Same issue here. @ion-elgreco's suggestion fixed it:

import pyarrow as pa

data = pa.Table.from_pandas(data, preserve_index=False)

write_deltalake(
    table_or_uri=s3_endpoint,
    data=data,
    mode='append',
    storage_options=storage_options
)

@ThomasMargnac
Author

Thanks a lot @ion-elgreco and @titowoche30, it worked! Afterwards I had an issue on the first write related to the data schema, but I fixed the schema in pa.Table.from_pandas and now everything works fine.
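
A sketch of that kind of fix, with an explicit schema passed to pa.Table.from_pandas (the column names and types below are assumptions taken from the table schema in the traceback):

import pyarrow as pa

# Hypothetical explicit schema mirroring the traceback's table schema.
schema = pa.schema([
    ("x", pa.int64()),
    ("y", pa.string()),
    ("z", pa.timestamp("us")),
])

# schema pins the column types; preserve_index=False keeps
# __index_level_0__ out of the written data.
table = pa.Table.from_pandas(data, schema=schema, preserve_index=False)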
