Skip to content

Commit

Permalink
8296797: java/nio/channels/vthread/BlockingChannelOps.testSocketChann…
Browse files Browse the repository at this point in the history
…elWriteAsyncClose failed with ClosedChannelException

Reviewed-by: chegar, jpai
  • Loading branch information
Alan Bateman committed Nov 13, 2022
1 parent a2cdcdd commit 8eb90e2
Showing 1 changed file with 104 additions and 75 deletions.
179 changes: 104 additions & 75 deletions test/jdk/java/nio/channels/vthread/BlockingChannelOps.java
Expand Up @@ -66,7 +66,6 @@
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;

import jdk.test.lib.Platform;
import jdk.test.lib.thread.VThreadRunner;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
Expand Down Expand Up @@ -107,7 +106,7 @@ public void testSocketChannelRead() throws Exception {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();

// delayed write to sc1
// write to sc1 when current thread blocks in sc2.read
ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
runAfterParkedAsync(() -> sc1.write(bb1));

Expand All @@ -130,7 +129,7 @@ public void testSocketChannelWrite() throws Exception {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();

// delayed read from sc2 to EOF
// read from sc2 to EOF when current thread blocks in sc1.write
Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));

// write to sc1 should block
Expand Down Expand Up @@ -174,7 +173,7 @@ public void testSocketChannelReadInterrupt() throws Exception {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();

// delayed interrupt of current thread
// interrupt current thread when it blocks in read
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);

Expand All @@ -194,21 +193,26 @@ public void testSocketChannelReadInterrupt() throws Exception {
@Test
public void testSocketChannelWriteAsyncClose() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
runAfterParkedAsync(sc::close);
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = sc.write(bb);
assertTrue(n > 0);
bb.clear();
boolean retry = true;
while (retry) {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();

// close sc when current thread blocks in write
runAfterParkedAsync(sc::close);
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = sc.write(bb);
assertTrue(n > 0);
bb.clear();
}
} catch (AsynchronousCloseException expected) {
// closed when blocked in write
retry = false;
} catch (ClosedChannelException e) {
// closed when not blocked in write, need to retry test
}
} catch (AsynchronousCloseException e) {
// expected
} catch (ClosedChannelException e) {
// on macOS the write loop may block more than once
if (!Platform.isOSX()) throw e;
}
}
});
Expand All @@ -220,25 +224,29 @@ public void testSocketChannelWriteAsyncClose() throws Exception {
@Test
public void testSocketChannelWriteInterrupt() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();

// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);

try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = sc.write(bb);
assertTrue(n > 0);
bb.clear();
boolean retry = true;
while (retry) {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();

// interrupt current thread when it blocks in write
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);

try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = sc.write(bb);
assertTrue(n > 0);
bb.clear();
}
} catch (ClosedByInterruptException e) {
// closed when blocked in write
assertTrue(Thread.interrupted());
retry = false;
} catch (ClosedChannelException e) {
// closed when not blocked in write, need to retry test
}
} catch (ClosedByInterruptException e) {
assertTrue(Thread.interrupted());
} catch (ClosedChannelException e) {
// on macOS the write loop may block more than once
if (!Platform.isOSX()) throw e;
}
}
});
Expand Down Expand Up @@ -266,7 +274,7 @@ private void testSocketAdaptorRead(int timeout) throws Exception {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();

// delayed write to sc1
// write to sc1 when currnet thread blocks reading from sc2
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
runAfterParkedAsync(() -> sc1.write(bb));

Expand Down Expand Up @@ -308,7 +316,7 @@ public void testServerSocketChannelAccept2() throws Exception {
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
var sc1 = SocketChannel.open();

// delayed connect
// connect when current thread when it blocks in accept
runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));

// accept should block
Expand Down Expand Up @@ -348,7 +356,7 @@ public void testServerSocketChannelAcceptInterrupt() throws Exception {
InetAddress lh = InetAddress.getLoopbackAddress();
ssc.bind(new InetSocketAddress(lh, 0));

// delayed interrupt of current thread
// interrupt current thread when it blocks in accept
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);

Expand Down Expand Up @@ -385,7 +393,7 @@ private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
var sc = SocketChannel.open();

// delayed connect
// interrupt current thread when it blocks in accept
runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));

