-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmt_to_cuking_inputs.py
71 lines (61 loc) · 2.39 KB
/
mt_to_cuking_inputs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#!/usr/bin/env python3
"""
Converts a Hail MatrixTable to the Parquet format suitable for cuKING.
It is assumed that the table only contains biallelics, which is the case
for gnomAD QC MTs.
"""
import argparse
import hail as hl
import json
def mt_to_cuking_inputs(mt: hl.MatrixTable, parquet_uri: str, overwrite: bool) -> None:
# Compute the field required for KING.
mt = mt.select_entries(n_alt_alleles=mt.GT.n_alt_alleles())
# Create a table based on entries alone. By adding a row and column index, we can
# drop the locus, alleles, and sample field, as well as skip writing missing
# entries.
mt = mt.select_globals()
mt = mt.select_rows()
mt = mt.select_cols()
mt = mt.add_row_index()
mt = mt.add_col_index()
entries = mt.entries()
entries = entries.key_by()
entries = entries.select(entries.row_idx, entries.col_idx, entries.n_alt_alleles)
# Export one compressed Parquet file per partition.
entry_write = entries.to_spark().write.option('compression', 'zstd')
if overwrite:
entry_write = entry_write.mode('overwrite')
entry_write.parquet(parquet_uri)
# The Parquet files only contain `col_idx` instead of the sample name `s`, to reduce
# storage requirements. Save a dict from `col_idx` to `s`, so cuKING can map back to
# sample names for its results. By collecting both `col_idx` and `s` simultaneously,
# we don't rely on a particular order being returned by `collect`.
col_idx_mapping = hl.struct(col_idx=mt.col_idx, s=mt.s).collect()
col_idx_mapping.sort(key=lambda e: e.col_idx)
metadata = {
'num_sites': mt.count_rows(),
'samples': [e.s for e in col_idx_mapping],
}
with hl.utils.hadoop_open(f'{parquet_uri}/metadata.json', 'w') as f:
json.dump(metadata, f)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--mt-uri',
help='Input URI for the Hail MT',
required=True,
)
parser.add_argument(
'--parquet-uri',
help='Output URI for the Parquet files',
required=True,
)
parser.add_argument(
'--overwrite',
help='Overwrite output files',
action='store_true',
)
args = parser.parse_args()
hl.init(default_reference='GRCh38')
mt = hl.read_matrix_table(args.mt_uri)
mt_to_cuking_inputs(mt=mt, parquet_uri=args.parquet_uri, overwrite=args.overwrite)