Skip to content

Replace parts of futures-util with std APIs #1233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions postgres/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{Error, Notification};
use futures_util::{future, pin_mut, Stream};
use futures_util::Stream;
use std::collections::VecDeque;
use std::future::Future;
use std::future::{self, Future};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -52,7 +52,7 @@ impl Connection {
where
F: Future<Output = Result<T, Error>>,
{
pin_mut!(future);
let mut future = pin!(future);
self.poll_block_on(|cx, _, _| future.as_mut().poll(cx))
}

4 changes: 2 additions & 2 deletions postgres/src/notifications.rs
Original file line number Diff line number Diff line change
@@ -3,9 +3,9 @@
use crate::connection::ConnectionRef;
use crate::{Error, Notification};
use fallible_iterator::FallibleIterator;
use futures_util::{ready, FutureExt};
use futures_util::FutureExt;
use std::pin::Pin;
use std::task::Poll;
use std::task::{ready, Poll};
use std::time::Duration;
use tokio::time::{self, Instant, Sleep};

4 changes: 2 additions & 2 deletions tokio-postgres/src/binary_copy.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use crate::types::{FromSql, IsNull, ToSql, Type, WrongType};
use crate::{slice_iter, CopyInSink, CopyOutStream, Error};
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_util::{ready, SinkExt, Stream};
use futures_util::{SinkExt, Stream};
use pin_project_lite::pin_project;
use postgres_types::BorrowToSql;
use std::convert::TryFrom;
@@ -13,7 +13,7 @@ use std::io::Cursor;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

const MAGIC: &[u8] = b"PGCOPY\n\xff\r\n\0";
const HEADER_LEN: usize = MAGIC.len() + 4 + 4;
25 changes: 13 additions & 12 deletions tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
@@ -19,18 +19,20 @@ use crate::{
use bytes::{Buf, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
use futures_util::{StreamExt, TryStreamExt};
use parking_lot::Mutex;
use postgres_protocol::message::backend::Message;
use postgres_types::BorrowToSql;
use std::collections::HashMap;
use std::fmt;
use std::future;
#[cfg(feature = "runtime")]
use std::net::IpAddr;
#[cfg(feature = "runtime")]
use std::path::PathBuf;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
#[cfg(feature = "runtime")]
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -300,8 +302,7 @@ impl Client {
where
T: ?Sized + ToStatement,
{
let stream = self.query_raw(statement, slice_iter(params)).await?;
pin_mut!(stream);
let mut stream = pin!(self.query_raw(statement, slice_iter(params)).await?);

let mut first = None;

@@ -336,18 +337,18 @@ impl Client {
///
/// ```no_run
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
/// use futures_util::{pin_mut, TryStreamExt};
/// use std::pin::pin;
/// use futures_util::TryStreamExt;
///
/// let params: Vec<String> = vec![
/// "first param".into(),
/// "second param".into(),
/// ];
/// let mut it = client.query_raw(
/// let mut it = pin!(client.query_raw(
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
/// params,
/// ).await?;
/// ).await?);
///
/// pin_mut!(it);
/// while let Some(row) = it.try_next().await? {
/// let foo: i32 = row.get("foo");
/// println!("foo: {}", foo);
@@ -402,19 +403,19 @@ impl Client {
///
/// ```no_run
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
/// use futures_util::{pin_mut, TryStreamExt};
/// use std::pin::pin;
/// use futures_util::{TryStreamExt};
/// use tokio_postgres::types::Type;
///
/// let params: Vec<(String, Type)> = vec![
/// ("first param".into(), Type::TEXT),
/// ("second param".into(), Type::TEXT),
/// ];
/// let mut it = client.query_typed_raw(
/// let mut it = pin!(client.query_typed_raw(
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
/// params,
/// ).await?;
/// ).await?);
///
/// pin_mut!(it);
/// while let Some(row) = it.try_next().await? {
/// let foo: i32 = row.get("foo");
/// println!("foo: {}", foo);
24 changes: 13 additions & 11 deletions tokio-postgres/src/connect.rs
Original file line number Diff line number Diff line change
@@ -4,8 +4,10 @@ use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
use crate::tls::MakeTlsConnect;
use crate::{Client, Config, Connection, Error, SimpleQueryMessage, Socket};
use futures_util::{future, pin_mut, Future, FutureExt, Stream};
use futures_util::{FutureExt, Stream};
use rand::seq::SliceRandom;
use std::future::{self, Future};
use std::pin::pin;
use std::task::Poll;
use std::{cmp, io};
use tokio::net;
@@ -161,18 +163,18 @@ where
let (mut client, mut connection) = connect_raw(socket, tls, has_hostname, config).await?;

if config.target_session_attrs != TargetSessionAttrs::Any {
let rows = client.simple_query_raw("SHOW transaction_read_only");
pin_mut!(rows);
let mut rows = pin!(client.simple_query_raw("SHOW transaction_read_only"));

let rows = future::poll_fn(|cx| {
if connection.poll_unpin(cx)?.is_ready() {
return Poll::Ready(Err(Error::closed()));
}
let mut rows = pin!(
future::poll_fn(|cx| {
if connection.poll_unpin(cx)?.is_ready() {
return Poll::Ready(Err(Error::closed()));
}

rows.as_mut().poll(cx)
})
.await?;
pin_mut!(rows);
rows.as_mut().poll(cx)
})
.await?
);

loop {
let next = future::poll_fn(|cx| {
4 changes: 2 additions & 2 deletions tokio-postgres/src/connect_raw.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ use crate::{Client, Connection, Error};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{ready, Sink, SinkExt, Stream, TryStreamExt};
use futures_util::{Sink, SinkExt, Stream, TryStreamExt};
use postgres_protocol::authentication;
use postgres_protocol::authentication::sasl;
use postgres_protocol::authentication::sasl::ScramSha256;
@@ -17,7 +17,7 @@ use std::borrow::Cow;
use std::collections::{HashMap, VecDeque};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

4 changes: 2 additions & 2 deletions tokio-postgres/src/connection.rs
Original file line number Diff line number Diff line change
@@ -6,14 +6,14 @@ use crate::{AsyncMessage, Error, Notification};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{ready, stream::FusedStream, Sink, Stream, StreamExt};
use futures_util::{stream::FusedStream, Sink, Stream, StreamExt};
use log::{info, trace};
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

5 changes: 3 additions & 2 deletions tokio-postgres/src/copy_in.rs
Original file line number Diff line number Diff line change
@@ -5,15 +5,16 @@ use crate::query::extract_row_affected;
use crate::{query, slice_iter, Error, Statement};
use bytes::{Buf, BufMut, BytesMut};
use futures_channel::mpsc;
use futures_util::{future, ready, Sink, SinkExt, Stream, StreamExt};
use futures_util::{Sink, SinkExt, Stream, StreamExt};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use postgres_protocol::message::frontend::CopyData;
use std::future;
use std::marker::{PhantomData, PhantomPinned};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

enum CopyInMessage {
Message(FrontendMessage),
4 changes: 2 additions & 2 deletions tokio-postgres/src/copy_out.rs
Original file line number Diff line number Diff line change
@@ -3,13 +3,13 @@ use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{query, slice_iter, Error, Statement};
use bytes::Bytes;
use futures_util::{ready, Stream};
use futures_util::Stream;
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
debug!("executing copy out statement {}", statement.name());
7 changes: 3 additions & 4 deletions tokio-postgres/src/prepare.rs
Original file line number Diff line number Diff line change
@@ -7,12 +7,12 @@ use crate::{query, slice_iter};
use crate::{Column, Error, Statement};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{pin_mut, TryStreamExt};
use futures_util::TryStreamExt;
use log::debug;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::future::Future;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

@@ -142,8 +142,7 @@ pub(crate) async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type

let stmt = typeinfo_statement(client).await?;

let rows = query::query(client, stmt, slice_iter(&[&oid])).await?;
pin_mut!(rows);
let mut rows = pin!(query::query(client, stmt, slice_iter(&[&oid])).await?);

let row = match rows.try_next().await? {
Some(row) => row,
4 changes: 2 additions & 2 deletions tokio-postgres/src/query.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ use crate::types::{BorrowToSql, IsNull};
use crate::{Column, Error, Portal, Row, Statement};
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Stream};
use futures_util::Stream;
use log::{debug, log_enabled, Level};
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::{CommandCompleteBody, Message};
@@ -16,7 +16,7 @@ use std::fmt;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

struct BorrowToSqlParamsDebug<'a, T>(&'a [T]);

4 changes: 2 additions & 2 deletions tokio-postgres/src/simple_query.rs
Original file line number Diff line number Diff line change
@@ -5,15 +5,15 @@ use crate::query::extract_row_affected;
use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Stream};
use futures_util::Stream;
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};

/// Information about a column of a single query row.
#[derive(Debug)]
12 changes: 5 additions & 7 deletions tokio-postgres/tests/test/binary_copy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::connect;
use futures_util::{pin_mut, TryStreamExt};
use futures_util::TryStreamExt;
use std::pin::pin;
use tokio_postgres::binary_copy::{BinaryCopyInWriter, BinaryCopyOutStream};
use tokio_postgres::types::Type;

@@ -16,8 +17,7 @@ async fn write_basic() {
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
pin_mut!(writer);
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]));
writer.as_mut().write(&[&1i32, &"foobar"]).await.unwrap();
writer
.as_mut()
@@ -50,8 +50,7 @@ async fn write_many_rows() {
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
pin_mut!(writer);
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]));

for i in 0..10_000i32 {
writer
@@ -86,8 +85,7 @@ async fn write_big_rows() {
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]);
pin_mut!(writer);
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]));

for i in 0..2i32 {
writer
16 changes: 6 additions & 10 deletions tokio-postgres/tests/test/main.rs
Original file line number Diff line number Diff line change
@@ -2,12 +2,11 @@

use bytes::{Bytes, BytesMut};
use futures_channel::mpsc;
use futures_util::{
future, join, pin_mut, stream, try_join, Future, FutureExt, SinkExt, StreamExt, TryStreamExt,
};
use futures_util::{join, stream, try_join, FutureExt, SinkExt, StreamExt, TryStreamExt};
use pin_project_lite::pin_project;
use std::fmt::Write;
use std::pin::Pin;
use std::future::{self, Future};
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::net::TcpStream;
@@ -589,8 +588,7 @@ async fn copy_in() {
.into_iter()
.map(Ok::<_, Error>),
);
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
sink.send_all(&mut stream).await.unwrap();
let rows = sink.finish().await.unwrap();
assert_eq!(rows, 2);
@@ -636,8 +634,7 @@ async fn copy_in_large() {
.map(Ok::<_, Error>),
);

let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
sink.send_all(&mut stream).await.unwrap();
let rows = sink.finish().await.unwrap();
assert_eq!(rows, 10_000);
@@ -658,8 +655,7 @@ async fn copy_in_error() {
.unwrap();

{
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
sink.send(Bytes::from_static(b"1\tsteven")).await.unwrap();
}