博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop 之 kafka 安装与 flume -> kafka 整合
阅读量:7119 次
发布时间:2019-06-28

本文共 2262 字,大约阅读时间需要 7 分钟。

62-kafka 安装 : flume 整合 kafka

一.kafka 安装

1.下载

2. 解压

tar -zxvf kafka_2.10-0.8.1.1.tgz

3.启动服务

3.1 首先启动zookeeper服务

bin/zookeeper-server-start.sh config/zookeeper.properties

3.2启动Kafka

bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

3.3创建topic

创建一个"test"的topic,一个分区一个副本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看主题

bin/kafka-topics.sh --list --zookeeper localhost:2181
查看主题详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
删除主题
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

二. flume -整合 --kafka

1.启动 flume (配置文件)

flume-ng agent --conf conf -f /bigdata/flume-1.6/conf/kafka.conf -name producer -Dlume.root.logger=DEBUG,console

2.启动 kafka

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

3. 发送 消息

echo 'wo l g q .' |nc -u hadoop1 8285

4.--启动consumer查看是否接受到信息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

三.flume - kafka 错误

java.lang.ClassNotFoundException: org.apache.flume.plugins.KafkaSinkjar -tf flume-ng-kafka-sink-1.6.0.jar | fgrep KafkaSink,你就能确定这里面有没有KafkaSink了producer.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink  (一定要参考官网配置 agent) 

  

  

四. kafka.conf

  

producer agent 配置

#memory channel called ch1 on agent1producer.channels.channel1.type = memory# Define an Avro source called avro-source1 on agent1 and tell it# to bind to 0.0.0.0:41414. Connect it to channel ch1.producer.sources.source1.channels = channel1producer.sources.source1.type = syslogudpproducer.sources.source1.bind = 127.0.0.1producer.sources.source1.port = 8285# Define a logger sink that simply logs all events it receives# and connect it to the other end of the same channel.producer.sinks.sink1.channel = channel1producer.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSinkproducer.sinks.sink1.brokerList=127.0.0.1:9092producer.sinks.sink1.topic=testproducer.sinks.sink1.batchSize=20 # Finally, now that we've defined all of our components, tell# agent1 which ones we want to activate.producer.channels = channel1producer.sources = source1producer.sinks = sink1

转载于:https://www.cnblogs.com/chaoren399/p/5479563.html

你可能感兴趣的文章
virtio分析
查看>>
SVN使用方法及问题解决
查看>>
Linux- 日常运维-w-查看系统负载
查看>>
Spring4+Springmvc+quartz实现多线程动态定时调度
查看>>
Mozilla在Thunderbird 60.3中的修补了多个安全漏洞
查看>>
HDFS进阶应用 配置 NFS 网关
查看>>
[小型企业]不同vlan配置DHCP服务
查看>>
定时任务执行利器Timer和ScheduledThreadPoolExecutor使用
查看>>
iOS 网络编程(二)
查看>>
mysql用户管理
查看>>
Redis的使用原理
查看>>
亚洲诚信亮相2018天翼智能生态博览会
查看>>
centos7实现Linux和Windows共享
查看>>
SLAM技术的应用及发展现状
查看>>
java 进销存 销售报表 库存管理 商户管理 springmvc SSM crm 项目
查看>>
学习nodejs之hello world
查看>>
几个容易混淆的对齐概念
查看>>
那些不能错过的Xcode插件
查看>>
centos7源码编译安装mariadb
查看>>
5个资源满满的网站,都是百度找不到的好资源,30T的硬盘瞬间爆满
查看>>