62-kafka 安装 : flume 整合 kafka
一.kafka 安装
1.下载
2. 解压
tar -zxvf kafka_2.10-0.8.1.1.tgz
3.启动服务
3.1 首先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
3.2启动Kafka
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
3.3创建topic
创建一个"test"的topic,一个分区一个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test查看主题
bin/kafka-topics.sh --list --zookeeper localhost:2181 查看主题详情 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 删除主题 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test二. flume -整合 --kafka
1.启动 flume (配置文件)
flume-ng agent --conf conf -f /bigdata/flume-1.6/conf/kafka.conf -name producer -Dlume.root.logger=DEBUG,console
2.启动 kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
3. 发送 消息
echo 'wo l g q .' |nc -u hadoop1 8285
4.--启动consumer查看是否接受到信息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
三.flume - kafka 错误
java.lang.ClassNotFoundException: org.apache.flume.plugins.KafkaSinkjar -tf flume-ng-kafka-sink-1.6.0.jar | fgrep KafkaSink,你就能确定这里面有没有KafkaSink了producer.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink (一定要参考官网配置 agent)
四. kafka.conf
producer agent 配置
#memory channel called ch1 on agent1producer.channels.channel1.type = memory# Define an Avro source called avro-source1 on agent1 and tell it# to bind to 0.0.0.0:41414. Connect it to channel ch1.producer.sources.source1.channels = channel1producer.sources.source1.type = syslogudpproducer.sources.source1.bind = 127.0.0.1producer.sources.source1.port = 8285# Define a logger sink that simply logs all events it receives# and connect it to the other end of the same channel.producer.sinks.sink1.channel = channel1producer.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSinkproducer.sinks.sink1.brokerList=127.0.0.1:9092producer.sinks.sink1.topic=testproducer.sinks.sink1.batchSize=20 # Finally, now that we've defined all of our components, tell# agent1 which ones we want to activate.producer.channels = channel1producer.sources = source1producer.sinks = sink1