python如何处理spark上的数据_Pyspark获取并处理RDD数据代码实例

 2023-09-07 阅读 20 评论 0

摘要:弹性分布式数据集(RDD)是一组不可变的JVM对象的分布集,可以用于执行高速运算,它是Apache Spark的核心。在pyspark中获取和处理RDD数据集的方法如下:python调用shell、1. 首先是导入库和环境配置(本测试在linux的pycharm上完成)import osfrom pyspark imp

弹性分布式数据集(RDD)是一组不可变的JVM对象的分布集,可以用于执行高速运算,它是Apache Spark的核心。

在pyspark中获取和处理RDD数据集的方法如下:

python调用shell、1. 首先是导入库和环境配置(本测试在linux的pycharm上完成)

import os

from pyspark import SparkContext, SparkConf

spark调用python?from pyspark.sql.session import SparkSession

os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"

conf = SparkConf().setAppName('test_rdd')

Python爬取数据?sc = SparkContext('local', 'test', conf=conf)

spark = SparkSession(sc)

2. 然后,提供hdfs分区数据的路径或者分区表名

txt_File = r"hdfs://host:port/apps/hive/warehouse/数据库名.db/表名/分区名/part-m-00029.deflate" # part-m-00029.deflate

# txt_File = r"hdfs://host:port/apps/hive/warehouse/数据库名.db/表名" # hive table

3. sc.textFile进行读取,得到RDD格式数据,参数中还可设置数据被划分的分区数

txt_ = sc.textFile(txt_File)

4. 基本操作:

type(txt_):显示数据类型,这时属于 'pyspark.rdd.RDD'

txt_.first():获取第一条数据

txt_.take(2):获取前2条数据,形成长度为2的list

txt_.take(2)[1].split('\1')[1]:表示获取前两条中的第[1]条数据(也就是第2条,因为python的索引是从0开始的),并以 '\1'字符分隔开(这要看你的表用什么作为分隔符的),形成list,再获取该list的第2条数据

txt_.map(lambda x:x.split('\1')):使用lambda函数和map函数快速处理每一行数据,这里表示将每一行以 '\1'字符分隔开,每一行返回一个list;此时数据结构是:'pyspark.rdd.PipelinedRDD'

txt_.map(lambda x:(x, x.split('\1'))).filter(lambda y:y[0].startswith('北京')):表示在返回 (x, x.split('\1')) 后,进行筛选filter,获取其中以 '北京' 开头的行,并按照相同格式 (例如,这里是(x, x.split('\1'))格式,即原数据+分割后的列表数据) 返回数据

txt_.collect():返回所有RDD数据元素,当数据量很大时谨慎操作

txt_.toDF():不能直接转成DataFrame格式,需要设置Schema

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/2/18664.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息