Java知识分享网 - 轻松学习从此开始!    

Java知识分享网

Java1234官方群25:java1234官方群17
Java1234官方群25:838462530
        
SpringBoot+SpringSecurity+Vue+ElementPlus权限系统实战课程 震撼发布        

最新Java全栈就业实战课程(免费)

springcloud分布式电商秒杀实战课程

IDEA永久激活

66套java实战课程无套路领取

锋哥开始收Java学员啦!

Python学习路线图

锋哥开始收Java学员啦!
当前位置: 主页 > Java文档 > 大数据云计算 >

kafka使用简介 PDF 下载


分享到:
时间:2020-06-19 17:01来源:http://www.java1234.com 作者:小锋  侵权举报
kafka使用简介 PDF 下载
失效链接处理
kafka使用简介 PDF 下载

本站整理下载:
 
相关截图:
 
主要内容:

1.Kafka简介
(1)概念
 
(2)Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker。
Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition:物理上的概念,每个Topic包含一个或多个Partition.
Producer:负责发布消息到Kafka broker
Consumer:消息消费者,向Kafka broker读取消息的客户端。
Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
 
(3)
 
2.环境准备
(1)环境信息
主机名 操作系统版本 IP地址 安装软件
Broker01 CentOS 7.4 172.16.96.238 JDK1.8、ZK、kafka_2.11-2.0.0.tgz    
Broker02 CentOS 7.4 172.16.96.237 JDK1.8、ZK、kafka_2.11-2.0.0.tgz
Broker03 CentOS 7.4 172.16.96.236 JDK1.8、ZK、kafka_2.11-2.0.0.tgz
 
 
 
 
 
 
 
(2)安装JAVA
(3)安装Zookeeper
1)下载zk:wget http://archive.apache.org/dist/zookeeper/stable/zookeeper-3.4.12.tar.gz
2)解压: tar -zxvf zookeeper-3.4.12.tar.gz
3)配置:cp zoo_sample.cfg zoo.cfg,修改zoo.cfg
#配置数据和日志目录:
dataDir=/bigdata/zookeeper/data
dataLogDir=/bigdata/zookeeper/dataLog
# the port at which the clients will connect
clientPort=2181
 
#配置集群主机和端口
server.1=Broker01:2888:3888
server.2=Broker02:2888:3888
server.3=Broker03:2888:3888
 
4)在data目录下面创建myid,依次填写1,2,3
5)拷贝zk目录到其他机器:Scp -r zookeeperuser@host:/bigdata/zookeeper
注意:需要修改myid不同编号。
6)启动服务:
./zkServer.sh start
./zkServer.sh stop
./zkServer.sh restart
./zkServer.sh status
 
3.Kafka安装
(1)下载kafka:wget http://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz
> tar -xzvf kafka_2.11-2.0.0.tgz
> cd kafka_2.11-2.0.0
(2)配置文件:vim config/server.properties
#配置broker ID,必须每个kafka不同
broker.id=1
#配置zk地址
zookeeper.connect=172.16.96.236:2181,172.16.96.237:2181,172.16.96.238:2181
#log数据存储目录,需要提前创建好
log.dirs=/bigdata/kafkaData
#delete topics,默认为false,表示标记删除
#delete.topic.enable=true
(3)启动集群:
# JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties &
(说明:
1)启动自带的zk,独立zk可以不启动:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
2)启动kafka服务:bin/kafka-server-start.sh config/server.properties &)
(4)创建主题:
$bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
副本3方式:
$bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
(5)查看主题:bin/kafka-topics.sh --list --zookeeper localhost:2181
(6)发送消息:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(7)启动消费端:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
(8)查看topic状态:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
(9)停止服务:bin/kafka-server-stop.sh
(10)删除topic
删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过zookeeper-client 删除掉"/brokers/topics/"目录下相关topic节点。
注意: 如果你要删除一个topic并且重建,那么必须重新启动kafka,否则新建的topic在zookeeper的/brokers/topics/test-topic/目录下没有partitions这个目录,也就是没有分区信息。
4.Kafka配置
 
5.JAVA开发
(1)消息生产者
 /**
     * 1)bootstrap.servers --设置生产者需要连接的kafka地址 2)acks --回令类型 3)retries --重试次数
     * 4)batch.size --批量提交大小 5)linger.ms --提交延迟等待时间(等待时间内可以追加提交) 6)buffer.memory
     * --缓存大小 7)key.serializer|value.serializer --序列化方法 需要注意的有两点:
     * 1、acks回令。如果必须等待回令,那么设置acks为all;否则,设置为-1;等待回令会有性能损耗。
     * 2、生产者在发送消息的过程中,会自己默认批量提交。所以,如果单条指令的发送请求,记得发送完后flush才能生效。
     *
     * @param events
     */
    public void producer_test1(int events) {
        Random rnd = new Random();
 
        //    /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --sync --topic kafkatopic
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.96.236:9092,172.16.96.237:9092,172.16.96.238:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 配置partitionner选择策略,可选配置
        props.put("partitioner.class", "com.Broker.iemp.bigdata.service.SimplePartitioner2");
 
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String ip = "192.178.0." + rnd.nextInt(255);
                String msg = runtime + ",www.example.com," + ip;
                ProducerRecord<String, String> data = new ProducerRecord<>("kafakatopic", ip, msg);
                Future<RecordMetadata> send = producer.send(data, (RecordMetadata metadata, Exception e) -> {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                });
            }
        }
    }
 
    /**
     * 普通发送。
     */
    public void producer_test2() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.96.236:9092,172.16.96.237:9092,172.16.96.238:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("kafakatopic", Integer.toString(i), Integer.toString(i)));
            }
        }
}
 
/**
 *
 * @author Broker050286
 */
public class SimplePartitioner2 implements Partitioner {
     @Override
     public void configure(Map<String, ?> map) {
     }
 
     @Override
     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
         int partition = 0;
         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
         int numPartitions = partitions.size();
         String stringKey = (String) key;
         int offset = stringKey.lastIndexOf('.');
         if (offset > 0) {
             partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
         }
 
         return partition;
     }
 
     @Override
     public void close() {
     }
 }
(2)消息消费端
public static void consume() {
        Properties props = new Properties();
 
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.96.236:9092,172.16.96.237:9092,172.16.96.238:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("kafakatopic"));
 
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

 

------分隔线----------------------------

锋哥公众号


锋哥微信


关注公众号
【Java资料站】
回复 666
获取 
66套java
从菜鸡到大神
项目实战课程

锋哥推荐