public class Consts { public static final int PROBLEM_BANK_COUNT = 2000;//题库大小 //取得本地机器cpu数量 public final static int THREAD_COUNT_BASE = Runtime.getRuntime().availableProcessors();}
/** * 用sleep模拟实际的业务操作 */public class BusiMock { public static void buisness(int sleepTime){ try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } }}
public class MakeSrcDoc { /** * 形成待处理文档 * @param docCount 生成的文档数量 * @return 待处理文档列表 */ public static ListmakeDoc(int docCount){ Random r = new Random(); Random rProblemCount = new Random(); List docList = new LinkedList<>();//文档列表 for(int i=0;i problemList = new LinkedList ();//文档中题目列表 int docProblemCount = rProblemCount.nextInt(60)+60; for(int j=0;j< docProblemCount;j++){ int problemId = r.nextInt(Consts.PROBLEM_BANK_COUNT); problemList.add(problemId); } PendingDocVo pendingDocVo = new PendingDocVo("pending_"+i, problemList); docList.add(pendingDocVo); } return docList; }}
public class ProblemBank { //题库数据存储 private static ConcurrentHashMapproblemBankMap = new ConcurrentHashMap<>(); //定时任务池,负责定时更新题库数据 private static ScheduledExecutorService updateProblemBank = new ScheduledThreadPoolExecutor(1); //初始化题库 public static void initBank(){ for(int i=0;i
/** * 文档处理服务 */public class DocService { /** * 上传文档到网络 * @param docFileName 实际文档在本地的存储位置 * @return 上传后的网络存储地址 */ public static String upLoadDoc(String docFileName){ Random r = new Random(); BusiMock.buisness(5000+r.nextInt(400)); return "http://www.xxxx.com/file/upload/"+docFileName; } /** * 将待处理文档处理为本地实际文档 * @param pendingDocVo 待处理文档 * @return 实际文档在本地的存储位置 */ public static String makeDoc(PendingDocVo pendingDocVo){ System.out.println("开始处理文档:"+ pendingDocVo.getDocName()); StringBuffer sb = new StringBuffer(); for(Integer problemId: pendingDocVo.getProblemVoList()){ sb.append(ProblemService.makeProblem(problemId)); } return "complete_"+System.currentTimeMillis()+"_" +pendingDocVo.getDocName()+".pdf"; } /** * 异步并行处理文档中的题目 * @param pendingDocVo * @return * @throws ExecutionException * @throws InterruptedException */ public static String makeAsyn(PendingDocVo pendingDocVo) throws ExecutionException, InterruptedException { System.out.println("开始处理文档:"+ pendingDocVo.getDocName()); //对题目处理结果的缓存 MapmultiProblemVoMap = new HashMap<>(); //并行处理文档中的每个题目 for(Integer problemId:pendingDocVo.getProblemVoList()){ multiProblemVoMap.put(problemId, ProblemMultiService.makeProblem(problemId)); } //获取题目的结果 StringBuffer sb = new StringBuffer(); for(Integer problemId:pendingDocVo.getProblemVoList()){ MultiProblemVo multiProblemVo = multiProblemVoMap.get(problemId); sb.append( multiProblemVo.getProblemText()==null ? multiProblemVo.getProbleFuture().get().getProcessedContent() : multiProblemVo.getProblemText()); } return "complete_"+System.currentTimeMillis()+"_" +pendingDocVo.getDocName()+".pdf"; }}
/** * 题目处理的基础服务,模拟解析题目文本,下载图片等操作, * 返回解析后的文本 */public class BaseProblemService { /** * 对题目进行处理,如解析文本,下载图片等等工作 * @param problemId 题目id * @return 题目解析后的文本 */ public static String makeProblem(Integer problemId,String problemSrc){ Random r = new Random(); BusiMock.buisness(450+r.nextInt(100)); return "CompleteProblem[id="+problemId +" content=:"+ problemSrc+"]"; }}
/** * 并行异步的处理题目 */public class ProblemMultiService { //存放处理过题目内容的缓存 private static ConcurrentHashMapproblemCache = new ConcurrentHashMap<>(); //存放正在处理的题目的缓存,防止多个线程同时处理一个题目 private static ConcurrentHashMap > processingProblemCache = new ConcurrentHashMap<>(); //处理的题目的线程池 private static ExecutorService makeProblemExec = Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE*2); //供调用者使用,返回题目的内容或者任务 public static MultiProblemVo makeProblem(Integer problemId){ //检查缓存中是否存在 ProblemCacheVo problemCacheVo = problemCache.get(problemId); if(null==problemCacheVo){ System.out.println("题目【"+problemId+"】在缓存中不存在,需要新启任务"); return new MultiProblemVo(getProblemFuture(problemId)); }else{ //拿摘要,一篇文档中的所有题目的摘要其实可以一次性取得,以减少对数据库的访问 String problemSha = ProblemBank.getProblemSha(problemId); if(problemCacheVo.getProblemSha().equals(problemSha)){ System.out.println("题目【"+problemId+"】在缓存中存在且没有修改过,可以直接使用。"); return new MultiProblemVo(problemCacheVo.getProcessedContent()); } else{ System.out.println("题目【"+problemId+"】的摘要发生了变化,启动任务更新缓存。"); return new MultiProblemVo(getProblemFuture(problemId)); } } } //返回题目的工作任务 private static Future getProblemFuture(Integer problemid){ Future problemFuture = processingProblemCache.get(problemid); if (problemFuture==null){ ProblemDBVo problemDBVo = ProblemBank.getProblem(problemid); ProblemTask problemTask = new ProblemTask(problemDBVo,problemid); //当前线程新启了一个任务 FutureTask ft = new FutureTask (problemTask); problemFuture = processingProblemCache.putIfAbsent(problemid,ft); if (problemFuture==null){ //表示没有别的线程正在处理当前题目 problemFuture = ft; makeProblemExec.execute(ft); System.out.println("题目【"+problemid+"】计算任务启动,请等待完成>>>>>>>>>>>>>。"); }else{ System.out.println("刚刚有其他线程启动了题目【"+problemid+"】的计算任务,任务不必开启"); } }else{ System.out.println("当前已经有了题目【"+problemid+"】的计算任务,不必重新开启"); } return problemFuture; } //处理题目的任务 private static class ProblemTask implements Callable { private ProblemDBVo problemDBVo; private Integer problemId; public ProblemTask(ProblemDBVo problemDBVo, Integer problemId) { this.problemDBVo = problemDBVo; this.problemId = problemId; } @Override public ProblemCacheVo call() throws Exception { try { ProblemCacheVo problemCacheVo = new ProblemCacheVo(); problemCacheVo.setProcessedContent( BaseProblemService.makeProblem(problemId,problemDBVo.getContent())); problemCacheVo.setProblemSha(problemDBVo.getSha()); problemCache.put(problemId,problemCacheVo); return problemCacheVo; } finally { //无论正常还是异常,都需要将生成的题目的任务从缓存移除 processingProblemCache.remove(problemId); } } }}
public class ProblemService { /** * 普通对题目进行处理 * @param problemId 题目id * @return 题目解析后的文本 */ public static String makeProblem(Integer problemId){ return BaseProblemService.makeProblem(problemId, ProblemBank.getProblem(problemId).getContent()); }}
/** * 并发题目处理时,返回处理的题目结果 */public class MultiProblemVo { private final String problemText;//要么就是题目处理后的文本; private final FutureprobleFuture;//处理题目的任务 public MultiProblemVo(String problemText) { this.problemText = problemText; this.probleFuture = null; } public MultiProblemVo(Future probleFuture) { this.probleFuture = probleFuture; this.problemText = null; } public String getProblemText() { return problemText; } public Future getProbleFuture() { return probleFuture; }}
/** * 待处理文档实体类 */public class PendingDocVo { //待处理文档名称 private final String docName; //待处理文档中题目id列表 private final ListproblemVoList; public PendingDocVo(String docName, List problemVoList) { this.docName = docName; this.problemVoList = problemVoList; } public String getDocName() { return docName; } public List getProblemVoList() { return problemVoList; }}
/** * 题目保存在缓存中的实体类 */public class ProblemCacheVo implements Serializable{ private String processedContent; private String problemSha; public ProblemCacheVo() { } public ProblemCacheVo(String processedContent, String problemSha) { this.processedContent = processedContent; this.problemSha = problemSha; } public String getProcessedContent() { return processedContent; } public void setProcessedContent(String processedContent) { this.processedContent = processedContent; } public String getProblemSha() { return problemSha; } public void setProblemSha(String problemSha) { this.problemSha = problemSha; }}
/** * 题目在数据库中存放实体类 */public class ProblemDBVo { //题目id private final int problemId; //题目内容,平均长度700字节 private final String content; //题目的sha串 private final String sha; public ProblemDBVo(int problemId, String content, String sha) { this.problemId = problemId; this.content = content; this.sha = sha; } public int getProblemId() { return problemId; } public String getContent() { return content; } public String getSha() { return sha; }}
public class SingleWeb { public static void main(String[] args) { System.out.println("题库开始初始化..........."); ProblemBank.initBank(); System.out.println("题库初始化完成。"); ListdocList = MakeSrcDoc.makeDoc(2); long startTotal = System.currentTimeMillis(); for(PendingDocVo doc:docList){ System.out.println("开始处理文档:"+doc.getDocName()+"......."); long start = System.currentTimeMillis(); String localName = DocService.makeDoc(doc); System.out.println("文档"+localName+"生成耗时:" +(System.currentTimeMillis()-start)+"ms"); start = System.currentTimeMillis(); String remoteUrl = DocService.upLoadDoc(localName); System.out.println("已上传至["+remoteUrl+"]耗时:" +(System.currentTimeMillis()-start)+"ms"); } System.out.println("共耗时:"+(System.currentTimeMillis()-startTotal)+"ms"); }}运行结果======================题库开始初始化...........开始定时更新题库..........................题库初始化完成。开始处理文档:pending_0.......开始处理文档:pending_0文档complete_1531484294630_pending_0.pdf生成耗时:42965ms已上传至[http://www.xxxx.com/file/upload/complete_1531484294630_pending_0.pdf]耗时:5223ms开始处理文档:pending_1.......开始处理文档:pending_1文档complete_1531484341109_pending_1.pdf生成耗时:41256ms已上传至[http://www.xxxx.com/file/upload/complete_1531484341109_pending_1.pdf]耗时:5036ms共耗时:94480ms
/** * 服务的拆分,rpc服务 */public class RpcMode { //生成文档的线程池 private static ExecutorService docMakeService = Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE*2); //上传文档的线程池 private static ExecutorService docUploadService = Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE*2); private static CompletionService docCompletionService = new ExecutorCompletionService(docMakeService); private static CompletionService uploadCompletionService = new ExecutorCompletionService(docUploadService); private static class MakeDocTask implements Callable{ private PendingDocVo pendingDocVo; public MakeDocTask(PendingDocVo pendingDocVo) { this.pendingDocVo = pendingDocVo; } @Override public String call() throws Exception { long start = System.currentTimeMillis(); String localName = DocService.makeAsyn(pendingDocVo); System.out.println("文档"+localName+"生成耗时:" +(System.currentTimeMillis()-start)+"ms"); return localName; } } private static class UploadDocTask implements Callable { private String localName; public UploadDocTask(String localName) { this.localName = localName; } @Override public String call() throws Exception { long start = System.currentTimeMillis(); String remoteUrl = DocService.upLoadDoc(localName); System.out.println("已上传至["+remoteUrl+"]耗时:" +(System.currentTimeMillis()-start)+"ms"); return remoteUrl; } } public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("题库开始初始化..........."); ProblemBank.initBank(); System.out.println("题库初始化完成。"); List docList = MakeSrcDoc.makeDoc(60); long startTotal = System.currentTimeMillis(); for(PendingDocVo doc:docList){ docCompletionService.submit(new MakeDocTask(doc)); } for(PendingDocVo doc:docList){ Future futureLocalName = docCompletionService.take(); uploadCompletionService.submit(new UploadDocTask(futureLocalName.get())); } for(PendingDocVo doc:docList){ //把上传后的网络存储地址拿到 uploadCompletionService.take().get(); } System.out.println("共耗时:"+(System.currentTimeMillis()-startTotal)+"ms"); }}运行结果 ==========题库开始初始化...........开始定时更新题库..........................题库初始化完成。开始处理文档:pending_5开始处理文档:pending_3开始处理文档:pending_4开始处理文档:pending_7开始处理文档:pending_6开始处理文档:pending_10开始处理文档:pending_0开始处理文档:pending_1开始处理文档:pending_2开始处理文档:pending_9开始处理文档:pending_8开始处理文档:pending_11开始处理文档:pending_12开始处理文档:pending_13开始处理文档:pending_14题目【780】在缓存中不存在,需要新启任务题目【1433】在缓存中不存在,需要新启任务题目【847】在缓存中不存在,需要新启任务题目【688】在缓存中不存在,需要新启任务题目【1695】在缓存中不存在,需要新启任务题目【547】在缓存中不存在,需要新启任务题目【446】在缓存中不存在,需要新启任务开始处理文档:pending_15题目【1439】在缓存中不存在,需要新启任务题目【1245】在缓存中不存在,需要新启任务题目【895】在缓存中不存在,需要新启任务题目【1416】在缓存中不存在,需要新启任务..................已上传至[http://www.xxxx.com/file/upload/complete_1531485132232_pending_58.pdf]耗时:5319ms已上传至[http://www.xxxx.com/file/upload/complete_1531485132409_pending_59.pdf]耗时:5341ms共耗时:65280ms