irpas技术客

Spark操作Ceph_我不想名字重复

大大的周 8354

文章目录 前言Ceph集群安装ceph部分组件介绍主机和每台机器安装的组件和启动的服务集群安装步骤1.机器环境准备2.安装ceph组件3.ceph的存储命令 Spark操作cephceph-radosgw安装spark通过radosgw服务对ceph进行读写SparkReadCephSparkWriteCeph

前言

通过spark操作ceph(读/写操作),在操作之前,我是连ceph是什么都不知道的,因此这篇文章也是在我简单了解ceph后写的,可能有不足之处,请指教; spark是不能很好的和ceph对接的;虽然ceph提供的有api,但把数据读出来后,在转为rdd/dataFrame,麻烦不说,转换时中文也会乱码;将rdd/dataFrame写入ceph时,调用foreach向ceph写时,创建的ceph连接对象是没有实现序列化的,因为不能在foreach外只创建一个连接多次使用,当时测试时我是在foreach中频繁创建连接的,而且数据写出去后,中文乱码且数据都在一行。

Ceph集群安装 ceph部分组件介绍

OSD:Ceph 对象存储设备,它是Ceph集群中存储实际用户数据并响应客户端读操作请求的唯一组件。 MON:Ceph 监视器(Ceph monitor),MON组件通过一系列的map来跟踪整个集群的健康状态,一个MON为每一个组件维护一个独立的map,如OSD、MON、PG、CRUSH。MON不存储实际数据。 MGR: Ceph Manager,Ceph管理器软件,可以收集整个集群的所有状态。有仪表板插件

主机和每台机器安装的组件和启动的服务 主机安装组件启动服务lei-137ceph-deploy、ceph-mon、ceph-osd、ceph-mds、ceph-mgrmon、mgr、osdslave1ceph-mon、ceph-osd、ceph-mds、ceph-mgrmon、mgr、osdslave2ceph-mon、ceph-osd、ceph-mds、ceph-mgrmon、mgr、osd
集群安装步骤

注意:安装ceph-osd时,需要一个裸磁盘(没存数据的磁盘,空间最好别小于5G),必须是磁盘,不能是文件系统,请先准备好裸磁盘 看ceph官网的安装步骤是要设置双网卡安装,我这里因条件不允许,只用了单网卡,也不影响安装,只是在初始化mon命令有区别;

