読者です 読者をやめる 読者になる 読者になる

will and way

ただの自分用メモを人に伝える形式で書くことでわかりやすくまとめてるはずのブログ

マルチスレッドプログラミングの基本

最近、並行処理ではまった所をメモ
CountDownLatchの使い方が曖昧だったのでそこも兼ねて書いた

AbstractCounter

package jp.co.matsuokah.counter;

import java.util.concurrent.atomic.AtomicInteger;
/**
 *
 */
public abstract class AbstractCounter implements Counter {
	protected AtomicInteger counter = new AtomicInteger();

	/** limit到達時のハンドリング */
	protected void limitHandle() throws Exception {
		throw new Exception("reached the limit.");
	}

	/** limitに達しているかの検出 */
	protected void isLimiting() throws Exception {
		if (counter.get() >= LIMIT) {
			limitHandle();
		}
	}
}

SafeCounter

package jp.co.matsuokah.counter;

import jp.co.matsuokah.counter.AbstractCounter;

/**
 * スレッドセーフなカウンター
 */
public class SafeCounter extends AbstractCounter {

	private final Object lock = new Object();

	@Override
	public void countUp() throws Exception {
		synchronized (lock) {
			isLimiting();
			counter.incrementAndGet();
		}
	}

	@Override
	public int getCount() {
		return counter.get();
	}
}

UnsafeCounter

package jp.co.matsuokah.counter;

/**
 * スレッドセーフではないカウンター
 */
public class UnsafeCounter extends AbstractCounter {

	@Override
	public void countUp() throws Exception {
		isLimiting();
		counter.incrementAndGet();
	}

	@Override
	public int getCount() {
		return counter.get();
	}
}

Tester

package jp.co.matsuokah;

import jp.co.matsuokah.counter.Counter;
import java.util.concurrent.CountDownLatch;

/**
 * テストクラス
 */
public class Tester {
	public void test(final Counter counter) throws InterruptedException {

		final int threadNum = 1000;
		final CountDownLatch startSignal = new CountDownLatch(1);
		final CountDownLatch endSignal = new CountDownLatch(threadNum);
		long timestamp = System.currentTimeMillis();

		for (int i = 0; i < threadNum; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						startSignal.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					// threadNum回分countUpが失敗する
					for (int j = 0; j <= (Counter.LIMIT / threadNum); j++) {
						try {
							counter.countUp();
						} catch (Exception e) {
						}
					}

					endSignal.countDown();
				}
			}).start();
		}
		startSignal.countDown(); //すべてのスレッドの処理をスタートさせる
		endSignal.await(); //すべてのスレッドの処理の終了を待つ

		System.out.println(counter.getClass() + ": " + counter.getCount());
		System.out.println("実行時間: " + String.valueOf(System.currentTimeMillis() - timestamp));
	}
}

Main

package jp.co.matsuokah;

import jp.co.matsuokah.counter.SafeCounter;
import jp.co.matsuokah.counter.UnsafeCounter;

/**
 *
 */
public class Main {
	public static void main(String[] args) throws InterruptedException {
		Tester tester = new Tester();
		tester.test(new SafeCounter());
		tester.test(new UnsafeCounter());
	}
}

実行結果

class jp.co.matsuokah.counter.SafeCounter: 1000000
実行時間: 259
class jp.co.matsuokah.counter.UnsafeCounter: 1000000
実行時間: 173

class jp.co.matsuokah.counter.SafeCounter: 1000000
実行時間: 270
class jp.co.matsuokah.counter.UnsafeCounter: 1000002
実行時間: 178

class jp.co.matsuokah.counter.SafeCounter: 1000000
実行時間: 231
class jp.co.matsuokah.counter.UnsafeCounter: 1000000
実行時間: 171

class jp.co.matsuokah.counter.SafeCounter: 1000000
実行時間: 226
class jp.co.matsuokah.counter.UnsafeCounter: 1000001
実行時間: 170

isLimitingとcountUpの協調

countUpとisLimitingはスレッド間の共有オブジェクトであるcounterにアクセスしていて、
同時にisLimitingメソッドが実行されてしまい、2つ以上のスレッドがそこを通り抜けて、インクリメントされてしまうために、予期せぬ振る舞いをしてしまう。

countUpとisLimitingは1つのトランザクションとして考えるべきで、1つのスレッドがカウントアップする間に他のスレッドでcounterの値に依存した処理をさせない事がスレッドセーフへの道筋。

呼び出し元でgetCountとcountUpが協調するロジックを実装しているならgetCountもsynchronized(lock)する必要がある。

https://github.com/matsuokah/ThreadSafeCounter