-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathgeocoder.py
153 lines (126 loc) · 4.29 KB
/
geocoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/python
#
# Run addresses or cross-streets through the Google Maps geocoder.
#
# Maintains a cache of previously-geocoded locations and throttles traffic to the Geocoder.
import base64
import xml.dom.minidom
import os
import sys
import time
import urllib
# MapsKey = "ABQIAAAAafDALeUVyxhUndZQcT0BRRQjgiEk1Ut90lZbiCSD8tXKcVgrkBQLYOFQ3xwutc5R9SNzfGaKxMnf7g"
BaseURL = "http://maps.google.com/maps/geo?output=xml&key=%s"
CacheDir = "geocache"
class Location:
    """A geocoding result parsed from a Google Maps geocoder XML response.

    Only the first Placemark in the response is examined.

    Attributes:
      status: int parsed from the response's single <code> element.
      lat, lon: floats taken from the first <coordinates> element, or None.
      accuracy: int value of the "Accuracy" attribute on the first
        <AddressDetails> element, or None.
      city: text of the first <LocalityName> element, or None.
      zipcode: int for a plain postal code; for a ten-character code such
        as "94103-1234" a float of the form 94103.1234; None when absent.
    """

    def __init__(self, dom):
        """Pull the fields out of a parsed response.

        Args:
          dom: a DOM document (e.g. from xml.dom.minidom.parseString) that
            contains exactly one <code> element.
        """
        els = dom.getElementsByTagName("code")
        assert els.length == 1
        self.status = int(els.item(0).firstChild.nodeValue)

        # Every optional field gets a value up front so that callers can
        # compare against None instead of hitting AttributeError when the
        # response has no Placemark or omits an element.
        self.lat, self.lon = None, None
        self.accuracy = None
        self.city = None
        self.zipcode = None

        # Just take the first place for now
        places = dom.getElementsByTagName("Placemark")
        if not places:
            return
        place = places[0]

        # Geocoding accuracy
        details = place.getElementsByTagName("AddressDetails")
        if details:
            # getAttribute returns "" when the attribute is missing.
            accuracy = details[0].getAttribute("Accuracy")
            if accuracy:
                self.accuracy = int(accuracy)

        # Lat/Lon: the text is "lon,lat[,altitude]"; anything after the
        # first two components is ignored.
        coords = self._first_text(place, "coordinates")
        if coords is not None:
            lon, lat = coords.split(",")[:2]
            self.lat, self.lon = float(lat), float(lon)

        # City
        self.city = self._first_text(place, "LocalityName")

        # Zip Code
        zip_text = self._first_text(place, "PostalCodeNumber")
        if zip_text is not None:
            if len(zip_text) == 10:
                # Ten characters: five digits, a separator, four digits.
                # The four-digit extension becomes the fractional part.
                self.zipcode = (float(zip_text[0:5]) +
                                float(zip_text[6:]) / 10000)
            else:
                self.zipcode = int(zip_text)

    def _first_text(self, parent, tag):
        """Return the text of the first <tag> under parent, or None.

        None is returned both when no such element exists and when the
        element is empty.
        """
        nodes = parent.getElementsByTagName(tag)
        if not nodes or nodes[0].firstChild is None:
            return None
        return nodes[0].firstChild.nodeValue

    def __str__(self):
        # Compare against None explicitly: 0.0 is a valid coordinate but
        # is falsy.
        if self.lat is not None and self.lon is not None:
            return "(%f, %f)" % (self.lat, self.lon)
        return "(???)"
class FakeLocation:
    """A location built from caller-supplied values, not from Google Maps.

    Exposes lat, lon and accuracy as given, plus a status fixed at 200.
    """

    def __init__(self, lat, lon, accuracy):
        """constructor for creating Locations not from google maps."""
        self.lat = lat
        self.lon = lon
        self.accuracy = accuracy
        self.status = 200

    def __str__(self):
        # Compare against None explicitly: 0.0 is a valid coordinate but
        # is falsy, and would otherwise be rendered as "(???)".
        if self.lat is not None and self.lon is not None:
            return "(%f, %f)" % (self.lat, self.lon)
        return "(???)"
