irpas技术客

大数据处理技术-头歌平台-答案_从化北

未知 599

文章目录 写在最前HBase的安装与简单操作第一关:单机版安装第三关 HBase 伪分布式环境搭建第一关:伪分布式环境搭建 ZooKeeper入门-初体验第一关 ZooKeeper初体验第2关:ZooKeeper配置第3关:Client连接及状态 ZooKeeper之分布式环境搭建第1关:仲裁模式与伪分布式环境搭建第2关:伪分布式体验及分布式安装配置 Flume入门第1关:Flume 简介第2关:采集目录下所有新文件到Hdfs Flume进阶第1关:拦截器的使用第2关:自定义拦截器 分布式 Kafka 安装第1关:分布式 Kafka 安装 kafka-入门篇第1关:kafka - 初体验第2关:生产者 (Producer ) - 简单模式第3关:消费者( Consumer)- 自动提交偏移量第4关消费者( Consumer )- 手动提交偏移量 Spark Standalone 模式的安装和部署第1关: Standalone 分布式集群搭建

写在最前

这里是大数据处理技术的实训作业 ,学校使用的是“头歌”平台。(我已经不想吐槽了) 开始的几章很简单,所以没有写 其中有几章题目,仅仅需要ctrl+c ctrl+v即可,只是操作步骤麻烦一下,所以也没有写。

HBase的安装与简单操作 第一关:单机版安装 mkdir /app cd /opt tar -zxvf hbase-2.1.1-bin.tar.gz -C /app vim /app/hbase-2.1.1/conf/hbase-env.sh # 在末尾添加 export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_111 vim /app/hbase-2.1.1/conf/hbase-site.xml

替换原有的configuration标签

<configuration> <property> <name>hbase.rootdir</name> <value>file:///root/data/hbase/data</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/root/data/hbase/zookeeper</value> </property> <property> <name>hbase.unsafe.stream.capability.enforce</name> <value>false</value> </property> </configuration> vim /etc/profile # 在末尾追加如下内容 #SET HBASE_enviroment HBASE_HOME=/app/hbase-2.1.1 export PATH=$PATH:$HBASE_HOME/bin source /etc/profile 第三关 put 'mytable','row1','data:1','zhangsan' put 'mytable','row2','data:2','zhangsanfeng' put 'mytable','row3','data:3','zhangwuji' HBase 伪分布式环境搭建 第一关:伪分布式环境搭建

先按照 《HBase的安装与简单第一关配置好单机》,傻子平台。

vim /app/hbase-2.1.1/conf/hbase-site.xml <!-- 替换configuration整体 --> <configuration> <property> <name>hbase.rootdir</name> <value>hdfs://localhost:9000/hbase</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/root/data/hbase/zookeeper</value> </property> <property> <name>hbase.unsafe.stream.capability.enforce</name> <value>true</value> </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> </configuration> # 启动hadoop和hbase start-all.sh start-hbase.sh # 查看进程 jps # 在hdfs中验证 hadoop fs -ls /hbase ZooKeeper入门-初体验 第一关 ZooKeeper初体验 tar -zxvf zookeepre-3.4.12.tar.gz /opt/zookeeper-3.4.12 cd /opt/zookeeper-3.4.12/conf mv zoo_sample.cfg zoo.cfg zkServer.sh start # zkServer.sh stop 第2关:ZooKeeper配置 vim /opt/zookeeper-3.4.12/conf/zoo.cfg 把 “# maxClientCnxns=60 ” 改为 maxClientCnxns=100 第3关:Client连接及状态 zkServer.sh stop vim /opt/zookeeper-3.4.12/conf/zoo.cfg <!-- 修改为2182 --> clientPort=2182 <!-- 添加preAllocSize=300 --> preAllocSize=300 vim /opt/zookeeper-3.4.12/bin/zkEnv.sh <!-- 修改第56行为 --> ZOO_LOG_DIR="/opt/zookeeper-3.4.12" zkServer.sh start zkCli.sh -server 127.0.0.1:2182 ZooKeeper之分布式环境搭建 第1关:仲裁模式与伪分布式环境搭建 vim /opt/zookeeper-3.4.12/conf/zoo.cfg

修改默认。 修改zoo.cfg 这节有个智障操作,这里不吐槽了。按着步骤走吧。

<!-- zookeeper-3.4.12的zoo.cfg --> <!-- 修改 --> clientPort=2181 dataDir=/opt/zookeeper-3.4.12/tmp/data <!-- 末尾追加 --> server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2889:3889 server.3=127.0.0.1:2890:3890

