
Faiss to Parquet Conversion #2631

Merged
122 changes: 122 additions & 0 deletions src/main/python/parquet/faiss_to_parquet.py
@@ -0,0 +1,122 @@
import os
import argparse
import logging
import shutil
import faiss
import pandas as pd


def setup_logging():
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
)


def read_docid_file(docid_path):
"""
Reads the docid file and returns a list of document IDs.
"""
try:
with open(docid_path, 'r') as f:
docids = [line.strip() for line in f]
logging.info(f"Read {len(docids)} docids from {docid_path}")
return docids
except Exception as e:
logging.error(f"Failed to read docid file {docid_path}: {e}")
raise RuntimeError(f"Failed to read docid file {docid_path}: {e}")


def read_faiss_index(index_path):
"""
Reads a FAISS index file and returns a numpy array of vectors.
"""
try:
index = faiss.read_index(index_path)
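        # reconstruct_n requires an index that supports reconstruction; flat
        # indexes do out of the box, while some IVF variants need
        # index.make_direct_map() to be called first.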
vectors = index.reconstruct_n(0, index.ntotal)
logging.info(f"Read {vectors.shape[0]} vectors from {index_path}")
return vectors
except Exception as e:
logging.error(f"Failed to read FAISS index file {index_path}: {e}")
raise RuntimeError(f"Failed to read FAISS index file {index_path}: {e}")


def write_to_parquet_in_chunks(df, output_dir, rows_per_chunk=10**6):
"""
Writes the DataFrame to Parquet files in chunks of specified size.
"""
    # Slice the DataFrame into fixed-size row chunks; each chunk becomes its own Parquet file
for i in range(0, len(df), rows_per_chunk):
chunk = df.iloc[i:i + rows_per_chunk]
chunk_file = os.path.join(output_dir, f'chunk_{i//rows_per_chunk}.parquet')
try:
chunk.to_parquet(chunk_file, index=False)
logging.info(f"Successfully wrote chunk to {chunk_file}")
except Exception as e:
logging.error(f"Failed to write chunk to Parquet file {chunk_file}: {e}")
raise RuntimeError(f"Failed to write chunk to Parquet file {chunk_file}: {e}")


def convert_faiss_to_parquet(input_dir, output_dir, overwrite):
"""
Converts FAISS index files in the input directory to Parquet files in the output directory.
"""
# Ensure the input directory contains the necessary files
docid_path = os.path.join(input_dir, 'docid')
index_path = os.path.join(input_dir, 'index')

if not os.path.isfile(docid_path) or not os.path.isfile(index_path):
raise FileNotFoundError("Both 'docid' and 'index' files must be present in the input directory.")

# Set up the output directory
if os.path.exists(output_dir):
if overwrite:
shutil.rmtree(output_dir)
os.makedirs(output_dir)
else:
raise FileExistsError(f"Output directory '{output_dir}' already exists. Use --overwrite to replace it.")
else:
os.makedirs(output_dir)

# Read docids and vectors
docids = read_docid_file(docid_path)
vectors = read_faiss_index(index_path)

# Check if the number of docids matches the number of vectors
if len(docids) != vectors.shape[0]:
error_message = "The number of docids does not match the number of vectors."
logging.error(error_message)
raise ValueError(error_message)

df = pd.DataFrame({
'docid': docids,
        'vector': vectors.tolist()  # each row stores its embedding as a Python list; pyarrow writes these as a Parquet list column
})
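    # Note: the full DataFrame (all docids plus all vectors) is materialized
    # in memory here; the chunked writer below only bounds the size of each
    # output file, not peak memory usage.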

# Write DataFrame to Parquet in chunks
write_to_parquet_in_chunks(df, output_dir)

if __name__ == "__main__":
setup_logging()

parser = argparse.ArgumentParser(
description="Convert FAISS index files to Parquet format in chunks."
)
parser.add_argument(
"--input", required=True, help="Input directory containing 'docid' and 'index' files."
)
parser.add_argument(
"--output", required=True, help="Output directory where the Parquet files will be saved."
)
parser.add_argument(
"--overwrite",
action="store_true",
default=False,
help="Overwrite the output directory if it already exists.",
)
args = parser.parse_args()

    try:
        # Convert FAISS index data to Parquet in chunks
        convert_faiss_to_parquet(args.input, args.output, args.overwrite)
    except Exception as e:
        logging.error(f"Script failed: {e}")
        raise SystemExit(1)

57 changes: 57 additions & 0 deletions src/main/python/parquet/get_faiss_indexes.sh
@@ -0,0 +1,57 @@
#!/bin/bash

# Create the collections folder if it doesn't exist
mkdir -p collections

# Array of URLs to download
urls=(
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-arguana.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-bioasq.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-climate-fever.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-android.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-english.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-gaming.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-gis.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-mathematica.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-physics.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-programmers.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-stats.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-tex.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-unix.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-webmasters.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-wordpress.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-dbpedia-entity.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-fever.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-fiqa.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-hotpotqa.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-nfcorpus.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-nq.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-quora.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-robust04.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-scifact.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-scidocs.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-signal1m.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-trec-covid.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-trec-news.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-webis-touche2020.bge-base-en-v1.5.20240107.tar.gz"
)

# Change to the collections directory
cd collections || exit 1

# Download each file using wget in parallel
for url in "${urls[@]}"; do
wget "$url" &
done

# Wait for all downloads to complete
wait

# Extract each tar.gz file in parallel
for file in *.tar.gz; do
tar -xvzf "$file" &
done

# Wait for all extractions to complete
wait
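
After the downloads finish, each extracted directory should contain the 'docid' and 'index' files that faiss_to_parquet.py expects. A small check, sketched under the assumption that the tarballs unpack into per-index directories under collections/ (adjust the root to the actual layout):

import os

base = "collections"  # assumed extraction root
for name in sorted(os.listdir(base)):
    subdir = os.path.join(base, name)
    if not os.path.isdir(subdir):
        continue
    ok = all(os.path.isfile(os.path.join(subdir, f)) for f in ("docid", "index"))
    print(f"{name}: {'ok' if ok else 'missing docid and/or index'}")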

2 changes: 1 addition & 1 deletion src/main/python/parquet/requirements.txt
@@ -5,4 +5,4 @@ python-dateutil==2.9.0.post0
pytz==2024.1
six==1.16.0
tqdm==4.66.5
tzdata==2024.1
tzdata==2024.1
91 changes: 91 additions & 0 deletions src/main/python/parquet/run_conversions.sh
@@ -0,0 +1,91 @@
#!/bin/bash

# Default base directory
DEFAULT_BASE_DIR="collections/faiss/"
BASE_DIR="$DEFAULT_BASE_DIR"

# An optional first argument overrides the base directory; it must be an
# existing directory and not the --all flag. Everything after it is treated
# as subdirectory names (or --all). Example invocations:
#   ./run_conversions.sh --all
#   ./run_conversions.sh collections/faiss/ arguana fiqa
if [ $# -gt 0 ] && [ "$1" != "--all" ] && [ -d "$1" ]; then
    BASE_DIR="$1"
    shift
fi

# Check if the --all flag is passed
if [ "$1" == "--all" ]; then
# Get all subdirectories in the base directory
SUBDIRS=("$BASE_DIR"/*/)
else
# If no --all flag, use the provided arguments as subdirectory names
if [ $# -eq 0 ]; then
echo "No subdirectories specified. Exiting."
exit 1
fi

# Convert the passed arguments to subdirectory paths
SUBDIRS=()
for SUBDIR_NAME in "$@"; do
SUBDIR_PATH="$BASE_DIR/$SUBDIR_NAME"
if [ -d "$SUBDIR_PATH" ]; then
SUBDIRS+=("$SUBDIR_PATH/")
else
echo "Subdirectory $SUBDIR_PATH does not exist. Skipping."
fi
done

# If no valid subdirectories were provided, exit
if [ ${#SUBDIRS[@]} -eq 0 ]; then
echo "No valid subdirectories provided. Exiting."
exit 1
fi
fi

# Loop through each specified subdirectory (or all subdirectories if --all was passed)
for SUBDIR in "${SUBDIRS[@]}"; do
if [ -d "$SUBDIR" ]; then
(
echo "Processing $SUBDIR"

PARQUET_DIR="${SUBDIR%/}.faiss-parquet"
SUBDIR_NAME=$(basename "$SUBDIR")
INDEX_NAME="indexes/faiss-parquet/$SUBDIR_NAME"
RUNS_FILE="runs/${SUBDIR_NAME}_faiss_parquet.txt"
EVAL_FILE="runs/${SUBDIR_NAME}_faiss_parquet_evals.txt"

# Convert to Parquet
python src/main/python/parquet/faiss_to_parquet.py --input "$SUBDIR" --output "$PARQUET_DIR" --overwrite

# Index Parquet data
bin/run.sh io.anserini.index.IndexFlatDenseVectors \
-threads 16 \
-collection ParquetDenseVectorCollection \
-input "$PARQUET_DIR" \
-generator ParquetDenseVectorDocumentGenerator \
-index "$INDEX_NAME" \
>&"logs/debug-log.beir-v1.0.0-${SUBDIR_NAME}.bge-base-en-v1.5"

# Search on the indexed data
bin/run.sh io.anserini.search.SearchFlatDenseVectors \
-index "$INDEX_NAME" \
-topics "tools/topics-and-qrels/topics.beir-v1.0.0-${SUBDIR_NAME}.test.bge-base-en-v1.5.jsonl.gz" \
-topicReader JsonStringVector \
-output "$RUNS_FILE" \
-hits 1000 -removeQuery -threads 16
echo "Running evaluations for $SUBDIR_NAME"
{
bin/trec_eval -c -m ndcg_cut.10 "tools/topics-and-qrels/qrels.beir-v1.0.0-${SUBDIR_NAME}.test.txt" "$RUNS_FILE"
bin/trec_eval -c -m recall.100 "tools/topics-and-qrels/qrels.beir-v1.0.0-${SUBDIR_NAME}.test.txt" "$RUNS_FILE"
bin/trec_eval -c -m recall.1000 "tools/topics-and-qrels/qrels.beir-v1.0.0-${SUBDIR_NAME}.test.txt" "$RUNS_FILE"
} >"$EVAL_FILE"

        # $? reflects only the final trec_eval run above, so treat this as a
        # best-effort success signal for the subdirectory
if [ $? -eq 0 ]; then
echo "Successfully processed $SUBDIR"
else
echo "Failed to process $SUBDIR"
fi
) &
fi
done

wait

echo "All specified subdirectories processed."