PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序

news/2025/2/3 6:36:32 标签: 大数据, spark, python, 数据仓库, sql

设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。
代码如下:

python">import json
from pyspark.sql import SparkSession

def load_config(config_path):
    with open(config_path, 'r') as f:
        return json.load(f)

def main(config_path, base_s3_path):
    # 初始化SparkSession,配置S3和Excel支持
    spark = SparkSession.builder \
        .appName("DataExportJob") \
        .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \
        .getOrCreate()

    # 配置S3访问(根据实际环境配置)
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")

    config = load_config(config_path)

    for template in config['templates']:
        label = template['label']
        sql_template = template['sql_template']
        parameters_list = template['parameters']

        for params in parameters_list:
            # 验证参数数量是否匹配
            placeholders = sql_template.count('{')
            if len(params) != placeholders:
                raise ValueError(f"参数数量不匹配,模板需要{placeholders}个参数,但当前参数为{len(params)}个")

            # 替换SQL中的占位符
            formatted_sql = sql_template.format(*params)
            df = spark.sql(formatted_sql)

            # 生成文件名参数部分
            param_str = "_".join(params)
            base_filename = f"{label}_{param_str}"

            # 定义输出路径
            output_paths = {
                'parquet': f"{base_s3_path}/parquet/{base_filename}",
                'csv': f"{base_s3_path}/csv/{base_filename}",
                'excel': f"{base_s3_path}/excel/{base_filename}.xlsx"
            }

            # 写入Parquet
            df.write.mode('overwrite').parquet(output_paths['parquet'])

            # 写入CSV(自动生成header)
            df.write.mode('overwrite') \
                .option("header", "true") \
                .csv(output_paths['csv'])

            # 写入Excel(使用spark-excel包)
            df.write.format("com.crealytics.spark.excel") \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .mode("overwrite") \
                .save(output_paths['excel'])

    spark.stop()

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', type=str, required=True, help='Path to config JSON file')
    parser.add_argument('--s3-path', type=str, required=True, help='Base S3 path (e.g., s3a://your-bucket/data)')
    args = parser.parse_args()

    main(args.config, args.s3_path)

配置文件示例(config.json)

{
  "templates": [
    {
      "label": "sales_report",
      "sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'",
      "parameters": [
        ["202301", "north"],
        ["202302", "south"]
      ]
    },
    {
      "label": "user_activity",
      "sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id",
      "parameters": [
        ["2023-01-01"],
        ["2023-01-02"]
      ]
    }
  ]
}

使用说明

  1. 依赖管理

    • 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
      spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
      
  2. S3配置

    • 替换代码中的YOUR_ACCESS_KEYYOUR_SECRET_KEY为实际AWS凭证
    • 根据S3兼容存储调整endpoint(如使用MinIO需特殊配置)
  3. 执行命令

    spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
    data_export.py --config config.json --s3-path s3a://your-bucket/exports
    

输出结构

s3a://your-bucket/exports
├── parquet
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
├── csv
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
└── excel
    ├── sales_report_202301_north.xlsx
    ├── sales_report_202302_south.xlsx
    └── user_activity_2023-01-01.xlsx

http://www.niftyadmin.cn/n/5840563.html

相关文章

青少年编程与数学 02-008 Pyhon语言编程基础 15课题、运用函数

青少年编程与数学 02-008 Pyhon语言编程基础 15课题、运用函数 一、函数的运用1. 问题分解2. 定义函数接口3. 实现函数4. 测试函数5. 组合函数6. 处理错误和异常7. 优化和重构示例:使用函数解决复杂数学问题 二、递归三、递归函数1. 确定基本情况(Base C…

Kotlin/Js Kotlin 编译为 JS (尝试)

Kotlin/JS 是 Kotlin 编程语言的一个目标平台,它允许你使用 Kotlin 编写在 JavaScript 环境中运行的代码。通过 Kotlin/JS,你可以将你的 Kotlin 代码编译成 JavaScript 代码,通常是兼容 ECMAScript 5 或更高版本的形式。这使得 Kotlin 不仅可…

React基础知识回顾详解

以下是React从前端面试基础到进阶的系统性学习内容,包含核心知识点和常见面试题解析: 一、React基础核心 JSX原理与本质 JSX编译过程(Babel转换)虚拟DOM工作原理面试题:React为何使用className而不是class?…

AI开发学习之——PyTorch框架

PyTorch 简介 PyTorch (Python torch)是由 Facebook AI 研究团队开发的开源机器学习库,广泛应用于深度学习研究和生产。它以动态计算图和易用性著称,支持 GPU 加速计算,并提供丰富的工具和模块。 PyTorch的主要特点 …

文件系统分析

文件系统与磁盘管理详解 一、存储设备基础 1. 存储设备类型对比 设备类型特点典型接口应用场景机械硬盘依赖磁头机械读写,转速影响性能(5400/7200/10000rpm),价格低容量大SATA/SAS冷数据存储、备份固态硬盘无机械结构&#xff…

开源模型应用落地-DeepSeek-R1-Distill-Qwen-7B与vllm实现推理加速的正确姿势(一)

一、前言 在当今人工智能技术迅猛发展的时代,各类人工智能模型如雨后春笋般不断涌现,其性能的优劣直接影响着应用的广度与深度。从自然语言处理到计算机视觉,从智能安防到医疗诊断,AI 模型广泛应用于各个领域,人们对其准确性、稳定性和高效性的期望也与日俱增。 在此背景下…

关于系统重构实践的一些思考与总结

文章目录 一、前言二、系统重构的范式1.明确目标和背景2.兼容屏蔽对上层的影响3.设计灰度迁移方案3.1 灰度策略3.2 灰度过程设计3.2.1 case1 业务逻辑变更3.2.2 case2 底层数据变更(数据平滑迁移)3.2.3 case3 在途新旧流程兼容3.2.4 case4 接口变更3.2.5…

使用 PyTorch 实现逻辑回归并评估模型性能

1. 逻辑回归简介 逻辑回归是一种用于解决二分类问题的算法。它通过一个逻辑函数(Sigmoid 函数)将线性回归的输出映射到 [0, 1] 区间内,从而将问题转化为概率预测问题。如果预测概率大于 0.5,则将样本分类为正类;否则分…