storm 核心API之普通Topology

普通Topology

如果建立自己的Topology(非Transactional的),用户通常需要利用如下接口和对象:

IRichBolt

IRichSpout

TopologyBuilder

public interface ISpout extends Serializable {

void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

void close();

void activate();

void deactivate();

void nextTuple();

void ack(Object msgId);

void fail(Object msgId);

}

public interface IBolt extends Serializable {

void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

void execute(Tuple input);

void cleanup();

}

IRichBolt和IRichSpout与IBolt和ISpout的不同在于多了两个接口:

(OutputFieldsDeclarerdeclarer):声明输出字段

() :该接口是在0.7.0引入的,用于支持组件级的配置,即允许用户针对单个Spout或Bolt进行参数配置。

实现了这两个接口后,通过调用TopologyBuilder建立起Topology。TopologyBuilder实际上是封装了StormTopology的thrift接口,也就是说Topology实际上是通过thrift定义的一个struct,TopologyBuilder将这个对象建立起来,然后nimbus实际上会运行一个thrift服务器,用于接收用户提交的结构。由于是采用thrift实现,所以用户可以用其他语言建立Topology,这样就提供了比较方便的多语言操作支持。

对于用户来说,,通常需要做的就是提供自己的ISpout和IBlot实现,然后利用TopologyBuilder建立起自己需要的拓扑结构。

Storm框架会拿到用户提供这个拓扑结构及Spout和Blot对象,驱动整个处理过程。简单介绍下ISpout的那些接口的调用时机,在创建Spout对象时,会调用open函数。对象销毁时调用close(),但是框架并不保证close函数一定会被调用,因为进程可能是通过kill -9被杀死的。activate和deactivate是在spout被activate或deactivate时被调用,这两个动作是由用户从外部触发的,Strom的命令行提供两个命令activate和deactivate,允许用户activate和deactivate一个Topology,当用户执行deactivate时,对应Topology的spout会被deactivate,产生影响就是spout的nextTuple此后将不会被调用,直到用户再调用activate。Spout的核心功能是通过nextTuple实现的,用户通过该函数完成Tuple的发射。该函数会被框架周期性的调用。会有类似如下的一个循环:

While(true)

{

if(…)

spout.activate();

if(…)

sput.deactivate();

if(…)

spout.nextTupe();

}

首先这三个函数都是在一个线程中,因此不需要同步。其次,nextTuple()不能阻塞,如果没有Tuple可以发射需要立即返回,用户不能提供一个阻塞式的实现,否则可能阻塞整个后台循环。另外,后台可能会调节nextTuple()的调用频率,比如系统有一个配置参数可以控制当前被pending的Tuple最大数目,如果达到这个限制,可能就会做一些流控。

ack和fail则是两个回调函数。Spout在发射出一个tuple后,该tuple会通过acking机制被acker追踪,除了显式的fail和ack外,每个tuple有一个超时时间,如果超过这个时间还未确定该tuple的状态,那么acker会通知spout,这个tuple处理失败了,然后框架得到这个消息后,就会调用spout的fail函数,如果acker发现这个tuple处理成功了,也会通知spout,然后会调用spout的ack函数。所以通常来说用户在发射tuple时,要确保数据不丢失,都会将已经发射的tuple缓存起来,然后在ack函数中删除对应tuple,在fail函数中重发对应的tuple。

另外需要注意的一点是,Spout使用的collector是SpoutOutputCollector,Bolt使用的collector是OutputCollector。这两个虽然提供的功能类似,都是负责发送tuple的,但是由于一个是面向Spout,一个是面向Bolt的,它们的接口也略有不同。具体如下:

public interface ISpoutOutputCollector {

List<Integer> emit(String streamId, List<Object> tuple, Object messageId);

void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);

void reportError(Throwable error);

}

她是应该难过的往回走,还是蹲下来哭泣?

storm 核心API之普通Topology

相关文章:

你感兴趣的文章:

标签云: