① java实现通用线程池
线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了
本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码
//ThreadTask java
package polarman threadpool;
/** *//**
*线程任务
* @author ryang
*
*/
public interface ThreadTask {
public void run();
}
//PooledThread java
package polarman threadpool;
import java util Collection; import java util Vector;
/** *//**
*接受线程池管理的线程
* @author ryang
*
*/
public class PooledThread extends Thread {
protected Vector tasks = new Vector();
protected boolean running = false;
protected boolean stopped = false;
protected boolean paused = false;
protected boolean killed = false;
private ThreadPool pool;
public PooledThread(ThreadPool pool) { this pool = pool;
}
public void putTask(ThreadTask task) { tasks add(task);
}
public void putTasks(ThreadTask[] tasks) { for(int i= ; i<tasks length; i++) this tasks add(tasks[i]);
}
public void putTasks(Collection tasks) { this tasks addAll(tasks);
}
protected ThreadTask popTask() { if(tasks size() > ) return (ThreadTask)tasks remove( );
else
return null;
}
public boolean isRunning() {
return running;
}
public void stopTasks() {
stopped = true;
}
public void stopTasksSync() {
stopTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void pauseTasks() {
paused = true;
}
public void pauseTasksSync() {
pauseTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void kill() { if(!running)
interrupt();
else
killed = true;
}
public void killSync() {
kill();
while(isAlive()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public synchronized void startTasks() {
running = true;
this notify();
}
public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread(); //System out println(Thread currentThread() getId() + : 空闲 ); this wait(); }else {
ThreadTask task;
while((task = popTask()) != null) { task run(); if(stopped) {
stopped = false;
if(tasks size() > ) { tasks clear(); System out println(Thread currentThread() getId() + : Tasks are stopped );
break;
}
}
if(paused) {
paused = false;
if(tasks size() > ) { System out println(Thread currentThread() getId() + : Tasks are paused );
break;
}
}
}
running = false;
}
if(killed) {
killed = false;
break;
}
}
}catch(InterruptedException e) {
return;
}
//System out println(Thread currentThread() getId() + : Killed );
}
}
//ThreadPool java
package polarman threadpool;
import java util Collection; import java util Iterator; import java util Vector;
/** *//**
*线程池
* @author ryang
*
*/
public class ThreadPool {
protected int maxPoolSize;
protected int initPoolSize;
protected Vector threads = new Vector();
protected boolean initialized = false;
protected boolean hasIdleThread = false;
public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSize; this initPoolSize = initPoolSize;
}
public void init() {
initialized = true;
for(int i= ; i<initPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize);
}
public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize); this maxPoolSize = maxPoolSize;
if(maxPoolSize < getPoolSize())
setPoolSize(maxPoolSize);
}
/** *//**
*重设当前线程数
* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束
* @param size
*/
public void setPoolSize(int size) { if(!initialized) {
initPoolSize = size;
return;
}else if(size > getPoolSize()) { for(int i=getPoolSize(); i<size && i<maxPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
}else if(size < getPoolSize()) { while(getPoolSize() > size) { PooledThread th = (PooledThread)threads remove( ); th kill();
}
}
//System out println( 重设线程数 线程数= + threads size());
}
public int getPoolSize() { return threads size();
}
protected void notifyForIdleThread() {
hasIdleThread = true;
}
protected boolean waitForIdleThread() {
hasIdleThread = false;
while(!hasIdleThread && getPoolSize() >= maxPoolSize) { try { Thread sleep( ); } catch (InterruptedException e) {
return false;
}
}
return true;
}
public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator(); itr hasNext();) { PooledThread th = (PooledThread)itr next(); if(!th isRunning())
return th;
}
if(getPoolSize() < maxPoolSize) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
return thread;
}
//System out println( 线程池已满 等待 );
if(waitForIdleThread() == false)
return null;
}
}
public void processTask(ThreadTask task) {
PooledThread th = getIdleThread();
if(th != null) { th putTask(task); th startTasks();
}
}
public void processTasksInSingleThread(ThreadTask[] tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
public void processTasksInSingleThread(Collection tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
}
下面是线程池的测试程序
//ThreadPoolTest java
import java io BufferedReader; import java io IOException; import java io InputStreamReader;
import polarman threadpool ThreadPool; import polarman threadpool ThreadTask;
public class ThreadPoolTest {
public static void main(String[] args) { System out println( quit 退出 ); System out println( task A 启动任务A 时长为 秒 ); System out println( size 设置当前线程池大小为 ); System out println( max 设置线程池最大线程数为 ); System out println();
final ThreadPool pool = new ThreadPool( ); pool init();
Thread cmdThread = new Thread() { public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System in));
while(true) { try { String line = reader readLine(); String words[] = line split( ); if(words[ ] equalsIgnoreCase( quit )) { System exit( ); }else if(words[ ] equalsIgnoreCase( size ) && words length >= ) { try { int size = Integer parseInt(words[ ]); pool setPoolSize(size); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( max ) && words length >= ) { try { int max = Integer parseInt(words[ ]); pool setMaxPoolSize(max); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( task ) && words length >= ) { try { int timelen = Integer parseInt(words[ ]); SimpleTask task = new SimpleTask(words[ ] timelen * ); pool processTask(task); }catch(Exception e) {
}
}
} catch (IOException e) { e printStackTrace();
}
}
}
};
cmdThread start();
/**//*
for(int i= ; i< ; i++){
SimpleTask task = new SimpleTask( Task + i (i+ )* ); pool processTask(task);
}*/
}
}
class SimpleTask implements ThreadTask {
private String taskName;
private int timeLen;
public SimpleTask(String taskName int timeLen) { this taskName = taskName; this timeLen = timeLen;
}
public void run() { System out println(Thread currentThread() getId() +
: START TASK + taskName + );
try { Thread sleep(timeLen); } catch (InterruptedException e) {
}
System out println(Thread currentThread() getId() +
: END TASK + taskName + );
}
}
使用此线程池相当简单 下面两行代码初始化线程池
ThreadPool pool = new ThreadPool( ); pool init();
要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()
两行代码即可调用
lishixin/Article/program/Java/hx/201311/27203
② 什么是java线程池
找的资料,你看一下吧:x0dx0a多线程技术主要解决处理器单元内多个线程执行的问题,裤哗它可以显着减让汪少处理器单元的闲置时间,增加处理器单元的吞吐能力。x0dx0a x0dx0a 假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。x0dx0a x0dx0a 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高坦纯仔服务器性能。x0dx0a 一个线程池包括以下四个基本组成部分:x0dx0a 1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;x0dx0a 2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;x0dx0a 3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;x0dx0a 4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。x0dx0a x0dx0a 线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。x0dx0ax0dx0a 线程池不仅调整T1,T3产生的时间段,而且它还显着减少了创建线程的数目,看一个例子:x0dx0ax0dx0a 假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。
③ 让ThreadPoolExecutor无所遁形:Java线程池运行原理详解
深入探讨ThreadPoolExecutor的核心工作原理,我们了解到这个Java类在并发和多线程讨论中不可或缺。它管理线程池内的线程,包括创建、执行、管理及监控。理解其如何确保线程池正确运作至关重要。以下解析ThreadPoolExecutor的关键元素:
线程池状态和控制流程概述:ThreadPoolExecutor维护一个ctl的原子整型变量,其高位存储线程池状态,低位存储线程数量,通过内部状态控制管理线程池。
ctl的位字段表示方法:定义为高位存储状态,低位存储线程数量,确保线程池状态准确反映生命周期。
状态转换及控制:通过ctl变量控制,提供辅助函数管理状态,如获取状态和线程数量,确保线程池运行流程协调。
ThreadPoolExecutor的属性详解:包括核心线程数、最大线程数、任务队列、keepAliveTime、ThreadFactory及拒绝策略等,共同决定线程池行为与性能。
execute方法的任务提交流程:接收任务,根据线程数量和核心线程数比较决定是否添加新工作线程,成功加入队列后检查状态,尝试添加非核心线程,失败调用拒绝策略。
submit方法与FutureTask的协同:提交带返回值的任务转化为FutureTask,调用execute方法执行,返回Future对象获取异步执行结果。
shutdown和shutdownNow方法的区别:shutdown平缓关闭线程池,等待正在执行任务,不接收新任务;shutdownNow则激进停止所有执行任务,并返回等待执行的任务列表。
ThreadPoolExecutor的关键内部类:Worker类封装线程池中的每个工作线程,实现Runnable接口;内部阻塞队列实现线程安全管理任务。
Worker类的作用和生命周期:使用ThreadFactory创建线程,控制线程状态,执行任务队列中的任务,直到线程池终止或无任务执行。
内部阻塞队列的实现与特性:使用线程安全的队列暂存待执行任务,支持不同类型的阻塞队列,如LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue,各有特殊使用场景和性能特点。
延迟任务处理:配置为ScheledThreadPoolExecutor时使用DelayedWorkQueue处理定时任务。
ThreadPoolExecutor的扩展与自定义:提供覆写的钩子方法,实现自定义扩展和自定义拒绝策略。
拓展方法的应用:覆写beforeExecute、afterExecute和terminated方法实现自定义逻辑,如记录日志、计算任务执行时间、收集线程池统计信息等。
自定义拒绝策略的应用:实现RejectedExecutionHandler接口创建自定义拒绝策略,如记录日志、尝试重新提交任务。
实战案例:通过创建监控线程池运行状态的ThreadPoolExecutor并实施自定义扩展,实现对线程池的高效管理。
④ Java多线程和单线程怎么通俗易懂的理解
1. 深入理解Java多线程与单线程:如同工厂流水线的生动比喻
想象一下,一个工厂生产过程可以被看作是单线程,就像只有一个员工在独立完成各项任务。
在Java的世界里,多线程就像拥有多个员工,他们可以同时并行地完成各自的工作,共享资源但拥有独立的工作流程。
这就是Java对多线程和单线程的直观解释。
2. Java的核心特性之一就是支持多线程,线程是程序执行的基本单元,每个线程都有自己的栈空间,虽然可以共享程序的全局资源,但每个线程的执行是独立的。
3. 要实现多线程,你可以选择继承Thread类或者实现Runnable接口。
前者适用于简单扩展,而后者则避免了Java单继承的限制,但设计上稍显复杂。
4. 创建Java线程有三种方法:
继承Thread类:创建一个子类,重写run()方法,然后实例化Thread并调用start()。
实现Runnable接口:创建Runnable实现类,重写run()方法,通过Thread的构造方法将Runnable对象传递。
使用Callable和FutureTask:Callable接口提供了有返回值和异常处理的能力,通过FutureTask包装Callable并启动线程,get()方法用于获取结果。
5. 线程的生命周期分为五个阶段:新建、就绪、运行、阻塞和死亡。
每个阶段都对应着线程在工厂流水线上的不同状态。
6. 调度机制在单CPU和多CPU环境下有所不同,Java采用分时或抢占式模型,优先级高的线程优先获取CPU资源。
7. 线程间的协作和同步至关重要,如通过join()方法实现线程同步,让主线程等待子线程完成。
8. 后台线程则默默地为其他任务服务,守护线程的生命周期与主线程紧密相连。
9. 同步代码块和锁机制(如synchronized关键字)确保在并发环境下数据的一致性,避免诸如窗口卖票问题中的并发冲突。
10. 线程池作为高级工具,通过Executors类简化了线程管理,提高了性能。
线程池可设置固定大小,控制并发量,确保资源的合理分配。
11. 此外,ThreadGroup和线程通信方法(如Object类提供的wait(), notify()等)在处理线程组和线程间的协作中起到关键作用。
总的来说,Java的多线程和单线程就像工厂中的不同工作模式,既独立又协作,为程序并发执行提供了强大的支持。
通过理解这些概念,开发者可以更好地设计和优化他们的并发程序,提升效率,保证数据一致性。
⑤ Java面试之线程池参数设置
在Java中,提供多种线程池类型,以满足不同任务需求。常用类型包括:
缓存线程池(Executors.newCachedThreadPool):动态创建线程,根据任务数量调整大小。
定时线程池(Executors.newScheledThreadPool):按固定时间间隔或延迟执行任务。
固定线程池(Executors.newFixedThreadPool):维护固定数量线程,任务入队等待。
单线程线程池(Executors.newSingleThreadExecutor):包含一个线程,任务顺序执行。
工作窃取线程池(Executors.newWorkStealingPool):内部使用ForkJoinPool,适用于多线程并行操作。
这些线程池都是通过Executors类创建的,但推荐使用ThreadPoolExecutor自定义参数。关键参数包括:
核心线程数量(corePoolSize):线程池中保持的最少线程数。
最大线程数量(maximumPoolSize):线程池能容纳的最大线程数。
存活时间(keepAliveTime):线程闲置时间超过此值将被销毁。
存活时间单位(TimeUnit):keepAliveTime的时间单位。
阻塞队列(workQueue):保存待执行任务。
线程创建工厂(ThreadFactory):自定义线程属性。
饱和策略(RejectedExecutionHandler):队列满时,决定如何处理新任务。
配置参数时,考虑任务类型、CPU核数等,有经验值、最佳线程数算法等方法。经验值法考虑任务密集度,IO密集型设置为2N,CPU密集型设置为N+1;最佳线程数算法则综合任务等待与执行时间。Java并发编程实践与Java虚拟机提供不同计算方法,旨在优化线程池性能。
实际应用中,需综合分析任务特性、系统负载等,通过实验与调整找到平衡点。监测工具与基准负载测试有助于确定线程池最优大小,实现资源高效利用。