ThreadPoolExecutorを拡張してスレッドプールを使いつつもMDCを引き回すようなExecutorを作ってみる

今、Java並行処理プログラミング読んでいて ThreadPoolExecutorというかAbstractExecutorServiceのnewTaskForというメソッドを知ったので 使ってみます。

今回の記事では、ThreadPoolExecutorを拡張して MDC、Mapped Diagnostic Contextを引き渡しながら、タスクを実行するようなコードを書いてみます。

はじめに

ThreadPoolExecutorはExecutorやExecutorServiceの実装となっています。 Java並列処理プログラミングでは、ThreadPoolExecutorは8-3で詳細な紹介がされていますので、そちらを読むと分かりやすいです。 また、8-4でThreadPoolExecutorをbeforeExecute, afterExecuteなどを使って拡張する話が書かれています。

AbstractExecutorServiceのnewTaskForについて

Java並列処理プログラミングでは、ThreadPoolExecutorを紹介する中で、newTaskForの話が出てきます。 6-3-2では、APIの紹介のみに留まり、7-1-7では、タスクの標準的でないキャンセルをカプセル化する、という話の中で newTaskForの利用について書かれていました。

MDCを引き渡しながらタスクを実行するようなコードを書いてみる

その前に

今回の記事で使われる、動作を確認するためのコードです。 今回はログに出力される値で動作を見ます。 本当はテストコードとしてアサーションなどを追加したほうが良いと思います。

   @Test
    public void test1() throws InterruptedException, BrokenBarrierException {
        final MDCPropagationThreadPoolExecutor executor = new MDCPropagationThreadPoolExecutor(
            4, 4, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
        final CyclicBarrier barrier = new CyclicBarrier(2);
        MDC.put("nest", "1");
        executor.submit(() -> {
            // print nest: 1
            log.info("nest1: {}", MDC.get("nest"));
            MDC.put("nest", "2");
            executor.execute(() -> log.info("nest2: {}", MDC.get("nest"))); // print nest: 2
            
            MDC.put("nest", "3");
            executor.submit(() -> log.info("nest3: {}", MDC.get("nest"))); // print nest: 3
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                // ignore
            }
        });
        
        barrier.await();
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }

1. newTaskForだけオーバーライドしてみる

まずは、newTaskForを純粋にオーバーライドして、受け取ったcallable, runnableをwrapするコードにしてみましょう。 (テストコードにstaticにネストしたクラスに実装します。)

   static class MDCPropagationThreadPoolExecutor extends ThreadPoolExecutor {
        
        // 記事のために最初の方に持ってきた
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return super.newTaskFor(wrap(callable));
        }
        
        // 記事のために最初の方に持ってきた
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return super.newTaskFor(wrap(runnable), value);
        }

        // 以後省略
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        
        // 以後省略
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
        
        // 以後省略
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
        
        // 以後省略
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
        
        // 以後省略
        static <T> Callable<T> wrap(Callable<T> callable) {
            log.info("wrap"); // TODO remove. 動作確認のため
            final Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return () -> {
                final Map<String, String> beforeContext = MDC.getCopyOfContextMap();
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                } else {
                    MDC.clear();
                }
                try {
                    return callable.call();
                } finally {
                    if (beforeContext == null && contextMap != null) {
                        MDC.clear();
                    } else if (beforeContext != null) {
                        MDC.setContextMap(beforeContext);
                    }
                }
            };
        }
        
        // 以後省略
        static Runnable wrap(Runnable runnable) {
            log.info("wrap"); // TODO remove. 動作確認のため
            final Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return () -> {
                final Map<String, String> beforeContext = MDC.getCopyOfContextMap();
                if (contextMap == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(contextMap);
                }
                try {
                    runnable.run();
                } finally {
                    if (beforeContext == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(beforeContext);
                    }
                }
            };
        }
    }

動かしてみたら、こんな感じになりました。

