Sync upstream spec - all APIs implemented #228

Merged · 34 commits · Jun 5, 2024
Commits (34, changes shown from all commits):
028fab8 types updated from spec (64bit, Jun 3, 2024)
9d6f723 fix compilation errors in examples (64bit, Jun 3, 2024)
33e7ace add assistants streaming APIs (64bit, Jun 4, 2024)
5acdf05 updated readme (64bit, Jun 4, 2024)
d061551 updated readme (64bit, Jun 4, 2024)
9f68322 updates from spec (64bit, Jun 4, 2024)
0eced04 MessageContentInput for CreateMessageRequest (64bit, Jun 4, 2024)
05f2253 update message content types (64bit, Jun 4, 2024)
75b8cdf updated spec (64bit, Jun 4, 2024)
8193996 update crate dependencies (64bit, Jun 4, 2024)
1bcec3d update examples dependencies (64bit, Jun 4, 2024)
67f9802 cleanup types (64bit, Jun 4, 2024)
f417fba add file search assistants example (64bit, Jun 4, 2024)
29e8e3a cleanup (64bit, Jun 4, 2024)
b0f1ea0 updated types (64bit, Jun 4, 2024)
b5f7c0a message delta types to not have explicit 'type' field, instead i… (64bit, Jun 4, 2024)
776678c retrieve file contents as Bytes instead of string (64bit, Jun 4, 2024)
1087e3e fix types (64bit, Jun 4, 2024)
b7aafc5 add code interpreter assistant example (64bit, Jun 4, 2024)
15687b9 updated examples and their data files (64bit, Jun 4, 2024)
4ba6934 fix chat types for ser-de (64bit, Jun 4, 2024)
48ea7c6 update step types for ser-de (64bit, Jun 4, 2024)
19f9abd assistant stream event non_exhaustive (64bit, Jun 4, 2024)
736e484 update lib doc (64bit, Jun 4, 2024)
31692be support for assistant streaming (64bit, Jun 4, 2024)
ce21bc7 assistant function call with streaming example (partially complete) (64bit, Jun 4, 2024)
5a89e19 update type (64bit, Jun 4, 2024)
5bcdf92 working assistants-fun-call-stream (64bit, Jun 4, 2024)
ee90342 cargo test fix (64bit, Jun 5, 2024)
1c6d14b update assistant example (64bit, Jun 5, 2024)
83d2817 cargo fix (64bit, Jun 5, 2024)
2f5ade5 cargo fmt (64bit, Jun 5, 2024)
a1ef6bc better message (64bit, Jun 5, 2024)
356fe6c fix (64bit, Jun 5, 2024)
37 changes: 20 additions & 17 deletions async-openai/Cargo.toml
@@ -1,9 +1,7 @@
 [package]
 name = "async-openai"
 version = "0.21.0"
-authors = [
-    "Himanshu Neema"
-]
+authors = ["Himanshu Neema"]
 categories = ["api-bindings", "web-programming", "asynchronous"]
 keywords = ["openai", "async", "openapi", "ai"]
 description = "Rust library for OpenAI"
@@ -26,23 +24,28 @@ native-tls = ["reqwest/native-tls"]
 native-tls-vendored = ["reqwest/native-tls-vendored"]
 
 [dependencies]
-backoff = {version = "0.4.0", features = ["tokio"] }
-base64 = "0.22.0"
-futures = "0.3.26"
+backoff = { version = "0.4.0", features = ["tokio"] }
+base64 = "0.22.1"
+futures = "0.3.30"
 rand = "0.8.5"
-reqwest = { version = "0.12.0", features = ["json", "stream", "multipart"],default-features = false }
+reqwest = { version = "0.12.4", features = [
+    "json",
+    "stream",
+    "multipart",
+], default-features = false }
 reqwest-eventsource = "0.6.0"
-serde = { version = "1.0.152", features = ["derive", "rc"] }
-serde_json = "1.0.93"
-thiserror = "1.0.38"
-tokio = { version = "1.25.0", features = ["fs", "macros"] }
-tokio-stream = "0.1.11"
-tokio-util = { version = "0.7.7", features = ["codec", "io-util"] }
-tracing = "0.1.37"
+serde = { version = "1.0.203", features = ["derive", "rc"] }
+serde_json = "1.0.117"
+thiserror = "1.0.61"
+tokio = { version = "1.38.0", features = ["fs", "macros"] }
+tokio-stream = "0.1.15"
+tokio-util = { version = "0.7.11", features = ["codec", "io-util"] }
+tracing = "0.1.40"
 derive_builder = "0.20.0"
 async-convert = "1.0.0"
-secrecy = { version = "0.8.0", features=["serde"] }
-bytes = "1.5.0"
+secrecy = { version = "0.8.0", features = ["serde"] }
+bytes = "1.6.0"
 eventsource-stream = "0.2.3"
 
 [dev-dependencies]
-tokio-test = "0.4.2"
+tokio-test = "0.4.4"
13 changes: 5 additions & 8 deletions async-openai/README.md
@@ -23,8 +23,7 @@

 - It's based on [OpenAI OpenAPI spec](https://github.com/openai/openai-openapi)
 - Current features:
-  - [x] Assistants v2
-  - [ ] Assistants v2 streaming
+  - [x] Assistants (v2)
   - [x] Audio
   - [x] Batch
   - [x] Chat
Expand All @@ -33,14 +32,12 @@
   - [x] Files
   - [x] Fine-Tuning
   - [x] Images
-  - [x] Microsoft Azure OpenAI Service
   - [x] Models
   - [x] Moderations
-- Support SSE streaming on available APIs
-- All requests including form submissions (except SSE streaming) are retried with exponential backoff when [rate limited](https://platform.openai.com/docs/guides/rate-limits) by the API server.
+- SSE streaming on all available APIs
+- Requests (except SSE streaming) including form submissions are retried with exponential backoff when [rate limited](https://platform.openai.com/docs/guides/rate-limits).
 - Ergonomic builder pattern for all request objects.
 
-**Note on Azure OpenAI Service (AOS)**: `async-openai` primarily implements OpenAI spec, and doesn't try to maintain parity with spec of AOS.
+- Microsoft Azure OpenAI Service (only APIs matching OpenAI spec)
 
## Usage

@@ -95,7 +92,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 ```
 
 <div align="center">
   <img width="315" src="https://raw.githubusercontent.com/64bit/async-openai/assets/create-image/img-1.png" />
   <img width="315" src="https://raw.githubusercontent.com/64bit/async-openai/assets/create-image/img-2.png" />
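The README features above call out SSE streaming and the builder pattern. A minimal usage sketch against the OpenAI API, assuming `OPENAI_API_KEY` is set; the model name and prompt are illustrative:

```rust
use async_openai::{types::CreateCompletionRequestArgs, Client};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Client::new() picks up OPENAI_API_KEY from the environment.
    let client = Client::new();

    // Every request type gets a derive_builder-generated *Args builder.
    let request = CreateCompletionRequestArgs::default()
        .model("gpt-3.5-turbo-instruct")
        .prompt("Write a haiku about Rust")
        .max_tokens(40_u16)
        .build()?;

    let response = client.completions().create(request).await?;
    println!("{}", response.choices[0].text);
    Ok(())
}
```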
2 changes: 1 addition & 1 deletion async-openai/src/batches.rs
@@ -37,7 +37,7 @@ impl<'c, C: Config> Batches<'c, C> {
         self.client.get(&format!("/batches/{batch_id}")).await
     }
 
-    /// Cancels an in-progress batch.
+    /// Cancels an in-progress batch. The batch will be in status `cancelling` for up to 10 minutes, before changing to `cancelled`, where it will have partial results (if any) available in the output file.
     pub async fn cancel(&self, batch_id: &str) -> Result<Batch, OpenAIError> {
         self.client
             .post(
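A hedged sketch of how the expanded `cancel` contract plays out for a caller; the `batch_id` and the polling strategy are illustrative:

```rust
use async_openai::{error::OpenAIError, Client};

async fn cancel_batch(batch_id: &str) -> Result<(), OpenAIError> {
    let client = Client::new();

    // Ask the server to cancel; the returned Batch is a snapshot, not a final state.
    let batch = client.batches().cancel(batch_id).await?;
    println!("status right after cancel: {:?}", batch.status);

    // The batch can sit in `cancelling` for up to 10 minutes before it becomes
    // `cancelled`, so re-read instead of trusting the snapshot above.
    let batch = client.batches().retrieve(batch_id).await?;
    println!("status on re-read: {:?}", batch.status);
    Ok(())
}
```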
86 changes: 85 additions & 1 deletion async-openai/src/client.rs
@@ -196,6 +196,20 @@ impl<C: Config> Client<C> {
         self.execute(request_maker).await
     }
 
+    /// Make a GET request to {path} and return the response body
+    pub(crate) async fn get_raw(&self, path: &str) -> Result<Bytes, OpenAIError> {
+        let request_maker = || async {
+            Ok(self
+                .http_client
+                .get(self.config.url(path))
+                .query(&self.config.query())
+                .headers(self.config.headers())
+                .build()?)
+        };
+
+        self.execute_raw(request_maker).await
+    }
+
     /// Make a POST request to {path} and return the response body
     pub(crate) async fn post_raw<I>(&self, path: &str, request: I) -> Result<Bytes, OpenAIError>
     where
@@ -369,8 +383,30 @@
         stream(event_source).await
     }
 
+    pub(crate) async fn post_stream_mapped_raw_events<I, O>(
+        &self,
+        path: &str,
+        request: I,
+        event_mapper: impl Fn(eventsource_stream::Event) -> Result<O, OpenAIError> + Send + 'static,
+    ) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
+    where
+        I: Serialize,
+        O: DeserializeOwned + std::marker::Send + 'static,
+    {
+        let event_source = self
+            .http_client
+            .post(self.config.url(path))
+            .query(&self.config.query())
+            .headers(self.config.headers())
+            .json(&request)
+            .eventsource()
+            .unwrap();
+
+        stream_mapped_raw_events(event_source, event_mapper).await
+    }
+
     /// Make HTTP GET request to receive SSE
-    pub(crate) async fn get_stream<Q, O>(
+    pub(crate) async fn _get_stream<Q, O>(
         &self,
         path: &str,
         query: &Q,
@@ -437,3 +473,51 @@

     Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx))
 }
 
+pub(crate) async fn stream_mapped_raw_events<O>(
+    mut event_source: EventSource,
+    event_mapper: impl Fn(eventsource_stream::Event) -> Result<O, OpenAIError> + Send + 'static,
+) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
+where
+    O: DeserializeOwned + std::marker::Send + 'static,
+{
+    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+
+    tokio::spawn(async move {
+        while let Some(ev) = event_source.next().await {
+            match ev {
+                Err(e) => {
+                    if let Err(_e) = tx.send(Err(OpenAIError::StreamError(e.to_string()))) {
+                        // rx dropped
+                        break;
+                    }
+                }
+                Ok(event) => match event {
+                    Event::Message(message) => {
+                        let mut done = false;
+
+                        if message.data == "[DONE]" {
+                            done = true;
+                        }
+
+                        let response = event_mapper(message);
+
+                        if let Err(_e) = tx.send(response) {
+                            // rx dropped
+                            break;
+                        }
+
+                        if done {
+                            break;
+                        }
+                    }
+                    Event::Open => continue,
+                },
+            }
+        }
+
+        event_source.close();
+    });
+
+    Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx))
+}
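The `event_mapper` parameter is what lets `runs.rs` and `threads.rs` below plug in `AssistantStreamEvent::try_from`. Any function matching the bound works; since these helpers are `pub(crate)`, this is a crate-internal sketch, with an illustrative mapper that just surfaces the raw SSE fields:

```rust
use async_openai::error::OpenAIError;

// Any `Fn(eventsource_stream::Event) -> Result<O, OpenAIError> + Send + 'static`
// satisfies the `event_mapper` bound; this one just exposes the raw SSE fields.
fn raw_mapper(ev: eventsource_stream::Event) -> Result<(String, String), OpenAIError> {
    // `event` is the SSE `event:` name (e.g. "thread.run.created");
    // `data` is the JSON payload, or the "[DONE]" sentinel.
    Ok((ev.event, ev.data))
}
```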
2 changes: 1 addition & 1 deletion async-openai/src/error.rs
@@ -28,7 +28,7 @@ pub enum OpenAIError {
 }
 
 /// OpenAI API returns error object on failure
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 pub struct ApiError {
     pub message: String,
     pub r#type: Option<String>,
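Deriving `Clone` means an `ApiError` can now be copied out of an `OpenAIError` and kept past the original `Result`, e.g. for later reporting. A small sketch; the error-log shape is illustrative:

```rust
use async_openai::error::{ApiError, OpenAIError};

// With Clone derived, an ApiError can be stashed past the original Result.
fn record_api_error(err: &OpenAIError, log: &mut Vec<ApiError>) {
    if let OpenAIError::ApiError(api_err) = err {
        log.push(api_err.clone());
    }
}
```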
20 changes: 14 additions & 6 deletions async-openai/src/file.rs
@@ -1,3 +1,4 @@
+use bytes::Bytes;
 use serde::Serialize;
 
 use crate::{
@@ -17,9 +18,13 @@ impl<'c, C: Config> Files<'c, C> {
         Self { client }
     }
 
-    /// Upload a file that can be used across various endpoints. The size of all the files uploaded by one organization can be up to 100 GB.
+    /// Upload a file that can be used across various endpoints. Individual files can be up to 512 MB, and the size of all files uploaded by one organization can be up to 100 GB.
     ///
-    /// The size of individual files can be a maximum of 512 MB or 2 million tokens for Assistants. See the [Assistants Tools guide](https://platform.openai.com/docs/assistants/tools) to learn more about the types of files supported. The Fine-tuning API only supports `.jsonl` files.
+    /// The Assistants API supports files up to 2 million tokens and of specific file types. See the [Assistants Tools guide](https://platform.openai.com/docs/assistants/tools) for details.
+    ///
+    /// The Fine-tuning API only supports `.jsonl` files. The input also has certain required formats for fine-tuning [chat](https://platform.openai.com/docs/api-reference/fine-tuning/chat-input) or [completions](https://platform.openai.com/docs/api-reference/fine-tuning/completions-input) models.
+    ///
+    /// The Batch API only supports `.jsonl` files up to 100 MB in size. The input also has a specific required [format](https://platform.openai.com/docs/api-reference/batch/request-input).
     ///
     /// Please [contact us](https://help.openai.com/) if you need to increase these storage limits.
     pub async fn create(&self, request: CreateFileRequest) -> Result<OpenAIFile, OpenAIError> {
@@ -47,16 +52,19 @@ impl<'c, C: Config> Files<'c, C> {
     }
 
     /// Returns the contents of the specified file
-    pub async fn retrieve_content(&self, file_id: &str) -> Result<String, OpenAIError> {
+    pub async fn content(&self, file_id: &str) -> Result<Bytes, OpenAIError> {
         self.client
-            .get(format!("/files/{file_id}/content").as_str())
+            .get_raw(format!("/files/{file_id}/content").as_str())
             .await
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::{types::CreateFileRequestArgs, Client};
+    use crate::{
+        types::{CreateFileRequestArgs, FilePurpose},
+        Client,
+    };
 
     #[tokio::test]
     async fn test_file_mod() {
@@ -72,7 +80,7 @@

         let request = CreateFileRequestArgs::default()
             .file(test_file_path)
-            .purpose("fine-tune")
+            .purpose(FilePurpose::FineTune)
             .build()
             .unwrap();

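A sketch of the renamed accessor from a caller's perspective; since `content` now returns `Bytes`, binary payloads survive without a UTF-8 assumption. The output path is illustrative:

```rust
use async_openai::Client;

async fn download_file(file_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();

    // Previously retrieve_content(..) -> String; now content(..) -> Bytes.
    let bytes = client.files().content(file_id).await?;

    // No UTF-8 assumption: write the payload out verbatim.
    tokio::fs::write("downloaded.bin", &bytes).await?;
    Ok(())
}
```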
4 changes: 2 additions & 2 deletions async-openai/src/lib.rs
@@ -38,8 +38,8 @@
 //!
 //! let client = Client::with_config(config);
 //!
-//! // Note that Azure OpenAI service does not support all APIs and `async-openai`
-//! // doesn't restrict and still allows calls to all of the APIs as OpenAI.
+//! // Note that `async-openai` only implements OpenAI spec
+//! // and doesn't maintain parity with the spec of Azure OpenAI service.
 //!
 //! ```
 //!
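For reference, a sketch of the Azure configuration this doc note refers to; the endpoint, deployment, API version, and key values are placeholders:

```rust
use async_openai::{config::AzureConfig, Client};

fn azure_client() -> Client<AzureConfig> {
    let config = AzureConfig::new()
        .with_api_base("https://my-resource.openai.azure.com")
        .with_deployment_id("my-deployment")
        .with_api_version("2024-02-01")
        .with_api_key("my-azure-key");

    // Only the APIs that match the OpenAI spec are expected to work here.
    Client::with_config(config)
}
```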
53 changes: 51 additions & 2 deletions async-openai/src/runs.rs
@@ -5,8 +5,8 @@ use crate::{
     error::OpenAIError,
     steps::Steps,
     types::{
-        CreateRunRequest, ListRunsResponse, ModifyRunRequest, RunObject,
-        SubmitToolOutputsRunRequest,
+        AssistantEventStream, AssistantStreamEvent, CreateRunRequest, ListRunsResponse,
+        ModifyRunRequest, RunObject, SubmitToolOutputsRunRequest,
     },
     Client,
 };
@@ -39,6 +39,29 @@ impl<'c, C: Config> Runs<'c, C> {
             .await
     }
 
+    /// Create a run (streaming).
+    pub async fn create_stream(
+        &self,
+        mut request: CreateRunRequest,
+    ) -> Result<AssistantEventStream, OpenAIError> {
+        if request.stream.is_some() && !request.stream.unwrap() {
+            return Err(OpenAIError::InvalidArgument(
+                "When stream is false, use Runs::create".into(),
+            ));
+        }
+
+        request.stream = Some(true);
+
+        Ok(self
+            .client
+            .post_stream_mapped_raw_events(
+                &format!("/threads/{}/runs", self.thread_id),
+                request,
+                AssistantStreamEvent::try_from,
+            )
+            .await)
+    }
+
     /// Retrieves a run.
     pub async fn retrieve(&self, run_id: &str) -> Result<RunObject, OpenAIError> {
         self.client
@@ -87,6 +110,32 @@ impl<'c, C: Config> Runs<'c, C> {
             .await
     }
 
+    pub async fn submit_tool_outputs_stream(
+        &self,
+        run_id: &str,
+        mut request: SubmitToolOutputsRunRequest,
+    ) -> Result<AssistantEventStream, OpenAIError> {
+        if request.stream.is_some() && !request.stream.unwrap() {
+            return Err(OpenAIError::InvalidArgument(
+                "When stream is false, use Runs::submit_tool_outputs".into(),
+            ));
+        }
+
+        request.stream = Some(true);
+
+        Ok(self
+            .client
+            .post_stream_mapped_raw_events(
+                &format!(
+                    "/threads/{}/runs/{run_id}/submit_tool_outputs",
+                    self.thread_id
+                ),
+                request,
+                AssistantStreamEvent::try_from,
+            )
+            .await)
+    }
+
     /// Cancels a run that is `in_progress`
     pub async fn cancel(&self, run_id: &str) -> Result<RunObject, OpenAIError> {
         self.client
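A consumer-side sketch of `create_stream`; the matched variant names follow the upstream event names and are assumptions here (the event enum is `#[non_exhaustive]`, hence the catch-all arm):

```rust
use async_openai::{
    types::{AssistantStreamEvent, CreateRunRequestArgs},
    Client,
};
use futures::StreamExt;

async fn stream_run(
    thread_id: &str,
    assistant_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let request = CreateRunRequestArgs::default()
        .assistant_id(assistant_id)
        .build()?;

    // create_stream forces request.stream = Some(true) before sending.
    let mut stream = client
        .threads()
        .runs(thread_id)
        .create_stream(request)
        .await?;

    while let Some(event) = stream.next().await {
        match event? {
            AssistantStreamEvent::ThreadMessageDelta(delta) => println!("{delta:?}"),
            AssistantStreamEvent::ThreadRunCompleted(_) => break,
            other => eprintln!("unhandled: {other:?}"),
        }
    }
    Ok(())
}
```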
23 changes: 21 additions & 2 deletions async-openai/src/threads.rs
@@ -2,8 +2,8 @@ use crate::{
     config::Config,
     error::OpenAIError,
     types::{
-        CreateThreadAndRunRequest, CreateThreadRequest, DeleteThreadResponse, ModifyThreadRequest,
-        RunObject, ThreadObject,
+        AssistantEventStream, AssistantStreamEvent, CreateThreadAndRunRequest, CreateThreadRequest,
+        DeleteThreadResponse, ModifyThreadRequest, RunObject, ThreadObject,
     },
     Client, Messages, Runs,
 };
@@ -38,6 +38,25 @@ impl<'c, C: Config> Threads<'c, C> {
         self.client.post("/threads/runs", request).await
     }
 
+    /// Create a thread and run it in one request (streaming).
+    pub async fn create_and_run_stream(
+        &self,
+        mut request: CreateThreadAndRunRequest,
+    ) -> Result<AssistantEventStream, OpenAIError> {
+        if request.stream.is_some() && !request.stream.unwrap() {
+            return Err(OpenAIError::InvalidArgument(
+                "When stream is false, use Threads::create_and_run".into(),
+            ));
+        }
+
+        request.stream = Some(true);
+
+        Ok(self
+            .client
+            .post_stream_mapped_raw_events("/threads/runs", request, AssistantStreamEvent::try_from)
+            .await)
+    }
+
     /// Create a thread.
     pub async fn create(&self, request: CreateThreadRequest) -> Result<ThreadObject, OpenAIError> {
         self.client.post("/threads", request).await
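A matching sketch for `create_and_run_stream`; the builder calls and thread payload are illustrative:

```rust
use async_openai::{
    types::{CreateThreadAndRunRequestArgs, CreateThreadRequest},
    Client,
};
use futures::StreamExt;

async fn thread_and_run(assistant_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let request = CreateThreadAndRunRequestArgs::default()
        .assistant_id(assistant_id)
        .thread(CreateThreadRequest::default())
        .build()?;

    // Passing stream = Some(false) is rejected with InvalidArgument;
    // the helper forces stream = Some(true) itself.
    let mut stream = client.threads().create_and_run_stream(request).await?;
    while let Some(event) = stream.next().await {
        println!("{:?}", event?);
    }
    Ok(())
}
```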