def _cache_file(loc):
    """Return the path of the cache file for the location string loc.

    The file name is the base64 encoding of loc with any '=' padding
    removed, placed under CacheDir.

    Base64 padding is zero, one or two '=' characters depending on
    len(loc) % 3, so only the padding that is actually present is stripped.
    Unconditionally dropping the last two characters would discard real key
    characters and make distinct locations share one cache file.

    Compatibility note: names are identical to the ones produced by
    chopping two characters only when len(loc) % 3 == 1 and the encoding
    contains no '/'. Cache files stored under truncated names for other
    inputs will not be found and will be fetched again.
    """
    # Non-bytes text (unicode on Python 2, str on Python 3) is encoded
    # first so that non-ASCII locations can be keyed as well.
    raw = loc if isinstance(loc, bytes) else loc.encode("utf-8")
    # The standard alphabet uses '/', which is a path separator; map it to
    # '_' and leave '+' alone so keys without '/' keep their spelling.
    key = base64.b64encode(raw, b"+_").rstrip(b"=")
    if not isinstance(key, str):
        key = key.decode("ascii")
    return "%s/%s" % (CacheDir, key)
class Geocoder:
    """Looks up locations with the Google Maps geocoder.

    Raw XML responses are cached on disk (one file per location string,
    named by _cache_file) and network fetches are spaced at least
    wait_time seconds apart.
    """

    def __init__(self, maps_key, wait_time):
        """Set up a geocoder.

        Args:
          maps_key: API key substituted into BaseURL. When it is falsy,
            Locate returns a fixed FakeLocation instead of fetching.
          wait_time: minimum number of seconds between two fetches.
        """
        self._api_key = maps_key
        self._base_url = BaseURL % self._api_key
        self._wait_time = wait_time
        self._last_fetch = 0

    def _check_cache(self, loc):
        """Returns cached results for the location or None if not available."""
        cache_file = _cache_file(loc)
        try:
            with open(cache_file) as f:
                return f.read()
        except (IOError, OSError):
            # Missing or unreadable cache file: treat as a cache miss.
            return None

    def _cache_result(self, loc, result):
        """Writes the raw response text for loc to its cache file."""
        cache_file = _cache_file(loc)
        # Create the cache directory on first use so that a successful
        # fetch is not lost to an IOError on write.
        cache_dir = os.path.dirname(cache_file)
        if cache_dir and not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        with open(cache_file, "w") as f:
            f.write(result)

    def _parse_xml(self, xml_str):
        """Returns a Location built from the response XML string."""
        dom = xml.dom.minidom.parseString(xml_str)
        return Location(dom)

    def _fetch(self, url):
        """Attempts to fetch the URL. Does rate throttling. Returns XML."""
        now = time.time()
        diff = now - self._last_fetch
        print("now=%f, then=%f, diff=%f vs. %f"
              % (now, self._last_fetch, diff, self._wait_time))
        if diff < self._wait_time:
            time.sleep(self._wait_time - diff)
        self._last_fetch = time.time()
        print("Fetching %s" % url)
        f = urllib.URLopener().open(url)
        try:
            return f.read()
        finally:
            # Release the connection even if the read fails.
            f.close()

    def Locate(self, loc, check_cache=True):
        """Returns a location object for the loc string.

        The cache is consulted first unless check_cache is false. On a
        miss, " San Francisco, CA" is appended to loc, the geocoder is
        queried and the raw response is cached under loc.

        Returns:
          A Location parsed from the cached or fetched XML, or a fixed
          FakeLocation when a fetch is needed but no API key was given.
        """
        data = None
        if check_cache:
            data = self._check_cache(loc)
        # An empty cache file is not a usable hit; treating it as a miss
        # lets the freshly fetched response replace it below.
        from_cache = bool(data)
        if not from_cache:
            if not self._api_key:
                sys.stderr.write('Using fake location for %s\n' % loc)
                return FakeLocation(37.784724, -122.407715, 7)
            sf_loc = loc + " San Francisco, CA"
            url = "%s&q=%s" % (self._base_url, urllib.quote(sf_loc))
            data = self._fetch(url)
        location = self._parse_xml(data)
        if not from_cache and location:
            self._cache_result(loc, data)
        return location

    def InCache(self, loc):
        """Returns True if a cache file for loc could be read."""
        data = self._check_cache(loc)
        return data is not None

    def LocateFromCache(self, loc):
        """Like Locate, but never goes to the network to get a location."""
        data = self._check_cache(loc)
        if not data:
            return None
        return self._parse_xml(data)