Java并发之Thread基础-1

Java Thread基础(1)

什么是线程

线程有时也被称为轻量级的进程。进程和线程都提供了一个执行环境,但创建一个新的线程比创建一个新的进程需要的资源要少。 线程是在进程中存在的,每个进程最少有一个线程。线程共享进程的资源,包括内存和打开的文件。这样提高了效率,但潜在的问题就是线程间的通信。

定义并启动一个线程

使用Thread新建线程有两种写法:

继承Thread类

1
2
3
4
5
6
7
8
Thread helloThread = new Thread() {
@Override
public void run() {
System.out.println("Hello from a thread!");
}
}

helloThread.start();

实现Runnable接口

Runnable对象仅包含一个run()方法,在这个方法中定义的代码将在会线程中执行。将Runnable对象传递给Thread类的构造函数即可。

1
2
3
4
5
6
7
8
public class HelloRunnable implements Runnable {
public void run() {
System.out.println("Hello from a thread!");
}
public static void main(String args[]) {
(new Thread(new HelloRunnable())).start();
}
}
1
2
3
4
5
6
// Thread 的 run()
public void More ...run() {
if (target != null) {
target.run();
}
}

NOTE:开启新线程需要调用Thread.start()方法,直接调用run()方法,只会在当前线程串行执行run()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class MyThread extends Thread{
public MyThread(String name) {
super(name);
}

public void run(){
System.out.println(Thread.currentThread().getName()+" is running");
}
};

public class Demo {
public static void main(String[] args) {
Thread mythread=new MyThread("mythread");

System.out.println(Thread.currentThread().getName()+" call mythread.run()");
mythread.run();

System.out.println(Thread.currentThread().getName()+" call mythread.start()");
mythread.start();
}
}
/* ------out put ---------
* main call mythread.run()
* main is running
* main call mythread.start()
* mythread is running
*/

补充说明

  1. Thread.currentThread()方法可以返回代码段正在被哪个线程调用的信息。
  2. 执行start()方法的顺序并不代表线程启动的顺序。

isAlive()方法

方法isAlive()的功能是判断当前的线程是否处于活动状态。活动状态是指线程已经启动且尚未终止。返回true表示线程还”存活”。

sleep()方法

方法sleep()的作用是在指定的毫秒数内让当前正在执行的线程暂停执行。

getId()方法

getId()方法的作用是取得线程的唯一标识。

终止线程

一般来说,线程执行完后就会结束;但是,一些后台线程可能会常驻系统,不会正常终结。如何正常关闭一个线程?JDK的Thread提供了stop()方法,但要注意的是该方法已经被弃用。

stop()的不安全性

  1. stop()强行终止线程导致数据不一致
1
2
3
4
5
6
7
8
9
10
11
12
13
class User{
private int id;
private String name;
public User() {
this.id = 0;
this.name = "0";
}
// 省略 setter and getter
@Override
public String toString() {
return "user -> [ id = "+id+", name = "+name+" ]";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    Thread thread = new Thread(){
@Override
public void run(){
while(true) {
int tmp = (int) System.currentTimeMillis() / 1000;
user.setId(tmp);
try {
Thread.sleep(99);
} catch (InterruptedException e) {
e.printStackTrace();
}
user.setName(String.valueOf(tmp));
}
}
};
thread.start();
Thread.sleep(100);
thread.stop();
System.out.println(user);
//-----------------------
//user -> [ id = -1064621, name = 0 ]

显然,此处的id与name并不相同,stop方法强行终止了setName方法的执行。

  1. stop()直接释放锁,破坏对象一致性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class StopThread {

public static User user = new User();

public static class Reader implements Runnable{
public void run() {
while(true) {
synchronized (StopThread.class) {
if (user.getId()!= Integer.parseInt(user.getName())) {
System.out.println(user);
}
}
Thread.yield();
}
}
}

public static class Writer implements Runnable{
public void run() {
while (true){
synchronized (StopThread.class){
int tmp = (int) System.currentTimeMillis()/1000;
user.setId(tmp);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
user.setName(String.valueOf(tmp));
}
Thread.yield();
}
}
}

public static void main(String[] args) throws InterruptedException {
new Thread(new Reader()).start();
while(true) {
Thread thread = new Thread(new Writer());
thread.start();
Thread.sleep(150);
thread.stop();
}
}
}

Thread.stop()在结束线程时,会直接终止线程,立即释放线程所持有的锁。假设当前写入线程写入数据进行到一半,并强行终止,对象数据不一致;又因为锁已经被释放,读进程将读入不一致对象。

1
2
user -> [ id = -1063553, name = -1063554 ]
user -> [ id = -1063552, name = -1063553 ]

总结下,Thread.stop()的使用会带来数据不一致的问题,所以尽量避免使用该方法。大多数stop的使用,应当被替换为简单修改某些变量来指示其目标线程将停止运行的代码(共享变量)。目标线程应当有规律的检查这些变量。并且,如果这些变量指示其将停止运行,目标线程应当以某种有序的方式从它的run方法返回。为了确保停止请求的及时传达,变量必须是 volatile 的(或者变量的访问被同步)。

下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private volatile Thread blinker;
public void stop() {
blinker = null;
}
public void run() {
Thread thisThread = Thread.currentThread();
while (blinker == thisThread) {
try {
thisThread.sleep(interval);
} catch (InterruptedException e){
}
repaint();
}
}

线程中断

中断是给线程的一个指示,告诉它应该停止正在做的事并去做其他事情。一个线程究竟要怎么响应中断请求取决于程序员。一个线程通过调用对被中断线程的Thread对象的interrupt()方法,发送中断信号。为了让中断机制正常工作,被中断的线程必须支持它自己的中断(即要自己处理中断)。

interrupt()方法

中断的相关方法有3个:

1
2
3
public void Thread.interrupt()              // 中断线程
public boolean Thread.isInterrupted() // 判断是否中断
public static boolean Thread.interrupted() // 判断是否中断,并清除当前中断状态
Thread.interrupt()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
// 此处的interrupt0 是native方法
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}

中断本线程。无返回值。具体作用分以下几种情况:

