-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtext_alignment_solution.py
214 lines (183 loc) · 7.79 KB
/
text_alignment_solution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/env python
""" Plagiarism detection for near-duplicate plagiarism.
This program provides the baseline for the PAN 2013 Plagiarism Detection
task and can be used as outline for a plagiarism detection program.
"""
__author__ = 'Arnd Oberlaender'
__email__ = 'arnd.oberlaender at uni-weimar dot de'
__version__ = '1.1'
import os
import string
import sys
import xml.dom.minidom
import codecs
import pickle
# Const
# =====
DELETECHARS = ''.join([string.punctuation, string.whitespace])
LENGTH = 50
# Helper functions
# ================
""" The following functions are some simple helper functions you can utilize
and modify to fit your own program.
"""
def tokenize(text, length):
""" Tokenize a given text and return a dict containing all start and end
positions for each token.
Characters defined in the global string DELETECHARS will be ignored.
Keyword arguments:
text -- the text to tokenize
length -- the length of each token
"""
tokens = {}
token = []
for i in range(0, len(text)):
if text[i] not in DELETECHARS:
token.append((i, text[i]))
if len(token) == length:
ngram = ''.join([x[1].lower() for x in token])
if ngram not in tokens:
tokens[ngram] = []
tokens[ngram].append((token[0][0], token[-1][0]))
token = token[1:]
return tokens
def serialize_features(susp, src, features, outdir):
""" Serialze a feature list into a xml file.
The xml is structured as described in the readme file of the
PAN plagiarism corpus 2012. The filename will follow the naming scheme
{susp}-{src}.xml and is located in the current directory.
Existing files will be overwritten.
Keyword arguments:
susp -- the filename of the suspicious document
src -- the filename of the source document
features -- a list containing feature-tuples of the form
((start_pos_susp, end_pos_susp),
(start_pos_src, end_pos_src))
"""
impl = xml.dom.minidom.getDOMImplementation()
doc = impl.createDocument(None, 'document', None)
root = doc.documentElement
root.setAttribute('reference', susp)
doc.createElement('feature')
for f in features:
feature = doc.createElement('feature')
feature.setAttribute('name', 'detected-plagiarism')
feature.setAttribute('this_offset', str(f[1][0]))
feature.setAttribute('this_length', str(f[1][1] - f[1][0]))
feature.setAttribute('source_reference', src)
feature.setAttribute('source_offset', str(f[0][0]))
feature.setAttribute('source_length', str(f[0][1] - f[0][0]))
root.appendChild(feature)
doc.writexml(open(outdir + susp.split('.')[0] + '-'
+ src.split('.')[0] + '.xml', 'w'),
encoding='utf-8')
# Plagiarism pipeline
# ===================
""" The following class implement a very basic baseline comparison, which
aims at near duplicate plagiarism. It is only intended to show a simple
pipeline your plagiarism detector can follow.
Replace the single steps with your implementation to get started.
"""
class Baseline:
def __init__(self, detections, susp, src, outdir):
self.susp = susp
self.src = src
self.susp_file = os.path.split(susp)[1]
self.src_file = os.path.split(src)[1]
self.susp_id = os.path.splitext(susp)[0]
self.src_id = os.path.splitext(src)[0]
self.output = self.susp_id + '-' + self.src_id + '.xml'
self.detections = detections
self.outdir=outdir
def process(self):
""" Process the plagiarism pipeline. """
# if not os.path.exists(self.output):
# ...
#self.preprocess()
#self.detections = [] #self.compare()
self.postprocess()
def preprocess(self):
""" Preprocess the suspicious and source document. """
# TODO: Implement your preprocessing steps here.
susp_fp = codecs.open(self.susp, 'r', 'utf-8')
self.susp_text = susp_fp.read()
self.tokens = tokenize(self.susp_text, LENGTH)
susp_fp.close()
src_fp = codecs.open(self.src, 'r', 'utf-8')
self.src_text = src_fp.read()
src_fp.close()
def compare(self):
""" Test a suspicious document for near-duplicate plagiarism with regards to
a source document and return a feature list.
"""
#TODO: Implement your comparison here and replace the following
# algorithm with your own.
detections = []
skipto = -1
token = []
for i in range(0, len(self.src_text)):
if i > skipto:
if self.src_text[i] not in DELETECHARS:
token.append((i, self.src_text[i]))
if len(token) == LENGTH:
ngram = ''.join([x[1].lower() for x in token])
if ngram in self.tokens:
d = ((token[0][0],token[-1][0]),
(self.tokens[ngram][0][0],
self.tokens[ngram][0][1]))
for t in self.tokens[ngram]:
start_src = token[0][0]
start_susp = t[0]
while (start_susp < len(self.susp_text) and
start_src < len(self.src_text) and
self.src_text[start_src] == self.susp_text[start_susp]):
start_susp = start_susp + 1
start_src = start_src + 1
while (start_susp < len(self.susp_text) and
self.susp_text[start_susp] in DELETECHARS):
start_susp = start_susp + 1
while (start_src < len(self.src_text) and
self.src_text[start_src] in DELETECHARS):
start_src = start_src + 1
if (start_src - 1) - token[0][0] > d[0][1] - d[0][0]:
d = ((token[0][0], start_src), (t[0], start_susp))
detections.append(d)
skipto = d[0][1]
if skipto < len(self.src_text):
token = [(skipto, self.src_text[skipto])]
else:
break
else:
token = token[1:]
print(len(detections))
return detections
def postprocess(self):
""" Postprocess the results. """
# TODO: Implement your postprocessing steps here.
serialize_features(self.susp_file, self.src_file, self.detections, self.outdir)
# Main
# ====
if __name__ == "__main__":
""" Process the commandline arguments. We expect three arguments: The path
pointing to the pairs file and the paths pointing to the directories where
the actual source and suspicious documents are located.
"""
if len(sys.argv) == 6:
pairs = sys.argv[1]
srcdir = sys.argv[2]
suspdir = sys.argv[3]
outdir = sys.argv[4]
detdir = sys.argv[5]
if outdir[-1] != "/":
outdir+="/"
detections = pickle.load(open(detdir, "rb"))
lines = open(pairs, 'r').readlines()
for detection, line in zip(detections, lines):
susp, src = line.split()
baseline = Baseline(detection,
os.path.join(suspdir, susp),
os.path.join(srcdir, src), outdir)
baseline.process()
else:
print('\n'.join(["Unexpected number of commandline arguments.",
"Usage: ./pan12-plagiarism-text-alignment-example.py {pairs} {src-dir} {susp-dir} {out-dir}"]))