1.机器环境准备 1.主机名 2.主机名ip映射 3.时间同步ntp安装 4.修改为ali镜像源 阿里镜像源地址:https://developer.aliyun.com/mirror/ wget -O /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo 5.在/etc/yum.repos.d/下创建ceph.repo文件,写入以下内容,可以根据需要安装的ceph版本自行更改baseurl的值 [ceph] name=ceph baseurl=https://mirrors.aliyun.com/ceph/rpm-nautilus/el7/x86_64/ enabled=1 gpgcheck=0 priority=1 [ceph-noarch] name=cephnoarch baseurl=https://mirrors.aliyun.com/ceph/rpm-nautilus/el7/noarch/ enabled=1 gpgcheck=0 priority=1 [ceph-source] name=Ceph source packages baseurl=https://mirrors.aliyun.com/ceph/rpm-nautilus/el7/SRPMS/ enabled=0 gpgcheck=1 type=rpm-md gpgkey=https://mirrors.aliyun.com/ceph/keys/release.asc priority=1 6.执行yum makecache 6.5.执行yum search ceph命令,查看刚才安装的ceph包 2.安装ceph组件 7.安装ceph相关的rpm包 集群机器说明:3台机器:lei-137(主节点)、slave1、slave2 7.1 lei-137(主节点)安装ceph-deploy工具,要先安装python yum -y install python-setuptools -y yum install ceph-deploy ceph-deploy --version #查看ceph-deploy安装版本,这里版本最好是2.0+ 7.2 lei-137、slave1、slave2节点安装ceph组件mon、osd、mds、mgr等 yum install -y ceph ceph-mon ceph-osd ceph-mds ceph-radosgw ceph-mgr -y rpm -qa | grep ceph #安装完后查看一下安装的包 在主节点lei-137上面随便一个地方创建一个文件夹,后续命令都在此文件夹下执行,mon,mgr,osd的初始化都在此文件夹下(方便管理) 8.部署monitor(在lei-137创建的文件夹下执行) #ceph-deploy new --public-network 10.0.82.0/24 --cluster-network 10.0.82.0/24 slave1 //因为没有双网卡,所以没有使用上面的命令安装 //在创建的文件夹下执行以下命令: ceph-deploy new lei-137 slave1 slave2 #lei-37 slave1 slave2:主机名,安装mon,此步骤会在创建的文件夹下生成3个配置文件:ceph-deploy-ceph.log、ceph.conf、ceph.mon.keyring ceph-deploy mon create-initial #初始化mon,此时会在创建的文件夹下又生成多个配置文件: ceph.bootstrap-*.keying,*代表名字不同的地方 ceph-deploy admin slave1 slave2 #将初始化生成的部分文件copy到slave1和slave2上,在slave1和slave2的/etc/ceph/文件夹下会多出ceph.client.admin.keyring文件 8.5 在slave1或slave2节点执行命令查看mon安装是否成功 ceph -s 命令 [root@slave1 ~]# ceph -s cluster: id: 9aaf658d-e215-4f6a-b939-3ea7774381b9 health: HEALTH_WARN mons are allowing insecure global_id reclaim clock skew detected on mon.slave1, mon.slave2 mon lei-137 is low on available space services: mon: 3 daemons, quorum lei-137,slave1,slave2 (age 5m) mgr: no daemons active osd: 0 osds: 0 up, 0 in data: pools: 0 pools, 0 pgs objects: 0 objects, 0 B usage: 0 B used, 0 B / 0 B avail pgs: ps -ef | grep ceph-mon #查看mon进程 mon相关命令查询: ceph-deploy mon -h #查询命令 ceph-deploy mon add 主机名 #添加新的mon 9.安装manager(在lei-137创建的文件夹下执行) ceph-deploy mgr create lei-137 slave1 slave2 ceph-deploy mgr -h #查询命令 ceph-deploy mgr create 主机名 #在主机上创建mgr 9.5 在slave1或slave2节点执行命令查看mgr安装是否成功,如果查询不到,那就再执行一下ceph-deploy admin slave1 slave2命令,复制一下文件,时间有点久,这里忘记要不要再次执行了 [root@slave1 ~]# ceph -s cluster: id: 9aaf658d-e215-4f6a-b939-3ea7774381b9 health: HEALTH_WARN OSD count 0 < osd_pool_default_size 3 mons are allowing insecure global_id reclaim clock skew detected on mon.slave1, mon.slave2 mon lei-137 is low on available space services: mon: 3 daemons, quorum lei-137,slave1,slave2 (age 18m) mgr: slave1(active, since 32s), standbys: lei-137, slave2 osd: 0 osds: 0 up, 0 in data: pools: 0 pools, 0 pgs objects: 0 objects, 0 B usage: 0 B used, 0 B / 0 B avail pgs: 10.安装osd(在lei-137创建的文件夹下执行) #/dev/vdb是之前准备好的裸磁盘:可以使用lsblk、fdisk -l 、df -h等命令查询磁盘 #先格式化一下裸磁盘 ceph-deploy disk zap lei-137 /dev/vdb ceph-deploy disk zap slave1 /dev/vdb ceph-deploy disk zap slave2 /dev/vdb #创建osd ceph-deploy osd create --data /dev/vdb lei-137 ceph-deploy osd create --data /dev/vdb slave1 ceph-deploy osd create --data /dev/vdb slave2 10.5 在slave1下执行ceph -s查看osd安装是否成功 #这里的警告,好像是我时间同步时,有1-2秒的误差,没有太精确导致的(环境准备时ntp没有认真搞);还有mon的使用空间太小(磁盘太小?) [root@slave1 ~]# ceph -s cluster: id: 9aaf658d-e215-4f6a-b939-3ea7774381b9 health: HEALTH_WARN mons are allowing insecure global_id reclaim clock skew detected on mon.slave1, mon.slave2 mon lei-137 is low on available space services: mon: 3 daemons, quorum lei-137,slave1,slave2 (age 35m) mgr: slave1(active, since 17m), standbys: lei-137, slave2 osd: 3 osds: 3 up (since 16s), 3 in (since 16s) task status: data: pools: 0 pools, 0 pgs objects: 0 objects, 0 B usage: 3.0 GiB used, 297 GiB / 300 GiB avail pgs: 集群到这里就启动成功了, 11.使用systemd管理ceph服务 systemctl status ceph\*.service ceph\*.target #查看进程 systemctl start ceph.target #启动所有服务的守护进程 systemctl stop ceph.target #停止所有服务的守护进程 //单个服务的启动/停止/重启命令 systemctl start/stop/restart ceph-osd.target systemctl start/stop/restart ceph-mds.target systemctl start/stop/restart ceph-mgr.target 3.ceph的存储命令 12. 存储池管理命令 ceph osd lspools ceph osd pool ls #列出已创建的存储池 ceph osd -h ceph osd create -h ceph osd create --help #查看osd相关的命令 ceph osd pool create test 64 64 #创建名为test的储存池 #osd创建命令,这里没有太深入研究参数的作用都是什么,毕竟是以完成代码为主 ceph osd pool create {pool-name} [{pg-num} [{pgp-num}]] [replicated] [crush-rule-name] [expected-num-objects] #修改储存池名字 ceph osd pool rename {current-pool-name} {new-pool-name} ceph osd pool rename a b #将名为a的存储池修改名字为b ceph osd pool get cephname size #查看cephname的副本数(默认3) ceph osd pool get cephname pg_num #查看cephname的pg数 #删除ceph存储池,有点麻烦 ceph osd pool rm cephname #报错如下: [root@slave1 ~]# ceph osd pool rm test Error EPERM: WARNING: this will *PERMANENTLY DESTROY* all data stored in pool test. If you are *ABSOLUTELY CERTAIN* that is what you want, pass the pool name *twice*, followed by --yes-i-really-really-mean-it. #删除存储池,第一次删除时可能会报错,根据提示后面加上--yes-i-really-really-mean-it,虽然加上了后,还会继续报错 //加上--yes-i-really-really-mean-it时,cephname也要多写一遍 ceph osd pool rm cephname cephname --yes-i-really-really-mean-it [root@slave1 ~]# ceph osd pool rm test test --yes-i-really-really-mean-it #如果记得没错,虽然按提示加上--yes-i-really-really-mean-it,但还是会报错,所以还需要修改ceph的配置文件 #在lei-13创建的文件夹中的cpeh.conf文件中加入配置: [mon] mon_allow_pool_delete = true #注意:ceph.conf配置文件,最后一行配置信息以后,还必须得多一个换行符!!!不然报错如下: [root@slave2 ~]# ceph -s 2022-04-01 16:19:52.504 7fe9e4426700 -1 Errors while parsing config file! 2022-04-01 16:19:52.504 7fe9e4426700 -1 read_conf: ignoring line 10 because it doesn't end with a newline! Please end the config file with a newline. cluster: id: 9aaf658d-e215-4f6a-b939-3ea7774381b9 health: HEALTH_WARN application not enabled on 1 pool(s) 1 daemons have recently crashed mons are allowing insecure global_id reclaim clock skew detected on mon.slave1, mon.slave2 mon lei-137 is low on available space services: mon: 3 daemons, quorum lei-137,slave1,slave2 (age 5m) mgr: slave2(active, since 2d), standbys: slave1, lei-137 osd: 3 osds: 3 up (since 2d), 3 in (since 4d) rgw: 3 daemons active (lei-137, slave1, slave2) task status: data: pools: 6 pools, 224 pgs objects: 200 objects, 3.4 KiB usage: 3.0 GiB used, 297 GiB / 300 GiB avail pgs: 224 active+clean #将ceph.conf推送到lei-137,slave1和slave2节点,虽然是在lei-137上修改的, #但也需要推一下(其实在/etc/ceph下还有个ceph.conf配置文件,覆盖的就是这个) ceph-deploy --overwrite-conf config push lei-137 slave1 slave2 #重启每个mon(每个节点都执行) systemctl restart ceph-mon.target #再次删除,即可删除成功 ceph osd pool rm cephname cephname --yes-i-really-really-mean-it ceph osd pool application enable cephname <app> 为存储池指定Ceph的类型,app的可选值:cephfs、rbd、rgw,也可以不指定 13.存储池数据相关命令 13.1 上传对象到存储池 rados -p cephname put filerename filepath cephname:存储池名字 filepath:上传到存储池文件 filerename:将文件上传到存储池后重命名的文件名 13.2 列出存储池中的对象 rados -p cephname ls #cephname:存储池名字 13.3 从存储池下载对象 rados -p cephname get filename savepathAndrename cephname:存储池名字 filename:要下载的存储池对象 savepathAndrename:存储池对象下载保存的路径和重命名的名字 13.4 删除存储池对象 rados -p cephname rm filename cephname:存储池名字 filename:要删除的对象名字 Spark操作ceph ceph-radosgw安装