  • 如果该线程正阻塞于Object类的wait()wait(long)wait(long, int)方法,或者Thread类的join()、join(long)、join(long, int)sleep(long)sleep(long, int)方法,则该线程的中断状态将被清除,并收到一个java.lang.InterruptedException
  • 如果该线程正阻塞于interruptible channel上的I/O操作,则该通道将被关闭,同时该线程的中断状态被设置,并收到一个java.nio.channels.ClosedByInterruptException
    如果该线程正阻塞于一个java.nio.channels.Selector操作,则该线程的中断状态被设置,它将立即从选择操作返回,并可能带有一个非零值,就好像调用java.nio.channels.Selector.wakeup()方法一样。
  • 如果上述条件都不成立,则该线程的中断状态将被设置。中断一个不处于活动状态的线程不会有任何作用。如果是其他线程在中断该线程,则java.lang.Thread.checkAccess()方法就会被调用,这可能抛出java.lang.SecurityException
Thread.isInterrupted
1
2
3
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

检测当前线程是否已经中断,是则返回true,否则false,并清除中断状态。换言之,如果该方法被连续调用两次,第二次必将返回false,除非在第一次与第二次的瞬间线程再次被中断。如果中断调用时线程已经不处于活动状态,则返回false。

Thread.interrupted()
1
2
3
public boolean isInterrupted() {
return isInterrupted(false);
}

检测当前线程是否已经中断,是则返回true,否则false。中断状态不受该方法的影响。如果中断调用时线程已经不处于活动状态,则返回false。

interrupted()与isInterrupted()的唯一区别是,前者会读取并清除中断状态,后者仅读取状态。
在hotspot源码中,两者均通过调用的native方法isInterrupted(boolean)来实现,区别是参数值ClearInterrupted不同。
private native boolean isInterrupted(boolean ClearInterrupted);

使用中断

注意,Thread.interrupt()方法只是修改了当前线程的中断状态,为使中断生效,我们需要自己实现处理中断的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class InterruptThread {

public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(){
@Override
public void run() {
while(true) {
// 此处处理中断
if(Thread.currentThread().isInterrupted()){
System.out.println("Thread is Interrupted!");
break;
// 如果while 下面还有代码,可以使用 return
}
System.out.println("Thread is Running!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
Thread.yield();
}
}
};

thread.start();
Thread.sleep(5000);
System.out.println("interrupt Thread!");
thread.interrupt();
}
}

当线程阻塞于wait/join/sleep时,中断状态会被清除掉,同时收到InterruptedException。如果不加处理,下次循环开始时,就无法捕获这个中断。因此,在编写多线程代码的时候,任何时候捕获到InterruptedException,要么继续上抛,要么重置中断状态,这是最安全的做法。

  • 摘自《Java并发编程实战》第5章 基础构建模块 5.4 阻塞方法与中断方法

当某个方法抛出InterruptedException时,表示该方法是一个阻塞方法。当在代码中调用一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的相应。对于库代码来说,有两种选择:

