Skip to content

Commit

Permalink
feat(screener): use chainalysis (#2744)
Browse files Browse the repository at this point in the history
Co-authored-by: Trajan0x <[email protected]>
  • Loading branch information
golangisfun123 and trajan0x authored Jun 26, 2024
1 parent ac748d3 commit 08e2b19
Show file tree
Hide file tree
Showing 31 changed files with 518 additions and 1,143 deletions.
8 changes: 4 additions & 4 deletions contrib/screener-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
[![Go Reference](https://pkg.go.dev/badge/github.com/synapsecns/sanguine/contrib/screener-api.svg)](https://pkg.go.dev/github.com/synapsecns/sanguine/contrib/screener-api)
[![Go Report Card](https://goreportcard.com/badge/github.com/synapsecns/sanguine/contrib/screener-api)](https://goreportcard.com/report/github.com/synapsecns/sanguine/contrib/screener-api)

The screening api provides a simple restful interface for checking wether an address is blocked or not against a variety of data sources. Right now, two data sources are supported:
The screening api provides a simple restful interface for checking whether an address is blocked or not against a multiple data sources. Right now, two data sources are supported:

- Blacklist URL: a json list of addresses that are blocked
- TRM Labs: a list of rules that are used to determine if an address is blocked, can be different per "rule set"
- Chainalysis: the Entity API runs a screen against an address to quantify the risk associated with it, `Severe`, `High`, `Medium`, or `Low`.

Addresses themselves are checked against specific rulesets:

`https://screener-url/[ruleset]/address/[address]`
`https://screener-url/[address]`

<pre>
root
├── <a href="./chainalysis">chainalysis</a>: chainalysis client stub.
├── <a href="./client">client</a>: client library for using the screening api.
├── <a href="./cmd">cmd</a>: contains the command line interface to be used for the screener.
├── <a href="./config">config</a>: Yaml config struct/parsing.
├── <a href="./db">db</a>: db interface for the screener.
├── <a href="./screener">screener</a>: screening code.
├── <a href="./trmlabs">trmlabs</a>: trm client stub.
</pre>
140 changes: 140 additions & 0 deletions contrib/screener-api/chainalysis/chainalysisapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package chainalysis

import (
"context"
"errors"
"fmt"
"github.com/TwiN/gocache/v2"
"github.com/valyala/fastjson"
"net/http"
"slices"
"strings"
"time"

"github.com/go-resty/resty/v2"
"github.com/synapsecns/sanguine/core/retry"
)

const (
// EntityEndpoint is the endpoint for the entity API.
EntityEndpoint = "/api/risk/v2/entities"
)

// Client is the interface for the Chainalysis API client. It makes requests to the Chainalysis API.
type Client interface {
ScreenAddress(ctx context.Context, address string) (bool, error)
}

// clientImpl is the implementation of the Chainalysis API client.
type clientImpl struct {
client *resty.Client
apiKey string
url string
riskLevels []string
registrationCache *gocache.Cache
}

const (
maxCacheSizeGB = 3
bytesInGB = 1024 * 1024 * 1024
chainalysisRequestTimeout = 30 * time.Second
)

// NewClient creates a new Chainalysis API client.
func NewClient(riskLevels []string, apiKey, url string) Client {
client := resty.New().
SetBaseURL(url).
SetHeader("Content-Type", "application/json").
SetHeader("Token", apiKey).
SetTimeout(chainalysisRequestTimeout)

// max cache size 3gb
// TODO: make this configurable.
registrationCache := gocache.NewCache().WithEvictionPolicy(gocache.LeastRecentlyUsed).WithMaxMemoryUsage(maxCacheSizeGB * bytesInGB)

return &clientImpl{
client: client,
apiKey: apiKey,
url: url,
riskLevels: riskLevels,
registrationCache: registrationCache,
}
}

// ScreenAddress screens an address from the Chainalysis API.
func (c *clientImpl) ScreenAddress(parentCtx context.Context, address string) (bool, error) {
// make sure to cancel the context when we're done.
// this ensures if we didn't need pessimistic register, we don't wait on it.
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

address = strings.ToLower(address)

// we don't even wait on pessimistic register since if the address is already registered, but not in the in-memory cache
// this will just get canceled.
go func() {
// Register the address in the cache.
if err := c.pessimisticRegister(ctx, address); err != nil && !errors.Is(err, context.Canceled) {
fmt.Printf("could not register address: %v\n", err)
}
}()

return c.checkBlacklist(ctx, address)
}

// pessimisticRegister registers an address if its not in memory cache. This happens regardless it was registered before.
func (c *clientImpl) pessimisticRegister(ctx context.Context, address string) error {
if _, isPresent := c.registrationCache.Get(address); !isPresent {
if err := c.registerAddress(ctx, address); err != nil {
return fmt.Errorf("could not register address: %w", err)
}
}
return nil
}

func (c *clientImpl) checkBlacklist(ctx context.Context, address string) (bool, error) {
var resp *resty.Response
// Retry until the user is registered.
err := retry.WithBackoff(ctx,
func(ctx context.Context) (err error) {
resp, err = c.client.R().
SetContext(ctx).
SetPathParam("address", address).
Get(EntityEndpoint + "/" + address)
if err != nil {
return fmt.Errorf("could not get response: %w", err)
}

if resp.StatusCode() != http.StatusOK {
return fmt.Errorf("could not get response: %s", resp.Status())
}
return nil
}, retry.WithMax(time.Second))
if err != nil {
return false, fmt.Errorf("could not get response: %w", err)
}

// address has been found, let's screen it.
c.registrationCache.Set(address, struct{}{})

risk := fastjson.GetString(resp.Body(), "risk")
return slices.Contains(c.riskLevels, risk), nil
}

// registerAddress registers an address in the case that we try and screen for a nonexistent address.
func (c *clientImpl) registerAddress(ctx context.Context, address string) error {
payload := map[string]interface{}{
"address": address,
}
res, err := c.client.R().SetContext(ctx).SetBody(payload).Post(EntityEndpoint)
if err != nil {
return fmt.Errorf("could not register address: %w", err)
}
if res.IsError() {
return fmt.Errorf("could not register address: %s", res.Status())
}

return nil
}

var _ Client = &clientImpl{}
3 changes: 3 additions & 0 deletions contrib/screener-api/chainalysis/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package chainalysis contains the implementation of the Chainalysis API client.
// this implementation is incomplete, but it is a good starting point.
package chainalysis
58 changes: 36 additions & 22 deletions contrib/screener-api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

// ScreenerClient is an interface for the Screener API.
type ScreenerClient interface {
ScreenAddress(ctx context.Context, ruleset, address string) (blocked bool, err error)
ScreenAddress(ctx context.Context, address string) (blocked bool, err error)
BlacklistAddress(ctx context.Context, appsecret string, appid string, body BlackListBody) (string, error)
}

Expand All @@ -45,40 +45,34 @@ type blockedResponse struct {
Blocked bool `json:"risk"`
}

// ScreenAddress checks if an address is blocked by the screener.
func (c clientImpl) ScreenAddress(ctx context.Context, ruleset, address string) (bool, error) {
type notFoundResponse struct {
Message string `json:"message"`
}

// ScreenAddress checks if an address is blocked by the screener API.
func (c clientImpl) ScreenAddress(ctx context.Context, address string) (bool, error) {
var blockedRes blockedResponse
resp, err := c.rClient.R().
SetContext(ctx).
SetResult(&blockedRes).
Get(fmt.Sprintf("/%s/address/%s", ruleset, address))
Get("/address/" + address)
if err != nil {
return false, fmt.Errorf("error from server: %s: %w", resp.Status(), err)
}

if resp.IsError() {
return false, fmt.Errorf("error from server: %s", resp.Status())
// The address was not found
if err := json.Unmarshal(resp.Body(), &notFoundResponse{}); err == nil {
return false, nil
}

return false, fmt.Errorf("error from server: %s %w", resp, err)
}

return blockedRes.Blocked, nil
}

// BlackListBody is the json payload that represents a blacklisted address.
type BlackListBody struct {
Type string `json:"type"`
ID string `json:"id"`
Data string `json:"data"`
Address string `json:"address"`
Network string `json:"network"`
Tag string `json:"tag"`
Remark string `json:"remark"`
}

type blacklistResponse struct {
Status string `json:"status"`
Error string `json:"error"`
}

// BlacklistAddress blacklists an address with the screener API.
func (c clientImpl) BlacklistAddress(ctx context.Context, appsecret string, appid string, body BlackListBody) (string, error) {
var blacklistRes blacklistResponse

Expand Down Expand Up @@ -118,6 +112,22 @@ func (c clientImpl) BlacklistAddress(ctx context.Context, appsecret string, appi
return blacklistRes.Status, nil
}

// BlackListBody is the json payload that represents a blacklisted address.
type BlackListBody struct {
Type string `json:"type"`
ID string `json:"id"`
Data string `json:"data"`
Address string `json:"address"`
Network string `json:"network"`
Tag string `json:"tag"`
Remark string `json:"remark"`
}

type blacklistResponse struct {
Status string `json:"status"`
Error string `json:"error"`
}

// GenerateSignature generates a signature for the request.
func GenerateSignature(
secret,
Expand All @@ -137,10 +147,14 @@ func NewNoOpClient() (ScreenerClient, error) {

type noOpClient struct{}

func (n noOpClient) ScreenAddress(_ context.Context, _, _ string) (bool, error) {
func (n noOpClient) ScreenAddress(_ context.Context, _ string) (bool, error) {
return false, nil
}

func (n noOpClient) RegisterAddress(_ context.Context, _ string) error {
return nil
}

func (n noOpClient) BlacklistAddress(_ context.Context, _ string, _ string, _ BlackListBody) (string, error) {
return "", nil
}
Expand Down
3 changes: 2 additions & 1 deletion contrib/screener-api/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd

import (
"fmt"

"github.com/synapsecns/sanguine/core/commandline"
"github.com/synapsecns/sanguine/core/config"
"github.com/synapsecns/sanguine/core/metrics"
Expand All @@ -21,7 +22,7 @@ func Start(args []string, buildInfo config.BuildInfo) {
// nolint:wrapcheck
return metrics.Setup(c.Context, buildInfo)
}
app.Commands = cli.Commands{screenerCommand, splitterCommand}
app.Commands = cli.Commands{screenerCommand}
shellCommand := commandline.GenerateShellCommand(app.Commands)
app.Commands = append(app.Commands, shellCommand)
app.Action = shellCommand.Action
Expand Down
21 changes: 2 additions & 19 deletions contrib/screener-api/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package cmd

import (
"fmt"
"os"

"github.com/synapsecns/sanguine/contrib/screener-api/config"
"github.com/synapsecns/sanguine/contrib/screener-api/screener"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v2"
"os"
)

var fileFlag = &cli.StringFlag{
Expand Down Expand Up @@ -54,21 +55,3 @@ var outDirFlag = &cli.StringFlag{
Name: "out-dir",
Usage: "Specify the path to the output directory where split CSV files will be saved. Example: --out-dir /path/to/output",
}

var splitterCommand = &cli.Command{
Name: "splitter",
Usage: "splitter",
Flags: []cli.Flag{inFileFlag, outDirFlag},
Description: "takes a csv and splits it into many out csvs",
Action: func(context *cli.Context) error {
inFile := context.String(inFileFlag.Name)
outFile := context.String(outDirFlag.Name)

_, err := screener.SplitAndWriteCSV(inFile, outFile)
if err != nil {
return fmt.Errorf("failed to split csv: %w", err)
}

return nil
},
}
50 changes: 7 additions & 43 deletions contrib/screener-api/config/config.go
Original file line number Diff line number Diff line change
@@ -1,64 +1,28 @@
package config

import "time"

// Config is the configuration for the screener.
type Config struct {
// AppSecret is the app secret
AppSecret string `yaml:"app-secret"`
// AppID is the app id
AppID string `yaml:"app-id"`
// TRMKey is the api key for trmlabs
TRMKey string `yaml:"trm-key"`
// Rules of [caller_type]->risk_type
Rulesets map[string]RulesetConfig `yaml:"rulesets"`
// ChainalysisKey is the api key for chainalysis
ChainalysisKey string `yaml:"chainalysis-key"`
// ChainalysisURL is the url for chainalysis
ChainalysisURL string `yaml:"chainalysis-url"`
// BlacklistURL is the url to the blacklist file
// this is appplied to all rules and cannot be overridden
// this is applied to all rules and cannot be overridden
BlacklistURL string `yaml:"blacklist-url"`
// CacheTime is the time to cache results for (in seconds)
// can be overridden per rulesets
CacheTime int `yaml:"cache-time"`
// Port is the port to listen on
Port int `yaml:"port"`
// Database is the database configuration
Database DatabaseConfig `yaml:"database"`
// VolumeThresholds is the volume thresholds for each risk type
VolumeThresholds []VolumeThreshold `yaml:"volumeThresholds"`
// TODO: This HAS to be re-structured somehow
// Severities are the severity levels for each address we want to screen
RiskLevels []string `yaml:"risk-levels"`
// Whitelist is a list of addresses to whitelist
Whitelist []string `yaml:"whitelist"`
}

// VolumeThreshold defines thresholds for different risk categories and types.
type VolumeThreshold struct {
Category string `yaml:"category"`
TypeOfRisk string `yaml:"typeOfRisk"`
Incoming float64 `yaml:"incoming"`
Outgoing float64 `yaml:"outgoing"`
}

// GetCacheTime gets how long to use the cache for a given ruleset.
func (c Config) GetCacheTime(rulset string) time.Duration {
ruleset, hasRuleset := c.Rulesets[rulset]
if !hasRuleset {
return time.Duration(c.CacheTime) * time.Second
}

if ruleset.CacheTime != nil {
return time.Duration(*ruleset.CacheTime) * time.Second
}

return time.Duration(c.CacheTime) * time.Second
}

// RulesetConfig is the config for each given ruleset.
type RulesetConfig struct {
// Filename is the filename of the ruleset
Filename string `json:"filename"`
// CacheTime (in seconds)
CacheTime *int `json:"cache-time"`
}

// DatabaseConfig represents the configuration for the database.
type DatabaseConfig struct {
Type string `yaml:"type"`
Expand Down
Loading

0 comments on commit 08e2b19

Please sign in to comment.