A Simple, Up-to-Date Zookeeper + Flume + Kafka Integration Tutorial

There are many well-known frameworks in the big data world; the ones covered today are Zookeeper, Flume, and Kafka. My day-to-day work is data ingestion, so I am not very familiar with these real-time processing systems. Following the quick-start guides on the official sites, I put together a minimal platform whose job is to consume, from Kafka, the messages that Flume forwards into it; for convenience, all input and output here happens in the console. Pay attention to the exact versions I use and do not apply these steps blindly to other versions, which is a cardinal mistake when learning any technology. Also note that these systems run on Linux or macOS; do not attempt this on Windows.


Big data platforms include many excellent systems, most of them distributed, and their architectures are far more complex than the business systems we usually write. Since time is limited, I only describe below what each component does; I will skip the underlying principles and other details, and share those once I have studied them thoroughly myself.

Configuring Zookeeper – 3.4.8

Zookeeper is a very stable system. I have not operated it myself, but it has a reputation for reliability and almost never goes down as a whole: it has a leader-election mechanism, so when the current leader node fails a new leader is elected, whereas in some big data frameworks losing the master node makes the entire cluster unavailable. For this reason, systems with high reliability requirements build on it, for example HBase, and the Kafka we use today.

  • Download Zookeeper

    Download link: http://apache.mirrors.lucidnetworks.net/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz

  • Extract Zookeeper

    Extraction is simple with the tar command; you can extract to any path you like. The command below extracts into the current directory:

    tar -zxvf zookeeper-3.4.8.tar.gz
    
    
  • Configure Zookeeper

    Configuration is also simple, and the official site documents it; here is the short version.
    Official getting-started guide: https://zookeeper.apache.org/doc/trunk/zookeeperStarted.html
    The main file to set up is zoo.cfg. The conf folder ships with a zoo_sample.cfg; we only need to copy it under the new name and use the defaults as-is, with no edits:

    cp zoo_sample.cfg zoo.cfg
    
    

    In the default configuration Zookeeper listens on port 2181. This port matters: we will need it again when configuring Kafka.
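
    For reference, the copied zoo.cfg contains roughly the following (values as shipped in the 3.4.x sample; treat them as illustrative defaults):

    # basic time unit in milliseconds used by ZooKeeper
    tickTime=2000
    # ticks a follower may take to connect and sync to the leader
    initLimit=10
    # ticks a follower may lag behind the leader before being dropped
    syncLimit=5
    # where snapshots are stored; move this off /tmp for anything serious
    dataDir=/tmp/zookeeper
    # the port clients (such as Kafka) connect to
    clientPort=2181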

  • Start Zookeeper

    Start it with the following command:

    bin/zkServer.sh start
    
    

    This runs as a background service and does not occupy the current console.
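
    To verify that it is actually up, ZooKeeper ships helper scripts; checking the status, or connecting with the bundled CLI, is a quick sanity test:

    bin/zkServer.sh status
    bin/zkCli.sh -server 127.0.0.1:2181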

Configuring Flume – 1.7.0

Flume's configuration is also simple: we configure two files and then start it.

  • Download Flume

    Download link: http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

  • Extract Flume

    tar -zxvf apache-flume-1.7.0-bin.tar.gz
    
    
  • Configure flume-env.sh

    The main thing to set here is the JAVA_HOME variable. Since I am on macOS, mine looks like this:

    export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home
    
    
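    If you are unsure of the correct path on macOS, the system ships a helper that prints the JDK home for a given version; a quick way to fill in the value above:

    /usr/libexec/java_home -v 1.8
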
  • Configure the source, channel, and sink

    Briefly, the three terms mean the following: a source describes where our data comes from, a channel describes how data is buffered in transit (in memory or in files), and a sink describes where the data goes. All three are configured in conf/flume.conf, which we can create by copying flume-conf.properties.template:

    cp flume-conf.properties.template flume.conf
    
    
  • Contents of flume.conf
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements.  See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership.  The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License.  You may obtain a copy of the License at
    #
    #  http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License for the
    # specific language governing permissions and limitations
    # under the License.
    
    # The configuration file needs to define the sources,
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent,
    # in this case called 'agent'
    
    agent1.sources = avro-source1
    agent1.channels = ch1
    agent1.sinks = log-sink1
    
    # For each one of the sources, the type is defined
    agent1.sources.avro-source1.type = netcat
    agent1.sources.avro-source1.bind=0.0.0.0
    agent1.sources.avro-source1.port=41414
    # The channel can be defined as follows.
    agent1.sources.avro-source1.channels = ch1
    
    # Each sink's type must be defined
    agent1.sinks.log-sink1.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.log-sink1.kafka.bootstrap.servers=0.0.0.0:9092
    agent1.sinks.log-sink1.kafka.topic=test
    #Specify the channel the sink should use
    agent1.sinks.log-sink1.channel = ch1
    
    # Each channel's type is defined.
    agent1.channels.ch1.type = memory
    
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    agent1.channels.ch1.capacity = 100
    
    

If anything in the configuration file above is unclear, you can look it up online; here is a brief walkthrough of the key settings.

  1. Configure the console as the data source
    agent1.sources.avro-source1.type = netcat
    agent1.sources.avro-source1.bind=0.0.0.0
    agent1.sources.avro-source1.port=41414
    
    

    With this configuration, Flume listens on port 41414 and collects whatever we type into a session connected to that port (the source happens to be named avro-source1, but the name is only a label; its type, netcat, is what matters).

  2. Configure Kafka as the destination for the source data

    agent1.sinks.log-sink1.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.log-sink1.kafka.bootstrap.servers=0.0.0.0:9092
    agent1.sinks.log-sink1.kafka.topic=test
    
    

    With the settings above, Flume writes the console input into the Kafka topic 'test'; the sink's kafka.bootstrap.servers points at the local broker on port 9092. The topic 'test' does not exist yet; we create it in the Kafka section below.

  • Start Flume
    Run the following command:

    bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1
    
    

    The trailing agent1 is the agent name we configured in flume.conf above; it must match the name used in the configuration file.
    Output after startup:

    Info: Sourcing environment configuration script /Users/chenxl/Documents/soft/apache-flume-1.7.0-bin/conf/flume-env.sh
    + exec /Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/Users/chenxl/Documents/soft/apache-flume-1.7.0-bin/conf:/Users/chenxl/Documents/soft/apache-flume-1.7.0-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n agent1
    2017-05-11 23:11:35,429 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
    2017-05-11 23:11:35,433 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:79)] Configuration provider started
    2017-05-11 23:11:35,436 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    2017-05-11 23:11:35,437 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/flume.conf
    2017-05-11 23:11:35,442 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:log-sink1
    2017-05-11 23:11:35,442 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for log-sink1: channel
    2017-05-11 23:11:35,442 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: log-sink1 Agent: agent1
    2017-05-11 23:11:35,442 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:log-sink1
    2017-05-11 23:11:35,443 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:log-sink1
    2017-05-11 23:11:35,443 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:log-sink1
    2017-05-11 23:11:35,443 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: agent1
    2017-05-11 23:11:35,444 (conf-file-poller-0) [INFO - org.apache.flume.conf.LogPrivacyUtil.<clinit>(LogPrivacyUtil.java:51)] Logging of configuration details is disabled. To see configuration details in the log run the agent with -Dorg.apache.flume.log.printconfig=true JVM argument. Please note that this is not recommended in production systems as it may leak private information to the logfile.
    2017-05-11 23:11:35,448 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:467)] Created channel ch1
    2017-05-11 23:11:35,454 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: log-sink1 using OTHER
    2017-05-11 23:11:35,455 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:ch1
    2017-05-11 23:11:35,455 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks log-sink1
    2017-05-11 23:11:35,457 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources avro-source1
    2017-05-11 23:11:35,457 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent1]
    2017-05-11 23:11:35,457 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
    2017-05-11 23:11:35,463 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel ch1 type memory
    2017-05-11 23:11:35,468 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel ch1
    2017-05-11 23:11:35,469 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source avro-source1, type netcat
    2017-05-11 23:11:35,480 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: log-sink1, type: org.apache.flume.sink.kafka.KafkaSink
    2017-05-11 23:11:35,480 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:62)] Sink type org.apache.flume.sink.kafka.KafkaSink is a custom type
    2017-05-11 23:11:35,487 (conf-file-poller-0) [INFO - org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:302)] Using the static topic test. This may be overridden by event headers
    2017-05-11 23:11:35,487 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:310)] Using batch size: 100
    2017-05-11 23:11:35,487 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:320)] useFlumeEventFormat set to: false
    2017-05-11 23:11:35,513 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel ch1 connected to [avro-source1, log-sink1]
    2017-05-11 23:11:35,526 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:avro-source1,state:IDLE} }} sinkRunners:{log-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@33583d6 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
    2017-05-11 23:11:35,527 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel ch1
    2017-05-11 23:11:35,594 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
    2017-05-11 23:11:35,594 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: ch1 started
    2017-05-11 23:11:35,595 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink log-sink1
    2017-05-11 23:11:35,595 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source avro-source1
    2017-05-11 23:11:35,597 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
    2017-05-11 23:11:35,619 (lifecycleSupervisor-1-1) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:165)] ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [0.0.0.0:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 0
    2017-05-11 23:11:35,625 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:41414]
    2017-05-11 23:11:35,626 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.source.NetcatSource.start(NetcatSource.java:190)] Source started
    2017-05-11 23:11:35,626 (Thread-1) [DEBUG - org.apache.flume.source.NetcatSource$AcceptHandler.run(NetcatSource.java:270)] Starting accept handler
    
    

Configuring Kafka – 2.11-0.10.2.1

At last, the final step; by this point 80% of the work is done.

  • Download Kafka

    Download link: http://www.gtlib.gatech.edu/pub/apache/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz

  • Extract Kafka

    tar -zxvf kafka_2.11-0.10.2.1.tgz
    
    
  • Configure Kafka

    Official quickstart: http://kafka.apache.org/quickstart
    Kafka's default configuration already meets our needs, so nothing has to be changed. If your other components are configured differently from mine, adjust according to the official quickstart.
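
    For reference, the defaults in config/server.properties that matter for this setup are roughly the following (an illustrative excerpt; the broker listens on port 9092 by default):

    # unique id of this broker in the cluster
    broker.id=0
    # the ZooKeeper we started earlier; Kafka registers itself here
    zookeeper.connect=localhost:2181
    # where Kafka stores its log segments
    log.dirs=/tmp/kafka-logs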

  • Start Kafka

    bin/kafka-server-start.sh config/server.properties
    
    
  • Create the topic
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
    
  • List topics
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    
    
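    If the topic was created successfully, the list command simply prints its name; at this point it should show just the one topic:

    test
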
  • Start a Kafka consumer
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
    

    Do not exit this shell after running the command above: this is where the messages we send will be printed.
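
    Before wiring Flume in, you can sanity-check the Kafka side on its own with the console producer that ships with Kafka; anything typed there should appear in the consumer window (note that 0.10.x still uses the --broker-list flag):

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test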

Testing

  • In another console, run the following command:

    telnet localhost 41414
    
    

    I got this command wrong the first time I typed it, writing the host as 'localhost:41414', so the connection to Flume failed. Fortunately I retried a few times and, comparing against the official docs, spotted the difference. Most likely I had been using Presto so much lately that this form came out by habit. So type commands carefully, and when something does not match expectations, track down the cause.

  • Type a string

    If you now type a string into that console, the Kafka consumer will print exactly the string you entered, as in the sketch below.
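
    For reference, a session looks roughly like this (an illustrative sketch: Flume's netcat source acknowledges each received line with 'OK', and the same line then appears in the consumer window):

    $ telnet localhost 41414
    Trying ::1...
    Connected to localhost.
    Escape character is '^]'.
    hello kafka
    OK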
