
Using Distributed XGBoost with PySpark


Personally tested and working.

Environment:

    Python 3.6.5

    PySpark: 2.4.5

    Spark: 2.4.3

Steps:

    Step 1: Set up the environment above.

    Step 2: Download the required files (download link):

    xgboost4j-0.72.jar, xgboost4j-spark-0.72.jar, sparkxgb.zip

    Step 3: Wire the three files into the job.

Key point 1: add xgboost4j-0.72.jar and xgboost4j-spark-0.72.jar to the job (with --jars, or by setting spark.jars).
Key point 2: every executor needs the Python wrapper, so ship it with spark.sparkContext.addPyFile("hdfs:///xxxx/xxx/sparkxgb.zip").
With the three files in place:

Code example:

from sparkxgb import XGBoostEstimator
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

# When running locally, put the jars next to this script; when submitting a
# job, pass them with --jars instead.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.72.jar,xgboost4j-0.72.jar pyspark-shell'

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("PythonWordCount") \
    .getOrCreate()

# Path to sparkxgb.zip; use a local path when running locally.
spark.sparkContext.addPyFile("hdfs:///xxxx/xxx/sparkxgb.zip")

# Load Data
dataPath = "xxx\\spark-2.4.3-bin-hadoop2.6\\data\\mllib\\sample_binary_classification_data.txt"
dataDF = spark.read.format("libsvm").load(dataPath)

# Split into Train/Test
trainDF, testDF = dataDF.randomSplit([0.8, 0.2], seed=1000)

# Define and train model
xgboost = XGBoostEstimator(
    # General Params
    nworkers=1, nthread=1, checkpointInterval=-1, checkpoint_path="",
    use_external_memory=False, silent=0, missing=float("nan"),

    # Column Params
    featuresCol="features", labelCol="label", predictionCol="prediction",
    weightCol="weight", baseMarginCol="baseMargin",

    # Booster Params
    booster="gbtree", base_score=0.5, objective="binary:logistic",
    eval_metric="error", num_class=2, num_round=2, seed=None,

    # Tree Booster Params
    eta=0.3, gamma=0.0, max_depth=6, min_child_weight=1.0,
    max_delta_step=0.0, subsample=1.0, colsample_bytree=1.0,
    colsample_bylevel=1.0, reg_lambda=0.0, alpha=0.0, tree_method="auto",
    sketch_eps=0.03, scale_pos_weight=1.0, grow_policy='depthwise',
    max_bin=256,

    # Dart Booster Params
    sample_type="uniform", normalize_type="tree", rate_drop=0.0,
    skip_drop=0.0,

    # Linear Booster Params
    lambda_bias=0.0
)
xgboost_model = xgboost.fit(trainDF)

# Transform test set
xgboost_model.transform(testDF).show()

# Write model/classifier
xgboost.write().overwrite().save("xgboost_class_test")
xgboost_model.write().overwrite().save("xgboost_class_test.model")
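The sample file loaded above is in LIBSVM text format: each line is "<label> <index>:<value> ...", with sparse, 1-based feature indices. For orientation, here is a minimal pure-Python sketch of that format (parse_libsvm_line is a hypothetical helper for illustration, not part of sparkxgb; Spark's libsvm reader does this for you):

```python
# Minimal sketch of the LIBSVM text format used by
# sample_binary_classification_data.txt: "<label> <index>:<value> ...".
def parse_libsvm_line(line):
    parts = line.split()
    label = float(parts[0])  # first token is the label
    # remaining tokens are sparse index:value pairs
    features = {int(idx): float(val)
                for idx, val in (p.split(":", 1) for p in parts[1:])}
    return label, features

label, features = parse_libsvm_line("1 128:51 129:159 130:253")
```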

Appendix: I also tested the xgboost4j-0.90.jar / xgboost4j-spark-0.90.jar / sparkxgb.zip combination. It does not run; the details are below.

    Note: the 0.90 wrapper has no XGBoostEstimator.

0.90 code example:

from sparkxgb import XGBoostClassifier
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

# The jars sit next to this script when running locally.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.90.jar,xgboost4j-0.90.jar pyspark-shell'

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("PythonWordCount") \
    .getOrCreate()

spark.sparkContext.addPyFile("hdfs:///xxxx/xxx/sparkxgb.zip")

# Load Data
dataPath = "xxx\\spark-2.4.3-bin-hadoop2.6\\data\\mllib\\sample_binary_classification_data.txt"
dataDF = spark.read.format("libsvm").load(dataPath)

# Split into Train/Test
trainDF, testDF = dataDF.randomSplit([0.8, 0.2], seed=1000)

# Define and train model
xgboost = XGBoostClassifier(
    # General Params
    nworkers=1, nthread=1, checkpointInterval=-1, checkpoint_path="",
    use_external_memory=False, silent=0, missing=float("nan"),

    # Column Params
    featuresCol="features", labelCol="label", predictionCol="prediction",
    weightCol="weight", baseMarginCol="baseMargin",

    # Booster Params
    booster="gbtree", base_score=0.5, objective="binary:logistic",
    eval_metric="error", num_class=2, num_round=2, seed=None,

    # Tree Booster Params
    eta=0.3, gamma=0.0, max_depth=6, min_child_weight=1.0,
    max_delta_step=0.0, subsample=1.0, colsample_bytree=1.0,
    colsample_bylevel=1.0, reg_lambda=0.0, alpha=0.0, tree_method="auto",
    sketch_eps=0.03, scale_pos_weight=1.0, grow_policy='depthwise',
    max_bin=256,

    # Dart Booster Params
    sample_type="uniform", normalize_type="tree", rate_drop=0.0,
    skip_drop=0.0,

    # Linear Booster Params
    lambda_bias=0.0
)
xgboost_model = xgboost.fit(trainDF)

# Transform test set
xgboost_model.transform(testDF).show()

# Write model/classifier
xgboost.write().overwrite().save("xgboost_class_test")
xgboost_model.write().overwrite().save("xgboost_class_test.model")

It fails with:

Traceback (most recent call last):
  File "D:/gyl/scalaProgram/python_OwnerIdentify/test.py", line 48, in <module>
    missing=float("+inf"))
  File "D:\Program Files\python\python3\lib\site-packages\pyspark\__init__.py", line 110, in wrapper
    return func(self, **kwargs)
  File "D:\software\bigData\pyspark_study-master\source\pyspark-xgboost\sparkxgb.zip\sparkxgb\xgboost.py", line 85, in __init__
  File "D:\software\bigData\pyspark_study-master\source\pyspark-xgboost\sparkxgb.zip\sparkxgb\common.py", line 68, in __init__
  File "D:\Program Files\python\python3\lib\site-packages\pyspark\ml\wrapper.py", line 67, in _new_java_obj
    return java_obj(*java_args)
TypeError: 'JavaPackage' object is not callable
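The last frame is the tell: pyspark's _new_java_obj runs java_obj(*java_args), and here java_obj is a py4j JavaPackage stub rather than a class. That is what py4j hands back when the JVM cannot resolve the requested name, which usually means the xgboost4j jars never made it onto the JVM classpath, or their version does not match the sparkxgb wrapper. A pure-Python sketch of the mechanism (FakeJavaPackage is a stand-in for py4j.java_gateway.JavaPackage; the JVM class name is illustrative):

```python
# Why the error reads "'JavaPackage' object is not callable": attribute lookups
# on a py4j package stub always "succeed" by returning another stub, so nothing
# fails until the final constructor call.
class FakeJavaPackage:
    """Stand-in for py4j.java_gateway.JavaPackage: any attribute resolves to
    another package stub, and the stub itself is not callable."""
    def __getattr__(self, name):
        return FakeJavaPackage()

jvm = FakeJavaPackage()
# Looks like a valid class reference, but the class was never found:
ctor = jvm.ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator
try:
    ctor()  # pyspark's _new_java_obj does exactly this: java_obj(*java_args)
except TypeError as e:
    error_message = str(e)
```

So the fix is to get matching jars onto the classpath, not to change the Python call site.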

References:

1. https://github.com/dmlc/xgboost/issues/1698

    Note: pass missing values as float("+inf").

2. https://zhuanlan.zhihu.com/p/273756067?utm_source=com.tencent.wework
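On the float("+inf") tip from reference 1: one reason NaN makes an awkward "missing" sentinel, while +inf does not, is that IEEE-754 NaN never compares equal to anything, including itself, so equality checks against the sentinel silently fail. A quick pure-Python illustration:

```python
import math

nan = float("nan")
inf = float("+inf")

# NaN is never equal to anything, not even itself:
nan_equals_itself = (nan == nan)           # False
# +inf compares equal to itself, so it works as a reliable sentinel:
inf_equals_itself = (inf == float("inf"))  # True
# NaN must be detected with math.isnan(), never with ==:
detected = math.isnan(nan)                 # True
```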


