2015-07-07 12:09:50
来 源
中存储网
Nginx
没有用Storm常用的队列,如Kafka、MetaQ等,fqueue是一个轻量的memcached协议队列,把普通的日志文件转为memcached的服务,这样Storm的Spout就可以直接以memcached协议逐条读取。

UAE(UC App Engine)是一个UC内部的PaaS平台,总体架构有点类似CloudFoundry,包括:

  1. 快速部署:支持Node.js、Play!、PHP等框架
  2. 信息透明:运维过程、系统状态、业务状况
  3. 灰度试错:IP灰度、地域灰度
  4. 基础服务:key-value存储、MySQL高可用、图片平台等

这里它不是主角,不作详细介绍。

有数百个Web应用运行在UAE上,所有的请求都会经过UAE的路由,每天的Nginx access log大小是TB级,如何实时监控每个业务的访问趋势、广告数据、页面耗时、访问质量、自定义报表和异常报警?

Hadoop可以满足统计需求,但秒级的实时性不能满足;用Spark Streaming又有些大材小用,同时我们也没有Spark的工程经验;自写分布式程序调度比较麻烦并且要考虑扩展、消息流动;

最后我们的技术选型定为Storm:相对轻量、灵活、消息传递方便、扩展灵活。

另外,而由于UC的各地集群比较多,跨集群日志传输也会是其中一个比较大的问题。

技术准备

基数计数(Cardinality Counting)

在大数据分布式计算的时候,PV(Page View)可以很方便相加合并,但UV(Unique Visitor)不能。

分布式计算的情况下,几百个业务、数十万URL同时统计UV,如果还要分时段统计(每分钟/每5分钟合并/每小时合并/每天合并),内存的消耗是不可接受的。

这个时候,概率的力量就体现了出来。我们在Probabilistic Data Structures for Web Analytics and Data Mining可以看到,精确的哈希表统计UV和基数计数的内存比较,并不是一个数量级的。基数计数可以让你实现UV的合并,内存消耗极小,并且误差完全在可接受范围内。

可以先了解LogLog Counting,理解均匀哈希方法的前提下,粗糙估计的来由即可,后面的公式推导可以跳过。

具体算法是Adaptive Counting,使用的计算库是stream-2.7.0.jar。

实时日志传输

实时计算必须依赖于秒级的实时日志传输,附加的好处是可以避免阶段性传输引起的网络拥堵。

实时日志传输是UAE已有的轻量级的日志传输工具,成熟稳定,直接拿来用了,包括客户端(mca)和服务器端(mcs)。

客户端监听各个集群的日志文件的变化,传输到指定的Storm集群的各台机器上,存储为普通日志文件。

我们调整了传输策略,使得每台Storm机器上的日志文件大小大致相同,所以Spout只读取本机数据即可。

数据源队列

我们并没有用Storm常用的队列,如Kafka、MetaQ等,主要是太重了…

fqueue是一个轻量的memcached协议队列,把普通的日志文件转为memcached的服务,这样Storm的Spout就可以直接以memcached协议逐条读取。

这个数据源比较简单,它不支持重新发射(replay),一条记录被取出之后就不复存在,如果某个tuple处理失败或超时,则数据丢失。

它比较轻量,基于本地文件读取,做了一层薄的缓存,并不是一个纯内存的队列,它的性能瓶颈在于磁盘IO,每秒吞吐量跟磁盘读取速度是一致的。但对于我们这个系统已经足够,后续有计划改成纯内存队列。

架构

通过上面的技术储备,我们可以在用户访问几秒后就能获取到用户的日志。

discovery_arch

整体架构也比较简单,之所以有两种计算bolt,是基于计算的均匀分布考虑。业务的量相差极大,如果仅按业务ID去进行fieldsGrouping,计算资源也会不均衡。

  1. spout将每条原始日志标准化,按照URL分组(fieldsGrouping,为保持每台服务器计算量的均匀),派发到对应的stat_bolt上;
  2. stat_bolt是主要的计算Bolt,将每个业务的URL梳理并计算,如PV、UV、总响应时间、后端响应时间、HTTP状态码统计、URL排序、流量统计等;
  3. merge_bolt将每个业务的数据合并,如PV数,UV数等。当然,这里的UV合并就用到了前面提到的基数计数;
  4. 自写了一个简单的Coordinator协调类,streamId标记为”coordinator”,作用:时间协调(切分batch)、检查任务完成度、超时处理。原理跟Storm自带的Transactional Topolgoy类似。
  5. 实现一个Scheduler通过API获取参数,动态调整Spout、Bolt在各服务器的分布,以便灵活分配服务器资源。
  6. 支持平滑升级Topology:当一个Topology升级的时候,新Topology和旧Topology讲同时运行,协调切换时间,当新的Topology接管了fqueue之后,过河拆桥,杀死旧的Topology。

