diff --git a/asyncpg/pool.py b/asyncpg/pool.py
index 5c7ea9ca..a38859e0 100644
--- a/asyncpg/pool.py
+++ b/asyncpg/pool.py
@@ -7,6 +7,7 @@
 from __future__ import annotations
 
 import asyncio
+import typing
 from collections.abc import Awaitable, Callable
 import functools
 import inspect
@@ -341,7 +342,7 @@ class Pool:
         '_queue', '_loop', '_minsize', '_maxsize',
         '_init', '_connect', '_reset', '_connect_args', '_connect_kwargs',
         '_holders', '_initialized', '_initializing', '_closing',
-        '_closed', '_connection_class', '_record_class', '_generation',
+        '_closed', '_connection_class', '_connection_holder_class', '_record_class', '_generation',
         '_setup', '_max_queries', '_max_inactive_connection_lifetime'
     )
 
@@ -356,6 +357,7 @@ def __init__(self, *connect_args,
                  reset=None,
                  loop,
                  connection_class,
+                 connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder,
                  record_class,
                  **connect_kwargs):
 
@@ -408,6 +410,7 @@ def __init__(self, *connect_args,
         self._queue = None
 
         self._connection_class = connection_class
+        self._connection_holder_class = connection_holder_class
         self._record_class = record_class
 
         self._closing = False
@@ -443,37 +446,48 @@ async def _async__init__(self):
             self._initialized = True
 
     async def _initialize(self):
+        self._initialize_connections_queue()
+        if self._minsize:
+            await self._initialize_connections()
+
+    def _initialize_connections_queue(self) -> None:
         self._queue = asyncio.LifoQueue(maxsize=self._maxsize)
         for _ in range(self._maxsize):
-            ch = PoolConnectionHolder(
-                self,
+            ch = self._connection_holder_class(
+                pool=self,
+                setup=self._setup,
                 max_queries=self._max_queries,
                 max_inactive_time=self._max_inactive_connection_lifetime,
-                setup=self._setup)
-
+            )
             self._holders.append(ch)
             self._queue.put_nowait(ch)
 
-        if self._minsize:
-            # Since we use a LIFO queue, the first items in the queue will be
-            # the last ones in `self._holders`.  We want to pre-connect the
-            # first few connections in the queue, therefore we want to walk
-            # `self._holders` in reverse.
-
-            # Connect the first connection holder in the queue so that
-            # any connection issues are visible early.
-            first_ch = self._holders[-1]  # type: PoolConnectionHolder
-            await first_ch.connect()
-
-            if self._minsize > 1:
-                connect_tasks = []
-                for i, ch in enumerate(reversed(self._holders[:-1])):
-                    # `minsize - 1` because we already have first_ch
-                    if i >= self._minsize - 1:
-                        break
-                    connect_tasks.append(ch.connect())
-
-                await asyncio.gather(*connect_tasks)
+
+    async def _initialize_connections(self) -> None:
+
+        if not self._minsize:
+            raise exceptions.InterfaceError(
+                'pool is already initialized with min_size > 0')
+
+        # Since we use a LIFO queue, the first items in the queue will be
+        # the last ones in `self._holders`.  We want to pre-connect the
+        # first few connections in the queue, therefore we want to walk
+        # `self._holders` in reverse.
+
+        # Connect the first connection holder in the queue so that
+        # any connection issues are visible early.
+        first_ch = self._holders[-1]  # type: PoolConnectionHolder
+        await first_ch.connect()
+
+        if self._minsize > 1:
+            connect_tasks = []
+            for i, ch in enumerate(reversed(self._holders[:-1])):
+                # `minsize - 1` because we already have first_ch
+                if i >= self._minsize - 1:
+                    break
+                connect_tasks.append(ch.connect())
+
+            await asyncio.gather(*connect_tasks)
 
     def is_closing(self):
         """Return ``True`` if the pool is closing or is closed.
@@ -1083,6 +1097,7 @@ def create_pool(dsn=None, *,
                 reset=None,
                 loop=None,
                 connection_class=connection.Connection,
+                connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder,
                 record_class=protocol.Record,
                 **connect_kwargs):
     r"""Create a connection pool.
@@ -1142,6 +1157,11 @@ def create_pool(dsn=None, *,
         The class to use for connections.  Must be a subclass of
         :class:`~asyncpg.connection.Connection`.
 
+    :param PoolConnectionHolder connection_holder_class:
+        The class to use for connection holders. This class is used
+        to manage the connection lifecycle in the pool. Must be a subclass of
+        :class:`~asyncpg.pool.PoolConnectionHolder`
+
     :param type record_class:
         If specified, the class to use for records returned by queries on
         the connections in this pool.  Must be a subclass of
@@ -1230,10 +1250,14 @@ def create_pool(dsn=None, *,
 
     .. versionchanged:: 0.30.0
        Added the *connect* and *reset* parameters.
+
+    .. versionchanged:: 0.31.0
+       Added the *pool_connection_holder_class* parameter.
     """
     return Pool(
         dsn,
         connection_class=connection_class,
+        connection_holder_class=connection_holder_class,
         record_class=record_class,
         min_size=min_size,
         max_size=max_size,