Java并发之原子变量Atomic

Java并发之原子变量Atomic

Java从JDK1.5开始提供了java.util.concurrent.atomic包,方便程序员在多线程环境下,无锁的进行原子操作。原子变量的底层使用了处理器提供的原子指令,但是不同的CPU架构可能提供的原子指令不一样,也有可能需要某种形式的内部锁,所以该方法不能绝对保证线程不被阻塞。

Atomic 类

在Atomic包里一共有12个原子类,四种原子更新方式,分别是原子更新基本类型,原子更新数组,原子更新引用和原子更新字段。Atomic包里的类基本都是使用Unsafe实现的包装类。

原子更新基本类型类

用于通过原子的方式更新基本类型,Atomic包提供了以下三个类:

  • AtomicBoolean:原子更新布尔类型。
  • AtomicInteger:原子更新整型。
  • AtomicLong:原子更新长整型。

AtomicInteger的常用方法如下:

  • int addAndGet(int delta) :以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果
  • boolean compareAndSet(int expect, int update) :如果输入的数值等于预期值,则以原子方式将该值设置为输入的值。
  • int getAndIncrement():以原子方式将当前值加1,注意:这里返回的是自增前的值。
  • void lazySet(int newValue):最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
  • int getAndSet(int newValue):以原子方式设置为newValue的值,并返回旧值。

Atomic包提供了三种基本类型的原子更新,但是Java的基本类型里还有char,float和double等。那么问题来了,如何原子的更新其他的基本类型呢?Atomic包里的类基本都是使用Unsafe实现的,Unsafe只提供了三种CAS方法,compareAndSwapObject,compareAndSwapInt和compareAndSwapLong,再看AtomicBoolean源码,发现其是先把Boolean转换成整型,再使用compareAndSwapInt进行CAS,所以原子更新float和double也可以用类似的思路来实现(提示:可以使用 Float.floatToIntBits 和 Float.intBitstoFloat 转换来保存浮点数,使用 Double.doubleToLongBits 和 Double.longBitsToDouble 转换来保存双精度)。

实现原理分析

AtomicInteger.incrementAndGet()方法为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
private volatile int value;

public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) // CAS操作
return next;
}
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update); // native function
}
  1. 使用了volatile 来保证多线程的可见性(cas的情况下一定只有一个线程写成功),主要是用于未内部调用unsafe类方法的原子类方法,例如get()
  2. 方法中采用了CAS操作,每次从内存中读取数据然后将此数据和+1后的结果进行CAS操作,如果成功就返回结果,否则重试直到成功为止。

原子更新数组类

通过原子的方式更新数组里的某个元素,Atomic包提供了以下三个类:

  • AtomicIntegerArray:原子更新整型数组里的元素。
  • AtomicLongArray:原子更新长整型数组里的元素。
  • AtomicReferenceArray:原子更新引用类型数组里的元素。

AtomicIntegerArray类主要是提供原子的方式更新数组里的整型,其常用方法如下:

  • int addAndGet(int i, int delta):以原子方式将输入值与数组中索引i的元素相加。
  • boolean compareAndSet(int i, int expect, int update):如果当前值等于预期值,则以原子方式将数组位置i的元素设置成update值。

AtomicIntegerArray类需要注意的是,数组value通过构造方法传递进去,然后AtomicIntegerArray会将当前数组复制一份(不会修改数组引用对象的值),所以当AtomicIntegerArray对内部的数组元素进行修改时,不会影响到传入的数组。

1
2
3
4
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}

使用例子:

1
2
3
4
5
6
7
8
9
10
11
public class AtomicIntegerArrayTest {
static int[] value = new int[] { 1, 2 };
static AtomicIntegerArray ai = new AtomicIntegerArray(value);
public static void main(String[] args) {
ai.getAndSet(0, 3);
System.out.println(ai.get(0));
System.out.println(value[0]);
}
}
// 3
// 1

实现原理分析

AtomicIntegerArray.set()方法为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private final int[] array;
private static final int base = unsafe.arrayBaseOffset(int[].class);
static {
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
shift = 31 - Integer.numberOfLeadingZeros(scale);
}

public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);

