Support latitude/longitude matching #83

Open · wants to merge 3 commits into master

README.md (34 additions, 0 deletions)

@@ -188,6 +188,40 @@ Or
Delimiting character of the input CSV file (default: ,)
* `-h`, `--help` show help message and exit

## Field Definitions & Types

When providing a JSON config file, you can include definitions for each field that tell csvdedupe what _type_ of data the field contains.

A basic definition for a single field looks like this:

{"field": "name", "type": "String"}

The dedupe docs include a [handy reference of all types and what they mean](https://dedupe.io/developers/library/en/latest/Variable-definition.html).
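
In a full config file, these definitions live in a `field_definition` list alongside `field_names`, one entry per column you want to compare. A hedged example (the column names here are illustrative, and the second type is just one of the options listed in the reference above):

    {
        "field_names": ["name", "zip"],
        "field_definition" : [
            {"field" : "name", "type" : "String"},
            {"field" : "zip", "type" : "Exact"}
        ],
        ...more config
    }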

### Latitude & Longitude

When working with geographic points, you need to provide field definitions that match the format of your data. Otherwise, csvdedupe may treat your lat/long fields as plain text and match them in unexpected ways.

* For **one field containing both latitude and longitude**, specify `LatLong` or `LongLat` depending on the order. This expects the two coordinates to be separated by any non-numeric character(s), so formats like `-122.23,46.42`, `-122.23, 46.42`, and `-122.23 46.42` all work (see the parsing sketch after this list).

{
"field_names": ["coordinates"],
"field_definition" : [{"field" : "coordinates", "type" : "LatLong"}],
...more config
}


* For **latitude and longitude in separate fields**, include separate `Latitude` and `Longitude` definitions. Internally, these are squished together into a single `LatLong` field, so you'll see them as `__LatLong` when training.

{
"field_names": ["lat", "lng"],
"field_definition" : [
{"field" : "lat", "type" : "Latitude"},
{"field" : "lng", "type" : "Longitude"}
],
...more config
}
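
Under the hood (see `csvhelpers.parseLatLongFields()` in this PR), a combined coordinate field is split on anything that isn't part of a number and converted to a tuple of floats (flipped into latitude-first order for `LongLat`), while separate `Latitude`/`Longitude` columns are popped from each row and merged into one `__LatLong` tuple. A rough standalone sketch of the combined-field parsing, not the library code itself:

    import re

    def parse_coordinates(value, field_type="LatLong"):
        """Split a combined coordinate string on any non-numeric characters
        and return a tuple of floats in (latitude, longitude) order,
        or None if the value can't be parsed."""
        try:
            coords = tuple(float(part) for part in re.split(r'[^-.0-9]+', value))
        except ValueError:
            # e.g. an empty or malformed coordinate component
            return None
        # LongLat values arrive longitude-first, so reverse them
        return coords[::-1] if field_type == "LongLat" else coords

    # parse_coordinates("-122.23, 46.42", "LongLat") == (46.42, -122.23)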

## Training

The _secret sauce_ of csvdedupe is human input. In order to figure out the best rules to deduplicate a set of data, you must give it a set of labeled examples to learn from.
csvdedupe/csvdedupe.py (15 additions, 8 deletions)

@@ -25,7 +25,7 @@ def __init__(self):
# We need to get control of STDIN again.
# This is a UNIX/Mac OSX solution only
# http://stackoverflow.com/questions/7141331/pipe-input-to-python-program-and-later-get-input-from-user
#
# Same question has a Windows solution
sys.stdin = open('/dev/tty') # Unix only solution,
else:
@@ -48,7 +48,7 @@ def __init__(self):
except KeyError:
raise self.parser.error("You must provide field_names")
else :
self.field_names = [self.field_def['field']
self.field_names = [field_def['field']
for field_def in self.field_definition]

self.destructive = self.configuration.get('destructive', False)
@@ -66,20 +66,26 @@ def main(self):
data_d = {}
# import the specified CSV file

data_d = csvhelpers.readData(self.input, self.field_names, delimiter=self.delimiter)
data_d = csvhelpers.readData(input_file=self.input,
field_names=self.field_names,
field_definition=self.field_definition,
delimiter=self.delimiter)

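# Convert Latitude/Longitude/LongLat definitions into the single LatLong
# type that the dedupe library itself understands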
internal_field_definition = csvhelpers.transformLatLongFieldDefinition(
self.field_definition)

logging.info('imported %d rows', len(data_d))

# sanity check for provided field names in CSV file
for field in self.field_definition:
for field in internal_field_definition:
if field['type'] != 'Interaction':
if not field['field'] in data_d[0]:

raise self.parser.error("Could not find field '" +
field['field'] + "' in input")

logging.info('using fields: %s' % [field['field']
for field in self.field_definition])
for field in internal_field_definition])

# If --skip_training has been selected, and we have a settings cache still
# persisting from the last run, use it in this next run.
@@ -94,10 +100,10 @@ def main(self):

fields = {variable.field for variable in deduper.data_model.primary_fields}
unique_d, parents = exact_matches(data_d, fields)

else:
# # Create a new deduper object and pass our data model to it.
deduper = dedupe.Dedupe(self.field_definition)
deduper = dedupe.Dedupe(internal_field_definition)

fields = {variable.field for variable in deduper.data_model.primary_fields}
unique_d, parents = exact_matches(data_d, fields)
@@ -115,7 +121,7 @@ def main(self):

# ## Clustering

# Find the threshold that will maximize a weighted average of our precision and recall.
# When we set the recall weight to 2, we are saying we care twice as much
# about recall as we do precision.
#
@@ -161,6 +167,7 @@ def main(self):
else :
write_function(clustered_dupes, self.input, sys.stdout)


def exact_matches(data_d, match_fields):
unique = {}
redundant = {}
csvdedupe/csvhelpers.py (108 additions, 17 deletions)

@@ -5,6 +5,7 @@
import re
import collections
import logging
from copy import deepcopy
from io import StringIO, open
import sys
import platform
@@ -23,7 +24,7 @@

def preProcess(column):
"""
Do a little bit of data cleaning. Things like casing, extra spaces,
quotes and new lines are ignored.
"""
column = re.sub(' +', ' ', column)
@@ -34,10 +35,10 @@ def preProcess(column):
return column


def readData(input_file, field_names, delimiter=',', prefix=None):
def readData(input_file, field_names, field_definition=None, delimiter=',', prefix=None):
"""
Read in our data from a CSV file and create a dictionary of records,
where the key is a unique record ID and each value is a dict
of the row fields.

**Currently, dedupe depends upon records' unique ids being integers
@@ -46,23 +47,113 @@ def readData(input_file, field_names, delimiter=',', prefix=None):
"""

data = {}
reader = csv.DictReader(StringIO(input_file),delimiter=delimiter)

reader = csv.DictReader(StringIO(input_file), delimiter=delimiter)
for i, row in enumerate(reader):
clean_row = {k: preProcess(v) for (k, v) in row.items() if k is not None}
if prefix:
row_id = u"%s|%s" % (prefix, i)
else:
row_id = i
data[row_id] = clean_row
row_id = u"%s|%s" % (prefix, i) if prefix else i
data[row_id] = parseLatLongFields(
{k: preProcess(v) for (k, v) in row.items() if k is not None},
field_definition)

return data


def parseLatLongFields(row, field_definition):
"""
Parse any fields defined as LatLong, LongLat, Latitude, or Longitude
in the field definition into the format that dedupe expects.

The two values in a LatLong or LongLat field can be separated by any
non-number character(s), for example: `-122,46`, `-122, 46`, `-122 46`, `-122;46`.

Latitude and Longitude fields are removed from the row and converted to
a single LatLong column that we pass to dedupe.
"""

if not field_definition:
return row

has_latitude_field = False
has_longitude_field = False

latitude = None
longitude = None

for field in field_definition:
# Both lat and long in the same field, in either order
if field['type'] == 'LatLong' or field['type'] == 'LongLat':
try:
# Split the field by anything other than the characters
# we'd expect to find in a number (0-9, -, .)
row[field['field']] = tuple([
float(l) for l in
re.split(r'[^-.0-9]+', row[field['field']])])

# Flip the order if LongLat was specified
if field['type'] == 'LongLat':
row[field['field']] = row[field['field']][::-1]

except ValueError: # Thrown if float() fails
row[field['field']] = None

elif field['type'] == 'Latitude':
has_latitude_field = True
if field['field'] in row:
try:
latitude = float(row.pop(field['field']))
except ValueError:
latitude = None

elif field['type'] == 'Longitude':
has_longitude_field = True
if field['field'] in row:
try:
longitude = float(row.pop(field['field']))
except ValueError:
longitude = None

if has_latitude_field and has_longitude_field:
if latitude is not None and longitude is not None:
row['__LatLong'] = (latitude, longitude)
else:
row['__LatLong'] = None

return row


def transformLatLongFieldDefinition(field_definition):
"""
Converts the custom LongLat, Latitude, and Longitude field types
used in the csvdedupe config to the single LatLong field type that
dedupe itself expects.

Works in concert with csvhelpers.parseLatLongFields()
"""

converted_defs = deepcopy(field_definition)
lat_long_indices = []

for i, field in enumerate(converted_defs):
if field['type'] == 'LongLat':
field['type'] = 'LatLong'
elif field['type'] == 'Latitude' or field['type'] == 'Longitude':
lat_long_indices.append(i)

if len(lat_long_indices) == 2:
for i in sorted(lat_long_indices, reverse=True):
del converted_defs[i]

converted_defs.append({
'field': '__LatLong',
'type': 'LatLong'})

return converted_defs


# ## Writing results
def writeResults(clustered_dupes, input_file, output_file):

# Write our original data back out to a CSV with a new column called
# Write our original data back out to a CSV with a new column called
# 'Cluster ID' which indicates which records refer to each other.

logging.info('saving results to: %s' % output_file)
@@ -95,7 +186,7 @@ def writeResults(clustered_dupes, input_file, output_file):
# ## Writing results
def writeUniqueResults(clustered_dupes, input_file, output_file):

# Write our original data back out to a CSV with a new column called
# Write our original data back out to a CSV with a new column called
# 'Cluster ID' which indicates which records refer to each other.

logging.info('saving unique results to: %s' % output_file)
@@ -232,13 +323,13 @@ def _common_args(self) :
help='CSV file to store deduplication results')
self.parser.add_argument('--skip_training', action='store_true',
help='Skip labeling examples by user and read training from training_files only')
self.parser.add_argument('--training_file', type=str,
help='Path to a new or existing file consisting of labeled training examples')
self.parser.add_argument('--settings_file', type=str,
help='Path to a new or existing file consisting of learned training settings')
self.parser.add_argument('--sample_size', type=int,
help='Number of random sample pairs to train off of')
self.parser.add_argument('--recall_weight', type=int,
help='Threshold that will maximize a weighted average of our precision and recall')
self.parser.add_argument('-d', '--delimiter', type=str,
help='Delimiting character of the input CSV file', default=',')
csvdedupe/csvlink.py (21 additions, 14 deletions)

@@ -70,12 +70,19 @@ def main(self):
data_2 = {}
# import the specified CSV file

data_1 = csvhelpers.readData(self.input_1, self.field_names_1,
delimiter=self.delimiter,
prefix='input_1')
data_2 = csvhelpers.readData(self.input_2, self.field_names_2,
delimiter=self.delimiter,
prefix='input_2')
data_1 = csvhelpers.readData(input_file=self.input_1,
field_names=self.field_names_1,
field_definition=self.field_definition,
delimiter=self.delimiter,
prefix='input_1')
data_2 = csvhelpers.readData(input_file=self.input_2,
field_names=self.field_names_2,
field_definition=self.field_definition,
delimiter=self.delimiter,
prefix='input_2')

internal_field_definition = csvhelpers.transformLatLongFieldDefinition(
self.field_definition)

# sanity check for provided field names in CSV file
for field in self.field_names_1:
@@ -100,7 +107,7 @@ def main(self):
logging.info('imported %d rows from file 2', len(data_2))

logging.info('using fields: %s' % [field['field']
for field in self.field_definition])
for field in internal_field_definition])

# If --skip_training has been selected, and we have a settings cache still
# persisting from the last run, use it in this next run.
@@ -112,17 +119,17 @@ def main(self):
% self.settings_file)
with open(self.settings_file, 'rb') as f:
deduper = dedupe.StaticRecordLink(f)


fields = {variable.field for variable in deduper.data_model.primary_fields}
(nonexact_1,
nonexact_2,
exact_pairs) = exact_matches(data_1, data_2, fields)


else:
# # Create a new deduper object and pass our data model to it.
deduper = dedupe.RecordLink(self.field_definition)
deduper = dedupe.RecordLink(internal_field_definition)

fields = {variable.field for variable in deduper.data_model.primary_fields}
(nonexact_1,
@@ -142,7 +149,7 @@ def main(self):

# ## Clustering

# Find the threshold that will maximize a weighted average of our precision and recall.
# Find the threshold that will maximize a weighted average of our precision and recall.
# When we set the recall weight to 2, we are saying we care twice as much
# about recall as we do precision.
#
@@ -189,7 +196,7 @@ def exact_matches(data_1, data_2, match_fields):

for key, record in data_1.items():
record_hash = hash(tuple(record[f] for f in match_fields))
redundant[record_hash] = key

for key_2, record in data_2.items():
record_hash = hash(tuple(record[f] for f in match_fields))
Expand All @@ -202,7 +209,7 @@ def exact_matches(data_1, data_2, match_fields):

for key_1 in redundant.values():
nonexact_1[key_1] = data_1[key_1]

return nonexact_1, nonexact_2, exact_pairs

def launch_new_instance():
Expand Down