虽然我上面说了spark不能很友好的对接ceph,但从百度来看,spark是可以操作S3协议的数据的,而ceph的radosgw组件是支持S3的restful的,因此想要使用spark直接操作ceph,还需要安装radosgw服务 安装radosgw参考博客

#每个节点都安装radosgw服务 yum install ceph-radosgw #下面这个命令还是在lei-137之前创建的文件夹下执行 ceph-deploy rgw create lei-137 slave1 slave2 #默认是以7480端口号启动的,查看端口号 netstat -tnlp |grep 7480 #创建radosgw用户,下面自动生成的access_key和secret_key很重要,spark连接时需要用到 radosgw-admin user create --uid=radosgw --display-name="radosgw" { "user_id": "radosgw", "display_name": "radosgw", "email": "", "suspended": 0, "max_buckets": 1000, "auid": 0, "subusers": [], "keys": [ { "user": "radosgw", "access_key": "DKOORDOMS6YHR2OW5M23", "secret_key": "OOBNCO0d03oiBaLCtYePPQ7gIeUR2Y7UuB24pBW4" } ], "swift_keys": [], "caps": [], "op_mask": "read, write, delete", "default_placement": "", "placement_tags": [], "bucket_quota": { "enabled": false, "check_on_raw": false, "max_size": -1, "max_size_kb": 0, "max_objects": -1 }, "user_quota": { "enabled": false, "check_on_raw": false, "max_size": -1, "max_size_kb": 0, "max_objects": -1 }, "temp_url_keys": [], "type": "rgw" } #安装s3cmd客户端 yum install -y s3cmd #设置配置信息 [root@lei-137~]# s3cmd --configure Enter new values or accept defaults in brackets with Enter. Refer to user manual for detailed description of all options. Access key and Secret key are your identifiers for Amazon S3. Leave them empty for using the env variables. Access Key: DKOORDOMS6YHR2OW5M23 #可以再次修改Access Key Secret Key: OOBNCO0d03oiBaLCtYePPQ7gIeUR2Y7UuB24pBW4 #可以再次修改Secret Key Default Region [US]: ZH Use "s3.amazonaws.com" for S3 Endpoint and not modify it to the target Amazon S3. S3 Endpoint [s3.amazonaws.com]: #这里推荐改为ip:port,spark连接时要用到 Use "%(bucket)s.s3.amazonaws.com" to the target Amazon S3. "%(bucket)s" and "%(location)s" vars can be used if the target S3 system supports dns based buckets. DNS-style bucket+hostname:port template for accessing a bucket [%(bucket)s.s3.amazonaws.com]: Encryption password is used to protect your files from reading by unauthorized persons while in transfer to S3 Encryption password: #如果测试,这个密码可以不设;在调用S3Api时好像需要此值 Path to GPG program [/usr/bin/gpg]: When using secure HTTPS protocol all communication with Amazon S3 servers is protected from 3rd party eavesdropping. This method is slower than plain HTTP, and can only be proxied with Python 2.7 or newer Use HTTPS protocol [Yes]: no On some networks all internet access must go through a HTTP proxy. Try setting it here if you can't connect to S3 directly HTTP Proxy server name: New settings: Access Key: DKOORDOMS6YHR2OW5M23 Secret Key: OOBNCO0d03oiBaLCtYePPQ7gIeUR2Y7UuB24pBW4 Default Region: ZH S3 Endpoint: s3.amazonaws.com #如果上面修改了Endpoint,这里就不是默认值 DNS-style bucket+hostname:port template for accessing a bucket: %(bucket)s.s3.amazonaws.com Encryption password: Path to GPG program: /usr/bin/gpg Use HTTPS protocol: False HTTP Proxy server name: HTTP Proxy server port: 0 Test access with supplied credentials? [Y/n] n Save settings? [y/N] y Configuration saved to '/root/.s3cfg' #配置信息都保存到/root/.s3cfg文件中了,我还把里面的host_base属性改为ip:port形式了 [root@lei-137~]# cat .s3cfg [default] access_key = DKOORDOMS6YHR2OW5M23 access_token = add_encoding_exts = add_headers = bucket_location = US ca_certs_file = cache_file = check_ssl_certificate = True check_ssl_hostname = True cloudfront_host = cloudfront.amazonaws.com content_disposition = content_type = default_mime_type = binary/octet-stream delay_updates = False delete_after = False delete_after_fetch = False delete_removed = False dry_run = False enable_multipart = True encrypt = False expiry_date = expiry_days = expiry_prefix = follow_symlinks = False force = False get_continue = False gpg_command = /usr/bin/gpg gpg_decrypt = %(gpg_command)s -d --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s gpg_encrypt = %(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s gpg_passphrase = guess_mime_type = True host_base = 我的ip:7480 host_bucket = %(bucket).ceph-node1:7480 human_readable_sizes = False invalidate_default_index_on_cf = False invalidate_default_index_root_on_cf = True invalidate_on_cf = False kms_key = limit = -1 limitrate = 0 list_md5 = False log_target_prefix = long_listing = False max_delete = -1 mime_type = multipart_chunk_size_mb = 15 multipart_max_chunks = 10000 preserve_attrs = True progress_meter = True proxy_host = proxy_port = 0 put_continue = False recursive = False recv_chunk = 65536 reduced_redundancy = False requester_pays = False restore_days = 1 restore_priority = Standard secret_key = OOBNCO0d03oiBaLCtYePPQ7gIeUR2Y7UuB24pBW4 send_chunk = 65536 server_side_encryption = False signature_v2 = False signurl_use_https = False simpledb_host = sdb.amazonaws.com skip_existing = False socket_timeout = 300 stats = False stop_on_error = False storage_class = throttle_max = 100 upload_id = urlencoding_mode = normal use_http_expect = False use_https = False use_mime_magic = True verbosity = WARNING website_endpoint = http://%(bucket)s.s3-website-%(location)s.amazonaws.com/ website_error = website_index = index.html #s3cmd的一些命令 s3cmd --help查看 s3cmd mb s3://first-bucket #创建名为first-bucket的桶,前面的s3://不可省 Bucket 's3://first-bucket/' created s3cmd ls #查看所有的桶 2019-02-15 07:45 s3://first-bucket s3cmd put /etc/hosts s3://first-bucket #将本地文件上传到桶中 upload: '/etc/hosts' -> 's3://first-bucket/hosts' [1 of 1] 239 of 239 100% in 1s 175.80 B/s done s3cmd ls s3://first-bucket #查看桶中所有对象 2019-02-15 07:47 239 s3://first-bucket/hosts spark通过radosgw服务对ceph进行读写

