Skip to content

Commit

Permalink
Datasources api WIP;
Browse files Browse the repository at this point in the history
  • Loading branch information
mismirnov committed Dec 16, 2024
1 parent 13b85f3 commit 1478a02
Show file tree
Hide file tree
Showing 13 changed files with 682 additions and 2 deletions.
33 changes: 31 additions & 2 deletions .run/API.run.xml
Original file line number Diff line number Diff line change
@@ -1,8 +1,37 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="API" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="celestia-indexer" />
<working_directory value="$PROJECT_DIR$" />
<parameters value="-c configs/dipdup.yml" />
<working_directory value="$PROJECT_DIR$/cmd/api" />
<parameters value="-c ../../configs/dipdup.yml" />
<envs>
<env name="API_PROMETHEUS_ENABLED" value="false" />
<env name="API_RATE_LIMIT" value="20" />
<env name="API_REQUEST_TIMEOUT" value="10" />
<env name="API_WEBSOCKET_ENABLED" value="true" />
<env name="BINANCE_API_RPS" value="5" />
<env name="BINANCE_API_TIMEOUT" value="10" />
<env name="BINANCE_API_URL" value="https://api.binance.com/" />
<env name="CELENIUM_ENV" value="production" />
<env name="CELESTIA_DAL_API_RPS" value="10" />
<env name="CELESTIA_DAL_API_TIMEOUT" value="30 # seconds" />
<env name="CELESTIA_DAL_API_URL" value="" />
<env name="CELESTIA_NODE_AUTH_TOKEN" value="" />
<env name="CELESTIA_NODE_RPS" value="5" />
<env name="CELESTIA_NODE_TIMEOUT" value="10 # seconds" />
<env name="CELESTIA_NODE_URL" value="https://node.celenium.io" />
<env name="CELESTIA_NODE_WS_URL" value="https://node.celenium.io:443" />
<env name="INDEXER_BLOCK_PERIOD" value="15 # seconds" />
<env name="INDEXER_NAME" value="celestia_indexer" />
<env name="INDEXER_SCRIPTS_DIR" value="./../../database" />
<env name="INDEXER_START_LEVEL" value="1" />
<env name="INDEXER_THREADS_COUNT" value="10" />
<env name="LOG_LEVEL" value="debug" />
<env name="POSTGRES_DB" value="celestia" />
<env name="POSTGRES_HOST" value="localhost" />
<env name="POSTGRES_PASSWORD" value="changeme" />
<env name="POSTGRES_PORT" value="5432" />
<env name="POSTGRES_USER" value="dipdup" />
</envs>
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="true" />
<option name="IS_SUBST" value="false" />
Expand Down
29 changes: 29 additions & 0 deletions .run/Indexer.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,35 @@
<module name="celestia-indexer" />
<working_directory value="$PROJECT_DIR$" />
<parameters value="-c configs/dipdup.yml" />
<envs>
<env name="API_PROMETHEUS_ENABLED" value="false" />
<env name="API_RATE_LIMIT" value="20" />
<env name="API_REQUEST_TIMEOUT" value="10" />
<env name="API_WEBSOCKET_ENABLED" value="true" />
<env name="BINANCE_API_RPS" value="5" />
<env name="BINANCE_API_TIMEOUT" value="10" />
<env name="BINANCE_API_URL" value="https://api.binance.com/" />
<env name="CELENIUM_ENV" value="production" />
<env name="CELESTIA_DAL_API_RPS" value="10" />
<env name="CELESTIA_DAL_API_TIMEOUT" value="30 # seconds" />
<env name="CELESTIA_DAL_API_URL" value="" />
<env name="CELESTIA_NODE_AUTH_TOKEN" value="" />
<env name="CELESTIA_NODE_RPS" value="5" />
<env name="CELESTIA_NODE_TIMEOUT" value="10 # seconds" />
<env name="CELESTIA_NODE_URL" value="https://node.celenium.io" />
<env name="CELESTIA_NODE_WS_URL" value="https://node.celenium.io:443" />
<env name="INDEXER_BLOCK_PERIOD" value="15 # seconds" />
<env name="INDEXER_NAME" value="celestia_indexer" />
<env name="INDEXER_SCRIPTS_DIR" value="./database" />
<env name="INDEXER_START_LEVEL" value="1" />
<env name="INDEXER_THREADS_COUNT" value="10" />
<env name="LOG_LEVEL" value="debug" />
<env name="POSTGRES_DB" value="celestia" />
<env name="POSTGRES_HOST" value="localhost" />
<env name="POSTGRES_PASSWORD" value="changeme" />
<env name="POSTGRES_PORT" value="5432" />
<env name="POSTGRES_USER" value="dipdup" />
</envs>
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="true" />
<option name="IS_SUBST" value="false" />
Expand Down
69 changes: 69 additions & 0 deletions cmd/tvl/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// SPDX-FileCopyrightText: 2024 PK Lab AG <[email protected]>
// SPDX-License-Identifier: MIT

