forked from Reecepbcups/cosmos-cache
-
Notifications
You must be signed in to change notification settings - Fork 0
/
COINGECKO.py
78 lines (59 loc) · 2.2 KB
/
COINGECKO.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import json
from time import time
from pycoingecko import CoinGeckoAPI
import CONFIG
from CONFIG import KV_STORE
from HELPERS import ttl_block_only
from HELPERS_TYPES import Mode
class Coingecko:
    """Cached access to CoinGecko ticker symbols and prices.

    Results are stored in ``KV_STORE`` as JSON strings so that repeated calls
    within the cache window are served from the store instead of the API.

    API documentation: https://www.coingecko.com/en/api/documentation
    The free tier allows 10-30 calls per minute, so prices are cached for a few
    seconds by default (7 unless ``CONFIG.COINGECKO_CACHE["seconds"]`` is set).
    With a paid plan you can do 500+: https://www.coingecko.com/en/api/pricing
    """

    def __init__(self):
        """Create the API client, passing the configured API key when one is set."""
        api_key = CONFIG.COINGECKO_API_KEY
        # Truthiness covers both "" and None; len(None) would raise TypeError.
        if api_key:
            self.cg = CoinGeckoAPI(api_key=api_key)
        else:
            self.cg = CoinGeckoAPI()

    def get_symbols(self):
        """Return a dict mapping each configured CoinGecko id to its symbol.

        The mapping is cached in ``KV_STORE`` for one day (86400 seconds).
        An id whose API response carries no usable "symbol" maps to "".
        """
        ids = CONFIG.COINGECKO_IDS
        key = f"coingecko_symbols;{ids}"

        cached = KV_STORE.get(key)
        if cached is not None:
            return json.loads(cached)

        values = {}
        for _id in ids:
            data = self.cg.get_coin_by_id(_id)
            # Normalise a missing or null symbol to "" so the cache holds only strings.
            values[_id] = data.get("symbol") or ""

        KV_STORE.set(key, json.dumps(values), timeout=86400)
        return values

    def get_price(self):
        """Return prices for the configured coins, keyed by upper-cased symbol.

        Returns:
            A dict of the form
            ``{"coins": {SYMBOL: {"coingecko-id": id, "prices": ...}},
            "last_update": <unix seconds>}``
            where "prices" is whatever the API returned for that id. The same
            dict is served from ``KV_STORE`` while the cache entry is alive.
        """
        ids = CONFIG.COINGECKO_IDS
        vs_currencies = CONFIG.COINGECKO_FIAT
        cache_seconds = int(CONFIG.COINGECKO_CACHE.get("seconds", 7))
        key = f"coingecko;{ttl_block_only(cache_seconds)};{ids};{vs_currencies}"

        cached = KV_STORE.get(key)
        if cached is not None:
            return json.loads(cached)

        symbols = self.get_symbols()  # cached 1 day
        coins = self.cg.get_price(ids=ids, vs_currencies=vs_currencies)

        data = {
            "coins": self._label_by_symbol(coins, symbols),
            "last_update": int(time()),
        }

        # The block-time mode value (-2) is swapped for the default TTL before
        # it is used as the store timeout.
        if cache_seconds == Mode.FOR_BLOCK_TIME.value:
            cache_seconds = int(CONFIG.DEFAULT_CACHE_SECONDS)

        KV_STORE.set(key, json.dumps(data), timeout=int(cache_seconds))
        return data

    @staticmethod
    def _label_by_symbol(coins, symbols):
        """Re-key ``coins`` (id -> prices) by upper-cased ticker symbol.

        Ids with a missing, empty or null symbol fall back to the id itself, so
        they can neither collide under "" nor show up under the key "NONE".
        """
        labelled = {}
        for coin_id, prices in coins.items():
            symbol = str(symbols.get(coin_id) or coin_id).upper()
            labelled[symbol] = {"coingecko-id": coin_id, "prices": prices}
        return labelled
if __name__ == "__main__":
    # Manual smoke test: build the client and print the current price payload
    # (served from KV_STORE when a fresh entry exists).
    client = Coingecko()
    print(client.get_price())