现在,有这样一个需求:
1、真正开始执行实际业务之前,因为要和多方进行交互(远程通信),需要对对外部各方进行一系列的鉴权;
2、对外部各方之间进行鉴权,存在一定的先后顺序;
3、如果某一个鉴权步骤失败,整个流程终止;
4、全部鉴权完毕后,需要收集一些数据(组装报文),为后续的业务调用提供组装报文的数据。
通过jBPM的fork和join可以很好地实现上述的需求,我们定义的流程,如下图所示:
上述流程对应的流程定义文件multiple-fork-join-process.jpdl.xml内容如下所示:
<?xml version="1.0" encoding="UTF-8"?> <process name="MultipleForkJoinProcess" xmlns="http://jbpm.org/4.4/jpdl"> <start g="521,17,48,48"> <transition g="-77,-17" name="to CheckUser" to="CheckUser"/> </start> <custom class="com.umpay.ubp.jbpm.handler.CheckMerHandler" g="345,261,112,52" name="CheckMer"> <transition g="-53,-17" name="to join1" to="join1"/> </custom> <custom class="com.umpay.ubp.jbpm.handler.CheckUserHandler" g="489,97,112,52" name="CheckUser"> <transition g="-53,-17" name="to fork1" to="fork1"/> </custom> <custom class="com.umpay.ubp.jbpm.handler.CheckGoodsHandler" g="633,261,112,52" name="CheckGoods"> <transition g="-53,-17" name="to join2" to="join2"/> </custom> <custom class="com.umpay.ubp.jbpm.handler.CheckBankHandler" g="489,261,112,52" name="CheckBank"> <transition g="-53,-17" name="to fork2" to="fork2"/> </custom> <custom class="com.umpay.ubp.jbpm.handler.CheckGoodsBankHandler" g="559,505,112,52" name="CheckGoodsBank"> <transition g="-53,-17" name="to join3" to="join3"/> </custom> <custom class="com.umpay.ubp.jbpm.handler.AggregateDataHandler" g="497,753,92,52" name="AggregateData"> <transition g="-41,-17" name="to end" to="end"/> </custom> <custom class="com.umpay.ubp.jbpm.handler.PrintInfoHandler" g="497,669,92,52" name="PrintInfo"> <transition name="to AggregateData" to="AggregateData" g="-101,-17"/> </custom> <end g="519,837,48,48" name="end"/> <fork g="521,181,48,48" name="fork1"> <transition g="-71,-17" name="to CheckMer" to="CheckMer"/> <transition g="-77,-17" name="to CheckBank" to="CheckBank"/> <transition g="-83,-17" name="to CheckGoods" to="CheckGoods"/> </fork> <join g="447,425,48,48" name="join1"> <transition g="-95,-17" name="to CheckMerBank" to="CheckMerBank"/> </join> <custom class="com.umpay.ubp.jbpm.handler.CheckMerBankHandler" g="415,505,112,52" name="CheckMerBank"> <transition g="-53,-17" name="to join3" to="join3"/> </custom> <join g="593,425,48,48" name="join2"> <transition g="-107,-17" name="to CheckGoodsBank" to="CheckGoodsBank"/> </join> <join g="519,589,48,48" name="join3"> <transition name="to PrintInfo" to="PrintInfo" g="-77,-17"/> </join> <fork g="521,345,48,48" name="fork2"> <transition g="-53,-17" name="to join1" to="join1"/> <transition g="-53,-17" name="to join2" to="join2"/> </fork> </process>
上述流程简要描述如下:
1、校验用户(CheckUser)的合法性,例如如果是远程移动支付,用户提供的手机号必须在线,而且开通了某项业务鉴权才通过,才可以进行后续的校验;
2、校验提供商品的商户(CheckMer),校验提供支付的银行(也就是支付商,CheckBank),校验提供的商品(CheckGoods),这几项是可以并行进行的(不过,貌似jBPM启动流程执行fork的时候,只能单线程执行,而无法多线程并行校验);
3、校验商户支付商关系(CheckMerBank),校验商品支付商关系(CheckGoodsBank)。由于商户支付商关系鉴权依赖于商户和支付商的校验,商品支付商关系鉴权依赖于商品和支付商的校验,所以在CheckBank结点进行了fork,然后分别和CheckMer、CheckGoods做了一个join操作;
4、校验提供商品的商户(CheckMer)与校验提供支付的银行(CheckBank)都完成后,需要将在发起远程支付交易的请求中数据都汇集,join后,在AggregateData中进行处理(例如内部报文到内部报文的转换操作)。
对于每个结点(这里只上述流程定义中的custom对应的处理类)要进行的处理,完全可以自己模拟,例如,我的com.umpay.ubp.jbpm.handler.PrintInfoHandler的继承关系如下所示:
package com.umpay.ubp.common; public abstract class AbstractHandler implements org.jbpm.api.activity.ExternalActivityBehaviour { ... } public class PrintInfoHandler extends com.umpay.ubp.common.bstractHandler { ... }
在AbstractHandler中,对业务流程处理进行了抽象。
上述流程对应的测试用例如下所示:
package com.umpay.ubp.jbpm.test; import java.util.HashMap; import java.util.Map; import junit.framework.TestCase; import org.jbpm.api.Execution; import org.jbpm.api.ProcessEngine; import org.jbpm.api.ProcessInstance; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.umpay.ubp.constants.XmlUBP; public class MultipleForkJoinProcessTest extends TestCase { private ProcessEngine processEngine; String deploymentId; protected void setUp() throws Exception { super.setUp(); ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext( "applicationContext.xml"); ctx.start(); this.processEngine = (ProcessEngine) ctx.getBean("processEngine"); deploymentId = this.processEngine .getRepositoryService() .createDeployment() .addResourceFromClasspath( "com/umpay/ubp/jbpm/jpdl/multiple-fork-join-process.jpdl.xml") .deploy(); } protected void tearDown() throws Exception { this.processEngine.getRepositoryService().deleteDeploymentCascade( deploymentId); super.tearDown(); } public void testMyProcess() { // initialize request data Map<String, Object> variables = new HashMap<String, Object>(); variables.put(XmlUBP.MERID, "001"); // mer variables.put(XmlUBP.CALLING, "13800138000"); // user variables.put(XmlUBP.GOODSID, "030"); // goods variables.put(XmlUBP.BANKID, "CMPAY591"); // bank variables.put("tick", String.valueOf(System.currentTimeMillis())); ProcessInstance processInstance = null; // start a process instance // it can execute a process automatically processInstance = this.processEngine .getExecutionService().startProcessInstanceByKey("MultipleForkJoinProcess", variables); System.out.println(processInstance.findActiveActivityNames()); assertEquals(Execution.STATE_ENDED, processInstance.getState()); } }