连接ceph所需的主要依赖(spark的依赖不再列举)

<dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-scala_2.11</artifactId> <version>2.13.1</version> </dependency> <!--我之前还引了一个jackson-databind依赖,实际是不需要的,jackson-module引的有jackson-databind--> <!--注意:这里hadoop-aws的版本很重要,之前我引的2.7.2和2.6.0存在问题,问题会在下面的代码中说明--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aws</artifactId> <version>2.9.0</version> </dependency> <!--如果hadoop-aws的版本是2.7.2,那么就不需要hadoop-client依赖, 如果hadoop-aws版本是2.8.5(我直接从2.8.5版本测的,可能再低点版本也一样),必须引hadoop-client依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.9.0</version> </dependency> SparkReadCeph val spark = SparkSession.builder().master("local[*]").appName("read").getOrCreate() //上面设置的accessKey和secretKey spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "N33P4EEC8LWGTBEBZ8R1") spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "123") //上面设置的endpoint,我设置的ip:port形式 spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "ip:7480") //不使用https连接,为true报错 spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false") //读取Bbb桶下的Person.txt对象 //ceph的路径是s3:/bucketName/dir,但这里连接必须是s3a,否则报错 val rdd =spark.sparkContext.textFile("s3a://Bbb/Person.txt") rdd.foreach(println) SparkWriteCeph val spark = SparkSession.builder().master("local[*]").appName("write").getOrCreate() //上面设置的accessKey和secretKey spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "N33P4EEC8LWGTBEBZ8R1") spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "123") //上面设置的endpoint,我设置的ip:port形式 spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "ip:7480") //不使用https连接,为true报错 spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false") val seq= Seq("hedafjcbj,ae,bvjdbc,bvaevndb n,dvbkllo","worladfkj") val rdd: RDD[String] = spark.sparkContext.makeRDD(seq) //保存数据为文本格式 //ceph的路径是s3:/bucketName/dir,但这里连接必须是s3a,否则报错 rdd.repartition(1).saveAsTextFile("s3a://Bbb/290/") rdd.toDF().repartition(1).write.mode(SaveMode.Overwrite).text("s3a://Bbb/df/") rdd.toDF().repartition(1).write.mode(SaveMode.Overwrite).text("s3a://Bbb/df/") /** *保存数据为csv */ //读csv数据 val df: DataFrame = spark.read.format("csv") .option("sep", ",") .option("inferSchema", "true") .option("header", "true") .load("D:\\cephTestData\\csv.txt") df.show() //将数据写到ceph上并保存未csv格式 //ceph的路径是s3:/bucketName/dir,但这里连接必须是s3a,否则报错 df.repartition(1).write.format("csv") .mode(SaveMode.Overwrite) .option("header","true") .option("delimiter",",") .save("s3a://Bbb/df/") /** *保存数据为json */ val json: DataFrame = spark.read.format("json") .load("D:\\cephTestData\\cephJson.json") json.show() //将数据写到ceph上并保存未json格式 //ceph的路径是s3:/bucketName/dir,但这里连接必须是s3a,否则报错 json.repartition(1).write.mode(SaveMode.Overwrite).json("s3a://Bbb/json/") spark.stop() 注意:平常向外写数据时,有的可以把参数封装到.options()方法中,但上面的4个参数fs.s3a.access.key、 fs.s3a.secret.key、fs.s3a.endpoint、fs.s3a.connection.ssl.enabled不行,测试时报错

