Skip to content

Commit

Permalink
8302635: Race condition in HttpBodySubscriberWrapper when cancelling …
Browse files Browse the repository at this point in the history
…request

Reviewed-by: jpai
dfuch committed Feb 17, 2023
1 parent cd77fcf commit edf238b
Showing 4 changed files with 213 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -209,24 +209,12 @@ static final class Http1ResponseBodySubscriber<U> extends HttpBodySubscriberWrap
}

@Override
protected void onSubscribed() {
protected void register() {
exchange.registerResponseSubscriber(this);
}

@Override
protected void complete(Throwable t) {
try {
exchange.unregisterResponseSubscriber(this);
} finally {
super.complete(t);
}
}

@Override
protected void onCancel() {
// If the subscription is cancelled the
// subscriber may or may not get completed.
// Therefore we need to unregister it
protected void unregister() {
exchange.unregisterResponseSubscriber(this);
}
}
13 changes: 2 additions & 11 deletions src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
Original file line number Diff line number Diff line change
@@ -1606,21 +1606,12 @@ final class Http2StreamResponseSubscriber<U> extends HttpBodySubscriberWrapper<U
}

@Override
protected void onSubscribed() {
protected void register() {
registerResponseSubscriber(this);
}

@Override
protected void complete(Throwable t) {
try {
unregisterResponseSubscriber(this);
} finally {
super.complete(t);
}
}

@Override
protected void onCancel() {
protected void unregister() {
unregisterResponseSubscriber(this);
}

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -58,11 +58,16 @@ public void request(long n) { }
public void cancel() { }
};

static final int SUBSCRIBED = 1;
static final int REGISTERED = 2;
static final int COMPLETED = 4;
static final int CANCELLED = 8;
static final int UNREGISTERED = 16;

static final AtomicLong IDS = new AtomicLong();
final long id = IDS.incrementAndGet();
final BodySubscriber<T> userSubscriber;
final AtomicBoolean completed = new AtomicBoolean();
final AtomicBoolean subscribed = new AtomicBoolean();
private volatile int state;
final ReentrantLock subscriptionLock = new ReentrantLock();
volatile SubscriptionWrapper subscription;
volatile Throwable withError;
@@ -83,14 +88,55 @@ public void request(long n) {
@Override
public void cancel() {
try {
subscription.cancel();
onCancel();
try {
subscription.cancel();
} finally {
if (markCancelled()) {
onCancel();
}
}
} catch (Throwable t) {
onError(t);
}
}
}

private final boolean markState(final int flag) {
int state = this.state;
if ((state & flag) == flag) {
return false;
}
synchronized (this) {
state = this.state;
if ((state & flag) == flag) {
return false;
}
state = this.state = (state | flag);
}
assert (state & flag) == flag;
return true;
}

private boolean markSubscribed() {
return markState(SUBSCRIBED);
}

private boolean markCancelled() {
return markState(CANCELLED);
}

private boolean markCompleted() {
return markState(COMPLETED);
}

private boolean markRegistered() {
return markState(REGISTERED);
}

private boolean markUnregistered() {
return markState(UNREGISTERED);
}

final long id() { return id; }

@Override
@@ -101,8 +147,9 @@ public boolean needsExecutor() {
// propagate the error to the user subscriber, even if not
// subscribed yet.
private void propagateError(Throwable t) {
var state = this.state;
assert t != null;
assert completed.get();
assert (state & COMPLETED) != 0;
try {
// if unsubscribed at this point, it will not
// get subscribed later - so do it now and
@@ -111,7 +158,7 @@ private void propagateError(Throwable t) {
// subscription is finished before calling onError;
subscriptionLock.lock();
try {
if (subscribed.compareAndSet(false, true)) {
if (markSubscribed()) {
userSubscriber.onSubscribe(NOP);
}
} finally {
@@ -125,34 +172,139 @@ private void propagateError(Throwable t) {
}
}

/**
* This method attempts to mark the state of this
* object as registered, and then call the
* {@link #register()} method.
* <p>
* The state will be marked as registered, and the
* {@code register()} method will be called only
* if not already registered or unregistered,
* or cancelled, or completed.
*
* @return {@code true} if {@link #register()} was called,
* false otherwise.
*/
protected final boolean tryRegister() {
subscriptionLock.lock();
try {
int state = this.state;
if ((state & (REGISTERED | UNREGISTERED | CANCELLED | COMPLETED)) != 0) return false;
if (markRegistered()) {
register();
return true;
}
} finally {
subscriptionLock.unlock();
}
return false;
}

/**
* This method attempts to mark the state of this
* object as unregistered, and then call the
* {@link #unregister()} method.
* <p>
* The {@code unregister()} method will be called only
* if already registered and not yet unregistered.
* Whether {@code unregister()} is called or not,
* the state is marked as unregistered, to prevent
* {@link #tryRegister()} from calling {@link #register()}
* after {@link #tryUnregister()} has been called.
*
* @return {@code true} if {@link #unregister()} was called,
* false otherwise.
*/
protected final boolean tryUnregister() {
subscriptionLock.lock();
try {
int state = this.state;
if ((state & REGISTERED) == 0) {
markUnregistered();
return false;
}
if (markUnregistered()) {
unregister();
return true;
}
} finally {
subscriptionLock.unlock();
}
return false;
}

/**
* This method can be implemented by subclasses
* to perform registration actions. It will not be
* called if already registered or unregistered.
* @apiNote
* This method is called while holding a subscription
* lock.
* @see #tryRegister()
*/
protected void register() {
assert subscriptionLock.isHeldByCurrentThread();
}

/**
* This method can be implemented by subclasses
* to perform unregistration actions. It will not be
* called if not already registered, or already unregistered.
* @apiNote
* This method is called while holding a subscription
* lock.
* @see #tryUnregister()
*/
protected void unregister() {
assert subscriptionLock.isHeldByCurrentThread();
}

/**
* Called when the subscriber cancels its subscription.
* @apiNote
* This method may be used by subclasses to perform cleanup
* actions after a subscription has been cancelled.
* @implSpec
* This method calls {@link #tryUnregister()}
*/
protected void onCancel() { }
protected void onCancel() {
// If the subscription is cancelled the
// subscriber may or may not get completed.
// Therefore we need to unregister it
tryUnregister();
}

/**
* Called right before the userSubscriber::onSubscribe is called.
* @apiNote
* This method may be used by subclasses to perform cleanup
* related actions after a subscription has been succesfully
* related actions after a subscription has been successfully
* accepted.
* This method is called while holding a subscription
* lock.
* @implSpec
* This method calls {@link #tryRegister()}
*/
protected void onSubscribed() { }
protected void onSubscribed() {
tryRegister();
}

/**
* Complete the subscriber, either normally or exceptionally
* ensure that the subscriber is completed only once.
* @param t a throwable, or {@code null}
* @implSpec
* If not {@linkplain #completed()} yet, this method
* calls {@link #tryUnregister()}
*/
protected void complete(Throwable t) {
if (completed.compareAndSet(false, true)) {
public final void complete(Throwable t) {
if (markCompleted()) {
tryUnregister();
t = withError = Utils.getCompletionCause(t);
if (t == null) {
try {
assert subscribed.get();
var state = this.state;
assert (state & SUBSCRIBED) != 0;
userSubscriber.onComplete();
} catch (Throwable x) {
// Simply propagate the error by calling
@@ -179,10 +331,45 @@ protected void complete(Throwable t) {
* {@return true if this subscriber has already completed, either normally
* or abnormally}
*/
public boolean completed() {
return completed.get();
public final boolean completed() {
int state = this.state;
return (state & COMPLETED) != 0;
}

/**
* {@return true if this subscriber has already subscribed}
*/
public final boolean subscribed() {
int state = this.state;
return (state & SUBSCRIBED) != 0;
}

/**
* {@return true if this subscriber has already been registered}
*/
public final boolean registered() {
int state = this.state;
return (state & REGISTERED) != 0;
}

/**
* {@return true if this subscriber has already been unregistered}
*/
public final boolean unregistered() {
int state = this.state;
return (state & UNREGISTERED) != 0;
}

/**
* {@return true if this subscriber's subscription has already
* been cancelled}
*/
public final boolean cancelled() {
int state = this.state;
return (state & CANCELLED) != 0;
}


@Override
public CompletionStage<T> getBody() {
return userSubscriber.getBody();
@@ -194,7 +381,7 @@ public void onSubscribe(Flow.Subscription subscription) {
// subscription is finished before calling onError;
subscriptionLock.lock();
try {
if (subscribed.compareAndSet(false, true)) {
if (markSubscribed()) {
onSubscribed();
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
userSubscriber.onSubscribe(this.subscription = wrapped);
@@ -208,8 +395,9 @@ public void onSubscribe(Flow.Subscription subscription) {

@Override
public void onNext(List<ByteBuffer> item) {
assert subscribed.get();
if (completed.get()) {
var state = this.state;
assert (state & SUBSCRIBED) != 0;
if ((state & COMPLETED) != 0) {
SubscriptionWrapper subscription = this.subscription;
if (subscription != null) {
subscription.subscription.cancel();
@@ -222,6 +410,7 @@ public void onNext(List<ByteBuffer> item) {
public void onError(Throwable throwable) {
complete(throwable);
}

@Override
public void onComplete() {
complete(null);
2 changes: 1 addition & 1 deletion test/jdk/java/net/httpclient/CancelRequestTest.java
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@

/*
* @test
* @bug 8245462 8229822 8254786 8297075 8297149 8298340
* @bug 8245462 8229822 8254786 8297075 8297149 8298340 8302635
* @summary Tests cancelling the request.
* @library /test/lib /test/jdk/java/net/httpclient/lib
* @key randomness

1 comment on commit edf238b

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.