Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8317808: HTTP/2 stream cancelImpl may leave subscriber registered #1426

Closed
wants to merge 7 commits into from
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -58,10 +58,6 @@ abstract class ExchangeImpl<T> {

final Exchange<T> exchange;

// this will be set to true only when the peer explicitly states (through a GOAWAY frame or
// a relevant error code in reset frame) that the corresponding stream (id) wasn't processed
private volatile boolean unprocessedByPeer;

ExchangeImpl(Exchange<T> e) {
// e == null means a http/2 pushed stream
this.exchange = e;
@@ -268,13 +264,4 @@ void upgraded() { }
// Called when server returns non 100 response to
// an Expect-Continue
void expectContinueFailed(int rcode) { }

final boolean isUnprocessedByPeer() {
return this.unprocessedByPeer;
}

// Marks the exchange as unprocessed by the peer
final void markUnprocessedByPeer() {
this.unprocessedByPeer = true;
}
}
Original file line number Diff line number Diff line change
@@ -49,7 +49,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -397,7 +396,6 @@ private record PushContinuationState(PushPromiseDecoder pushContDecoder, PushPro
private final String key; // for HttpClientImpl.connections map
private final FramesDecoder framesDecoder;
private final FramesEncoder framesEncoder = new FramesEncoder();
private final AtomicLong lastProcessedStreamInGoAway = new AtomicLong(-1);

/**
* Send Window controller for both connection and stream windows.
@@ -804,9 +802,7 @@ final int maxConcurrentServerInitiatedStreams() {

void close() {
if (markHalfClosedLocal()) {
// we send a GOAWAY frame only if the remote side hasn't already indicated
// the intention to close the connection by previously sending a GOAWAY of its own
if (connection.channel().isOpen() && !isMarked(closedState, HALF_CLOSED_REMOTE)) {
if (connection.channel().isOpen()) {
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
GoAwayFrame f = new GoAwayFrame(0,
ErrorFrame.NO_ERROR,
@@ -1358,46 +1354,13 @@ private void handlePing(PingFrame frame)
sendUnorderedFrame(frame);
}

private void handleGoAway(final GoAwayFrame frame) {
final long lastProcessedStream = frame.getLastStream();
assert lastProcessedStream >= 0 : "unexpected last stream id: "
+ lastProcessedStream + " in GOAWAY frame";

markHalfClosedRemote();
setFinalStream(); // don't allow any new streams on this connection
if (debug.on()) {
debug.log("processing incoming GOAWAY with last processed stream id:%s in frame %s",
lastProcessedStream, frame);
}
// see if this connection has previously received a GOAWAY from the peer and if yes
// then check if this new last processed stream id is lesser than the previous
// known last processed stream id. Only update the last processed stream id if the new
// one is lesser than the previous one.
long prevLastProcessed = lastProcessedStreamInGoAway.get();
while (prevLastProcessed == -1 || lastProcessedStream < prevLastProcessed) {
if (lastProcessedStreamInGoAway.compareAndSet(prevLastProcessed,
lastProcessedStream)) {
break;
}
prevLastProcessed = lastProcessedStreamInGoAway.get();
}
handlePeerUnprocessedStreams(lastProcessedStreamInGoAway.get());
}

private void handlePeerUnprocessedStreams(final long lastProcessedStream) {
final AtomicInteger numClosed = new AtomicInteger(); // atomic merely to allow usage within lambda
streams.forEach((id, exchange) -> {
if (id > lastProcessedStream) {
// any streams with an stream id higher than the last processed stream
// can be retried (on a new connection). we close the exchange as unprocessed
// to facilitate the retrying.
client2.client().theExecutor().ensureExecutedAsync(exchange::closeAsUnprocessed);
numClosed.incrementAndGet();
}
});
if (debug.on()) {
debug.log(numClosed.get() + " stream(s), with id greater than " + lastProcessedStream
+ ", will be closed as unprocessed");
private void handleGoAway(GoAwayFrame frame)
throws IOException
{
if (markHalfClosedLRemote()) {
shutdown(new IOException(
connection.channel().getLocalAddress()
+ ": GOAWAY received"));
}
}

@@ -1948,7 +1911,7 @@ private boolean markHalfClosedLocal() {
return markClosedState(HALF_CLOSED_LOCAL);
}

private boolean markHalfClosedRemote() {
private boolean markHalfClosedLRemote() {
return markClosedState(HALF_CLOSED_REMOTE);
}

Original file line number Diff line number Diff line change
@@ -282,25 +282,23 @@ public String toString() {
}
}

static <T> CompletableFuture<T> registerPending(PendingRequest pending, CompletableFuture<T> res) {
static void registerPending(PendingRequest pending) {
// shortcut if cf is already completed: no need to go through the trouble of
// registering it
if (pending.cf.isDone()) return res;
if (pending.cf.isDone()) return;

var client = pending.client;
var cf = pending.cf;
var id = pending.id;
boolean added = client.pendingRequests.add(pending);
// this may immediately remove `pending` from the set is the cf is already completed
var ref = res.whenComplete((r,t) -> client.pendingRequests.remove(pending));
pending.ref = ref;
pending.ref = cf.whenComplete((r,t) -> client.pendingRequests.remove(pending));
assert added : "request %d was already added".formatted(id);
// should not happen, unless the selector manager has already
// exited abnormally
if (client.selmgr.isClosed()) {
pending.abort(client.selmgr.selectorClosedException());
}
return ref;
}

static void abortPendingRequests(HttpClientImpl client, Throwable reason) {
@@ -933,9 +931,8 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
cf = sendAsync(req, responseHandler, null, null);
return cf.get();
} catch (InterruptedException ie) {
if (cf != null) {
if (cf != null )
cf.cancel(true);
}
throw ie;
} catch (ExecutionException e) {
final Throwable throwable = e.getCause();
@@ -1057,23 +1054,19 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
}

// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
if (exchangeExecutor != null) {
res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
}

// The mexCf is the Cf we need to abort if the SelectorManager thread
// is aborted.
PendingRequest pending = new PendingRequest(id, requestImpl, mexCf, mex, this);
res = registerPending(pending, res);

if (exchangeExecutor != null) {
// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
return res.isDone() ? res
: res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
} else {
// make a defensive copy that can be safely canceled
// by the caller
return res.isDone() ? res : res.copy();
}
} catch (Throwable t) {
registerPending(pending);
return res;
} catch(Throwable t) {
requestUnreference();
debugCompleted("ClientImpl (async)", start, userRequest);
throw t;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -90,8 +90,8 @@ class MultiExchange<T> implements Cancelable {
Exchange<T> exchange; // the current exchange
Exchange<T> previous;
volatile Throwable retryCause;
volatile boolean retriedOnce;
volatile HttpResponse<T> response;
volatile boolean expiredOnce;
volatile HttpResponse<T> response = null;

// Maximum number of times a request will be retried/redirected
// for any reason
@@ -279,19 +279,11 @@ public void cancel(IOException cause) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = this.cancelled;
boolean firstCancel = false;
if (!cancelled && mayInterruptIfRunning) {
if (interrupted.get() == null) {
firstCancel = interrupted.compareAndSet(null,
interrupted.compareAndSet(null,
new CancellationException("Request cancelled"));
}
if (debug.on()) {
if (firstCancel) {
debug.log("multi exchange recording: " + interrupted.get());
} else {
debug.log("multi exchange recorded: " + interrupted.get());
}
}
this.cancelled = true;
var exchange = getExchange();
if (exchange != null) {
@@ -373,30 +365,17 @@ private CompletableFuture<HttpResponse<T>> handleNoBody(Response r, Exchange<T>
}).exceptionallyCompose(this::whenCancelled);
}

// returns a CancellationExcpetion that wraps the given cause
// if cancel(boolean) was called, the given cause otherwise
private Throwable wrapIfCancelled(Throwable cause) {
CancellationException interrupt = interrupted.get();
if (interrupt == null) return cause;

var cancel = new CancellationException(interrupt.getMessage());
// preserve the stack trace of the original exception to
// show where the call to cancel(boolean) came from
cancel.setStackTrace(interrupt.getStackTrace());
cancel.initCause(Utils.getCancelCause(cause));
return cancel;
}

// if the request failed because the multi exchange was cancelled,
// make sure the reported exception is wrapped in CancellationException
private CompletableFuture<HttpResponse<T>> whenCancelled(Throwable t) {
var x = wrapIfCancelled(t);
if (x instanceof CancellationException) {
CancellationException x = interrupted.get();
if (x != null) {
// make sure to fail with CancellationException if cancel(true)
// was called.
t = x.initCause(Utils.getCancelCause(t));
if (debug.on()) {
debug.log("MultiExchange interrupted with: " + x.getCause());
debug.log("MultiExchange interrupted with: " + t.getCause());
}
}
return MinimalFuture.failedFuture(x);
return MinimalFuture.failedFuture(t);
}

static class NullSubscription implements Flow.Subscription {
@@ -459,7 +438,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
return exch.ignoreBody().handle((r,t) -> {
previousreq = currentreq;
currentreq = newrequest;
retriedOnce = false;
expiredOnce = false;
setExchange(new Exchange<>(currentreq, this, acc));
return responseAsyncImpl();
}).thenCompose(Function.identity());
@@ -472,7 +451,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
return completedFuture(response);
}
// all exceptions thrown are handled here
CompletableFuture<Response> errorCF = getExceptionalCF(ex, exch.exchImpl);
CompletableFuture<Response> errorCF = getExceptionalCF(ex);
if (errorCF == null) {
return responseAsyncImpl();
} else {
@@ -544,38 +523,34 @@ private Throwable retryCause(Throwable t) {
* Takes a Throwable and returns a suitable CompletableFuture that is
* completed exceptionally, or null.
*/
private CompletableFuture<Response> getExceptionalCF(Throwable t, ExchangeImpl<?> exchImpl) {
private CompletableFuture<Response> getExceptionalCF(Throwable t) {
if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
if (t.getCause() != null) {
t = t.getCause();
}
}
final boolean retryAsUnprocessed = exchImpl != null && exchImpl.isUnprocessedByPeer();
if (cancelled && !requestCancelled() && t instanceof IOException) {
if (!(t instanceof HttpTimeoutException)) {
t = toTimeoutException((IOException)t);
}
} else if (retryAsUnprocessed || retryOnFailure(t)) {
} else if (retryOnFailure(t)) {
Throwable cause = retryCause(t);

if (!(t instanceof ConnectException)) {
// we may need to start a new connection, and if so
// we want to start with a fresh connect timeout again.
if (connectTimeout != null) connectTimeout.reset();
if (!retryAsUnprocessed && !canRetryRequest(currentreq)) {
// a (peer) processed request which cannot be retried, fail with
// the original cause
return failedFuture(cause);
if (!canRetryRequest(currentreq)) {
return failedFuture(cause); // fails with original cause
}
} // ConnectException: retry, but don't reset the connectTimeout.

// allow the retry mechanism to do its work
retryCause = cause;
if (!retriedOnce) {
if (!expiredOnce) {
if (debug.on())
debug.log(t.getClass().getSimpleName()
+ " (async): retrying " + currentreq + " due to: ", t);
retriedOnce = true;
debug.log(t.getClass().getSimpleName() + " (async): retrying...", t);
expiredOnce = true;
// The connection was abruptly closed.
// We return null to retry the same request a second time.
// The request filters have already been applied to the
@@ -586,7 +561,7 @@ private CompletableFuture<Response> getExceptionalCF(Throwable t, ExchangeImpl<?
} else {
if (debug.on()) {
debug.log(t.getClass().getSimpleName()
+ " (async): already retried once " + currentreq, t);
+ " (async): already retried once.", t);
}
t = cause;
}
Loading