再说开发时遇到的错误: 最多的就是遇到的403错误,而且403的错误还都不尽一样

导的hadoop-aws依赖为2.6.0版本时,最开始报的403错误: ResponseCode: 403, ResponseStatus: Forbidden, XML Error Message: <?xml version="1.0" encoding="UTF-8"?> <Error><Code>InvalidAccessKeyId</Code><Message>The AWS Access Key Id you provided does not exist in our records. </Message><AWSAccessKeyId>6BI59IIO7WFVTPNDBSWT</AWSAccessKeyId> <RequestId>HSXXXFRTD4EVBVQJ</RequestId><HostId>nbgkYfR/eWrL44sDD/Llt7vGaUrytD8izv5TwslNxexHsi3J5DyuMp3FGeY7HGkhdBmlVw1KjQc=</HostId></Error> 看意思是accessKey和secretKey不对报的错,但我使用S3的Api直接连接,相同的accessKey和secretKey是可以连接ceph的, 因此可以推翻accessKey和secretKey的错误;后来百度好像hadoop2.7.0才开始支持S3(太久了记不太清了),然后才把依赖改为2.7.2,最终成功读取; 但向ceph写数据时又报错,具体什么错忘记了,然后又把依赖升级到2.8.5,成功把数据写到ceph上 导的hadoop-aws依赖为2.8.5以上版本时,又报403错误: Unable to load filesystem:s3a 这个错误,是因为jar包版本冲突引起的,因为上面咱们引入了hadoop-aws,里面有一些hadoop版本的依赖, 我项目中已经引过一些hadoop的依赖,冲突了;

小结:如果代码是按照上面去写的,local模式一般是不会有问题的,如果报错403和一些其他的classNotFound错误,基本上都是依赖惹的祸,仔细排查即可;yarn模式如果也报403或者classNotFound,请看下面的补充

补充:因为之前hadoop使用的是2.6.0,这里升级为2.9.0版本,使用的spark是2.3.4, spark中对应的hadoop的jar包还是2.6.0版本(没升级sparl,直接copy过来用了),导致local模式能读写, 提交到yarn上时各种报方法找不到,比如: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.reloadExistingConfigurations 后面发现了,然后把spark中有关hadoop的jar包,都重新上传为2.9.0的 然后单独把代码写到一个测试项目中,打包部署到yarn上,是可以成功写入的(只测了写入,读取应该问题也不大)

总结:因为对ceph了解甚少,主要是以完成任务(功能先实现)为主,所以有可能有说错的地方,还请见谅,或许后续有机会深度学习ceph了会再做补充;有发现错误的地方,也欢迎留言指出。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Spark操作Ceph