Cache mem optimize (ExpDev07#23)
* formatting (max line length 120 -> 100)
* csbs locations to/from the shared Redis cache
* nyt locations to/from the shared Redis cache
* ignore locustfile.py
* drop the unused coordinates variable in csbs
Kilo59 authored May 9, 2020
1 parent e54724e commit 170eb12
Showing 17 changed files with 194 additions and 116 deletions.
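The csbs and nyt services now consult a shared cache before hitting their upstream CSV feeds, via check_cache and load_cache imported from app.caches. That module is not part of this diff; the sketch below shows one way a Redis-backed version could look. The aioredis calls, the REDIS_URL environment variable, the pickle serialization, and the 1800-second expiry are assumptions for illustration, not code from this commit.

# Hypothetical sketch of app/caches.py (not shown in this diff).
import os
import pickle
from typing import Any, Optional

import aioredis

CACHE_TTL = 1800  # seconds; mirrors the in-process TTLCache used by the services
_redis: Optional[aioredis.Redis] = None


async def _connection() -> aioredis.Redis:
    """Lazily create a single shared Redis connection pool."""
    global _redis
    if _redis is None:
        _redis = await aioredis.create_redis_pool(os.getenv("REDIS_URL", "redis://localhost"))
    return _redis


async def check_cache(data_id: str) -> Optional[Any]:
    """Return the object stored under `data_id`, or None on a miss."""
    redis = await _connection()
    raw = await redis.get(data_id)
    return pickle.loads(raw) if raw is not None else None


async def load_cache(data_id: str, data: Any) -> None:
    """Serialize `data` and publish it under `data_id` with an expiry."""
    redis = await _connection()
    await redis.set(data_id, pickle.dumps(data), expire=CACHE_TTL)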
1 change: 1 addition & 0 deletions .gitignore
@@ -51,6 +51,7 @@ htmlcov/
nosetests.xml
coverage.xml
*,cover
locustfile.py

# Translations
*.mo
6 changes: 5 additions & 1 deletion app/data/__init__.py
@@ -4,7 +4,11 @@
from ..services.location.nyt import NYTLocationService

# Mapping of services to data-sources.
DATA_SOURCES = {"jhu": JhuLocationService(), "csbs": CSBSLocationService(), "nyt": NYTLocationService()}
DATA_SOURCES = {
"jhu": JhuLocationService(),
"csbs": CSBSLocationService(),
"nyt": NYTLocationService(),
}


def data_source(source):
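The reformatted DATA_SOURCES mapping is what the routers use to resolve a source name to a service. A minimal usage sketch, assuming data_source() simply returns DATA_SOURCES.get(source.lower()) and that the services expose an async get_all() (the method name is an assumption, not confirmed by this diff):

# Illustrative lookup against the mapping above.
from app.data import data_source

async def fetch_csbs():
    service = data_source("csbs")    # -> CSBSLocationService instance
    return await service.get_all()   # assumed LocationService method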
13 changes: 11 additions & 2 deletions app/io.py
@@ -10,7 +10,11 @@


def save(
name: str, content: Union[str, Dict, List], write_mode: str = "w", indent: int = 2, **json_dumps_kwargs
name: str,
content: Union[str, Dict, List],
write_mode: str = "w",
indent: int = 2,
**json_dumps_kwargs,
) -> pathlib.Path:
"""Save content to a file. If content is a dictionary, use json.dumps()."""
path = DATA / name
@@ -35,7 +39,12 @@ class AIO:

@classmethod
async def save(
cls, name: str, content: Union[str, Dict, List], write_mode: str = "w", indent: int = 2, **json_dumps_kwargs
cls,
name: str,
content: Union[str, Dict, List],
write_mode: str = "w",
indent: int = 2,
**json_dumps_kwargs,
):
"""Save content to a file. If content is a dictionary, use json.dumps()."""
path = DATA / name
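The save helpers above only had their signatures re-wrapped; behaviour is unchanged. A quick usage sketch, with the file name and payload purely illustrative:

# Synchronous and async variants of the same helper; paths resolve under the DATA directory.
from app.io import save, AIO

path = save("example.json", {"status": "ok"}, indent=2)
print(path)  # pathlib.Path to the written file

async def save_async():
    await AIO.save("example.json", {"status": "ok"})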
8 changes: 6 additions & 2 deletions app/location/__init__.py
@@ -11,7 +11,7 @@ class Location: # pylint: disable=too-many-instance-attributes
"""

def __init__(
self, id, country, province, coordinates, last_updated, confirmed, deaths, recovered
self, id, country, province, coordinates, last_updated, confirmed, deaths, recovered,
): # pylint: disable=too-many-arguments
# General info.
self.id = id
@@ -66,7 +66,11 @@ def serialize(self):
# Last updated.
"last_updated": self.last_updated,
# Latest data (statistics).
"latest": {"confirmed": self.confirmed, "deaths": self.deaths, "recovered": self.recovered},
"latest": {
"confirmed": self.confirmed,
"deaths": self.deaths,
"recovered": self.recovered,
},
}


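Only the tail of serialize() is visible in this hunk. A hedged example of the serialized shape, limited to fields that appear in the constructor and hunk above; the values and the exact coordinates representation are illustrative assumptions:

# Example output of Location.serialize(); the coordinates sub-dict format is assumed.
example = {
    "id": 0,
    "country": "US",
    "province": "California",
    "coordinates": {"latitude": "36.77", "longitude": "-119.41"},
    "last_updated": "2020-05-09T00:00:00Z",
    "latest": {"confirmed": 0, "deaths": 0, "recovered": 0},
}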
6 changes: 5 additions & 1 deletion app/main.py
@@ -49,7 +49,11 @@

# Enable CORS.
APP.add_middleware(
CORSMiddleware, allow_credentials=True, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"],
CORSMiddleware,
allow_credentials=True,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
APP.add_middleware(GZipMiddleware, minimum_size=1000)

6 changes: 5 additions & 1 deletion app/routers/v1.py
@@ -19,7 +19,11 @@ async def all_categories():
"deaths": deaths,
"recovered": recovered,
# Latest.
"latest": {"confirmed": confirmed["latest"], "deaths": deaths["latest"], "recovered": recovered["latest"],},
"latest": {
"confirmed": confirmed["latest"],
"deaths": deaths["latest"],
"recovered": recovered["latest"],
},
}


14 changes: 11 additions & 3 deletions app/routers/v2.py
@@ -65,11 +65,17 @@ async def get_locations(

# Do filtering.
try:
locations = [location for location in locations if str(getattr(location, key)).lower() == str(value)]
locations = [
location
for location in locations
if str(getattr(location, key)).lower() == str(value)
]
except AttributeError:
pass
if not locations:
raise HTTPException(404, detail=f"Source `{source}` does not have the desired location data.")
raise HTTPException(
404, detail=f"Source `{source}` does not have the desired location data.",
)

# Return final serialized data.
return {
@@ -84,7 +90,9 @@

# pylint: disable=invalid-name
@V2.get("/locations/{id}", response_model=LocationResponse)
async def get_location_by_id(request: Request, id: int, source: Sources = "jhu", timelines: bool = True):
async def get_location_by_id(
request: Request, id: int, source: Sources = "jhu", timelines: bool = True
):
"""
Getting specific location by id.
"""
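The filter above compares a query key/value pair against the matching Location attribute and raises 404 when nothing matches. A hedged example of calling the endpoint from a client; the host, port, and query values are illustrative, and country_code as a filterable attribute is an assumption:

# Querying /v2/locations with a source and an attribute filter (values illustrative).
import requests

resp = requests.get(
    "http://localhost:8000/v2/locations",
    params={"source": "jhu", "country_code": "US", "timelines": 0},
)
resp.raise_for_status()
data = resp.json()  # expected to contain the latest totals plus the filtered locations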
91 changes: 49 additions & 42 deletions app/services/location/csbs.py
@@ -6,6 +6,7 @@
from asyncache import cached
from cachetools import TTLCache

from ...caches import check_cache, load_cache
from ...coordinates import Coordinates
from ...location.csbs import CSBSLocation
from ...utils import httputils
@@ -34,7 +35,7 @@ async def get(self, loc_id): # pylint: disable=arguments-differ
BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"


@cached(cache=TTLCache(maxsize=1, ttl=3600))
@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Retrieves county locations; locations are cached for 30 minutes
@@ -44,48 +45,54 @@ async def get_locations():
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Coordinates.
coordinates = Coordinates(item["Latitude"], item["Longitude"]) # pylint: disable=unused-variable

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
)
)
)
LOGGER.info(f"{data_id} Data normalized")
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
await load_cache(data_id, locations)

# Return the locations.
return locations
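Both rewritten services follow the same two-layer pattern: the asyncache TTLCache keeps a per-process copy for 30 minutes, while check_cache/load_cache share results across workers. Distilled into a standalone sketch (the fetch callable and the function name are illustrative, not from this commit):

# The caching pattern used by csbs.py and nyt.py, stripped of the CSV parsing.
from asyncache import cached
from cachetools import TTLCache

from app.caches import check_cache, load_cache


@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_cached(data_id, fetch):
    locations = await check_cache(data_id)    # 1. try the shared cache
    if not locations:
        locations = await fetch()             # 2. fall back to the upstream request
        await load_cache(data_id, locations)  # 3. publish for the other workers
    return locations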
4 changes: 1 addition & 3 deletions app/services/location/jhu.py
@@ -41,9 +41,7 @@ async def get(self, loc_id): # pylint: disable=arguments-differ


# Base URL for fetching category.
BASE_URL = (
"https://raw.githubusercontent.com/CSSEGISandData/2019-nCoV/master/csse_covid_19_data/csse_covid_19_time_series/"
)
BASE_URL = "https://raw.githubusercontent.com/CSSEGISandData/2019-nCoV/master/csse_covid_19_data/csse_covid_19_time_series/"


@cached(cache=TTLCache(maxsize=4, ttl=1800))
110 changes: 60 additions & 50 deletions app/services/location/nyt.py
@@ -6,6 +6,7 @@
from asyncache import cached
from cachetools import TTLCache

from ...caches import check_cache, load_cache
from ...coordinates import Coordinates
from ...location.nyt import NYTLocation
from ...timeline import Timeline
@@ -66,7 +67,7 @@ def get_grouped_locations_dict(data):
return grouped_locations


@cached(cache=TTLCache(maxsize=1, ttl=3600))
@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Returns a list containing parsed NYT data by US county. The data is cached for 30 minutes.
@@ -77,55 +78,64 @@ async def get_locations():
data_id = "nyt.locations"
# Request the data.
LOGGER.info(f"{data_id} Requesting data...")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

# Parse the CSV.
data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

# Group together locations (NYT data ordered by dates not location).
grouped_locations = get_grouped_locations_dict(data)

# The normalized locations.
locations = []

for idx, (county_state, histories) in enumerate(grouped_locations.items()):
# Make location history for confirmed and deaths from dates.
# List is tuples of (date, amount) in order of increasing dates.
confirmed_list = histories["confirmed"]
confirmed_history = {date: int(amount or 0) for date, amount in confirmed_list}

deaths_list = histories["deaths"]
deaths_history = {date: int(amount or 0) for date, amount in deaths_list}

# Normalize the item and append to locations.
locations.append(
NYTLocation(
id=idx,
state=county_state[1],
county=county_state[0],
coordinates=Coordinates(None, None), # NYT does not provide coordinates
last_updated=datetime.utcnow().isoformat() + "Z", # since last request
timelines={
"confirmed": Timeline(
{
datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
for date, amount in confirmed_history.items()
}
),
"deaths": Timeline(
{
datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
for date, amount in deaths_history.items()
}
),
"recovered": Timeline({}),
},
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

# Parse the CSV.
data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

# Group together locations (NYT data ordered by dates not location).
grouped_locations = get_grouped_locations_dict(data)

# The normalized locations.
locations = []

for idx, (county_state, histories) in enumerate(grouped_locations.items()):
# Make location history for confirmed and deaths from dates.
# List is tuples of (date, amount) in order of increasing dates.
confirmed_list = histories["confirmed"]
confirmed_history = {date: int(amount or 0) for date, amount in confirmed_list}

deaths_list = histories["deaths"]
deaths_history = {date: int(amount or 0) for date, amount in deaths_list}

# Normalize the item and append to locations.
locations.append(
NYTLocation(
id=idx,
state=county_state[1],
county=county_state[0],
coordinates=Coordinates(None, None), # NYT does not provide coordinates
last_updated=datetime.utcnow().isoformat() + "Z", # since last request
timelines={
"confirmed": Timeline(
{
datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
for date, amount in confirmed_history.items()
}
),
"deaths": Timeline(
{
datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
for date, amount in deaths_history.items()
}
),
"recovered": Timeline({}),
},
)
)
)
LOGGER.info(f"{data_id} Data normalized")
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
await load_cache(data_id, locations)

return locations
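get_grouped_locations_dict() itself sits outside the changed hunks; from the loop above it maps (county, state) tuples to per-category date histories. A hedged reconstruction of that grouping step (column names follow the NYT us-counties.csv layout; the function body is an assumption):

# Regroups the row-per-date NYT CSV into one entry per (county, state).
from collections import defaultdict

def group_locations(rows):
    grouped = defaultdict(lambda: {"confirmed": [], "deaths": []})
    for row in rows:  # rows come from csv.DictReader
        key = (row["county"], row["state"])
        grouped[key]["confirmed"].append((row["date"], row["cases"]))
        grouped[key]["deaths"].append((row["date"], row["deaths"]))
    return dict(grouped)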
4 changes: 3 additions & 1 deletion app/utils/populations.py
@@ -28,7 +28,9 @@ def fetch_populations(save=False):

# Fetch the countries.
try:
countries = requests.get(GEONAMES_URL, params={"username": "dperic"}, timeout=1.25).json()["geonames"]
countries = requests.get(GEONAMES_URL, params={"username": "dperic"}, timeout=1.25).json()[
"geonames"
]
# Go through all the countries and perform the mapping.
for country in countries:
mappings.update({country["countryCode"]: int(country["population"]) or None})
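The re-wrapped GeoNames call above builds a country-code -> population mapping, storing None when GeoNames reports a population of 0. A small self-contained sketch of that fallback (the sample payload is made up):

# Mirrors the `int(country["population"]) or None` fallback in fetch_populations().
sample = {
    "geonames": [
        {"countryCode": "US", "population": "310232863"},
        {"countryCode": "XX", "population": "0"},
    ]
}

mappings = {}
for country in sample["geonames"]:
    mappings.update({country["countryCode"]: int(country["population"]) or None})

print(mappings)  # {'US': 310232863, 'XX': None}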