package main

import (
"os"
"strconv"

"github.com/celenium-io/celestia-indexer/pkg/indexer/config"
goLibConfig "github.com/dipdup-net/go-lib/config"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

func init() {
log.Logger = log.Output(zerolog.ConsoleWriter{
Out: os.Stdout,
TimeFormat: "2006-01-02 15:04:05",
})
}

func initConfig() (*config.Config, error) {
configPath := rootCmd.PersistentFlags().StringP("config", "c", "dipdup.yml", "path to YAML config file")
if err := rootCmd.Execute(); err != nil {
log.Panic().Err(err).Msg("command line execute")
return nil, err
}

if err := rootCmd.MarkFlagRequired("config"); err != nil {
log.Panic().Err(err).Msg("config command line arg is required")
return nil, err
}

var cfg config.Config
if err := goLibConfig.Parse(*configPath, &cfg); err != nil {
log.Panic().Err(err).Msg("parsing config file")
return nil, err
}

if cfg.LogLevel == "" {
cfg.LogLevel = zerolog.LevelInfoValue
}

return &cfg, nil
}

func initLogger(level string) error {
logLevel, err := zerolog.ParseLevel(level)
if err != nil {
log.Panic().Err(err).Msg("parsing log level")
return err
}
zerolog.SetGlobalLevel(logLevel)
zerolog.CallerMarshalFunc = func(pc uintptr, file string, line int) string {
short := file
for i := len(file) - 1; i > 0; i-- {
if file[i] == '/' {
short = file[i+1:]
break
}
}
file = short
return file + ":" + strconv.Itoa(line)
}
log.Logger = log.Logger.With().Caller().Logger()

return nil
}
52 changes: 52 additions & 0 deletions cmd/tvl/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// SPDX-FileCopyrightText: 2024 PK Lab AG <[email protected]>
// SPDX-License-Identifier: MIT

package main

import (
"context"
indexer "github.com/celenium-io/celestia-indexer/pkg/tvl"
"github.com/dipdup-net/indexer-sdk/pkg/modules/stopper"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"os"
"os/signal"
"syscall"
)

var rootCmd = &cobra.Command{
Use: "indexer",
Short: "DipDup Verticals | Celenium TVL Scanner",
}

func main() {
cfg, err := initConfig()
if err != nil {
return
}

if err = initLogger(cfg.LogLevel); err != nil {
return
}

ctx, cancel := context.WithCancel(context.Background())

notifyCtx, notifyCancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
defer notifyCancel()

stopperModule := stopper.NewModule(cancel)
indexerModule, err := indexer.New(ctx, *cfg, &stopperModule)
if err != nil {
log.Panic().Err(err).Msg("error during indexer module creation")
return
}

stopperModule.Start(ctx)
indexerModule.Start(ctx)

<-notifyCtx.Done()
cancel()

log.Info().Msg("stopped")
}
10 changes: 10 additions & 0 deletions configs/dipdup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ datasources:
url: ${BINANCE_API_URL:-https://api.binance.com/}
rps: ${BINANCE_API_RPS:-5}
timeout: ${BINANCE_API_TIMEOUT:-10}
l2beat:
kind: l2beat_api
url: ${L2BEAT_API_URL:-https://l2beat.com/api/}
rps: ${L2BEAT_API_RPS:-5}
timeout: ${L2BEAT_API_TIMEOUT:-10}
lama:
kind: lama_api
url: ${LAMA_API_URL:-https://api.llama.fi/}
rps: ${LAMA_API_RPS:-5}
timeout: ${LAMA_API_TIMEOUT:-10}

api:
bind: ${API_HOST:-0.0.0.0}:${API_PORT:-9876}
Expand Down
103 changes: 103 additions & 0 deletions internal/tvl/l2beat/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// SPDX-FileCopyrightText: 2024 PK Lab AG <[email protected]>
// SPDX-License-Identifier: MIT

package l2beat

import (
"context"
"encoding/json"
"github.com/dipdup-net/go-lib/config"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/time/rate"
"io"
"net/http"
"net/url"
"time"
)

type API struct {
client *http.Client
cfg config.DataSource
rateLimit *rate.Limiter
log zerolog.Logger
}

func NewAPI(cfg config.DataSource) API {
rps := cfg.RequestsPerSecond
if cfg.RequestsPerSecond < 1 || cfg.RequestsPerSecond > 100 {
rps = 10
}

t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConns = rps
t.MaxConnsPerHost = rps
t.MaxIdleConnsPerHost = rps

return API{
client: &http.Client{
Transport: t,
},
cfg: cfg,
rateLimit: rate.NewLimiter(rate.Every(time.Second/time.Duration(rps)), rps),
log: log.With().Str("module", "L2Beat api").Logger(),
}
}

func (api *API) get(ctx context.Context, path string, args map[string]string, output any) error {
u, err := url.Parse(api.cfg.URL)
if err != nil {
return err
}
u.Path, err = url.JoinPath(u.Path, path)
if err != nil {
return err
}

values := u.Query()
for key, value := range args {
values.Add(key, value)
}
u.RawQuery = values.Encode()

if api.rateLimit != nil {
if err := api.rateLimit.Wait(ctx); err != nil {
return err
}
}

start := time.Now()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return err
}

response, err := api.client.Do(req)
if err != nil {
return err
}
defer closeWithLogError(response.Body, api.log)

api.log.Trace().
Int64("ms", time.Since(start).Milliseconds()).
Str("url", u.String()).
Msg("request")

if response.StatusCode != http.StatusOK {
return errors.Errorf("invalid status: %d", response.StatusCode)
}

err = json.NewDecoder(response.Body).Decode(output)
return err
}

func closeWithLogError(stream io.ReadCloser, log zerolog.Logger) {
if _, err := io.Copy(io.Discard, stream); err != nil {
log.Err(err).Msg("L2Beat api copy GET body response to discard")
}
if err := stream.Close(); err != nil {
log.Err(err).Msg("L2Beat api close GET body request")
}
}
11 changes: 11 additions & 0 deletions internal/tvl/l2beat/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// SPDX-FileCopyrightText: 2024 PK Lab AG <[email protected]>
// SPDX-License-Identifier: MIT

package l2beat

import "context"

//go:generate mockgen -source=$GOFILE -destination=mock/$GOFILE -package=mock -typed
type IApi interface {
TVL(ctx context.Context, arguments *TVLArgs) (result TVLResponse, err error)
}
40 changes: 40 additions & 0 deletions internal/tvl/l2beat/tvl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package l2beat

import (
"context"
)

type Data struct {
JSON [][]interface{} `json:"json"`
}

type Result struct {
Data Data `json:"data"`
}

type TVLResponse []struct {
Result Result `json:"result"`
}

type TVLArgs struct {
Batch int64 `json:"batch"`
Input JsonData `json:"input"`
}

type JsonData struct {
Filter Filter `json:"filter"`
Range string `json:"range"`
ExcludeAssociatedTokens bool `json:"excludeAssociatedTokens"`
}

type Filter struct {
Type string `json:"type"`
ProjectIds []string `json:"projectIds"`
}

func (api API) TVL(ctx context.Context, arguments *TVLArgs) (result TVLResponse, err error) {
// TODO: cast args to query params
args := map[string]string{}
err = api.get(ctx, "trpc/tvl.chart", args, &result)
return
}
Loading

0 comments on commit 1478a02

Please sign in to comment.