我们这一节教程,就为大家介绍如何自定义异步并行处理框架,希望大家能够好好学。
代码说明:任务有工作组组成,每个工作组有自己的任务线程,每个工作组有完成后的处理接口,所有工作组完成后有处理接口
1、CompletableFuture相当于一个容器,容器里面运行的是工作组,通过new关键字创建容器
2、为容器添加工作组,工作组有容器的getWorkGroup()方法获取,
3、为工作组添加任务:workGroup.addTask,这里添加任务有三种方式,第一种:直接通过jquery,Callable添加。第二种:通过WorkGroupTask添加,第三种:mysql通过WorkGroupForAround添加任务,这种方式支持前后通知,可以自定义任务完成前后需要处理的事情。
4、CompletableInterface接口是每个工作组或容器执行完后的汇总接口,
completable是容器完成任务后的方法,
workerCompletable是工作组完成任务的方法,也是方法之一。 下面的例子是演示了一个加法的运算,通过创建三个工作组,每个工作组创建三个任务,每个任务计算不同长度的加法,然后每个工作组完成后对该工作组内的加法做汇总,当所有工作组完成后, 容器再对所有工作组内的结果再汇总。
package com.sgcc.sgd5000.utils;import java.math.BigDecimal;import java.util.ArrayList;import java.util.List;import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.ScheduledExecutorService;public final class CompletableFuture<T>{ private int seconds=60; public static void main(String[] args) { CompletableInterface comp=new CompletableFuture.CompletableInterface<BigDecimal>() { @Override public void completable(List<List<BigDecimal>> resultSet) { } @Override public void workerCompletable(List<BigDecimal> resultSet) { BigDecimal total=new BigDecimal(0); for(BigDecimal set:resultSet){ for(BigDecimal i:resultSet){ total=total.add(i); } } resultSet.clear(); resultSet.add(total); } }; CompletableFuture futur=new CompletableFuture<Integer>(); CompletableFuture.WorkGroup workGroup = futur.getWorkGroup(); workGroup.addWorkCompletableInterface(comp); CompletableFuture.WorkGroup workGroup1 = futur.getWorkGroup(); workGroup1.addWorkCompletableInterface(comp); CompletableFuture.WorkGroup workGroup2 = futur.getWorkGroup(); workGroup2.addWorkCompletableInterface(comp); workGroup.addTask(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { BigDecimal b=new BigDecimal(0); for(int i=0;i<300000000;i++){ b=b.add(new BigDecimal(i)); } return b; } }).addTask(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { BigDecimal b=new BigDecimal(0); for(int i=300000000;i<600000000;i++){ b=b.add(new BigDecimal(i)); } return b; } }).addTask(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { BigDecimal b=new BigDecimal(0); for(int i=600000000;i<800000000;i++){ b=b.add(new BigDecimal(i)); } return b; } }); workGroup.addTask(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { BigDecimal b=new BigDecimal(0); for(int i=800000000;i<1000000000;i++){ b=b.add(new BigDecimal(i)); } return b; } }); workGroup1.addTask(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { BigDecimal b=new BigDecimal(0); for(int i=1000000000;i<1300000000;i++){ b=b.add(new BigDecimal(i)); } return b; } }); workGroup1.addTask(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { BigDecimal b=new BigDecimal(0); for(int i=1300000000;i<1600000000;i++){ b=b.add(new BigDecimal(i)); } return b; } }); workGroup1.addTask(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { BigDecimal b=new BigDecimal(0); for(int i=1600000000;i<1800000000;i++){ b=b.add(new BigDecimal(i)); } return b; } }); workGroup1.addTask(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { BigDecimal b=new BigDecimal(0); for(int i=1800000000;i<2000000000;i++){ b=b.add(new BigDecimal(i)); } return b; } }); workGroup2.addTaskInteface(new WorkGroupTask<BigDecimal>() { @Override public BigDecimal call() { BigDecimal b=new BigDecimal(0); for(int i=9;i<11;i++){ b=b.add(new BigDecimal(i)); } return b; } }); futur.addWorkGroup(workGroup).addWorkGroup(workGroup1); futur.awaits(1000); futur.forkJoin(new CompletableFuture.CompletableInterface<BigDecimal>() { @Override public void completable(List<List<BigDecimal>> resultSet) { BigDecimal total=new BigDecimal(0); for(List<BigDecimal> set:resultSet){ for(BigDecimal i:set){ total=total.add(i); } } System.out.println(total); } @Override public void workerCompletable(List<BigDecimal> resultSet) { // TODO Auto-generated method stub } }); } public interface CompletableInterface<T>{ public void completable(List<List<T>> resultSet); public void workerCompletable(List<T> resultSet); } public ScheduledExecutorService schedulePool = Executors.newScheduledThreadPool(3); public ScheduledExecutorService sch = Executors.newScheduledThreadPool(6); List<WorkGroup<T>> workGroups=new ArrayList<WorkGroup<T>>(); public void awaits(int seconds){ this.seconds=seconds; invoke(); } public CompletableFuture<T> addWorkGroup(WorkGroup<T> w){ workGroups.add(w); return this; } private void invoke(){ for(WorkGroup<T> group:workGroups){ schedulePool.execute(group); } } class ListenerTaskFinshed extends TimerTask{ public int time=0; @Override public void run() { time++; } public int getTime(){ return time; } } public <T> void forkJoin(CompletableInterface<T> completableInterface){ final Timer timer=new Timer(); try { ListenerTaskFinshed listener=this.new ListenerTaskFinshed(); timer.schedule(listener, 0, 1000); int workGroupsCount=0; do{ for(WorkGroup<?> group:workGroups){ if(group.getFinshed()){ workGroupsCount++; group.setFinshed(false); } if(workGroupsCount ==workGroups.size()){ break; } } }while(workGroupsCount <workGroups.size()&& listener.getTime()<seconds); timer.cancel(); if(workGroupsCount ==workGroups.size()){ List<List<T>> rset=new ArrayList<List<T>>(); for(WorkGroup group:workGroups){ List<T> rList=group.getFutures(); rset.add(rList); } completableInterface.completable(rset); } } catch (Exception e) { e.printStackTrace(); schedulePool.shutdownNow(); sch.shutdownNow(); }finally{ timer.cancel(); sch.shutdown(); schedulePool.shutdown(); } } public <T> WorkGroup<T> getWorkGroup(){ return this.new WorkGroup<T>(){}; } public interface WorkGroupTask<T>{ public T call(); } public abstract class WorkGroupForAround<T> implements WorkGroupTask<T>{ public abstract void beforeCall(); public final T runTask(){ beforeCall(); T t=call(); afterCall(t); return t; }; public abstract void afterCall(T result); } //工作组 public abstract class WorkGroup<T> implements Runnable{ private List<Callable<T>> taskGroup=new ArrayList<Callable<T>>(); private List<T> listFuture; private boolean finshed=false; private CompletableInterface workCompletableInterface; public WorkGroup<T> addTask(Callable<T> call){ taskGroup.add(call); return this; } public WorkGroup<T> addTaskInteface(final WorkGroupTask<T> task){ taskGroup.add(new Callable<T>() { @Override public T call() throws Exception { // TODO Auto-generated method stub return task.call(); } }); return this; } public WorkGroup<T> addTaskInteface(final WorkGroupForAround<T> task){ taskGroup.add(new Callable<T>() { @Override public T call() throws Exception { // TODO Auto-generated method stub return task.runTask(); } }); return this; } public void addWorkCompletableInterface(CompletableInterface workInterface){ workCompletableInterface=workInterface; } @Override public void run() { try { if(taskGroup.size() ==0){ listFuture=new ArrayList<T>(); setFinshed(true); return ; } List<Future<T>> reList=sch.invokeAll(taskGroup); listFuture=new ArrayList<T>(); for(Future<T> fu:reList){ listFuture.add(fu.get()); } if(null != workCompletableInterface){ workCompletableInterface.workerCompletable(listFuture); } setFinshed(true); } catch (Exception e) { // TODO: handle exception } } public synchronized void setFinshed(boolean bl){ finshed=bl; } public synchronized boolean getFinshed(){ return finshed; } public List<T> getFutures(){ return listFuture; } } } 小编结语:相信大家在看完了这一段代码之后,都知道如何自定义异步并行处理了吧,感谢大家的支持。
原文出处:550
版权声明:原创作品,允许转载,转载时请务必以超链接形式标明文章原始出处、作者信息和本声明,否则将追究法律责任。https://blog.kokojia.com/love15200922/b-1974.html