第一个节点添加myid文件

mkdir -p /opt/zookeeper-3.4.12/tmp/data/ echo 1 > /opt/zookeeper-3.4.12/tmp/data/myid cat /opt/zookeeper-3.4.12/tmp/data/myid

复制三个新节点出来

# 智障系统。您搁着我斗志斗勇呢呀 cp -r /opt/zookeeper-3.4.12/ /opt/zookeeper-3.4.12-01 cp -r /opt/zookeeper-3.4.12/ /opt/zookeeper-3.4.12-02 cp -r /opt/zookeeper-3.4.12/ /opt/zookeeper-3.4.12-03

第一个节点 修改zoo.cfg

vim /opt/zookeeper-3.4.12-01/conf/zoo.cfg <!-- zookeeper-3.4.12-01的zoo.cfg --> <!-- 仅修改这个就行 --> dataDir=/opt/zookeeper-3.4.12-01/tmp/data

第二个节点 修改zoo.cfg

vim /opt/zookeeper-3.4.12-02/conf/zoo.cfg <!-- zookeeper-3.4.12-02的zoo.cfg --> <!-- 修改 --> clientPort=2182 dataDir=/opt/zookeeper-3.4.12-02/tmp/data

第二个节点添加myid文件

echo 2 > /opt/zookeeper-3.4.12-02/tmp/data/myid cat /opt/zookeeper-3.4.12-02/tmp/data/myid

第三个节点 修改zoo.cfg

vim /opt/zookeeper-3.4.12-03/conf/zoo.cfg <!-- zookeeper-3.4.12-03的zoo.cfg --> <!-- 修改 --> clientPort=2183 dataDir=/opt/zookeeper-3.4.12-03/tmp/data

第三个节点添加myid文件

echo 3 > /opt/zookeeper-3.4.12-03/tmp/data/myid cat /opt/zookeeper-3.4.12-03/tmp/data/myid # 分别三个启动节点 /opt/zookeeper-3.4.12-01/bin/zkServer.sh start /opt/zookeeper-3.4.12-02/bin/zkServer.sh start /opt/zookeeper-3.4.12-03/bin/zkServer.sh start 第2关:伪分布式体验及分布式安装配置

智障平台,我重置了一次命令行,重新做了一遍才行。

zkCli.sh -server 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 create /quorum_test "quorum_test" quit Flume入门 第1关:Flume 简介

第一题 Source Channel Sink 第二题 名称 类型 属性集 第三题 可靠性 可恢复性

第2关:采集目录下所有新文件到Hdfs start-dfs.sh hadoop dfs -mkdir /flume

我不得不吐槽一下这个平台。 你说你资源不够你做什么平台嘛。 也是,我理解,随时启动一个hadoop确实很耗费资源,但你不能在启动脚本中再启动一次hadoop吗? 你在这跟我捉迷藏呢?真就担心我找到你哈?