注意点:

  • Storm机器尽量部署在同一个机柜内,不影响集群内的带宽;
  • 我们的Nginx日志是按小时切分的,如果切分的时间不准确,在00分的时候,就可以看到明显的数据波动,所以,尽量使用Nginx module去切日志,用crontab发信号切会有延迟。切日志这种10秒级的延迟,在大尺度的统计上没有问题,秒级的统计时波动却很明显;
  • 堆太小会导致woker被强制杀死,所以要配置好-Xmx参数;

自定义项

  • 静态资源:静态资源过滤选项,通过Content-Type或后缀筛选特定的静态资源。
  • 资源合并:URL合并,比如RESTful的资源,合并后方便展示;
  • 维度与指标:通过ANTLR v3做语法、词法分析,完成自定义维度和指标,并且后续的报警也支持自定义表达式。

其他

我们还用其他方式实现了:

  • 业务的进程级(CPU/MEM/端口)监控
  • 业务依赖的服务,如MySQL/memcached等的监控
  • 服务器的磁盘/内存/IO/内核参数/语言环境/环境变量/编译环境等监控

关于Storm

 

诞 生

在2011年Storm开源之前,由于Hadoop的火红,整个业界都在喋喋不休地谈论大数据。Hadoop的高吞吐,海量数据处理的能力使得人们可以方便地处理海量数据。但是,Hadoop的缺点也和它的优点同样鲜明——延迟大,响应缓慢,运维复杂。

有需求也就有创造,在Hadoop基本奠定了大数据霸主地位的时候,很多的开源项目都是以弥补Hadoop的实时性为目标而被创造出来。而在这个节骨眼上Storm横空出世了。

Storm带着流式计算的标签华丽丽滴出场了,看看它的一些卖点:

  • 分布式系统:可横向拓展,现在的项目不带个分布式特性都不好意思开源。
  • 运维简单:Storm的部署的确简单。虽然没有Mongodb的解压即用那么简单,但是它也就是多安装两个依赖库而已。
  • 高度容错:模块都是无状态的,随时宕机重启。
  • 无数据丢失:Storm创新性提出的ack消息追踪框架和复杂的事务性处理,能够满足很多级别的数据处理需求。不过,越高的数据处理需求,性能下降越严重。
  • 多语言:实际上,Storm的多语言更像是临时添加上去似的。因为,你的提交部分还是要使用Java实现。

    下面,我们简单地认识一下Storm这个产品。

认 识

Storm是一个免费开源、分布式、高容错的实时计算系统。Storm令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求。Storm经常用于在实时分析、在线机器学习、持续计算、分布式远程调用和ETL等领域。Storm的部署管理非常简单,而且,在同类的流式计算工具,Storm的性能也是非常出众的。

Storm主要分为两种组件Nimbus和Supervisor。这两种组件都是快速失败的,没有状态。任务状态和心跳信息等都保存在Zookeeper上的,提交的代码资源都在本地机器的硬盘上。

  • Nimbus负责在集群里面发送代码,分配工作给机器,并且监控状态。全局只有一个。
  • Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程Worker。每一个要运行Storm的机器上都要部署一个,并且,按照机器的配置设定上面分配的槽位数。
  • Zookeeper是Storm重点依赖的外部资源。Nimbus和Supervisor甚至实际运行的Worker都是把心跳保存在Zookeeper上的。Nimbus也是根据Zookeerper上的心跳和任务运行状况,进行调度和任务分配的。
  • Storm提交运行的程序称为Topology。
  • Topology处理的最小的消息单位是一个Tuple,也就是一个任意对象的数组。
  • Topology由Spout和Bolt构成。Spout是发出Tuple的结点。Bolt可以随意订阅某个Spout或者Bolt发出的Tuple。Spout和Bolt都统称为component。   

下图是一个Topology设计的逻辑图的例子。   

topology例子2

下图是Topology的提交流程图。

提交2   

下图是Storm的数据交互图。可以看出两个模块Nimbus和Supervisor之间没有直接交互。状态都是保存在Zookeeper上。Worker之间通过ZeroMQ传送数据。

数据流图

  虽然,有些地方做得还是不太好,例如,底层使用的ZeroMQ不能控制内存使用(下个release版本,引入了新的消息机制使用netty代替ZeroMQ),多语言支持更多是噱头,Nimbus还不支持HA。但是,就像当年的Hadoop那样,很多公司选择它是因为它是唯一的选择。而这些先期使用者,反过来促进了Storm的发展。

发 展

Storm已经发展到0.8.2版本了,看一下两年多来,它取得的成就:

  • 有50个大大小小的公司在使用Storm,相信更多的不留名的公司也在使用。这些公司中不乏淘宝,百度,Twitter,Groupon,雅虎等重量级公司。
  • 从开源时候的0.5.0版本,到现在的0.8.0+,和即将到来的0.9.0+。先后添加了以下重大的新特性:

    • 使用kryo作为Tuple序列化的框架(0.6.0)
    • 添加了Transactional topologies(事务性拓扑)的支持(0.7.0)
    • 添加了Trident的支持(0.8.0)
    • 引入netty作为底层消息机制(0.9.0)

