Skip to content
This repository has been archived by the owner on Sep 19, 2023. It is now read-only.

Commit

Permalink
8291897: TerminatingThreadLocal(s) not registered from virtual thread(s)
Browse files Browse the repository at this point in the history
Backport-of: 861cc67
  • Loading branch information
Rob McKenna committed Sep 13, 2022
1 parent 0c71773 commit 5d304f5
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 66 deletions.
18 changes: 14 additions & 4 deletions src/java.base/share/classes/java/lang/System.java
Expand Up @@ -67,6 +67,8 @@
import java.util.function.Supplier;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

import jdk.internal.misc.CarrierThreadLocal;
import jdk.internal.misc.Unsafe;
import jdk.internal.util.StaticProperty;
import jdk.internal.module.ModuleBootstrap;
Expand Down Expand Up @@ -2554,12 +2556,20 @@ public <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
}
}

public <T> T getCarrierThreadLocal(ThreadLocal<T> local) {
return local.getCarrierThreadLocal();
public <T> T getCarrierThreadLocal(CarrierThreadLocal<T> local) {
return ((ThreadLocal<T>)local).getCarrierThreadLocal();
}

public <T> void setCarrierThreadLocal(CarrierThreadLocal<T> local, T value) {
((ThreadLocal<T>)local).setCarrierThreadLocal(value);
}

public void removeCarrierThreadLocal(CarrierThreadLocal<?> local) {
((ThreadLocal<?>)local).removeCarrierThreadLocal();
}

public <T> void setCarrierThreadLocal(ThreadLocal<T> local, T value) {
local.setCarrierThreadLocal(value);
public boolean isCarrierThreadLocalPresent(CarrierThreadLocal<?> local) {
return ((ThreadLocal<?>)local).isCarrierThreadLocalPresent();
}

public Object[] extentLocalCache() {
Expand Down
31 changes: 24 additions & 7 deletions src/java.base/share/classes/java/lang/ThreadLocal.java
Expand Up @@ -29,6 +29,8 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import jdk.internal.misc.CarrierThreadLocal;
import jdk.internal.misc.TerminatingThreadLocal;

/**
Expand Down Expand Up @@ -172,6 +174,7 @@ public T get() {
* thread-local variable.
*/
T getCarrierThreadLocal() {
assert this instanceof CarrierThreadLocal<T>;
return get(Thread.currentCarrierThread());
}

Expand All @@ -193,14 +196,18 @@ private T get(Thread t) {
}

/**
* Returns {@code true} if there is a value in the current thread's copy of
* Returns {@code true} if there is a value in the current carrier thread's copy of
* this thread-local variable, even if that values is {@code null}.
*
* @return {@code true} if current thread has associated value in this
* @return {@code true} if current carrier thread has associated value in this
* thread-local variable; {@code false} if not
*/
boolean isPresent() {
Thread t = Thread.currentThread();
boolean isCarrierThreadLocalPresent() {
assert this instanceof CarrierThreadLocal<T>;
return isPresent(Thread.currentCarrierThread());
}

private boolean isPresent(Thread t) {
ThreadLocalMap map = getMap(t);
if (map != null && map != ThreadLocalMap.NOT_SUPPORTED) {
return map.getEntry(this) != null;
Expand All @@ -224,8 +231,8 @@ private T setInitialValue(Thread t) {
} else {
createMap(t, value);
}
if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
if (this instanceof TerminatingThreadLocal<?> ttl) {
TerminatingThreadLocal.register(ttl);
}
return value;
}
Expand All @@ -249,6 +256,7 @@ public void set(T value) {
}

void setCarrierThreadLocal(T value) {
assert this instanceof CarrierThreadLocal<T>;
set(Thread.currentCarrierThread(), value);
}

Expand Down Expand Up @@ -276,7 +284,16 @@ private void set(Thread t, T value) {
* @since 1.5
*/
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
remove(Thread.currentThread());
}

void removeCarrierThreadLocal() {
assert this instanceof CarrierThreadLocal<T>;
remove(Thread.currentCarrierThread());
}

private void remove(Thread t) {
ThreadLocalMap m = getMap(t);
if (m != null && m != ThreadLocalMap.NOT_SUPPORTED) {
m.remove(this);
}
Expand Down
Expand Up @@ -27,7 +27,6 @@

import java.lang.annotation.Annotation;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.module.ModuleDescriptor;
import java.lang.reflect.Executable;
Expand All @@ -45,6 +44,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Stream;

import jdk.internal.misc.CarrierThreadLocal;
import jdk.internal.module.ServicesCatalog;
import jdk.internal.reflect.ConstantPool;
import jdk.internal.vm.Continuation;
Expand Down Expand Up @@ -456,12 +456,23 @@ public interface JavaLangAccess {
/**
* Returns the value of the current carrier thread's copy of a thread-local.
*/
<T> T getCarrierThreadLocal(ThreadLocal<T> local);
<T> T getCarrierThreadLocal(CarrierThreadLocal<T> local);

/**
* Sets the value of the current carrier thread's copy of a thread-local.
*/
<T> void setCarrierThreadLocal(ThreadLocal<T> local, T value);
<T> void setCarrierThreadLocal(CarrierThreadLocal<T> local, T value);

/**
* Removes the value of the current carrier thread's copy of a thread-local.
*/
void removeCarrierThreadLocal(CarrierThreadLocal<?> local);

/**
* Returns {@code true} if there is a value in the current carrier thread's copy of
* thread-local, even if that values is {@code null}.
*/
boolean isCarrierThreadLocalPresent(CarrierThreadLocal<?> local);

/**
* Returns the current thread's extent locals cache
Expand Down
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

package jdk.internal.misc;

import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;

/**
* A {@link ThreadLocal} variant which binds its value to current thread's
* carrier thread.
*/
public class CarrierThreadLocal<T> extends ThreadLocal<T> {

@Override
public T get() {
return JLA.getCarrierThreadLocal(this);
}

@Override
public void set(T value) {
JLA.setCarrierThreadLocal(this, value);
}

@Override
public void remove() {
JLA.removeCarrierThreadLocal(this);
}

public boolean isPresent() {
return JLA.isCarrierThreadLocalPresent(this);
}

private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
}
Expand Up @@ -29,11 +29,12 @@
import java.util.IdentityHashMap;

/**
* A thread-local variable that is notified when a thread terminates and
* it has been initialized in the terminating thread (even if it was
* A per-carrier-thread-local variable that is notified when a thread terminates and
* it has been initialized in the terminating carrier thread or a virtual thread
* that had the terminating carrier thread as its carrier thread (even if it was
* initialized with a null value).
*/
public class TerminatingThreadLocal<T> extends ThreadLocal<T> {
public class TerminatingThreadLocal<T> extends CarrierThreadLocal<T> {

@Override
public void set(T value) {
Expand Down Expand Up @@ -79,8 +80,7 @@ public static void threadTerminated() {
* @param tl the ThreadLocal to register
*/
public static void register(TerminatingThreadLocal<?> tl) {
if (!Thread.currentThread().isVirtual())
REGISTRY.get().add(tl);
REGISTRY.get().add(tl);
}

/**
Expand All @@ -89,16 +89,15 @@ public static void register(TerminatingThreadLocal<?> tl) {
* @param tl the ThreadLocal to unregister
*/
private static void unregister(TerminatingThreadLocal<?> tl) {
if (!Thread.currentThread().isVirtual())
REGISTRY.get().remove(tl);
REGISTRY.get().remove(tl);
}

/**
* a per-thread registry of TerminatingThreadLocal(s) that have been registered
* but later not unregistered in a particular thread.
* a per-carrier-thread registry of TerminatingThreadLocal(s) that have been registered
* but later not unregistered in a particular carrier-thread.
*/
public static final ThreadLocal<Collection<TerminatingThreadLocal<?>>> REGISTRY =
new ThreadLocal<>() {
public static final CarrierThreadLocal<Collection<TerminatingThreadLocal<?>>> REGISTRY =
new CarrierThreadLocal<>() {
@Override
protected Collection<TerminatingThreadLocal<?>> initialValue() {
return Collections.newSetFromMap(new IdentityHashMap<>(4));
Expand Down
12 changes: 5 additions & 7 deletions src/java.base/share/classes/sun/nio/ch/IOVecWrapper.java
Expand Up @@ -27,8 +27,7 @@

import java.nio.ByteBuffer;

import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.CarrierThreadLocal;
import jdk.internal.ref.CleanerFactory;

/**
Expand All @@ -46,7 +45,6 @@
*/

class IOVecWrapper {
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();

// Miscellaneous constants
private static final int BASE_OFFSET = 0;
Expand Down Expand Up @@ -83,8 +81,8 @@ public void run() {
}
}

// per thread IOVecWrapper
private static final ThreadLocal<IOVecWrapper> cached = new ThreadLocal<>();
// per carrier-thread IOVecWrapper
private static final CarrierThreadLocal<IOVecWrapper> cached = new CarrierThreadLocal<>();

private IOVecWrapper(int size) {
this.size = size;
Expand All @@ -97,7 +95,7 @@ private IOVecWrapper(int size) {
}

static IOVecWrapper get(int size) {
IOVecWrapper wrapper = JLA.getCarrierThreadLocal(cached);
IOVecWrapper wrapper = cached.get();
if (wrapper != null && wrapper.size < size) {
// not big enough; eagerly release memory
wrapper.vecArray.free();
Expand All @@ -106,7 +104,7 @@ static IOVecWrapper get(int size) {
if (wrapper == null) {
wrapper = new IOVecWrapper(size);
CleanerFactory.cleaner().register(wrapper, new Deallocator(wrapper.vecArray));
JLA.setCarrierThreadLocal(cached, wrapper);
cached.set(wrapper);
}
return wrapper;
}
Expand Down
15 changes: 6 additions & 9 deletions src/java.base/share/classes/sun/nio/ch/Util.java
Expand Up @@ -38,14 +38,11 @@
import java.util.Iterator;
import java.util.Set;

import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.TerminatingThreadLocal;
import jdk.internal.misc.Unsafe;
import sun.security.action.GetPropertyAction;

public class Util {
private static JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();

// -- Caches --

Expand All @@ -55,8 +52,8 @@ public class Util {
// The max size allowed for a cached temp buffer, in bytes
private static final long MAX_CACHED_BUFFER_SIZE = getMaxCachedBufferSize();

// Per-thread cache of temporary direct buffers
private static ThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() {
// Per-carrier-thread cache of temporary direct buffers
private static TerminatingThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() {
@Override
protected BufferCache initialValue() {
return new BufferCache();
Expand Down Expand Up @@ -230,7 +227,7 @@ public static ByteBuffer getTemporaryDirectBuffer(int size) {
return ByteBuffer.allocateDirect(size);
}

BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
Expand All @@ -257,7 +254,7 @@ public static ByteBuffer getTemporaryAlignedDirectBuffer(int size,
.alignedSlice(alignment);
}

BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
if (buf.alignmentOffset(0, alignment) == 0) {
Expand Down Expand Up @@ -294,7 +291,7 @@ static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
}

assert buf != null;
BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
BufferCache cache = bufferCache.get();
if (!cache.offerFirst(buf)) {
// cache is full
free(buf);
Expand All @@ -316,7 +313,7 @@ static void offerLastTemporaryDirectBuffer(ByteBuffer buf) {
}

assert buf != null;
BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
BufferCache cache = bufferCache.get();
if (!cache.offerLast(buf)) {
// cache is full
free(buf);
Expand Down

1 comment on commit 5d304f5

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.