Skip to content

Commit

Permalink
setup docker build in cicd (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
geekflyer authored Apr 18, 2023
1 parent ae22372 commit 0c22d9d
Show file tree
Hide file tree
Showing 13 changed files with 449 additions and 44 deletions.
32 changes: 32 additions & 0 deletions .github/actions/docker-setup/action.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: "Docker build setup"
description: |
  Runs an opinionated and unified docker build setup action. It does the following:
  * Logs in to docker image registries GCP GAR
inputs:
  # GCP auth
  GCP_WORKLOAD_IDENTITY_PROVIDER:
    required: true
    description: "GCP Workload Identity provider"
  GCP_SERVICE_ACCOUNT_EMAIL:
    required: true
    description: "GCP service account email"

runs:
  using: composite
  steps:
    # Exchange the GitHub OIDC token for a short-lived GCP access token
    # (no credentials file is written to disk).
    - id: auth
      name: "Authenticate to Google Cloud"
      uses: "google-github-actions/auth@dac4e13deb3640f22e3ffe758fd3f95e6e89f712" # pin@v0
      with:
        create_credentials_file: false
        token_format: "access_token"
        access_token_lifetime: 5400 # setting this to 1.5h since sometimes docker builds (special performance builds etc.) take that long. Default is 1h.
        workload_identity_provider: ${{ inputs.GCP_WORKLOAD_IDENTITY_PROVIDER }}
        service_account: ${{ inputs.GCP_SERVICE_ACCOUNT_EMAIL }}

    # Use the access token from the auth step as the docker registry password.
    - name: Login to US multi-region Google Artifact Registry
      uses: docker/login-action@49ed152c8eca782a232dede0303416e8f356c37b # pin@v2
      with:
        registry: us-docker.pkg.dev
        username: oauth2accesstoken
        password: ${{ steps.auth.outputs.access_token }}
36 changes: 36 additions & 0 deletions .github/workflows/build-images.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: "Build Docker Images"
on:
  # Allow us to run this specific workflow without a PR
  workflow_dispatch:
  pull_request:
  push:
    branches:
      - main

# cancel redundant builds
concurrency:
  # for push and workflow_dispatch events we use `github.sha` in the concurrency group and don't really cancel each other out/limit concurrency
  # for pull_request events newer jobs cancel earlier jobs to save on CI etc.
  group: ${{ github.workflow }}-${{ github.event_name }}-${{ (github.event_name == 'push' || github.event_name == 'workflow_dispatch') && github.sha || github.head_ref || github.ref }}
  cancel-in-progress: true

env:
  # For PRs use the head commit sha, otherwise the pushed/dispatched sha.
  GIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}

permissions:
  contents: read
  id-token: write # required for GCP Workload Identity federation which we use to login into Google Artifact Registry

jobs:
  Build:
    strategy:
      matrix:
        # one build per example directory
        example: [python]
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: ./.github/actions/docker-setup
        with:
          GCP_SERVICE_ACCOUNT_EMAIL: ${{ secrets.GCP_SERVICE_ACCOUNT_EMAIL }}
          GCP_WORKLOAD_IDENTITY_PROVIDER: ${{ secrets.GCP_WORKLOAD_IDENTITY_PROVIDER }}
      - run: ./scripts/build-and-push-images.sh ${{ matrix.example }}
26 changes: 26 additions & 0 deletions .vscode/indexer-client-examples.code-workspace
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"folders": [
{
"name": "ROOT",
"path": "../"
},
{
"name": "typescript",
"path": "../typescript"
},
{
"name": "python",
"path": "../python"
}
],
"settings": {
"python.formatting.provider": "black",
"typescript.preferences.importModuleSpecifierEnding": "js",
"[typescript]": {
"editor.defaultFormatter": "esbenp.prettier-vscode"
}
},
"extensions": {
"recommendations": ["ms-python.python", "esbenp.prettier-vscode"]
}
}
12 changes: 6 additions & 6 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
chain-id: 1
indexer-endpoint: "34.30.218.153:50051"
x-aptos-data-authorization: "YOUR_TOKEN"
starting-version: 10000
tablename: "YOUR_TABLENAME"
cursor-filename: "cursor.txt"
# Expected Aptos chain id; streamed responses are validated against this value
chain_id: 1
# host:port of the indexer grpc endpoint
indexer_endpoint: "34.30.218.153:50051"
# API key sent as the x-aptos-data-authorization grpc metadata header
indexer_api_key: "<INDEXER_API_KEY>"
# First transaction version to request from the stream
starting_version: 10000
# SQLAlchemy connection URI for the events database
db_connection_uri: "postgresql://<your_connection_uri_to_postgres>"
# File used to persist the last successfully processed transaction version
cursor_filename: "cursor.txt"
9 changes: 4 additions & 5 deletions python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ FROM python:3.11
RUN pip install "poetry==1.4.2"

