
Kafka Deployment Guide

Posted on Jan 6, 2017

Our company's Kafka machines were being replaced, so we took the opportunity to upgrade Kafka to 0.10.0.1 and switch the monitoring tool to Kafka Manager.

Download Kafka

https://kafka.apache.org/downloads

$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
$ tar zxvf kafka_2.11-0.10.0.1.tgz

Install Java

# Download
$ wget --no-check-certificate --no-cookie --header "Cookie: oraclelicense=accept-securebackup-cookie;" http://download.oracle.com/otn-pub/java/jdk/8u111-b14/jdk-8u111-linux-x64.rpm

# Install
$ rpm -ivh jdk-8u111-linux-x64.rpm

# Set environment variables
$ vi ~/.bashrc
# Append the following at the end of the file:

export JAVA_HOME=/usr/lib/java/
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

# Run source ~/.bashrc to make the changes take effect.

# Verify the installation
$ java -version
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)

Install ZooKeeper

Modify the startup script

# Set the heap size to 1 GB

$ vi ./bin/zookeeper-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

Modify the configuration file (config/zookeeper.properties)

# The number of milliseconds of each tick
# The interval, in milliseconds, between heartbeats sent among ZooKeeper servers or between clients and servers; one heartbeat is sent every tickTime.
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
# How many tick intervals the ZooKeeper leader will tolerate while a follower completes its initial connection and synchronization. If no response has arrived after that many ticks, the connection is considered failed. With initLimit=10 and tickTime=2000 this is 10*2000 = 20 seconds.
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
# How many tick intervals may pass between a request and its acknowledgement when the leader and followers exchange messages. With syncLimit=5 and tickTime=2000 this is 5*2000 = 10 seconds.
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# Data directory
dataDir=/data/zookeeper_new
# the port at which the clients will connect
# Client connection port
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=1024
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# server.A=B:C:D: A is a number identifying the server; B is its IP address;
# C is the port used to exchange data with the cluster leader; D is the port used
# for leader election when the current leader goes down. In a pseudo-cluster all
# instances share the same B, so each ZooKeeper instance needs its own port numbers.
server.1=10.78.180.19:3888:4888
server.2=10.78.180.20:3888:4888
server.3=10.78.180.21:3888:4888
server.4=10.78.180.22:3888:4888
server.5=10.78.174.69:3888:4888

# How often, in hours, to purge old snapshots and transaction logs. The default is 0,
# which disables automatic purging.
autopurge.purgeInterval=6
# Used together with the setting above: the number of snapshots to retain (default 3).
autopurge.snapRetainCount=5

Create the myid file

In the dataDir directory, create a myid file. Its content is 1 on 10.78.180.19, 2 on 10.78.180.20, 3 on 10.78.180.21, and so on for any additional machines.

$ echo '1' > /data/zookeeper_new/myid
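Doing this by hand on every machine is error-prone. The loop below is a minimal sketch that writes each node's id in one pass, assuming passwordless ssh from a control host, hosts listed in the same order as server.1 through server.5 above, and dataDir=/data/zookeeper_new on every node.

# Assign myid 1..5 to the ensemble members in the same order as the server.N entries.
id=1
for host in 10.78.180.19 10.78.180.20 10.78.180.21 10.78.180.22 10.78.174.69; do
    ssh "$host" "echo $id > /data/zookeeper_new/myid"
    id=$((id + 1))
done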

Start ZooKeeper

nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
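Before installing Kafka it is worth confirming that each node is serving requests and that the ensemble has elected a leader. A quick check using ZooKeeper's four-letter commands (assuming nc is installed; substitute each node's address in turn):

# Expect "imok" back if the server is running, and "Mode: leader" or
# "Mode: follower" from stat once the ensemble has formed.
echo ruok | nc 10.78.180.19 2181
echo stat | nc 10.78.180.19 2181 | grep Mode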

Install Kafka

Modify the startup script

Change the heap size and add a JMX monitoring endpoint:

$ vi ./bin/kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
    export JMX_PORT=8999
    export "KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote=true
     -Dcom.sun.management.jmxremote.authenticate=false
     -Dcom.sun.management.jmxremote.ssl=false
     -Djava.rmi.server.hostname=10.78.180.19
     -Djava.net.preferIPv4Stack=true"
fi

Note: -Djava.rmi.server.hostname=10.78.180.19 must be set; otherwise kafka-manager will try to reach this broker at the address returned by hostname -i on that machine.
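To see which address would be used without that flag, you can check on the broker itself:

# May return a loopback or otherwise unreachable address, which is why the
# explicit hostname setting above is needed.
hostname -i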

Modify the configuration file (config/server.properties)

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
# The unique id of this broker in the cluster; must be a positive integer. If the server's IP address changes but broker.id stays the same, consumers are unaffected.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Comma-separated list of listeners (protocol such as PLAINTEXT or SSL, plus IP and port).
# If the hostname is 0.0.0.0 the broker binds to all interfaces; if it is empty it binds to
# the default interface. If not set, it defaults to java.net.InetAddress.getCanonicalHostName().
listeners=PLAINTEXT://10.78.180.19:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
# Number of threads the server uses to handle network requests; you usually do not need to change this.
num.network.threads=3

# The number of threads doing disk I/O
# Number of I/O threads the server uses to process requests; this should be at least the number of disks.
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# log.dirs is where Kafka stores its log data; the default is null. If it is not set, data is
# stored in the directory given by log.dir, which defaults to /tmp/kafka-logs. The user running
# the Kafka server needs read and write permission on these directories.
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# Default number of partitions per topic; overridden by the value given when a topic is created.
num.partitions=20

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
# Message retention time
log.retention.hours=1

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
# Size-based retention per partition; older segments are deleted once this is exceeded.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# A partition's log is split across many segment files in its directory; this is the maximum
# size of each segment. When a segment reaches this size, a new one is created. Can be
# overridden per topic.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# Whether to enable log compaction. If false, log segments are deleted once the retention
# time or size limit is reached; if true, they are compacted when the retention limit is reached.
log.cleaner.enable=false

# Whether topics can actually be deleted. If false, a delete request only marks the topic for
# deletion; after changing this to true and restarting the cluster, the marked topics (including
# their directories and data under log.dirs) are removed. With it set to true, topic deletion
# works directly.
delete.topic.enable=true

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# zookeeper.connect is the address of the ZooKeeper cluster; multiple hosts are separated by
# commas: hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=10.78.180.19:2181,10.78.180.20:2181,10.78.180.21:2181,10.78.180.22:2181,10.78.174.69:2181

# Timeout in ms for connecting to zookeeper
# Maximum time the client will wait when establishing a connection to ZooKeeper
zookeeper.connection.timeout.ms=600000
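The file above is for the first broker (10.78.180.19). When copying it to the other machines, at least broker.id and listeners must be changed per broker. A minimal sketch for the second broker, assuming the same paths on 10.78.180.20:

# On 10.78.180.20: give the broker a unique id and bind to its own address.
sed -i 's/^broker.id=1$/broker.id=2/' config/server.properties
sed -i 's|^listeners=PLAINTEXT://10.78.180.19:9092$|listeners=PLAINTEXT://10.78.180.20:9092|' config/server.properties
# The -Djava.rmi.server.hostname value in bin/kafka-server-start.sh also needs to
# match each broker's own IP.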

Start Kafka

nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
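A quick way to confirm the brokers came up and can talk to ZooKeeper is to create and inspect a throwaway topic from any node (the topic name test is just an example; replication factor 3 assumes at least three brokers are running):

# Create a test topic and check that its partitions get leaders assigned across the brokers.
bin/kafka-topics.sh --create --zookeeper 10.78.180.19:2181 --replication-factor 3 --partitions 3 --topic test
bin/kafka-topics.sh --describe --zookeeper 10.78.180.19:2181 --topic test
# Since delete.topic.enable=true, the test topic can be removed afterwards.
bin/kafka-topics.sh --delete --zookeeper 10.78.180.19:2181 --topic test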

Install Kafka Manager

Download and Build

Building requires JDK 8 to be installed, and note that it must be the JDK, not just a JRE (a lesson learned the hard way).

$ git clone https://github.com/yahoo/kafka-manager.git

# Building requires access to hosts that may be blocked in China. I went through a SOCKS5 proxy; an ssh -D tunnel also works.

$ cd kafka-manager
$ ./sbt clean dist -DsocksProxyHost=10.211.55.2 -DsocksProxyPort=1080

# Then wait patiently; the build takes roughly an hour.
# This is the built package, which can then be run anywhere.

$ ls kafka-manager/target/universal/
kafka-manager-1.3.2.1.zip

Application Configuration

Kafka Manager depends on ZooKeeper.

$ unzip kafka-manager-1.3.2.1.zip
$ cd kafka-manager-1.3.2.1
$ vi conf/application.conf

# ZooKeeper cluster
kafka-manager.zkhosts="10.78.180.19:2181,10.78.180.20:2181,10.78.180.21:2181,10.78.180.22:2181,10.78.174.69:2181"
# HTTP listen port
http.port=8000

Logging Configuration

Do not write logs to a file; send them to standard output instead.

$ vi conf/logger.xml

<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%date - [%level] - from %logger in %thread %n%message%n%xException%n</pattern>
    </encoder>
  </appender>

  <logger name="play" level="INFO" />
  <logger name="application" level="DEBUG" />

  <!-- Off these ones as they are annoying, and anyway we manage configuration ourself -->
  <logger name="com.avaje.ebean.config.PropertyMapLoader" level="OFF" />
  <logger name="com.avaje.ebeaninternal.server.core.XmlConfigLoader" level="OFF" />
  <logger name="com.avaje.ebeaninternal.server.lib.BackgroundThread" level="OFF" />
  <logger name="com.gargoylesoftware.htmlunit.javascript" level="OFF" />
  <logger name="org.apache.zookeeper" level="INFO"/>

  <logger name="akka" level="INFO" />
  <logger name="kafka" level="INFO" />

  <root level="INFO">
    <appender-ref ref="STDOUT" />
  </root>

</configuration>

Start Kafka Manager

nohup ./bin/kafka-manager -Dconfig.file=./conf/application.conf -Dlogger.resource=./conf/logger.xml >./kafka-manager.log 2>&1 &
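Once started, the UI should respond on the port configured in application.conf (8000 here). A quick check from the machine running it:

# Should print an HTTP status line once the application has finished starting.
curl -sI http://127.0.0.1:8000 | head -n 1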

Web Configuration

Open the Kafka Manager address in a browser, create a new Cluster, fill in the Kafka cluster's ZooKeeper addresses and the Kafka version (0.10.0.1 is not in the list, so 0.9.0.1 works fine), and turn on the Enable JMX Polling option.