Transactional topologies和Trident都是针对实际应用中遇到的重复计数问题和应用性问题的解决方案。可以看出,实际的商用给予了Storm很多良好的反馈。

  • 在GitHub上超过4000个项目负责人。Storm集成了许多库,支持包括Kestrel、Kafka、JMS、Cassandra、Memcached以及更多系统。随着支持的库越来越多,Storm更容易与现有的系统协作。

    Storm的拥有一个活跃的社区和一群热心的贡献者。过去两年,Storm的发展是成功的。

当 前

Storm被广泛应用于实时分析,在线机器学习,持续计算、分布式远程调用等领域。来看一些实际的应用:

  • 一淘-实时分析系统pora:实时分析用户的属性,并反馈给搜索引擎。最初,用户属性分析是通过每天在云梯上定时运行的MR job来完成的。为了满足实时性的要求,希望能够实时分析用户的行为日志,将最新的用户属性反馈给搜索引擎,能够为用户展现最贴近其当前需求的结果。
  • 携程-网站性能监控:实时分析系统监控携程网的网站性能。利用HTML5提供的performance标准获得可用的指标,并记录日志。Storm集群实时分析日志和入库。使用DRPC聚合成报表,通过历史数据对比等判断规则,触发预警事件。   

    如果,业务场景中需要低延迟的响应,希望在秒级或者毫秒级完成分析、并得到响应,而且希望能够随着数据量的增大而拓展。那就可以考虑下,使用Storm了。   

  • 试想下,如果,一个游戏新版本上线,有一个实时分析系统,收集游戏中的数据,运营或者开发者可以在上线后几秒钟得到持续不断更新的游戏监控报告和分析结果,然后马上针对游戏的参数和平衡性进行调整。这样就能够大大缩短游戏迭代周期,加强游戏的生命力(实际上,zynga就是这么干的!虽然使用的不是Storm……Zynga研发之道探秘:用数据说话)。 

  • 除了低延迟,Storm的Topology灵活的编程方式和分布式协调也会给我们带来方便。用户属性分析的项目,需要处理大量的数据。使用传统的MapReduce处理是个不错的选择。但是,处理过程中有个步骤需要根据分析结果,采集网页上的数据进行下一步的处理。这对于MapReduce来说就不太适用了。但是,Storm的Topology就能完美解决这个问题。基于这个问题,我们可以画出这样一个Storm的Topology的处理图。

用户分词

我们只需要实现每个分析的过程,而Storm帮我们把消息的传送和接受都完成了。更加激动人心的是,你只需要增加某个Bolt的并行度就能够解决掉某个结点上的性能瓶颈。

未 来

在流式处理领域里,Storm的直接对手是S4。不过,S4冷淡的社区、半成品的代码,在实际商用方面输给Storm不止一条街。

如果把范围扩大到实时处理,Storm就一点都不寂寞了。

  • Puma:Facebook使用puma和Hbase相结合来处理实时数据,使批处理 计算平台具备一定实时能力。 不过这不算是一个开源的产品。只是内部使用。
  • HStreaming:尝试为Hadoop环境添加一个实时的组件HStreaming能让一个Hadoop平台在几天内转为一个实时系统。分商业版和免费版。也许HStreaming可以借Hadoop的东风,撼动Storm。
  • Spark Streaming:作为UC Berkeley云计算software stack的一部分,Spark Streaming是建立在Spark上的应用框架,利用Spark的底层框架作为其执行基础,并在其上构建了DStream的行为抽象。利用DStream所提供的api,用户可以在数据流上实时进行count,join,aggregate等操作。

    当然,Storm也有Yarn-Storm项目,能让Storm运行在Hadoop2.0的Yarn框架上,可以让Hadoop的MapReduce和Storm共享资源。

总 结

知乎上有一个挺好的问答: 问:实时处理系统(类似s4, storm)对比直接用MQ来做好处在哪里?  答:好处是它帮你做了: 1) 集群控制。2) 任务分配。3) 任务分发 4) 监控 等等。

需要知道Storm不是一个完整的解决方案。使用Storm你需要加入消息队列做数据入口,考虑如何在流中保存状态,考虑怎样将大问题用分布式去解决。解决这些问题的成本可能比增加一个服务器的成本还高。但是,一旦下定决定使用了Storm并解决了那些恼人的细节,你就能享受到Storm给你带来的简单,可拓展等优势了。

技术的发展日新月异,数据处理领域越来越多优秀的开源产品。Storm的过去是成功的,将来会如何发展,我们拭目以待吧。

后记

本文的重点是描述Storm的应用场景和未来的发展前景,让大家对Storm有一个初步的印象。如果,要落地使用的朋友,在网上可以找到很多优秀的Storm的技术文章。例如:Storm的核心贡献者徐明明的博客和淘宝关于storm的文章。

声明: 此文观点不代表本站立场;转载须要保留原文链接;版权疑问请联系我们。