diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java
index 0f2db7738fc..fa30ed49a74 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java
@@ -42,6 +42,8 @@
 import jdk.internal.net.http.common.MinimalFuture;
 import jdk.internal.net.http.common.Utils;
 import jdk.internal.net.http.frame.SettingsFrame;
+
+import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE;
 import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
 import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH;
 import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE;
@@ -247,9 +249,13 @@ int getConnectionWindowSize(SettingsFrame clientSettings) {
         int defaultValue = Math.min(Integer.MAX_VALUE,
                 Math.max(streamWindow, K*K*32));
 
+        // The min value is the max between the streamWindow and
+        // the initial connection window size
+        int minValue = Math.max(INITIAL_CONNECTION_WINDOW_SIZE, streamWindow);
+
         return getParameter(
                 "jdk.httpclient.connectionWindowSize",
-                streamWindow, Integer.MAX_VALUE, defaultValue);
+                minValue, Integer.MAX_VALUE, defaultValue);
     }
 
     /**
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
index 2953b297561..bcc72f3682a 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
@@ -969,6 +969,34 @@ private String checkMaxOrphanedHeadersExceeded(HeaderFrame hf) {
         return null;
     }
 
+    // This method is called when a DataFrame that was added
+    // to a Stream::inputQ is later dropped from the queue
+    // without being consumed.
+    //
+    // Before adding a frame to the queue, the Stream calls
+    // connection.windowUpdater.canBufferUnprocessedBytes(), which
+    // increases the count of unprocessed bytes in the connection.
+    // After consuming the frame, it calls connection.windowUpdater::processed,
+    // which decrements the count of unprocessed bytes, and possibly
+    // sends a window update to the peer.
+    //
+    // This method is called when connection.windowUpdater::processed
+    // will not be called, which can happen when consuming the frame
+    // fails, or when an empty DataFrame terminates the stream,
+    // or when the stream is cancelled while data is still
+    // sitting in its inputQ. In the later case, it is called for
+    // each frame that is dropped from the queue.
+    final void releaseUnconsumed(DataFrame df) {
+        windowUpdater.released(df.payloadLength());
+        dropDataFrame(df);
+    }
+
+    // This method can be called directly when a DataFrame is dropped
+    // before/without having been added to any Stream::inputQ.
+    // In that case, the number of unprocessed bytes hasn't been incremented
+    // by the stream, and does not need to be decremented.
+    // Otherwise, if the frame is dropped after having been added to the
+    // inputQ, releaseUnconsumed above should be called.
     final void dropDataFrame(DataFrame df) {
         if (closed) return;
         if (debug.on()) {
@@ -1087,6 +1115,10 @@ private void handleConnectionFrame(Http2Frame frame)
         }
     }
 
+    boolean isOpen() {
+        return !closed && connection.channel().isOpen();
+    }
+
     void resetStream(int streamid, int code) {
         try {
             if (connection.channel().isOpen()) {
@@ -1277,11 +1309,12 @@ private void sendConnectionPreface() throws IOException {
         // Note that the default initial window size, not to be confused
         // with the initial window size, is defined by RFC 7540 as
         // 64K -1.
-        final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
-        if (len != 0) {
+        final int len = windowUpdater.initialWindowSize - INITIAL_CONNECTION_WINDOW_SIZE;
+        assert len >= 0;
+        if (len > 0) {
             if (Log.channel()) {
                 Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
-                        len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
+                        len, windowUpdater.initialWindowSize, INITIAL_CONNECTION_WINDOW_SIZE);
             }
             windowUpdater.sendWindowUpdate(len);
         }
@@ -1658,6 +1691,19 @@ public ConnectionWindowUpdateSender(Http2Connection connection,
         int getStreamId() {
             return 0;
         }
+
+        @Override
+        protected boolean windowSizeExceeded(long received) {
+            if (connection.isOpen()) {
+                try {
+                    connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR,
+                            "connection window exceeded");
+                } catch (IOException io) {
+                    connection.shutdown(io);
+                }
+            }
+            return true;
+        }
     }
 
     /**
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
index f97632c12a1..9ed28508bae 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
@@ -163,7 +163,7 @@ class Stream<T> extends ExchangeImpl<T> {
      * sending any data. Will be null for PushStreams, as they cannot send data.
      */
     private final WindowController windowController;
