ThreadPoolExecutorを拡張してスレッドプールを使いつつもMDCを引き回すようなExecutorを作ってみる

今、Java並行処理プログラミング読んでいて ThreadPoolExecutorというかAbstractExecutorServiceのnewTaskForというメソッドを知ったので 使ってみます。

今回の記事では、ThreadPoolExecutorを拡張して MDC、Mapped Diagnostic Contextを引き渡しながら、タスクを実行するようなコードを書いてみます。

はじめに

ThreadPoolExecutorはExecutorやExecutorServiceの実装となっています。 Java並列処理プログラミングでは、ThreadPoolExecutorは8-3で詳細な紹介がされていますので、そちらを読むと分かりやすいです。 また、8-4でThreadPoolExecutorをbeforeExecute, afterExecuteなどを使って拡張する話が書かれています。

AbstractExecutorServiceのnewTaskForについて

Java並列処理プログラミングでは、ThreadPoolExecutorを紹介する中で、newTaskForの話が出てきます。 6-3-2では、APIの紹介のみに留まり、7-1-7では、タスクの標準的でないキャンセルをカプセル化する、という話の中で newTaskForの利用について書かれていました。

MDCを引き渡しながらタスクを実行するようなコードを書いてみる

その前に

今回の記事で使われる、動作を確認するためのコードです。 今回はログに出力される値で動作を見ます。 本当はテストコードとしてアサーションなどを追加したほうが良いと思います。

   @Test
    public void test1() throws InterruptedException, BrokenBarrierException {
        final MDCPropagationThreadPoolExecutor executor = new MDCPropagationThreadPoolExecutor(
            4, 4, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
        final CyclicBarrier barrier = new CyclicBarrier(2);
        MDC.put("nest", "1");
        executor.submit(() -> {
            // print nest: 1
            log.info("nest1: {}", MDC.get("nest"));
            MDC.put("nest", "2");
            executor.execute(() -> log.info("nest2: {}", MDC.get("nest"))); // print nest: 2
            
            MDC.put("nest", "3");
            executor.submit(() -> log.info("nest3: {}", MDC.get("nest"))); // print nest: 3
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                // ignore
            }
        });
        
        barrier.await();
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }

1. newTaskForだけオーバーライドしてみる

まずは、newTaskForを純粋にオーバーライドして、受け取ったcallable, runnableをwrapするコードにしてみましょう。 (テストコードにstaticにネストしたクラスに実装します。)

   static class MDCPropagationThreadPoolExecutor extends ThreadPoolExecutor {
        
        // 記事のために最初の方に持ってきた
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return super.newTaskFor(wrap(callable));
        }
        
        // 記事のために最初の方に持ってきた
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return super.newTaskFor(wrap(runnable), value);
        }

        // 以後省略
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        
        // 以後省略
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
        
        // 以後省略
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
        
        // 以後省略
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
        
        // 以後省略
        static <T> Callable<T> wrap(Callable<T> callable) {
            log.info("wrap"); // TODO remove. 動作確認のため
            final Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return () -> {
                final Map<String, String> beforeContext = MDC.getCopyOfContextMap();
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                } else {
                    MDC.clear();
                }
                try {
                    return callable.call();
                } finally {
                    if (beforeContext == null && contextMap != null) {
                        MDC.clear();
                    } else if (beforeContext != null) {
                        MDC.setContextMap(beforeContext);
                    }
                }
            };
        }
        
        // 以後省略
        static Runnable wrap(Runnable runnable) {
            log.info("wrap"); // TODO remove. 動作確認のため
            final Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return () -> {
                final Map<String, String> beforeContext = MDC.getCopyOfContextMap();
                if (contextMap == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(contextMap);
                }
                try {
                    runnable.run();
                } finally {
                    if (beforeContext == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(beforeContext);
                    }
                }
            };
        }
    }

動かしてみたら、こんな感じになりました。

2019/05/12 18:12:42.238 [main] INFO  ThreadPoolExecutorTest:119 - wrap
2019/05/12 18:12:42.241 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:31 - nest1: 1
2019/05/12 18:12:42.245 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:119 - wrap
2019/05/12 18:12:42.245 [pool-1-thread-2] INFO  ThreadPoolExecutorTest:33 - nest2: null
2019/05/12 18:12:42.245 [pool-1-thread-3] INFO  ThreadPoolExecutorTest:36 - nest3: 3

nest2ではMDCに2が入ってほしかった気持ちになりました。 これは、なぜでしょうか?

