-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata.py
executable file
·200 lines (171 loc) · 7.21 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# inbuilt lib imports:
from collections import Counter
from typing import List, Dict, Tuple, Any
import json
import os
import zipfile
# external lib imports:
import numpy as np
from tqdm import tqdm
import spacy
nlp = spacy.load("en_core_web_sm", disable = ['ner', 'tagger', 'parser', 'textcat'])
def read_instances(data_file_path: str,
max_allowed_num_tokens: int = 150) -> List[Dict]:
"""
Reads raw classification dataset from a file and returns a list
of dicts where each dict defines an instance.
Parameters
----------
data_file_path : ``str``
Path to data to be read.
max_allowed_num_tokens : ``int``
Maximum number of tokens allowed in the classification instance.
"""
instances = []
with open(data_file_path) as file:
for line in tqdm(file.readlines()):
instance = json.loads(line.strip())
text = instance["text"]
tokens = [token.text.lower() for token in nlp.tokenizer(text)][:max_allowed_num_tokens]
instance["labels"] = instance.pop("label", None)
instance["text_tokens"] = tokens
instance.pop("text")
instances.append(instance)
return instances
def build_vocabulary(instances: List[Dict],
vocab_size: 10000,
add_tokens: List[str] = None) -> Tuple[Dict, Dict]:
"""
Given the instances and max vocab size, this function builds the
token to index and index to token vocabularies. If list of add_tokens are
passed, those words will be added first.
Parameters
----------
instances : ``List[Dict]``
List of instance returned by read_instances from which we want
to build the vocabulary.
vocab_size : ``int``
Maximum size of vocabulary
add_tokens : ``List[str]``
if passed, those words will be added to vocabulary first.
"""
print("\nBuilding Vocabulary.")
# make sure pad_token is on index 0
UNK_TOKEN = "@UNK@"
PAD_TOKEN = "@PAD@"
token_to_id = {PAD_TOKEN: 0, UNK_TOKEN: 1}
# First add tokens which were explicitly passed.
add_tokens = add_tokens or []
for token in add_tokens:
if not token.lower() in token_to_id:
token_to_id[token] = len(token_to_id)
# Add remaining tokens from the instances as the space permits
words = []
for instance in instances:
words.extend(instance["text_tokens"])
token_counts = dict(Counter(words).most_common(vocab_size))
for token, _ in token_counts.items():
if token not in token_to_id:
token_to_id[token] = len(token_to_id)
if len(token_to_id) == vocab_size:
break
# Make reverse vocabulary lookup
id_to_token = dict(zip(token_to_id.values(), token_to_id.keys()))
return (token_to_id, id_to_token)
def save_vocabulary(vocab_id_to_token: Dict[int, str], vocabulary_path: str) -> None:
"""
Saves vocabulary to vocabulary_path.
"""
with open(vocabulary_path, "w") as file:
# line number is the index of the token
for idx in range(len(vocab_id_to_token)):
file.write(vocab_id_to_token[idx] + "\n")
def load_vocabulary(vocabulary_path: str) -> Tuple[Dict[str, int], Dict[int, str]]:
"""
Loads vocabulary from vocabulary_path.
"""
vocab_id_to_token = {}
vocab_token_to_id = {}
with open(vocabulary_path, "r") as file:
for index, token in enumerate(file):
token = token.strip()
if not token:
continue
vocab_id_to_token[index] = token
vocab_token_to_id[token] = index
return (vocab_token_to_id, vocab_id_to_token)
def load_glove_embeddings(embeddings_txt_file: str,
embedding_dim: int,
vocab_id_to_token: Dict[int, str]) -> np.ndarray:
"""
Given a vocabulary (mapping from index to token), this function builds
an embedding matrix of vocabulary size in which ith row vector is an
entry from pretrained embeddings (loaded from embeddings_txt_file).
"""
tokens_to_keep = set(vocab_id_to_token.values())
vocab_size = len(vocab_id_to_token)
embeddings = {}
print("\nReading pretrained embedding file.")
with open(embeddings_txt_file) as file:
for line in tqdm(file):
line = str(line).strip()
token = line.split(' ', 1)[0]
if not token in tokens_to_keep:
continue
fields = line.rstrip().split(' ')
if len(fields) - 1 != embedding_dim:
raise Exception(f"Pretrained embedding vector and expected "
f"embedding_dim do not match for {token}.")
continue
vector = np.asarray(fields[1:], dtype='float32')
embeddings[token] = vector
# Estimate mean and std variation in embeddings and initialize it random normally with it
all_embeddings = np.asarray(list(embeddings.values()))
embeddings_mean = float(np.mean(all_embeddings))
embeddings_std = float(np.std(all_embeddings))
embedding_matrix = np.random.normal(embeddings_mean, embeddings_std,
(vocab_size, embedding_dim))
embedding_matrix = np.asarray(embedding_matrix, dtype='float32')
for idx, token in vocab_id_to_token.items():
if token in embeddings:
embedding_matrix[idx] = embeddings[token]
return embedding_matrix
def index_instances(instances: List[Dict], token_to_id: Dict) -> List[Dict]:
"""
Uses the vocabulary to index the fields of the instances. This function
prepares the instances to be tensorized.
"""
for instance in instances:
token_ids = []
for token in instance["text_tokens"]:
if token in token_to_id:
token_ids.append(token_to_id[token])
else:
token_ids.append(0) # 0 is index for UNK
instance["text_tokens_ids"] = token_ids
instance.pop("text_tokens")
return instances
def generate_batches(instances: List[Dict], batch_size) -> List[Dict[str, np.ndarray]]:
"""
Generates and returns batch of tensorized instances in a chunk of batch_size.
"""
def chunk(items: List[Any], num: int):
return [items[index:index+num] for index in range(0, len(items), num)]
batches_of_instances = chunk(instances, batch_size)
batches = []
for batch_of_instances in tqdm(batches_of_instances):
num_token_ids = [len(instance["text_tokens_ids"])
for instance in batch_of_instances]
max_num_token_ids = max(num_token_ids)
count = min(batch_size, len(batch_of_instances))
batch = {"inputs": np.zeros((count, max_num_token_ids), dtype=np.int32)}
if "labels" in batch_of_instances[0]:
batch["labels"] = np.zeros(count, dtype=np.int32)
for batch_index, instance in enumerate(batch_of_instances):
num_tokens = len(instance["text_tokens_ids"])
inputs = np.array(instance["text_tokens_ids"])
batch["inputs"][batch_index][:num_tokens] = inputs
if "labels" in instance:
batch["labels"][batch_index] = np.array(instance["labels"])
batches.append(batch)
return batches