现在 Infinispan 支持 Java 8,我们可以充分利用一些新特性。 Java 8 的一大特性是新的 Stream 类。这颠覆了处理数据的思路,这样底层 Stream 就可以处理数据,而不必自己迭代数据,而您只需提供要对其执行的操作。这非常适合分布式处理,因为迭代完全由实现处理(在本例中为 Infinispan)。
因此,我很高兴为 Infinispan 8 介绍分布式流功能!这允许您可以在常规 Stream 上执行的任何操作也可以在分布式缓存上执行(假设操作和数据是 可编组的 )。
可编组性
使用分布式或复制缓存时,缓存的键和值必须是可编组的。这与使用分布式流时的中间和终端操作的情况相同。通常,您必须提供某个新类的实例,该类要么是可序列化的,要么已为其注册了一个 Externalizer,如用户指南的可编组部分所述。
不过,Java 8 也引入了 lambda,可以很容易的定义为可序列化的(虽然有点别扭)。可以 在此处 找到此序列化的示例。
你们中的一些人可能还知道 Collectors 类,它与流上的 collect 方法一起使用。不幸的是,所有生产的收藏家都无法编组。因此,Infinispan 添加了一个可以与 Collectors 类结合使用的 实用程序类 。这允许您仍然使用 Collectors 类的任意组合,并且在需要编组所有内容时仍然可以正常工作。
平行度
Java 8 流自然具有并行性。那就是流可以被标记为并行的。这又允许使用多个线程并行执行操作。最好的部分是它是多么简单。可以在通过调用 parallelStream 首次检索流时使流并行,或者您可以选择在检索 Stream 后通过调用 parallel 启用它。
来自 Infinispan 的新分布式流更进一步,我称之为并行分布。也就是说,由于数据已经跨节点分区,我们还可以允许同时在不同节点上同时运行操作。默认情况下启用此选项。然而,这可以通过使用下面讨论的新 CacheStream 接口来控制。另外,需要明确的是,Java 8 并行可以与并行分布结合使用。这仅意味着您将在每个节点上跨多个线程的多个节点上运行并发操作。
缓存流接口
提供了一个新接口 Cachestream ,允许在使用分布式流时控制其他选项。我正在突出显示添加的方法(注意注释已从要点中删除)
public interface CacheStream<R> extends Stream<R> {
CacheStream<R> sequentialDistribution();
CacheStream<R> parallelDistribution();
CacheStream<R> filterKeySegments(Set<Integer> segments);
CacheStream<R> filterKeys(Set<?> keys);
CacheStream<R> distributedBatchSize(int batchSize);
CacheStream<R> segmentCompletionListener(SegmentCompletionListener listener);
CacheStream<R> disableRehashAware();
CacheStream<R> timeout(long timeout, TimeUnit unit);
interface SegmentCompletionListener {
void segmentCompleted(Set<Integer> segments);
}
}
分布式批量大小
此方法控制一次带回多少元素以用于可感知键的操作。这些操作是 (spl)iterator 和 forEach。这对于调整从远程节点保存在内存中的密钥数量很有用。因此,这是性能(更多键)与内存的权衡。这默认为状态传输配置的块大小。
parallelDistribution / 顺序分布
这在上面的并行性部分中进行了讨论。请注意,除 spl(iterator) 方法外,所有命令都默认启用此功能。
过滤键
此方法可用于让分布式流仅对给定的一组键进行操作。这是以一种非常有效的方式完成的,因为它只会在拥有给定键的节点上执行操作。使用一组给定的键还允许来自数据容器/存储的恒定访问时间,因为缓存不必查看缓存中的每个条目。
filterKeySegments(仅限高级用户)
这对于以更高效的方式过滤实例很有用。通常,您可以使用过滤器中间操作,但此方法在执行任何操作之前执行,以最有效地限制为流处理提供的条目。例如,如果只需要一部分段,则可能不必发送远程请求。
segmentCompletionListener(仅限高级用户)
与前面的方法类似,这与关键段有关。此侦听器允许在完成段处理时通知最终用户。如果您想跟踪完成情况,并且如果此节点出现故障,您可以仅使用未处理的段重新运行处理,这将很有用。目前,此侦听器仅支持 spl(iterator) 方法。
disableRehashAware(仅限高级用户)
默认情况下,所有流操作都是所谓的重新散列感知。也就是说,如果一个节点在操作正在进行时加入或离开集群,集群将意识到这一点并确保所有数据都得到正确处理而没有丢失(假设实际上没有数据丢失)。
这可以通过调用 disableRehashAware 来禁用;但是,如果在操作中间发生重新散列,则可能无法处理所有数据。应该注意的是,禁用此功能后不会多次处理数据,只会丢失数据。
通常不建议使用此选项,除非您有能力只对一部分数据进行操作。权衡是操作可以执行得更快,尤其是 (spl)iterator 和 forEach 方法。
映射/减少
map/reduce 的古老示例始终是字数统计。 Streams 也允许您这样做!这是一个等效的字数统计示例,假设您有一个包含 String 键和值的缓存,并且您想要计算值中所有字数。你们中的一些人可能想知道这与我们现有的 map/reduce 框架有什么关系。计划是弃用现有的 Map/Reduce,并在稍后用新的分布式流完全替换它。
public interface CacheStream<R> extends Stream<R> {
CacheStream<R> sequentialDistribution();
CacheStream<R> parallelDistribution();
CacheStream<R> filterKeySegments(Set<Integer> segments);
CacheStream<R> filterKeys(Set<?> keys);
CacheStream<R> distributedBatchSize(int batchSize);
CacheStream<R> segmentCompletionListener(SegmentCompletionListener listener);
CacheStream<R> disableRehashAware();
CacheStream<R> timeout(long timeout, TimeUnit unit);
interface SegmentCompletionListener {
void segmentCompleted(Set<Integer> segments);
}
}
请记住,分布式流可以做的不仅仅是 map/reduce。并且已经有很多关于流的例子。要使用分布式流,你只需要确保你的操作是可编组的,你就可以开始了。
以下是一些示例,其中包含如何直接使用来自 Oracle 的流的示例:
http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html
http://www.oracle.com/technetwork/articles/java/architect-streams-pt2-2227132.html
我希望你喜欢分布式流。我们希望它们能改变您与集群中的数据交互的方式!
让我们知道您的想法、任何您想分享的问题或用法!
干杯,
将要