Skip to content

Commit

Permalink
Coinbase datasource (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Jul 2, 2021
1 parent 1442b02 commit 86508af
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 19 deletions.
43 changes: 39 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ click = "^8.0.1"
pyee = "^8.1.0"
APScheduler = "^3.7.0"
sentry-sdk = "^1.1.0"
aiolimiter = "^1.0.0-beta.1"

[tool.poetry.dev-dependencies]
black = "^20.8b1"
Expand Down
15 changes: 13 additions & 2 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,18 @@ def valid_url(cls, v):
return v


DatasourceConfigT = Union[TzktDatasourceConfig, BcdDatasourceConfig]
@dataclass
class CoinbaseDatasourceConfig(NameMixin):
kind: Literal['coinbase']
api_key: Optional[str] = None
secret_key: Optional[str] = None
passphrase: Optional[str] = None

def __hash__(self):
return hash(self.kind)


DatasourceConfigT = Union[TzktDatasourceConfig, BcdDatasourceConfig, CoinbaseDatasourceConfig]


@dataclass
Expand Down Expand Up @@ -644,7 +655,7 @@ class DipDupConfig:

spec_version: str
package: str
datasources: Dict[str, Union[TzktDatasourceConfig, BcdDatasourceConfig]]
datasources: Dict[str, DatasourceConfigT]
contracts: Dict[str, ContractConfig] = Field(default_factory=dict)
indexes: Dict[str, IndexConfigT] = Field(default_factory=dict)
templates: Optional[Dict[str, IndexConfigTemplateT]] = None
Expand Down
3 changes: 2 additions & 1 deletion src/dipdup/datasources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Union

from dipdup.datasources.bcd.datasource import BcdDatasource
from dipdup.datasources.coinbase.datasource import CoinbaseDatasource
from dipdup.datasources.tzkt.datasource import TzktDatasource

DatasourceT = Union[TzktDatasource, BcdDatasource]
DatasourceT = Union[TzktDatasource, BcdDatasource, CoinbaseDatasource]
1 change: 0 additions & 1 deletion src/dipdup/datasources/bcd/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

class BcdDatasource:
def __init__(self, url: str, network: str, cache: bool) -> None:
super().__init__()
self._url = url.rstrip('/')
self._network = network
self._logger = logging.getLogger('dipdup.bcd')
Expand Down
Empty file.
60 changes: 60 additions & 0 deletions src/dipdup/datasources/coinbase/datasource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

from aiolimiter import AsyncLimiter

from dipdup.datasources.coinbase.models import CandleData, CandleInterval
from dipdup.datasources.proxy import DatasourceRequestProxy

CANDLES_REQUEST_LIMIT = 300
REST_API_URL = 'https://api.pro.coinbase.com'
WEBSOCKET_API_URL = 'wss://ws-feed.pro.coinbase.com'


class CoinbaseDatasource:
def __init__(self, cache: bool) -> None:
self._logger = logging.getLogger('dipdup.coinbase')
self._proxy = DatasourceRequestProxy(
cache=cache,
ratelimiter=AsyncLimiter(max_rate=10, time_period=1),
)

async def close_session(self) -> None:
await self._proxy.close_session()

async def run(self) -> None:
pass

async def resync(self) -> None:
pass

async def get_oracle_prices(self) -> Dict[str, Any]:
return await self._proxy.http_request(
'get',
url=f'{REST_API_URL}/oracle',
)

async def get_candles(self, since: datetime, until: datetime, interval: CandleInterval, ticker: str = 'XTZ-USD') -> List[CandleData]:
candles = []
for _since, _until in self._split_candle_requests(since, until, interval):
candles_json = await self._proxy.http_request(
'get',
url=f'{REST_API_URL}/products/{ticker}/candles',
params={
'start': _since.replace(tzinfo=timezone.utc).isoformat(),
'end': _until.replace(tzinfo=timezone.utc).isoformat(),
'granularity': interval.seconds,
},
)
candles += [CandleData.from_json(c) for c in candles_json]
return sorted(candles, key=lambda c: c.timestamp)

def _split_candle_requests(self, since: datetime, until: datetime, interval: CandleInterval) -> List[Tuple[datetime, datetime]]:
request_interval_limit = timedelta(seconds=interval.seconds * CANDLES_REQUEST_LIMIT)
request_intervals = []
while since + request_interval_limit < until:
request_intervals.append((since, since + request_interval_limit))
since += request_interval_limit
request_intervals.append((since, until))
return request_intervals
47 changes: 47 additions & 0 deletions src/dipdup/datasources/coinbase/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from datetime import datetime, timezone
from decimal import Decimal
from enum import Enum
from typing import List, Union

from pydantic.dataclasses import dataclass


class CandleInterval(Enum):
ONE_MINUTE = 'ONE_MINUTE'
FIVE_MINUTES = 'FIVE_MINUTES'
FIFTEEN_MINUTES = 'FIFTEEN_MINUTES'
ONE_HOUR = 'ONE_HOUR'
SIX_HOURS = 'SIX_HOURS'
ONE_DAY = 'ONE_DAY'

@property
def seconds(self) -> int:
return {
CandleInterval.ONE_MINUTE: 60,
CandleInterval.FIVE_MINUTES: 300,
CandleInterval.FIFTEEN_MINUTES: 900,
CandleInterval.ONE_HOUR: 3600,
CandleInterval.SIX_HOURS: 21600,
CandleInterval.ONE_DAY: 86400,
}[self]


@dataclass
class CandleData:
timestamp: datetime
low: Decimal
high: Decimal
open: Decimal
close: Decimal
volume: Decimal

@classmethod
def from_json(cls, json: List[Union[int, float]]) -> 'CandleData':
return CandleData(
timestamp=datetime.fromtimestamp(json[0], tz=timezone.utc),
low=Decimal(str(json[1])),
high=Decimal(str(json[2])),
open=Decimal(str(json[3])),
close=Decimal(str(json[4])),
volume=Decimal(str(json[5])),
)
11 changes: 9 additions & 2 deletions src/dipdup/datasources/proxy.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
import hashlib
import logging
import pickle
from typing import Optional

import aiohttp
from aiolimiter import AsyncLimiter
from fcache.cache import FileCache # type: ignore

from dipdup.utils import http_request


class DatasourceRequestProxy:
def __init__(self, cache: bool = False) -> None:
def __init__(self, cache: bool = False, ratelimiter: Optional[AsyncLimiter] = None) -> None:
self._logger = logging.getLogger(__name__)
self._cache = FileCache('dipdup', flag='cs') if cache else None
self._ratelimiter = ratelimiter
self._session = aiohttp.ClientSession()

async def http_request(self, method: str, skip_cache: bool = False, **kwargs):
async def http_request(self, method: str, skip_cache: bool = False, weight: int = 1, **kwargs):
if self._cache is not None and not skip_cache:
key = hashlib.sha256(pickle.dumps([method, kwargs])).hexdigest()
try:
return self._cache[key]
except KeyError:
if self._ratelimiter:
await self._ratelimiter.acquire(weight)
response = await http_request(
session=self._session,
method=method,
Expand All @@ -28,6 +33,8 @@ async def http_request(self, method: str, skip_cache: bool = False, **kwargs):
self._cache[key] = response
return response
else:
if self._ratelimiter:
await self._ratelimiter.acquire(weight)
response = await http_request(
session=self._session,
method=method,
Expand Down
22 changes: 13 additions & 9 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ROLLBACK_HANDLER,
BcdDatasourceConfig,
BigMapIndexConfig,
CoinbaseDatasourceConfig,
DatasourceConfigT,
DipDupConfig,
IndexConfigTemplateT,
Expand All @@ -30,6 +31,7 @@
from dipdup.context import DipDupContext, RollbackHandlerContext
from dipdup.datasources import DatasourceT
from dipdup.datasources.bcd.datasource import BcdDatasource
from dipdup.datasources.coinbase.datasource import CoinbaseDatasource
from dipdup.datasources.datasource import IndexDatasource
from dipdup.datasources.tzkt.datasource import TzktDatasource
from dipdup.exceptions import ConfigurationError, HandlerImportError
Expand Down Expand Up @@ -201,7 +203,7 @@ async def run(self, reindex: bool, oneshot: bool) -> None:
await asyncio.gather(*[d.close_session() for d in self._datasources.values()])
# FIXME: AttributeError: 'NoneType' object has no attribute 'call_soon_threadsafe'
with suppress(AttributeError, SchedulerNotRunningError):
await self._scheduler.shutdown(wait=True)
self._scheduler.shutdown(wait=True)

async def migrate(self) -> None:
codegen = DipDupCodeGenerator(self._config, self._datasources_by_config)
Expand Down Expand Up @@ -233,20 +235,22 @@ async def _create_datasources(self) -> None:
url=datasource_config.url,
cache=self._config.cache_enabled,
)
self._datasources[name] = datasource
self._datasources_by_config[datasource_config] = datasource

elif isinstance(datasource_config, BcdDatasourceConfig):
datasource = BcdDatasource(
datasource_config.url,
datasource_config.network,
self._config.cache_enabled,
url=datasource_config.url,
network=datasource_config.network,
cache=self._config.cache_enabled,
)
elif isinstance(datasource_config, CoinbaseDatasourceConfig):
datasource = CoinbaseDatasource(
cache=self._config.cache_enabled,
)
self._datasources[name] = datasource
self._datasources_by_config[datasource_config] = datasource
else:
raise NotImplementedError

self._datasources[name] = datasource
self._datasources_by_config[datasource_config] = datasource

async def _initialize_database(self, reindex: bool = False) -> None:
self._logger.info('Initializing database')

Expand Down

0 comments on commit 86508af

Please sign in to comment.