线程 - 并发容器
马士兵老师线程示例代码;concurrent文件为源码
ConcurrentLinkedQueue
/**
* 有N张火车票,每张票都有一个编号
* 同时有10个窗口对外售票
* 请写一个模拟程序
*
* 分析下面的程序可能会产生哪些问题?
* 重复销售?超量销售?
*
* 使用Vector或者Collections.synchronizedXXX
* 分析一下,这样能解决问题吗?
*
* 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步
* 就像这个程序,判断size和进行remove必须是一整个的原子操作
*
* 使用ConcurrentQueue提高并发性
*/
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
public class TicketSeller4 {
static Queue<String> tickets = new ConcurrentLinkedQueue<>();
static {
for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
}
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
while(true) {
String s = tickets.poll();// poll是原子性操作,即使下面的判断和remove被其他线程打断
if(s == null) break; // 当是最后一张票被卖出后,顶多再去取到的东西是null
else System.out.println("销售了--" + s);
}
}).start();
}
}
}
生产者消费者问题
/**
* 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
* 能够支持2个生产者线程以及10个消费者线程的阻塞调用
*
* 使用wait和notify/notifyAll来实现
*
* @author mashibing
*/
package yxxy.c_021;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
public class MyContainer1<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;
public synchronized void put(T t) {
while(lists.size() == MAX) { //想想为什么用while而不是用if?
try {
this.wait(); //effective java
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lists.add(t);
++count;
this.notifyAll(); //通知消费者线程进行消费
}
public synchronized T get() {
T t = null;
while(lists.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
t = lists.removeFirst();
count --;
this.notifyAll(); //通知生产者进行生产
return t;
}
public static void main(String[] args) {
MyContainer1<String> c = new MyContainer1<>();
//启动消费者线程
for(int i=0; i<10; i++) {
new Thread(()->{
for(int j=0; j<5; j++) System.out.println(c.get());
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//启动生产者线程
for(int i=0; i<2; i++) {
new Thread(()->{
for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}
线程里面运行的task任务分两种接口
-
Runnable:java.lang包下面,JDK1.0出现,
void run()
无返回值不抛出异常 -
Callable:java.util.concurrent包下面,JDK1.5出现,
V call() throws Exception
有返回值且会抛出异常
java.uti.concurrent包下面有很多线程池的接口和类
-
Executor:接口,执行器,
void execute(Runnable command)
用来执行一个任务public class MyExecutor implements Executor { public static void main(String[] args) { new MyExecutor().execute(() -> System.out.println("hello executor")); } @Override public void execute(Runnable command) { command.run(); } }
-
ExecutorService:接口,继承自Executor接口,
Future<?> submit(Runnable task)
方法用来执行不同类别的任务ExecutorService service = Executors.newFixedThreadPool(5); for (int i = 0; i < 6; i++) { service.execute(() -> System.out.println(Thread.currentThread().getName())); }
-
Future:接口,用来定义线程计算结束的结果,使用阻塞方法
public V get()
来获取指定线程执中任务的结果ExecutorService service = Executors.newFixedThreadPool(5); Future<Integer> f = service.submit(() -> 1); System.out.println(f.get()); // 1
当使用多个线程执行多个任务时,可将所有的Future执行结果放入List,最后遍历List来处理结果
// 创建一个包含四个线程的线程池 ExecutorService service = Executors.newFixedThreadPool(4); // 创建一个List存放所有线程运行任务的结果 ArrayList<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < 4; i++) { // 将所有线程的执行结果放入futures中 // 之所以使用submit而不使用execute,因为submit有返回值 futures.add(service.submit(() -> Thread.currentThread().getName())); } // 遍历futures处理所有结果 for(Future<String> future : futures) { System.out.println(future.get()); }
ThreadPool
-
创建固定数量的线程池:
ExecutorService service = Executors.newFixedThreadPool(5);
-
创建数量不固定的线程池:
ExecutorService service = Executors.newCachedThreadPool();
,每个线程的默认存活时间是60s -
创建一个线程池:
ExecutorService service = Executors.newSingleThreadExecutor();
,保证任务按顺序执行 -
创建定时执行的线程池:
ScheduledExecutorService service = Executors.newScheduledThreadPool(4); service.scheduleAtFixedRate(()->{ try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, 0, 500, TimeUnit.MILLISECONDS); // 将任务从启动后每隔500毫秒执行一次
-
窃取线程池:
ExecutorService service = Executors.newWorkStealingPool();
-
ForkJoinPool fjp = new ForkJoinPool();