2019/05/12 18:12:42.238 [main] INFO  ThreadPoolExecutorTest:119 - wrap
2019/05/12 18:12:42.241 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:31 - nest1: 1
2019/05/12 18:12:42.245 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:119 - wrap
2019/05/12 18:12:42.245 [pool-1-thread-2] INFO  ThreadPoolExecutorTest:33 - nest2: null
2019/05/12 18:12:42.245 [pool-1-thread-3] INFO  ThreadPoolExecutorTest:36 - nest3: 3

nest2ではMDCに2が入ってほしかった気持ちになりました。 これは、なぜでしょうか?

6-3-2の紹介では、こう書かれています。 「Java 6では、依頼されたCallableやRunnableのためにFutureのインスタンスを作る時、ExecutorServiceの実装クラスがAbstractExecutorServiceのnewTaskForをオーバーライドしてその作り方を制御できます。 デフォルトの実装は新しい、FutureTaskを作るだけです。」

と書かれています。 ここをしっかり読んでいればわかった落とし穴がここにあります。 executeを呼んだ場合は、newTaskForは呼ばれないようですね。

2. executeもオーバーライドしてみる

ここで自分は6-3-2の記述をしっかり読んでいなかったので、executeをオーバーライドして、以下のような形にしました。

       @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return super.newTaskFor(wrap(callable));
        }
        
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return super.newTaskFor(wrap(runnable), value);
        }

        @Override
        public void execute(Runnable command) {
            super.execute(wrap(command));
        }

もう1回テストコードを実行してみると、こうなりました。 wrapが2回呼ばれています・・・。これもまた、意図した通りではありません。

2019/05/12 18:59:54.304 [main] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.307 [main] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.307 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:31 - nest1: 1
2019/05/12 18:59:54.310 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.310 [pool-1-thread-2] INFO  ThreadPoolExecutorTest:33 - nest2: 2
2019/05/12 18:59:54.311 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.311 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.311 [pool-1-thread-3] INFO  ThreadPoolExecutorTest:36 - nest3: 3

これは、なぜかというと、newTaskForで生成された、RunnableFutureはexecuteメソッドを使って実行されるからですね。 そうすると、newTaskForで1回wrapしたRunnableに対して、wrapする形になります。

必要のないwrapはやりたくないので、見直します。

3. newTaskForで自前のクラスを使いつつ、executeで分岐してみる

今度はRunnableFutureの実装を用意しつつ、executeをinstanceOfで分岐してみます。

       @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new CallableWrapper<>(callable);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new CallableWrapper<>(runnable, value);
        }

        @Override
        public void execute(Runnable command) {
            if (command instanceof CallableWrapper) {
                super.execute(command);
            } else {
                super.execute(wrap(command));
            }
        }
        
        // 本来はFutureTaskに移譲するRunnableFutureの実装が良さそうなんですが
        // ここでは簡略化のために継承を使っています。
        static class CallableWrapper<T> extends FutureTask<T> {
            
            CallableWrapper(Callable<T> callable) {
                super(wrap(callable));
            }
            
            CallableWrapper(Runnable runnable, T value) {
                super(wrap(runnable), value);
            }
            
        }

動かしてみたら、動作は正しく動いていそうです。 ひと安心ですね。

と、思いきや、このコードはあまりお行儀がよくありません。なぜでしょうか。 色々考えながら、中の実装を読んでいて気づきました。 実はもっとシンプルに実装できます。

4. 最終的な実装: executeでwrapするだけ

newTaskForの挙動を確かめたかったのに、最終的に使わなくなってしまいました。 invokeXXX系のメソッドのオーバーライドも必要ありません。

       static class MDCPropagationThreadPoolExecutor extends ThreadPoolExecutor {
        
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
        
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
        
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
        
        @Override
        public void execute(Runnable command) {
                super.execute(wrap(command));
        }
        
        static Runnable wrap(Runnable runnable) {
            final Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return () -> {
                final Map<String, String> beforeContext = MDC.getCopyOfContextMap();
                if (contextMap == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(contextMap);
                }
                try {
                    runnable.run();
                } finally {
                    if (beforeContext == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(beforeContext);
                    }
                }
            };
        }
    }