return byteOffset(i);
}
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
  1. 使用了unsafe类来获取数组元素的偏移地址。
  2. 使用了unsafe.putIntVolatile(此处的Volatile是C中的关键字,和java中的关键字并不一致)来保证操作原子性。

原子更新引用类型

原子更新基本类型的AtomicInteger,只能更新一个变量,如果要原子的更新多个变量,就需要使用这个原子更新引用类型提供的类。Atomic包提供了以下三个类:

  • AtomicReference:原子更新引用类型。
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更数据和数据的版本号,可以解决使用CAS进行原子更新时,可能出现的ABA问题。
  • AtomicMarkableReference:原子更新带有标记位的引用类型。可以原子的更新一个布尔类型的标记位和引用类型。AtomicStampedReference可以知道,引用变量中途被更改了几次。有时候,我们并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference。

AtomicStampedReference将版本信息和对象存储为Pair,同时更新。

1
2
3
4
5
6
7
8
9
10
11
12
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;

使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class AtomicReferenceTest {
public static AtomicReference<user> atomicUserRef = new AtomicReference<user>();
public static void main(String[] args) {
User user = new User("conan", 15);
atomicUserRef.set(user);
User updateUser = new User("Shinichi", 17);
atomicUserRef.compareAndSet(user, updateUser);
System.out.println(atomicUserRef.get().getName());
System.out.println(atomicUserRef.get().getAge());
}
static class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
}
}
// Shinichi
// 17

原子更新字段类

如果我们只需要某个类里的某个字段,那么就需要使用原子更新字段类,Atomic包提供了以下三个类:

  • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段。

原子更新字段类都是抽象类,每次使用都时候必须使用静态方法newUpdater创建一个更新器。原子更新类的字段的必须使用public volatile修饰符(多线程可见)。

AtomicIntegerFieldUpdater的例子代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class AtomicIntegerFieldUpdaterTest {
private static AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater
.newUpdater(User.class, "old");
public static void main(String[] args) {
User conan = new User("conan", 10);
System.out.println(a.getAndIncrement(conan));
System.out.println(a.get(conan));
}
public static class User {
private String name;
public volatile int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
public String getName() {
return name;
}
public int getOld() {
return old;
}
}
}
// 10
// 11

