1.1.0-RC #174

Merged
merged 3 commits on Sep 24, 2024
88 changes: 88 additions & 0 deletions AZURE_INSTALLATION.md
@@ -0,0 +1,88 @@
**Azure**
### Prerequisites:
* Log into your cloud environment in your terminal. Please see [Sign in with Azure CLI](https://learn.microsoft.com/en-us/cli/azure/authenticate-azure-cli) for reference.
```
az login
```
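* Optionally, verify that the intended subscription is active before proceeding. This is a quick sanity check rather than part of the original steps; the subscription id below is a placeholder.
```
az account show --output table
az account set --subscription <subscription-id>
```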
* Create a storage account and container for the Terraform backend, then export the below variables in your terminal session. Please see [Create a storage container](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-create?toc=/azure/storage/blobs/toc.json) for reference.
```
export AZURE_TERRAFORM_BACKEND_RG=myregion
export AZURE_TERRAFORM_BACKEND_STORAGE_ACCOUNT=mystorage
export AZURE_TERRAFORM_BACKEND_CONTAINER=mycontainer
```
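* If the backend resource group, storage account and container do not exist yet, they can be created with the Azure CLI. This is a minimal sketch that reuses the variables exported above; the `eastus2` location is only an example.
```
az group create --name "$AZURE_TERRAFORM_BACKEND_RG" --location eastus2
az storage account create --name "$AZURE_TERRAFORM_BACKEND_STORAGE_ACCOUNT" --resource-group "$AZURE_TERRAFORM_BACKEND_RG" --location eastus2 --sku Standard_LRS
az storage container create --name "$AZURE_TERRAFORM_BACKEND_CONTAINER" --account-name "$AZURE_TERRAFORM_BACKEND_STORAGE_ACCOUNT"
```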
### Steps to install Obsrv:
* Navigate to the Azure Terraform directory in the same terminal session:
```
cd terraform/azure
```
* Execute the below commands and pass the environment variables when prompted:
- Note: The below variable values are given as examples
```
env = dev
building_block = obsrv
location = EAST US 2
terragrunt init
terragrunt apply -target module.aks -auto-approve
```

* Export the kubeconfig file and the kubeconfig file path:
- The kubeconfig file is stored in the current directory
```
export KUBECONFIG=<path_to_kubeconfig>
export KUBE_CONFIG_PATH=<path_to_kubeconfig>
```
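* If the kubeconfig file was not generated in the current directory, it can also be fetched from the AKS cluster directly. A sketch, assuming the cluster lives in the `<building_block>-<env>` resource group; the cluster name is a placeholder.
```
az aks get-credentials --resource-group <building_block>-<env> --name <aks-cluster-name> --file ./kubeconfig
export KUBECONFIG=$(pwd)/kubeconfig
export KUBE_CONFIG_PATH=$(pwd)/kubeconfig
```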

* Execute the below commands in the same terminal session:
```
terragrunt apply -target module.unified_helm -auto-approve
kubectl get ingress superset -n superset
```
* Replace the ingress IP in the below Terraform variables:
```
web_console_base_url
superset_base_url
```
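- For example, the ingress IP can be read from the ingress status as shown below; where exactly the two variables are set depends on your Terraform/Terragrunt variable files.
```
kubectl get ingress superset -n superset -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
# then set, for example:
# web_console_base_url = "https://<ingress_ip>"
# superset_base_url    = "https://<ingress_ip>"
```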
* Execute the below commands in the same terminal session:
```
terragrunt apply -target module.unified_helm -auto-approve
```
### Deployment using helm (Discontinued):
```
cd terraform/modules/helm/unified_helm

# Get the storage account name, storage account key and storage account container from the Azure portal

helm upgrade --install obsrv . --namespace obsrv --create-namespace --set "global.azure_storage_account_name=<storage account name>" --set "global.azure_storage_account_key=<storage account key>" --set "global.azure_storage_container=<storage container>" --set "global.web_console_base_url=https://<ingress_ip>" --set "global.superset_base_url=https://<ingress_ip>" --atomic --timeout 1800s --debug

# Get the ingress IP (kubectl get ingress superset -n superset)

helm upgrade --install obsrv . --namespace obsrv --create-namespace --set "global.azure_storage_account_name=<storage account name>" --set "global.azure_storage_account_key=<storage account key>" --set "global.azure_storage_container=<storage container>" --set "global.web_console_base_url=https://<ingress_ip>" --set "global.superset_base_url=https://<ingress_ip>" --atomic --timeout 1800s --debug
```
Note: Get the `storage account name`, `storage account key` and `storage account container` from the Azure portal here:
```
https://portal.azure.com/#@sanketika.in/resource/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>
```
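- Alternatively, the storage account key can be fetched with the Azure CLI; the account and resource group names below are placeholders.
```
az storage account keys list --account-name <storage account name> --resource-group <resource-group-name> --query "[0].value" --output tsv
```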
- Make a note of the resource group created during the cluster creation. Usually it is a combination of `<building_block>-<env>`.
- You can look in the logs for a statement like the one below to get the resource group:
```
module.network.azurerm_resource_group.rg: Creation complete after 3s [id=/subscriptions/<uuid>/resourceGroups/<your-resource-group>]
```

### Steps to uninstall Obsrv:
* Execute the below commands:
```
helm uninstall obsrv -n obsrv
kubectl edit druid -n druid-raw
```
- In the YAML editor, locate lines 12-13.
- Delete any finalizers present in those lines
- Save the changes
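- Alternatively, the finalizers can be removed non-interactively with a patch; the Druid custom resource name below is a placeholder (list it first with `kubectl get druid -n druid-raw`). Then run the `terragrunt destroy` command below.
```
kubectl patch druid <druid-cr-name> -n druid-raw --type merge -p '{"metadata":{"finalizers":null}}'
```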
```
terragrunt destroy -auto-approve
```
- Pass the following variables when prompted
```
env = dev
building_block = obsrv
location = EAST US 2
```
1 change: 1 addition & 0 deletions Dockerfiles/flink-connectors/.dockerignore
@@ -0,0 +1 @@
.gitignore
1 change: 1 addition & 0 deletions Dockerfiles/flink-connectors/.gitignore
@@ -0,0 +1 @@
*.tar.gz
18 changes: 18 additions & 0 deletions Dockerfiles/flink-connectors/Dockerfile
@@ -0,0 +1,18 @@
FROM flink:1.17.2-scala_2.12-java11

USER root
RUN apt update -y
RUN apt install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev software-properties-common python3 python3-pip unzip vim

# RUN chown -R flink:flink /data/connectors-init
USER flink

RUN mkdir $FLINK_HOME/plugins/s3-fs-presto
RUN mkdir $FLINK_HOME/plugins/gs-fs-hadoop
RUN cd $FLINK_HOME/lib/ && curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-azure-fs-hadoop/1.17.2/flink-azure-fs-hadoop-1.17.2.jar
RUN cd $FLINK_HOME/plugins/s3-fs-presto && curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.17.2/flink-s3-fs-presto-1.17.2.jar
RUN cd $FLINK_HOME/plugins/gs-fs-hadoop && curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-gs-fs-hadoop/1.17.2/flink-gs-fs-hadoop-1.17.2.jar

COPY ./connectors /data/connectors-init
RUN pip install -r /data/connectors-init/requirements.txt
COPY kafka-connector-1.0.0-distribution.tar.gz /tmp/obsrv-connectors-extracted/
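A sketch of how this image might be built and published; the registry and tag are illustrative, and the `kafka-connector-1.0.0-distribution.tar.gz` archive must be present in the build context for the final `COPY` to succeed.
```
docker build -t <registry>/flink-connectors:1.1.0-RC .
docker push <registry>/flink-connectors:1.1.0-RC
```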
17 changes: 17 additions & 0 deletions Dockerfiles/flink-connectors/connectors/config.py
@@ -0,0 +1,17 @@
import operator
import os
from functools import reduce

import yaml


class Config:
    def __init__(self):
        config_path = os.getenv("CONFIG_PATH", "/data/flink/connectors/connectors-init/")
        conf_file = os.getenv("CONFIG_FILE", "connector-conf.yaml")
        with open(os.path.join(config_path, conf_file)) as config_file:
            self.config = yaml.safe_load(config_file)

    def find(self, path):
        element_value = reduce(operator.getitem, path.split("."), self.config)
        return element_value
27 changes: 27 additions & 0 deletions Dockerfiles/flink-connectors/connectors/connector-conf.yaml
@@ -0,0 +1,27 @@
postgres:
  dbname: obsrv
  user: postgres
  password: postgres
  host: postgresql-hl.postgresql.svc.cluster.local
  port: 5432

kafka:
  broker-servers: kafka-headless.kafka.svc.cluster.local:9092
  telemetry-topic: obsrv-connectors-telemetry
  connector-metrics-topic: obsrv-connectors-metrics
  producer:
    compression: snappy
    max-request-size: 1000000 # 1MB {1M: 1000000, 10M: 10000000, 5M: 5000000}

obsrv_encryption_key: strong_encryption_key_to_encrypt

building-block: obsrv-connectors
env: local

dataset_api:
  host: "http://dataset-api.dataset-api.svc.cluster.local:3000"
  pre_signed_url: "v2/files/generate-url"

connectors:
  extraction_path: "/tmp/obsrv-connectors-extracted"
  storage_path: "/flink/connectors"
170 changes: 170 additions & 0 deletions Dockerfiles/flink-connectors/connectors/connector.py
@@ -0,0 +1,170 @@
import tarfile
import zipfile
import os
import requests
import uuid
import shutil
import json

from config import Config
from db_service import DatabaseService

config = Config()
db_service = DatabaseService()

def main():
    runtime = os.getenv("RUNTIME", get_runtime())
    connector_id = os.getenv("CONNECTOR_ID", None)
    download_path = config.find("connectors.extraction_path")
    storage_path = config.find("connectors.storage_path")

    if not os.path.exists(download_path):
        os.makedirs(download_path)

    if not os.path.exists(storage_path):
        os.makedirs(storage_path)

    query = f"""
        SELECT cr.source_url, cr.source
        FROM connector_registry cr
        LEFT JOIN connector_instances ci on ci.connector_id = cr.id
        WHERE (cr.status = 'Live' or ci.status = 'Live') AND cr.runtime = '{runtime}'
    """

    if connector_id:
        query += f"AND cr.id = '{connector_id}' "

    # query += "GROUP BY cr.source_url;"

    connectors = db_service.execute_select_all(query)

    for connector in connectors:
        source_url, source = connector
        main_jar = source["main_program"]
        source = source["source"]

        print(f"Processing file with URL: {source_url} and Source: {source}")

        if os.path.exists(f"{storage_path}/{source}"):
            print(f"Connector Registry | Connector {source} already exists")
            with open(f"{storage_path}/{source}/metadata.json") as f:
                metadata = json.load(f)
            install_python_requirements(metadata, storage_path, source)
            continue

        if os.path.exists(f"{download_path}/{source_url}"):
            print(f"Connector Registry | Connector {source_url} already downloaded")
        else:
            download_status = download_file(source_url, f"{download_path}/{source_url}")
            if not download_status:
                print(f"Connector Registry | Error occurred while downloading {source}")
                exit(1)

        # ext_path = f"{download_path}/{uuid.uuid4()}"
        # ext_path = f"{storage_path}/{source}"

        ExtractionUtil.extract(f"{download_path}/{source_url}", storage_path, source.split('.')[-1])
        print(f"Connector Registry | Connector made available in storage path {storage_path}/{source}")

        # shutil.copy(f"{storage_path}/{source}/{main_jar}", f"/opt/flink/lib/{main_jar}")
        # print(f"Connector Registry | Jar copied to flink classpath /opt/flink/lib/")

        # load metadata and install python packages
        with open(f"{storage_path}/{source}/metadata.json") as f:
            metadata = json.load(f)
        install_python_requirements(metadata, storage_path, source)

def install_python_requirements(metadata, storage_path, source):
    if metadata.get("metadata", {}).get("technology", "") == "python":
        print(f"installing Python requirements for {storage_path}/{source}")
        os.system(f"pip install -r {storage_path}/{source}/requirements.txt")

def get_runtime():
    if os.path.exists("/opt/bitnami/spark"):
        return "spark"

    if os.path.exists("/opt/flink"):
        return "flink"


# Method to download the file from blob store
def download_file(rel_path, destination) -> bool:
    try:
        # get pre-signed URL from dataset-api
        dataset_api_host = config.find("dataset_api.host").strip("/")
        pre_signed_endpoint = config.find("dataset_api.pre_signed_url").strip("/")

        dataset_api_request = json.dumps({"request": {"files": [rel_path], "access": "read", "type": "connector"}})
        dataset_api_response = requests.post(f"{dataset_api_host}/{pre_signed_endpoint}", data=dataset_api_request, headers={"Content-Type": "application/json"})

        if dataset_api_response.status_code != 200:
            print(f"Connector Registry | Error occurred while fetching pre-signed URL for {rel_path}: {dataset_api_response.text}")
            return False

        dataset_api_response_json = dataset_api_response.json()

        url = dataset_api_response_json.get("result", [{}])[0].get(
            "preSignedUrl", None
        )

        if not url:
            print(f"Connector Registry | Pre-signed URL not found for {rel_path}")
            return False

        response = requests.get(url, stream=True)
        response.raise_for_status()

        with open(destination, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)

        print(f"Connector Registry | Download completed successfully. URL:{rel_path} Destination: {destination}")
        return True
    except requests.exceptions.HTTPError as http_err:
        print(f"Connector Registry | HTTP error occurred during the file download: {http_err}")
        return False
    except Exception as e:
        print(f"Connector Registry | An unexpected error occurred during the file download: {e}")
        return False


class ExtractionUtil:
    def extract_gz(tar_path, extract_path):
        with tarfile.open(tar_path, "r:*") as tar:
            tar.extractall(path=extract_path)

    def extract_zip(tar_path, extract_path):
        with zipfile.ZipFile(tar_path, "r") as zip_ref:
            zip_ref.extractall(path=extract_path)

    # Method to extract the compressed files
    def extract(file, extract_out_path, ext) -> bool:
        extraction_function = ExtractionUtil.extract_gz

        compression_types = {
            "zip": ExtractionUtil.extract_zip,
        }

        try:
            print(
                f"Connector Registry | Extracting {file} to {extract_out_path} of {ext} file type"
            )

            if ext in compression_types:
                extraction_function = compression_types.get(ext)

            extraction_function(file, extract_out_path)
            print(f"Connector Registry | Extraction complete for {file}")
            return True
        except (tarfile.TarError, zipfile.BadZipFile, OSError) as e:
            print(
                f"Connector Registry | An error occurred while extracting the file: {e}"
            )
            return False
        except Exception as e:
            print(f"Connector Registry | An unexpected error occurred: {e}")
            return False


if __name__ == "__main__":
    main()
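A sketch of how this init script might be run inside the connectors image; the paths follow the Dockerfile above, and `CONFIG_PATH`, `RUNTIME` and `CONNECTOR_ID` are the environment overrides the script reads (the values shown are illustrative).
```
CONFIG_PATH=/data/connectors-init/ RUNTIME=flink CONNECTOR_ID=<connector-id> python3 /data/connectors-init/connector.py
```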
50 changes: 50 additions & 0 deletions Dockerfiles/flink-connectors/connectors/db_service.py
@@ -0,0 +1,50 @@
import psycopg2
import psycopg2.extras

from config import Config


class DatabaseService:

    def __init__(self):
        self.config = Config()

    def connect(self):
        db_host = self.config.find("postgres.host")
        db_port = self.config.find("postgres.port")
        db_user = self.config.find("postgres.user")
        db_password = self.config.find("postgres.password")
        database = self.config.find("postgres.dbname")
        db_connection = psycopg2.connect(
            database=database,
            host=db_host,
            port=db_port,
            user=db_user,
            password=db_password,
        )
        db_connection.autocommit = True
        return db_connection

    def execute_select_one(self, sql):
        db_connection = self.connect()
        cursor = db_connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cursor.execute(sql)
        result = cursor.fetchone()
        db_connection.close()
        return result

    def execute_select_all(self, sql):
        db_connection = self.connect()
        cursor = db_connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cursor.execute(sql)
        result = cursor.fetchall()
        db_connection.close()
        return result

    def execute_upsert(self, sql):
        db_connection = self.connect()
        cursor = db_connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cursor.execute(sql)
        record_count = cursor.rowcount
        db_connection.close()
        return record_count
5 changes: 5 additions & 0 deletions Dockerfiles/flink-connectors/connectors/requirements.txt
@@ -0,0 +1,5 @@
backoff
dataclasses-json
psycopg2-binary
requests
pyyaml
7 changes: 7 additions & 0 deletions Dockerfiles/flink/Dockerfile
@@ -0,0 +1,7 @@
FROM flink:1.17.2-scala_2.12-java11

RUN mkdir $FLINK_HOME/plugins/s3-fs-presto
RUN mkdir $FLINK_HOME/plugins/gs-fs-hadoop
RUN cd $FLINK_HOME/lib/ && curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-azure-fs-hadoop/1.17.2/flink-azure-fs-hadoop-1.17.2.jar
RUN cd $FLINK_HOME/plugins/s3-fs-presto && curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.17.2/flink-s3-fs-presto-1.17.2.jar
RUN cd $FLINK_HOME/plugins/gs-fs-hadoop && curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-gs-fs-hadoop/1.17.2/flink-gs-fs-hadoop-1.17.2.jar