まとめ

今回の記事では、AbstractExecutorServiceのnewTaskForを知ったので ThreadPoolExecutorを拡張して、MDCをPropagationするようなコードを書いてみようとしました。 最終的には、newTaskForを使うことなく、非常にシンプルな実装で、やりたいことが出来ました。

しかし、現在のThreadPoolExecutor実装を拡張して実装する場合はexecuteをオーバーライドを実装するだけで出来ましたが 他の実装では、その他のメソッドを実装しないといけないことには留意しないといけないです。 仕様としては、executeに移譲されることは保証されていないので。

終わり。

Java並行処理プログラミングは良い本ですね。 コアなAPIの紹介がされていて、勉強になりました。

やっと、11章まで読んだので 最後まで読んでいきましょう。

Gatlingを使った負荷試験を継続的に回すためのビルドやセットアップ周りの話

負荷試験は大事なのは皆さんご存知だと思いますが すぐにセットアップ出来る負荷試験ツールもあると良いですね。 この記事では、自分がやっている負荷試験の方法を紹介しておきます。 負荷試験にはGatlingを使っています。

この記事は以下の流れで説明していきます。

  • Gatlingを動かす環境についての概要
  • 負荷試験のシミュレーションをビルドしてパッケージしてから使っている
  • Gatlingの設定をいい感じにするために、Typesafe Configを使っている
  • まとめ

負荷環境について

現在はGatlingはAWSで動かすアプリケーションに負荷を掛けるために使っています。 また、Kinesisを使ったミドルウェア負荷試験のセットアップなどにも使っています。

ざっくりした図です。

f:id:reteria:20190417230745p:plain
負荷試験環境とその周辺

PackerでカーネルパラメータやJavaのインストール、unboundのインストールなどをやっています。 また、負荷試験をパッケージしたデータもこのタイミングでAMIに入れています。 今は割と頻繁に負荷試験を書き換えて実行することが多いので、このタイミングで入れなくても良いかなぁと思ったりしています。

Terraformでは、Packerで作ったAMIを元に Autoscaling Groupを使って負荷を掛けるEC2を起動しています。 Autoscaling Groupを使ってEC2を立てる理由としては、EC2が1台では負荷が掛けられない時に 楽に負荷を増やしたいときに使います。

Ansibleは、負荷試験で使うファイルをコピーするために使っています。

ツールが3種類も出てきて整理したい気持ちは少しあるんですが 色々弄ってこの形に落ち着いています。

負荷試験のセットアップ周りについては直せそうかなぁと思っているのは以下の点です。

  • PackerでJavaをインストールしているのをAnsibleに移して依存関係を明確にしたい
  • Spotインスタンスを使うことで料金を減らしたい

どんどん良くしていきたいですね。

負荷試験のシミュレーションをビルドしてパッケージしてから使っている

タイトル通りなんですが、負荷試験のシミュレーションをビルドしてパッケージしてから使っています。 理由としてはざっくり以下の2つです。

  1. 先にビルドしておくと、エラーもなく何事も無かったかのようにコンパイルが終了し、負荷試験が見つからない、というエラーを吐く状態が回避できる
  2. 負荷試験のセットアップで外部のライブラリを使いたくなった時に楽

1の「先にビルドしておくと、エラーもなく何事も無かったかのようにコンパイルが終了し、負荷試験が見つからない、というエラーを吐く状態が回避できる」について説明します。 Gatlingでは、simulation-folderなどに負荷試験Scalaコードを配置することで コンパイルした後、負荷試験として動かす機能があります。

このScalaコンパイルするのはZincと呼ばれるコンパイラだそうなのですが 負荷試験で使う共通クラスを使うと、何もエラーが出力されずに 負荷試験が見つからない、といったエラーを吐いて 負荷試験コンパイルに失敗する例に遭遇しました。

