diff --git a/CHANGELOG.md b/CHANGELOG.md index fbe1d80670..95c1196dfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,6 @@ Special thanks to external contributors for this release: @CharlyCst ([#347]). ### FEATURES -- Update Tokio to version 0.3 ([#402]) - Update to tendermint-rs version `0.17-RC3` ([#403]) - [changelog] Added "unreleased" section in `CHANGELOG.MD` to help streamline releases ([#274]) - [relayer] Integrate relayer spike into relayer crate ([#335]) @@ -48,7 +47,6 @@ Special thanks to external contributors for this release: @CharlyCst ([#347]). [#373]: https://github.com/informalsystems/ibc-rs/issues/373 [#374]: https://github.com/informalsystems/ibc-rs/issues/374 [#389]: https://github.com/informalsystems/ibc-rs/issues/389 -[#402]: https://github.com/informalsystems/ibc-rs/issues/402 [#403]: https://github.com/informalsystems/ibc-rs/issues/403 [proto-compiler]: https://github.com/informalsystems/ibc-rs/tree/master/proto-compiler diff --git a/Cargo.toml b/Cargo.toml index ae87f70e4a..0001bad76b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,18 +10,3 @@ members = [ exclude = [ "proto-compiler" ] - -[patch.crates-io] -tendermint = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/prost-dev" } -tendermint-rpc = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/prost-dev" } -tendermint-proto = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/prost-dev" } -tendermint-light-client = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/prost-dev" } - -bytes = { git = "https://github.com/tokio-rs/bytes", tag = "v0.6.0" } - -tonic = { git = "https://github.com/romac/tonic", branch = "romac/hyper-0.14-dev" } - -prost = { git = "https://github.com/danburkert/prost", branch = "master" } -prost-types = { git = "https://github.com/danburkert/prost", branch = "master" } -prost-build = { git = "https://github.com/danburkert/prost", branch = "master" } -prost-derive = { git = "https://github.com/danburkert/prost", branch = "master" } diff --git a/modules/Cargo.toml b/modules/Cargo.toml index 9efae2ed78..5aeee146a5 100644 --- a/modules/Cargo.toml +++ b/modules/Cargo.toml @@ -55,6 +55,6 @@ version = "0.17.0-rc2" optional = true [dev-dependencies] -tokio = { version = "0.3", features = ["macros"] } +tokio = { version = "0.2", features = ["macros"] } subtle-encoding = { version = "0.5" } tendermint-testgen = { version = "0.17.0-rc2" } # Needed for generating (synthetic) light blocks. diff --git a/relayer-cli/Cargo.toml b/relayer-cli/Cargo.toml index 2e81e3e4af..d16eaa9da2 100644 --- a/relayer-cli/Cargo.toml +++ b/relayer-cli/Cargo.toml @@ -13,7 +13,7 @@ anomaly = "0.2.0" gumdrop = "0.7" serde = { version = "1", features = ["serde_derive"] } thiserror = "1" -tokio = { version = "0.3", features = ["rt", "rt-multi-thread", "time", "stream", "sync"] } +tokio = { version = "0.2", features = ["full"] } tracing = "0.1.13" tracing-subscriber = "0.2.3" futures = "0.3.5" diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index 534305a34b..f70d305370 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -1,4 +1,8 @@ -use std::{ops::Deref, sync::Arc, thread}; +use std::{ + ops::Deref, + sync::{Arc, Mutex}, + thread, +}; use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable}; use crossbeam_channel as channel; @@ -17,7 +21,7 @@ pub struct ListenCmd { impl ListenCmd { fn cmd(&self) -> Result<(), BoxError> { - let rt = Arc::new(TokioRuntime::new().unwrap()); + let rt = Arc::new(Mutex::new(TokioRuntime::new()?)); let config = app_config().clone(); let chain_id = self.chain_id.clone().unwrap(); @@ -39,7 +43,7 @@ impl Runnable for ListenCmd { } /// Listen to events -pub fn listen(rt: Arc, config: ChainConfig) -> Result<(), BoxError> { +pub fn listen(rt: Arc>, config: ChainConfig) -> Result<(), BoxError> { info!(chain.id = %config.id, "spawning event monitor for"); let (event_monitor, rx) = subscribe(config, rt)?; @@ -54,7 +58,7 @@ pub fn listen(rt: Arc, config: ChainConfig) -> Result<(), BoxError fn subscribe( chain_config: ChainConfig, - rt: Arc, + rt: Arc>, ) -> Result<(EventMonitor, channel::Receiver), BoxError> { let (mut event_monitor, rx) = EventMonitor::new(chain_config.id, chain_config.rpc_addr, rt) .map_err(|e| format!("couldn't initialize event monitor: {}", e))?; diff --git a/relayer-cli/src/commands/query/channel.rs b/relayer-cli/src/commands/query/channel.rs index 55cbe3bb62..1fb18eaa8e 100644 --- a/relayer-cli/src/commands/query/channel.rs +++ b/relayer-cli/src/commands/query/channel.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use abscissa_core::{Command, Options, Runnable}; use tokio::runtime::Runtime as TokioRuntime; @@ -98,7 +98,7 @@ impl Runnable for QueryChannelEndCmd { // run without proof: // cargo run --bin relayer -- -c relayer/tests/config/fixtures/simple_config.toml query channel end ibc-test firstport firstchannel --height 3 -p false - let rt = Arc::new(TokioRuntime::new().unwrap()); + let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap())); let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap(); let height = ibc::Height::new(chain.id().version(), opts.height); diff --git a/relayer-cli/src/commands/query/client.rs b/relayer-cli/src/commands/query/client.rs index e59a6bc53e..aa960744b6 100644 --- a/relayer-cli/src/commands/query/client.rs +++ b/relayer-cli/src/commands/query/client.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use abscissa_core::{Command, Options, Runnable}; use tokio::runtime::Runtime as TokioRuntime; @@ -78,7 +78,7 @@ impl Runnable for QueryClientStateCmd { }; status_info!("Options", "{:?}", opts); - let rt = Arc::new(TokioRuntime::new().unwrap()); + let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap())); let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap(); let height = ibc::Height::new(chain.id().version(), opts.height); @@ -171,7 +171,7 @@ impl Runnable for QueryClientConsensusCmd { }; status_info!("Options", "{:?}", opts); - let rt = Arc::new(TokioRuntime::new().unwrap()); + let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap())); let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap(); let height = ibc::Height::new(chain.id().version(), opts.height); @@ -285,7 +285,7 @@ impl Runnable for QueryClientConnectionsCmd { }; status_info!("Options", "{:?}", opts); - let rt = Arc::new(TokioRuntime::new().unwrap()); + let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap())); let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap(); let height = ibc::Height::new(chain.id().version(), opts.height); diff --git a/relayer-cli/src/commands/query/connection.rs b/relayer-cli/src/commands/query/connection.rs index 2e80e65781..bd65c844ee 100644 --- a/relayer-cli/src/commands/query/connection.rs +++ b/relayer-cli/src/commands/query/connection.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use abscissa_core::{Command, Options, Runnable}; use tokio::runtime::Runtime as TokioRuntime; @@ -82,7 +82,7 @@ impl Runnable for QueryConnectionEndCmd { }; status_info!("Options", "{:?}", opts); - let rt = Arc::new(TokioRuntime::new().unwrap()); + let rt = Arc::new(Mutex::new(TokioRuntime::new().unwrap())); let chain = CosmosSDKChain::from_config(chain_config, rt).unwrap(); let height = ibc::Height::new(chain.id().version(), opts.height); diff --git a/relayer-cli/src/commands/start.rs b/relayer-cli/src/commands/start.rs index 8b516c65a9..4ef5c4a6d1 100644 --- a/relayer-cli/src/commands/start.rs +++ b/relayer-cli/src/commands/start.rs @@ -21,7 +21,7 @@ impl StartCmd { impl Runnable for StartCmd { fn run(&self) { - let rt = tokio::runtime::Runtime::new().unwrap(); + let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async move { self.cmd() diff --git a/relayer-cli/tests/integration.rs b/relayer-cli/tests/integration.rs index 1111f013d2..9c35ee4b82 100644 --- a/relayer-cli/tests/integration.rs +++ b/relayer-cli/tests/integration.rs @@ -11,8 +11,8 @@ unused_qualifications )] -use std::str::FromStr; use std::sync::Arc; +use std::{str::FromStr, sync::Mutex}; use ibc::ics02_client::raw::ConnectionIds as DomainTypeClientConnections; use ibc::ics04_channel::channel::{ChannelEnd, Order, State as ChannelState}; @@ -45,8 +45,8 @@ fn simd_config() -> Config { /// Chain created for the informaldev/simd DockerHub image running on localhost. fn simd_chain() -> CosmosSDKChain { - let rt = tokio::runtime::Runtime::new().unwrap(); - CosmosSDKChain::from_config(simd_config().chains[0].clone(), Arc::new(rt)).unwrap() + let rt = Arc::new(Mutex::new(tokio::runtime::Runtime::new().unwrap())); + CosmosSDKChain::from_config(simd_config().chains[0].clone(), rt).unwrap() } /// Query connection ID. Requires the informaldev/simd Docker image running on localhost. diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index bd7f743c90..d43c9b4b0c 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -19,7 +19,7 @@ sled = { version = "0.34.4", features = ["no_metrics", "no_logs"] } thiserror = "1.0.11" toml = "0.5" tracing = "0.1.13" -tokio = { version = "0.3", features = ["rt", "rt-multi-thread", "time", "stream", "sync"] } +tokio = { version = "0.2", features = ["rt-core", "time", "stream", "sync"] } serde_json = { version = "1" } bytes = "0.6.0" prost = "0.6.1" diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index f040b31151..1e5f7a1e92 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -1,6 +1,8 @@ +use std::convert::{TryFrom, TryInto}; +use std::future::Future; use std::str::FromStr; +use std::sync::{Arc, Mutex}; use std::time::Duration; -use std::{convert::TryFrom, convert::TryInto, sync::Arc}; use anomaly::fail; use bitcoin::hashes::hex::ToHex; @@ -55,12 +57,12 @@ use tonic::codegen::http::Uri; pub struct CosmosSDKChain { config: ChainConfig, rpc_client: HttpClient, - rt: Arc, + rt: Arc>, keybase: KeyRing, } impl CosmosSDKChain { - pub fn from_config(config: ChainConfig, rt: Arc) -> Result { + pub fn from_config(config: ChainConfig, rt: Arc>) -> Result { let primary = config .primary() .ok_or_else(|| Kind::LightClient.context("no primary peer specified"))?; @@ -87,18 +89,16 @@ impl CosmosSDKChain { Uri::from_str(&self.config().grpc_addr).map_err(|e| Kind::Grpc.context(e))?; let mut client = self - .rt .block_on( ibc_proto::cosmos::staking::v1beta1::query_client::QueryClient::connect(grpc_addr), - ) + )? .map_err(|e| Kind::Grpc.context(e))?; let request = tonic::Request::new(ibc_proto::cosmos::staking::v1beta1::QueryParamsRequest {}); let response = self - .rt - .block_on(client.params(request)) + .block_on(client.params(request))? .map_err(|e| Kind::Grpc.context(e))?; let res = response @@ -115,11 +115,15 @@ impl CosmosSDKChain { /// Specific to the SDK and used only for Tendermint client create pub fn query_consensus_params(&self) -> Result { Ok(self - .rt - .block_on(self.rpc_client().genesis()) + .block_on(self.rpc_client().genesis())? .map_err(|e| Kind::Rpc.context(e))? .consensus_params) } + + /// Run a future to completion on the Tokio runtime. + fn block_on(&self, f: F) -> Result { + Ok(self.rt.lock().map_err(|_| Kind::PoisonedMutex)?.block_on(f)) + } } impl Chain for CosmosSDKChain { @@ -150,8 +154,7 @@ impl Chain for CosmosSDKChain { } let response = - self.rt - .block_on(abci_query(&self, path, data.to_string(), height, prove))?; + self.block_on(abci_query(&self, path, data.to_string(), height, prove))??; // Verify response proof, if requested. if prove { @@ -191,8 +194,7 @@ impl Chain for CosmosSDKChain { }; let acct_response = self - .rt - .block_on(query_account(self, key.account)) + .block_on(query_account(self, key.account))? .map_err(|e| Kind::Grpc.context(e))?; let single = Single { mode: 1 }; @@ -250,8 +252,7 @@ impl Chain for CosmosSDKChain { prost::Message::encode(&tx_raw, &mut txraw_buf).unwrap(); let response = self - .rt - .block_on(broadcast_tx_commit(self, txraw_buf)) + .block_on(broadcast_tx_commit(self, txraw_buf))? .map_err(|e| Kind::Rpc.context(e))?; Ok(response) @@ -260,8 +261,7 @@ impl Chain for CosmosSDKChain { /// Query the latest height the chain is at via a RPC query fn query_latest_height(&self) -> Result { let status = self - .rt - .block_on(self.rpc_client().status()) + .block_on(self.rpc_client().status())? .map_err(|e| Kind::Rpc.context(e))?; if status.sync_info.catching_up { diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index 47dad8ea58..f3624bbf30 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, thread}; +use std::{ + sync::{Arc, Mutex}, + thread, +}; use crossbeam_channel as channel; @@ -58,7 +61,7 @@ pub struct ChainRuntime { light_client: Box>, #[allow(dead_code)] - rt: Arc, + rt: Arc>, } impl ChainRuntime { @@ -66,7 +69,7 @@ impl ChainRuntime { config: ChainConfig, light_client: TMLightClient, event_receiver: channel::Receiver, - rt: Arc, + rt: Arc>, ) -> Result { let chain = CosmosSDKChain::from_config(config, rt.clone())?; Ok(Self::new(chain, light_client, event_receiver, rt)) @@ -74,7 +77,9 @@ impl ChainRuntime { // TODO: Make this work for a generic Chain pub fn spawn(config: ChainConfig) -> Result<(impl ChainHandle, Threads), Error> { - let rt = Arc::new(TokioRuntime::new().map_err(|e| Kind::Io.context(e))?); + let rt = Arc::new(Mutex::new( + TokioRuntime::new().map_err(|e| Kind::Io.context(e))?, + )); // Initialize the light clients let (light_client, supervisor) = TMLightClient::from_config(&config, true)?; @@ -115,7 +120,7 @@ impl ChainRuntime { chain: C, light_client: impl LightClient + 'static, event_receiver: channel::Receiver, - rt: Arc, + rt: Arc>, ) -> Self { let (sender, receiver) = channel::unbounded::(); diff --git a/relayer/src/error.rs b/relayer/src/error.rs index e2aff29633..ef56b01ed0 100644 --- a/relayer/src/error.rs +++ b/relayer/src/error.rs @@ -18,6 +18,10 @@ pub enum Kind { #[error("I/O error")] Io, + /// Poisoned mutex + #[error("poisoned mutex")] + PoisonedMutex, + /// Invalid configuration #[error("Invalid configuration")] Config, diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 6b21df2047..673d4eeab4 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -1,16 +1,17 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use anomaly::BoxError; use crossbeam_channel as channel; use futures::{stream::select_all, Stream}; use itertools::Itertools; +use tokio::runtime::Runtime as TokioRuntime; use tokio::stream::StreamExt; use tokio::task::JoinHandle; use tracing::{debug, error, info}; use ibc::{events::IBCEvent, ics24_host::identifier::ChainId}; -use tendermint::{block::Height, net, Error as TMError}; +use tendermint::{block::Height, net}; use tendermint_rpc::{query::EventType, query::Query, SubscriptionClient, WebSocketClient}; use crate::error::{Error, Kind}; @@ -43,7 +44,7 @@ pub struct EventMonitor { /// All subscriptions combined in a single stream subscriptions: Box, /// Tokio runtime - rt: Arc, + rt: Arc>, } impl EventMonitor { @@ -51,18 +52,24 @@ impl EventMonitor { pub fn new( chain_id: ChainId, rpc_addr: net::Address, - rt: Arc, + rt: Arc>, ) -> Result<(Self, channel::Receiver), Error> { let (tx, rx) = channel::unbounded(); let websocket_addr = rpc_addr.clone(); - let (websocket_client, websocket_driver) = rt.block_on(async move { - WebSocketClient::new(websocket_addr) - .await - .map_err(|e| Kind::Rpc.context(e)) - })?; - - let websocket_driver_handle = rt.spawn(websocket_driver.run()); + let (websocket_client, websocket_driver) = rt + .lock() + .map_err(|_| Kind::PoisonedMutex)? + .block_on(async move { + WebSocketClient::new(websocket_addr) + .await + .map_err(|e| Kind::Rpc.context(e)) + })?; + + let websocket_driver_handle = rt + .lock() + .map_err(|_| Kind::PoisonedMutex)? + .spawn(websocket_driver.run()); // TODO: move them to config file(?) let event_queries = vec![Query::from(EventType::Tx), Query::from(EventType::NewBlock)]; @@ -88,6 +95,8 @@ impl EventMonitor { for query in &self.event_queries { let subscription = self .rt + .lock() + .map_err(|_| Kind::PoisonedMutex)? .block_on(self.websocket_client.subscribe(query.clone()))?; subscriptions.push(subscription); @@ -98,17 +107,19 @@ impl EventMonitor { Ok(()) } - fn try_reconnect(&mut self) { + fn try_reconnect(&mut self) -> Result<(), BoxError> { // Try to reconnect let (mut websocket_client, websocket_driver) = self .rt - .block_on(WebSocketClient::new(self.node_addr.clone())) - .unwrap_or_else(|e| { - debug!("Error on reconnection: {}", e); - panic!("Abort on failed reconnection"); - }); + .lock() + .map_err(|_| Kind::PoisonedMutex)? + .block_on(WebSocketClient::new(self.node_addr.clone()))?; - let mut websocket_driver_handle = self.rt.spawn(websocket_driver.run()); + let mut websocket_driver_handle = self + .rt + .lock() + .map_err(|_| Kind::PoisonedMutex)? + .spawn(websocket_driver.run()); // Swap the new client with the previous one which failed, // so that we can shut the latter down gracefully. @@ -124,30 +135,28 @@ impl EventMonitor { debug!("Gracefully shutting down previous client"); self.rt + .lock() + .map_err(|_| Kind::PoisonedMutex)? .block_on(websocket_client.close()) - .unwrap_or_else(|e| { - error!("Failed to close previous WebSocket client: {}", e); - }); + .map_err(|e| format!("Failed to close previous WebSocket client: {}", e))?; self.rt + .lock() + .map_err(|_| Kind::PoisonedMutex)? .block_on(websocket_driver_handle) - .unwrap_or_else(|e| { - Err(tendermint_rpc::Error::client_internal_error(format!( + .map_err(|e| { + tendermint_rpc::Error::client_internal_error(format!( "failed to terminate previous WebSocket client driver: {}", e - ))) - }) - .unwrap_or_else(|e| { - error!("Previous WebSocket client driver failed with error: {}", e); - }); + )) + })??; + + Ok(()) } /// Try to resubscribe to events - fn try_resubscribe(&mut self) { - if let Err(err) = self.subscribe() { - debug!("Error on recreating subscriptions: {}", err); - panic!("Abort during reconnection"); - }; + fn try_resubscribe(&mut self) -> Result<(), BoxError> { + self.subscribe() } /// Event monitor loop @@ -161,18 +170,28 @@ impl EventMonitor { debug!("Web socket error: {}", err); // Try to reconnect - self.try_reconnect(); + self.try_reconnect().unwrap_or_else(|e| { + debug!("Error on reconnecting: {}", e); + panic!("Abort during reconnection"); + }); // Try to resubscribe - self.try_resubscribe(); + self.try_resubscribe().unwrap_or_else(|e| { + debug!("Error on reconnecting: {}", e); + panic!("Abort during reconnection"); + }); } } } } /// Collect the IBC events from the subscriptions - fn collect_events(&mut self) -> Result<(), TMError> { - let event = self.rt.block_on(self.subscriptions.next()); + fn collect_events(&mut self) -> Result<(), BoxError> { + let event = self + .rt + .lock() + .map_err(|_| Kind::PoisonedMutex)? + .block_on(self.subscriptions.next()); match event { Some(Ok(event)) => match ibc::events::get_all_events(event.clone()) { diff --git a/relayer/src/keys/add.rs b/relayer/src/keys/add.rs index 9339e08636..647621da19 100644 --- a/relayer/src/keys/add.rs +++ b/relayer/src/keys/add.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime as TokioRuntime; @@ -20,7 +20,7 @@ pub fn add_key(opts: KeysAddOptions) -> Result { let rt = TokioRuntime::new().unwrap(); // Get the destination chain - let chain = CosmosSDKChain::from_config(opts.clone().chain_config, Arc::new(rt))?; + let chain = CosmosSDKChain::from_config(opts.clone().chain_config, Arc::new(Mutex::new(rt)))?; let key_contents = fs::read_to_string(&opts.file) .map_err(|_| Kind::KeyBase.context("error reading the key file"))?; diff --git a/relayer/src/keys/list.rs b/relayer/src/keys/list.rs index 15512ef4fa..7a4da84b89 100644 --- a/relayer/src/keys/list.rs +++ b/relayer/src/keys/list.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime as TokioRuntime; @@ -16,7 +16,7 @@ pub fn list_keys(opts: KeysListOptions) -> Result { let rt = TokioRuntime::new().unwrap(); // Get the destination chain - let chain = CosmosSDKChain::from_config(opts.chain_config, Arc::new(rt))?; + let chain = CosmosSDKChain::from_config(opts.chain_config, Arc::new(Mutex::new(rt)))?; let key_entry = chain.keybase().get_key(); diff --git a/relayer/src/util/block_on.rs b/relayer/src/util/block_on.rs index 93459ece34..aed8c9f98e 100644 --- a/relayer/src/util/block_on.rs +++ b/relayer/src/util/block_on.rs @@ -4,7 +4,8 @@ use futures::Future; /// Spawns a new tokio runtime and use it to block on the given future. pub fn block_on(future: F) -> F::Output { - tokio::runtime::Builder::new_current_thread() + tokio::runtime::Builder::new() + .basic_scheduler() .enable_all() .build() .unwrap()