diff --git a/jmh-core-it/src/test/java/org/openjdk/jmh/it/threads/BenchmarkBenchSameThreadTest.java b/jmh-core-it/src/test/java/org/openjdk/jmh/it/threads/BenchmarkBenchSameThreadTest.java index 7dd16578d..62c5de6d7 100644 --- a/jmh-core-it/src/test/java/org/openjdk/jmh/it/threads/BenchmarkBenchSameThreadTest.java +++ b/jmh-core-it/src/test/java/org/openjdk/jmh/it/threads/BenchmarkBenchSameThreadTest.java @@ -32,6 +32,7 @@ import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; @@ -44,17 +45,40 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * Tests if harness executes setup, run, and tearDown in the same workers. + * Also tests if all workers are platform or virtual threads. */ @State(Scope.Benchmark) public class BenchmarkBenchSameThreadTest { + public enum ExecutorType { + CACHED_TPE, FIXED_TPE, VIRTUAL_TPE, FJP, CUSTOM + + } + + @Param("FIXED_TPE") + ExecutorType benchmarkExecutorType; + private final Set<Thread> setupRunThread = Collections.synchronizedSet(new HashSet<>()); private final Set<Thread> setupIterationThread = Collections.synchronizedSet(new HashSet<>()); private final Set<Thread> setupInvocationThread = Collections.synchronizedSet(new HashSet<>()); @@ -96,12 +120,41 @@ public void tearDownInvocation() { @TearDown(Level.Trial) public void teardownZZZ() { // should perform last Assert.assertFalse("Test sanity", testInvocationThread.isEmpty()); - Assert.assertTrue("test <: setupRun", testInvocationThread.containsAll(setupRunThread)); - Assert.assertTrue("test <: setupIteration", testInvocationThread.containsAll(setupIterationThread)); - Assert.assertTrue("test <: setupInvocation", testInvocationThread.containsAll(setupInvocationThread)); - Assert.assertTrue("test <: teardownRun", testInvocationThread.containsAll(teardownRunThread)); - Assert.assertTrue("test <: teardownIteration", testInvocationThread.containsAll(teardownIterationThread)); - Assert.assertTrue("test <: teardownInvocation", testInvocationThread.containsAll(teardownInvocationThread)); + if (benchmarkExecutorType == ExecutorType.FIXED_TPE || + benchmarkExecutorType == ExecutorType.VIRTUAL_TPE) { // only fixed and virtual guarantee same thread rule + Assert.assertTrue("test <: setupRun", testInvocationThread.containsAll(setupRunThread)); + Assert.assertTrue("test <: setupIteration", testInvocationThread.containsAll(setupIterationThread)); + Assert.assertTrue("test <: setupInvocation", testInvocationThread.containsAll(setupInvocationThread)); + Assert.assertTrue("test <: teardownRun", testInvocationThread.containsAll(teardownRunThread)); + Assert.assertTrue("test <: teardownIteration", testInvocationThread.containsAll(teardownIterationThread)); + Assert.assertTrue("test <: teardownInvocation", testInvocationThread.containsAll(teardownInvocationThread)); + } + if (benchmarkExecutorType == ExecutorType.VIRTUAL_TPE) { + Assert.assertTrue("setupRun thread kind", setupRunThread.stream().allMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("setupIteration thread kind", setupIterationThread.stream().allMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("setupInvocation thread kind", setupInvocationThread.stream().allMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("teardownRun thread kind", teardownRunThread.stream().allMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("teardownIteration thread kind", teardownIterationThread.stream().allMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("teardownInvocation thread kind", teardownInvocationThread.stream().allMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("testInvocation thread kind", testInvocationThread.stream().allMatch(VirtualAPI::isVirtual)); + } else { + Assert.assertTrue("setupRun thread kind", setupRunThread.stream().noneMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("setupIteration thread kind", setupIterationThread.stream().noneMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("setupInvocation thread kind", setupInvocationThread.stream().noneMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("teardownRun thread kind", teardownRunThread.stream().noneMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("teardownIteration thread kind", teardownIterationThread.stream().noneMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("teardownInvocation thread kind", teardownInvocationThread.stream().noneMatch(VirtualAPI::isVirtual)); + Assert.assertTrue("testInvocation thread kind", testInvocationThread.stream().noneMatch(VirtualAPI::isVirtual)); + } + if (benchmarkExecutorType == ExecutorType.FJP) { + Assert.assertTrue("setupRun thread kind", setupRunThread.stream().allMatch(t -> t instanceof ForkJoinWorkerThread)); + Assert.assertTrue("setupIteration thread kind", setupIterationThread.stream().allMatch(t -> t instanceof ForkJoinWorkerThread)); + Assert.assertTrue("setupInvocation thread kind", setupInvocationThread.stream().allMatch(t -> t instanceof ForkJoinWorkerThread)); + Assert.assertTrue("teardownRun thread kind", teardownRunThread.stream().allMatch(t -> t instanceof ForkJoinWorkerThread)); + Assert.assertTrue("teardownIteration thread kind", teardownIterationThread.stream().allMatch(t -> t instanceof ForkJoinWorkerThread)); + Assert.assertTrue("teardownInvocation thread kind", teardownInvocationThread.stream().allMatch(t -> t instanceof ForkJoinWorkerThread)); + Assert.assertTrue("testInvocation thread kind", testInvocationThread.stream().allMatch(t -> t instanceof ForkJoinWorkerThread)); + } } @Benchmark @@ -116,14 +169,201 @@ public void test() { } @Test - public void invokeAPI() throws RunnerException { + public void invokeAPI_default() throws RunnerException { + for (int c = 0; c < Fixtures.repetitionCount(); c++) { + Options opt = new OptionsBuilder() + .include(Fixtures.getTestMask(this.getClass())) + .shouldFailOnError(true) + .build(); + new Runner(opt).run(); + } + } + + @Test + public void invokeAPI_fixed() throws RunnerException { + for (int c = 0; c < Fixtures.repetitionCount(); c++) { + Options opt = new OptionsBuilder() + .include(Fixtures.getTestMask(this.getClass())) + .jvmArgsAppend("-Djmh.executor=FIXED_TPE") + .param("benchmarkExecutorType", "FIXED_TPE") + .shouldFailOnError(true) + .build(); + new Runner(opt).run(); + } + } + + @Test + public void invokeAPI_cached() throws RunnerException { for (int c = 0; c < Fixtures.repetitionCount(); c++) { Options opt = new OptionsBuilder() .include(Fixtures.getTestMask(this.getClass())) + .jvmArgsAppend("-Djmh.executor=CACHED_TPE") + .param("benchmarkExecutorType", "CACHED_TPE") .shouldFailOnError(true) .build(); new Runner(opt).run(); } } + @Test + public void invokeAPI_fjp() throws RunnerException { + for (int c = 0; c < Fixtures.repetitionCount(); c++) { + Options opt = new OptionsBuilder() + .include(Fixtures.getTestMask(this.getClass())) + .jvmArgsAppend("-Djmh.executor=FJP") + .param("benchmarkExecutorType", "FJP") + .shouldFailOnError(true) + .build(); + new Runner(opt).run(); + } + } + + @Test + public void invokeAPI_custom() throws RunnerException { + for (int c = 0; c < Fixtures.repetitionCount(); c++) { + Options opt = new OptionsBuilder() + .include(Fixtures.getTestMask(this.getClass())) + .jvmArgsAppend("-Djmh.executor=CUSTOM") + .jvmArgsAppend("-Djmh.executor.class=" + CustomExecutor.class.getName()) + .param("benchmarkExecutorType", "CUSTOM") + .shouldFailOnError(true) + .build(); + new Runner(opt).run(); + } + } + + @Test + public void invokeAPI_virtual() throws RunnerException { + if(VirtualAPI.hasVirtualThreads()) { + for (int c = 0; c < Fixtures.repetitionCount(); c++) { + Options opt = new OptionsBuilder() + .include(Fixtures.getTestMask(this.getClass())) + .jvmArgsAppend("-Djmh.executor=VIRTUAL_TPE") + .param("benchmarkExecutorType", "VIRTUAL_TPE") + .shouldFailOnError(true) + .build(); + new Runner(opt).run(); + } + } + } + + public static class VirtualAPI { + // provide access to new Threads API via reflection + + private static final Method IS_VIRTUAL = getIsVirtual(); + + private static Method getIsVirtual() { + try { + Method m = Class.forName("java.lang.Thread").getMethod("isVirtual"); + m.invoke(Thread.currentThread()); + // isVirtual check is not enough, have to check running virtual thread + Method start = Class.forName("java.lang.Thread").getMethod("startVirtualThread", Runnable.class); + start.invoke(null, (Runnable) (() -> {})); + return m; + } catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException | + IllegalAccessException e) { + return null; + } + } + + public static boolean hasVirtualThreads() { + return IS_VIRTUAL != null; + } + + public static boolean isVirtual(Thread t) { + if (!hasVirtualThreads()) { + return false; + } + try { + return (boolean) IS_VIRTUAL.invoke(t); + } catch (IllegalAccessException | InvocationTargetException e) { + return false; + } + } + } + + static class CustomExecutor implements ExecutorService { + private final ExecutorService e; + + public CustomExecutor(int maxThreads, String prefix) { + e = Executors.newFixedThreadPool(maxThreads, new CustomThreadFactory(prefix)); + } + + public void execute(Runnable command) { + e.execute(command); + } + + public void shutdown() { + e.shutdown(); + } + + public List<Runnable> shutdownNow() { + return e.shutdownNow(); + } + + public boolean isShutdown() { + return e.isShutdown(); + } + + public boolean isTerminated() { + return e.isTerminated(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return e.awaitTermination(timeout, unit); + } + + public Future<?> submit(Runnable task) { + return e.submit(task); + } + + public <T> Future<T> submit(Callable<T> task) { + return e.submit(task); + } + + public <T> Future<T> submit(Runnable task, T result) { + return e.submit(task, result); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + return e.invokeAll(tasks); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return e.invokeAll(tasks, timeout, unit); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + return e.invokeAny(tasks); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return e.invokeAny(tasks, timeout, unit); + } + } + + static class CustomThreadFactory implements ThreadFactory { + + private final AtomicInteger counter; + private final String prefix; + + public CustomThreadFactory(String prefix) { + this.counter = new AtomicInteger(); + this.prefix = prefix; + } + + @Override + public Thread newThread(Runnable r) { + CustomThread t = new CustomThread(r, prefix + "-jmh-worker-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + } + + static class CustomThread extends Thread { + public CustomThread(Runnable r, String name) { + super(r, name); + } + } + } diff --git a/jmh-core/src/main/java/org/openjdk/jmh/runner/BaseRunner.java b/jmh-core/src/main/java/org/openjdk/jmh/runner/BaseRunner.java index 0f2642a05..7bbe5c6f1 100644 --- a/jmh-core/src/main/java/org/openjdk/jmh/runner/BaseRunner.java +++ b/jmh-core/src/main/java/org/openjdk/jmh/runner/BaseRunner.java @@ -247,6 +247,7 @@ protected void runBenchmark(BenchmarkParams benchParams, BenchmarkHandler handle long allWarmup = 0; long allMeasurement = 0; + boolean isFirstIteration = true; // warmup IterationParams wp = benchParams.getWarmup(); @@ -258,8 +259,9 @@ protected void runBenchmark(BenchmarkParams benchParams, BenchmarkHandler handle out.iteration(benchParams, wp, i); boolean isLastIteration = (benchParams.getMeasurement().getCount() == 0); - IterationResult ir = handler.runIteration(benchParams, wp, isLastIteration); + IterationResult ir = handler.runIteration(benchParams, wp, isFirstIteration, isLastIteration); out.iterationResult(benchParams, wp, i, ir); + isFirstIteration = false; allWarmup += ir.getMetadata().getAllOps(); } @@ -278,8 +280,9 @@ protected void runBenchmark(BenchmarkParams benchParams, BenchmarkHandler handle out.iteration(benchParams, mp, i); boolean isLastIteration = (i == mp.getCount()); - IterationResult ir = handler.runIteration(benchParams, mp, isLastIteration); + IterationResult ir = handler.runIteration(benchParams, mp, isFirstIteration, isLastIteration); out.iterationResult(benchParams, mp, i, ir); + isFirstIteration = false; allMeasurement += ir.getMetadata().getAllOps(); diff --git a/jmh-core/src/main/java/org/openjdk/jmh/runner/BenchmarkHandler.java b/jmh-core/src/main/java/org/openjdk/jmh/runner/BenchmarkHandler.java index 055a10b0e..60cc51dda 100644 --- a/jmh-core/src/main/java/org/openjdk/jmh/runner/BenchmarkHandler.java +++ b/jmh-core/src/main/java/org/openjdk/jmh/runner/BenchmarkHandler.java @@ -55,6 +55,9 @@ class BenchmarkHandler { private final ConcurrentMap<Thread, WorkerData> workerData; private final BlockingQueue<ThreadParams> tps; + private final BlockingQueue<WorkerData> orphanedWorkerData; + + private final CyclicBarrier wdBarrier; private final OutputFormat out; private final List<InternalProfiler> profilers; @@ -74,14 +77,18 @@ public BenchmarkHandler(OutputFormat out, Options options, BenchmarkParams execu profilersRev = new ArrayList<>(profilers); Collections.reverse(profilersRev); - tps = new ArrayBlockingQueue<>(executionParams.getThreads()); - tps.addAll(distributeThreads(executionParams.getThreads(), executionParams.getThreadGroups())); + int threads = executionParams.getThreads(); + wdBarrier = new CyclicBarrier(threads, this::adoptWorkerData); + + orphanedWorkerData = new ArrayBlockingQueue<>(threads); + tps = new ArrayBlockingQueue<>(threads); + tps.addAll(distributeThreads(threads, executionParams.getThreadGroups())); workerData = new ConcurrentHashMap<>(); this.out = out; try { - executor = EXECUTOR_TYPE.createExecutor(executionParams.getThreads(), executionParams.getBenchmark()); + executor = EXECUTOR_TYPE.createExecutor(threads, executionParams.getBenchmark()); } catch (Exception e) { throw new IllegalStateException(e); } @@ -308,7 +315,7 @@ public void shutdown() { * @param last Should this iteration considered to be the last * @return IterationResult */ - public IterationResult runIteration(BenchmarkParams benchmarkParams, IterationParams params, boolean last) { + public IterationResult runIteration(BenchmarkParams benchmarkParams, IterationParams params, boolean first, boolean last) { int numThreads = benchmarkParams.getThreads(); TimeValue runtime = params.getTime(); @@ -319,7 +326,7 @@ public IterationResult runIteration(BenchmarkParams benchmarkParams, IterationPa List<Result> iterationResults = new ArrayList<>(); InfraControl control = new InfraControl(benchmarkParams, params, - preSetupBarrier, preTearDownBarrier, last, + preSetupBarrier, preTearDownBarrier, first, last, new Control()); // preparing the worker runnables @@ -437,12 +444,12 @@ public IterationResult runIteration(BenchmarkParams benchmarkParams, IterationPa return result; } - private WorkerData newWorkerData(Thread worker) { - WorkerData wd = workerData.get(worker); - if (wd != null) { - return wd; - } + private void adoptWorkerData() { + orphanedWorkerData.addAll(workerData.values()); + workerData.clear(); + } + private WorkerData newWorkerData(Thread worker) { try { Object o = clazz.getConstructor().newInstance(); ThreadParams t = tps.poll(); @@ -450,7 +457,7 @@ private WorkerData newWorkerData(Thread worker) { throw new IllegalStateException("Cannot get another thread params"); } - wd = new WorkerData(o, t); + WorkerData wd = new WorkerData(o, t); WorkerData exist = workerData.put(worker, wd); if (exist != null) { throw new IllegalStateException("Duplicate thread data"); @@ -462,6 +469,26 @@ private WorkerData newWorkerData(Thread worker) { } } + private WorkerData findWorkerData(Thread worker) { + WorkerData wd = workerData.remove(worker); + try { + wdBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException("Worker data barrier error ", e); + } + if (wd == null) { + wd = orphanedWorkerData.poll(); + if (wd == null) { + throw new IllegalStateException("Cannot get another thread working data"); + } + } + WorkerData exist = workerData.put(worker, wd); + if (exist != null) { + throw new IllegalStateException("Duplicate thread data"); + } + return wd; + } + /** * Worker body. */ @@ -480,7 +507,7 @@ public BenchmarkTaskResult call() throws Exception { runner = Thread.currentThread(); // poll the current data, or instantiate in this thread, if needed - WorkerData wd = newWorkerData(runner); + WorkerData wd = control.isFirstIteration() ? newWorkerData(runner) : findWorkerData(runner); return (BenchmarkTaskResult) method.invoke(wd.instance, control, wd.params); } catch (Throwable e) { diff --git a/jmh-core/src/main/java/org/openjdk/jmh/runner/InfraControl.java b/jmh-core/src/main/java/org/openjdk/jmh/runner/InfraControl.java index 8bc6e019a..ad542e73c 100644 --- a/jmh-core/src/main/java/org/openjdk/jmh/runner/InfraControl.java +++ b/jmh-core/src/main/java/org/openjdk/jmh/runner/InfraControl.java @@ -56,9 +56,9 @@ public class InfraControl extends InfraControlL4 { } public InfraControl(BenchmarkParams benchmarkParams, IterationParams iterationParams, - CountDownLatch preSetup, CountDownLatch preTearDown, boolean lastIteration, + CountDownLatch preSetup, CountDownLatch preTearDown, boolean firstIteration, boolean lastIteration, Control notifyControl) { - super(benchmarkParams, iterationParams, preSetup, preTearDown, lastIteration, notifyControl); + super(benchmarkParams, iterationParams, preSetup, preTearDown, firstIteration, lastIteration, notifyControl); } /** @@ -110,6 +110,10 @@ public void preTearDownForce() { preTearDown.countDown(); } + public boolean isFirstIteration() { + return firstIteration; + } + public boolean isLastIteration() { return lastIteration; } @@ -161,6 +165,7 @@ abstract class InfraControlL2 extends InfraControlL1 { public final CountDownLatch preSetup; public final CountDownLatch preTearDown; + public final boolean firstIteration; public final boolean lastIteration; public final AtomicInteger warmupVisited, warmdownVisited; @@ -175,7 +180,7 @@ abstract class InfraControlL2 extends InfraControlL1 { private final int threads; public InfraControlL2(BenchmarkParams benchmarkParams, IterationParams iterationParams, - CountDownLatch preSetup, CountDownLatch preTearDown, boolean lastIteration, + CountDownLatch preSetup, CountDownLatch preTearDown, boolean firstIteration, boolean lastIteration, Control notifyControl) { warmupVisited = new AtomicInteger(); warmdownVisited = new AtomicInteger(); @@ -193,6 +198,7 @@ public InfraControlL2(BenchmarkParams benchmarkParams, IterationParams iteration this.preSetup = preSetup; this.preTearDown = preTearDown; + this.firstIteration = firstIteration; this.lastIteration = lastIteration; this.benchmarkParams = benchmarkParams; this.iterationParams = iterationParams; @@ -274,9 +280,9 @@ abstract class InfraControlL3 extends InfraControlL2 { private boolean q171, q172, q173, q174, q175, q176, q177, q178; public InfraControlL3(BenchmarkParams benchmarkParams, IterationParams iterationParams, - CountDownLatch preSetup, CountDownLatch preTearDown, boolean lastIteration, + CountDownLatch preSetup, CountDownLatch preTearDown, boolean firstIteration, boolean lastIteration, Control notifyControl) { - super(benchmarkParams, iterationParams, preSetup, preTearDown, lastIteration, notifyControl); + super(benchmarkParams, iterationParams, preSetup, preTearDown, firstIteration, lastIteration, notifyControl); } } @@ -284,9 +290,9 @@ abstract class InfraControlL4 extends InfraControlL3 { private int markerEnd; public InfraControlL4(BenchmarkParams benchmarkParams, IterationParams iterationParams, - CountDownLatch preSetup, CountDownLatch preTearDown, boolean lastIteration, + CountDownLatch preSetup, CountDownLatch preTearDown, boolean firstIteration, boolean lastIteration, Control notifyControl) { - super(benchmarkParams, iterationParams, preSetup, preTearDown, lastIteration, notifyControl); + super(benchmarkParams, iterationParams, preSetup, preTearDown, firstIteration, lastIteration, notifyControl); } }