线程2

Posted by Solace Blog on March 20, 2019

线程 - 并发容器

马士兵老师线程示例代码;concurrent文件为源码

ConcurrentLinkedQueue

/**
 * 有N张火车票,每张票都有一个编号
 * 同时有10个窗口对外售票
 * 请写一个模拟程序
 * 
 * 分析下面的程序可能会产生哪些问题?
 * 重复销售?超量销售?
 * 
 * 使用Vector或者Collections.synchronizedXXX
 * 分析一下,这样能解决问题吗?
 * 
 * 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步
 * 就像这个程序,判断size和进行remove必须是一整个的原子操作
 * 
 * 使用ConcurrentQueue提高并发性
 */
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

public class TicketSeller4 {
  static Queue<String> tickets = new ConcurrentLinkedQueue<>();
  
  
  static {
    for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
  }
  
  public static void main(String[] args) {
    
    for(int i=0; i<10; i++) {
      new Thread(()->{
        while(true) {
          String s = tickets.poll();// poll是原子性操作,即使下面的判断和remove被其他线程打断
          if(s == null) break;		// 当是最后一张票被卖出后,顶多再去取到的东西是null
          else System.out.println("销售了--" + s);
        }
      }).start();
    }
  }
}

生产者消费者问题

/**
 * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
 * 能够支持2个生产者线程以及10个消费者线程的阻塞调用
 * 
 * 使用wait和notify/notifyAll来实现
 * 
 * @author mashibing
 */
package yxxy.c_021;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

public class MyContainer1<T> {
  final private LinkedList<T> lists = new LinkedList<>();
  final private int MAX = 10; //最多10个元素
  private int count = 0;
  
  
  public synchronized void put(T t) {
    while(lists.size() == MAX) { //想想为什么用while而不是用if?
      try {
        this.wait(); //effective java
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    
    lists.add(t);
    ++count;
    this.notifyAll(); //通知消费者线程进行消费
  }
  
  public synchronized T get() {
    T t = null;
    while(lists.size() == 0) {
      try {
        this.wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    t = lists.removeFirst();
    count --;
    this.notifyAll(); //通知生产者进行生产
    return t;
  }
  
  public static void main(String[] args) {
    MyContainer1<String> c = new MyContainer1<>();
    //启动消费者线程
    for(int i=0; i<10; i++) {
      new Thread(()->{
        for(int j=0; j<5; j++) System.out.println(c.get());
      }, "c" + i).start();
    }
    
    try {
      TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    
    //启动生产者线程
    for(int i=0; i<2; i++) {
      new Thread(()->{
        for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
      }, "p" + i).start();
    }
  }
}

线程里面运行的task任务分两种接口

  • Runnable:java.lang包下面,JDK1.0出现,void run()无返回值不抛出异常

  • Callable:java.util.concurrent包下面,JDK1.5出现,V call() throws Exception有返回值且会抛出异常

java.uti.concurrent包下面有很多线程池的接口和类

  • Executor:接口,执行器,void execute(Runnable command)用来执行一个任务

    public class MyExecutor implements Executor {
      
      public static void main(String[] args) {
        new MyExecutor().execute(() -> System.out.println("hello executor"));
      }
      
      @Override
      public void execute(Runnable command) {
        command.run();
      }
    }
    
  • ExecutorService:接口,继承自Executor接口,Future<?> submit(Runnable task)方法用来执行不同类别的任务

    ExecutorService service = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 6; i++) {
      service.execute(() -> System.out.println(Thread.currentThread().getName()));
    }
    
  • Future:接口,用来定义线程计算结束的结果,使用阻塞方法public V get()来获取指定线程执中任务的结果

    ExecutorService service = Executors.newFixedThreadPool(5);
    Future<Integer> f = service.submit(() -> 1);
      
    System.out.println(f.get()); // 1
    

    当使用多个线程执行多个任务时,可将所有的Future执行结果放入List,最后遍历List来处理结果

    // 创建一个包含四个线程的线程池
    ExecutorService service = Executors.newFixedThreadPool(4);
    // 创建一个List存放所有线程运行任务的结果
    ArrayList<Future<String>> futures = new ArrayList<>();
      
    for (int i = 0; i < 4; i++) {
        // 将所有线程的执行结果放入futures中
        // 之所以使用submit而不使用execute,因为submit有返回值
        futures.add(service.submit(() -> Thread.currentThread().getName()));
    }
      
    // 遍历futures处理所有结果
    for(Future<String> future : futures) {
        System.out.println(future.get());
    }
    

ThreadPool

  • 创建固定数量的线程池:ExecutorService service = Executors.newFixedThreadPool(5);

  • 创建数量不固定的线程池:ExecutorService service = Executors.newCachedThreadPool();,每个线程的默认存活时间是60s

  • 创建一个线程池:ExecutorService service = Executors.newSingleThreadExecutor();,保证任务按顺序执行

  • 创建定时执行的线程池:

    ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
    service.scheduleAtFixedRate(()->{
      try {
        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println(Thread.currentThread().getName());
    }, 0, 500, TimeUnit.MILLISECONDS); // 将任务从启动后每隔500毫秒执行一次
    
  • 窃取线程池:ExecutorService service = Executors.newWorkStealingPool();

  • ForkJoinPool fjp = new ForkJoinPool();