a1.sources = source1 a1.sinks = sink1 a1.channels = channel1 # 配置source组件 a1.sources.source1.type = spooldir a1.sources.source1.spoolDir = /opt/flume/data ##定义文件上传完后的后缀,默认是.COMPLETED a1.sources.source1.fileSuffix=.FINISHED ##默认是2048,如果文件行数据量超过2048字节(1k),会被截断,导致数据丢失 a1.sources.source1.deserializer.maxLineLength=5120 # 配置sink组件 a1.sinks.sink1.type = hdfs a1.sinks.sink1.hdfs.path =hdfs://localhost:9000/flume #上传文件的前缀 a1.sinks.sink1.hdfs.filePrefix = flume #上传文件的后缀 a1.sinks.sink1.hdfs.fileSuffix = .log #积攒多少个Event才flush到HDFS一次 a1.sinks.sink1.hdfs.batchSize= 100 a1.sinks.sink1.hdfs.fileType = DataStream a1.sinks.sink1.hdfs.writeFormat =Text ## roll:滚动切换:控制写文件的切换规则 ## 按文件体积(字节)来切 a1.sinks.sink1.hdfs.rollSize = 512000 ## 按event条数切 a1.sinks.sink1.hdfs.rollCount = 1000000 ## 按时间间隔切换文件,多久生成一个新的文件 a1.sinks.sink1.hdfs.rollInterval = 4 ## 控制生成目录的规则 a1.sinks.sink1.hdfs.round = true ##多少时间单位创建一个新的文件夹 a1.sinks.sink1.hdfs.roundValue = 10 a1.sinks.sink1.hdfs.roundUnit = minute #是否使用本地时间戳 a1.sinks.sink1.hdfs.useLocalTimeStamp = true # channel组件配置 a1.channels.channel1.type = memory ## event条数 a1.channels.channel1.capacity = 500000 ##flume事务控制所需要的缓存容量600条event a1.channels.channel1.transactionCapacity = 600 # 绑定source、channel和sink之间的连接 a1.sources.source1.channels = channel1 a1.sinks.sink1.channel = channel1 Flume进阶 第1关:拦截器的使用 start-dfs.sh hadoop dfs -mkdir /flume # Define source, channel, sink #agent名称为a1 # Define source #source类型配置为avro,监听8888端口,后台会自动发送数据到该端口 #拦截后台发送过来的数据,将y.开头的保留下来 # Define channel #channel配置为memery # Define sink #落地到 hdfs://localhost:9000/flume目录下 #根据时间落地,3s #数据格式DataStream a1.sources = source1 a1.sinks = sink1 a1.channels = channel1 # 配置source组件 a1.sources.source1.type = avro a1.sources.source1.bind = 127.0.0.1 a1.sources.source1.port = 8888 ##定义文件上传完后的后缀,默认是.COMPLETED a1.sources.source1.fileSuffix=.FINISHED ##默认是2048,如果文件行数据量超过2048字节(1k),会被截断,导致数据丢失 a1.sources.source1.deserializer.maxLineLength=5120 #正则过滤拦截器 a1.sources.source1.interceptors = i1 a1.sources.source1.interceptors.i1.type = regex_filter a1.sources.source1.interceptors.i1.regex = ^y.* #如果excludeEvents设为false,表示过滤掉不是以A开头的events。 #如果excludeEvents设为true,则表示过滤掉以A开头的events。 a1.sources.source1.interceptors.i1.excludeEvents = false # 配置sink组件 a1.sinks.sink1.type = hdfs a1.sinks.sink1.hdfs.path =hdfs://localhost:9000/flume #上传文件的前缀 a1.sinks.sink1.hdfs.filePrefix = FlumeData. #上传文件的后缀 a1.sinks.sink1.hdfs.fileSuffix = .log #积攒多少个Event才flush到HDFS一次 a1.sinks.sink1.hdfs.batchSize= 100 a1.sinks.sink1.hdfs.fileType = DataStream a1.sinks.sink1.hdfs.writeFormat =Text ## roll:滚动切换:控制写文件的切换规则 ## 按文件体积(字节)来切 a1.sinks.sink1.hdfs.rollSize = 512000 ## 按event条数切 a1.sinks.sink1.hdfs.rollCount = 1000000 ## 按时间间隔切换文件,多久生成一个新的文件 a1.sinks.sink1.hdfs.rollInterval = 4 ## 控制生成目录的规则 a1.sinks.sink1.hdfs.round = true ##多少时间单位创建一个新的文件夹 a1.sinks.sink1.hdfs.roundValue = 10 a1.sinks.sink1.hdfs.roundUnit = minute #是否使用本地时间戳 a1.sinks.sink1.hdfs.useLocalTimeStamp = true # channel组件配置 a1.channels.channel1.type = memory ## event条数 a1.channels.channel1.capacity = 500000 ##flume事务控制所需要的缓存容量600条event a1.channels.channel1.transactionCapacity = 600 # 绑定source、channel和sink之间的连接 a1.sources.source1.channels = channel1 a1.sinks.sink1.channel = channel1 第2关:自定义拦截器

