-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Dagster Data pipeline #798
Changes from 3 commits
a93d3c6
fc2c0e0
8faf236
1e677de
a4130bf
6434390
3e84a89
29d4796
efc886c
0c65b89
c550e54
96ab64e
c68811a
aee55ac
ad2eedb
6a8503b
b5a2d39
863e63a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# tabby_data_pipeline | ||
|
||
This is a [Dagster](https://dagster.io/) project scaffolded with [`dagster project scaffold`](https://docs.dagster.io/getting-started/create-new-project). | ||
|
||
## Getting started | ||
|
||
First, install your Dagster code location as a Python package. By using the --editable flag, pip will install your Python package in ["editable mode"](https://pip.pypa.io/en/latest/topics/local-project-installs/#editable-installs) so that as you develop, local code changes will automatically apply. | ||
|
||
```bash | ||
pip install -e ".[dev]" | ||
``` | ||
|
||
Then, start the Dagster UI web server: | ||
|
||
```bash | ||
dagster dev | ||
``` | ||
|
||
Open http://localhost:3000 with your browser to see the project. | ||
|
||
You can start writing assets in `tabby_data_pipeline/assets.py`. The assets are automatically loaded into the Dagster code location as you define them. | ||
|
||
## Development | ||
|
||
|
||
### Adding new Python dependencies | ||
|
||
You can specify new Python dependencies in `setup.py`. | ||
|
||
### Unit testing | ||
|
||
Tests are in the `tabby_data_pipeline_tests` directory and you can run tests using `pytest`: | ||
|
||
```bash | ||
pytest tabby_data_pipeline_tests | ||
``` | ||
|
||
### Schedules and sensors | ||
|
||
If you want to enable Dagster [Schedules](https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules) or [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors) for your jobs, the [Dagster Daemon](https://docs.dagster.io/deployment/dagster-daemon) process must be running. This is done automatically when you run `dagster dev`. | ||
|
||
Once your Dagster Daemon is running, you can start turning on schedules and sensors for your jobs. | ||
|
||
## Deploy on Dagster Cloud | ||
|
||
The easiest way to deploy your Dagster project is to use Dagster Cloud. | ||
|
||
Check out the [Dagster Cloud Documentation](https://docs.dagster.cloud) to learn more. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
model: TabbyML/StarCoder-1B; language: python; file: line_completion.jsonlSkipped 0 rows, 10 rows with predictions, 0 rows with errors | ||
|
||
model: TabbyML/StarCoder-1B; language: python; file: line_completion_rg1_bm25.jsonlSkipped 0 rows, 10 rows with predictions, 0 rows with errors | ||
|
||
model: TabbyML/StarCoder-1B; language: python; file: line_completion_oracle_bm25.jsonlSkipped 0 rows, 10 rows with predictions, 0 rows with errors | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,219 @@ | ||||||
from pathlib import Path | ||||||
|
||||||
import modal | ||||||
from modal import Image, Mount, Secret, Stub, asgi_app, gpu, method | ||||||
import os | ||||||
|
||||||
|
||||||
import asyncio | ||||||
from collections import namedtuple | ||||||
from datetime import datetime | ||||||
|
||||||
GPU_CONFIG = gpu.A10G() | ||||||
|
||||||
MODEL_ID = os.environ.get("MODEL_ID") | ||||||
LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"] | ||||||
|
||||||
|
||||||
def download_model(): | ||||||
import subprocess | ||||||
import os | ||||||
MODEL_ID = os.environ.get("MODEL_ID") | ||||||
print(f'MODEL_ID={MODEL_ID}') | ||||||
subprocess.run( | ||||||
[ | ||||||
"/opt/tabby/bin/tabby", | ||||||
"download", | ||||||
"--model", | ||||||
MODEL_ID, | ||||||
] | ||||||
) | ||||||
|
||||||
|
||||||
image = ( | ||||||
Image.from_registry( | ||||||
"tabbyml/tabby:0.5.5", | ||||||
add_python="3.11", | ||||||
) | ||||||
.env({"MODEL_ID": os.environ.get("MODEL_ID")}) | ||||||
.dockerfile_commands("ENTRYPOINT []") | ||||||
.copy_local_dir(local_path='./modal/tabby_python_client/tabby_python_client', remote_path='/root/tabby_python_client') | ||||||
.pip_install( | ||||||
"httpx", | ||||||
"pandas" | ||||||
) | ||||||
.run_function(download_model) | ||||||
) | ||||||
|
||||||
stub = Stub("tabby-" + MODEL_ID.split("/")[-1], image=image) | ||||||
|
||||||
|
||||||
@stub.cls( | ||||||
gpu=GPU_CONFIG, | ||||||
concurrency_limit=10, | ||||||
allow_concurrent_inputs=2, | ||||||
container_idle_timeout=60 * 10, | ||||||
timeout=600, | ||||||
) | ||||||
class Model: | ||||||
def __enter__(self): | ||||||
import socket | ||||||
import subprocess, os | ||||||
import time | ||||||
|
||||||
from tabby_python_client import Client | ||||||
|
||||||
my_env = os.environ.copy() | ||||||
my_env["TABBY_DISABLE_USAGE_COLLECTION"] = "1" | ||||||
MODEL_ID = os.environ.get("MODEL_ID") | ||||||
print(f'MODEL_ID={MODEL_ID}') | ||||||
|
||||||
LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"] | ||||||
self.launcher = subprocess.Popen(["/opt/tabby/bin/tabby"] + LAUNCH_FLAGS, env=my_env) | ||||||
self.client = Client("http://127.0.0.1:8000", timeout=240) | ||||||
|
||||||
# Poll until webserver at 127.0.0.1:8000 accepts connections before running inputs. | ||||||
def webserver_ready(): | ||||||
try: | ||||||
socket.create_connection(("127.0.0.1", 8000), timeout=30).close() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Line 91 already contains retry logic, no need to increase timeout |
||||||
return True | ||||||
except (socket.timeout, ConnectionRefusedError): | ||||||
# Check if launcher webserving process has exited. | ||||||
# If so, a connection can never be made. | ||||||
retcode = self.launcher.poll() | ||||||
if retcode is not None: | ||||||
raise RuntimeError( | ||||||
f"launcher exited unexpectedly with code {retcode}" | ||||||
) | ||||||
return False | ||||||
|
||||||
while not webserver_ready(): | ||||||
time.sleep(1.0) | ||||||
|
||||||
print("Tabby server ready!") | ||||||
|
||||||
def __exit__(self, _exc_type, _exc_value, _traceback): | ||||||
self.launcher.terminate() | ||||||
|
||||||
@method() | ||||||
async def health(self): | ||||||
from tabby_python_client.api.v1 import health | ||||||
|
||||||
resp = await health.asyncio(client=self.client) | ||||||
return resp.to_dict() | ||||||
|
||||||
@method() | ||||||
async def complete(self, language: str, index: int, prompt: str, prediction: bool): | ||||||
from tabby_python_client.api.v1 import completion | ||||||
from tabby_python_client.models import ( | ||||||
CompletionRequest, | ||||||
DebugOptions, | ||||||
CompletionResponse, | ||||||
Segments, | ||||||
) | ||||||
from tabby_python_client.types import Response | ||||||
from tabby_python_client import errors | ||||||
import pandas as pd | ||||||
|
||||||
# if prediction exists, just skip | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if prediction exists, you can simply don't call |
||||||
if prediction: | ||||||
return None, None, None | ||||||
|
||||||
|
||||||
request = CompletionRequest( | ||||||
language=language, debug_options=DebugOptions(raw_prompt=prompt) | ||||||
) | ||||||
# resp: CompletionResponse = await completion.asyncio( | ||||||
# client=self.client, json_body=request | ||||||
# ) | ||||||
try: | ||||||
resp: Response = await completion.asyncio_detailed( | ||||||
client=self.client, json_body=request | ||||||
) | ||||||
|
||||||
if resp.parsed != None: | ||||||
return index, resp.parsed.choices[0].text, None | ||||||
else: | ||||||
return index, None, f"<{resp.status_code}>" | ||||||
except errors.UnexpectedStatus as e: | ||||||
return index, None, f"error: code={e.status_code} content={e.content} error={e}" | ||||||
except Exception as e: | ||||||
return index, None, f"error type: {type(e)}" | ||||||
|
||||||
def write_log(log: str): | ||||||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | ||||||
with open('./modal/log.txt', 'a') as f: | ||||||
f.write(f"{now} : {log}") | ||||||
f.write("\n") | ||||||
|
||||||
@stub.local_entrypoint() | ||||||
async def main(language: str): | ||||||
import json | ||||||
import pandas as pd | ||||||
|
||||||
|
||||||
print(MODEL_ID) | ||||||
|
||||||
model = Model() | ||||||
print("model info:") | ||||||
health_resp = model.health.remote() | ||||||
print(health_resp) | ||||||
assert(health_resp['model'] == MODEL_ID) | ||||||
|
||||||
|
||||||
for file in ['line_completion.jsonl', 'line_completion_rg1_bm25.jsonl', 'line_completion_oracle_bm25.jsonl']: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please extract function for this, e.g There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure what this meant? extract function for the "for" loop? or do you mean extract function for reading all the three files? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. either interface is fine, it's just good to split the main function here into smaller chunks for better readability |
||||||
|
||||||
whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file | ||||||
objs = [] | ||||||
with open(whole_path_file) as fin: | ||||||
for line in fin: | ||||||
obj = json.loads(line) | ||||||
if file == 'line_completion.jsonl': | ||||||
obj['raw_prompt'] = obj['prompt'] | ||||||
else: | ||||||
obj['raw_prompt'] = obj['crossfile_context']['text'] | ||||||
objs.append(obj) | ||||||
|
||||||
df = pd.DataFrame(objs) | ||||||
|
||||||
write_log(f"model: {MODEL_ID}; language: {language}; file: {file}: length = {len(df)}") | ||||||
|
||||||
def chunker(seq, size): | ||||||
return (seq[pos:pos + size] for pos in range(0, len(seq), size)) | ||||||
|
||||||
def get_prediction(row): | ||||||
if 'prediction' in row and not pd.isnull(row['prediction']): | ||||||
return True | ||||||
else: | ||||||
return False | ||||||
|
||||||
skipped = 0 | ||||||
success = 0 | ||||||
error = 0 | ||||||
|
||||||
for group in chunker(df, 30): | ||||||
outputs = await asyncio.gather(*[model.complete.remote.aio(language, index, row['raw_prompt'], get_prediction(row)) for index, row in group.iterrows()]) | ||||||
|
||||||
for index, prediction, error_msg in outputs: | ||||||
if index is None: | ||||||
skipped += 1 | ||||||
elif prediction is not None: | ||||||
df.loc[index, 'prediction'] = prediction | ||||||
success += 1 | ||||||
else: | ||||||
df.loc[index, 'error'] = error_msg | ||||||
error += 1 | ||||||
|
||||||
write_log(f"Skipped {skipped} rows, {success} rows with predictions, {error} rows with errors") | ||||||
|
||||||
whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file | ||||||
|
||||||
with open(whole_path_file, 'w') as fout: | ||||||
for index, row in df.iterrows(): | ||||||
row_dict = row.to_dict() | ||||||
json.dump(row_dict, fout) | ||||||
fout.write('\n') | ||||||
|
||||||
|
||||||
|
||||||
write_log(f"model: {MODEL_ID}; language: {language}; file: {file}: end!\n") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
__pycache__/ | ||
build/ | ||
dist/ | ||
*.egg-info/ | ||
.pytest_cache/ | ||
|
||
# pyenv | ||
.python-version | ||
|
||
# Environments | ||
.env | ||
.venv | ||
|
||
# mypy | ||
.mypy_cache/ | ||
.dmypy.json | ||
dmypy.json | ||
|
||
# JetBrains | ||
.idea/ | ||
|
||
/coverage.xml | ||
/.coverage |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
# tabby-python-client | ||
A client library for accessing Tabby Server | ||
|
||
## Usage | ||
First, create a client: | ||
|
||
```python | ||
from tabby_python_client import Client | ||
|
||
client = Client(base_url="https://api.example.com") | ||
``` | ||
|
||
If the endpoints you're going to hit require authentication, use `AuthenticatedClient` instead: | ||
|
||
```python | ||
from tabby_python_client import AuthenticatedClient | ||
|
||
client = AuthenticatedClient(base_url="https://api.example.com", token="SuperSecretToken") | ||
``` | ||
|
||
Now call your endpoint and use your models: | ||
|
||
```python | ||
from tabby_python_client.models import MyDataModel | ||
from tabby_python_client.api.my_tag import get_my_data_model | ||
from tabby_python_client.types import Response | ||
|
||
my_data: MyDataModel = get_my_data_model.sync(client=client) | ||
# or if you need more info (e.g. status_code) | ||
response: Response[MyDataModel] = get_my_data_model.sync_detailed(client=client) | ||
``` | ||
|
||
Or do the same thing with an async version: | ||
|
||
```python | ||
from tabby_python_client.models import MyDataModel | ||
from tabby_python_client.api.my_tag import get_my_data_model | ||
from tabby_python_client.types import Response | ||
|
||
my_data: MyDataModel = await get_my_data_model.asyncio(client=client) | ||
response: Response[MyDataModel] = await get_my_data_model.asyncio_detailed(client=client) | ||
``` | ||
|
||
By default, when you're calling an HTTPS API it will attempt to verify that SSL is working correctly. Using certificate verification is highly recommended most of the time, but sometimes you may need to authenticate to a server (especially an internal server) using a custom certificate bundle. | ||
|
||
```python | ||
client = AuthenticatedClient( | ||
base_url="https://internal_api.example.com", | ||
token="SuperSecretToken", | ||
verify_ssl="/path/to/certificate_bundle.pem", | ||
) | ||
``` | ||
|
||
You can also disable certificate validation altogether, but beware that **this is a security risk**. | ||
|
||
```python | ||
client = AuthenticatedClient( | ||
base_url="https://internal_api.example.com", | ||
token="SuperSecretToken", | ||
verify_ssl=False | ||
) | ||
``` | ||
|
||
There are more settings on the generated `Client` class which let you control more runtime behavior, check out the docstring on that class for more info. | ||
|
||
Things to know: | ||
1. Every path/method combo becomes a Python module with four functions: | ||
1. `sync`: Blocking request that returns parsed data (if successful) or `None` | ||
1. `sync_detailed`: Blocking request that always returns a `Request`, optionally with `parsed` set if the request was successful. | ||
1. `asyncio`: Like `sync` but async instead of blocking | ||
1. `asyncio_detailed`: Like `sync_detailed` but async instead of blocking | ||
|
||
1. All path/query params, and bodies become method arguments. | ||
1. If your endpoint had any tags on it, the first tag will be used as a module name for the function (my_tag above) | ||
1. Any endpoint which did not have a tag will be in `tabby_python_client.api.default` | ||
|
||
## Building / publishing this Client | ||
This project uses [Poetry](https://python-poetry.org/) to manage dependencies and packaging. Here are the basics: | ||
1. Update the metadata in pyproject.toml (e.g. authors, version) | ||
1. If you're using a private repository, configure it with Poetry | ||
1. `poetry config repositories.<your-repository-name> <url-to-your-repository>` | ||
1. `poetry config http-basic.<your-repository-name> <username> <password>` | ||
1. Publish the client with `poetry publish --build -r <your-repository-name>` or, if for public PyPI, just `poetry publish --build` | ||
|
||
If you want to install this client into another project without publishing it (e.g. for development) then: | ||
1. If that project **is using Poetry**, you can simply do `poetry add <path-to-this-client>` from that project | ||
1. If that project is not using Poetry: | ||
1. Build a wheel with `poetry build -f wheel` | ||
1. Install that wheel from the other project `pip install <path-to-wheel>` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 91 already contains retry logic, no need to increase timeout