From 70f6125a3d0fb42e89d4fac0a8b933d06d8d902e Mon Sep 17 00:00:00 2001
From: Nikhil Benesch <nikhil.benesch@gmail.com>
Date: Tue, 23 Apr 2019 15:27:17 -0400
Subject: [PATCH 1/3] Add an ErrBuf helper to manage APIs with error strings

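Several librdkafka APIs report failures by writing a NUL-terminated
string into a caller-provided buffer. Rather than repeating the raw
[0i8; 1024] dance at every call site, introduce an ErrBuf type that
owns the buffer and centralizes the pointer, length, and string
conversion bookkeeping.

A sketch of the intended call pattern, using rd_kafka_conf_set as one
example of such an API:

    let mut err_buf = ErrBuf::new();
    let ret = unsafe {
        rdsys::rd_kafka_conf_set(conf, key_c.as_ptr(), value_c.as_ptr(),
                                 err_buf.as_mut_ptr(), err_buf.len())
    };
    if ret.is_error() {
        return Err(KafkaError::ClientConfig(
            ret, err_buf.to_string(), key.to_string(), value.to_string()));
    }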
---
 src/client.rs |  9 ++++-----
 src/config.rs | 14 +++++---------
 src/util.rs   | 30 ++++++++++++++++++++++++++++++
 3 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/src/client.rs b/src/client.rs
index 030513648..ac465930a 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -18,7 +18,7 @@ use crate::error::{IsError, KafkaError, KafkaResult};
 use crate::groups::GroupList;
 use crate::metadata::Metadata;
 use crate::statistics::Statistics;
-use crate::util::{bytes_cstr_to_owned, timeout_to_ms};
+use crate::util::{ErrBuf, timeout_to_ms};
 
 /// Client-level context
 ///
@@ -116,7 +116,7 @@ impl<C: ClientContext> Client<C> {
     pub fn new(config: &ClientConfig, native_config: NativeClientConfig, rd_kafka_type: RDKafkaType,
                context: C)
             -> KafkaResult<Client<C>> {
-        let errstr = [0i8; 1024];
+        let mut err_buf = ErrBuf::new();
         let mut boxed_context = Box::new(context);
         unsafe { rdsys::rd_kafka_conf_set_opaque(native_config.ptr(), (&mut *boxed_context) as *mut C as *mut c_void) };
         unsafe { rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>)) };
@@ -124,13 +124,12 @@ impl<C: ClientContext> Client<C> {
         unsafe { rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::<C>)) };
 
         let client_ptr = unsafe {
-            rdsys::rd_kafka_new(rd_kafka_type, native_config.ptr_move(), errstr.as_ptr() as *mut c_char, errstr.len())
+            rdsys::rd_kafka_new(rd_kafka_type, native_config.ptr_move(), err_buf.as_mut_ptr(), err_buf.len())
         };
         trace!("Create new librdkafka client {:p}", client_ptr);
 
         if client_ptr.is_null() {
-            let descr = unsafe { bytes_cstr_to_owned(&errstr) };
-            return Err(KafkaError::ClientCreation(descr));
+            return Err(KafkaError::ClientCreation(err_buf.to_string()));
         }
 
         unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };
diff --git a/src/config.rs b/src/config.rs
index 6ab2c0e08..83d569ce1 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -24,14 +24,11 @@ use crate::rdsys;
 
 use crate::client::ClientContext;
 use crate::error::{KafkaError, KafkaResult, IsError};
-use crate::util::bytes_cstr_to_owned;
+use crate::util::ErrBuf;
 
 use std::collections::HashMap;
 use std::ffi::CString;
 use std::mem;
-use std::os::raw::c_char;
-
-const ERR_LEN: usize = 256;
 
 
 /// The log levels supported by librdkafka.
@@ -146,17 +143,16 @@ impl ClientConfig {
     /// Returns the native rdkafka-sys configuration.
     pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
         let conf = unsafe { rdsys::rd_kafka_conf_new() };
-        let errstr = [0; ERR_LEN];
+        let mut err_buf = ErrBuf::new();
         for (key, value) in &self.conf_map {
             let key_c = CString::new(key.to_string())?;
             let value_c = CString::new(value.to_string())?;
             let ret = unsafe {
                 rdsys::rd_kafka_conf_set(conf, key_c.as_ptr(), value_c.as_ptr(),
-                                           errstr.as_ptr() as *mut c_char, errstr.len())
+                                         err_buf.as_mut_ptr(), err_buf.len())
             };
-            if ret.is_error() {
-                let descr = unsafe { bytes_cstr_to_owned(&errstr) };
-                return Err(KafkaError::ClientConfig(ret, descr, key.to_string(), value.to_string()));
+            if ret.is_error() {
+                return Err(KafkaError::ClientConfig(ret, err_buf.to_string(), key.to_string(), value.to_string()));
             }
         }
         Ok(unsafe {NativeClientConfig::from_ptr(conf)})
diff --git a/src/util.rs b/src/util.rs
index f3d86203e..feb0d7c94 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -111,6 +111,36 @@ pub unsafe fn cstr_to_owned(cstr: *const i8) -> String {
     CStr::from_ptr(cstr as *const c_char).to_string_lossy().into_owned()
 }
 
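+/// A fixed-size buffer for error strings returned by librdkafka APIs
+/// that report errors through a caller-provided `char` buffer.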
+pub(crate) struct ErrBuf {
+    buf: [c_char; ErrBuf::MAX_ERR_LEN]
+}
+
+impl ErrBuf {
+    const MAX_ERR_LEN: usize = 512;
+
+    pub fn new() -> ErrBuf {
+        ErrBuf { buf: [0; ErrBuf::MAX_ERR_LEN] }
+    }
+
+    pub fn as_mut_ptr(&mut self) -> *mut c_char {
+        self.buf.as_mut_ptr()
+    }
+
+    pub fn len(&self) -> usize {
+        self.buf.len()
+    }
+
+    pub fn to_string(&self) -> String {
+        unsafe { bytes_cstr_to_owned(&self.buf) }
+    }
+}
+
+impl Default for ErrBuf {
+    fn default() -> ErrBuf {
+        ErrBuf::new()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

From 473f8ff55ecc5dc78c1a7b031133e482b1adac28 Mon Sep 17 00:00:00 2001
From: Nikhil Benesch <nikhil.benesch@gmail.com>
Date: Tue, 23 Apr 2019 19:20:19 -0400
Subject: [PATCH 2/3] Add admin API client

Fix #92.
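
The client wraps a librdkafka producer handle plus a background thread
that polls a dedicated event queue, and exposes futures-based
create_topics, delete_topics, create_partitions, describe_configs, and
alter_configs operations. See the rustdoc for the new rdkafka::admin
module for usage examples.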
---
 Cargo.toml               |    2 +
 docker-compose.yaml      |    2 +
 rdkafka-sys/build.rs     |    3 +
 rdkafka-sys/src/types.rs |   33 +
 src/admin.rs             | 1241 ++++++++++++++++++++++++++++++++++++++
 src/client.rs            |   43 ++
 src/error.rs             |   18 +
 src/lib.rs               |    1 +
 src/util.rs              |   21 +
 tests/test_admin.rs      |  390 ++++++++++++
 tests/utils.rs           |   28 +-
 11 files changed, 1780 insertions(+), 2 deletions(-)
 create mode 100644 src/admin.rs
 create mode 100644 tests/test_admin.rs

diff --git a/Cargo.toml b/Cargo.toml
index e33543752..e70843764 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,10 +20,12 @@ serde_derive = "1.0.0"
 serde_json = "1.0.0"
 
 [dev-dependencies]
+backoff = "0.1.5"
 chrono = "0.4.0"
 clap = "2.18.0"
 env_logger = "0.3.0"
 rand = "0.3.15"
+regex = "1.1.6"
 tokio = "0.1.7"
 
 [features]
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 84b0a5ae5..7525cf1d0 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -11,6 +11,8 @@ services:
     volumes:
       - .:/mount
     command: ./run_tests.sh
+    environment:
+      - KAFKA_VERSION=2.2.0
 
   kafka:
     image: confluentinc/cp-kafka:5.2.1
diff --git a/rdkafka-sys/build.rs b/rdkafka-sys/build.rs
index 2f3e9ec0e..5a427e8ad 100644
--- a/rdkafka-sys/build.rs
+++ b/rdkafka-sys/build.rs
@@ -73,6 +73,9 @@ fn main() {
         .rustified_enum("rd_kafka_conf_res_t")
         .rustified_enum("rd_kafka_resp_err_t")
         .rustified_enum("rd_kafka_timestamp_type_t")
+        .rustified_enum("rd_kafka_admin_op_t")
+        .rustified_enum("rd_kafka_ResourceType_t")
+        .rustified_enum("rd_kafka_ConfigSource_t")
         .generate()
         .expect("failed to generate bindings");
 
diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs
index 08d76499e..ad2e2cb1d 100644
--- a/rdkafka-sys/src/types.rs
+++ b/rdkafka-sys/src/types.rs
@@ -55,6 +55,30 @@ pub type RDKafkaGroupMemberInfo = bindings::rd_kafka_group_member_info;
 /// Native rdkafka group member information
 pub type RDKafkaHeaders = bindings::rd_kafka_headers_t;
 
+/// Native rdkafka queue
+pub type RDKafkaQueue = bindings::rd_kafka_queue_t;
+
+/// Native rdkafka new topic object
+pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t;
+
+/// Native rdkafka delete topic object
+pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t;
+
+/// Native rdkafka new partitions object
+pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;
+
+/// Native rdkafka config resource
+pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t;
+
+/// Native rdkafka event
+pub type RDKafkaEvent = bindings::rd_kafka_event_t;
+
+/// Native rdkafka admin options
+pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t;
+
+/// Native rdkafka topic result
+pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;
+
 // ENUMS
 
 /// Client types
@@ -66,6 +90,15 @@ pub use bindings::rd_kafka_conf_res_t as RDKafkaConfRes;
 /// Response error
 pub use bindings::rd_kafka_resp_err_t as RDKafkaRespErr;
 
+/// Admin operation
+pub use bindings::rd_kafka_admin_op_t as RDKafkaAdminOp;
+
+/// Config resource type
+pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType;
+
+/// Config source
+pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource;
+
 /// Errors enum
 
 /// Error from the underlying rdkafka library.
diff --git a/src/admin.rs b/src/admin.rs
new file mode 100644
index 000000000..ca9012a87
--- /dev/null
+++ b/src/admin.rs
@@ -0,0 +1,1241 @@
+//! Admin client.
+//!
+//! The main object is the [`AdminClient`] struct.
+//!
+//! [`AdminClient`]: struct.AdminClient.html
+
+use crate::rdsys;
+use crate::rdsys::types::*;
+
+use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
+use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
+use crate::error::{IsError, KafkaError, KafkaResult};
+use crate::util::{cstr_to_owned, timeout_to_ms, AsCArray, ErrBuf, IntoOpaque, WrappedCPointer};
+
+use futures::future::{self, Either};
+use futures::{Async, Canceled, Complete, Future, Oneshot, Poll};
+
+use std::collections::HashMap;
+use std::ffi::{CStr, CString};
+use std::mem;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread::{self, JoinHandle};
+use std::time::Duration;
+
+//
+// ********** ADMIN CLIENT **********
+//
+
+/// A client for the Kafka admin API.
+///
+/// `AdminClient` provides programmatic access to managing a Kafka cluster,
+/// notably manipulating topics, partitions, and configuration parameters.
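+///
+/// A sketch of basic usage, assuming a broker is listening on
+/// `localhost:9092` (the address and topic name are illustrative):
+///
+/// ```no_run
+/// use futures::Future;
+/// use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
+/// use rdkafka::client::DefaultClientContext;
+/// use rdkafka::config::ClientConfig;
+///
+/// let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
+///     .set("bootstrap.servers", "localhost:9092")
+///     .create()
+///     .expect("admin client creation failed");
+/// let topic = NewTopic::new("example-topic", 3, TopicReplication::Fixed(1));
+/// let results = admin
+///     .create_topics(&[topic], &AdminOptions::new())
+///     .wait()
+///     .expect("topic creation failed");
+/// println!("topic creation results: {:?}", results);
+/// ```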
+pub struct AdminClient<C: ClientContext> {
+    client: Client<C>,
+    queue: Arc<NativeQueue>,
+    should_stop: Arc<AtomicBool>,
+    handle: Option<JoinHandle<()>>,
+}
+
+impl<C: ClientContext> AdminClient<C> {
+    /// Creates new topics according to the provided `NewTopic` specifications.
+    ///
+    /// Note that while the API supports creating multiple topics at once, it
+    /// is not transactional. Creation of some topics may succeed while others
+    /// fail. Be sure to check the result of each individual operation.
+    pub fn create_topics<'a, I>(
+        &self,
+        topics: I,
+        opts: &AdminOptions,
+    ) -> impl Future<Item = Vec<TopicResult>, Error = KafkaError>
+    where
+        I: IntoIterator<Item = &'a NewTopic<'a>>,
+    {
+        match self.create_topics_inner(topics, opts) {
+            Ok(rx) => Either::A(CreateTopicsFuture { rx }),
+            Err(err) => Either::B(future::err(err)),
+        }
+    }
+
+    fn create_topics_inner<'a, I>(&self, topics: I, opts: &AdminOptions) -> KafkaResult<Oneshot<NativeEvent>>
+    where
+        I: IntoIterator<Item = &'a NewTopic<'a>>,
+    {
+        let mut native_topics = Vec::new();
+        let mut err_buf = ErrBuf::new();
+        for t in topics {
+            native_topics.push(t.to_native(&mut err_buf)?);
+        }
+        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
+        unsafe {
+            rdsys::rd_kafka_CreateTopics(
+                self.client.native_ptr(),
+                native_topics.as_c_array(),
+                native_topics.len(),
+                native_opts.ptr(),
+                self.queue.ptr(),
+            );
+        }
+        Ok(rx)
+    }
+
+    /// Deletes the named topics.
+    ///
+    /// Note that while the API supports deleting multiple topics at once, it is
+    /// not transactional. Deletion of some topics may succeed while others
+    /// fail. Be sure to check the result of each individual operation.
+    pub fn delete_topics(
+        &self,
+        topic_names: &[&str],
+        opts: &AdminOptions,
+    ) -> impl Future<Item = Vec<TopicResult>, Error = KafkaError> {
+        match self.delete_topics_inner(topic_names, opts) {
+            Ok(rx) => Either::A(DeleteTopicsFuture { rx }),
+            Err(err) => Either::B(future::err(err)),
+        }
+    }
+
+    fn delete_topics_inner(&self, topic_names: &[&str], opts: &AdminOptions) -> KafkaResult<Oneshot<NativeEvent>> {
+        let mut native_topics = Vec::new();
+        let mut err_buf = ErrBuf::new();
+        for tn in topic_names {
+            let tn_c = CString::new(*tn)?;
+            let native_topic = unsafe { NativeDeleteTopic::from_ptr(rdsys::rd_kafka_DeleteTopic_new(tn_c.as_ptr())) };
+            native_topics.push(native_topic);
+        }
+        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
+        unsafe {
+            rdsys::rd_kafka_DeleteTopics(
+                self.client.native_ptr(),
+                native_topics.as_c_array(),
+                native_topics.len(),
+                native_opts.ptr(),
+                self.queue.ptr(),
+            );
+        }
+        Ok(rx)
+    }
+
+    /// Adds additional partitions to existing topics according to the provided
+    /// `NewPartitions` specifications.
+    ///
+    /// Note that while the API supports creating partitions for multiple topics
+    /// at once, it is not transactional. Creation of partitions for some topics
+    /// may succeed while others fail. Be sure to check the result of each
+    /// individual operation.
+    pub fn create_partitions<'a, I>(
+        &self,
+        partitions: I,
+        opts: &AdminOptions,
+    ) -> impl Future<Item = Vec<TopicResult>, Error = KafkaError>
+    where
+        I: IntoIterator<Item = &'a NewPartitions<'a>>,
+    {
+        match self.create_partitions_inner(partitions, opts) {
+            Ok(rx) => Either::A(CreatePartitionsFuture { rx }),
+            Err(err) => Either::B(future::err(err)),
+        }
+    }
+
+    fn create_partitions_inner<'a, I>(&self, partitions: I, opts: &AdminOptions) -> KafkaResult<Oneshot<NativeEvent>>
+    where
+        I: IntoIterator<Item = &'a NewPartitions<'a>>,
+    {
+        let mut native_partitions = Vec::new();
+        let mut err_buf = ErrBuf::new();
+        for p in partitions {
+            native_partitions.push(p.to_native(&mut err_buf)?);
+        }
+        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
+        unsafe {
+            rdsys::rd_kafka_CreatePartitions(
+                self.client.native_ptr(),
+                native_partitions.as_c_array(),
+                native_partitions.len(),
+                native_opts.ptr(),
+                self.queue.ptr(),
+            );
+        }
+        Ok(rx)
+    }
+
+    /// Retrieves the configuration parameters for the specified resources.
+    ///
+    /// Note that while the API supports describing multiple configurations at
+    /// once, it is not transactional. There is no guarantee that you will see
+    /// a consistent snapshot of the configuration across different resources.
+    pub fn describe_configs<'a, I>(
+        &self,
+        configs: I,
+        opts: &AdminOptions,
+    ) -> impl Future<Item = Vec<ConfigResourceResult>, Error = KafkaError>
+    where
+        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
+    {
+        match self.describe_configs_inner(configs, opts) {
+            Ok(rx) => Either::A(DescribeConfigsFuture { rx }),
+            Err(err) => Either::B(future::err(err)),
+        }
+    }
+
+    fn describe_configs_inner<'a, I>(&self, configs: I, opts: &AdminOptions) -> KafkaResult<Oneshot<NativeEvent>>
+    where
+        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
+    {
+        let mut native_configs = Vec::new();
+        let mut err_buf = ErrBuf::new();
+        for c in configs {
+            let (name, typ) = match c {
+                ResourceSpecifier::Topic(name) => (CString::new(*name)?, RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC),
+                ResourceSpecifier::Group(name) => (CString::new(*name)?, RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP),
+                ResourceSpecifier::Broker(id) => (
+                    CString::new(format!("{}", id))?,
+                    RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
+                ),
+            };
+            native_configs.push(unsafe {
+                NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr()))
+            });
+        }
+        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
+        unsafe {
+            rdsys::rd_kafka_DescribeConfigs(
+                self.client.native_ptr(),
+                native_configs.as_c_array(),
+                native_configs.len(),
+                native_opts.ptr(),
+                self.queue.ptr(),
+            );
+        }
+        Ok(rx)
+    }
+
+    /// Sets configuration parameters for the specified resources.
+    ///
+    /// Note that while the API supports altering multiple resources at once, it
+    /// is not transactional. Alteration of some resources may succeed while
+    /// others fail. Be sure to check the result of each individual operation.
+    pub fn alter_configs<'a, I>(
+        &self,
+        configs: I,
+        opts: &AdminOptions,
+    ) -> impl Future<Item = Vec<AlterConfigsResult>, Error = KafkaError>
+    where
+        I: IntoIterator<Item = &'a AlterConfig<'a>>,
+    {
+        match self.alter_configs_inner(configs, opts) {
+            Ok(rx) => Either::A(AlterConfigsFuture { rx }),
+            Err(err) => Either::B(future::err(err)),
+        }
+    }
+
+    fn alter_configs_inner<'a, I>(&self, configs: I, opts: &AdminOptions) -> KafkaResult<Oneshot<NativeEvent>>
+    where
+        I: IntoIterator<Item = &'a AlterConfig<'a>>,
+    {
+        let mut native_configs = Vec::new();
+        let mut err_buf = ErrBuf::new();
+        for c in configs {
+            native_configs.push(c.to_native(&mut err_buf)?);
+        }
+        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
+        unsafe {
+            rdsys::rd_kafka_AlterConfigs(
+                self.client.native_ptr(),
+                native_configs.as_c_array(),
+                native_configs.len(),
+                native_opts.ptr(),
+                self.queue.ptr(),
+            );
+        }
+        Ok(rx)
+    }
+}
+
+impl FromClientConfig for AdminClient<DefaultClientContext> {
+    fn from_config(config: &ClientConfig) -> KafkaResult<AdminClient<DefaultClientContext>> {
+        AdminClient::from_config_and_context(config, DefaultClientContext)
+    }
+}
+
+impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
+    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<AdminClient<C>> {
+        let native_config = config.create_native_config()?;
+        // librdkafka only provides consumer and producer types. We follow the
+        // example of the Python bindings in choosing to pretend to be a
+        // producer, as producer clients are allegedly more lightweight. [0]
+        //
+        // [0]: https://github.com/confluentinc/confluent-kafka-python/blob/bfb07dfbca47c256c840aaace83d3fe26c587360/confluent_kafka/src/Admin.c#L1492-L1493
+        let client = Client::new(config, native_config, RDKafkaType::RD_KAFKA_PRODUCER, context)?;
+        let queue = Arc::new(client.new_native_queue());
+        let should_stop = Arc::new(AtomicBool::new(false));
+        let handle = start_poll_thread(queue.clone(), should_stop.clone());
+        Ok(AdminClient {
+            client,
+            queue,
+            should_stop,
+            handle: Some(handle),
+        })
+    }
+}
+
+impl<C: ClientContext> Drop for AdminClient<C> {
+    fn drop(&mut self) {
+        trace!("Stopping polling");
+        self.should_stop.store(true, Ordering::Relaxed);
+        trace!("Waiting for polling thread termination");
+        match self.handle.take().unwrap().join() {
+            Ok(()) => trace!("Polling stopped"),
+            Err(e) => warn!("Failure while terminating thread: {:?}", e),
+        };
+    }
+}
+
+fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
+    thread::Builder::new()
+        .name("admin client polling thread".into())
+        .spawn(move || {
+            trace!("Admin polling thread loop started");
+            loop {
+                let event = queue.poll(Duration::from_millis(100));
+                if event.is_null() {
+                    if should_stop.load(Ordering::Relaxed) {
+                        // We received nothing and the thread should stop, so
+                        // break the loop.
+                        break;
+                    }
+                    continue;
+                }
+                let event = unsafe { NativeEvent::from_ptr(event) };
+                let tx: Box<Complete<NativeEvent>> =
+                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
+                let _ = tx.send(event);
+            }
+            trace!("Admin polling thread loop terminated");
+        })
+        .expect("Failed to start polling thread")
+}
+
+struct NativeEvent {
+    ptr: *mut RDKafkaEvent,
+}
+
+impl NativeEvent {
+    unsafe fn from_ptr(ptr: *mut RDKafkaEvent) -> NativeEvent {
+        NativeEvent { ptr }
+    }
+
+    fn ptr(&self) -> *mut RDKafkaEvent {
+        self.ptr
+    }
+
+    fn check_error(&self) -> KafkaResult<()> {
+        let err = unsafe { rdsys::rd_kafka_event_error(self.ptr) };
+        if err.is_error() {
+            Err(KafkaError::AdminOp(err.into()))
+        } else {
+            Ok(())
+        }
+    }
+}
+
+impl Drop for NativeEvent {
+    fn drop(&mut self) {
+        trace!("Destroying event: {:?}", self.ptr);
+        unsafe {
+            rdsys::rd_kafka_event_destroy(self.ptr);
+        }
+        trace!("Event destroyed: {:?}", self.ptr);
+    }
+}
+
+unsafe impl Sync for NativeEvent {}
+unsafe impl Send for NativeEvent {}
+
+//
+// ********** ADMIN OPTIONS **********
+//
+
+/// Options for an admin API request.
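+///
+/// A sketch of typical construction (the timeout value is illustrative):
+///
+/// ```
+/// use rdkafka::admin::AdminOptions;
+/// use std::time::Duration;
+///
+/// let opts = AdminOptions::new()
+///     .request_timeout(Duration::from_secs(5))
+///     .validate_only(true);
+/// ```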
+pub struct AdminOptions {
+    request_timeout: Option<Duration>,
+    operation_timeout: Option<Duration>,
+    validate_only: bool,
+    broker_id: Option<i32>,
+}
+
+impl AdminOptions {
+    /// Creates a new `AdminOptions`.
+    pub fn new() -> AdminOptions {
+        AdminOptions {
+            request_timeout: None,
+            operation_timeout: None,
+            validate_only: false,
+            broker_id: None,
+        }
+    }
+
+    /// Sets the overall request timeout, including broker lookup, request
+    /// transmission, operation time on broker, and response.
+    ///
+    /// Defaults to the `socket.timeout.ms` configuration parameter.
+    pub fn request_timeout<T: Into<Option<Duration>>>(mut self, timeout: T) -> Self {
+        self.request_timeout = timeout.into();
+        self
+    }
+
+    /// Sets the broker's operation timeout, such as the timeout for
+    /// CreateTopics to complete the creation of topics on the controller before
+    /// returning a result to the application.
+    ///
+    /// If unset (the default), the API calls will return immediately after
+    /// triggering the operation.
+    ///
+    /// Only the CreateTopics, DeleteTopics, and CreatePartitions API calls
+    /// respect this option.
+    pub fn operation_timeout<T: Into<Option<Duration>>>(mut self, timeout: T) -> Self {
+        self.operation_timeout = timeout.into();
+        self
+    }
+
+    /// Tells the broker to only validate the request, without performing the
+    /// requested operation.
+    ///
+    /// Defaults to false.
+    pub fn validate_only(mut self, validate_only: bool) -> Self {
+        self.validate_only = validate_only;
+        self
+    }
+
+    /// Overrides the broker to which the admin request is sent.
+    ///
+    /// By default, a reasonable broker will be selected automatically. See the
+    /// librdkafka docs on `rd_kafka_AdminOptions_set_broker` for details.
+    pub fn broker_id<T: Into<Option<i32>>>(mut self, broker_id: T) -> Self {
+        self.broker_id = broker_id.into();
+        self
+    }
+
+    fn to_native(
+        &self,
+        client: *mut RDKafka,
+        err_buf: &mut ErrBuf,
+    ) -> KafkaResult<(NativeAdminOptions, Oneshot<NativeEvent>)> {
+        let native_opts = unsafe {
+            NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new(
+                client,
+                RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY,
+            ))
+        };
+
+        if let Some(timeout) = self.request_timeout {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_request_timeout(
+                    native_opts.ptr(),
+                    timeout_to_ms(timeout),
+                    err_buf.as_mut_ptr(),
+                    err_buf.len(),
+                )
+            };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        if let Some(timeout) = self.operation_timeout {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_operation_timeout(
+                    native_opts.ptr(),
+                    timeout_to_ms(timeout),
+                    err_buf.as_mut_ptr(),
+                    err_buf.len(),
+                )
+            };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        if self.validate_only {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_validate_only(
+                    native_opts.ptr(),
+                    1, // true
+                    err_buf.as_mut_ptr(),
+                    err_buf.len(),
+                )
+            };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        if let Some(broker_id) = self.broker_id {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_broker(
+                    native_opts.ptr(),
+                    broker_id,
+                    err_buf.as_mut_ptr(),
+                    err_buf.len(),
+                )
+            };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        let (tx, rx) = futures::oneshot();
+        let tx = Box::new(tx);
+        unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr(), IntoOpaque::as_ptr(&tx)) };
+        mem::forget(tx);
+
+        Ok((native_opts, rx))
+    }
+}
+
+struct NativeAdminOptions {
+    ptr: *mut RDKafkaAdminOptions,
+}
+
+impl NativeAdminOptions {
+    unsafe fn from_ptr(ptr: *mut RDKafkaAdminOptions) -> NativeAdminOptions {
+        NativeAdminOptions { ptr }
+    }
+
+    fn ptr(&self) -> *mut RDKafkaAdminOptions {
+        self.ptr
+    }
+}
+
+impl Drop for NativeAdminOptions {
+    fn drop(&mut self) {
+        trace!("Destroying admin options: {:?}", self.ptr);
+        unsafe {
+            rdsys::rd_kafka_AdminOptions_destroy(self.ptr);
+        }
+        trace!("Admin options destroyed: {:?}", self.ptr);
+    }
+}
+
+fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> {
+    match res.into() {
+        RDKafkaError::NoError => Ok(()),
+        RDKafkaError::InvalidArgument => {
+            // `ErrBuf::len` reports the buffer's capacity, not the length
+            // of the message it contains, so inspect the converted string
+            // to decide whether librdkafka provided any detail.
+            let msg = err_buf.to_string();
+            let msg = if msg.is_empty() {
+                "invalid argument".into()
+            } else {
+                msg
+            };
+            Err(KafkaError::AdminOpCreation(msg))
+        }
+        res => Err(KafkaError::AdminOpCreation(format!(
+            "setting admin options returned unexpected error code {}",
+            res
+        ))),
+    }
+}
+
+//
+// ********** RESPONSE HANDLING **********
+//
+
+/// The result of an individual CreateTopic, DeleteTopic, or
+/// CreatePartition operation.
+pub type TopicResult = Result<String, (String, RDKafkaError)>;
+
+fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec<TopicResult> {
+    let mut out = Vec::with_capacity(n);
+    for i in 0..n {
+        let topic = unsafe { *topics.offset(i as isize) };
+        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_topic_result_name(topic)) };
+        let err = unsafe { rdsys::rd_kafka_topic_result_error(topic) };
+        if err.is_error() {
+            out.push(Err((name, err.into())));
+        } else {
+            out.push(Ok(name));
+        }
+    }
+    out
+}
+
+//
+// Create topic handling
+//
+
+/// Configuration for a CreateTopic operation.
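+///
+/// A sketch of building a specification (topic name and settings are
+/// illustrative):
+///
+/// ```
+/// use rdkafka::admin::{NewTopic, TopicReplication};
+///
+/// let topic = NewTopic::new("example-topic", 3, TopicReplication::Fixed(1))
+///     .set("retention.ms", "3600000");
+/// ```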
+#[derive(Debug)]
+pub struct NewTopic<'a> {
+    /// The name of the new topic.
+    pub name: &'a str,
+    /// The initial number of partitions.
+    pub num_partitions: i32,
+    /// The initial replication configuration.
+    pub replication: TopicReplication<'a>,
+    /// The initial configuration parameters for the topic.
+    pub config: Vec<(&'a str, &'a str)>,
+}
+
+impl<'a> NewTopic<'a> {
+    /// Creates a new `NewTopic`.
+    pub fn new(name: &'a str, num_partitions: i32, replication: TopicReplication<'a>) -> NewTopic<'a> {
+        NewTopic {
+            name,
+            num_partitions,
+            replication,
+            config: Vec::new(),
+        }
+    }
+
+    /// Sets a new parameter in the initial topic configuration.
+    pub fn set(mut self, key: &'a str, value: &'a str) -> NewTopic<'a> {
+        self.config.push((key, value));
+        self
+    }
+
+    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewTopic> {
+        let name = CString::new(self.name)?;
+        let repl = match self.replication {
+            TopicReplication::Fixed(n) => n,
+            TopicReplication::Variable(partitions) => {
+                if partitions.len() as i32 != self.num_partitions {
+                    return Err(KafkaError::AdminOpCreation(format!(
+                        "replication configuration for topic '{}' assigns {} partition(s), \
+                         which does not match the specified number of partitions ({})",
+                        self.name,
+                        partitions.len(),
+                        self.num_partitions,
+                    )));
+                }
+                -1
+            }
+        };
+        let topic = unsafe {
+            rdsys::rd_kafka_NewTopic_new(
+                name.as_ptr(),
+                self.num_partitions,
+                repl,
+                err_buf.as_mut_ptr(),
+                err_buf.len(),
+            )
+        };
+        if topic.is_null() {
+            return Err(KafkaError::AdminOpCreation(err_buf.to_string()));
+        }
+        // N.B.: we wrap topic immediately, so that it is destroyed via the
+        // NativeNewTopic's Drop implementation if replica assignment or config
+        // installation fails.
+        let topic = unsafe { NativeNewTopic::from_ptr(topic) };
+        if let TopicReplication::Variable(assignment) = self.replication {
+            for (partition_id, broker_ids) in assignment.into_iter().enumerate() {
+                let res = unsafe {
+                    rdsys::rd_kafka_NewTopic_set_replica_assignment(
+                        topic.ptr(),
+                        partition_id as i32,
+                        broker_ids.as_ptr() as *mut i32,
+                        broker_ids.len(),
+                        err_buf.as_mut_ptr(),
+                        err_buf.len(),
+                    )
+                };
+                check_rdkafka_invalid_arg(res, err_buf)?;
+            }
+        }
+        for (key, val) in &self.config {
+            let key_c = CString::new(*key)?;
+            let val_c = CString::new(*val)?;
+            let res = unsafe { rdsys::rd_kafka_NewTopic_set_config(topic.ptr(), key_c.as_ptr(), val_c.as_ptr()) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+        Ok(topic)
+    }
+}
+
+/// An assignment of partitions to replicas.
+///
+/// Each element in the outer slice corresponds to the partition with that
+/// index. The inner slice specifies the broker IDs to which replicas of that
+/// partition should be assigned.
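+///
+/// For example, the following assigns partition 0 to brokers 1 and 2, and
+/// partition 1 to brokers 2 and 3 (broker IDs are illustrative):
+///
+/// ```
+/// use rdkafka::admin::PartitionAssignment;
+///
+/// let assignment: PartitionAssignment = &[&[1, 2], &[2, 3]];
+/// ```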
+pub type PartitionAssignment<'a> = &'a [&'a [i32]];
+
+/// Replication configuration for a new topic.
+#[derive(Debug)]
+pub enum TopicReplication<'a> {
+    /// All partitions should use the same fixed replication factor.
+    Fixed(i32),
+    /// Each partition should use the replica assignment from
+    /// `PartitionAssignment`.
+    Variable(PartitionAssignment<'a>),
+}
+
+#[repr(transparent)]
+struct NativeNewTopic {
+    ptr: *mut RDKafkaNewTopic,
+}
+
+impl NativeNewTopic {
+    unsafe fn from_ptr(ptr: *mut RDKafkaNewTopic) -> NativeNewTopic {
+        NativeNewTopic { ptr }
+    }
+}
+
+impl WrappedCPointer for NativeNewTopic {
+    type Target = RDKafkaNewTopic;
+
+    fn ptr(&self) -> *mut RDKafkaNewTopic {
+        self.ptr
+    }
+}
+
+impl Drop for NativeNewTopic {
+    fn drop(&mut self) {
+        trace!("Destroying new topic: {:?}", self.ptr);
+        unsafe {
+            rdsys::rd_kafka_NewTopic_destroy(self.ptr);
+        }
+        trace!("New topic destroyed: {:?}", self.ptr);
+    }
+}
+
+struct CreateTopicsFuture {
+    rx: Oneshot<NativeEvent>,
+}
+
+impl Future for CreateTopicsFuture {
+    type Item = Vec<TopicResult>;
+    type Error = KafkaError;
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        match self.rx.poll() {
+            Ok(Async::Ready(event)) => {
+                event.check_error()?;
+                let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) };
+                if res.is_null() {
+                    let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
+                    return Err(KafkaError::AdminOpCreation(format!(
+                        "create topics request received response of incorrect type ({})",
+                        typ
+                    )));
+                }
+                let mut n = 0;
+                let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) };
+                Ok(Async::Ready(build_topic_results(topics, n)))
+            }
+            Ok(Async::NotReady) => Ok(Async::NotReady),
+            Err(Canceled) => Err(KafkaError::Canceled),
+        }
+    }
+}
+
+//
+// Delete topic handling
+//
+
+#[repr(transparent)]
+struct NativeDeleteTopic {
+    ptr: *mut RDKafkaDeleteTopic,
+}
+
+impl NativeDeleteTopic {
+    unsafe fn from_ptr(ptr: *mut RDKafkaDeleteTopic) -> NativeDeleteTopic {
+        NativeDeleteTopic { ptr }
+    }
+}
+
+impl WrappedCPointer for NativeDeleteTopic {
+    type Target = RDKafkaDeleteTopic;
+
+    fn ptr(&self) -> *mut RDKafkaDeleteTopic {
+        self.ptr
+    }
+}
+
+impl Drop for NativeDeleteTopic {
+    fn drop(&mut self) {
+        trace!("Destroying delete topic: {:?}", self.ptr);
+        unsafe {
+            rdsys::rd_kafka_DeleteTopic_destroy(self.ptr);
+        }
+        trace!("Delete topic destroyed: {:?}", self.ptr);
+    }
+}
+
+struct DeleteTopicsFuture {
+    rx: Oneshot<NativeEvent>,
+}
+
+impl Future for DeleteTopicsFuture {
+    type Item = Vec<TopicResult>;
+    type Error = KafkaError;
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        match self.rx.poll() {
+            Ok(Async::Ready(event)) => {
+                event.check_error()?;
+                let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) };
+                if res.is_null() {
+                    let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
+                    return Err(KafkaError::AdminOpCreation(format!(
+                        "delete topics request received response of incorrect type ({})",
+                        typ
+                    )));
+                }
+                let mut n = 0;
+                let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) };
+                Ok(Async::Ready(build_topic_results(topics, n)))
+            }
+            Ok(Async::NotReady) => Ok(Async::NotReady),
+            Err(Canceled) => Err(KafkaError::Canceled),
+        }
+    }
+}
+
+//
+// Create partitions handling
+//
+
+/// Configuration for a CreatePartitions operation.
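+///
+/// A sketch that grows a topic to six partitions (names and counts are
+/// illustrative):
+///
+/// ```
+/// use rdkafka::admin::NewPartitions;
+///
+/// let partitions = NewPartitions::new("example-topic", 6);
+/// ```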
+pub struct NewPartitions<'a> {
+    /// The name of the topic to which partitions should be added.
+    pub topic_name: &'a str,
+    /// The total number of partitions after the operation completes.
+    pub new_partition_count: usize,
+    /// The replica assignments for the new partitions.
+    pub assignment: Option<PartitionAssignment<'a>>,
+}
+
+impl<'a> NewPartitions<'a> {
+    /// Creates a new `NewPartitions`.
+    pub fn new(topic_name: &'a str, new_partition_count: usize) -> NewPartitions<'a> {
+        NewPartitions {
+            topic_name,
+            new_partition_count,
+            assignment: None,
+        }
+    }
+
+    /// Sets the partition replica assignment for the new partitions. Only
+    /// assignments for the newly created partitions should be included.
+    pub fn assign(mut self, assignment: PartitionAssignment<'a>) -> NewPartitions<'a> {
+        self.assignment = Some(assignment);
+        self
+    }
+
+    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewPartitions> {
+        let name = CString::new(self.topic_name)?;
+        if let Some(assignment) = self.assignment {
+            // If assignment contains more than self.new_partition_count
+            // entries, we'll trip an assertion in librdkafka that crashes the
+            // process. Note that this check isn't a guarantee that the
+            // partition assignment is valid, since the assignment should only
+            // contain entries for the *new* partitions added, and not any
+            // existing partitions, but we can let the server handle that
+            // validation--we just need to make sure not to crash librdkafka.
+            if assignment.len() > self.new_partition_count {
+                return Err(KafkaError::AdminOpCreation(format!(
+                    "partition assignment for topic '{}' assigns {} partition(s), \
+                     which is more than the requested total number of partitions ({})",
+                    self.topic_name,
+                    assignment.len(),
+                    self.new_partition_count,
+                )));
+            }
+        }
+        let partitions = unsafe {
+            rdsys::rd_kafka_NewPartitions_new(
+                name.as_ptr(),
+                self.new_partition_count,
+                err_buf.as_mut_ptr(),
+                err_buf.len(),
+            )
+        };
+        if partitions.is_null() {
+            return Err(KafkaError::AdminOpCreation(err_buf.to_string()));
+        }
+        // N.B.: we wrap partitions immediately, so that it is destroyed via
+        // NativeNewPartitions's Drop implementation if installing the
+        // replica assignment fails.
+        let partitions = unsafe { NativeNewPartitions::from_ptr(partitions) };
+        if let Some(assignment) = self.assignment {
+            for (partition_id, broker_ids) in assignment.into_iter().enumerate() {
+                let res = unsafe {
+                    rdsys::rd_kafka_NewPartitions_set_replica_assignment(
+                        partitions.ptr(),
+                        partition_id as i32,
+                        broker_ids.as_ptr() as *mut i32,
+                        broker_ids.len(),
+                        err_buf.as_mut_ptr(),
+                        err_buf.len(),
+                    )
+                };
+                check_rdkafka_invalid_arg(res, err_buf)?;
+            }
+        }
+        Ok(partitions)
+    }
+}
+
+#[repr(transparent)]
+struct NativeNewPartitions {
+    ptr: *mut RDKafkaNewPartitions,
+}
+
+impl NativeNewPartitions {
+    unsafe fn from_ptr(ptr: *mut RDKafkaNewPartitions) -> NativeNewPartitions {
+        NativeNewPartitions { ptr }
+    }
+}
+
+impl WrappedCPointer for NativeNewPartitions {
+    type Target = RDKafkaNewPartitions;
+
+    fn ptr(&self) -> *mut RDKafkaNewPartitions {
+        self.ptr
+    }
+}
+
+impl Drop for NativeNewPartitions {
+    fn drop(&mut self) {
+        trace!("Destroying new partitions: {:?}", self.ptr);
+        unsafe {
+            rdsys::rd_kafka_NewPartitions_destroy(self.ptr);
+        }
+        trace!("New partitions destroyed: {:?}", self.ptr);
+    }
+}
+
+struct CreatePartitionsFuture {
+    rx: Oneshot<NativeEvent>,
+}
+
+impl Future for CreatePartitionsFuture {
+    type Item = Vec<TopicResult>;
+    type Error = KafkaError;
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        match self.rx.poll() {
+            Ok(Async::Ready(event)) => {
+                event.check_error()?;
+                let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) };
+                if res.is_null() {
+                    let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
+                    return Err(KafkaError::AdminOpCreation(format!(
+                        "create partitions request received response of incorrect type ({})",
+                        typ
+                    )));
+                }
+                let mut n = 0;
+                let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) };
+                Ok(Async::Ready(build_topic_results(topics, n)))
+            }
+            Ok(Async::NotReady) => Ok(Async::NotReady),
+            Err(Canceled) => Err(KafkaError::Canceled),
+        }
+    }
+}
+
+//
+// Describe configs handling
+//
+
+/// The result of an individual DescribeConfig operation.
+pub type ConfigResourceResult = Result<ConfigResource, RDKafkaError>;
+
+/// Specification of a configurable resource.
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+pub enum ResourceSpecifier<'a> {
+    /// A topic resource, identified by its name.
+    Topic(&'a str),
+    /// A group resource, identified by its ID.
+    Group(&'a str),
+    /// A broker resource, identified by its ID.
+    Broker(i32),
+}
+
+/// A `ResourceSpecifier` that owns its data.
+#[derive(Debug, Eq, PartialEq)]
+pub enum OwnedResourceSpecifier {
+    /// A topic resource, identified by its name.
+    Topic(String),
+    /// A group resource, identified by its ID.
+    Group(String),
+    /// A broker resource, identified by its ID.
+    Broker(i32),
+}
+
+/// The source of a configuration entry.
+#[derive(Debug, Eq, PartialEq)]
+pub enum ConfigSource {
+    /// Unknown. Note that Kafka brokers before v1.1.0 do not reliably provide
+    /// configuration source information.
+    Unknown,
+    /// A dynamic topic configuration.
+    DynamicTopic,
+    /// A dynamic broker configuration.
+    DynamicBroker,
+    /// The default dynamic broker configuration.
+    DynamicDefaultBroker,
+    /// The static broker configuration.
+    StaticBroker,
+    /// The hardcoded default configuration.
+    Default,
+}
+
+/// An individual configuration parameter for a `ConfigResource`.
+#[derive(Debug, Eq, PartialEq)]
+pub struct ConfigEntry {
+    /// The name of the configuration parameter.
+    pub name: String,
+    /// The value of the configuration parameter.
+    pub value: Option<String>,
+    /// The source of the configuration parameter.
+    pub source: ConfigSource,
+    /// Whether the configuration parameter is read only.
+    pub is_read_only: bool,
+    /// Whether the configuration parameter currently has the default value.
+    pub is_default: bool,
+    /// Whether the configuration parameter contains sensitive data.
+    pub is_sensitive: bool,
+}
+
+/// A configurable resource and its current configuration values.
+#[derive(Debug)]
+pub struct ConfigResource {
+    /// Identifies the resource.
+    pub specifier: OwnedResourceSpecifier,
+    /// The current configuration parameters.
+    pub entries: Vec<ConfigEntry>,
+}
+
+impl ConfigResource {
+    /// Builds a `HashMap` of configuration entries, keyed by configuration
+    /// entry name.
+    pub fn entry_map(&self) -> HashMap<&str, &ConfigEntry> {
+        self.entries.iter().map(|e| (&*e.name, e)).collect()
+    }
+
+    /// Searches the configuration entries to find the named parameter.
+    ///
+    /// For more efficient lookups, use `entry_map` to build a `HashMap`
+    /// instead.
+    pub fn get(&self, name: &str) -> Option<&ConfigEntry> {
+        self.entries.iter().find(|e| e.name == name)
+    }
+}
+
+#[repr(transparent)]
+struct NativeConfigResource {
+    ptr: *mut RDKafkaConfigResource,
+}
+
+impl NativeConfigResource {
+    unsafe fn from_ptr(ptr: *mut RDKafkaConfigResource) -> NativeConfigResource {
+        NativeConfigResource { ptr }
+    }
+}
+
+impl WrappedCPointer for NativeConfigResource {
+    type Target = RDKafkaConfigResource;
+
+    fn ptr(&self) -> *mut RDKafkaConfigResource {
+        self.ptr
+    }
+}
+
+impl Drop for NativeConfigResource {
+    fn drop(&mut self) {
+        trace!("Destroying config resource: {:?}", self.ptr);
+        unsafe {
+            rdsys::rd_kafka_ConfigResource_destroy(self.ptr);
+        }
+        trace!("Config resource destroyed: {:?}", self.ptr);
+    }
+}
+
+fn extract_config_specifier(resource: *const RDKafkaConfigResource) -> KafkaResult<OwnedResourceSpecifier> {
+    let typ = unsafe { rdsys::rd_kafka_ConfigResource_type(resource) };
+    match typ {
+        RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC => {
+            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
+            Ok(OwnedResourceSpecifier::Topic(name))
+        }
+        RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP => {
+            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
+            Ok(OwnedResourceSpecifier::Group(name))
+        }
+        RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER => {
+            let name = unsafe { CStr::from_ptr(rdsys::rd_kafka_ConfigResource_name(resource)) }.to_string_lossy();
+            match name.parse::<i32>() {
+                Ok(id) => Ok(OwnedResourceSpecifier::Broker(id)),
+                Err(_) => Err(KafkaError::AdminOpCreation(format!(
+                    "bogus broker ID in kafka response: {}",
+                    name
+                ))),
+            }
+        }
+        _ => Err(KafkaError::AdminOpCreation(format!(
+            "bogus resource type in kafka response: {:?}",
+            typ
+        ))),
+    }
+}
+
+fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult<ConfigSource> {
+    match config_source {
+        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG => Ok(ConfigSource::Unknown),
+        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG => Ok(ConfigSource::DynamicTopic),
+        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG => Ok(ConfigSource::DynamicBroker),
+        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG => {
+            Ok(ConfigSource::DynamicDefaultBroker)
+        }
+        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG => Ok(ConfigSource::StaticBroker),
+        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG => Ok(ConfigSource::Default),
+        _ => Err(KafkaError::AdminOpCreation(format!(
+            "bogus config source type in kafka response: {:?}",
+            config_source,
+        ))),
+    }
+}
+
+struct DescribeConfigsFuture {
+    rx: Oneshot<NativeEvent>,
+}
+
+impl Future for DescribeConfigsFuture {
+    type Item = Vec<ConfigResourceResult>;
+    type Error = KafkaError;
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        match self.rx.poll() {
+            Ok(Async::Ready(event)) => {
+                event.check_error()?;
+                let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) };
+                if res.is_null() {
+                    let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
+                    return Err(KafkaError::AdminOpCreation(format!(
+                        "describe configs request received response of incorrect type ({})",
+                        typ
+                    )));
+                }
+                let mut n = 0;
+                let resources = unsafe { rdsys::rd_kafka_DescribeConfigs_result_resources(res, &mut n) };
+                let mut out = Vec::with_capacity(n);
+                for i in 0..n {
+                    let resource = unsafe { *resources.offset(i as isize) };
+                    let specifier = extract_config_specifier(resource)?;
+                    let mut entries_out = Vec::new();
+                    let mut n = 0;
+                    let entries = unsafe { rdsys::rd_kafka_ConfigResource_configs(resource, &mut n) };
+                    for j in 0..n {
+                        let entry = unsafe { *entries.offset(j as isize) };
+                        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigEntry_name(entry)) };
+                        let value = unsafe {
+                            let value = rdsys::rd_kafka_ConfigEntry_value(entry);
+                            if value.is_null() {
+                                None
+                            } else {
+                                Some(cstr_to_owned(value))
+                            }
+                        };
+                        entries_out.push(ConfigEntry {
+                            name,
+                            value,
+                            source: extract_config_source(unsafe { rdsys::rd_kafka_ConfigEntry_source(entry) })?,
+                            is_read_only: unsafe { rdsys::rd_kafka_ConfigEntry_is_read_only(entry) } != 0,
+                            is_default: unsafe { rdsys::rd_kafka_ConfigEntry_is_default(entry) } != 0,
+                            is_sensitive: unsafe { rdsys::rd_kafka_ConfigEntry_is_sensitive(entry) } != 0,
+                        });
+                    }
+                    out.push(Ok(ConfigResource {
+                        specifier,
+                        entries: entries_out,
+                    }))
+                }
+                Ok(Async::Ready(out))
+            }
+            Ok(Async::NotReady) => Ok(Async::NotReady),
+            Err(Canceled) => Err(KafkaError::Canceled),
+        }
+    }
+}
+
+//
+// Alter configs handling
+//
+
+/// The result of an individual AlterConfig operation.
+pub type AlterConfigsResult = Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaError)>;
+
+/// Configuration for an AlterConfig operation.
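+///
+/// A sketch of building a specification (topic name and parameter are
+/// illustrative):
+///
+/// ```
+/// use rdkafka::admin::{AlterConfig, ResourceSpecifier};
+///
+/// let config = AlterConfig::new(ResourceSpecifier::Topic("example-topic"))
+///     .set("retention.ms", "3600000");
+/// ```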
+pub struct AlterConfig<'a> {
+    /// Identifies the resource to be altered.
+    pub specifier: ResourceSpecifier<'a>,
+    /// The configuration parameters to be updated.
+    pub entries: HashMap<&'a str, &'a str>,
+}
+
+impl<'a> AlterConfig<'a> {
+    /// Creates a new `AlterConfig`.
+    pub fn new(specifier: ResourceSpecifier) -> AlterConfig {
+        AlterConfig {
+            specifier,
+            entries: HashMap::new(),
+        }
+    }
+
+    /// Sets the configuration parameter named `key` to the specified `value`.
+    pub fn set(mut self, key: &'a str, value: &'a str) -> AlterConfig<'a> {
+        self.entries.insert(key, value);
+        self
+    }
+
+    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeConfigResource> {
+        let (name, typ) = match self.specifier {
+            ResourceSpecifier::Topic(name) => (CString::new(name)?, RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC),
+            ResourceSpecifier::Group(name) => (CString::new(name)?, RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP),
+            ResourceSpecifier::Broker(id) => (
+                CString::new(format!("{}", id))?,
+                RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
+            ),
+        };
+        // N.B.: we wrap config immediately, so that it is destroyed via the
+        // NativeConfigResource's Drop implementation if config installation fails.
+        let config = unsafe { NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr())) };
+        for (key, val) in &self.entries {
+            let key_c = CString::new(*key)?;
+            let val_c = CString::new(*val)?;
+            let res =
+                unsafe { rdsys::rd_kafka_ConfigResource_set_config(config.ptr(), key_c.as_ptr(), val_c.as_ptr()) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+        Ok(config)
+    }
+}
+
+struct AlterConfigsFuture {
+    rx: Oneshot<NativeEvent>,
+}
+
+impl Future for AlterConfigsFuture {
+    type Item = Vec<AlterConfigsResult>;
+    type Error = KafkaError;
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        match self.rx.poll() {
+            Ok(Async::Ready(event)) => {
+                event.check_error()?;
+                let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) };
+                if res.is_null() {
+                    let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
+                    return Err(KafkaError::AdminOpCreation(format!(
+                        "alter configs request received response of incorrect type ({})",
+                        typ
+                    )));
+                }
+                let mut n = 0;
+                let resources = unsafe { rdsys::rd_kafka_AlterConfigs_result_resources(res, &mut n) };
+                let mut out = Vec::with_capacity(n);
+                for i in 0..n {
+                    let resource = unsafe { *resources.offset(i as isize) };
+                    let specifier = extract_config_specifier(resource)?;
+                    out.push(Ok(specifier));
+                }
+                Ok(Async::Ready(out))
+            }
+            Ok(Async::NotReady) => Ok(Async::NotReady),
+            Err(Canceled) => Err(KafkaError::Canceled),
+        }
+    }
+}
diff --git a/src/client.rs b/src/client.rs
index ac465930a..76eba6e5e 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -233,6 +233,14 @@ impl<C: ClientContext> Client<C> {
             )}
         )
     }
+
+    /// Returns a `NativeQueue` for this client. The queue must not outlive
+    /// the client from which it was created.
+    pub(crate) fn new_native_queue(&self) -> NativeQueue {
+        unsafe {
+            NativeQueue::from_ptr(rdsys::rd_kafka_queue_new(self.native_ptr()))
+        }
+    }
 }
 
 struct NativeTopic {
@@ -269,6 +277,41 @@ impl Drop for NativeTopic {
     }
 }
 
+pub(crate) struct NativeQueue {
+    ptr: *mut RDKafkaQueue
+}
+
+// The library is completely thread safe, according to the documentation.
+unsafe impl Sync for NativeQueue {}
+unsafe impl Send for NativeQueue {}
+
+impl NativeQueue {
+    /// Wraps a pointer to an `RDKafkaQueue` object and returns a new
+    /// `NativeQueue`.
+    unsafe fn from_ptr(ptr: *mut RDKafkaQueue) -> NativeQueue {
+        NativeQueue { ptr }
+    }
+
+    /// Returns the pointer to the librdkafka RDKafkaQueue structure.
+    pub fn ptr(&self) -> *mut RDKafkaQueue {
+        self.ptr
+    }
+
+    pub fn poll<T: Into<Option<Duration>>>(&self, t: T) -> *mut RDKafkaEvent {
+        unsafe { rdsys::rd_kafka_queue_poll(self.ptr, timeout_to_ms(t)) }
+    }
+}
+
+impl Drop for NativeQueue {
+    fn drop(&mut self) {
+        trace!("Destroying queue: {:?}", self.ptr);
+        unsafe {
+            rdsys::rd_kafka_queue_destroy(self.ptr);
+        }
+        trace!("Queue destroyed: {:?}", self.ptr);
+    }
+}
+
 pub(crate) unsafe extern "C" fn native_log_cb<C: ClientContext>(
         client: *const RDKafka, level: i32,
         fac: *const c_char, buf: *const c_char) {
diff --git a/src/error.rs b/src/error.rs
index 293132ceb..37a8a641d 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -32,6 +32,12 @@ impl IsError for RDKafkaConfRes {
 /// Represents all Kafka errors. Check the underlying `RDKafkaError` to get details.
 #[derive(Clone, PartialEq, Eq)]
 pub enum KafkaError {
+    /// Creation of admin operation failed.
+    AdminOpCreation(String),
+    /// The admin operation itself failed.
+    AdminOp(RDKafkaError),
+    /// The client was dropped before the operation completed.
+    Canceled,
     /// Invalid client configuration.
     ClientConfig(RDKafkaConfRes, String, String, String),
     /// Client creation failed.
@@ -67,6 +73,9 @@ pub enum KafkaError {
 impl fmt::Debug for KafkaError {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match *self {
+            KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err),
+            KafkaError::AdminOpCreation(ref err) => write!(f, "KafkaError (Admin operation creation error: {})", err),
+            KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
             KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(f, "KafkaError (Client config error: {} {} {})", desc, key, value),
             KafkaError::ClientCreation(ref err) => write!(f, "KafkaError (Client creation error: {})", err),
             KafkaError::ConsumerCommit(err) => write!(f, "KafkaError (Consumer commit error: {})", err),
@@ -89,6 +98,9 @@ impl fmt::Debug for KafkaError {
 impl fmt::Display for KafkaError {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match *self {
+            KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err),
+            KafkaError::AdminOpCreation(ref err) => write!(f, "Admin operation creation error: {}", err),
+            KafkaError::Canceled => write!(f, "Client dropped"),
             KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(f, "Client config error: {} {} {}", desc, key, value),
             KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
             KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
@@ -111,6 +123,9 @@ impl fmt::Display for KafkaError {
 impl error::Error for KafkaError {
     fn description(&self) -> &str {
         match *self {
+            KafkaError::AdminOp(_) => "Admin operation error",
+            KafkaError::AdminOpCreation(_) => "Admin operation creation error",
+            KafkaError::Canceled => "Client dropped",
             KafkaError::ClientConfig(_, _, _, _) => "Client config error",
             KafkaError::ClientCreation(_) => "Client creation error",
             KafkaError::ConsumerCommit(_) => "Consumer commit error",
@@ -132,6 +147,9 @@ impl error::Error for KafkaError {
     #[allow(clippy::match_same_arms)]
     fn cause(&self) -> Option<&error::Error> {
         match *self {
+            KafkaError::AdminOp(_) => None,
+            KafkaError::AdminOpCreation(_) => None,
+            KafkaError::Canceled => None,
             KafkaError::ClientConfig(_, _, _, _) => None,
             KafkaError::ClientCreation(_) => None,
             KafkaError::ConsumerCommit(ref err) => Some(err),
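
A hedged sketch of how callers can dispatch on the new variants, assuming an
admin_client and opts like those in the tests below, and futures 0.1's .wait():

match admin_client.create_topics(&[], &opts).wait() {
    // Per-topic results; each entry may still be an Err for that topic.
    Ok(results) => println!("results: {:?}", results),
    // Client-side validation failed before a request was ever issued.
    Err(KafkaError::AdminOpCreation(msg)) => eprintln!("bad request: {}", msg),
    // The request was issued, but the operation failed as a whole.
    Err(KafkaError::AdminOp(err)) => eprintln!("operation failed: {}", err),
    // The client was dropped while the operation was in flight.
    Err(KafkaError::Canceled) => eprintln!("canceled: client dropped"),
    Err(other) => eprintln!("unexpected error: {}", other),
}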
diff --git a/src/lib.rs b/src/lib.rs
index 563355729..e3f1bde98 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -224,6 +224,7 @@ extern crate rdkafka_sys as rdsys;
 
 pub use crate::rdsys::types as types;
 
+pub mod admin;
 pub mod client;
 pub mod config;
 pub mod consumer;
diff --git a/src/util.rs b/src/util.rs
index feb0d7c94..a3eaffaa2 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -141,6 +141,27 @@ impl Default for ErrBuf {
     }
 }
 
+pub(crate) trait WrappedCPointer {
+    type Target;
+
+    fn ptr(&self) -> *mut Self::Target;
+
+    fn is_null(&self) -> bool {
+        self.ptr().is_null()
+    }
+}
+
+/// Converts a container into a C array.
+pub(crate) trait AsCArray<T: WrappedCPointer> {
+    fn as_c_array(&self) -> *mut *mut T::Target;
+}
+
+impl<T: WrappedCPointer> AsCArray<T> for Vec<T> {
+    fn as_c_array(&self) -> *mut *mut T::Target {
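+        // This cast is sound only if `T` is exactly one pointer wide (a
+        // single raw-pointer field), so the Vec's buffer can be reinterpreted
+        // as a contiguous array of pointers.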
+        self.as_ptr() as *mut *mut T::Target
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
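
To make the trait pairing concrete, here is a hypothetical wrapper flattened
by as_c_array; NativeNewTopic and c_api_taking_array are illustrative names,
not actual crate items:

extern "C" {
    // Invented for illustration; not a real librdkafka symbol.
    fn c_api_taking_array(arr: *mut *mut RDKafkaNewTopic, n: usize);
}

struct NativeNewTopic {
    ptr: *mut RDKafkaNewTopic, // sole field: one pointer wide, as required
}

impl WrappedCPointer for NativeNewTopic {
    type Target = RDKafkaNewTopic;

    fn ptr(&self) -> *mut RDKafkaNewTopic {
        self.ptr
    }
}

fn pass_to_c(topics: &Vec<NativeNewTopic>) {
    // The Vec's buffer is reinterpreted in place; no copy is made, and the
    // Vec must stay alive for as long as C holds the array.
    unsafe { c_api_taking_array(topics.as_c_array(), topics.len()) };
}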
diff --git a/tests/test_admin.rs b/tests/test_admin.rs
new file mode 100644
index 000000000..c3d7bc499
--- /dev/null
+++ b/tests/test_admin.rs
@@ -0,0 +1,390 @@
+//! Test administrative commands using the admin API.
+
+use backoff::{ExponentialBackoff, Operation};
+
+use futures::Future;
+
+use std::time::Duration;
+
+use rdkafka::admin::{
+    AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, NewPartitions, NewTopic, OwnedResourceSpecifier,
+    ResourceSpecifier, TopicReplication,
+};
+use rdkafka::client::DefaultClientContext;
+use rdkafka::consumer::{BaseConsumer, Consumer, DefaultConsumerContext};
+use rdkafka::error::{KafkaError, RDKafkaError};
+use rdkafka::metadata::Metadata;
+use rdkafka::ClientConfig;
+
+mod utils;
+use crate::utils::*;
+
+fn create_config() -> ClientConfig {
+    let mut config = ClientConfig::new();
+    config.set("bootstrap.servers", get_bootstrap_server().as_str());
+    config
+}
+
+fn create_admin_client() -> AdminClient<DefaultClientContext> {
+    create_config().create().expect("admin client creation failed")
+}
+
+fn fetch_metadata(topic: &str) -> Metadata {
+    let consumer: BaseConsumer<DefaultConsumerContext> = create_config().create().expect("consumer creation failed");
+    let timeout = Some(Duration::from_secs(1));
+
+    let mut backoff = ExponentialBackoff::default();
+    backoff.max_elapsed_time = Some(Duration::from_secs(5));
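+    // Metadata propagates asynchronously, so retry the fetch with exponential
+    // backoff (up to five seconds) until the topic and its partitions appear.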
+    (|| {
+        let metadata = consumer
+            .fetch_metadata(Some(topic), timeout)
+            .map_err(|e| e.to_string())?;
+        if metadata.topics().is_empty() {
+            Err("metadata fetch returned no topics".to_string())?
+        }
+        let topic = &metadata.topics()[0];
+        if topic.partitions().is_empty() {
+            Err("metadata fetch returned a topic with no partitions".to_string())?
+        }
+        Ok(metadata)
+    })
+    .retry(&mut backoff)
+    .unwrap()
+}
+
+fn verify_delete(topic: &str) {
+    let consumer: BaseConsumer<DefaultConsumerContext> = create_config().create().expect("consumer creation failed");
+    let timeout = Some(Duration::from_secs(1));
+
+    let mut backoff = ExponentialBackoff::default();
+    backoff.max_elapsed_time = Some(Duration::from_secs(5));
+    (|| {
+        // Asking about the topic specifically will recreate it (under the
+        // default Kafka configuration, at least) so we have to ask for the list
+        // of all topics and search through it.
+        let metadata = consumer.fetch_metadata(None, timeout).map_err(|e| e.to_string())?;
+        if metadata.topics().iter().any(|t| t.name() == topic) {
+            Err(format!("topic {} still exists", topic))?
+        }
+        Ok(())
+    })
+    .retry(&mut backoff)
+    .unwrap()
+}
+
+#[test]
+fn test_topics() {
+    let admin_client = create_admin_client();
+    let opts = AdminOptions::new().operation_timeout(Duration::from_secs(1));
+
+    // Verify that topics are created as specified, and that they can later
+    // be deleted.
+    {
+        let name1 = rand_test_topic();
+        let name2 = rand_test_topic();
+
+        // Test both the builder API and the literal construction.
+        let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1))
+            .set("max.message.bytes", "1234");
+        let topic2 = NewTopic {
+            name: &name2,
+            num_partitions: 3,
+            replication: TopicReplication::Variable(&[&[0], &[0], &[0]]),
+            config: Vec::new(),
+        };
+
+        let res = admin_client
+            .create_topics(&[topic1, topic2], &opts)
+            .wait()
+            .expect("topic creation failed");
+        assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]);
+
+        let metadata1 = fetch_metadata(&name1);
+        let metadata2 = fetch_metadata(&name2);
+        assert_eq!(1, metadata1.topics().len());
+        assert_eq!(1, metadata2.topics().len());
+        let metadata_topic1 = &metadata1.topics()[0];
+        let metadata_topic2 = &metadata2.topics()[0];
+        assert_eq!(&name1, metadata_topic1.name());
+        assert_eq!(&name2, metadata_topic2.name());
+        assert_eq!(1, metadata_topic1.partitions().len());
+        assert_eq!(3, metadata_topic2.partitions().len());
+
+        let res = admin_client
+            .describe_configs(
+                &[ResourceSpecifier::Topic(&name1), ResourceSpecifier::Topic(&name2)],
+                &opts,
+            )
+            .wait()
+            .expect("describe configs failed");
+        let config1 = &res[0].as_ref().expect("describe configs failed on topic 1");
+        let config2 = &res[1].as_ref().expect("describe configs failed on topic 2");
+        let mut expected_entry1 = ConfigEntry {
+            name: "max.message.bytes".into(),
+            value: Some("1234".into()),
+            source: ConfigSource::DynamicTopic,
+            is_read_only: false,
+            is_default: false,
+            is_sensitive: false,
+        };
+        let expected_entry2 = ConfigEntry {
+            name: "max.message.bytes".into(),
+            value: Some("1000012".into()),
+            source: ConfigSource::Default,
+            is_read_only: false,
+            is_default: true,
+            is_sensitive: false,
+        };
+        if get_broker_version() < KafkaVersion(1, 1, 0, 0) {
+            expected_entry1.source = ConfigSource::Unknown;
+        }
+        assert_eq!(Some(&expected_entry1), config1.get("max.message.bytes"));
+        assert_eq!(Some(&expected_entry2), config2.get("max.message.bytes"));
+        let config_entries1 = config1.entry_map();
+        let config_entries2 = config2.entry_map();
+        assert_eq!(config1.entries.len(), config_entries1.len());
+        assert_eq!(config2.entries.len(), config_entries2.len());
+        assert_eq!(Some(&&expected_entry1), config_entries1.get("max.message.bytes"));
+        assert_eq!(Some(&&expected_entry2), config_entries2.get("max.message.bytes"));
+
+        let partitions1 = NewPartitions::new(&name1, 5);
+        let res = admin_client
+            .create_partitions(&[partitions1], &opts)
+            .wait()
+            .expect("partition creation failed");
+        assert_eq!(res, &[Ok(name1.clone())]);
+
+        let mut backoff = ExponentialBackoff::default();
+        backoff.max_elapsed_time = Some(Duration::from_secs(5));
+        (|| {
+            let metadata = fetch_metadata(&name1);
+            let topic = &metadata.topics()[0];
+            let n = topic.partitions().len();
+            if n != 5 {
+                Err(format!("topic has {} partitions, but expected {}", n, 5))?;
+            }
+            Ok(())
+        })
+        .retry(&mut backoff)
+        .unwrap();
+
+        let res = admin_client
+            .delete_topics(&[&name1, &name2], &opts)
+            .wait()
+            .expect("topic deletion failed");
+        assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]);
+        verify_delete(&name1);
+        verify_delete(&name2);
+    }
+
+    // Verify that incorrect replication configurations are ignored when
+    // creating topics.
+    {
+        let topic = NewTopic::new("ignored", 1, TopicReplication::Variable(&[&[0], &[0]]));
+        let res = admin_client.create_topics(&[topic], &opts).wait();
+        assert_eq!(
+            Err(KafkaError::AdminOpCreation(
+                "replication configuration for topic 'ignored' assigns 2 partition(s), \
+                 which does not match the specified number of partitions (1)"
+                    .into()
+            )),
+            res,
+        )
+    }
+
+    // Verify that incorrect replication configurations are ignored when
+    // creating partitions.
+    {
+        let name = rand_test_topic();
+        let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1));
+
+        let res = admin_client
+            .create_topics(vec![&topic], &opts)
+            .wait()
+            .expect("topic creation failed");
+        assert_eq!(res, &[Ok(name.clone())]);
+        let _ = fetch_metadata(&name);
+
+        // This partition specification is obviously garbage, and so trips
+        // a client-side error.
+        let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0], &[0]]);
+        let res = admin_client.create_partitions(&[partitions], &opts).wait();
+        assert_eq!(
+            res,
+            Err(KafkaError::AdminOpCreation(format!(
+                "partition assignment for topic '{}' assigns 3 partition(s), \
+                 which is more than the requested total number of partitions (2)",
+                name
+            )))
+        );
+
+        // Only the server knows that this partition specification is garbage.
+        let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0]]);
+        let res = admin_client
+            .create_partitions(&[partitions], &opts)
+            .wait()
+            .expect("partition creation failed");
+        assert_eq!(res, &[Err((name, RDKafkaError::InvalidReplicaAssignment))]);
+    }
+
+    // Verify that deleting a non-existent topic fails.
+    {
+        let name = rand_test_topic();
+        let res = admin_client
+            .delete_topics(&[&name], &opts)
+            .wait()
+            .expect("delete topics failed");
+        assert_eq!(res, &[Err((name, RDKafkaError::UnknownTopicOrPartition))]);
+    }
+
+    // Verify that mixed-success operations properly report the successful and
+    // failing operations.
+    {
+        let name1 = rand_test_topic();
+        let name2 = rand_test_topic();
+
+        let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1));
+        let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1));
+
+        let res = admin_client
+            .create_topics(vec![&topic1], &opts)
+            .wait()
+            .expect("topic creation failed");
+        assert_eq!(res, &[Ok(name1.clone())]);
+        let _ = fetch_metadata(&name1);
+
+        let res = admin_client
+            .create_topics(vec![&topic1, &topic2], &opts)
+            .wait()
+            .expect("topic creation failed");
+        assert_eq!(
+            res,
+            &[
+                Err((name1.clone(), RDKafkaError::TopicAlreadyExists)),
+                Ok(name2.clone())
+            ]
+        );
+        let _ = fetch_metadata(&name2);
+
+        let res = admin_client
+            .delete_topics(&[&name1], &opts)
+            .wait()
+            .expect("topic deletion failed");
+        assert_eq!(res, &[Ok(name1.clone())]);
+        verify_delete(&name1);
+
+        let res = admin_client
+            .delete_topics(&[&name2, &name1], &opts)
+            .wait()
+            .expect("topic deletion failed");
+        assert_eq!(
+            res,
+            &[
+                Ok(name2.clone()),
+                Err((name1.clone(), RDKafkaError::UnknownTopicOrPartition))
+            ]
+        );
+    }
+}
+
+#[test]
+fn test_configs() {
+    let admin_client = create_admin_client();
+    let opts = AdminOptions::new();
+    let broker = ResourceSpecifier::Broker(0);
+
+    let res = admin_client
+        .describe_configs(&[broker], &opts)
+        .wait()
+        .expect("describe configs failed");
+    let config = &res[0].as_ref().expect("describe configs failed");
+    let orig_val = config
+        .get("log.flush.interval.messages")
+        .expect("original config entry missing")
+        .value
+        .as_ref()
+        .expect("original value missing");
+
+    let config = AlterConfig::new(broker).set("log.flush.interval.messages", "1234");
+    let res = admin_client
+        .alter_configs(&[config], &opts)
+        .wait()
+        .expect("alter configs failed");
+    assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]);
+
+    let mut backoff = ExponentialBackoff::default();
+    backoff.max_elapsed_time = Some(Duration::from_secs(5));
+    (|| {
+        let res = admin_client
+            .describe_configs(&[broker], &opts)
+            .wait()
+            .expect("describe configs failed");
+        let config = &res[0].as_ref().expect("describe configs failed");
+        let entry = config.get("log.flush.interval.messages");
+        let expected_entry = if get_broker_version() < KafkaVersion(1, 1, 0, 0) {
+            // Pre-1.1, the AlterConfig operation silently fails and the
+            // config remains unchanged, so verify that the original value
+            // is still in place.
+            ConfigEntry {
+                name: "log.flush.interval.messages".into(),
+                value: Some(orig_val.clone()),
+                source: ConfigSource::Default,
+                is_read_only: true,
+                is_default: true,
+                is_sensitive: false,
+            }
+        } else {
+            ConfigEntry {
+                name: "log.flush.interval.messages".into(),
+                value: Some("1234".into()),
+                source: ConfigSource::DynamicBroker,
+                is_read_only: false,
+                is_default: false,
+                is_sensitive: false,
+            }
+        };
+        if entry != Some(&expected_entry) {
+            Err(format!("{:?} != {:?}", entry, Some(&expected_entry)))?
+        }
+        Ok(())
+    })
+    .retry(&mut backoff)
+    .unwrap();
+
+    let config = AlterConfig::new(broker).set("log.flush.interval.messages", &orig_val);
+    let res = admin_client
+        .alter_configs(&[config], &opts)
+        .wait()
+        .expect("alter configs failed");
+    assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]);
+}
+
+// Tests whether each admin operation properly reports an error if the entire
+// request fails. The original implementations failed to check this, resulting
+// in confusing situations where a failed admin request would return Ok([]).
+#[test]
+fn test_event_errors() {
+    // Configure an admin client to target a Kafka server that doesn't exist,
+    // then set an impossible timeout. This will ensure that every request fails
+    // with an OperationTimedOut error, assuming, of course, that the request
+    // passes client-side validation.
+    let admin_client = ClientConfig::new()
+        .set("bootstrap.servers", "noexist")
+        .create::<AdminClient<DefaultClientContext>>()
+        .expect("admin client creation failed");
+    let opts = AdminOptions::new().request_timeout(Duration::from_nanos(1));
+
+    let res = admin_client.create_topics(&[], &opts).wait();
+    assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)));
+
+    let res = admin_client.create_partitions(&[], &opts).wait();
+    assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)));
+
+    let res = admin_client.delete_topics(&[], &opts).wait();
+    assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)));
+
+    let res = admin_client.describe_configs(&[], &opts).wait();
+    assert_eq!(res.err(), Some(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)));
+
+    let res = admin_client.alter_configs(&[], &opts).wait();
+    assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)));
+}
diff --git a/tests/utils.rs b/tests/utils.rs
index 3aaa7646f..fdc555b11 100644
--- a/tests/utils.rs
+++ b/tests/utils.rs
@@ -1,10 +1,12 @@
 #![allow(dead_code)]
 extern crate rdkafka;
 extern crate rand;
+extern crate regex;
 extern crate futures;
 
-use rand::Rng;
 use futures::*;
+use rand::Rng;
+use regex::Regex;
 
 use rdkafka::client::ClientContext;
 use rdkafka::config::ClientConfig;
@@ -13,7 +15,7 @@ use rdkafka::message::ToBytes;
 use rdkafka::statistics::Statistics;
 
 use std::collections::HashMap;
-use std::env;
+use std::env::{self, VarError};
 
 #[macro_export]
 macro_rules! map(
@@ -46,6 +48,28 @@ pub fn get_bootstrap_server() -> String {
     env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost:9092".to_owned())
 }
 
+pub fn get_broker_version() -> KafkaVersion {
+    // librdkafka doesn't expose this directly, sadly.
+    match env::var("KAFKA_VERSION") {
+        Ok(v) => {
+            let regex = Regex::new(r"^(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.(\d+))?$").unwrap();
+            match regex.captures(&v) {
+                Some(captures) => {
+                    let extract = |i| captures.get(i).map(|m| m.as_str().parse().unwrap()).unwrap_or(0);
+                    KafkaVersion(extract(1), extract(2), extract(3), extract(4))
+                },
+                None => panic!("KAFKA_VERSION env var was not in expected [n[.n[.n[.n]]]] format")
+            }
+        },
+        Err(VarError::NotUnicode(_)) => panic!("KAFKA_VERSION env var contained non-unicode characters"),
+        // If the environment variable is unset, assume we're running the latest version.
+        Err(VarError::NotPresent) => KafkaVersion(std::u32::MAX, std::u32::MAX, std::u32::MAX, std::u32::MAX),
+    }
+}
+
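+// The derived Ord compares the fields lexicographically, which is what
+// comparisons like `get_broker_version() < KafkaVersion(1, 1, 0, 0)` in the
+// tests rely on.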
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
+pub struct KafkaVersion(pub u32, pub u32, pub u32, pub u32);
+
 pub struct TestContext {
     _some_data: i64, // Add some data so that valgrind can check proper allocation
 }

From fc8b4f01fc54d30f0549c3a681f00e11f38deac3 Mon Sep 17 00:00:00 2001
From: Nikhil Benesch <nikhil.benesch@gmail.com>
Date: Tue, 6 Aug 2019 14:19:15 -0400
Subject: [PATCH 3/3] Allow -1 as orig_broker_id

After diving into the librdkafka source, this is a valid return value
for metadata.orig_broker_id().
---
 tests/test_metadata.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs
index 4c4c815ff..30438e915 100644
--- a/tests/test_metadata.rs
+++ b/tests/test_metadata.rs
@@ -40,7 +40,12 @@ fn test_metadata() {
     let consumer = create_consumer(&rand_test_group());
 
     let metadata = consumer.fetch_metadata(None, Duration::from_secs(5)).unwrap();
-    assert_eq!(metadata.orig_broker_id(), 0);
+    let orig_broker_id = metadata.orig_broker_id();
+    // The orig_broker_id may be -1 if librdkafka's bootstrap "broker" handles
+    // the request.
+    if orig_broker_id != -1 && orig_broker_id != 0 {
+        panic!("metadata.orig_broker_id = {}, not 0 or 1 as expected", orig_broker_id)
+    }
     assert!(!metadata.orig_broker_name().is_empty());
 
     let broker_metadata = metadata.brokers();