storm kafka spoutspout 要配置多少

storm-kafka的使用
1.storm-kafka介绍
storm-kafka是storm自带的从kafka上获取消息的kafka客户端程序。
提供kafka和Trident的spout实现从kafka消费数据。
2.storm-kafka的使用实例
maven的依赖配置文件,要注意strom-kafka是使用的kafka的低级api,因此也要引用kafka的包。如果不引,虽然编译不报错,但运行时会报错,我在初次使用时就是因为这个原因一直有问题。
&dependency&
&groupId&org.apache.storm&/groupId&
&artifactId&storm-core&/artifactId&
&version&0.9.5&/version&
&scope&provided&/scope&
&/dependency&
&dependency&
&groupId&org.apache.storm&/groupId&
&artifactId&storm-kafka&/artifactId&
&version&0.9.5&/version&
&scope&provided&/scope&
&/dependency&
&dependency&
&groupId&org.apache.kafka&/groupId&
&artifactId&kafka_2.10&/artifactId&
&version&0.8.1.1&/version&
&exclusions&
&exclusion&
&groupId&org.apache.zookeeper&/groupId&
&artifactId&zookeeper&/artifactId&
&/exclusion&
&exclusion&
&groupId&log4j&/groupId&
&artifactId&log4j&/artifactId&
&/exclusion&
&/exclusions&
&/dependency&
下面是我写的一个demo
具体步骤如下
1.new BrokerHosts
需要的参数zookeeper的地址
2.new SpoutConfig
构建SpoutConfig,需要设置BrokerHosts,kafka的topic,strom在zookeeper上的根等相关的参数。
3.new TopologyBuilder
给TopologyBuilder设置Soupt和Boult用于构建一个Topology
4.配置Config并设置参数,启动LocalCluster,提交topology任务。
import java.util.A
import java.util.M
import storm.kafka.BrokerH
import storm.kafka.KafkaS
import storm.kafka.SpoutC
import storm.kafka.StringS
import storm.kafka.ZkH
import backtype.storm.C
import backtype.storm.LocalC
import backtype.storm.spout.SchemeAsMultiS
import backtype.storm.task.OutputC
import backtype.storm.task.TopologyC
import backtype.storm.topology.OutputFieldsD
import backtype.storm.topology.TopologyB
import backtype.storm.topology.base.BaseRichB
import backtype.storm.tuple.T
* @Description:strom-kafka 使用
* @author:difeng
* @time:日 上午10:18:31
public class StormKafkaConsumer {
public static class PingCounter extends BaseRichBolt{
private static final long serialVersionUID = 1L;
private OutputC
public void execute(Tuple input) {
String msg = input.getString(0);
System.out.println(&---------------------& + msg + &-----------------&);
collector.ack(input);
public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
this.collector = arg2;
System.out.println(&++++++++++++++++++++prepare++++++++++++++++++++++++++++++++++&);
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub
System.out.println(&++++++++++++++++++++declareOutputFields+++++++++++++++++++++&);
public static void main(String[] args) {
//zookeeper的服务器地址
String zks = &192.168.1.50:.1.57:.1.58:2181&;
//消息的topic
String topic = &data_icmp_ping&;
//strom在zookeeper上的根
String zkRoot = &/storm&;
String id = &data_icmp_ping&;
BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = true;
spoutConf.zkServers = Arrays.asList(new String[] {&192.168.1.50,192.168.1.57,192.168.1.58&});
spoutConf.zkPort = 2181;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(&kafka-reader&, new KafkaSpout(spoutConf),3);
builder.setBolt(&ping-counter&, new PingCounter(),3).shuffleGrouping(&kafka-reader&);
Config conf = new Config();
conf.setDebug(true);
//设置任务线程数
conf.setMaxTaskParallelism(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(&test&, conf, builder.createTopology());
Thread.sleep(60000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
cluster.shutdown();
阅读(...) 评论()他的最新文章
他的热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)他的最新文章
他的热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)<option value='/video/av/index_1.html' cid='、1
<option value='/video/av/index_2.html' cid='、2
<option value='/video/av/index_3.html' cid='、3
视频地址复制
Flash地址复制
Html地址复制
离线看更方便
用或其他应用扫描二维码
广播电视节目制作经营许可证:(沪)字第1248号
| 网络文化经营许可证:沪网文[6号 | 信息网络传播视听节目许可证:0910417 | 互联网ICP备案:沪ICP备号-3 沪ICP证:沪B2- | 违法不良信息举报邮箱: | 违法不良信息举报电话:转3
公司名称:上海宽娱数码科技有限公司 | 公司地址:上海市杨浦区政立路485号 | 客服电话:他的最新文章
他的热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)

我要回帖

更多关于 kafkaspoutconfig 的文章

 

随机推荐