2の「負荷試験のセットアップで外部のライブラリを使いたくなった時に楽」について説明します。 今回自分たちが作っているアプリケーションはAWSの上で動いており、S3やSQS、Kinesisなどを使っています。 ある負荷試験のシナリオで、AWS SDKを使って負荷試験の前処理をやる必要が発生しました。 zipになっている Gatling を使う場合、library を置くフォルダに自分でコピーする必要があります。 しかし、この場合、依存関係をしっかり把握していないと、何をコピーするのか分からず、大変です。 推移依存がある場合などを考えた時に、手元でパッケージングするのが正解だと思いました。

そのため、sbt-packを利用して負荷試験のシナリオ群をパッケージして使っています。 sbt-packでは、ライブラリとその起動用のシェルに加えて、依存関係をtarget配下に纏めてくれる機能があります

以下のような設定を書いておくと、 JVM_OPTIONSを設定しつつ、起動クラスを指定した起動用のシェルが生成されます。

enablePlugins(GatlingPlugin)
enablePlugins(PackPlugin)

// 省略

packMain := Map("gatling" -> "io.gatling.app.Gatling")
// gatlingの起動シェルをベースに設定している
packJvmOpts := Map("gatling" -> Seq(
  "-Xms2G", "-Xmx2G",
  "-XX:+UseG1GC",

  "-verbose:gc", "-XX:+PrintGCDetails", "-XX:+PrintGCDateStamps",
  "-Xloggc:gclog/gc_%t_%p.log",
  "-XX:+HeapDumpOnOutOfMemoryError",
  "-XX:+ExitOnOutOfMemoryError",
  "-XX:HeapDumpPath=heapdump/",
  "-XX:ErrorFile=error/",
  "-XX:+UseGCLogFileRotation",
  "-XX:NumberOfGCLogFiles=10",
  "-XX:GCLogFileSize=100M",

  "-XX:InitiatingHeapOccupancyPercent=75",
  "-XX:+ParallelRefProcEnabled",
  "-XX:+PerfDisableSharedMem",
  "-XX:+AggressiveOpts",
  "-XX:+OptimizeStringConcat",
  "-XX:+HeapDumpOnOutOfMemoryError",
  "-Dsun.net.inetaddr.ttl=60",
  "-Djava.net.preferIPv4Stack=true",
  "-Djava.net.preferIPv6Addresses=false"))

こんな感じで現在はsbt-packで負荷試験をパッケージングしています。

もうちょっと改善出来るなぁと思う点としては以下の点です。

  • ローカルでビルドしているのを、CIでコミットごとにビルドして、負荷試験をS3などにリリースする
  • 負荷試験を利用するときは、S3からインストールする

上記内容を適用すると、ローカルでちょこっと弄ってすぐに負荷掛けたい、といった要求には答えられません。 しかし、現在は自動で負荷テストをする、みたいなことはやっていないので 今の状態でも良いとは思っています。

Gatlingの設定をいい感じにするために、Typesafe Configを使っている

負荷試験をやっている時に柔軟に設定を弄って負荷を掛けたい時があります。 例えば実行時間やユーザ数、あるいは環境のエンドポイント名など色々あります。 そのため、設定ファイルのフォーマットにTypesafe Configを採用しました。

採用した理由としては以下のものです。

  • 人間が読みやすい期間の指定が出来るので、負荷を掛ける時間の設定が楽
  • 変数の参照が出来る
  • 構造化した形で設定を記述できる
# 人間が読みやすい期間の指定ができる
duration = 1hours
# 変数の参照が出来る
foo.endpoint = "https://foo."${env}".example.com"

env = "test"

# 構造化した形で設定を書ける
oauth {
  AccessTokenSimulation {
    duration = 1minutes
  }
}

型変換なども、ライブラリ側に任せられるので、非常に楽ですね。

