
Apache Kafka Installation

1 Verify the Java Installation

java -version

  1. Download the JDK

If Java is not already installed, download the latest JDK from the link below.

https://www.oracle.com/technetwork/java/javase/downloads/index.html


  2. Extract the archive

cd /usr/local/java

tar -zxvf


  3. Set the path in ~/.bashrc

export JAVA_HOME=/XXX
export PATH=$PATH:$JAVA_HOME/bin

source ~/.bashrc
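After sourcing ~/.bashrc, it is worth confirming that JAVA_HOME actually points at a usable JDK before continuing. A minimal sketch (check_java_home is our own helper name, not a standard tool):

```shell
# Hypothetical helper: succeed only if JAVA_HOME is set and contains
# an executable bin/java.
check_java_home() {
  [ -n "$JAVA_HOME" ] && [ -x "$JAVA_HOME/bin/java" ]
}

check_java_home && echo "JAVA_HOME OK: $JAVA_HOME" || echo "JAVA_HOME not set correctly"
```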


  4. Java alternatives (optional)

update-alternatives --install /usr/bin/java java XXX/bin/java 100


  5. Verify Java

java -version


  6. Offline RPM package installation

rpm -ivh *.rpm


2 ZooKeeper Installation

  1. Download

http://zookeeper.apache.org/releases.html#download


  2. Extract the tar file
cd /opt
tar -zxvf zookeeper-3.4.14.tar.gz
cd zookeeper-3.4.14


  3. Create the configuration file
cd /opt/zookeeper-3.4.14/conf
cp zoo_sample.cfg zoo.cfg
chmod 664 zoo.cfg

Configure zoo.cfg:

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/var/logs/zookeeper

server.11=10.130.124.171:2888:3888
server.12=10.130.124.172:2888:3888
server.13=10.130.124.173:2888:3888
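The three server.N lines above form a ZooKeeper ensemble. The cluster stays available as long as a strict majority of nodes is up, so this 3-node setup tolerates one failure. A quick sanity check (quorum is a helper name of our own):

```shell
# Minimum number of live nodes an n-node ZooKeeper ensemble needs:
# a strict majority, i.e. floor(n/2) + 1.
quorum() { echo $(( $1 / 2 + 1 )); }

quorum 3   # → 2: a 3-node ensemble survives one node failure
quorum 5   # → 3: a 5-node ensemble survives two
```

This is also why adding a fourth node does not improve fault tolerance: quorum for 4 is 3, which still only allows one failure.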


  4. Create the myid file

echo 11 > /var/logs/zookeeper/myid
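Each node's myid must match its server.N line in zoo.cfg, so the command above writes 11 only on 10.130.124.171; the other two hosts write 12 and 13 respectively. A sketch of that mapping (myid_for_ip is a made-up helper; the IPs are the example cluster above):

```shell
# Map each host in this example cluster to the id from its server.N line.
myid_for_ip() {
  case "$1" in
    10.130.124.171) echo 11 ;;
    10.130.124.172) echo 12 ;;
    10.130.124.173) echo 13 ;;
    *) echo "unknown host: $1" >&2; return 1 ;;
  esac
}

# On each node, write its own id into dataDir, e.g.:
# myid_for_ip 10.130.124.171 > /var/logs/zookeeper/myid
myid_for_ip 10.130.124.172   # → 12
```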


  5. Start ZooKeeper

/opt/zookeeper-3.4.14/bin/zkServer.sh start


  6. Stop

/opt/zookeeper-3.4.14/bin/zkServer.sh stop


3 Apache Kafka Installation

  1. Download

https://kafka.apache.org/downloads


  2. Extract the tar file

cd /opt
tar -zxvf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0

Configure server.properties:



# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://10.130.124.171:9092


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/home/admin/logs/kafka


# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=60


############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.130.124.171:2181,10.130.124.172:2181,10.130.124.173:2181
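broker.id and advertised.listeners are the two settings that must differ on every broker; the rest of server.properties can be identical across the cluster. A small sketch that prints the per-host lines (broker_overrides is our own helper; ids 1-3 are an assumption following the broker.id=1 example above):

```shell
# Print the two per-broker settings that must be unique on each host.
broker_overrides() {
  ip="$1"; id="$2"
  printf 'broker.id=%s\n' "$id"
  printf 'advertised.listeners=PLAINTEXT://%s:9092\n' "$ip"
}

broker_overrides 10.130.124.171 1
broker_overrides 10.130.124.172 2
broker_overrides 10.130.124.173 3
```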


  3. Start the servers

Start ZooKeeper first, then Kafka. Mind the paths: use absolute paths as shown below rather than relative ones.

/opt/zookeeper-3.4.14/bin/zkServer.sh start &
nohup /opt/kafka_2.12-2.3.0/bin/kafka-server-start.sh /opt/kafka_2.12-2.3.0/config/server.properties &

Once the server is up, you will see output like the following:

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
...
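Broker startup is asynchronous, so scripts that follow the nohup command above should wait for port 9092 before creating topics. A sketch using bash's /dev/tcp redirection (bash-specific; wait_for_port is our own helper):

```shell
# Poll until host:port accepts a TCP connection, or give up after $3 tries.
wait_for_port() {
  host="$1"; port="$2"; tries="${3:-30}"
  while [ "$tries" -gt 0 ]; do
    (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null && return 0
    tries=$((tries - 1))
    sleep 1
  done
  return 1
}

# e.g. wait_for_port 10.130.124.171 9092 && echo "broker is up"
```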


  4. Stop

/opt/kafka_2.12-2.3.0/bin/kafka-server-stop.sh


4 Basic Kafka Operations

  1. Start ZooKeeper

  2. Start Kafka

  3. Run the jps command

821 QuorumPeerMain
928 Kafka
931 Jps

Two daemons are now running: QuorumPeerMain is the ZooKeeper daemon, and Kafka is the Kafka broker daemon.
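The same check can be scripted, e.g. for a health-check cron job. A sketch that looks for both daemon names in jps-style output (daemons_running is our own helper; the sample string mirrors the output above, and in practice you would use procs="$(jps)"):

```shell
# Succeed only when both the ZooKeeper and Kafka daemons appear
# in the given jps output.
daemons_running() {
  echo "$1" | grep -q 'QuorumPeerMain' && echo "$1" | grep -q ' Kafka$'
}

sample='821 QuorumPeerMain
928 Kafka
931 Jps'
daemons_running "$sample" && echo "both daemons running"
```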




  • Start Kafka
    nohup /opt/kafka_2.12-2.3.0/bin/kafka-server-start.sh /opt/kafka_2.12-2.3.0/config/server.properties &
  • Create a topic
    /opt/kafka_2.12-2.3.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test0717
  • List topics
    /opt/kafka_2.12-2.3.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
  • Start a console producer
    /opt/kafka_2.12-2.3.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test0717
  • Start a console consumer
    Old command (deprecated ZooKeeper-based consumer): ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test0717 --from-beginning
    /opt/kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test0717 --from-beginning

  • Describe a topic
    /opt/kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test0717
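The --describe output can also be consumed by scripts, for example to verify that the replication factor requested at creation time took effect. A sketch (replication_factor is our own helper; the sample line approximates the describe header for this topic):

```shell
# Extract the ReplicationFactor field from a describe header line.
replication_factor() {
  echo "$1" | sed -n 's/.*ReplicationFactor:\([0-9]*\).*/\1/p'
}

# Sample header line; in practice feed it the first line of the
# kafka-topics.sh --describe output.
line='Topic:test0717	PartitionCount:1	ReplicationFactor:3	Configs:'
replication_factor "$line"   # → 3
```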


5 Logstash Installation and Configuration

  1. Install Logstash

    yum install logstash-6.2.3.rpm

    • Install X-Pack
    cd /usr/share/logstash/bin

    ./logstash-plugin install file:///root/x-pack-6.2.3.zip


  2. Start

    • initctl start logstash
    • initctl stop logstash

    • nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf &


  3. Configure Logstash

    vim /etc/logstash/conf.d/logstash.conf

    # Sample Logstash configuration for creating a simple
    # kafka -> Logstash -> Elasticsearch pipeline.

    input {
      # beats {
      #   port => 5044
      # }

      kafka {
        bootstrap_servers => "kafka_IP:9092,kafka_IP:9092,kafka_IP:9092"
        client_id => "devLog-kafka"
        auto_offset_reset => "latest"
        consumer_threads => 6
        topics => "test0717"
        codec => "json"
      }
    }

    output {
      elasticsearch {
        hosts => ["es_IP:9200","es_IP:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
        #user => "elastic"
        #password => "changeme"
      }
    }
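The index => "logstash-%{+YYYY.MM.dd}" setting makes Logstash write one Elasticsearch index per day, named from each event's timestamp. The same name can be computed in shell, e.g. for index-cleanup scripts (index_for is our own helper and relies on GNU date's -d option):

```shell
# The daily index name that the pattern logstash-%{+YYYY.MM.dd}
# produces for a given date (requires GNU date).
index_for() { echo "logstash-$(date -u -d "$1" +%Y.%m.%d)"; }

index_for 2019-07-17   # → logstash-2019.07.17
```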

    vim logstash.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # This defaults to the number of the host's CPU cores.
    #
    pipeline.workers: 2

    # log.level: info
    path.logs: /var/log/logstash

    # ------------ X-Pack Settings (not applicable for OSS build)--------------
    #
    # X-Pack Monitoring
    # https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
    xpack.monitoring.enabled: true
    #xpack.monitoring.elasticsearch.username: logstash_system
    #xpack.monitoring.elasticsearch.password: password
    xpack.monitoring.elasticsearch.hosts: ["http://10.130.128.108:9200","http://10.130.128.107:9200"]


6 Start Services at Boot

  1. zookeeper

    cd /etc/init.d/

    vim zookeeper

    #!/bin/bash

    #chkconfig:2345 20 90
    #description:zookeeper
    #processname:zookeeper
    case $1 in
    start)
        /opt/zookeeper-3.4.14/bin/zkServer.sh start
        ;;
    stop)
        /opt/zookeeper-3.4.14/bin/zkServer.sh stop
        ;;
    status)
        /opt/zookeeper-3.4.14/bin/zkServer.sh status
        ;;
    restart)
        /opt/zookeeper-3.4.14/bin/zkServer.sh restart
        ;;
    *)
        echo "require start|stop|status|restart"
        ;;
    esac
    • Permissions
      chmod 755 zookeeper

    • Test the service command
      service zookeeper status

    • Add it to the service list
      chkconfig --add zookeeper

    • Verify
      chkconfig --list

    • Enable at boot
      chkconfig zookeeper on
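Before registering an init script with chkconfig, it is worth syntax-checking it with sh -n, which parses a script without executing anything. Demonstrated here on a throwaway copy rather than the real /etc/init.d/zookeeper:

```shell
# sh -n parses a script without running it; exit status 0 means the
# syntax is valid.
good=$(mktemp)
cat > "$good" <<'EOF'
#!/bin/bash
case $1 in
start) echo starting ;;
*) echo "require start|stop|status|restart" ;;
esac
EOF

sh -n "$good" && echo "syntax OK"
# In practice: sh -n /etc/init.d/zookeeper && chkconfig --add zookeeper
rm -f "$good"
```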


  2. kafka

    vim kafka

    #!/bin/bash

    #chkconfig:2345 20 90
    #description:kafka
    #processname:kafka
    case $1 in
    start)
        /opt/kafka_2.12-2.3.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-2.3.0/config/server.properties
        ;;
    stop)
        /opt/kafka_2.12-2.3.0/bin/kafka-server-stop.sh
        ;;
    status)
        jps
        ;;
    restart)
        /opt/kafka_2.12-2.3.0/bin/kafka-server-stop.sh
        /opt/kafka_2.12-2.3.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-2.3.0/config/server.properties
        ;;
    *)
        echo "require start|stop|status|restart"
        ;;
    esac
    • The remaining steps (permissions, chkconfig --add, enable at boot) are the same as for zookeeper above.