参考链接 conf 配置文件

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://·mons.lang.StringUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer; import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; public class RegexExtractorExtInterceptor implements Interceptor { static final String REGEX = "regex"; static final String SERIALIZERS = "serializers"; // 增加代码开始 static final String EXTRACTOR_HEADER = "extractorHeader"; static final boolean DEFAULT_EXTRACTOR_HEADER = false; static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey"; // 增加代码结束 private static final Logger logger = LoggerFactory .getLogger(RegexExtractorExtInterceptor.class); private final Pattern regex; private final List<NameAndSerializer> serializers; // 增加代码开始 private final boolean extractorHeader; private final String extractorHeaderKey; // 增加代码结束 private RegexExtractorExtInterceptor(Pattern regex, List<NameAndSerializer> serializers, boolean extractorHeader, String extractorHeaderKey) { this.regex = regex; this.serializers = serializers; this.extractorHeader = extractorHeader; this.extractorHeaderKey = extractorHeaderKey; } @Override public void initialize() { // NO-OP... } @Override public void close() { // NO-OP... } @Override public Event intercept(Event event) { String tmpStr; if(extractorHeader) { tmpStr = event.getHeaders().get(extractorHeaderKey); } else { tmpStr=new String(event.getBody(), Charsets.UTF_8); } Matcher matcher = regex.matcher(tmpStr); Map<String, String> headers = event.getHeaders(); if (matcher.find()) { for (int group = 0, count = matcher.groupCount(); group < count; group++) { int groupIndex = group + 1; if (groupIndex > serializers.size()) { if (logger.isDebugEnabled()) { logger.debug( "Skipping group {} to {} due to missing serializer", group, count); } break; } NameAndSerializer serializer = serializers.get(group); if (logger.isDebugEnabled()) { logger.debug("Serializing {} using {}", serializer.headerName, serializer.serializer); } headers.put(serializer.headerName, serializer.serializer .serialize(matcher.group(groupIndex))); } } return event; } @Override public List<Event> intercept(List<Event> events) { List<Event> intercepted = Lists.newArrayListWithCapacity(events.size()); for (Event event : events) { Event interceptedEvent = intercept(event); if (interceptedEvent != null) { intercepted.add(interceptedEvent); } } return intercepted; } public static class Builder implements Interceptor.Builder { private Pattern regex; private List<NameAndSerializer> serializerList; // 增加代码开始 private boolean extractorHeader; private String extractorHeaderKey; // 增加代码结束 private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer(); @Override public void configure(Context context) { String regexString = context.getString(REGEX); Preconditions.checkArgument(!StringUtils.isEmpty(regexString), "Must supply a valid regex string"); regex = Pattern.compile(regexString); regex.pattern(); regex.matcher("").groupCount(); configureSerializers(context); // 增加代码开始 extractorHeader = context.getBoolean(EXTRACTOR_HEADER, DEFAULT_EXTRACTOR_HEADER); if (extractorHeader) { extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY); Preconditions.checkArgument( !StringUtils.isEmpty(extractorHeaderKey), "必须指定要抽取内容的header key"); } // 增加代码结束 } private void configureSerializers(Context context) { String serializerListStr = context.getString(SERIALIZERS); Preconditions.checkArgument( !StringUtils.isEmpty(serializerListStr), "Must supply at least one name and serializer"); String[] serializerNames = serializerListStr.split("\\s+"); Context serializerContexts = new Context( context.getSubProperties(SERIALIZERS + ".")); serializerList = Lists .newArrayListWithCapacity(serializerNames.length); for (String serializerName : serializerNames) { Context serializerContext = new Context( serializerContexts.getSubProperties(serializerName + ".")); String type = serializerContext.getString("type", "DEFAULT"); String name = serializerContext.getString("name"); Preconditions.checkArgument(!StringUtils.isEmpty(name), "Supplied name cannot be empty."); if ("DEFAULT".equals(type)) { serializerList.add(new NameAndSerializer(name, defaultSerializer)); } else { serializerList.add(new NameAndSerializer(name, getCustomSerializer(type, serializerContext))); } } } private RegexExtractorInterceptorSerializer getCustomSerializer( String clazzName, Context context) { try { RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class .forName(clazzName).newInstance(); serializer.configure(context); return serializer; } catch (Exception e) { logger.error("Could not instantiate event serializer.", e); Throwables.propagate(e); } return defaultSerializer; } @Override public Interceptor build() { Preconditions.checkArgument(regex != null, "Regex pattern was misconfigured"); Preconditions.checkArgument(serializerList.size() > 0, "Must supply a valid group match id list"); return new RegexExtractorExtInterceptor(regex, serializerList, extractorHeader, extractorHeaderKey); } } static class NameAndSerializer { private final String headerName; private final RegexExtractorInterceptorSerializer serializer; public NameAndSerializer(String headerName, RegexExtractorInterceptorSerializer serializer) { this.headerName = headerName; this.serializer = serializer; } } } 分布式 Kafka 安装 第1关:分布式 Kafka 安装

这关平台左侧给的示例中。有一条使用了中文的逗号。要自己改成英文的。这点注意??

这里原本评判脚本有问题。 向工程师提交后,对方修改。 而后按照顺序走即可

kafka-入门篇 第1关:kafka - 初体验 #1.创建一个副本数量为1、分区数量为3、名为 demo 的 Topic /opt/kafka_2.11-1.1.0/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic demo #2.查看所有Topic /opt/kafka_2.11-1.1.0/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181 #3.查看名为demo的Topic的详情信息 /opt/kafka_2.11-1.1.0/bin/kafka-topics.sh --topic demo --describe --zookeeper 127.0.0.1:2181 第2关:生产者 (Producer ) - 简单模式

