Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Futures preview (async/await) #75

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ edition = "2018"

[dependencies]
winapi = { version = "0.3", features = ["winnt", "combaseapi", "oleauto", "roapi", "roerrorapi", "hstring", "winstring", "winerror", "restrictederrorinfo"] }
futures-preview = { version = "0.3.0-alpha.16", features = ["async-await", "nightly"] }
futures-native-timers = { path = "../futures-native-timers" }

[features]
nightly = []
Expand Down
125 changes: 125 additions & 0 deletions examples/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#![feature(async_await)]

extern crate winapi;
extern crate winrt;
extern crate futures;

use std::time::Duration;

use futures::executor::block_on;
use futures::prelude::*;
use futures::select;

use futures_native_timers::Interval;

use winrt::*;
use winrt::windows::foundation::*;
use winrt::windows::devices::enumeration::*;
use winrt::windows::devices::midi::*;
use winrt::windows::media::*;
use winrt::windows::storage::StorageFile;

fn main() {
block_on(run());
}

async fn run() {

let device_selector = MidiOutPort::get_device_selector().unwrap();
println!("{}", device_selector);

// let async_op = DeviceInformation::find_all_async().unwrap();

// println!("CLS: {}", async_op.get_runtime_class_name());

// let asi = async_op.query_interface::<IAsyncInfo>().unwrap();
// println!("IAsyncInfo: {:p}, Iasync_operation: {:p}", asi, async_op);

// let unknown = async_op.query_interface::<IUnknown>().unwrap();
// println!("IAsyncInfo: {:p}, Iasync_operation: {:p}, IUnknown: {:p}", asi, async_op, unknown);

// let unknown = asi.query_interface::<IUnknown>().unwrap();
// println!("IAsyncInfo: {:p}, Iasync_operation: {:p}, IUnknown: {:p}", asi, async_op, unknown);

// let id = asi.get_id().unwrap();
// println!("id: {:?}", id);
// let status = asi.get_status().unwrap();
// println!("status: {:?}", status);

let mut interval = Interval::new(Duration::from_millis(100));
let mut async_op = DeviceInformation::find_all_async().unwrap().fuse();

let work = async {
let result;
loop {
select! {
_ = interval.next() => {
use std::io::Write;
print!(".");
std::io::stdout().flush().unwrap();
},
res = async_op => {
result = res;
println!("");
break;
}
};
}
result
};

let device_information_collection = work.await.unwrap().unwrap();
println!("CLS: {}", device_information_collection.get_runtime_class_name());
let count = device_information_collection.get_size().unwrap();
println!("Device Count: {}", count);

for (i, current) in device_information_collection.into_iter().enumerate().take(10) {
let current = current.expect("current was null");
let device_name = current.get_name().unwrap();
println!("Device Name ({}): {}", i, device_name);
}

print!("Transcoding media: ");
let source = StorageFile::get_file_from_path_async(&*FastHString::new("D:\\Desktop\\test-transcode\\test.mp3")).unwrap().await.expect("get_file_from_path_async failed").unwrap();
let dest = StorageFile::get_file_from_path_async(&*FastHString::new("D:\\Desktop\\test-transcode\\test.flac")).unwrap().await.expect("get_file_from_path_async failed").unwrap();
let profile = mediaproperties::MediaEncodingProfile::create_flac(mediaproperties::AudioEncodingQuality::Medium).unwrap().unwrap();
let transcoder = transcoding::MediaTranscoder::new();
let prepared = transcoder.prepare_file_transcode_async(&source, &dest, &profile).unwrap().await.unwrap().unwrap();
assert!(prepared.get_can_transcode().unwrap());

let mut interval = Interval::new(Duration::from_millis(100));
let async_op = prepared.transcode_async().unwrap();
// TODO: there should be a way to await the created AsyncActionProgressHandler (and any other delegate) as an async stream
async_op.set_progress(&AsyncActionProgressHandler::new(|_info, progress| {
print_progress(progress);
Ok(())
})).unwrap();
let mut async_op = async_op.fuse();

let work = async {
let result;
loop {
select! {
_ = interval.next() => {
use std::io::Write;
print!(".");
std::io::stdout().flush().unwrap();
},
res = async_op => {
result = res;
println!("");
break;
}
};
}
result
};

work.await
}

fn print_progress(progress: f64) {
use std::io::Write;
print!("{:.0}%", progress);
std::io::stdout().flush().unwrap();
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#![allow(dead_code,non_upper_case_globals,non_snake_case)]

extern crate winapi as w;
extern crate futures;

mod guid;
pub use guid::Guid;
Expand Down
74 changes: 71 additions & 3 deletions src/rt/async_util.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use futures::future::Future;
use std::pin::Pin;
use std::ops::Deref;
use std::task::{Context, Poll};
use std::sync::{Arc, Mutex, Condvar};

use crate::{
Expand Down Expand Up @@ -75,8 +79,7 @@ macro_rules! impl_blocking_wait {
}
}

impl RtAsyncAction for IAsyncAction
{
impl RtAsyncAction for IAsyncAction {
impl_blocking_wait!{ AsyncActionCompletedHandler }
}

Expand Down Expand Up @@ -118,4 +121,69 @@ impl<T: RtType + 'static, P: RtType + 'static> RtAsyncOperation for IAsyncOperat
fn get_results(&self) -> Result<Self::TResult> {
self.get_results()
}
}
}


macro_rules! impl_poll {
($handler:ident => $retexpr:tt) => {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let info = crate::comptr::query_interface::<_, IAsyncInfo>(self.deref().deref()).expect("query_interface failed");
let status = info.get_status().expect("get_status failed");
match status {
crate::windows::foundation::AsyncStatus::Completed => {
Poll::Ready($retexpr(self))
},
crate::windows::foundation::AsyncStatus::Started => {
// Calling poll multiple times must work correctly, so we have to check that we didn't already install the Completed handler
if self.get_completed().expect("get_completed failed").is_some() {
// TODO: We might have to check that the installed handler is actually
// the one with the waker (because the user could have installed one independently).
// Or we document that the user is not allowed to do that.
return Poll::Pending;
}
let waker = cx.waker().clone();

let handler = $handler::new(move |_op, _status| {
waker.wake_by_ref();
Ok(())
});

self.set_completed(&handler).expect("set_completed failed");
Poll::Pending
}
_ => unimplemented!() // FIXME
}
}
}
}


impl Future for crate::ComPtr<IAsyncAction> {
type Output = ();

impl_poll!{ AsyncActionCompletedHandler => { |_: Pin<&mut Self>| () } }
}

impl<P: RtType + 'static> Future for crate::ComPtr<IAsyncActionWithProgress<P>>
where AsyncActionWithProgressCompletedHandler<P>: ComIid
{
type Output = ();

impl_poll!{ AsyncActionWithProgressCompletedHandler => { |_: Pin<&mut Self>| () } }
}

impl<T: RtType + 'static> Future for crate::ComPtr<IAsyncOperation<T>>
where AsyncOperationCompletedHandler<T>: ComIid
{
type Output = Result<<T as RtType>::Out>;

impl_poll!{ AsyncOperationCompletedHandler => { |s: Pin<&mut Self>| s.get_results() } }
}

impl<T: RtType + 'static, P: RtType + 'static> Future for crate::ComPtr<IAsyncOperationWithProgress<T, P>>
where AsyncOperationWithProgressCompletedHandler<T, P>: ComIid
{
type Output = Result<<T as RtType>::Out>;

impl_poll!{ AsyncOperationWithProgressCompletedHandler => { |s: Pin<&mut Self>| s.get_results() } }
}