diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java index 0f2db7738fc..fa30ed49a74 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java @@ -42,6 +42,8 @@ import jdk.internal.net.http.common.MinimalFuture; import jdk.internal.net.http.common.Utils; import jdk.internal.net.http.frame.SettingsFrame; + +import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE; import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE; import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH; import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE; @@ -247,9 +249,13 @@ int getConnectionWindowSize(SettingsFrame clientSettings) { int defaultValue = Math.min(Integer.MAX_VALUE, Math.max(streamWindow, K*K*32)); + // The min value is the max between the streamWindow and + // the initial connection window size + int minValue = Math.max(INITIAL_CONNECTION_WINDOW_SIZE, streamWindow); + return getParameter( "jdk.httpclient.connectionWindowSize", - streamWindow, Integer.MAX_VALUE, defaultValue); + minValue, Integer.MAX_VALUE, defaultValue); } /** diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java index 2953b297561..bcc72f3682a 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java @@ -969,6 +969,34 @@ private String checkMaxOrphanedHeadersExceeded(HeaderFrame hf) { return null; } + // This method is called when a DataFrame that was added + // to a Stream::inputQ is later dropped from the queue + // without being consumed. + // + // Before adding a frame to the queue, the Stream calls + // connection.windowUpdater.canBufferUnprocessedBytes(), which + // increases the count of unprocessed bytes in the connection. + // After consuming the frame, it calls connection.windowUpdater::processed, + // which decrements the count of unprocessed bytes, and possibly + // sends a window update to the peer. + // + // This method is called when connection.windowUpdater::processed + // will not be called, which can happen when consuming the frame + // fails, or when an empty DataFrame terminates the stream, + // or when the stream is cancelled while data is still + // sitting in its inputQ. In the later case, it is called for + // each frame that is dropped from the queue. + final void releaseUnconsumed(DataFrame df) { + windowUpdater.released(df.payloadLength()); + dropDataFrame(df); + } + + // This method can be called directly when a DataFrame is dropped + // before/without having been added to any Stream::inputQ. + // In that case, the number of unprocessed bytes hasn't been incremented + // by the stream, and does not need to be decremented. + // Otherwise, if the frame is dropped after having been added to the + // inputQ, releaseUnconsumed above should be called. final void dropDataFrame(DataFrame df) { if (closed) return; if (debug.on()) { @@ -1087,6 +1115,10 @@ private void handleConnectionFrame(Http2Frame frame) } } + boolean isOpen() { + return !closed && connection.channel().isOpen(); + } + void resetStream(int streamid, int code) { try { if (connection.channel().isOpen()) { @@ -1277,11 +1309,12 @@ private void sendConnectionPreface() throws IOException { // Note that the default initial window size, not to be confused // with the initial window size, is defined by RFC 7540 as // 64K -1. - final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE; - if (len != 0) { + final int len = windowUpdater.initialWindowSize - INITIAL_CONNECTION_WINDOW_SIZE; + assert len >= 0; + if (len > 0) { if (Log.channel()) { Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})", - len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE); + len, windowUpdater.initialWindowSize, INITIAL_CONNECTION_WINDOW_SIZE); } windowUpdater.sendWindowUpdate(len); } @@ -1658,6 +1691,19 @@ public ConnectionWindowUpdateSender(Http2Connection connection, int getStreamId() { return 0; } + + @Override + protected boolean windowSizeExceeded(long received) { + if (connection.isOpen()) { + try { + connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR, + "connection window exceeded"); + } catch (IOException io) { + connection.shutdown(io); + } + } + return true; + } } /** diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index f97632c12a1..9ed28508bae 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -163,7 +163,7 @@ class Stream<T> extends ExchangeImpl<T> { * sending any data. Will be null for PushStreams, as they cannot send data. */ private final WindowController windowController; - private final WindowUpdateSender windowUpdater; + private final WindowUpdateSender streamWindowUpdater; @Override HttpConnection connection() { @@ -203,7 +203,8 @@ private void schedule() { int size = Utils.remaining(dsts, Integer.MAX_VALUE); if (size == 0 && finished) { inputQ.remove(); - connection.ensureWindowUpdated(df); // must update connection window + // consumed will not be called + connection.releaseUnconsumed(df); // must update connection window Log.logTrace("responseSubscriber.onComplete"); if (debug.on()) debug.log("incoming: onComplete"); sched.stop(); @@ -219,7 +220,11 @@ private void schedule() { try { subscriber.onNext(dsts); } catch (Throwable t) { - connection.dropDataFrame(df); // must update connection window + // Data frames that have been added to the inputQ + // must be released using releaseUnconsumed() to + // account for the amount of unprocessed bytes + // tracked by the connection.windowUpdater. + connection.releaseUnconsumed(df); throw t; } if (consumed(df)) { @@ -272,7 +277,11 @@ private void drainInputQueue() { Http2Frame frame; while ((frame = inputQ.poll()) != null) { if (frame instanceof DataFrame) { - connection.dropDataFrame((DataFrame)frame); + // Data frames that have been added to the inputQ + // must be released using releaseUnconsumed() to + // account for the amount of unprocessed bytes + // tracked by the connection.windowUpdater. + connection.releaseUnconsumed((DataFrame)frame); } } } @@ -297,12 +306,13 @@ private boolean consumed(DataFrame df) { boolean endStream = df.getFlag(DataFrame.END_STREAM); if (len == 0) return endStream; - connection.windowUpdater.update(len); - + connection.windowUpdater.processed(len); if (!endStream) { + streamWindowUpdater.processed(len); + } else { // Don't send window update on a stream which is // closed or half closed. - windowUpdater.update(len); + streamWindowUpdater.released(len); } // true: end of stream; false: more data coming @@ -343,8 +353,21 @@ public String toString() { } private void receiveDataFrame(DataFrame df) { - inputQ.add(df); - sched.runOrSchedule(); + try { + int len = df.payloadLength(); + if (len > 0) { + // we return from here if the connection is being closed. + if (!connection.windowUpdater.canBufferUnprocessedBytes(len)) return; + // we return from here if the stream is being closed. + if (closed || !streamWindowUpdater.canBufferUnprocessedBytes(len)) { + connection.releaseUnconsumed(df); + return; + } + } + inputQ.add(df); + } finally { + sched.runOrSchedule(); + } } /** Handles a RESET frame. RESET is always handled inline in the queue. */ @@ -429,7 +452,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { this.responseHeadersBuilder = new HttpHeadersBuilder(); this.rspHeadersConsumer = new HeadersConsumer(); this.requestPseudoHeaders = createPseudoHeaders(request); - this.windowUpdater = new StreamWindowUpdateSender(connection); + this.streamWindowUpdater = new StreamWindowUpdateSender(connection); } /** @@ -1281,12 +1304,18 @@ void cancel(IOException cause) { @Override void onProtocolError(final IOException cause) { + onProtocolError(cause, ResetFrame.PROTOCOL_ERROR); + } + + void onProtocolError(final IOException cause, int code) { if (debug.on()) { - debug.log("cancelling exchange on stream %d due to protocol error: %s", streamid, cause.getMessage()); + debug.log("cancelling exchange on stream %d due to protocol error [%s]: %s", + streamid, ErrorFrame.stringForCode(code), + cause.getMessage()); } Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause); // send a RESET frame and close the stream - cancelImpl(cause, ResetFrame.PROTOCOL_ERROR); + cancelImpl(cause, code); } void connectionClosing(Throwable cause) { @@ -1554,6 +1583,14 @@ String dbgString() { return dbgString = dbg; } } + + @Override + protected boolean windowSizeExceeded(long received) { + onProtocolError(new ProtocolException("stream " + streamid + + " flow control window exceeded"), + ResetFrame.FLOW_CONTROL_ERROR); + return true; + } } /** diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java index e85b92786f6..b92cb5358d7 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -31,15 +31,30 @@ import jdk.internal.net.http.frame.WindowUpdateFrame; import jdk.internal.net.http.common.Utils; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +/** + * A class that tracks the amount of flow controlled + * data received on an HTTP/2 connection + */ abstract class WindowUpdateSender { final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); + // The threshold at which window updates are sent in bytes final int limit; + // The flow control window in bytes + final int windowSize; final Http2Connection connection; - final AtomicInteger received = new AtomicInteger(0); + // The amount of flow controlled data received and processed, in bytes, + // since the start of the window. + // The window is exhausted when received + unprocessed >= windowSize + final AtomicLong received = new AtomicLong(); + // The amount of flow controlled data received and unprocessed, in bytes, + // since the start of the window. + // The window is exhausted when received + unprocessed >= windowSize + final AtomicLong unprocessed = new AtomicLong(); WindowUpdateSender(Http2Connection connection) { this(connection, connection.clientSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE)); @@ -51,6 +66,7 @@ abstract class WindowUpdateSender { WindowUpdateSender(Http2Connection connection, int maxFrameSize, int initWindowSize) { this.connection = connection; + this.windowSize = initWindowSize; int v0 = Math.max(0, initWindowSize - maxFrameSize); int v1 = (initWindowSize + (maxFrameSize - 1)) / maxFrameSize; v1 = v1 * maxFrameSize / 2; @@ -64,15 +80,118 @@ abstract class WindowUpdateSender { maxFrameSize, initWindowSize, limit); } + // O for the connection window, > 0 for a stream window abstract int getStreamId(); + + /** + * {@return {@code true} if buffering the given amount of + * flow controlled data would not exceed the flow control + * window} + * <p> + * This method is called before buffering and processing + * a DataFrame. The count of unprocessed bytes is incremented + * by the given amount, and checked against the number of + * available bytes in the flow control window. + * <p> + * This method returns {@code true} if the bytes can be buffered + * without exceeding the flow control window, {@code false} + * if the flow control window is exceeded and corrective + * action (close/reset) has been taken. + * <p> + * When this method returns true, either {@link #processed(int)} + * or {@link #released(int)} must eventually be called to release + * the bytes from the flow control window. + * + * @implSpec + * an HTTP/2 endpoint may disable its own flow control + * (see <a href="https://www.rfc-editor.org/rfc/rfc9113.html#section-5.2.1"> + * RFC 9113, section 5.2.1</a>), in which case this + * method may return true even if the flow control window would + * be exceeded: that is, the flow control window is exceeded but + * the endpoint decided to take no corrective action. + * + * @param len a number of unprocessed bytes, which + * the caller wants to buffer. + */ + boolean canBufferUnprocessedBytes(int len) { + return !checkWindowSizeExceeded(unprocessed.addAndGet(len)); + } + + // adds the provided amount to the amount of already + // received and processed bytes and checks whether the + // flow control window is exceeded. If so, take + // corrective actions and return true. + private boolean checkWindowSizeExceeded(long len) { + // because windowSize is bound by Integer.MAX_VALUE + // we will never reach the point where received.get() + len + // could overflow + long rcv = received.get() + len; + return rcv > windowSize && windowSizeExceeded(rcv); + } + + /** + * Called after unprocessed buffered bytes have been + * processed, to release part of the flow control window + * + * @apiNote this method is called only when releasing bytes + * that where buffered after calling + * {@link #canBufferUnprocessedBytes(int)}. + * + * @param delta the amount of processed bytes to release + */ + void processed(int delta) { + long rest = unprocessed.addAndGet(-delta); + assert rest >= 0; + update(delta); + } + + /** + * Called when it is desired to release unprocessed bytes + * without processing them, or without triggering the + * sending of a window update. This method can be called + * instead of calling {@link #processed(int)}. + * When this method is called instead of calling {@link #processed(int)}, + * it should generally be followed by a call to {@link #update(int)}, + * unless the stream or connection is being closed. + * + * @apiNote this method should only be called to release bytes that + * have been buffered after calling {@link + * #canBufferUnprocessedBytes(int)}. + * + * @param delta the amount of bytes to release from the window + * + * @return the amount of remaining unprocessed bytes + */ + long released(int delta) { + long rest = unprocessed.addAndGet(-delta); + assert rest >= 0; + return rest; + } + + /** + * This method is called to update the flow control window, + * and possibly send a window update + * + * @apiNote this method can be called directly if a frame is + * dropped before calling {@link #canBufferUnprocessedBytes(int)}. + * Otherwise, either {@link #processed(int)} or {@link #released(int)} + * should be called, depending on whether sending a window update + * is desired or not. It is typically not desired to send an update + * if the stream or connection is being closed. + * + * @param delta the amount of bytes released from the window. + */ void update(int delta) { - int rcv = received.addAndGet(delta); + long rcv = received.addAndGet(delta); if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit); + if (rcv > windowSize && windowSizeExceeded(rcv)) { + return; + } if (rcv > limit) { synchronized (this) { - int tosend = received.get(); - if( tosend > limit) { + int tosend = (int)Math.min(received.get(), Integer.MAX_VALUE); + if (tosend > limit) { received.getAndAdd(-tosend); sendWindowUpdate(tosend); } @@ -82,6 +201,7 @@ void update(int delta) { void sendWindowUpdate(int delta) { if (debug.on()) debug.log("sending window update: %d", delta); + assert delta > 0 : "illegal window update delta: " + delta; connection.sendUnorderedFrame(new WindowUpdateFrame(getStreamId(), delta)); } @@ -99,4 +219,16 @@ String dbgString() { } } + /** + * Called when the flow control window size is exceeded + * This method may return false if flow control is disabled + * in this endpoint. + * + * @param received the amount of data received, which is greater + * than {@code windowSize} + * @return {@code true} if the error was reported to the peer + * and no further window update should be sent. + */ + protected abstract boolean windowSizeExceeded(long received); + } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java b/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java index 72c7750eb43..7ebfa090830 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -463,6 +463,16 @@ private Http2Frame parseSettingsFrame(int frameLength, int streamid, int flags) int val = getInt(); if (id > 0 && id <= SettingsFrame.MAX_PARAM) { // a known parameter. Ignore otherwise + if (id == SettingsFrame.INITIAL_WINDOW_SIZE && val < 0) { + return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR, + "SettingsFrame with INITIAL_WINDOW_SIZE > 2^31 -1: " + + (val & 0xffffffffL)); + } + if (id == SettingsFrame.MAX_FRAME_SIZE && (val < 16384 || val > 16777215)) { + return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, + "SettingsFrame with MAX_FRAME_SIZE out of range: " + + (val & 0xffffffffL)); + } sf.setParameter(id, val); // TODO parameters validation } } @@ -530,7 +540,12 @@ private Http2Frame parseWindowUpdateFrame(int frameLength, int streamid, int fla return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR, "WindowUpdateFrame length is "+ frameLength+", expected 4"); } - return new WindowUpdateFrame(streamid, getInt() & 0x7fffffff); + int update = getInt(); + if (update < 0) { + return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR, + "WindowUpdateFrame with value > 2^31 -1 " + (update & 0xffffffffL)); + } + return new WindowUpdateFrame(streamid, update & 0x7fffffff); } private Http2Frame parseContinuationFrame(int frameLength, int streamid, int flags) { diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java index eef9f615bb3..ac306bd4551 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -172,6 +172,11 @@ public synchronized void update(SettingsFrame updated) { // The initial value is 2^14 (16,384) octets. public static final int DEFAULT_MAX_FRAME_SIZE = 16 * K; + // Initial connection window size. This cannot be updated using the + // SETTINGS frame. + public static final int INITIAL_CONNECTION_WINDOW_SIZE = DEFAULT_INITIAL_WINDOW_SIZE; + + public static SettingsFrame defaultRFCSettings() { SettingsFrame f = new SettingsFrame(); f.setParameter(ENABLE_PUSH, DEFAULT_ENABLE_PUSH); diff --git a/test/jdk/java/net/httpclient/ProxySelectorTest.java b/test/jdk/java/net/httpclient/ProxySelectorTest.java index c4bbad6ebf8..de1855277d3 100644 --- a/test/jdk/java/net/httpclient/ProxySelectorTest.java +++ b/test/jdk/java/net/httpclient/ProxySelectorTest.java @@ -385,7 +385,7 @@ public void setup() throws Exception { public void teardown() throws Exception { client = null; Thread.sleep(100); - AssertionError fail = TRACKER.check(500); + AssertionError fail = TRACKER.check(1500); try { proxy.stop(); authproxy.stop(); diff --git a/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java b/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java new file mode 100644 index 00000000000..c60fd0c944f --- /dev/null +++ b/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java @@ -0,0 +1,375 @@ +/* + * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @bug 8342075 + * @summary checks connection flow control + * @library /test/lib server/ ../ + * @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters + * Http2TestServer + * @modules java.net.http/jdk.internal.net.http.common + * java.net.http/jdk.internal.net.http.frame + * java.net.http/jdk.internal.net.http.hpack + * java.logging + * java.base/sun.net.www.http + * java.base/sun.net.www + * java.base/sun.net + * @run testng/othervm -Djdk.internal.httpclient.debug=true + * -Djdk.httpclient.connectionWindowSize=65535 + * -Djdk.httpclient.windowsize=16384 + * ConnectionFlowControlTest + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ProtocolException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandler; +import java.net.http.HttpResponse.BodyHandlers; +import java.net.http.HttpResponse.BodySubscriber; +import java.net.http.HttpResponse.ResponseInfo; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; + +import jdk.internal.net.http.common.HttpHeadersBuilder; +import jdk.internal.net.http.frame.ContinuationFrame; +import jdk.internal.net.http.frame.HeaderFrame; +import jdk.internal.net.http.frame.HeadersFrame; +import jdk.internal.net.http.frame.Http2Frame; +import jdk.internal.net.http.frame.SettingsFrame; +import jdk.test.lib.Utils; +import jdk.test.lib.net.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static java.util.List.of; +import static java.util.Map.entry; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class ConnectionFlowControlTest { + + SSLContext sslContext; + HttpServerAdapters.HttpTestServer http2TestServer; // HTTP/2 ( h2c ) + HttpServerAdapters.HttpTestServer https2TestServer; // HTTP/2 ( h2 ) + String http2URI; + String https2URI; + final AtomicInteger reqid = new AtomicInteger(); + + + @DataProvider(name = "variants") + public Object[][] variants() { + return new Object[][] { + { http2URI }, + { https2URI }, + }; + } + + @Test(dataProvider = "variants") + void test(String uri) throws Exception { + System.out.printf("%ntesting %s%n", uri); + ConcurrentHashMap<String, CompletableFuture<String>> responseSent = new ConcurrentHashMap<>(); + ConcurrentHashMap<String, HttpResponse<InputStream>> responses = new ConcurrentHashMap<>(); + FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s)); + int connectionWindowSize = Math.max(Integer.getInteger( + "jdk.httpclient.connectionWindowSize", 65535), 65535); + int windowSize = Math.max(Integer.getInteger( + "jdk.httpclient.windowsize", 65535), 16384); + int max = connectionWindowSize / windowSize + 2; + System.out.printf("connection window: %s, stream window: %s, will make %s requests%n", + connectionWindowSize, windowSize, max); + + String label = null; + + Throwable t = null; + HttpClient client = HttpClient.newBuilder().executor(Executors.newCachedThreadPool()).sslContext(sslContext).build(); + try { + try { + String[] keys = new String[max]; + for (int i = 0; i < max; i++) { + String query = "reqId=" + reqid.incrementAndGet(); + keys[i] = query; + URI uriWithQuery = URI.create(uri + "?" + query); + CompletableFuture<String> sent = new CompletableFuture<>(); + responseSent.put(query, sent); + HttpRequest request = HttpRequest.newBuilder(uriWithQuery) + .POST(BodyPublishers.ofString("Hello there!")) + .build(); + System.out.println("\nSending request:" + uriWithQuery); + final HttpClient cc = client; + var response = cc.send(request, BodyHandlers.ofInputStream()); + responses.put(query, response); + String ckey = response.headers().firstValue("X-Connection-Key").get(); + if (label == null) label = ckey; + try { + if (i < max - 1) { + // the connection window might be exceeded at i == max - 2, which + // means that the last request could go on a new connection. + assertEquals(ckey, label, "Unexpected key for " + query); + } + } catch (AssertionError ass) { + // since we won't pull all responses, the client + // will not exit unless we ask it to shutdown now. + ExecutorService exec = (ExecutorService)client.executor().get(); + exec.shutdownNow(); + throw ass; + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + // ignore + } + CompletableFuture<?> allsent = CompletableFuture.allOf(responseSent.values().stream() + .toArray(CompletableFuture<?>[]::new)); + allsent.get(); + for (int i = 0; i < max; i++) { + try { + String query = keys[i]; + var response = responses.get(keys[i]); + String ckey = response.headers().firstValue("X-Connection-Key").get(); + if (label == null) label = ckey; + assertEquals(ckey, label, "Unexpected key for " + query); + int wait = uri.startsWith("https://") ? 500 : 250; + try (InputStream is = response.body()) { + Thread.sleep(Utils.adjustTimeout(wait)); + is.readAllBytes(); + } + System.out.printf("%s did not fail: %s%n", query, response.statusCode()); + } catch (AssertionError t1) { + // since we won't pull all responses, the client + // will not exit unless we ask it to shutdown now. + ExecutorService exec = (ExecutorService)client.executor().get(); + exec.shutdownNow(); + throw t1; + } catch (Throwable t0) { + System.out.println("Got EXPECTED: " + t0); + if (t0 instanceof ExecutionException) { + t0 = t0.getCause(); + } + t = t0; + try { + assertDetailMessage(t0, i); + } catch (AssertionError e) { + // since we won't pull all responses, the client + // will not exit unless we ask it to shutdown now. + ExecutorService exec = (ExecutorService)client.executor().get(); + exec.shutdownNow(); + throw e; + } + } + } + } catch (Throwable t0) { + System.out.println("Got EXPECTED: " + t0); + if (t0 instanceof ExecutionException) { + t0 = t0.getCause(); + } + t = t0; + } + if (t == null) { + // we could fail here if we haven't waited long enough + fail("Expected exception, got all responses, should sleep time be raised?"); + } else { + assertDetailMessage(t, max); + } + String query = "reqId=" + reqid.incrementAndGet(); + URI uriWithQuery = URI.create(uri + "?" + query); + CompletableFuture<String> sent = new CompletableFuture<>(); + responseSent.put(query, sent); + HttpRequest request = HttpRequest.newBuilder(uriWithQuery) + .POST(BodyPublishers.ofString("Hello there!")) + .build(); + System.out.println("\nSending last request:" + uriWithQuery); + var response = client.send(request, BodyHandlers.ofString()); + if (label != null) { + String ckey = response.headers().firstValue("X-Connection-Key").get(); + assertNotEquals(ckey, label); + System.out.printf("last request %s sent on different connection as expected:" + + "\n\tlast: %s\n\tprevious: %s%n", query, ckey, label); + } + } finally { + ExecutorService exec = (ExecutorService)client.executor().get(); + exec.shutdownNow(); + } + } + + // Assertions based on implementation specific detail messages. Keep in + // sync with implementation. + static void assertDetailMessage(Throwable throwable, int iterationIndex) { + try { + Throwable cause = throwable; + while (cause != null) { + if (cause instanceof ProtocolException) { + if (cause.getMessage().contains("connection window exceeded")) { + System.out.println("Found expected exception: " + cause); + return; + } + } + cause = cause.getCause(); + } + throw new AssertionError( + "ProtocolException(\"protocol error: connection window exceeded\") not found", + throwable); + } catch (AssertionError e) { + System.out.println("Exception does not match expectation: " + throwable); + throwable.printStackTrace(System.out); + throw e; + } + } + + @BeforeTest + public void setup() throws Exception { + sslContext = new SimpleSSLContext().get(); + if (sslContext == null) + throw new AssertionError("Unexpected null sslContext"); + + var http2TestServer = new Http2TestServer("localhost", false, 0); + http2TestServer.addHandler(new Http2TestHandler(), "/http2/"); + this.http2TestServer = HttpServerAdapters.HttpTestServer.of(http2TestServer); + http2URI = "http://" + this.http2TestServer.serverAuthority() + "/http2/x"; + + var https2TestServer = new Http2TestServer("localhost", true, sslContext); + https2TestServer.addHandler(new Http2TestHandler(), "/https2/"); + this.https2TestServer = HttpServerAdapters.HttpTestServer.of(https2TestServer); + https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x"; + + // Override the default exchange supplier with a custom one to enable + // particular test scenarios + http2TestServer.setExchangeSupplier(FCHttp2TestExchange::new); + https2TestServer.setExchangeSupplier(FCHttp2TestExchange::new); + + this.http2TestServer.start(); + this.https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + http2TestServer.stop(); + https2TestServer.stop(); + } + + static class Http2TestHandler implements Http2Handler { + + @Override + public void handle(Http2TestExchange t) throws IOException { + String query = t.getRequestURI().getRawQuery(); + + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + + byte[] bytes = is.readAllBytes(); + System.out.println("Server " + t.getLocalAddress() + " received:\n" + + t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8)); + t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey()); + + if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8); + int window = Math.max(16384, Integer.getInteger("jdk.httpclient.windowsize", 2*16*1024)); + final int maxChunkSize; + if (t instanceof FCHttp2TestExchange) { + maxChunkSize = Math.min(window, ((FCHttp2TestExchange)t).conn.getMaxFrameSize()); + } else { + maxChunkSize = Math.min(window, SettingsFrame.MAX_FRAME_SIZE); + } + byte[] resp = bytes.length < maxChunkSize + ? bytes + : Arrays.copyOfRange(bytes, 0, maxChunkSize); + int max = (window / resp.length); + // send in chunks + t.sendResponseHeaders(200, 0); + int sent = 0; + for (int i=0; i<=max; i++) { + int len = Math.min(resp.length, window - sent); + if (len <= 0) break; + if (os instanceof BodyOutputStream) { + try { + // we don't wait for the stream window, but we want + // to wait for the connection window + ((BodyOutputStream)os).waitForStreamWindow(len); + } catch (InterruptedException ie) { + // ignore and continue... + } + } + ((BodyOutputStream) os).writeUncontrolled(resp, 0, len); + sent += len; + } + if (sent != window) fail("should have sent " + window + ", sent " + sent); + } + if (t instanceof FCHttp2TestExchange) { + ((FCHttp2TestExchange)t).responseSent(query); + } else { + fail("Exchange is not " + FCHttp2TestExchange.class.getName() + + " but " + t.getClass().getName()); + } + } + } + + // A custom Http2TestExchangeImpl that overrides sendResponseHeaders to + // allow headers to be sent with a number of CONTINUATION frames. + static class FCHttp2TestExchange extends Http2TestExchangeImpl { + static volatile Consumer<String> responseSentCB; + static void setResponseSentCB(Consumer<String> responseSentCB) { + FCHttp2TestExchange.responseSentCB = responseSentCB; + } + + final Http2TestServerConnection conn; + FCHttp2TestExchange(int streamid, String method, HttpHeaders reqheaders, + HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is, + SSLSession sslSession, BodyOutputStream os, + Http2TestServerConnection conn, boolean pushAllowed) { + super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed); + this.conn = conn; + } + public void responseSent(String query) { + System.out.println("Server: response sent for " + query); + responseSentCB.accept(query); + } + + } +} diff --git a/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java new file mode 100644 index 00000000000..dd7dbc2f5f7 --- /dev/null +++ b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java @@ -0,0 +1,357 @@ +/* + * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @bug 8342075 + * @library /test/lib server/ ../ + * @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters + * Http2TestServer + * @modules java.net.http/jdk.internal.net.http.common + * java.net.http/jdk.internal.net.http.frame + * java.net.http/jdk.internal.net.http.hpack + * java.logging + * java.base/sun.net.www.http + * java.base/sun.net.www + * java.base/sun.net + * @run testng/othervm -Djdk.internal.httpclient.debug=true + * -Djdk.httpclient.connectionWindowSize=65535 + * -Djdk.httpclient.windowsize=16384 + * StreamFlowControlTest + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ProtocolException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; + +import jdk.internal.net.http.common.HttpHeadersBuilder; +import jdk.internal.net.http.frame.SettingsFrame; +import jdk.test.lib.Utils; +import jdk.test.lib.net.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +public class StreamFlowControlTest { + + SSLContext sslContext; + HttpServerAdapters.HttpTestServer http2TestServer; // HTTP/2 ( h2c ) + HttpServerAdapters.HttpTestServer https2TestServer; // HTTP/2 ( h2 ) + String http2URI; + String https2URI; + final AtomicInteger reqid = new AtomicInteger(); + + + @DataProvider(name = "variants") + public Object[][] variants() { + return new Object[][] { + { http2URI, false }, + { https2URI, false }, + { http2URI, true }, + { https2URI, true }, + }; + } + + + @Test(dataProvider = "variants") + void test(String uri, + boolean sameClient) + throws Exception + { + System.out.printf("%ntesting test(%s, %s)%n", uri, sameClient); + ConcurrentHashMap<String, CompletableFuture<String>> responseSent = new ConcurrentHashMap<>(); + FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s)); + + HttpClient client = null; + try { + int max = sameClient ? 10 : 3; + String label = null; + for (int i = 0; i < max; i++) { + if (!sameClient || client == null) + client = HttpClient.newBuilder() + .executor(Executors.newCachedThreadPool()) + .sslContext(sslContext).build(); + + String query = "reqId=" + reqid.incrementAndGet(); + URI uriWithQuery = URI.create(uri + "?" + query); + CompletableFuture<String> sent = new CompletableFuture<>(); + responseSent.put(query, sent); + HttpRequest request = HttpRequest.newBuilder(uriWithQuery) + .POST(BodyPublishers.ofString("Hello there!")) + .build(); + System.out.println("\nSending request:" + uriWithQuery); + final HttpClient cc = client; + try { + HttpResponse<InputStream> response = cc.send(request, BodyHandlers.ofInputStream()); + if (sameClient) { + String key = response.headers().firstValue("X-Connection-Key").get(); + if (label == null) label = key; + assertEquals(key, label, "Unexpected key for " + query); + } + sent.join(); + // we have to pull to get the exception, but slow enough + // so that DataFrames are buffered up to the point that + // the window is exceeded... + int wait = uri.startsWith("https://") ? 500 : 350; + try (InputStream is = response.body()) { + Thread.sleep(Utils.adjustTimeout(wait)); + is.readAllBytes(); + } + // we could fail here if we haven't waited long enough + fail("Expected exception, got :" + response + ", should sleep time be raised?"); + } catch (IOException ioe) { + System.out.println("Got EXPECTED: " + ioe); + assertDetailMessage(ioe, i); + } finally { + if (!sameClient && client != null) { + ExecutorService exec = (ExecutorService)client.executor().get(); + exec.shutdown(); + client = null; + } + } + } + } finally { + if (sameClient && client != null) { + ExecutorService exec = (ExecutorService)client.executor().get(); + exec.shutdown(); + } + } + + } + + @Test(dataProvider = "variants") + void testAsync(String uri, + boolean sameClient) + { + System.out.printf("%ntesting testAsync(%s, %s)%n", uri, sameClient); + ConcurrentHashMap<String, CompletableFuture<String>> responseSent = new ConcurrentHashMap<>(); + FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s)); + + HttpClient client = null; + try { + int max = sameClient ? 5 : 3; + String label = null; + for (int i = 0; i < max; i++) { + if (!sameClient || client == null) + client = HttpClient.newBuilder() + .executor(Executors.newCachedThreadPool()) + .sslContext(sslContext).build(); + + String query = "reqId=" + reqid.incrementAndGet(); + URI uriWithQuery = URI.create(uri + "?" + query); + CompletableFuture<String> sent = new CompletableFuture<>(); + responseSent.put(query, sent); + HttpRequest request = HttpRequest.newBuilder(uriWithQuery) + .POST(BodyPublishers.ofString("Hello there!")) + .build(); + System.out.println("\nSending request:" + uriWithQuery); + final HttpClient cc = client; + + Throwable t = null; + try { + HttpResponse<InputStream> response = cc.sendAsync(request, BodyHandlers.ofInputStream()).get(); + if (sameClient) { + String key = response.headers().firstValue("X-Connection-Key").get(); + if (label == null) label = key; + assertEquals(key, label, "Unexpected key for " + query); + } + sent.join(); + int wait = uri.startsWith("https://") ? 600 : 300; + try (InputStream is = response.body()) { + Thread.sleep(Utils.adjustTimeout(wait)); + is.readAllBytes(); + } + // we could fail here if we haven't waited long enough + fail("Expected exception, got :" + response + ", should sleep time be raised?"); + } catch (Throwable t0) { + System.out.println("Got EXPECTED: " + t0); + if (t0 instanceof ExecutionException) { + t0 = t0.getCause(); + } + t = t0; + } finally { + if (!sameClient && client != null) { + ExecutorService exec = (ExecutorService)client.executor().get(); + exec.shutdown(); + client = null; + } + } + assertDetailMessage(t, i); + } + } finally { + if (sameClient && client != null) { + ExecutorService exec = (ExecutorService)client.executor().get(); + exec.shutdown(); + } + } + } + + // Assertions based on implementation specific detail messages. Keep in + // sync with implementation. + static void assertDetailMessage(Throwable throwable, int iterationIndex) { + try { + Throwable cause = throwable; + while (cause != null) { + if (cause instanceof ProtocolException) { + if (cause.getMessage().matches("stream [0-9]+ flow control window exceeded")) { + System.out.println("Found expected exception: " + cause); + return; + } + } + cause = cause.getCause(); + } + throw new AssertionError( + "ProtocolException(\"stream X flow control window exceeded\") not found", + throwable); + } catch (AssertionError e) { + System.out.println("Exception does not match expectation: " + throwable); + throwable.printStackTrace(System.out); + throw e; + } + } + + @BeforeTest + public void setup() throws Exception { + sslContext = new SimpleSSLContext().get(); + if (sslContext == null) + throw new AssertionError("Unexpected null sslContext"); + + var http2TestServer = new Http2TestServer("localhost", false, 0); + http2TestServer.addHandler(new Http2TestHandler(), "/http2/"); + this.http2TestServer = HttpServerAdapters.HttpTestServer.of(http2TestServer); + http2URI = "http://" + this.http2TestServer.serverAuthority() + "/http2/x"; + + var https2TestServer = new Http2TestServer("localhost", true, sslContext); + https2TestServer.addHandler(new Http2TestHandler(), "/https2/"); + this.https2TestServer = HttpServerAdapters.HttpTestServer.of(https2TestServer); + https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x"; + + // Override the default exchange supplier with a custom one to enable + // particular test scenarios + http2TestServer.setExchangeSupplier(FCHttp2TestExchange::new); + https2TestServer.setExchangeSupplier(FCHttp2TestExchange::new); + + this.http2TestServer.start(); + this.https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + http2TestServer.stop(); + https2TestServer.stop(); + } + + static class Http2TestHandler implements Http2Handler { + + @Override + public void handle(Http2TestExchange t) throws IOException { + String query = t.getRequestURI().getRawQuery(); + + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + + byte[] bytes = is.readAllBytes(); + System.out.println("Server " + t.getLocalAddress() + " received:\n" + + t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8)); + t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey()); + + if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8); + int window = Integer.getInteger("jdk.httpclient.windowsize", 2 * 16 * 1024); + final int maxChunkSize; + if (t instanceof FCHttp2TestExchange) { + maxChunkSize = Math.min(window, ((FCHttp2TestExchange)t).conn.getMaxFrameSize()); + } else { + maxChunkSize = Math.min(window, SettingsFrame.MAX_FRAME_SIZE); + } + byte[] resp = bytes.length <= maxChunkSize + ? bytes + : Arrays.copyOfRange(bytes, 0, maxChunkSize); + int max = (window / resp.length) + 2; + // send in chunks + t.sendResponseHeaders(200, 0); + for (int i = 0; i <= max; i++) { + if (t instanceof FCHttp2TestExchange) { + try { + // we don't wait for the stream window, but we want + // to wait for the connection window + ((FCHttp2TestExchange)t).conn.obtainConnectionWindow(resp.length); + } catch (InterruptedException ie) { + // ignore and continue... + } + } + ((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length); + } + } + if (t instanceof FCHttp2TestExchange) { + ((FCHttp2TestExchange)t).responseSent(query); + } else fail("Exchange is not " + FCHttp2TestExchange.class.getName() + + " but " + t.getClass().getName()); + } + } + + // A custom Http2TestExchangeImpl that overrides sendResponseHeaders to + // allow headers to be sent with a number of CONTINUATION frames. + static class FCHttp2TestExchange extends Http2TestExchangeImpl { + static volatile Consumer<String> responseSentCB; + static void setResponseSentCB(Consumer<String> responseSentCB) { + FCHttp2TestExchange.responseSentCB = responseSentCB; + } + + final Http2TestServerConnection conn; + FCHttp2TestExchange(int streamid, String method, HttpHeaders reqheaders, + HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is, + SSLSession sslSession, BodyOutputStream os, + Http2TestServerConnection conn, boolean pushAllowed) { + super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed); + this.conn = conn; + } + public void responseSent(String query) { + System.out.println("Server: response sent for " + query); + responseSentCB.accept(query); + } + + } +} diff --git a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java index d08495e709d..a744a48cecb 100644 --- a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java +++ b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -23,6 +23,7 @@ import java.io.*; import java.nio.ByteBuffer; +import java.util.Objects; import jdk.internal.net.http.frame.DataFrame; @@ -60,6 +61,10 @@ void waitForWindow(int demand) throws InterruptedException { // first wait for the connection window conn.obtainConnectionWindow(demand); // now wait for the stream window + waitForStreamWindow(demand); + } + + public void waitForStreamWindow(int demand) throws InterruptedException { synchronized (this) { while (demand > 0) { int n = Math.min(demand, window); @@ -78,6 +83,7 @@ void goodToGo() { @Override public void write(byte[] buf, int offset, int len) throws IOException { + Objects.checkFromIndexSize(offset, len, buf.length); if (closed) { throw new IOException("closed"); } @@ -99,6 +105,34 @@ public void write(byte[] buf, int offset, int len) throws IOException { } } + /** + * This method pushes frames onto the stack without checking + * for flow control, allowing the sender to bypass flow + * control for testing purposes + * @param buf data to send + * @param offset offset at which the data starts + * @param len length of the data to send + * @throws IOException if an I/O error occurs + */ + public void writeUncontrolled(byte[] buf, int offset, int len) + throws IOException { + Objects.checkFromIndexSize(offset, len, buf.length); + if (closed) { + throw new IOException("closed"); + } + + if (!goodToGo) { + throw new IllegalStateException("sendResponseHeaders must be called first"); + } + int max = conn.getMaxFrameSize(); + while (len > 0) { + int n = len > max ? max : len; + send(buf, offset, n, 0); + offset += n; + len -= n; + } + } + private void send(byte[] buf, int offset, int len, int flags) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(len); buffer.put(buf, offset, len); diff --git a/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java b/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java index 3f068440b46..232ed554cd0 100644 --- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java @@ -81,5 +81,11 @@ default void sendFrames(List<Http2Frame> frames) throws IOException { * with the number of milliseconds it took to get a valid response. * It may also complete exceptionally */ + CompletableFuture<Long> sendPing(); + /** + * {@return the identification of the connection on which this exchange is being + * processed} + */ + String getConnectionKey(); } diff --git a/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java b/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java index f4c6bdbf648..c257b5d4414 100644 --- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java @@ -214,6 +214,11 @@ public void serverPush(URI uri, HttpHeaders headers, InputStream content) { } } + @Override + public String getConnectionKey() { + return conn.connectionKey(); + } + private boolean isHeadRequest() { return HEAD.equalsIgnoreCase(getRequestMethod()); } diff --git a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java index 7e45743934d..e57a1421ed6 100644 --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java @@ -630,6 +630,10 @@ void createPrimordialStream(Http1InitialRequest request) throws IOException { }); } + final String connectionKey() { + return this.server.getAddress() + "->" + this.socket.getRemoteSocketAddress(); + } + // all other streams created here @SuppressWarnings({"rawtypes","unchecked"}) void createStream(HeaderFrame frame) throws IOException { @@ -1286,7 +1290,7 @@ void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) { * * @param amount */ - synchronized void obtainConnectionWindow(int amount) throws InterruptedException { + public synchronized void obtainConnectionWindow(int amount) throws InterruptedException { while (amount > 0) { int n = Math.min(amount, sendWindow); amount -= n; @@ -1296,9 +1300,13 @@ synchronized void obtainConnectionWindow(int amount) throws InterruptedException } } - synchronized void updateConnectionWindow(int amount) { - sendWindow += amount; - notifyAll(); + void updateConnectionWindow(int amount) { + System.out.printf("sendWindow (window=%s, amount=%s) is now: %s%n", + sendWindow, amount, sendWindow + amount); + synchronized (this) { + sendWindow += amount; + notifyAll(); + } } // simplified output headers class. really just a type safe container