59
59
* beyond the {@code close} method until all threads started to execute subtasks have finished.
60
60
* To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
61
61
* only be invoked by the <em>owner thread</em> (the thread that opened the {@code
62
- * StructuredTaskScope}), and the {@code close} method throws an exception after closing
63
- * if the owner did not invoke the {@code join} method.
62
+ * StructuredTaskScope}), the {@code fork} method may not be called after {@code join},
63
+ * the {@code join} method may only be invoked once, and the {@code close} method throws
64
+ * an exception after closing if the owner did not invoke the {@code join} method after
65
+ * forking subtasks.
64
66
*
65
67
* <p> As a first example, consider a main task that splits into two subtasks to concurrently
66
68
* fetch resources from two URL locations "left" and "right". Both subtasks may complete
89
91
* completes and the main task uses the {@link Subtask#get() Subtask.get()} method to get
90
92
* the result of each subtask. If one of the subtasks fails then the other subtask
91
93
* is cancelled (this will interrupt the thread executing the other subtask) and the
92
- * {@code join} method throws {@link ExecutionException } with the exception from
94
+ * {@code join} method throws {@link FailedException } with the exception from
93
95
* the failed subtask as the {@linkplain Throwable#getCause() cause}.
94
96
*
95
97
* <p> A {@code StructuredTaskScope} may be opened with a {@link Joiner} that handles subtask
102
104
* require the result of subtasks that are still executing. Cancelling execution prevents
103
105
* new threads from being started to execute further subtasks, {@linkplain Thread#interrupt()
104
106
* interrupts} the threads executing subtasks that have not completed, and causes the
105
- * {@code join} method to wakeup with a result (or exception). The {@link #close() close}
106
- * method always waits for threads executing subtasks to finish, even if execution is
107
- * cancelled, so it cannot continue beyond the {@code close} method until the interrupted
108
- * threads finish. Subtasks should be coded so that they finish as soon as possible when
109
- * interrupted. Subtasks that do not respond to interrupt, e.g. block on methods that are
110
- * not interruptible, may delay the closing of a task scope indefinitely.
107
+ * {@code join} method to wakeup with a result (or exception). In the above example,
108
+ * the no-arg {@link #open() open} method created the {@code StructuredTaskScope} with a
109
+ * {@code Joiner} that cancelled execution when any subtask failed.
110
+ *
111
+ * <p> The {@link #close() close} method always waits for threads executing subtasks to
112
+ * finish, even if execution is cancelled, so it cannot continue beyond the {@code close}
113
+ * method until the interrupted threads finish. Subtasks should be coded so that they
114
+ * finish as soon as possible when interrupted. Subtasks that do not respond to interrupt,
115
+ * e.g. block on methods that are not interruptible, may delay the closing of a task scope
116
+ * indefinitely.
111
117
*
112
118
* <p> Now consider another example that also splits into two subtasks to concurrently
113
119
* fetch resources. In this example, the code in the main task is only interested in the
136
142
* subtask to be cancelled (this will interrupt the thread executing the subtask), and
137
143
* the {@code join} method returns the result from the first subtask. Cancelling the other
138
144
* subtask avoids the main task waiting for a result that it doesn't care about. If both
139
- * subtasks fail then the {@code join} method throws {@link ExecutionException } with the
145
+ * subtasks fail then the {@code join} method throws {@link FailedException } with the
140
146
* exception from one of the subtasks as the {@linkplain Throwable#getCause() cause}.
141
147
*
142
148
* <p> Whether code uses the {@code Subtask} returned from {@code fork} will depend on
152
158
* handles subtask completion and produces the outcome for the {@link #join() join} method.
153
159
* In some cases, the outcome will be a result, in other cases it will be an exception.
154
160
* If the outcome is an exception then the {@code join} method throws {@link
155
- * ExecutionException } with the exception as the {@linkplain Throwable#getCause()
161
+ * FailedException } with the exception as the {@linkplain Throwable#getCause()
156
162
* cause}. For many {@code Joiner} implementations, the exception will be an exception
157
163
* thrown by a subtask that failed. In the case of {@link Joiner#allSuccessfulOrThrow()
158
164
* allSuccessfulOrThrow} and {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow}
159
165
* for example, the exception is from the first subtask to fail.
160
166
*
161
167
* <p> Many of the details for how exceptions are handled will depend on usage. In some
162
- * cases, the {@code join} method will be called in a {@code try-catch} block to catch
163
- * {@code ExecutionException} and handle the cause. The exception handling may use
164
- * {@code instanceof} with pattern matching to handle specific causes. In some cases it
165
- * may not be useful to catch {@code ExecutionException} but instead leave it to propagate
166
- * to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler}
167
- * for logging purposes.
168
+ * cases it may be useful t add a {@code catch} block to catch {@code FailedException}.
169
+ * The exception handling may use {@code instanceof} with pattern matching to handle
170
+ * specific causes.
171
+ * {@snippet lang=java :
172
+ * try (var scope = StructuredTaskScope.open()) {
173
+ *
174
+ * ..
175
+ *
176
+ * } catch (StructuredTaskScope.FailedException e) {
177
+ *
178
+ * Throwable cause = e.getCause();
179
+ * switch (cause) {
180
+ * case IOException ioe -> ..
181
+ * default -> ..
182
+ * }
183
+ *
184
+ * }
185
+ * }
186
+ * In some cases it may not be useful to catch {@code FailedException} but instead leave
187
+ * it to propagate to the configured {@linkplain Thread.UncaughtExceptionHandler uncaught
188
+ * exception handler} for logging purposes.
168
189
*
169
190
* <p> For cases where a specific exception triggers the use of a default result then it
170
191
* may be more appropriate to handle this in the subtask itself rather than the subtask
212
233
* starts when the new task scope is opened. If the timeout expires before the {@code join}
213
234
* method has completed then <a href="#CancelExecution">execution is cancelled</a>. This
214
235
* interrupts the threads executing the two subtasks and causes the {@link #join() join}
215
- * method to throw {@link ExecutionException } with {@link TimeoutException} as the cause.
236
+ * method to throw {@link FailedException } with {@link TimeoutException} as the cause.
216
237
* {@snippet lang=java :
217
238
* Duration timeout = Duration.ofSeconds(10);
218
239
*
@@ -337,10 +358,15 @@ public class StructuredTaskScope<T, R> implements AutoCloseable {
337
358
private final ThreadFactory threadFactory ;
338
359
private final ThreadFlock flock ;
339
360
340
- // fields that are only accessed by owner thread
341
- private boolean needToJoin ; // set by fork to indicate that owner must join
342
- private boolean joined ; // set to true when owner joins
343
- private boolean closed ;
361
+ // state, only accessed by owner thread
362
+ private static final int ST_NEW = 0 ;
363
+ private static final int ST_FORKED = 1 ; // subtasks forked, need to join
364
+ private static final int ST_JOIN_STARTED = 2 ; // join started, can no longer fork
365
+ private static final int ST_JOIN_COMPLETED = 3 ; // join completed
366
+ private static final int ST_CLOSED = 4 ; // closed
367
+ private int state ;
368
+
369
+ // timer task, only accessed by owner thread
344
370
private Future <?> timerTask ;
345
371
346
372
// set or read by any thread
@@ -349,22 +375,32 @@ public class StructuredTaskScope<T, R> implements AutoCloseable {
349
375
// set by the timer thread, read by the owner thread
350
376
private volatile boolean timeoutExpired ;
351
377
378
+ /**
379
+ * Throws WrongThreadException if the current thread is not the owner thread.
380
+ */
381
+ private void ensureOwner () {
382
+ if (Thread .currentThread () != flock .owner ()) {
383
+ throw new WrongThreadException ("Current thread not owner" );
384
+ }
385
+ }
386
+
352
387
/**
353
388
* Throws IllegalStateException if the task scope is closed.
354
389
*/
355
390
private void ensureOpen () {
356
391
assert Thread .currentThread () == flock .owner ();
357
- if (closed ) {
392
+ if (state == ST_CLOSED ) {
358
393
throw new IllegalStateException ("Task scope is closed" );
359
394
}
360
395
}
361
396
362
397
/**
363
- * Throws WrongThreadException if the current thread is not the owner thread .
398
+ * Throws IllegalStateException if the already joined or task scope is closed .
364
399
*/
365
- private void ensureOwner () {
366
- if (Thread .currentThread () != flock .owner ()) {
367
- throw new WrongThreadException ("Current thread not owner" );
400
+ private void ensureNotJoined () {
401
+ assert Thread .currentThread () == flock .owner ();
402
+ if (state > ST_FORKED ) {
403
+ throw new IllegalStateException ("Already joined or task scope is closed" );
368
404
}
369
405
}
370
406
@@ -373,9 +409,8 @@ private void ensureOwner() {
373
409
* has not joined.
374
410
*/
375
411
private void ensureJoinedIfOwner () {
376
- if (Thread .currentThread () == flock .owner () && !joined ) {
377
- String msg = needToJoin ? "Owner did not join" : "join did not complete" ;
378
- throw new IllegalStateException (msg );
412
+ if (Thread .currentThread () == flock .owner () && state <= ST_JOIN_STARTED ) {
413
+ throw new IllegalStateException ("join not called" );
379
414
}
380
415
}
381
416
@@ -531,7 +566,8 @@ enum State {
531
566
*
532
567
* @return the possibly-null result
533
568
* @throws IllegalStateException if the subtask has not completed, did not complete
534
- * successfully, or the current thread is the task scope owner and it has not joined
569
+ * successfully, or the current thread is the task scope owner invoking this
570
+ * method before {@linkplain #join() joining}
535
571
* @see State#SUCCESS
536
572
*/
537
573
T get ();
@@ -552,7 +588,8 @@ enum State {
552
588
* {@link State#FAILED FAILED} before using this method to get the exception.
553
589
*
554
590
* @throws IllegalStateException if the subtask has not completed, completed with
555
- * a result, or the current thread is the task scope owner and it has not joined
591
+ * a result, or the current thread is the task scope owner invoking this method
592
+ * before {@linkplain #join() joining}
556
593
* @see State#FAILED
557
594
*/
558
595
Throwable exception ();
@@ -576,7 +613,7 @@ enum State {
576
613
* {@code Joiner} that waits for all successful subtasks. It cancels execution and
577
614
* causes {@code join} to throw if any subtask fails.
578
615
* <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
579
- * subtasks. If does not cancel execution.
616
+ * subtasks. It does not cancel execution or cause {@code join} to throw .
580
617
* </ul>
581
618
*
582
619
* <p> In addition to the methods to create {@code Joiner} objects for common cases,
@@ -673,14 +710,14 @@ default boolean onComplete(Subtask<? extends T> subtask) {
673
710
* Invoked by {@link #join()} to produce the result (or exception) after waiting
674
711
* for all subtasks to complete or execution to be cancelled. The result from this
675
712
* method is returned by the {@code join} method. If this method throws, then
676
- * {@code join} throws {@link ExecutionException } with the exception thrown by
713
+ * {@code join} throws {@link FailedException } with the exception thrown by
677
714
* this method as the cause.
678
715
*
679
- * <p> In normal usage, this method will be called at most once to produce the
680
- * result (or exception). If the {@code join} method is called more than once
681
- * then this method may be called more than once to produce the result. An
682
- * implementation should return an equal result (or throw the same exception) on
683
- * second or subsequent calls to produce the outcome.
716
+ * <p> In normal usage, this method will be called at most once by the {@code join}
717
+ * method to produce the result (or exception). The behavior of this method when
718
+ * invoked directly, and invoked more than once, is not specified. Where possible,
719
+ * an implementation should return an equal result (or throw the same exception)
720
+ * on second or subsequent calls to produce the outcome.
684
721
*
685
722
* @apiNote This method is invoked by the {@code join} method. It should not be
686
723
* invoked directly.
@@ -823,8 +860,8 @@ static <T> Joiner<T, Stream<Subtask<T>>> all(Predicate<Subtask<? extends T>> isD
823
860
* ThreadFactory} to create threads, an optional name for the purposes of monitoring
824
861
* and management, and an optional timeout.
825
862
*
826
- * <p> Creating a {@code StructuredTaskScope} with its 1-arg {@link #open(Joiner) open }
827
- * method uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
863
+ * <p> Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)}
864
+ * uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
828
865
* configuration</a>. The default configuration consists of a thread factory that
829
866
* creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
830
867
* virtual threads</a>, no name for monitoring and management purposes, and no timeout.
@@ -881,6 +918,45 @@ public sealed interface Config permits ConfigImpl {
881
918
Config withTimeout (Duration timeout );
882
919
}
883
920
921
+ /**
922
+ * Exception thrown by {@link #join()} when the outcome is an exception rather than a
923
+ * result.
924
+ *
925
+ * @since 24
926
+ */
927
+ @ PreviewFeature (feature = PreviewFeature .Feature .STRUCTURED_CONCURRENCY )
928
+ public static class FailedException extends RuntimeException {
929
+ @ java .io .Serial
930
+ static final long serialVersionUID = -1533055100078459923L ;
931
+
932
+ /**
933
+ * Constructs a {@code FailedException} with the specified cause.
934
+ *
935
+ * @param cause the cause, can be {@code null}
936
+ */
937
+ public FailedException (Throwable cause ) {
938
+ super (cause );
939
+ }
940
+ }
941
+
942
+ /**
943
+ * Exception thrown by {@link #join()} if the task scope is created with the timeout
944
+ * expires before or while waiting in {@code join}.
945
+ *
946
+ * @since 24
947
+ * @see Config#withTimeout(Duration)
948
+ */
949
+ @ PreviewFeature (feature = PreviewFeature .Feature .STRUCTURED_CONCURRENCY )
950
+ public static class TimeoutException extends RuntimeException {
951
+ @ java .io .Serial
952
+ static final long serialVersionUID = 705788143955048766L ;
953
+
954
+ /**
955
+ * Constructs a {@code TimeoutException} with no detail message.
956
+ */
957
+ public TimeoutException () { }
958
+ }
959
+
884
960
/**
885
961
* Opens a new structured task scope to use the given {@code Joiner} object and with
886
962
* configuration that is the result of applying the given function to the
@@ -898,7 +974,7 @@ public sealed interface Config permits ConfigImpl {
898
974
* <p> If a {@linkplain Config#withTimeout(Duration) timeout} is set then it starts
899
975
* when the task scope is opened. If the timeout expires before the task scope has
900
976
* {@linkplain #join() joined} then execution is cancelled and the {@code join} method
901
- * throws {@link ExecutionException } with {@link TimeoutException} as the cause.
977
+ * throws {@link FailedException } with {@link TimeoutException} as the cause.
902
978
*
903
979
* <p> The new task scope is owned by the current thread. Only code executing in this
904
980
* thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
@@ -965,7 +1041,7 @@ public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends
965
1041
* or any subtask to fail.
966
1042
*
967
1043
* <p> The {@code join} method returns {@code null} if all subtasks complete successfully.
968
- * It throws {@link ExecutionException } if any subtask fails, with the exception from
1044
+ * It throws {@link FailedException } if any subtask fails, with the exception from
969
1045
* the first subtask to fail as the cause.
970
1046
*
971
1047
* <p> The task scope is created with the <a href="#DefaultConfiguration">default
@@ -1023,9 +1099,9 @@ public static <T> StructuredTaskScope<T, Void> open() {
1023
1099
* @param task the value-returning task for the thread to execute
1024
1100
* @param <U> the result type
1025
1101
* @return the subtask
1026
- * @throws IllegalStateException if this task scope is closed or the owner has already
1027
- * joined
1028
1102
* @throws WrongThreadException if the current thread is not the task scope owner
1103
+ * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
1104
+ * or the task scope is closed
1029
1105
* @throws StructureViolationException if the current scoped value bindings are not
1030
1106
* the same as when the task scope was created
1031
1107
* @throws RejectedExecutionException if the thread factory rejected creating a
@@ -1034,10 +1110,7 @@ public static <T> StructuredTaskScope<T, Void> open() {
1034
1110
public <U extends T > Subtask <U > fork (Callable <? extends U > task ) {
1035
1111
Objects .requireNonNull (task );
1036
1112
ensureOwner ();
1037
- ensureOpen ();
1038
- if (joined ) {
1039
- throw new IllegalStateException ("Already joined" );
1040
- }
1113
+ ensureNotJoined ();
1041
1114
1042
1115
var subtask = new SubtaskImpl <U >(this , task );
1043
1116
@@ -1062,7 +1135,7 @@ public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
1062
1135
}
1063
1136
}
1064
1137
1065
- needToJoin = true ;
1138
+ state = ST_FORKED ;
1066
1139
return subtask ;
1067
1140
}
1068
1141
@@ -1076,9 +1149,9 @@ public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
1076
1149
*
1077
1150
* @param task the task for the thread to execute
1078
1151
* @return the subtask
1079
- * @throws IllegalStateException if this task scope is closed or the owner has already
1080
- * joined
1081
1152
* @throws WrongThreadException if the current thread is not the task scope owner
1153
+ * @throws IllegalStateException if the owner has already {@linkplain #join() joined}
1154
+ * or the task scope is closed
1082
1155
* @throws StructureViolationException if the current scoped value bindings are not
1083
1156
* the same as when the task scope was created
1084
1157
* @throws RejectedExecutionException if the thread factory rejected creating a
@@ -1094,10 +1167,10 @@ public Subtask<? extends T> fork(Runnable task) {
1094
1167
* Waits for all subtasks started in this task scope to complete or execution to be
1095
1168
* cancelled. If a {@linkplain Config#withTimeout(Duration) timeout} has been set
1096
1169
* then execution will be cancelled if the timeout expires before or while waiting.
1097
- * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result}
1098
- * method is invoked to get the result or throw an exception. If the {@code result}
1099
- * method throws then this method throws {@code ExecutionException } with the
1100
- * exception thrown by the {@code result()} method as the cause.
1170
+ * Once finished waiting, the {@code Joiner}'s {@link Joiner#result() result} method
1171
+ * is invoked to get the result or throw an exception. If the {@code result} method
1172
+ * throws then this method throws {@code FailedException } with the exception thrown
1173
+ * by the {@code result()} method as the cause.
1101
1174
*
1102
1175
* <p> This method waits for all subtasks by waiting for all threads {@linkplain
1103
1176
* #fork(Callable) started} in this task scope to finish execution. It stops waiting
@@ -1106,42 +1179,42 @@ public Subtask<? extends T> fork(Runnable task) {
1106
1179
* to cancel execution, the timeout (if set) expires, or the current thread is
1107
1180
* {@linkplain Thread#interrupt() interrupted}.
1108
1181
*
1109
- * <p> This method may only be invoked by the task scope owner.
1182
+ * <p> This method may only be invoked by the task scope owner, and only once .
1110
1183
*
1111
1184
* @return the {@link Joiner#result() result}
1112
- * @throws IllegalStateException if this task scope is closed
1113
1185
* @throws WrongThreadException if the current thread is not the task scope owner
1114
- * @throws ExecutionException if the joiner's {@code result} method throws, or with
1115
- * cause {@link TimeoutException} if a timeout is set and the timeout expires
1186
+ * @throws IllegalStateException if already joined or this task scope is closed
1187
+ * @throws FailedException if the <i>outcome</i> is an exception, thrown with the
1188
+ * exception from {@link Joiner#result()} as the cause
1189
+ * @throws TimeoutException if a timeout is set and the timeout expires before or
1190
+ * while waiting
1116
1191
* @throws InterruptedException if interrupted while waiting
1117
1192
* @since 24
1118
1193
*/
1119
- public R join () throws ExecutionException , InterruptedException {
1194
+ public R join () throws InterruptedException {
1120
1195
ensureOwner ();
1121
- ensureOpen ();
1196
+ ensureNotJoined ();
1122
1197
1123
- if (!joined ) {
1124
- // owner has attempted to join
1125
- needToJoin = false ;
1198
+ // join started
1199
+ state = ST_JOIN_STARTED ;
1126
1200
1127
- // wait for all subtasks, execution to be cancelled, or interrupt
1128
- flock .awaitAll ();
1129
-
1130
- // subtasks are done or execution is cancelled
1131
- joined = true ;
1132
- }
1201
+ // wait for all subtasks, execution to be cancelled, or interrupt
1202
+ flock .awaitAll ();
1133
1203
1134
1204
// throw if timeout expired
1135
1205
if (timeoutExpired ) {
1136
- throw new ExecutionException ( new TimeoutException () );
1206
+ throw new TimeoutException ();
1137
1207
}
1138
1208
cancelTimeout ();
1139
1209
1210
+ // all subtasks completed or cancelled
1211
+ state = ST_JOIN_COMPLETED ;
1212
+
1140
1213
// invoke joiner to get result
1141
1214
try {
1142
1215
return joiner .result ();
1143
1216
} catch (Throwable e ) {
1144
- throw new ExecutionException (e );
1217
+ throw new FailedException (e );
1145
1218
}
1146
1219
}
1147
1220
@@ -1244,12 +1317,13 @@ public boolean isCancelled() {
1244
1317
@ Override
1245
1318
public void close () {
1246
1319
ensureOwner ();
1247
- if (closed ) {
1320
+ int s = state ;
1321
+ if (s == ST_CLOSED ) {
1248
1322
return ;
1249
1323
}
1250
1324
1251
- // cancel execution if not already joined
1252
- if (! joined ) {
1325
+ // cancel execution if join did not complete
1326
+ if (s < ST_JOIN_COMPLETED ) {
1253
1327
cancelExecution ();
1254
1328
cancelTimeout ();
1255
1329
}
@@ -1258,13 +1332,12 @@ public void close() {
1258
1332
try {
1259
1333
flock .close ();
1260
1334
} finally {
1261
- closed = true ;
1335
+ state = ST_CLOSED ;
1262
1336
}
1263
1337
1264
1338
// throw ISE if the owner didn't join after forking
1265
- if (needToJoin ) {
1266
- needToJoin = false ;
1267
- throw new IllegalStateException ("Owner did not join" );
1339
+ if (s == ST_FORKED ) {
1340
+ throw new IllegalStateException ("Owner did not join after forking" );
1268
1341
}
1269
1342
}
1270
1343
0 commit comments