// accept should block
Expand Down Expand Up @@ -435,7 +443,7 @@ public void testDatagramChannelSendReceive2() throws Exception {
InetAddress lh = InetAddress.getLoopbackAddress();
dc2.bind(new InetSocketAddress(lh, 0));

// delayed send from sc1
// send from dc1 when current thread blocked in dc2.receive
ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));

Expand Down Expand Up @@ -475,7 +483,7 @@ public void testDatagramChannelReceiveInterrupt() throws Exception {
InetAddress lh = InetAddress.getLoopbackAddress();
dc.bind(new InetSocketAddress(lh, 0));

// delayed interrupt of current thread
// interrupt current thread when it blocks in receive
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);

Expand Down Expand Up @@ -513,7 +521,7 @@ private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
InetAddress lh = InetAddress.getLoopbackAddress();
dc2.bind(new InetSocketAddress(lh, 0));

// delayed send from dc2
// send from dc1 when current thread blocks in dc2 receive
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));

Expand Down Expand Up @@ -556,7 +564,7 @@ private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exce
if (timeout > 0)
dc.socket().setSoTimeout(timeout);

// delayed close of channel/socket
// close channel/socket when current thread blocks in receive
runAfterParkedAsync(dc::close);

assertThrows(SocketException.class, () -> dc.socket().receive(p));
Expand Down Expand Up @@ -592,7 +600,7 @@ private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Excep
if (timeout > 0)
dc.socket().setSoTimeout(timeout);

// delayed interrupt of current thread
// interrupt current thread when it blocks in receive
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);

Expand Down Expand Up @@ -640,7 +648,7 @@ public void testPipeReadWrite2() throws Exception {
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {

// delayed write from sink
// write from sink when current thread blocks reading from source
ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
runAfterParkedAsync(() -> sink.write(bb1));

Expand All @@ -663,7 +671,7 @@ public void testPipeReadWrite3() throws Exception {
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {

// delayed read from source to EOF
// read from source to EOF when current thread blocking in write
Thread reader = runAfterParkedAsync(() -> readToEOF(source));

// write to sink should block
Expand All @@ -688,7 +696,8 @@ public void testPipeReadWrite3() throws Exception {
public void testPipeReadAsyncClose() throws Exception {
VThreadRunner.run(() -> {
Pipe p = Pipe.open();
try (Pipe.SourceChannel source = p.source()) {
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
runAfterParkedAsync(source::close);
try {
int n = source.read(ByteBuffer.allocate(100));
Expand All @@ -705,9 +714,10 @@ public void testPipeReadAsyncClose() throws Exception {
public void testPipeReadInterrupt() throws Exception {
VThreadRunner.run(() -> {
Pipe p = Pipe.open();
try (Pipe.SourceChannel source = p.source()) {
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {

// delayed interrupt of current thread
// interrupt current thread when it blocks reading from source
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);

Expand All @@ -727,17 +737,28 @@ public void testPipeReadInterrupt() throws Exception {
@Test
public void testPipeWriteAsyncClose() throws Exception {
VThreadRunner.run(() -> {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink()) {
runAfterParkedAsync(sink::close);
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = sink.write(bb);
assertTrue(n > 0);
bb.clear();
boolean retry = true;
while (retry) {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {

// close sink when current thread blocks in write
runAfterParkedAsync(sink::close);
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = sink.write(bb);
assertTrue(n > 0);
bb.clear();
}
} catch (AsynchronousCloseException e) {
// closed when blocked in write
retry = false;
} catch (ClosedChannelException e) {
// closed when not blocked in write, need to retry test
}
} catch (AsynchronousCloseException expected) { }
}
}
});
}
Expand All @@ -748,22 +769,30 @@ public void testPipeWriteAsyncClose() throws Exception {
@Test
public void testPipeWriteInterrupt() throws Exception {
VThreadRunner.run(() -> {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink()) {

// delayed interrupt of current thread
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);

try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = sink.write(bb);
assertTrue(n > 0);
bb.clear();
boolean retry = true;
while (retry) {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {

// interrupt current thread when it blocks in write
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);

try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = sink.write(bb);
assertTrue(n > 0);
bb.clear();
}
} catch (ClosedByInterruptException expected) {
// closed when blocked in write
assertTrue(Thread.interrupted());
retry = false;
} catch (ClosedChannelException e) {
// closed when not blocked in write, need to retry test
}
} catch (ClosedByInterruptException expected) {
assertTrue(Thread.interrupted());
}
}
});
Expand Down

1 comment on commit 8eb90e2

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.