-    private final WindowUpdateSender windowUpdater;
+    private final WindowUpdateSender streamWindowUpdater;
 
     @Override
     HttpConnection connection() {
@@ -203,7 +203,8 @@ private void schedule() {
                 int size = Utils.remaining(dsts, Integer.MAX_VALUE);
                 if (size == 0 && finished) {
                     inputQ.remove();
-                    connection.ensureWindowUpdated(df); // must update connection window
+                    // consumed will not be called
+                    connection.releaseUnconsumed(df); // must update connection window
                     Log.logTrace("responseSubscriber.onComplete");
                     if (debug.on()) debug.log("incoming: onComplete");
                     sched.stop();
@@ -219,7 +220,11 @@ private void schedule() {
                     try {
                         subscriber.onNext(dsts);
                     } catch (Throwable t) {
-                        connection.dropDataFrame(df); // must update connection window
+                        // Data frames that have been added to the inputQ
+                        // must be released using releaseUnconsumed() to
+                        // account for the amount of unprocessed bytes
+                        // tracked by the connection.windowUpdater.
+                        connection.releaseUnconsumed(df);
                         throw t;
                     }
                     if (consumed(df)) {
@@ -272,7 +277,11 @@ private void drainInputQueue() {
         Http2Frame frame;
         while ((frame = inputQ.poll()) != null) {
             if (frame instanceof DataFrame) {
-                connection.dropDataFrame((DataFrame)frame);
+                // Data frames that have been added to the inputQ
+                // must be released using releaseUnconsumed() to
+                // account for the amount of unprocessed bytes
+                // tracked by the connection.windowUpdater.
+                connection.releaseUnconsumed((DataFrame)frame);
             }
         }
     }
@@ -297,12 +306,13 @@ private boolean consumed(DataFrame df) {
         boolean endStream = df.getFlag(DataFrame.END_STREAM);
         if (len == 0) return endStream;
 
-        connection.windowUpdater.update(len);
-
+        connection.windowUpdater.processed(len);
         if (!endStream) {
+            streamWindowUpdater.processed(len);
+        } else {
             // Don't send window update on a stream which is
             // closed or half closed.
-            windowUpdater.update(len);
+            streamWindowUpdater.released(len);
         }
 
         // true: end of stream; false: more data coming
@@ -343,8 +353,21 @@ public String toString() {
     }
 
     private void receiveDataFrame(DataFrame df) {
-        inputQ.add(df);
-        sched.runOrSchedule();
+        try {
+            int len = df.payloadLength();
+            if (len > 0) {
+                // we return from here if the connection is being closed.
+                if (!connection.windowUpdater.canBufferUnprocessedBytes(len)) return;
+                // we return from here if the stream is being closed.
+                if (closed || !streamWindowUpdater.canBufferUnprocessedBytes(len)) {
+                    connection.releaseUnconsumed(df);
+                    return;
+                }
+            }
+            inputQ.add(df);
+        } finally {
+            sched.runOrSchedule();
+        }
     }
 
     /** Handles a RESET frame. RESET is always handled inline in the queue. */
@@ -429,7 +452,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
         this.responseHeadersBuilder = new HttpHeadersBuilder();
         this.rspHeadersConsumer = new HeadersConsumer();
         this.requestPseudoHeaders = createPseudoHeaders(request);
-        this.windowUpdater = new StreamWindowUpdateSender(connection);
+        this.streamWindowUpdater = new StreamWindowUpdateSender(connection);
     }
 
     /**
@@ -1281,12 +1304,18 @@ void cancel(IOException cause) {
 
     @Override
     void onProtocolError(final IOException cause) {
+        onProtocolError(cause, ResetFrame.PROTOCOL_ERROR);
+    }
+
+    void onProtocolError(final IOException cause, int code) {
         if (debug.on()) {
-            debug.log("cancelling exchange on stream %d due to protocol error: %s", streamid, cause.getMessage());
+            debug.log("cancelling exchange on stream %d due to protocol error [%s]: %s",
+                    streamid, ErrorFrame.stringForCode(code),
+                    cause.getMessage());
         }
         Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause);
         // send a RESET frame and close the stream
-        cancelImpl(cause, ResetFrame.PROTOCOL_ERROR);
+        cancelImpl(cause, code);
     }
 
     void connectionClosing(Throwable cause) {
@@ -1554,6 +1583,14 @@ String dbgString() {
                 return dbgString = dbg;
             }
         }
+
+        @Override
+        protected boolean windowSizeExceeded(long received) {
+            onProtocolError(new ProtocolException("stream " + streamid +
+                            " flow control window exceeded"),
+                    ResetFrame.FLOW_CONTROL_ERROR);
+            return true;
+        }
     }
 
     /**
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java
index e85b92786f6..b92cb5358d7 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -31,15 +31,30 @@
 import jdk.internal.net.http.frame.WindowUpdateFrame;
 import jdk.internal.net.http.common.Utils;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
+/**
+  * A class that tracks the amount of flow controlled
+  * data received on an HTTP/2 connection
+  */
 abstract class WindowUpdateSender {
 
     final Logger debug =
             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 
+    // The threshold at which window updates are sent in bytes
     final int limit;
+    // The flow control window in bytes
+    final int windowSize;
     final Http2Connection connection;
-    final AtomicInteger received = new AtomicInteger(0);
+    // The amount of flow controlled data received and processed, in bytes,
+    // since the start of the window.
+    // The window is exhausted when received + unprocessed >= windowSize
+    final AtomicLong received = new AtomicLong();
+    // The amount of flow controlled data received and unprocessed, in bytes,
+    // since the start of the window.
+    // The window is exhausted when received + unprocessed >= windowSize
+    final AtomicLong unprocessed = new AtomicLong();
 
     WindowUpdateSender(Http2Connection connection) {
         this(connection, connection.clientSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE));
@@ -51,6 +66,7 @@ abstract class WindowUpdateSender {
 
     WindowUpdateSender(Http2Connection connection, int maxFrameSize, int initWindowSize) {
         this.connection = connection;
+        this.windowSize = initWindowSize;
         int v0 = Math.max(0, initWindowSize - maxFrameSize);
         int v1 = (initWindowSize + (maxFrameSize - 1)) / maxFrameSize;
         v1 = v1 * maxFrameSize / 2;
@@ -64,15 +80,118 @@ abstract class WindowUpdateSender {
                       maxFrameSize, initWindowSize, limit);
     }
 
+    // O for the connection window, > 0 for a stream window
     abstract int getStreamId();
 
+
+    /**
+     * {@return {@code true} if buffering the given amount of
+     * flow controlled data would not exceed the flow control
+     * window}
+     * <p>
+     * This method is called before buffering and processing
+     * a DataFrame. The count of unprocessed bytes is incremented
+     * by the given amount, and checked against the number of
+     * available bytes in the flow control window.
+     * <p>
+     * This method returns {@code true} if the bytes can be buffered
+     * without exceeding the flow control window, {@code false}
+     * if the flow control window is exceeded and corrective
+     * action (close/reset) has been taken.
+     * <p>
+     * When this method returns true, either {@link #processed(int)}
+     * or {@link #released(int)} must eventually be called to release
+     * the bytes from the flow control window.
+     *
+     * @implSpec
+     * an HTTP/2 endpoint may disable its own flow control
+     * (see <a href="https://www.rfc-editor.org/rfc/rfc9113.html#section-5.2.1">
+     *     RFC 9113, section 5.2.1</a>), in which case this
+     * method may return true even if the flow control window would
+     * be exceeded: that is, the flow control window is exceeded but
+     * the endpoint decided to take no corrective action.
+     *
+     * @param  len a number of unprocessed bytes, which
+     *             the caller wants to buffer.
+     */
+    boolean canBufferUnprocessedBytes(int len) {
+        return !checkWindowSizeExceeded(unprocessed.addAndGet(len));
+    }
+
+    // adds the provided amount to the amount of already
+    // received and processed bytes and checks whether the
+    // flow control window is exceeded. If so, take
+    // corrective actions and return true.
+    private boolean checkWindowSizeExceeded(long len) {
+        // because windowSize is bound by Integer.MAX_VALUE
+        // we will never reach the point where received.get() + len
+        // could overflow
+        long rcv = received.get() + len;
+        return rcv > windowSize && windowSizeExceeded(rcv);
+    }
+
+    /**
+     * Called after unprocessed buffered bytes have been
+     * processed, to release part of the flow control window
+     *
+     * @apiNote this method is called only when releasing bytes
+     * that where buffered after calling
+     * {@link #canBufferUnprocessedBytes(int)}.
+     *
+     * @param delta the amount of processed bytes to release
+     */
+    void processed(int delta) {
+        long rest = unprocessed.addAndGet(-delta);
+        assert rest >= 0;
+        update(delta);
+    }
+
+    /**
+     * Called when it is desired to release unprocessed bytes
+     * without processing them, or without triggering the
+     * sending of a window update. This method can be called
+     * instead of calling {@link #processed(int)}.
+     * When this method is called instead of calling {@link #processed(int)},
+     * it should generally be followed by a call to {@link #update(int)},
+     * unless the stream or connection is being closed.
+     *
+     * @apiNote this method should only be called to release bytes that
+     * have been buffered after calling {@link
+     * #canBufferUnprocessedBytes(int)}.
+     *
+     * @param delta the amount of bytes to release from the window
+     *
+     * @return the amount of remaining unprocessed bytes
+     */
+    long released(int delta) {
+        long rest = unprocessed.addAndGet(-delta);
+        assert rest >= 0;
+        return rest;
+    }
+
+    /**
+     * This method is called to update the flow control window,
+     * and possibly send a window update
+     *
+     * @apiNote this method can be called directly if a frame is
+     * dropped before calling {@link #canBufferUnprocessedBytes(int)}.
+     * Otherwise, either {@link #processed(int)} or {@link #released(int)}
+     * should be called, depending on whether sending a window update
+     * is desired or not. It is typically not desired to send an update
+     * if the stream or connection is being closed.
+     *
+     * @param delta the amount of bytes released from the window.
+     */
     void update(int delta) {
-        int rcv = received.addAndGet(delta);
+        long rcv = received.addAndGet(delta);
         if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit);
+        if (rcv > windowSize && windowSizeExceeded(rcv)) {
+            return;
+        }
         if (rcv > limit) {
             synchronized (this) {
-                int tosend = received.get();
-                if( tosend > limit) {
+                int tosend = (int)Math.min(received.get(), Integer.MAX_VALUE);
+                if (tosend > limit) {
                     received.getAndAdd(-tosend);
                     sendWindowUpdate(tosend);
                 }
@@ -82,6 +201,7 @@ void update(int delta) {
 
     void sendWindowUpdate(int delta) {
         if (debug.on()) debug.log("sending window update: %d", delta);
+        assert delta > 0 : "illegal window update delta: " + delta;
         connection.sendUnorderedFrame(new WindowUpdateFrame(getStreamId(), delta));
     }
 
@@ -99,4 +219,16 @@ String dbgString() {
         }
     }
 
+    /**
+     * Called when the flow control window size is exceeded
+     * This method may return false if flow control is disabled
+     * in this endpoint.
+     *
+     * @param received the amount of data received, which is greater
+     *                 than {@code windowSize}
+     * @return {@code true} if the error was reported to the peer
+     *         and no further window update should be sent.
+     */
+    protected abstract boolean windowSizeExceeded(long received);
+
 }
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java b/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java
index 72c7750eb43..7ebfa090830 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -463,6 +463,16 @@ private Http2Frame parseSettingsFrame(int frameLength, int streamid, int flags)
             int val = getInt();
             if (id > 0 && id <= SettingsFrame.MAX_PARAM) {
                 // a known parameter. Ignore otherwise
+                if (id == SettingsFrame.INITIAL_WINDOW_SIZE && val < 0) {
+                    return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR,
+                            "SettingsFrame with INITIAL_WINDOW_SIZE > 2^31 -1: "
+                                    + (val & 0xffffffffL));
+                }
+                if (id == SettingsFrame.MAX_FRAME_SIZE && (val < 16384 || val > 16777215)) {
+                    return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
+                            "SettingsFrame with MAX_FRAME_SIZE out of range: "
+                                    + (val & 0xffffffffL));
+                }
                 sf.setParameter(id, val); // TODO parameters validation
             }
         }
@@ -530,7 +540,12 @@ private Http2Frame parseWindowUpdateFrame(int frameLength, int streamid, int fla
             return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR,
                     "WindowUpdateFrame length is "+ frameLength+", expected 4");
         }
-        return new WindowUpdateFrame(streamid, getInt() & 0x7fffffff);
+        int update = getInt();
+        if (update < 0) {
+            return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR,
+                    "WindowUpdateFrame with value > 2^31 -1 " + (update & 0xffffffffL));
+        }
+        return new WindowUpdateFrame(streamid, update & 0x7fffffff);
     }
 
     private Http2Frame parseContinuationFrame(int frameLength, int streamid, int flags) {
diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java
index eef9f615bb3..ac306bd4551 100644
--- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -172,6 +172,11 @@ public synchronized void update(SettingsFrame updated) {
     // The initial value is 2^14 (16,384) octets.
     public static final int DEFAULT_MAX_FRAME_SIZE = 16 * K;
 
+    // Initial connection window size. This cannot be updated using the
+    // SETTINGS frame.
+    public static final int INITIAL_CONNECTION_WINDOW_SIZE = DEFAULT_INITIAL_WINDOW_SIZE;
+
+
     public static SettingsFrame defaultRFCSettings() {
         SettingsFrame f = new SettingsFrame();
         f.setParameter(ENABLE_PUSH, DEFAULT_ENABLE_PUSH);
diff --git a/test/jdk/java/net/httpclient/ProxySelectorTest.java b/test/jdk/java/net/httpclient/ProxySelectorTest.java
index c4bbad6ebf8..de1855277d3 100644
--- a/test/jdk/java/net/httpclient/ProxySelectorTest.java
+++ b/test/jdk/java/net/httpclient/ProxySelectorTest.java
@@ -385,7 +385,7 @@ public void setup() throws Exception {
     public void teardown() throws Exception {
         client = null;
         Thread.sleep(100);
-        AssertionError fail = TRACKER.check(500);
+        AssertionError fail = TRACKER.check(1500);
         try {
             proxy.stop();
             authproxy.stop();
diff --git a/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java b/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java
new file mode 100644
index 00000000000..c60fd0c944f
--- /dev/null
+++ b/test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java
@@ -0,0 +1,375 @@
+/*
+ * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @bug 8342075
+ * @summary checks connection flow control
+ * @library /test/lib server/ ../
+ * @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters
+ *        Http2TestServer
+ * @modules java.net.http/jdk.internal.net.http.common
+ *          java.net.http/jdk.internal.net.http.frame
+ *          java.net.http/jdk.internal.net.http.hpack
+ *          java.logging
+ *          java.base/sun.net.www.http
+ *          java.base/sun.net.www
+ *          java.base/sun.net
+ * @run testng/othervm  -Djdk.internal.httpclient.debug=true
+ *                      -Djdk.httpclient.connectionWindowSize=65535
+ *                      -Djdk.httpclient.windowsize=16384
+ *                      ConnectionFlowControlTest
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ProtocolException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpRequest.BodyPublishers;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandler;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.net.http.HttpResponse.BodySubscriber;
+import java.net.http.HttpResponse.ResponseInfo;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+
+import jdk.internal.net.http.common.HttpHeadersBuilder;
+import jdk.internal.net.http.frame.ContinuationFrame;
+import jdk.internal.net.http.frame.HeaderFrame;
+import jdk.internal.net.http.frame.HeadersFrame;
+import jdk.internal.net.http.frame.Http2Frame;
+import jdk.internal.net.http.frame.SettingsFrame;
+import jdk.test.lib.Utils;
+import jdk.test.lib.net.SimpleSSLContext;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static java.util.List.of;
+import static java.util.Map.entry;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class ConnectionFlowControlTest {
+
+    SSLContext sslContext;
+    HttpServerAdapters.HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
+    HttpServerAdapters.HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
+    String http2URI;
+    String https2URI;
+    final AtomicInteger reqid = new AtomicInteger();
+
+
+    @DataProvider(name = "variants")
+    public Object[][] variants() {
+        return new Object[][] {
+                { http2URI },
+                { https2URI },
+        };
+    }
+
+    @Test(dataProvider = "variants")
+    void test(String uri) throws Exception {
+        System.out.printf("%ntesting %s%n", uri);
+        ConcurrentHashMap<String, CompletableFuture<String>> responseSent = new ConcurrentHashMap<>();
+        ConcurrentHashMap<String, HttpResponse<InputStream>> responses = new ConcurrentHashMap<>();
+        FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s));
+        int connectionWindowSize = Math.max(Integer.getInteger(
+                "jdk.httpclient.connectionWindowSize", 65535), 65535);
+        int windowSize = Math.max(Integer.getInteger(
+                "jdk.httpclient.windowsize", 65535), 16384);
+        int max = connectionWindowSize / windowSize + 2;
+        System.out.printf("connection window: %s, stream window: %s, will make %s requests%n",
+                connectionWindowSize, windowSize, max);
+
+        String label = null;
+
+        Throwable t = null;
+        HttpClient client = HttpClient.newBuilder().executor(Executors.newCachedThreadPool()).sslContext(sslContext).build();
+        try {
+            try {
+                String[] keys = new String[max];
+                for (int i = 0; i < max; i++) {
+                    String query = "reqId=" + reqid.incrementAndGet();
+                    keys[i] = query;
+                    URI uriWithQuery = URI.create(uri + "?" + query);
+                    CompletableFuture<String> sent = new CompletableFuture<>();
+                    responseSent.put(query, sent);
+                    HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
+                            .POST(BodyPublishers.ofString("Hello there!"))
+                            .build();
+                    System.out.println("\nSending request:" + uriWithQuery);
+                    final HttpClient cc = client;
+                    var response = cc.send(request, BodyHandlers.ofInputStream());
+                    responses.put(query, response);
+                    String ckey = response.headers().firstValue("X-Connection-Key").get();
+                    if (label == null) label = ckey;
+                    try {
+                        if (i < max - 1) {
+                            // the connection window might be exceeded at i == max - 2, which
+                            // means that the last request could go on a new connection.
+                            assertEquals(ckey, label, "Unexpected key for " + query);
+                        }
+                    } catch (AssertionError ass) {
+                        // since we won't pull all responses, the client
+                        // will not exit unless we ask it to shutdown now.
+                        ExecutorService exec = (ExecutorService)client.executor().get();
+                        exec.shutdownNow();
+                        throw ass;
+                    }
+                }
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ie) {
+                    // ignore
+                }
+                CompletableFuture<?> allsent = CompletableFuture.allOf(responseSent.values().stream()
+                        .toArray(CompletableFuture<?>[]::new));
+                allsent.get();
+                for (int i = 0; i < max; i++) {
+                    try {
+                        String query = keys[i];
+                        var response = responses.get(keys[i]);
+                        String ckey = response.headers().firstValue("X-Connection-Key").get();
+                        if (label == null) label = ckey;
+                        assertEquals(ckey, label, "Unexpected key for " + query);
+                        int wait = uri.startsWith("https://") ? 500 : 250;
+                        try (InputStream is = response.body()) {
+                            Thread.sleep(Utils.adjustTimeout(wait));
+                            is.readAllBytes();
+                        }
+                        System.out.printf("%s did not fail: %s%n", query, response.statusCode());
+                    } catch (AssertionError t1) {
+                        // since we won't pull all responses, the client
+                        // will not exit unless we ask it to shutdown now.
+                        ExecutorService exec = (ExecutorService)client.executor().get();
+                        exec.shutdownNow();
+                        throw t1;
+                    } catch (Throwable t0) {
+                        System.out.println("Got EXPECTED: " + t0);
+                        if (t0 instanceof ExecutionException) {
+                            t0 = t0.getCause();
+                        }
+                        t = t0;
+                        try {
+                            assertDetailMessage(t0, i);
+                        } catch (AssertionError e) {
+                            // since we won't pull all responses, the client
+                            // will not exit unless we ask it to shutdown now.
+                            ExecutorService exec = (ExecutorService)client.executor().get();
+                            exec.shutdownNow();
+                            throw e;
+                        }
+                    }
+                }
+            } catch (Throwable t0) {
+                System.out.println("Got EXPECTED: " + t0);
+                if (t0 instanceof ExecutionException) {
+                    t0 = t0.getCause();
+                }
+                t = t0;
+            }
+            if (t == null) {
+                // we could fail here if we haven't waited long enough
+                fail("Expected exception, got all responses, should sleep time be raised?");
+            } else {
+                assertDetailMessage(t, max);
+            }
+            String query = "reqId=" + reqid.incrementAndGet();
+            URI uriWithQuery = URI.create(uri + "?" + query);
+            CompletableFuture<String> sent = new CompletableFuture<>();
+            responseSent.put(query, sent);
+            HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
+                    .POST(BodyPublishers.ofString("Hello there!"))
+                    .build();
+            System.out.println("\nSending last request:" + uriWithQuery);
+            var response = client.send(request, BodyHandlers.ofString());
+            if (label != null) {
+                String ckey = response.headers().firstValue("X-Connection-Key").get();
+                assertNotEquals(ckey, label);
+                System.out.printf("last request %s sent on different connection as expected:" +
+                        "\n\tlast: %s\n\tprevious: %s%n", query, ckey, label);
+            }
+        } finally {
+            ExecutorService exec = (ExecutorService)client.executor().get();
+            exec.shutdownNow();
+        }
+    }
+
+    // Assertions based on implementation specific detail messages. Keep in
+    // sync with implementation.
+    static void assertDetailMessage(Throwable throwable, int iterationIndex) {
+        try {
+            Throwable cause = throwable;
+            while (cause != null) {
+                if (cause instanceof ProtocolException) {
+                    if (cause.getMessage().contains("connection window exceeded")) {
+                       System.out.println("Found expected exception: " + cause);
+                       return;
+                    }
+                }
+                cause = cause.getCause();
+            }
+            throw new AssertionError(
+                    "ProtocolException(\"protocol error: connection window exceeded\") not found",
+                             throwable);
+        } catch (AssertionError e) {
+            System.out.println("Exception does not match expectation: " + throwable);
+            throwable.printStackTrace(System.out);
+            throw e;
+        }
+    }
+
+    @BeforeTest
+    public void setup() throws Exception {
+        sslContext = new SimpleSSLContext().get();
+        if (sslContext == null)
+            throw new AssertionError("Unexpected null sslContext");
+
+        var http2TestServer = new Http2TestServer("localhost", false, 0);
+        http2TestServer.addHandler(new Http2TestHandler(), "/http2/");
+        this.http2TestServer = HttpServerAdapters.HttpTestServer.of(http2TestServer);
+        http2URI = "http://" + this.http2TestServer.serverAuthority() + "/http2/x";
+
+        var https2TestServer = new Http2TestServer("localhost", true, sslContext);
+        https2TestServer.addHandler(new Http2TestHandler(), "/https2/");
+        this.https2TestServer = HttpServerAdapters.HttpTestServer.of(https2TestServer);
+        https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x";
+
+        // Override the default exchange supplier with a custom one to enable
+        // particular test scenarios
+        http2TestServer.setExchangeSupplier(FCHttp2TestExchange::new);
+        https2TestServer.setExchangeSupplier(FCHttp2TestExchange::new);
+
+        this.http2TestServer.start();
+        this.https2TestServer.start();
+    }
+
+    @AfterTest
+    public void teardown() throws Exception {
+        http2TestServer.stop();
+        https2TestServer.stop();
+    }
+
+    static class Http2TestHandler implements Http2Handler {
+
+        @Override
+        public void handle(Http2TestExchange t) throws IOException {
+            String query = t.getRequestURI().getRawQuery();
+
+            try (InputStream is = t.getRequestBody();
+                 OutputStream os = t.getResponseBody()) {
+
+                byte[] bytes = is.readAllBytes();
+                System.out.println("Server " + t.getLocalAddress() + " received:\n"
+                        + t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
+                t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey());
+
+                if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8);
+                int window = Math.max(16384, Integer.getInteger("jdk.httpclient.windowsize", 2*16*1024));
+                 final int maxChunkSize;
+                if (t instanceof FCHttp2TestExchange) {
+                    maxChunkSize = Math.min(window, ((FCHttp2TestExchange)t).conn.getMaxFrameSize());
+                } else {
+                    maxChunkSize = Math.min(window, SettingsFrame.MAX_FRAME_SIZE);
+                }
+                byte[] resp = bytes.length < maxChunkSize
+                        ? bytes
+                        : Arrays.copyOfRange(bytes, 0, maxChunkSize);
+                int max = (window / resp.length);
+                // send in chunks
+                t.sendResponseHeaders(200, 0);
+                int sent = 0;
+                for (int i=0; i<=max; i++) {
+                    int len = Math.min(resp.length, window - sent);
+                    if (len <= 0) break;
+                    if (os instanceof BodyOutputStream) {
+                        try {
+                            // we don't wait for the stream window, but we want
+                            // to wait for the connection window
+                            ((BodyOutputStream)os).waitForStreamWindow(len);
+                        } catch (InterruptedException ie) {
+                            // ignore and continue...
+                        }
+                    }
+                    ((BodyOutputStream) os).writeUncontrolled(resp, 0, len);
+                    sent += len;
+                }
+                if (sent != window) fail("should have sent " + window + ", sent " + sent);
+            }
+            if (t instanceof FCHttp2TestExchange) {
+                ((FCHttp2TestExchange)t).responseSent(query);
+            } else {
+                fail("Exchange is not " + FCHttp2TestExchange.class.getName() +
+                        " but " + t.getClass().getName());
+            }
+        }
+    }
+
+    // A custom Http2TestExchangeImpl that overrides sendResponseHeaders to
+    // allow headers to be sent with a number of CONTINUATION frames.
+    static class FCHttp2TestExchange extends Http2TestExchangeImpl {
+        static volatile Consumer<String> responseSentCB;
+        static void setResponseSentCB(Consumer<String> responseSentCB) {
+            FCHttp2TestExchange.responseSentCB = responseSentCB;
+        }
+
+        final Http2TestServerConnection conn;
+        FCHttp2TestExchange(int streamid, String method, HttpHeaders reqheaders,
+                             HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is,
+                             SSLSession sslSession, BodyOutputStream os,
+                             Http2TestServerConnection conn, boolean pushAllowed) {
+            super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed);
+            this.conn = conn;
+        }
+        public void responseSent(String query) {
+            System.out.println("Server: response sent for " + query);
+            responseSentCB.accept(query);
+        }
+
+    }
+}
diff --git a/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java
new file mode 100644
index 00000000000..dd7dbc2f5f7
--- /dev/null
+++ b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java
@@ -0,0 +1,357 @@
+/*
+ * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @bug 8342075
+ * @library /test/lib server/ ../
+ * @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters
+ *        Http2TestServer
+ * @modules java.net.http/jdk.internal.net.http.common
+ *          java.net.http/jdk.internal.net.http.frame
+ *          java.net.http/jdk.internal.net.http.hpack
+ *          java.logging
+ *          java.base/sun.net.www.http
+ *          java.base/sun.net.www
+ *          java.base/sun.net
+ * @run testng/othervm  -Djdk.internal.httpclient.debug=true
+ *                      -Djdk.httpclient.connectionWindowSize=65535
+ *                      -Djdk.httpclient.windowsize=16384
+ *                      StreamFlowControlTest
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ProtocolException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpRequest.BodyPublishers;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+
+import jdk.internal.net.http.common.HttpHeadersBuilder;
+import jdk.internal.net.http.frame.SettingsFrame;
+import jdk.test.lib.Utils;
+import jdk.test.lib.net.SimpleSSLContext;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+public class StreamFlowControlTest {
+
+    SSLContext sslContext;
+    HttpServerAdapters.HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
+    HttpServerAdapters.HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
+    String http2URI;
+    String https2URI;
+    final AtomicInteger reqid = new AtomicInteger();
+
+
+    @DataProvider(name = "variants")
+    public Object[][] variants() {
+        return new Object[][] {
+                { http2URI,  false },
+                { https2URI, false },
+                { http2URI,  true },
+                { https2URI, true },
+        };
+    }
+
+
+    @Test(dataProvider = "variants")
+    void test(String uri,
+              boolean sameClient)
+        throws Exception
+    {
+        System.out.printf("%ntesting test(%s, %s)%n", uri, sameClient);
+        ConcurrentHashMap<String, CompletableFuture<String>>  responseSent = new ConcurrentHashMap<>();
+        FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s));
+
+        HttpClient client = null;
+        try {
+            int max = sameClient ? 10 : 3;
+            String label = null;
+            for (int i = 0; i < max; i++) {
+                if (!sameClient || client == null)
+                    client = HttpClient.newBuilder()
+                            .executor(Executors.newCachedThreadPool())
+                            .sslContext(sslContext).build();
+
+                String query = "reqId=" + reqid.incrementAndGet();
+                URI uriWithQuery = URI.create(uri + "?" + query);
+                CompletableFuture<String> sent = new CompletableFuture<>();
+                responseSent.put(query, sent);
+                HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
+                        .POST(BodyPublishers.ofString("Hello there!"))
+                        .build();
+                System.out.println("\nSending request:" + uriWithQuery);
+                final HttpClient cc = client;
+                try {
+                    HttpResponse<InputStream> response = cc.send(request, BodyHandlers.ofInputStream());
+                    if (sameClient) {
+                        String key = response.headers().firstValue("X-Connection-Key").get();
+                        if (label == null) label = key;
+                        assertEquals(key, label, "Unexpected key for " + query);
+                    }
+                    sent.join();
+                    // we have to pull to get the exception, but slow enough
+                    // so that DataFrames are buffered up to the point that
+                    // the window is exceeded...
+                    int wait = uri.startsWith("https://") ? 500 : 350;
+                    try (InputStream is = response.body()) {
+                        Thread.sleep(Utils.adjustTimeout(wait));
+                        is.readAllBytes();
+                    }
+                    // we could fail here if we haven't waited long enough
+                    fail("Expected exception, got :" + response + ", should sleep time be raised?");
+                } catch (IOException ioe) {
+                    System.out.println("Got EXPECTED: " + ioe);
+                    assertDetailMessage(ioe, i);
+                } finally {
+                    if (!sameClient && client != null) {
+                        ExecutorService exec = (ExecutorService)client.executor().get();
+                        exec.shutdown();
+                        client = null;
+                    }
+                }
+            }
+        } finally {
+            if (sameClient && client != null) {
+                ExecutorService exec = (ExecutorService)client.executor().get();
+                exec.shutdown();
+            }
+        }
+
+    }
+
+    @Test(dataProvider = "variants")
+    void testAsync(String uri,
+                   boolean sameClient)
+    {
+        System.out.printf("%ntesting testAsync(%s, %s)%n", uri, sameClient);
+        ConcurrentHashMap<String, CompletableFuture<String>> responseSent = new ConcurrentHashMap<>();
+        FCHttp2TestExchange.setResponseSentCB((s) -> responseSent.get(s).complete(s));
+
+        HttpClient client = null;
+        try {
+            int max = sameClient ? 5 : 3;
+            String label = null;
+            for (int i = 0; i < max; i++) {
+                if (!sameClient || client == null)
+                    client = HttpClient.newBuilder()
+                            .executor(Executors.newCachedThreadPool())
+                            .sslContext(sslContext).build();
+
+                String query = "reqId=" + reqid.incrementAndGet();
+                URI uriWithQuery = URI.create(uri + "?" + query);
+                CompletableFuture<String> sent = new CompletableFuture<>();
+                responseSent.put(query, sent);
+                HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
+                        .POST(BodyPublishers.ofString("Hello there!"))
+                        .build();
+                System.out.println("\nSending request:" + uriWithQuery);
+                final HttpClient cc = client;
+
+                Throwable t = null;
+                try {
+                    HttpResponse<InputStream> response = cc.sendAsync(request, BodyHandlers.ofInputStream()).get();
+                    if (sameClient) {
+                        String key = response.headers().firstValue("X-Connection-Key").get();
+                        if (label == null) label = key;
+                        assertEquals(key, label, "Unexpected key for " + query);
+                    }
+                    sent.join();
+                    int wait = uri.startsWith("https://") ? 600 : 300;
+                    try (InputStream is = response.body()) {
+                        Thread.sleep(Utils.adjustTimeout(wait));
+                        is.readAllBytes();
+                    }
+                    // we could fail here if we haven't waited long enough
+                    fail("Expected exception, got :" + response + ", should sleep time be raised?");
+                } catch (Throwable t0) {
+                    System.out.println("Got EXPECTED: " + t0);
+                    if (t0 instanceof ExecutionException) {
+                        t0 = t0.getCause();
+                    }
+                    t = t0;
+                } finally {
+                    if (!sameClient && client != null) {
+                        ExecutorService exec = (ExecutorService)client.executor().get();
+                        exec.shutdown();
+                        client = null;
+                    }
+                }
+                assertDetailMessage(t, i);
+            }
+        } finally {
+            if (sameClient && client != null) {
+                ExecutorService exec = (ExecutorService)client.executor().get();
+                exec.shutdown();
+            }
+        }
+    }
+
+    // Assertions based on implementation specific detail messages. Keep in
+    // sync with implementation.
+    static void assertDetailMessage(Throwable throwable, int iterationIndex) {
+        try {
+            Throwable cause = throwable;
+            while (cause != null) {
+                if (cause instanceof ProtocolException) {
+                    if (cause.getMessage().matches("stream [0-9]+ flow control window exceeded")) {
+                       System.out.println("Found expected exception: " + cause);
+                       return;
+                    }
+                }
+                cause = cause.getCause();
+            }
+            throw new AssertionError(
+                    "ProtocolException(\"stream X flow control window exceeded\") not found",
+                             throwable);
+        } catch (AssertionError e) {
+            System.out.println("Exception does not match expectation: " + throwable);
+            throwable.printStackTrace(System.out);
+            throw e;
+        }
+    }
+
+    @BeforeTest
+    public void setup() throws Exception {
+        sslContext = new SimpleSSLContext().get();
+        if (sslContext == null)
+            throw new AssertionError("Unexpected null sslContext");
+
+        var http2TestServer = new Http2TestServer("localhost", false, 0);
+        http2TestServer.addHandler(new Http2TestHandler(), "/http2/");
+        this.http2TestServer = HttpServerAdapters.HttpTestServer.of(http2TestServer);
+        http2URI = "http://" + this.http2TestServer.serverAuthority() + "/http2/x";
+
+        var https2TestServer = new Http2TestServer("localhost", true, sslContext);
+        https2TestServer.addHandler(new Http2TestHandler(), "/https2/");
+        this.https2TestServer = HttpServerAdapters.HttpTestServer.of(https2TestServer);
+        https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x";
+
+        // Override the default exchange supplier with a custom one to enable
+        // particular test scenarios
+        http2TestServer.setExchangeSupplier(FCHttp2TestExchange::new);
+        https2TestServer.setExchangeSupplier(FCHttp2TestExchange::new);
+
+        this.http2TestServer.start();
+        this.https2TestServer.start();
+    }
+
+    @AfterTest
+    public void teardown() throws Exception {
+        http2TestServer.stop();
+        https2TestServer.stop();
+    }
+
+    static class Http2TestHandler implements Http2Handler {
+
+        @Override
+        public void handle(Http2TestExchange t) throws IOException {
+            String query = t.getRequestURI().getRawQuery();
+
+            try (InputStream is = t.getRequestBody();
+                 OutputStream os = t.getResponseBody()) {
+
+                byte[] bytes = is.readAllBytes();
+                System.out.println("Server " + t.getLocalAddress() + " received:\n"
+                        + t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
+                t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey());
+
+                if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8);
+                int window = Integer.getInteger("jdk.httpclient.windowsize", 2 * 16 * 1024);
+                final int maxChunkSize;
+                if (t instanceof FCHttp2TestExchange) {
+                    maxChunkSize = Math.min(window, ((FCHttp2TestExchange)t).conn.getMaxFrameSize());
+                } else {
+                    maxChunkSize = Math.min(window, SettingsFrame.MAX_FRAME_SIZE);
+                }
+                byte[] resp = bytes.length <= maxChunkSize
+                        ? bytes
+                        : Arrays.copyOfRange(bytes, 0, maxChunkSize);
+                int max = (window / resp.length) + 2;
+                // send in chunks
+                t.sendResponseHeaders(200, 0);
+                for (int i = 0; i <= max; i++) {
+                    if (t instanceof FCHttp2TestExchange) {
+                        try {
+                            // we don't wait for the stream window, but we want
+                            // to wait for the connection window
+                            ((FCHttp2TestExchange)t).conn.obtainConnectionWindow(resp.length);
+                        } catch (InterruptedException ie) {
+                            // ignore and continue...
+                        }
+                    }
+                    ((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length);
+                }
+            }
+            if (t instanceof FCHttp2TestExchange) {
+                ((FCHttp2TestExchange)t).responseSent(query);
+            } else fail("Exchange is not " + FCHttp2TestExchange.class.getName() +
+                    " but " + t.getClass().getName());
+        }
+    }
+
+    // A custom Http2TestExchangeImpl that overrides sendResponseHeaders to
+    // allow headers to be sent with a number of CONTINUATION frames.
+    static class FCHttp2TestExchange extends Http2TestExchangeImpl {
+        static volatile Consumer<String> responseSentCB;
+        static void setResponseSentCB(Consumer<String> responseSentCB) {
+            FCHttp2TestExchange.responseSentCB = responseSentCB;
+        }
+
+        final Http2TestServerConnection conn;
+        FCHttp2TestExchange(int streamid, String method, HttpHeaders reqheaders,
+                             HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is,
+                             SSLSession sslSession, BodyOutputStream os,
+                             Http2TestServerConnection conn, boolean pushAllowed) {
+            super(streamid, method, reqheaders, rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed);
+            this.conn = conn;
+        }
+        public void responseSent(String query) {
+            System.out.println("Server: response sent for " + query);
+            responseSentCB.accept(query);
+        }
+
+    }
+}
diff --git a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java
index d08495e709d..a744a48cecb 100644
--- a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java
+++ b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -23,6 +23,7 @@
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 import jdk.internal.net.http.frame.DataFrame;
 
@@ -60,6 +61,10 @@ void waitForWindow(int demand) throws InterruptedException {
         // first wait for the connection window
         conn.obtainConnectionWindow(demand);
         // now wait for the stream window
+        waitForStreamWindow(demand);
+    }
+
+    public void waitForStreamWindow(int demand) throws InterruptedException {
         synchronized (this) {
             while (demand > 0) {
                 int n = Math.min(demand, window);
@@ -78,6 +83,7 @@ void goodToGo() {
 
     @Override
     public void write(byte[] buf, int offset, int len) throws IOException {
+        Objects.checkFromIndexSize(offset, len, buf.length);
         if (closed) {
             throw new IOException("closed");
         }
@@ -99,6 +105,34 @@ public void write(byte[] buf, int offset, int len) throws IOException {
         }
     }
 
+    /**
+     * This method pushes frames onto the stack without checking
+     * for flow control, allowing the sender to bypass flow
+     * control for testing purposes
+     * @param buf     data to send
+     * @param offset  offset at which the data starts
+     * @param len     length of the data to send
+     * @throws IOException if an I/O error occurs
+     */
+    public void writeUncontrolled(byte[] buf, int offset, int len)
+            throws IOException {
+        Objects.checkFromIndexSize(offset, len, buf.length);
+        if (closed) {
+            throw new IOException("closed");
+        }
+
+        if (!goodToGo) {
+            throw new IllegalStateException("sendResponseHeaders must be called first");
+        }
+        int max = conn.getMaxFrameSize();
+        while (len > 0) {
+            int n = len > max ? max : len;
+            send(buf, offset, n, 0);
+            offset += n;
+            len -= n;
+        }
+    }
+
     private void send(byte[] buf, int offset, int len, int flags) throws IOException {
         ByteBuffer buffer = ByteBuffer.allocate(len);
         buffer.put(buf, offset, len);
diff --git a/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java b/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java
index 3f068440b46..232ed554cd0 100644
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java
@@ -81,5 +81,11 @@ default void sendFrames(List<Http2Frame> frames) throws IOException {
      * with the number of milliseconds it took to get a valid response.
      * It may also complete exceptionally
      */
+
     CompletableFuture<Long> sendPing();
+    /**
+     * {@return the identification of the connection on which this exchange is being
+     * processed}
+     */
+    String getConnectionKey();
 }
diff --git a/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java b/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java
index f4c6bdbf648..c257b5d4414 100644
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java
@@ -214,6 +214,11 @@ public void serverPush(URI uri, HttpHeaders headers, InputStream content) {
         }
     }
 
+    @Override
+    public String getConnectionKey() {
+        return conn.connectionKey();
+    }
+
     private boolean isHeadRequest() {
         return HEAD.equalsIgnoreCase(getRequestMethod());
     }
diff --git a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
index 7e45743934d..e57a1421ed6 100644
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
@@ -630,6 +630,10 @@ void createPrimordialStream(Http1InitialRequest request) throws IOException {
         });
     }
 
+    final String connectionKey() {
+        return this.server.getAddress() + "->" + this.socket.getRemoteSocketAddress();
+    }
+
     // all other streams created here
     @SuppressWarnings({"rawtypes","unchecked"})
     void createStream(HeaderFrame frame) throws IOException {
@@ -1286,7 +1290,7 @@ void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
      *
      * @param amount
      */
-    synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
+    public synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
         while (amount > 0) {
             int n = Math.min(amount, sendWindow);
             amount -= n;
@@ -1296,9 +1300,13 @@ synchronized void obtainConnectionWindow(int amount) throws InterruptedException
         }
     }
 
-    synchronized void updateConnectionWindow(int amount) {
-        sendWindow += amount;
-        notifyAll();
+    void updateConnectionWindow(int amount) {
+        System.out.printf("sendWindow (window=%s, amount=%s) is now: %s%n",
+                sendWindow, amount, sendWindow + amount);
+        synchronized (this) {
+            sendWindow += amount;
+            notifyAll();
+        }
     }
 
     // simplified output headers class. really just a type safe container