Skip to content
This repository has been archived by the owner on Aug 27, 2022. It is now read-only.

Commit

Permalink
Move
Browse files Browse the repository at this point in the history
  • Loading branch information
ludeeus committed Jan 26, 2020
1 parent 6b9257e commit bc99a2a
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 166 deletions.
4 changes: 4 additions & 0 deletions custom_components/authenticated/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
"""The authenticated component."""


class AuthenticatedBaseException(Exception):
"""Base exception for Authenticated."""
133 changes: 133 additions & 0 deletions custom_components/authenticated/providers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""Providers"""
import logging
import requests
from . import AuthenticatedBaseException

_LOGGER = logging.getLogger(__name__)

PROVIDERS = {}


def register_provider(classname):
"""Decorator used to register providers."""
PROVIDERS[classname.name] = classname
return classname


class GeoProvider:
"""GeoProvider class."""

url = None

def __init__(self, ipaddr):
"""Initialize."""
self.result = {}
self.ipaddr = ipaddr

@property
def country(self):
"""Return country name or None."""
return self.result.get("country")

@property
def region(self):
"""Return region name or None."""
return self.result.get("region")

@property
def city(self):
"""Return city name or None."""
return self.result.get("city")

@property
def computed_result(self):
"""Return the computed result."""
if self.result is not None:
return {"country": self.country, "region": self.region, "city": self.city}
return None

def update_geo_info(self):
"""Update Geo Information."""
self.result = {}
try:
api = self.url.format(self.ipaddr)
header = {"user-agent": "Home Assistant/Python"}
data = requests.get(api, headers=header, timeout=5).json()

if data.get("error"):
if data.get("reason") == "RateLimited":
raise AuthenticatedBaseException(
"RatelimitError, try a different provider."
)

elif data.get("status", "success") == "error":
return

elif data.get("reserved"):
return

elif data.get("status", "success") == "fail":
raise AuthenticatedBaseException(
"[{}] - {}".format(
self.ipaddr, data.get("message", "Unknown error.")
)
)

self.result = data
self.parse_data()
except AuthenticatedBaseException as exception:
_LOGGER.error(exception)
except requests.exceptions.ConnectionError:
pass

def parse_data(self):
"""Parse data from geoprovider."""
self.result = self.result


@register_provider
class IPApi(GeoProvider):
"""IPApi class."""

url = "https://ipapi.co/{}/json"
name = "ipapi"

@property
def country(self):
"""Return country name or None."""
return self.result.get("country_name")


@register_provider
class ExtremeIPLookup(GeoProvider):
"""IPApi class."""

url = "https://extreme-ip-lookup.com/json/{}"
name = "extreme"


@register_provider
class IPVigilante(GeoProvider):
"""IPVigilante class."""

url = "https://ipvigilante.com/json/{}"
name = "ipvigilante"

def parse_data(self):
"""Parse data from geoprovider."""
self.result = self.result.get("data", {})

@property
def country(self):
"""Return country name or None."""
return self.result.get("country_name")

@property
def region(self):
"""Return region name or None."""
return self.result.get("subdivision_1_name")

@property
def city(self):
"""Return city name or None."""
return self.result.get("city_name")
175 changes: 9 additions & 166 deletions custom_components/authenticated/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
import os
from ipaddress import ip_address as ValidateIP, ip_network
import socket
import requests
import voluptuous as vol
import yaml

import homeassistant.helpers.config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.helpers.entity import Entity

from .providers import PROVIDERS

_LOGGER = logging.getLogger(__name__)

CONF_NOTIFY = "enable_notification"
Expand All @@ -41,24 +42,21 @@
LOGFILE = "home-assistant.log"
OUTFILE = ".ip_authenticated.yaml"

PROVIDERS = ["ipapi", "extreme", "ipvigilante"]

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_PROVIDER, default="ipapi"): vol.In(PROVIDERS),
vol.Optional(CONF_PROVIDER, default="ipapi"): vol.In(
["ipapi", "extreme", "ipvigilante"]
),
vol.Optional(CONF_LOG_LOCATION, default=""): cv.string,
vol.Optional(CONF_NOTIFY, default=True): cv.boolean,
vol.Optional(CONF_EXCLUDE, default=[]): vol.All(cv.ensure_list, [cv.string]),
}
)

PROVIDERS = {}


def register_provider(classname):
"""Decorator used to register providers."""
PROVIDERS[classname.name] = classname
return classname
def humanize_time(timestring):
"""Convert time."""
return datetime.strptime(timestring[:19], "%Y-%m-%dT%H:%M:%S")


def setup_platform(hass, config, add_devices, discovery_info=None):
Expand All @@ -78,14 +76,6 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
add_devices([sensor], True)


class AuthenticatedBaseException(Exception):
"""Base exception for Authenticated."""


class AuthenticatedRateLimitException(AuthenticatedBaseException):
"""Ratelimit exception."""


class AuthenticatedSensor(Entity):
"""Representation of a Sensor."""

Expand Down Expand Up @@ -325,7 +315,7 @@ def load_authentications(authfile, exclude):
if ValidateIP(token["last_used_ip"]) in ip_network(
excludeaddress, False
):
raise Exception('IP in excluded address configuration')
raise Exception("IP in excluded address configuration")
if token.get("last_used_at") is None:
continue
if token["last_used_ip"] in tokens_cleaned:
Expand Down Expand Up @@ -433,150 +423,3 @@ def notify(self, hass):
last_used_at.replace("T", " "),
)
notify(message, title="New successful login", notification_id=self.ip_address)


class GeoProvider:
"""GeoProvider class."""

url = None

def __init__(self, ipaddr):
"""Initialize."""
self.result = {}
self.ipaddr = ipaddr

@property
def country(self):
"""Return country name or None."""
if self.result:
if self.result.get("country") is not None:
return self.result["country"]
return None

@property
def region(self):
"""Return region name or None."""
if self.result:
if self.result.get("region") is not None:
return self.result["region"]
return None

@property
def city(self):
"""Return city name or None."""
if self.result:
if self.result.get("city") is not None:
return self.result["city"]
return None

@property
def computed_result(self):
"""Return the computed result."""
if self.result is not None:
return {"country": self.country, "region": self.region, "city": self.city}
return None

def update_geo_info(self):
"""Update Geo Information."""
self.result = {}
try:
api = self.url.format(self.ipaddr)
header = {"user-agent": "Home Assistant/Python"}
data = requests.get(api, headers=header, timeout=5).json()

if data.get("error"):
if data.get("reason") == "RateLimited":
raise AuthenticatedRateLimitException(
"RatelimitError, try a different provider."
)

elif data.get("status", "success") == "error":
return

elif data.get("reserved"):
return

elif data.get("status", "success") == "fail":
raise AuthenticatedBaseException(
"[{}] - {}".format(
self.ipaddr, data.get("message", "Unknown error.")
)
)

self.result = data
self.parse_data()
except AuthenticatedRateLimitException as exception:
_LOGGER.error(exception)
except AuthenticatedBaseException as exception:
_LOGGER.error(exception)
except requests.exceptions.ConnectionError:
pass

def parse_data(self):
"""Parse data from geoprovider."""
self.result = self.result


@register_provider
class IPApi(GeoProvider):
"""IPApi class."""

url = "https://ipapi.co/{}/json"
name = "ipapi"

@property
def country(self):
"""Return country name or None."""
if self.result:
if self.result.get("country_name") is not None:
return self.result["country_name"]
return None


@register_provider
class ExtremeIPLookup(GeoProvider):
"""IPApi class."""

url = "https://extreme-ip-lookup.com/json/{}"
name = "extreme"


@register_provider
class IPVigilante(GeoProvider):
"""IPVigilante class."""

url = "https://ipvigilante.com/json/{}"
name = "ipvigilante"

def parse_data(self):
"""Parse data from geoprovider."""
self.result = self.result.get("data", {})

@property
def country(self):
"""Return country name or None."""
if self.result:
if self.result.get("country_name") is not None:
return self.result["country_name"]
return None

@property
def region(self):
"""Return region name or None."""
if self.result:
if self.result.get("subdivision_1_name") is not None:
return self.result["subdivision_1_name"]
return None

@property
def city(self):
"""Return city name or None."""
if self.result:
if self.result.get("city_name") is not None:
return self.result["city_name"]
return None


def humanize_time(timestring):
"""Convert time."""
return datetime.strptime(timestring[:19], "%Y-%m-%dT%H:%M:%S")

0 comments on commit bc99a2a

Please sign in to comment.