実際には、以下のようなコードを書いて、Typesafe Configで設定を取り扱っています。

abstract class BaseSimulation extends Simulation {
  val properties = configFile

  def configFile: Config = {
    List(
      Try {
        // 環境変数から設定ファイルを指定できる。
        val path = System.getenv("CONFIG")
        ConfigFactory.parseFileAnySyntax(Paths.get(path).toFile)
      },
      Try {
        // カレントディレクトリにある、environment.{conf, json, yaml}が使える
        val wd = System.getProperty("user.dir");
        ConfigFactory.parseFileAnySyntax(Paths.get(wd, "environment").toFile)
      },
      // クラスパスに入っている、environment.{conf, json, yaml}が使える
      // 主に、負荷試験のデフォルト値として使っています。
      Try(ConfigFactory.parseResourcesAnySyntax("environment")),
      // 環境変数からも設定できる
      Try(ConfigFactory.systemEnvironment()),
      // システムプロパティからも設定できる
      Try(ConfigFactory.systemProperties()),
    )
      .filter(_.isSuccess)
      .foldLeft(ConfigFactory.empty()) {
        (a, b) => a.withFallback(b.get)
      }.resolve()
  }

  // このフィールドは負荷試験シナリオ固有の設定を使うときに使っています。
  // FQCNでネストした形で設定を掛けるようになっています。
  lazy val own = properties.getConfig(this.getClass.getName)
}

使い方のイメージとしては以下のようなイメージです。

class FooSimulation extends BaseSimulation {
  // ...省略

  val scn = scenario("シナリオ名")
    .exec(http("アクション名")
    .post("/foo")))

  setUp(scn.inject(
    rampUsersPerSec(own.getInt("from")) to own.getInt("to") // fromからtoに掛けてユーザを増やしていく形で負荷を掛ける
      during own.getDuration("duration") // durationの間
  ).protocols(httpConf))
}

ここで一つ問題があって、Typesafe ConfigのgetDurationはjava.time.Durationを返してきます。 しかし、Gatlingのduringは、scala.concurrent.duration.FiniteDurationを要求してきます。

そのため、先程紹介したコードの中に以下のような implicit conversionを用意して 楽に書けるようにしています。

abstract class BaseSimulation extends Simulation {

  // java.time.DurationからScalaのFiniteDurationにimplicit conversionする関数
  implicit def asFinite(d: java.time.Duration): FiniteDuration = Duration.fromNanos(d.toNanos)

  // ...省略
}

まとめ

負荷試験を継続的にまわしていくためには、負荷試験のセットアップを簡単しなければいけません。 そのために、今回はインフラのセットアップや負荷試験ツールのセットアップまでの自動化を目指して 実際に使っているものをベースにリポジトリを公開しました。

少し癖のあるところもありますが、参考にしてみてもらえたら幸いです。

負荷試験をやっていきましょう。

リポジトリこちらです。

今回はちょっと雑に書きました。

GradleのプロジェクトでIntellij IDEAで使われるコンパイラの引数をbuild.gradleに設定したい

その要求に答えてくれるのがこの JetBrainsが出している、gradle-idea-ext-pluginです。

idea {
  project {
    settings {
      compiler {
        javac {
          javacAdditionalOptions "-Amapstruct.defaultComponentModel=spring -Amapstruct.unmappedTargetPolicy=ERROR"
        }
      }
    }
  }
}

こんな感じで設定できます。 この例は MapStruct を使うときに設定した例です。

ちなみに、生成したソースを参照するために以下の設定を追加しました。

idea {
  module {
    sourceDirs += project.file('out/production/classes/generated')
    generatedSourceDirs += project.file('out/production/classes/generated')
    testSourceDirs += project.file('out/production/classes/generated_tests')
  }
}

参考: 逆引きマニュアル: IntelliJ IDEAでAnnotation Processorsを使用する方法 | ikemo memo

なんか動いてるっぽい。