#!/usr/bin/env python3
# coding=utf-8
# This file is part of MGnify genome analysis pipeline.
#
# MGnify genome analysis pipeline is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# MGnify genome analysis pipeline is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with MGnify genome analysis pipeline. If not, see <https://www.gnu.org/licenses/>.
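#
# Usage (flags as defined in main() below):
#   ./fetch_ncbi.py -i <accessions.txt> -d <output_dir> [-u]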
import argparse
import logging
import os
import time
import urllib.request as request
import urllib.error as error
from utils import download_fasta
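
# download_fasta comes from the pipeline's utils module (not shown here);
# judging by the call in main() it takes a URL, an output directory, an
# accession, an unzip flag and a fifth (here empty) string argument, and
# returns a truthy value on success.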
LINKS = 'https://ftp.ncbi.nlm.nih.gov/genomes/genbank/assembly_summary_genbank.txt'
logging.basicConfig(level=logging.INFO)

# TODO: add checksum verification for downloaded files
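# A minimal sketch of that checksum step, assuming NCBI's per-assembly
# md5checksums.txt layout ('<md5>  ./<file_name>' per line); verify_md5 is a
# hypothetical helper and not part of the pipeline.
def verify_md5(local_path, assembly_url, file_name):
    import hashlib
    md5 = hashlib.md5()
    with open(local_path, 'rb') as fasta:
        for chunk in iter(lambda: fasta.read(1 << 20), b''):  # 1 MB chunks
            md5.update(chunk)
    # compare against the checksum listed next to file_name in md5checksums.txt
    with request.urlopen('{}/md5checksums.txt'.format(assembly_url)) as listing:
        for raw in listing:
            checksum, _, name = raw.decode('utf-8').strip().partition('  ')
            if name.lstrip('./') == file_name:
                return checksum == md5.hexdigest()
    return False  # file_name not listed at all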


def main():
    parser = argparse.ArgumentParser(description='Takes a list of accessions and fetches genomes from NCBI')
    parser.add_argument('-i', '--infile', required=True, help='A file containing a list of GenBank accessions, '
                                                              'one accession per line')
    parser.add_argument('-d', '--dir', required=True, help='A path to a local directory where MAGs will be '
                                                           'downloaded')
    parser.add_argument('-u', '--unzip', action='store_true', help='Store unzipped fasta files. Default = False')
    args = parser.parse_args()
    failed = 0
    complete = 0
    no_url = 0
    already_exist = 0
    if not os.path.exists(args.dir):
        os.makedirs(args.dir)
    # save accessions to a dictionary
    accession_dict = load_accessions(args.infile)
    assert len(accession_dict) > 0, 'No accessions were loaded from infile'
    # add GenBank file locations as values to the accessions dictionary
    success = load_locations(accession_dict)
    # list the genomes already fetched to the output directory (if any)
    already_fetched = [i.split('.')[0] for i in os.listdir(args.dir)]
    if not success:
        logging.error('Failed to load the FTP locations')
    else:
        # go through the accessions and download the fasta files
        logging.info('Loaded FTP locations, proceeding to download...')
        for acc, url in accession_dict.items():
            if not url:
                no_url += 1
            elif acc in already_fetched:
                logging.info(f'Skipping {acc} because it already exists')
                already_exist += 1
            else:
                logging.info(f'Downloading {acc}...')
                if download_fasta(url, args.dir, acc, args.unzip, ''):
                    complete += 1
                else:
                    failed += 1
                    logging.error(f'Could not download {url}')
    logging.info(f'Total number of accessions provided: {len(accession_dict)}')
    logging.info(f'Already pre-fetched: {already_exist}')
    logging.info(f'Successfully downloaded {complete} files')
    logging.info(f'Download failed for {failed} files')
    logging.info(f'No URL retrieved for {no_url} files')


def load_accessions(infile):
    # map each versionless accession to a download URL (filled in later)
    accessions = dict()
    with open(infile, 'r') as acc_in:
        for line in acc_in:
            accessions[line.strip().split('.')[0]] = ''
    return accessions
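
# Example infile contents (accessions are illustrative; the version suffix is
# stripped, so GCA_000001405.29 and GCA_000001405 are treated the same):
#   GCA_000001405.29
#   GCA_902384065.1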


def load_locations(accessions):
    max_attempts = 6
    attempt = 1
    sleep_time = 10
    flag = False  # flips to True once the summary file has been read in full
    while attempt < max_attempts:
        logging.info('Downloading FTP locations...')
        logging.info(f'Attempt {attempt}')
        try:
            with request.urlopen(LINKS) as response:
                for content in response:
                    line = content.decode('utf-8')
                    if line.startswith('#') or line.isspace():
                        continue
                    # column 1 of assembly_summary_genbank.txt is the versioned
                    # accession; column 20 (index 19) is the FTP directory path
                    acc = line.split()[0].split('.')[0]
                    if acc in accessions:
                        path = line.split('\t')[19]
                        if path == 'na':  # no assembly directory available
                            continue
                        file_name = f'{path.split("/")[-1]}_genomic.fna.gz'
                        accessions[acc] = '/'.join([path, file_name])
            flag = True
            break
        except error.URLError as e:  # HTTPError is a subclass of URLError
            logging.error(e.reason)
            attempt += 1
            time.sleep(sleep_time)
    return flag
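
# Illustrative example: for an assembly_summary row whose ftp_path column is
# https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/000/001/405/GCA_000001405.29_GRCh38.p14
# load_locations stores the download URL
# .../GCA_000001405.29_GRCh38.p14/GCA_000001405.29_GRCh38.p14_genomic.fna.gz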


if __name__ == '__main__':
    main()