有时候会报scala的错误。**系统。 多试几次

package net.educoder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * kafka producer 简单模式 */ public class App { public static void main(String[] args) { /** * 1.创建配置文件对象,一般采用 props */ /**----------------begin-----------------------*/ Properties props = new Properties(); /**-----------------end-------------------------*/ /** * 2.设置kafka的一些参数 * bootstrap.servers --> kafka的连接地址 kafka-01:9092,kafka-02:9092,kafka-03:9092 * key、value的序列化类 -->org.apache.kafka.common.serialization.StringSerializer * acks:1,-1,0 */ /**-----------------begin-----------------------*/ props.put("bootstrap.servers", "127.0.0.1:9092"); // props.put("bootstrap.servers", "kafka-01:9092,kafka-02:9092,kafka-03:9092"); // props.put("bootstrap.servers","127.0.0.1:2181") props.put("acks", "1"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("retries", 0); // 一批消息的处理大小 props.put("batch.size", 16384); // 请求的延迟 props.put("linger.ms", 1); // 发送缓冲区内存大小 props.put("buffer.size", 33554432); // // key 序列化 // props.put("key.serializer", "org.apache.kafka.common.serilization.StringSerilizer"); // // value 序列化 // props.put("value.serializer", "org.apache.kafka.common.serilization.StringSerilizer"); // KafkaProducer 有多个构造方法,可以用Map来进行社会参数,也可在构造方法中进行设置序列化 /**-----------------end-------------------------*/ /** * 3.构建kafkaProducer对象 */ /**-----------------begin-----------------------*/ // Producer<String, String> producer = new KafkaProducer<>(props); KafkaProducer producer = new KafkaProducer<String, String>(props); /**-----------------end-------------------------*/ for (int i = 0; i < 2; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("demo", ""+i); /** * 4.发送消息 */ /**-----------------begin-----------------------*/ producer.send(record); /**-----------------end-------------------------*/ } producer.close(); } } 第3关:消费者( Consumer)- 自动提交偏移量

有时候会报scala的错误。**系统。 多试几次

然后也要吐槽一下示例的代码, 少个" 是什么鬼。 而且也没有缺少提示, 真应该抓他们过来,让他们一个个给我找!

package net.educoder; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class App { public static void main(String[] args) { Properties props = new Properties(); /**--------------begin----------------*/ //设置kafka集群的地址 props.put("bootstrap.servers", "127.0.0.1:9092"); //设置消费者组,组名字自定义,组名字相同的消费者在一个组 props.put("group.id", "g1"); //开启offset自动提交 props.put("enable.auto.commit", "true"); //自动提交时间间隔 props.put("auto.commit.interval.ms", "1000"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /**---------------end---------------*/ /**--------------begin----------------*/ //6.创建kafka消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //7.订阅kafka的topic consumer.subscribe(Arrays.asList("demo")); /**---------------end---------------*/ int i = 1; while (true) { /**----------------------begin--------------------------------*/ //8.poll消息数据,返回的变量为crs ConsumerRecords<String, String> crs = consumer.poll(100); for (ConsumerRecord<String, String> cr : crs) { System.out.println("consume data:" + i); i++; } /**----------------------end--------------------------------*/ if (i > 10) { return; } } } } 第4关消费者( Consumer )- 手动提交偏移量 package net.educoder; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; public class App { public static void main(String[] args){ Properties props = new Properties(); /**-----------------begin------------------------*/ //1.设置kafka集群的地址 props.put("bootstrap.servers", "127.0.0.1:9092"); //设置消费者组,组名字自定义,组名字相同的消费者在一个组 props.put("group.id", "g1"); //3.关闭offset自动提交 props.put("enable.auto.commit", "false"); props.put("max.poll.records", 10); //4.序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /**-----------------end------------------------*/ /**-----------------begin------------------------*/ //5.实例化一个消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //6.消费者订阅主题,订阅名为demo的主题 consumer.subscribe(Arrays.asList("demo")); /**-----------------end------------------------*/ final int minBatchSize = 10; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { for (ConsumerRecord bf : buffer) { System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value()); } /**-----------------begin------------------------*/ //7.手动提交偏移量 consumer.commitSync(); /**-----------------end------------------------*/ buffer.clear(); return; } } } } Spark Standalone 模式的安装和部署 第1关: Standalone 分布式集群搭建

吐槽: 这一关的任务要求部分给的一点都不好。 一点不人性化, 其他的不吐槽了。 “小白”要在这个平台做这道题,恶心不死你。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #大数据处理技术头歌平台答案