实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private static final class AtomicIntegerFieldUpdaterImpl<T> extends AtomicIntegerFieldUpdater<T> {
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private final long offset;
/**
* 如果字段是protected字段, 使用子类构造更新器,否则和tclass是一致的
*/
private final Class<?> cclass;
/** class holding the field */
private final Class<T> tclass;

AtomicIntegerFieldUpdaterImpl(final Class<T> tclass, final String fieldName,final Class<?> caller) {
final Field field;
final int modifiers;
try {
field = AccessController.doPrivileged(
new PrivilegedExceptionAction<Field>() {
public Field run() throws NoSuchFieldException {
return tclass.getDeclaredField(fieldName);
}
});
modifiers = field.getModifiers();
sun.reflect.misc.ReflectUtil.ensureMemberAccess(caller, tclass, null, modifiers);
ClassLoader cl = tclass.getClassLoader();
ClassLoader ccl = caller.getClassLoader();
if ((ccl != null) && (ccl != cl) && ((cl == null) || !isAncestor(cl, ccl))) {
sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
}
} catch (PrivilegedActionException pae) {
throw new RuntimeException(pae.getException());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
if (field.getType() != int.class)
throw new IllegalArgumentException("Must be integer type");
// 此处要求了被更新字段必须是volatile 修饰
if (!Modifier.isVolatile(modifiers))
throw new IllegalArgumentException("Must be volatile type");

// protected 字段只允许子类或是同包类访问。
// 如果对protected 字段的更新器来自包外,且所在class是 tclass的子类
// 使用 caller 作为 cclass
this.cclass = (Modifier.isProtected(modifiers) &&
tclass.isAssignableFrom(caller) &&
!isSamePackage(tclass, caller))
? caller : tclass;
this.tclass = tclass;
this.offset = U.objectFieldOffset(field);
}
public final boolean compareAndSet(T obj, int expect, int update) {
accessCheck(obj);
return U.compareAndSwapInt(obj, offset, expect, update);
}
// ... ...

}

可以看到原子字段更新的原理是获取字段在对象中的偏移量,然后通过unsafe类的CAS方法进行更新。

额外提一句,在看源码的时候,你会发现一个很有意思的问题。在AtomticLong中有对平台是否支持CAS-Long的检查:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Records whether the underlying JVM supports lockless
* compareAndSwap for longs. While the Unsafe.compareAndSwapLong
* method works in either case, some constructions should be
* handled at Java level to avoid locking user-visible locks.
*/
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

/**
* Returns whether underlying JVM supports lockless CompareAndSet
* for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
*/
private static native boolean VMSupportsCS8();

这个检查结果也被用在了AtomicLongFieldUpdater的构造器中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static <U> AtomicLongFieldUpdater<U> newUpdater(Class<U> tclass,String fieldName) {
Class<?> caller = Reflection.getCallerClass();
if (AtomicLong.VM_SUPPORTS_LONG_CAS)
return new CASUpdater<U>(tclass, fieldName, caller);
else
return new LockedUpdater<U>(tclass, fieldName, caller);
}

// casUpdate
public final boolean compareAndSet(T obj, long expect, long update) {
accessCheck(obj);
// U => Unsafe instance
return U.compareAndSwapLong(obj, offset, expect, update);
}

// lockupdater
public final boolean compareAndSet(T obj, long expect, long update) {
accessCheck(obj);
synchronized (this) {
long v = U.getLong(obj, offset);
if (v != expect)
return false;
U.putLong(obj, offset, update);
return true;
}
}

但是令人疑惑的是AtomLong中却是直接使用unsafe类的compareAndSwapLong方法,为啥不一样?

在openJDk的MailList里我找到一些解释😂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

On 01/09/13 06:04, Aleksey Shipilev wrote:

> I actually have the question about this. What is the usual pattern for
> using AtomicLong.VM_SUPPORTS_LONG_CAS? AtomicLong seems to use Unsafe
> directly without the checks. AtomicLongFieldUpdater does the checks.
> Something is fishy about this whole thing.

Here's the story: Any implementation of Long-CAS on a machine
that does not have any other way to support it is allowed to
emulate by a synchronized block on enclosing object. For the
AtomicXFieldUpdaters classes, there was, at the time they were
introduced, no way to express the object to use, so the checks
were done explicitly. I don't think this is even necessary
anymore, but doesn't hurt. Further, I'm not sure that JDK8
is even targeted to any machines that require this kind of
emulation.

-Doug

>> I have a concern that the Long versions of these methods may be used
>> directly without there being any check for supports_cx8
>
> I actually have the question about this. What is the usual pattern for
> using AtomicLong.VM_SUPPORTS_LONG_CAS? AtomicLong seems to use Unsafe
> directly without the checks. AtomicLongFieldUpdater does the checks.
> Something is fishy about this whole thing.

I had forgotten at what levels this operates too. As I think is now
clear(er) there is a Java level check (and fallback to locks) for the
AtomicLongFieldUpdater based on supports_cx8. Then there are further
checks of supports_cx8 in unsafe.cpp. Critically in
Unsafe_CompareAndSwapLong. (Still needed on some platforms)

Also note that Unsafe_get/setLongVolatile are also gated, unnecessarily,
on supports_cx8. We have to have atomic 64-bit read/write for direct
Java volatile accesses anyway. There's also an open CR to define
supports_atomic_long_ops so that supports_cx8 is only used for the CAS,
where it is needed, rather than simple read/write ops.

David

正如mail中所说,unsafe.cpp 也支持了对平台的检查。所以AtomicLong可以直接使用Unsafe的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
UnsafeWrapper("Unsafe_CompareAndSwapLong");
Handle p (THREAD, JNIHandles::resolve(obj));
jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
if (VM_Version::supports_cx8())
return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
else {
jboolean success = false;
ObjectLocker ol(p, THREAD);
if (*addr == e) { *addr = x; success = true; }
return success;
}
UNSAFE_END

stripe64(累加器)

在Atomic包里除了上述的12个类,还有5个类。

Striped64

-------------本文结束感谢您的阅读-------------
坚持分享,您的支持将鼓励我继续创作!
0%