6-3-2の紹介では、こう書かれています。 「Java 6では、依頼されたCallableやRunnableのためにFutureのインスタンスを作る時、ExecutorServiceの実装クラスがAbstractExecutorServiceのnewTaskForをオーバーライドしてその作り方を制御できます。 デフォルトの実装は新しい、FutureTaskを作るだけです。」

と書かれています。 ここをしっかり読んでいればわかった落とし穴がここにあります。 executeを呼んだ場合は、newTaskForは呼ばれないようですね。

2. executeもオーバーライドしてみる

ここで自分は6-3-2の記述をしっかり読んでいなかったので、executeをオーバーライドして、以下のような形にしました。

       @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return super.newTaskFor(wrap(callable));
        }
        
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return super.newTaskFor(wrap(runnable), value);
        }

        @Override
        public void execute(Runnable command) {
            super.execute(wrap(command));
        }

もう1回テストコードを実行してみると、こうなりました。 wrapが2回呼ばれています・・・。これもまた、意図した通りではありません。

2019/05/12 18:59:54.304 [main] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.307 [main] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.307 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:31 - nest1: 1
2019/05/12 18:59:54.310 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.310 [pool-1-thread-2] INFO  ThreadPoolExecutorTest:33 - nest2: 2
2019/05/12 18:59:54.311 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.311 [pool-1-thread-1] INFO  ThreadPoolExecutorTest:133 - wrap
2019/05/12 18:59:54.311 [pool-1-thread-3] INFO  ThreadPoolExecutorTest:36 - nest3: 3

これは、なぜかというと、newTaskForで生成された、RunnableFutureはexecuteメソッドを使って実行されるからですね。 そうすると、newTaskForで1回wrapしたRunnableに対して、wrapする形になります。

必要のないwrapはやりたくないので、見直します。

3. newTaskForで自前のクラスを使いつつ、executeで分岐してみる

今度はRunnableFutureの実装を用意しつつ、executeをinstanceOfで分岐してみます。

       @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new CallableWrapper<>(callable);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new CallableWrapper<>(runnable, value);
        }

        @Override
        public void execute(Runnable command) {
            if (command instanceof CallableWrapper) {
                super.execute(command);
            } else {
                super.execute(wrap(command));
            }
        }
        
        // 本来はFutureTaskに移譲するRunnableFutureの実装が良さそうなんですが
        // ここでは簡略化のために継承を使っています。
        static class CallableWrapper<T> extends FutureTask<T> {
            
            CallableWrapper(Callable<T> callable) {
                super(wrap(callable));
            }
            
            CallableWrapper(Runnable runnable, T value) {
                super(wrap(runnable), value);
            }
            
        }

動かしてみたら、動作は正しく動いていそうです。 ひと安心ですね。

と、思いきや、このコードはあまりお行儀がよくありません。なぜでしょうか。 色々考えながら、中の実装を読んでいて気づきました。 実はもっとシンプルに実装できます。

4. 最終的な実装: executeでwrapするだけ

newTaskForの挙動を確かめたかったのに、最終的に使わなくなってしまいました。 invokeXXX系のメソッドのオーバーライドも必要ありません。

       static class MDCPropagationThreadPoolExecutor extends ThreadPoolExecutor {
        
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
        
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
        
        public MDCPropagationThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
        
        @Override
        public void execute(Runnable command) {
                super.execute(wrap(command));
        }
        
        static Runnable wrap(Runnable runnable) {
            final Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return () -> {
                final Map<String, String> beforeContext = MDC.getCopyOfContextMap();
                if (contextMap == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(contextMap);
                }
                try {
                    runnable.run();
                } finally {
                    if (beforeContext == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(beforeContext);
                    }
                }
            };
        }
    }

まとめ

今回の記事では、AbstractExecutorServiceのnewTaskForを知ったので ThreadPoolExecutorを拡張して、MDCをPropagationするようなコードを書いてみようとしました。 最終的には、newTaskForを使うことなく、非常にシンプルな実装で、やりたいことが出来ました。

しかし、現在のThreadPoolExecutor実装を拡張して実装する場合はexecuteをオーバーライドを実装するだけで出来ましたが 他の実装では、その他のメソッドを実装しないといけないことには留意しないといけないです。 仕様としては、executeに移譲されることは保証されていないので。

終わり。

Java並行処理プログラミングは良い本ですね。 コアなAPIの紹介がされていて、勉強になりました。

やっと、11章まで読んだので 最後まで読んでいきましょう。