Skip to content

Fixed retry and duplicated replacements #3015

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ object Context {
userAgent <- Resource.eval(F.fromEither(`User-Agent`.parse(1)(userAgentString)))
middleware = ClientConfiguration
.setUserAgent[F](userAgent)
.andThen(ClientConfiguration.retryAfter[F](maxAttempts = 5))
.andThen(ClientConfiguration.withRetry[F](maxAttempts = 5))
defaultClient <- ClientConfiguration.build(
ClientConfiguration.BuilderMiddleware.default,
middleware
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import eu.timepit.refined.auto._
import eu.timepit.refined.types.numeric.PosInt
import java.net.http.HttpClient
import java.net.http.HttpClient.Builder
import org.http4s.Response
import org.http4s.{Headers, Response}
import org.http4s.client._
import org.http4s.headers.`User-Agent`
import org.http4s.jdkhttpclient.JdkHttpClient
import org.typelevel.ci._
import java.time.Instant
import scala.concurrent.duration._

object ClientConfiguration {
Expand Down Expand Up @@ -65,20 +66,38 @@ object ClientConfiguration {

private val RetryAfterStatuses = Set(403, 429, 503)

implicit class FunctionExtender[T, U](f: T => Option[U]) {
def orElse(g: T => Option[U])(t: T): Option[U] = f(t).orElse(g(t))
}

private val retryAfter: Headers => Option[Int] =
_.get(ci"Retry-After").flatMap(_.head.value.toIntOption)

private val retryWaitMax60Seconds: Long => Int = {
case i if i > 0 && i < 60 => i.toInt
case i if i > 60 => 60
case _ => 1
}

private val rateLimitReset: Headers => Option[Int] = _.get(ci"X-Ratelimit-Reset")
.flatMap(_.head.value.toLongOption)
.map(_ - Instant.now.getEpochSecond)
.map(retryWaitMax60Seconds)

private val retryInterval: Headers => Option[Int] = retryAfter.orElse(rateLimitReset)

/** @param maxAttempts
* max number times the HTTP request should be sent useful to avoid unexpected cloud provider
* costs
*/
def retryAfter[F[_]: Temporal](maxAttempts: PosInt = 5): Middleware[F] = { client =>
def withRetry[F[_]: Temporal](maxAttempts: PosInt = 5): Middleware[F] = { client =>
Client[F] { req =>
def run(attempt: Int = 1): Resource[F, Response[F]] = client
.run(req.putHeaders("X-Attempt" -> attempt.toString))
.flatMap { response =>
val maybeRetried = for {
header <- response.headers.get(ci"Retry-After")
seconds <- header.head.value.toIntOption
if seconds > 0
duration = seconds.seconds
interval <- retryInterval(response.headers)
duration = interval.seconds
if RetryAfterStatuses.contains(response.status.code)
if attempt < maxAttempts.value
} yield Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,30 @@ import cats.implicits._
import eu.timepit.refined.auto._
import eu.timepit.refined.types.numeric.PosInt
import munit.CatsEffectSuite
import org.http4s.HttpRoutes
import org.http4s._
import org.http4s.client._
import org.http4s.client.dsl.io._
import org.http4s.dsl.io._
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.headers.{`Retry-After`, `User-Agent`, Location}
import org.http4s.implicits._
import org.typelevel.ci._
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory

import java.time.Instant
import scala.concurrent.duration._
import scala.util.Try

class ClientConfigurationTest extends CatsEffectSuite {

private val previousEpochSecond = Instant.now().minusSeconds(1).getEpochSecond
private val nextEpochSecond = Instant.now().plusSeconds(1).getEpochSecond
private val userAgentValue = "my-user-agent"
private val dummyUserAgent =
`User-Agent`.parse(1)(userAgentValue).getOrElse(fail("unable to create user agent"))

private val routes: HttpRoutes[IO] = {
import org.http4s.dsl.io._
private val routes: HttpRoutes[IO] =
HttpRoutes.of[IO] {
case req @ GET -> Root / "user-agent" =>
req.headers.get(ci"user-agent") match {
Expand All @@ -48,15 +54,22 @@ class ClientConfigurationTest extends CatsEffectSuite {
case Some(attempt) if attempt >= 2 =>
Ok()
case _ =>
Forbidden().map(_.putHeaders(`Retry-After`.fromLong(1)))
val resetHeader =
ParseResult.success(Header.Raw(ci"X-Ratelimit-Reset", s"$nextEpochSecond"))
Forbidden().map(_.putHeaders(`Retry-After`.fromLong(1), resetHeader))
}
case req @ GET -> Root / "rate-limit-reset" / epochSecondsParam =>
req.headers.get(ci"X-Attempt").flatMap(_.head.value.toIntOption) match {
case Some(attempt) if attempt >= 2 => Ok()
case _ =>
val seconds = Try(epochSecondsParam.toLong).getOrElse(0L)
val resetHeader =
ParseResult.success(Header.Raw(ci"X-Ratelimit-Reset", seconds.toString))
Forbidden().map(_.putHeaders(resetHeader))
}
}
}

test("setUserAgent add a specific user agent to requests") {
import org.http4s.Method._
import org.http4s.client.dsl.io._

val initialClient = Client.fromHttpApp[IO](routes.orNotFound)
val setUserAgent = ClientConfiguration.setUserAgent[IO](dummyUserAgent)
val newClient = setUserAgent(initialClient)
Expand Down Expand Up @@ -106,26 +119,32 @@ class ClientConfigurationTest extends CatsEffectSuite {
test.assertEquals((400, 302))
}

test("retries on retry-after response header") {
import org.http4s.Method._
import org.http4s.client.dsl.io._
test("retries on retry-after response header even though 'X-Ratelimit-Reset' exists") {
val notEnoughRetries = request(uri"/retry-after", 1).assertEquals(403)
val exactlyEnoughRetries = request(uri"/retry-after", 2).assertEquals(200)
notEnoughRetries.flatMap(_ => exactlyEnoughRetries)
}

def clientWithMaxAttempts(maxAttempts: PosInt): Client[IO] = {
val initialClient = Client.fromHttpApp[IO](routes.orNotFound)
val retryAfter = ClientConfiguration.retryAfter[IO](maxAttempts)
retryAfter(initialClient)
}
test("retries with the value mentioned in 'X-Ratelimit-Reset' in the absense of 'Retry-After'") {
val uri = Uri.unsafeFromString(s"/rate-limit-reset/$nextEpochSecond")
val notEnoughRetries = request(uri, 1).assertEquals(403)
val exactlyEnoughRetries = request(uri, 2).assertEquals(200)
notEnoughRetries.flatMap(_ => exactlyEnoughRetries)
}

val notEnoughRetries = clientWithMaxAttempts(1)
.run(GET(uri"/retry-after"))
.use(r => r.status.code.pure[IO])
.assertEquals(403)
test("retries after 1 second when the given value in 'X-Ratelimit-Reset' elapsed") {
val uri: Uri = Uri.unsafeFromString(s"/rate-limit-reset/$previousEpochSecond")
val notEnoughRetries = request(uri, 1).assertEquals(403)
val exactlyEnoughRetries = request(uri, 2).assertEquals(200)
notEnoughRetries.flatMap(_ => exactlyEnoughRetries)
}

val exactlyEnoughRetries = clientWithMaxAttempts(2)
.run(GET(uri"/retry-after"))
.use(r => r.status.code.pure[IO])
.assertEquals(200)
private def request(uri: Uri, attempts: PosInt): IO[Int] =
clientWithMaxAttempts(attempts).run(GET(uri)).use(r => r.status.code.pure[IO])

notEnoughRetries.flatMap(_ => exactlyEnoughRetries)
private def clientWithMaxAttempts(maxAttempts: PosInt): Client[IO] = {
val initialClient = Client.fromHttpApp[IO](routes.orNotFound)
val withRetry = ClientConfiguration.withRetry[IO](maxAttempts)
withRetry(initialClient)
}
}
Loading