2022-10-23 445
Storm是一个分布式的流处理系统,利用anchor和ack机制保证所有tuple都被成功处理。如果tuple出错,则可以被重传,但是如何保证出错的tuple只被处理一次呢?Storm提供了一套事务性组件Transaction Topology,用来解决这个问题。
Transactional Topology目前已经不再维护,由Trident来实现事务性topology,但是原理相同。
一、一致性事务的设计
Storm如何实现即对tuple并行处理,又保证事务性。本节从简单的事务性实现方法入手,逐步引出Transactional Topology的原理。
1、简单设计一:强顺序流
保证tuple只被处理一次,最简单的方法就是将tuple流变成强顺序的,并且每次只处理一个tuple。从1开始,给每个tuple都顺序加上一个id。在处理tuple的时候,将处理成功的tuple id和计算结果存在数据库中。下一个tuple到来的时候,将其id与数据库中的id做比较。如果相同,则说明这个tuple已经被成功处理过了,忽略它;如果不同,根据强顺序性,说明这个tuple没有被处理过,将它的id及计算结果更新到数据库中。
以统计消息总数为例。每来一个tuple,如果数据库中存储的id 与当前tuple id不同,则数据库中的消息总数加1,同时更新数据库中的当前tuple id值。如图:
但是这种机制使得系统一次只能处理一个tuple,无法实现分布式计算。
2、简单设计二:强顺序batch流
为了实现分布式,我们可以每次处理一批tuple,称为一个batch。一个batch中的tuple可以被并行处理。
我们要保证一个batch只被处理一次,机制和上一节类似。只不过数据库中存储的是batch id。batch的中间计算结果先存在局部变量中,当一个batch中的所有tuple都被处理完之后,判断batch id,如果跟数据库中的id不同,则将中间计算结果更新到数据库中。
如何确保一个batch里面的所有tuple都被处理完了呢?可以利用Storm提供的CoordinateBolt。如图:
但是强顺序batch流也有局限,每次只能处理一个batch,batch之间无法并行。要想实现真正的分布式事务处理,可以使用storm提供的Transactional Topology。在此之前,我们先详细介绍一下CoordinateBolt的原理。
3、CoordinateBolt原理
CoordinateBolt具体原理如下:
整个过程如图所示:
CoordinateBolt主要用于两个场景:
CoordinatedBolt对于业务是有侵入的,要使用CoordinatedBolt提供的功能,你必须要保证你的每个bolt发送的每个tuple的第一个field是request-id。 所谓的“我已经处理完我的上游”的意思是说当前这个bolt对于当前这个request-id所需要做的工作做完了。这个request-id在DRPC里面代表一个DRPC请求;在Transactional Topology里面代表一个batch。
4、Trasactional Topology
Storm提供的Transactional Topology将batch计算分为process和commit两个阶段。Process阶段可以同时处理多个batch,不用保证顺序性;commit阶段保证batch的强顺序性,并且一次只能处理一个batch,第1个batch成功提交之前,第2个batch不能被提交。
还是以统计消息总数为例,以下代码来自storm-starter里面的TransactionalGlobalCount。
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields(“word“), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(“global-count“, “spout“, spout, 3);
builder.setBolt(“partial-count“, new BatchCount(), 5).noneGrouping(“spout“);
builder.setBolt(“sum“, new UpdateGlobalCount()).globalGrouping(“partial-count“);
TransactionalTopologyBuilder共接收四个参数。
下面是BatchCount的定义:
publicstaticclassBatchCountextendsBaseBatchBolt{ Object_id; BatchOutputCollector_collector; int_count=0; @Override publicvoidprepare(Mapconf,TopologyContextcontext, BatchOutputCollectorcollector,Objectid){ _collector=collector; _id=id; } @Override publicvoidexecute(Tupletuple){ _count++; } @Override publicvoidfinishBatch(){ _collector.emit(newValues(_id,_count)); } @Override publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){ declarer.declare(newFields(“id“,“count“)); } }
BatchCount的prepare方法的最后一个参数是batch id,在Transactional Tolpoloyg里面这id是一个TransactionAttempt对象。
Transactional Topology里发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。
TransactionAttempt包含两个值:一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch中的tuple是唯一的,而且不管这个batch replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了, 我们可以把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不同版本。
execute方法会为batch里面的每个tuple执行一次,你应该把这个batch里面的计算状态保持在一个本地变量里面。对于这个例子来说, 它在execute方法里面递增tuple的个数。
最后, 当这个bolt接收到某个batch的所有的tuple之后, finishBatch方法会被调用。这个例子里面的BatchCount类会在这个时候发射它的局部数量到它的输出流里面去。
下面是UpdateGlobalCount类的定义:
publicstaticclassUpdateGlobalCountextendsBaseTransactionalBolt implementsICommitter{ TransactionAttempt_attempt; BatchOutputCollector_collector; int_sum=0; @Override publicvoidprepare(Mapconf,TopologyContextcontext, BatchOutputCollectorcollector,TransactionAttemptattempt){ _collector=collector; _attempt=attempt; } @Override publicvoidexecute(Tupletuple){ _sum+=tuple.getInteger(1); } @Override publicvoidfinishBatch(){ Valueval=DATABASE.get(GLOBAL_COUNT_KEY); Valuenewval; if(val==null||!val.txid.equals(_attempt.getTransactionId())){ newnewval=newValue(); newval.txid=_attempt.getTransactionId(); if(val==null){ newval.count=_sum; }else{ newval.count=_sum+val.count; } DATABASE.put(GLOBAL_COUNT_KEY,newval); }else{ newval=val; } _collector.emit(newValues(_attempt,newval.count)); } @Override publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){ declarer.declare(newFields(“id“,“sum“)); } }
UpdateGlobalCount实现了ICommitter接口,所以storm只会在commit阶段执行finishBatch方法。而execute方法可以在任何阶段完成。
在UpdateGlobalCount的finishBatch方法中,将当前的transaction id与数据库中存储的id做比较。如果相同,则忽略这个batch;如果不同,则把这个batch的计算结果加到总结果中,并更新数据库。
Transactional Topolgy运行示意图如下:
下面总结一下Transactional Topology的一些特性:
二、Trident介绍
Trident是Storm之上的高级抽象,提供了joins,grouping,aggregations,fuctions和filters等接口。如果你使用过Pig或Cascading,对这些接口就不会陌生。
Trident将stream中的tuples分成batches进行处理,API封装了对这些batches的处理过程,保证tuple只被处理一次。处理batches中间结果存储在TridentState对象中。
Trident事务性原理这里不详细介绍,有兴趣的读者请自行查阅资料。
原文链接:https://77isp.com/post/8935.html
=========================================
https://77isp.com/ 为 “云服务器技术网” 唯一官方服务平台,请勿相信其他任何渠道。
数据库技术 2022-03-28
网站技术 2022-11-26
网站技术 2023-01-07
网站技术 2022-11-17
Windows相关 2022-02-23
网站技术 2023-01-14
Windows相关 2022-02-16
Windows相关 2022-02-16
Linux相关 2022-02-27
数据库技术 2022-02-20
抠敌 2023年10月23日
嚼餐 2023年10月23日
男忌 2023年10月22日
瓮仆 2023年10月22日
簿偌 2023年10月22日
扫码二维码
获取最新动态