  1. 传递InterruptedException。这是最明智的策略,将异常传递给方法的调用者。
  2. 恢复中断。在不能上抛的情况下,如Runnable方法,必须捕获InterruptedException,并通过当前线程的interrupt()方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。如下代码是模板:
1
2
3
4
5
6
7
8
public void run() {
try {
// 1.调用阻塞方法
} catch (InterruptedException e) {
// important
Thread.currentThread().interrupt(); // 2.恢复被中断的状态
}
}

等待(wait) && 通知(notify)

JDK在java.lang.Object中提供了两个接口wait()notify():

1
2
3
4
5
6
7
8
public final void wait() throws InterruptedException {
wait(0);
}
public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException;

public final native void notify();
public final native void notifyAll();

如果一个线程调用了object.wait(),它就会进入object对象的等待队列。当object.notify() 被调用时,会从等待队列中随机选择一个线程,将其唤醒。注意,这里的选择是不公平的,完全随机。除了notify()方法外,notifyAll()会唤醒等待队列中所有的等待线程。

注意,wait()notify()/notifyAll()必须在synchronized同步代码块中使用。

执行过程

  1. 当线程T1执行wait()时,会把当前的锁S1释放,然后让出CPU,进入等待状态。
  2. 当获得锁的线程T2执行notify/notifyAlls方法时,会唤醒一个处于等待该对象锁状态的的线程T1,然后继续往下执行,直到执行完退出对象锁锁住的区域(synchronized修饰的代码块)后再释放锁。
  3. T1被唤醒后,会尝试获得锁S1,当顺利获得锁S1后,才能真正继续执行下去。

从这里可以看出,notify/notifyAll()执行后,并不立即释放锁,而是要等到执行完临界区中代码后,再释放。故,在实际编程中,我们应该尽量在线程调用notify/notifyAll()后,立即退出临界区。即不要在notify/notifyAll()后面再写一些耗时的代码。

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class waitandnotify {
private final Object lock = new Object();
public class T1 extends Thread{
@Override
public void run(){
System.out.println(System.currentTimeMillis()+ "-> T1 start() ");
synchronized (lock){
System.out.println(System.currentTimeMillis()+ "-> T1 begin wait() ");
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(System.currentTimeMillis()+ "-> T1 end wait() ");
}
}
}
public class T2 extends Thread{
@Override
public void run(){
System.out.println(System.currentTimeMillis()+ "-> T2 start() ");
synchronized (lock){
System.out.println(System.currentTimeMillis()+ "-> T2 begin notify() ");
lock.notify();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(System.currentTimeMillis()+ "-> T2 end notify() ");
}
}
}

public static void main(String[] args){
waitandnotify client = new waitandnotify();
Thread t1 = client.new T1();
Thread t2 = client.new T2();
t1.start();
t2.start();
}
}
/*1502699128356-> T1 start()
*1502699128356-> T1 begin wait()
*1502699128356-> T2 start()
*1502699128356-> T2 begin notify()
*1502699133357-> T2 end notify()
*1502699133357-> T1 end wait()
*/

显然,从时间戳可以看出T2通知T1继续执行后,T1并不能继续执行,而是等待T2释放object的锁,并重新获得锁后才继续执行。

Object.wait()Thread.sleep()都可以让线程等待若干分钟。除了wait方法可以被唤醒之外,另一个主要区别是wait方法会释放目标对象的锁,而sleep方法不会释放资源。当然,Object.wait()只是把当前线程占用的目标对象的锁释放,其他占用的对象的锁状态不会发生变化。

挂起(suspend)和继续执行(resume)线程

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class suspend_resume {

public static class ChangeThread1 extends Thread{
private int i= 0;
public void run() {
while (true) {
System.out.println(System.currentTimeMillis() + "-> Thread ::" + ++i);
}
}
}
public static void main(String[] args) throws InterruptedException {

ChangeThread1 thread1 = new ChangeThread1();
thread1.start();
Thread.sleep(100);
System.out.println(System.currentTimeMillis()+"------begin-----suspend ");
thread1.suspend();
Thread.sleep(10);
thread1.resume();
System.out.println(System.currentTimeMillis()+"-------end-----resume ");
thread1.suspend();
}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
1502791453031-> Thread ::11696
1502791453031-> Thread ::11697
1502791453031-> Thread ::11698
1502791453031-> Thread ::11699
1502791453031-> Thread ::11700
1502791453031------begin-----suspend
1502791453031-> Thread ::11701
1502791453031-> Thread ::11702
1502791453031-> Thread ::11703
1502791453031-> Thread ::11704
1502791453031-> Thread ::11705
1502791453042-> Thread ::11706 // 此处可从时间戳看出暂停了10 millis
1502791453042-------end-----resume

注意事项

suspend和resume这对API已经被标注为弃用。不推荐使用suspend去挂起线程是因为suspend方法在导致线程暂停的同时,并不会释放任何资源。其他线程无妨访问被占用的锁,直到对应的线程进行了resume操作,被挂起的线程才能继续。其他阻塞在相关锁的线程也可以继续执行。

但是如果resume()操作意外地在suspend()方法前执行,那么被挂起的线程很难继续被执行,更严重的情况是,挂起线程如果占据锁,那么这个锁将一直得不到释放,从而导致整个系统不正常工作。下面就是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class suspend_resume_dead_lock {

public static Object lock = new Object();

public static class ChangeObjectThread extends Thread{
public ChangeObjectThread(String name) {
super(name);
}

@Override
public void run() {
System.out.println(System.currentTimeMillis()+" -> "+this.getName()+"======= begin ======");
synchronized (lock){
System.out.println(System.currentTimeMillis()+" -> "+this.getName()+"+++++++ enter lock ++++++");
Thread.currentThread().suspend();
}
}
}

public static void main(String[] args) throws InterruptedException {
ChangeObjectThread thread1 = new ChangeObjectThread("thread1");
thread1.start();
Thread.sleep(100);
System.out.println(System.currentTimeMillis()+" -> ######### sleep 100 ##########");
ChangeObjectThread thread2 = new ChangeObjectThread("thread2");
thread2.start();
Thread.sleep(100);
System.out.println(System.currentTimeMillis()+" -> ######### sleep 100 ##########");
thread1.resume();
thread2.resume();
}
}
/** output:
* 1504072559572 -> thread1======= begin ======
* 1504072559573 -> thread1+++++++ enter lock ++++++
* 1504072559672 -> ######### sleep 100 ##########
* 1504072559672 -> thread2======= begin ======
* 1504072559772 -> ######### sleep 100 ##########
* 1504072559772 -> thread2+++++++ enter lock ++++++
* 此时没有退出!!!
*/

由输出的时间戳可以发现thread1调用suspend()后一直占据锁,直到执行resume()后释放占用的锁。值得注意的是,thread2.resume()并没有生效,程序最后并没有退出。下面是对于没有正常退出的分析:

1
2
3
4
5
6
7
8
9
10
11
// 查看 java pid
$ jps
8256 suspend_resume_dead_lock
// 查看线程信息
$ jstack 8256
"thread2" #13 prio=5 os_prio=0 tid=0x000000001a4a0800 nid=0x5278 runnable [0x000000001c14e000]
java.lang.Thread.State: RUNNABLE
at java.lang.Thread.suspend0(Native Method)
at java.lang.Thread.suspend(Thread.java:1032)
at suspend_resume_dead_lock$ChangeObjectThread.run(suspend_resume_dead_lock.java:18)
- locked <0x00000000d609c8b8> (a java.lang.Object)

线程信息中thread2的状态是java.lang.Thread.State: RUNNABLE,这是因为时间先后顺序的问题,主线程中的thread2.resume()并未生效,从而导致thread2一直挂起,并占用锁。危险!!!

等待线程结束(join)和谦让(yield)

join

在存在多线程的应用中,考虑下面这种情况:一个线程的输入,依赖于其他线程的执行完成才能继续执行。

JDK提供了join()方法来实现这一操作。

1
2
3
4
5
public final void More ...join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis, int nanos) throws InterruptedException;
public final synchronized void join(long millis) throws InterruptedException;

第一个无参join方法表示无限等待,会一直阻塞当前线程,直到目标线程对象执行完毕。第二个和第三个方法给出了最大等待时间,如果超过最大时间,目标线程还在执行,当前线程会停止等待继续执行。下面是一个使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class SimpleJoinDemo {
public volatile static int count =0;
public static class IncreaseCountThread extends Thread{
@Override
public void run() {
for (int i =0;i<1000000000;i++){
count ++;
}
}
}

public static void main(String[] args) throws InterruptedException {
Thread increaseCountThread = new IncreaseCountThread();
increaseCountThread.start();
increaseCountThread.join();
System.out.println("field count is: "+String.valueOf(count));
}
}
/*
* field count is: 1000000000
*/

join()方法的本质是让调用线程(当前执行线程)wait在被调用线程实例上。join的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final synchronized void More ...join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

可以看到join方法的底层实现是wait()方法,当线程执行完后,会在结束前调用notifyAll()通知所有的等待线程继续。因此,值得注意的是,不要在Thread对象实例中使用wait()notify()|notifyAll()方法。这可能会影响系统API。

yield

yield是一个静态的本地方法,作用是使当前线程让出cpu资源,重新开始cpu资源的竞争。所以有可能刚刚让出cpu资源,又竞争成功重新获取cpu资源。

1
public static native void yield();
-------------本文结束感谢您的阅读-------------
坚持分享,您的支持将鼓励我继续创作!
0%