Concurrency-LimitsとOkHttpのIntegrationを書いてみた

Netflix/concurrency-limits とOkHttp3のIntegrationを書いた。 このライブラリの強みはNetflixの人が書いた、mediumの記事リポジトリのREADMEを読んでほしい。

GrpcのClientのInterceptor周りを参考に書いた。

以下コード。

public class OkHttpClientLimiterBuilder extends
  AbstractPartitionedLimiter.Builder<OkHttpClientLimiterBuilder, OkhttpClientRequestContext> {

  private boolean blockOnLimit = false;

  public OkHttpClientLimiterBuilder partitionByHeaderName(String headerName) {
    return partitionResolver(context -> context.request().header(headerName));
  }

  public OkHttpClientLimiterBuilder partitionByHost() {
    return partitionResolver(context -> context.request().url().host());
  }

  public <T> OkHttpClientLimiterBuilder blockOnLimit(boolean blockOnLimit) {
    this.blockOnLimit = blockOnLimit;
    return this;
  }

  @Override
  protected OkHttpClientLimiterBuilder self() {
    return this;
  }

  public Limiter<OkhttpClientRequestContext> build() {
    Limiter<OkhttpClientRequestContext> limiter = super.build();

    if (blockOnLimit) {
      limiter = BlockingLimiter.wrap(limiter);
    }
    return limiter;
  }
}
public class OkHttpClientLimitInterceptor implements Interceptor {
  private final Limiter<OkhttpClientRequestContext> contextLimiter;

  public OkHttpClientLimitInterceptor(
    Limiter<OkhttpClientRequestContext> contextLimiter) {
    this.contextLimiter = contextLimiter;
  }

  @Override
  public Response intercept(Chain chain) throws IOException {
    OkhttpClientRequestContext context = new OkhttpClientRequestContext(chain.request());
    Optional<Limiter.Listener> listerOpt = contextLimiter.acquire(context);
    if (listerOpt.isPresent()) {
      Limiter.Listener listener = listerOpt.get();
      try {
        Response response = chain.proceed(chain.request());
        if (response.isSuccessful()) {
          listener.onSuccess();
        } else if (response.code() == 503) {
          listener.onDropped();
        } else {
          listener.onIgnore();
        }

        return response;
      } catch (IOException e) {
        listener.onIgnore();
        throw e;
      }
    } else {
      return new Response.Builder()
        .code(503)
        .protocol(Protocol.HTTP_1_1) // dummy
        .request(chain.request())
        .message("Client concurrency limit reached")
        .body(ResponseBody.create(null, new byte[0]))
        .build();
    }
  }
}
public class OkhttpClientRequestContext { // このクラス、参考元のコードはinterfaceだったが、classにして楽をした。

  private final Request request;

  public OkhttpClientRequestContext(Request request) {
    this.request = request;
  }

  Request request() {
    return request;
  }
}

余談

NetflixはConcurrency-Limitsとは別にResillience4jを入れようとしているらしいが このConcurrency-Limitsだけで良いのでは?と思ったんだけど Circuit Breakerとは性質が少し異なるのかな?ちょっとその辺が分からない。 即応性はCircuit Breakerのほうが高い気もするが。 流量制御はConcurrency Limitsで良いのかもしれない。 この辺詳しい人居たら教えてほしい。