WORKDIR /app
COPY python/poetry.lock python/pyproject.toml /app/
COPY poetry.lock pyproject.toml /app/


# Project initialization
RUN poetry config virtualenvs.create false \
&& poetry install --only main

# Copy files and folders
COPY ./config.yaml /app/configs/config.yaml
COPY python/*.py /app/
COPY python/aptos /app/aptos
COPY *.py /app/
COPY /aptos /app/aptos

CMD ["poetry", "run", "python", "grpc_client.py", "--config", "configs/config.yaml"]
CMD ["poetry", "run", "python", "grpc_client.py", "--config", "/app/config/config.yaml"]
30 changes: 30 additions & 0 deletions python/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import yaml
from pydantic import BaseSettings
from pydantic.env_settings import SettingsSourceCallable


class Config(BaseSettings):
    """Processor settings, loaded from a YAML file and/or environment variables."""

    chain_id: int  # expected Aptos chain id; stream responses are checked against it
    indexer_endpoint: str  # host:port of the indexer grpc endpoint
    indexer_api_key: str  # sent as the x-aptos-data-authorization grpc metadata header
    starting_version: int  # first transaction version to request from the stream
    db_connection_uri: str  # SQLAlchemy connection URI for the events database
    cursor_filename: str  # file used to persist the last processed transaction version

    class Config:
        # change order of priority of settings sources such that environment variables
        # take precedence over config file settings
        # inspired by https://docs.pydantic.dev/usage/settings/#changing-priority
        @classmethod
        def customise_sources(
            cls,
            init_settings: SettingsSourceCallable,
            env_settings: SettingsSourceCallable,
            file_secret_settings: SettingsSourceCallable,
        ) -> tuple[SettingsSourceCallable, ...]:
            return env_settings, init_settings, file_secret_settings

    @classmethod
    def from_yaml_file(cls, path: str) -> "Config":
        """Build a Config from the YAML mapping at *path*.

        Environment variables still override values from the file (see the
        source-priority override above).
        """
        with open(path, "r") as file:
            config = yaml.safe_load(file)
        return cls(**config)
16 changes: 9 additions & 7 deletions python/create_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@
from sqlalchemy.orm import declarative_base

import argparse
import yaml

from config import Config

parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', help='Path to config file', required=True)
parser.add_argument("-c", "--config", help="Path to config file", required=True)
args = parser.parse_args()

config = Config.from_yaml_file(args.config)

Base = declarative_base()


class Event(Base):
__tablename__ = 'events'
__tablename__ = "events"

sequence_number = Column(BigInteger, primary_key=True)
creation_number = Column(BigInteger, primary_key=True)
Expand All @@ -23,8 +27,6 @@ class Event(Base):
inserted_at = Column(DateTime)
event_index = Column(BigInteger)

with open(args.config, 'r') as file:
config = yaml.safe_load(file)

engine = create_engine(config['tablename'])
Base.metadata.create_all(engine)
engine = create_engine(config.db_connection_uri)
Base.metadata.create_all(engine)
19 changes: 19 additions & 0 deletions python/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
version: '3'
services:
  index-processor:
    build: .
    environment:
      # env var overrides the db_connection_uri from the mounted config file
      DB_CONNECTION_URI: postgresql://postgres:postgres@db:5432/postgres
    depends_on:
      - db
    volumes:
      # mount the repo-root config at the path the container CMD expects
      - ../config.yaml:/app/config/config.yaml
  db:
    image: postgres:15.2
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - db-data:/var/lib/postgresql/data
volumes:
  db-data:
70 changes: 50 additions & 20 deletions python/grpc_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from config import Config
from grpc_parser import parse
from aptos.datastream.v1 import datastream_pb2_grpc

Expand All @@ -11,39 +12,52 @@
import argparse
import base64
import datetime
import yaml
import json

parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', help='Path to config file', required=True)
parser.add_argument("-c", "--config", help="Path to config file", required=True)
args = parser.parse_args()
config = Config.from_yaml_file(args.config)

with open(args.config, 'r') as file:
config = yaml.safe_load(file)
metadata = (("x-aptos-data-authorization", config.indexer_api_key),)
options = [("grpc.max_receive_message_length", -1)]
engine = create_engine(config.db_connection_uri)

metadata = (("x-aptos-data-authorization", config["x-aptos-data-authorization"]),)
options = [('grpc.max_receive_message_length', -1)]
engine = create_engine(config['tablename'])

with grpc.insecure_channel(config["indexer-endpoint"], options=options) as channel:
with grpc.insecure_channel(config.indexer_endpoint, options=options) as channel:
stub = datastream_pb2_grpc.IndexerStreamStub(channel)
current_transaction_version = config["starting-version"]
current_transaction_version = config.starting_version

for response in stub.RawDatastream(datastream_pb2.RawDatastreamRequest(starting_version=config["starting-version"]), metadata=metadata):
for response in stub.RawDatastream(
datastream_pb2.RawDatastreamRequest(starting_version=config.starting_version),
metadata=metadata,
):
chain_id = response.chain_id

if chain_id != config["chain-id"]:
raise Exception("Chain ID mismatch. Expected chain ID is: " + str(config["chain-id"]) + ", but received chain ID is: " + str(chain_id))

if chain_id != config.chain_id:
raise Exception(
"Chain ID mismatch. Expected chain ID is: "
+ str(config.chain_id)
+ ", but received chain ID is: "
+ str(chain_id)
)

transactions_output = response.data
for transaction_output in transactions_output.transactions:
# Decode transaction data
decoded_transaction = base64.b64decode(transaction_output.encoded_proto_data)
decoded_transaction = base64.b64decode(
transaction_output.encoded_proto_data
)
transaction = transaction_pb2.Transaction()
transaction.ParseFromString(decoded_transaction)

transaction_version = transaction.version
if transaction_version != current_transaction_version:
raise Exception("Transaction version mismatch. Expected transaction version is: " + str(current_transaction_version) + ", but received transaction version is: " + str(transaction_version))
raise Exception(
"Transaction version mismatch. Expected transaction version is: "
+ str(current_transaction_version)
+ ", but received transaction version is: "
+ str(transaction_version)
)

current_transaction_version += 1
parsed_objs = parse(transaction)
Expand All @@ -53,8 +67,24 @@
with Session(engine) as session, session.begin():
session.add_all(parsed_objs)

if (current_transaction_version % 1000) == 0:
print(
json.dumps(
{
"message": "Successfully processed transaction",
"last_success_transaction_version": current_transaction_version,
}
)
)
# Keep track of last successfully processed transaction version
cursor_file = open(config["cursor-filename"], 'w+')
cursor_file.write("last_success_transaction_version=" + str(current_transaction_version) + "\n")
cursor_file.write("last_updated=" + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
cursor_file.close()
cursor_file = open(config.cursor_filename, "w+")
cursor_file.write(
"last_success_transaction_version="
+ str(current_transaction_version)
+ "\n"
)
cursor_file.write(
"last_updated="
+ datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
)
cursor_file.close()
15 changes: 10 additions & 5 deletions python/grpc_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@
from create_table import Event
import datetime


def parse(transaction: transaction_pb2.Transaction):
# Custom filtering
# Here we filter out all transactions that are not of type TRANSACTION_TYPE_USER
if transaction.type != transaction_pb2.Transaction.TRANSACTION_TYPE_USER:
return

# Parse Transaction struct
transaction_version = transaction.version
transaction_block_height = transaction.block_height
inserted_at = parse_timestamp(transaction.timestamp)
user_transaction = transaction.user

# Parse Event struct
event_db_objs = []
event_db_objs: list[Event] = []
for event_index, event in enumerate(user_transaction.events):
creation_number = event.key.creation_number
sequence_number = event.sequence_number
Expand All @@ -37,12 +38,16 @@ def parse(transaction: transaction_pb2.Transaction):
event_index=event_index,
)
event_db_objs.append(event_db_obj)

return event_db_objs


def parse_timestamp(timestamp: timestamp_pb2.Timestamp):
    """Render a protobuf Timestamp as a "%Y-%m-%d %H:%M:%S.%f" datetime string.

    Combines whole seconds with fractional nanoseconds into float epoch
    seconds before conversion.
    """
    # NOTE(review): fromtimestamp() interprets the epoch in local time —
    # confirm the database is expected to store local rather than UTC times.
    datetime_obj = datetime.datetime.fromtimestamp(
        timestamp.seconds + timestamp.nanos * 1e-9
    )
    return datetime_obj.strftime("%Y-%m-%d %H:%M:%S.%f")


def standardize_address(address: str):
    """Return *address* with a "0x" prefix prepended.

    NOTE(review): the prefix is added unconditionally — assumes callers never
    pass an address that already starts with "0x".
    """
    return f"0x{address}"
Loading

0 comments on commit 0c22d9d

Please sign in to comment.