上周
Apache Camel v2.16
发布,解决了 600 多个问题(错误修复、更新和新功能)。 Claus 已经在博客中介绍了
前 10 大亮点
,在这里我想简要地向您展示一个用例,说明我在此版本中的微小贡献:
camel-jbpm
组件。
该组件旨在简化与
jBPM
(业务流程管理应用程序)的集成。它使用
Remote Java API
并且基本上对 jBPM 进行 REST 调用来管理业务流程(启动、停止、中止)、执行业务规则、管理人工任务等。因此,与其手动创建 REST 客户端或 Remote API,您可以使用骆驼组件。远程 API 还提供了基于 HornetMQ 的集成,但我已经从 Camel 组件中删除了那部分,因为它不能很好地与 OSGI 配合使用。
在示例用例(受真实案例启发)中,有一个基于 Camel 的集成应用程序在 JBoss Fuse 上运行,该应用程序应该进行实时集成。每当集成路径中出现错误时,错误详细信息都会发送给 jBPM,以便在必要时由人工进行评估和审查。为了证明我已经创建了两个独立的项目:
- 在 Fuse 上运行的 Camel 项目 ,处理任何异常并将详细信息发送到 jBPM
- 在 jBPM 上运行的业务流程项目 使用 Drools 和复杂事件处理对错误进行分类,并在错误严重时创建人工任务。
下面的 Camel 路由是不言自明的,有一个路由生成 10 个异常,每个异常延迟半秒,还有一个错误处理程序捕获这些异常并将它们发送给 jBPM。请注意,它还会使用有关错误的详细信息填充 CamelJBPMParameters 标头。
public void configure() {
from("timer://trigger?fixedRate=true&period=500&repeatCount=10").routeId("mainRoute")
.to("log:com.ofbizian.jbpm.before?showAll=true&multiline=true")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
throw new RuntimeException("Something went wrong");
}
});
onException(Exception.class)
.handled(true)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
final Map map = new HashMap();
map.putAll(exchange.getIn().getHeaders());
map.put("contextId", exchange.getContext().getName());
map.put("routeId", exchange.getProperty(Exchange.FAILURE_ROUTE_ID));
map.put("endpointId", exchange.getProperty(Exchange.FAILURE_ENDPOINT));
map.put("exchangeId", exchange.getExchangeId());
map.put("breadcrumbId", exchange.getIn().getHeader(Exchange.BREADCRUMB_ID));
map.put("exceptionType", exchange.getProperty(Exchange.EXCEPTION_CAUGHT).getClass());
map.put("errorMessage", exchange.getProperty(Exchange.EXCEPTION_CAUGHT).toString());
exchange.getOut().setHeader("CamelJBPMParameters", map);
}
})
.to("jbpm:http://localhost:8080/business-central?userName=bpmsAdmin&password=password&deploymentId=org.kie.example:camel-process:1.0.0-SNAPSHOT&processId=project1.camel.demo");
}
您可以在 github 上找到示例代码,其中包含有关如何在 Fuse 上构建和运行它的说明。
演示的第二部分包含接收错误并处理它们的 jBPM 流程。该流程的BPMN2图如下:
该过程的输入参数与 CamelJBPMParameters 标头中的值相匹配。在第一步中,它初始化一个标记为非关键的 RequestQualifier 对象。在第二步中,我们使用 Drools 和 复杂事件处理 来限定一段时间内发生的错误。 CEP 的想法是,只有在 10 秒时间窗口内出现超过 5 个错误时,我们才想将错误标记为严重错误。此外,为了防止标记为严重的多重错误并使系统充满人工任务,我们限制在 10 分钟的间隔内最多只有 1 个严重错误。所有这些条件都写在 Drools 中如下:
public void configure() {
from("timer://trigger?fixedRate=true&period=500&repeatCount=10").routeId("mainRoute")
.to("log:com.ofbizian.jbpm.before?showAll=true&multiline=true")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
throw new RuntimeException("Something went wrong");
}
});
onException(Exception.class)
.handled(true)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
final Map map = new HashMap();
map.putAll(exchange.getIn().getHeaders());
map.put("contextId", exchange.getContext().getName());
map.put("routeId", exchange.getProperty(Exchange.FAILURE_ROUTE_ID));
map.put("endpointId", exchange.getProperty(Exchange.FAILURE_ENDPOINT));
map.put("exchangeId", exchange.getExchangeId());
map.put("breadcrumbId", exchange.getIn().getHeader(Exchange.BREADCRUMB_ID));
map.put("exceptionType", exchange.getProperty(Exchange.EXCEPTION_CAUGHT).getClass());
map.put("errorMessage", exchange.getProperty(Exchange.EXCEPTION_CAUGHT).toString());
exchange.getOut().setHeader("CamelJBPMParameters", map);
}
})
.to("jbpm:http://localhost:8080/business-central?userName=bpmsAdmin&password=password&deploymentId=org.kie.example:camel-process:1.0.0-SNAPSHOT&processId=project1.camel.demo");
}
一旦带有错误详细信息的请求被认定为关键或不重要,我们就会使用该信息进行条件路由。记录所有非严重错误,并完成业务流程。我们可以在以下屏幕截图中看到已完成流程的完整列表和错误相关信息:
这些业务流程实例不需要进一步的操作。但在业务流程的另一个分支中,如果错误被标记为严重,我们会创建一个人工任务,流程会在此时停止。
这需要用户分配任务并对其执行操作。我们可以看到 Human Task 已经从业务流程中接收到详细信息:
一旦人工任务被认领并完成,业务流程就会继续并在下一步中完成。