Java并发编程
为什么需要并发编程?
计算机不是万能的,因为他有自己的处理速度,当我们开发Web应用或者其他通讯应用,会有很多用户并发(同时)访问时,在服务器端我们就必须做并发处理。
Java支持多线程的,每个请求一个线程来处理。行业经验不丰富的情况下,你可能考虑到是使用同步来保证资源的正常调用。要知道同步是非常消耗计算机资源的,因此这篇文章推荐大家使用Java并发编程包。
java.util.concurrent
在并发编程中很常用的实用工具类。
理解队列机制
就好比我们中午去食堂吃饭一样,我们需要等待前面的兄弟伙饭打好了,才能轮到后边一位。而这个过程我们可以用“先进先出”来表示。但有时候,有的人认识食堂的人,那么他就可能插队打饭。
在concurrent包中也就是实现了这一队列机制。下边是简单的示意图:
球1是最先进去的,最先出来。而矩形框就是我们的队列池。
调度线程与维护线程
这两个线程由java.util.concurrent包中的类提供
- 数据池必须要我们能访问它并且能存放数据到里面。
-
在程序启动时候间隔几秒或直接启动两个线程,
- 调度线程:进行轮循启动 缓存的调度线程(这个负责取得数据池数据构造数据传递线程并传给维护线程池[ThreadPool],由维护线程池自动调用)
- 维护线程:是将获取的数据进行进一步处理,要么存数据库,要么通过Http请求发送给其他服务。
- 异常处理机制,我们不能保证数据传递过程不会报出异常,这里我们就要进行异常处理,出现异常,获取异常数据,重新推进数据池中。
并发编程实现
// 线程池管理
package org.marker.mq;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线程池管理
* @Date 2012-12-29
* */
public class ThreadPoolManager {
//线程实例变量
private static volatile ThreadPoolManager poolManager;
// 线程池维护线程的最少数量
private final static int CORE_POOL_SIZE = 4;
// 线程池维护线程的最大数量
private final static int MAX_POOL_SIZE = 15;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 0;
// 线程池所使用的缓冲队列大小
private final static int WORK_QUEUE_SIZE = 15;
// 消息缓冲队列
private Queue<String> msgQueue = new LinkedList<String>();
// 访问消息缓存的调度线程
final Runnable accessBufferThread = new Runnable() {
public void run() {
if (hasMoreAcquire()) {//如果有请求内容,则创建一个新的AccessDBThread,并添加到线程池中
String msg = (String) msgQueue.poll();
Runnable task = new AccessDBThread(msg);
threadPool.execute(task);
}
}
};
//异常处理消息
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(((AccessDBThread) r).msg + "消息放入队列中重新等待执行");
msgQueue.offer(((AccessDBThread) r).msg);
}
};
//管理数据库访问的线程池
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);
//调度线程池(参数含义查看API)
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate( accessBufferThread, 10, 1, TimeUnit.MILLISECONDS);
//私有构造方法
private ThreadPoolManager() { }
/**
* 创建线程池实例
* @return ThreadPoolManager
*/
public static ThreadPoolManager getInstance() {
if(poolManager == null) poolManager = new ThreadPoolManager();
return poolManager;
}
//判断是否有更多的
private boolean hasMoreAcquire() {
return !msgQueue.isEmpty();
}
/**
* 推送消息到池里
* */
public boolean put(String msg) {
if(msgQueue.size() < 10){
return msgQueue.offer(msg);
}else{
return false;
}
}
}
// 排队对象
package org.marker.mq;
/**
* 数据库操作
* @author marker
* */
public class AccessDBThread implements Runnable {
public String msg;
public AccessDBThread(String msg) {
this.msg = msg;
}
public void run() {
// 向数据库中添加Msg变量值
System.out.println("Added the message: " + msg + " into the Database");
}
}
// 测试代码
package org.marker.mq;
public class TestDriver {
ThreadPoolManager pool = ThreadPoolManager.getInstance();
public void sendMsg(String msg) {
if(!pool.put(msg + "记录一条日志 ")){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.sendMsg(msg);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10000; i++) {
new TestDriver().sendMsg(Integer.toString(i));
}
}
}
在这个并发编程实现中,并没有说理解处理数据库插入,而是交给线程池有时间有优先级的排队处理。