やったこと
- PostgreのRDSから作成されたsnapshotに対してS3 exportを実施し、parquetファイルを出力した
- そのファイルをGlue Jobで読み込みDynamicFrame, (PySpark)Dataframeに変換すると、データ型の不一致が原因でPySparkで特定のカラムの値のみ正しく読み込まれていない問題に遭遇したため、備忘録として残しておく
詳細
下記のようにs3からparquetデータを読み込みDynamicFrameを作成し、ApplyMappingを使用してPySparkのDataFrameに変換していたが、xxx_codeの値が全てNullになっていた。(snapshot取得元であるDBで定義されているxxx_codeのデータ型はstring)
snapshotDynmc = glueContext.create_dynamic_frame.from_options(
connection="s3",
connection_options={"s3://snapshot-output-bucket/targetdata"},
format="parquet",
transformation_ctx="snapshotDynmc"
)
mappings = [
("xxx_id", "int", "xxx_id", "string"),
("xxx_code", "string", "xxx_code", "string"),
("xxx_name", "string", "xxx_name", "string")
]
snapshot_df = ApplyMapping.apply(frame=snapshotDynmc,
mappings = mappings,
transformation_ctx="snapshot_df").toDF()
snapshot_df.show()
snapshot_df.show()
の出力結果
#|xxx_id|xxx_code|xxx_name|
#|1 |null |name1|
#|2 |null |name2|
#|3 |null |name3|
原因
mappingのデータ型とparquetファイルに定義されているデータ型が一致していないため発生していた
入力型のデータ型をshort
にすると正しく値が出力されることを確認した
mappings = [
("xxx_id", "int", "xxx_id", "string"),
# ("xxx_code", "string", "xxx_code", "string"),
("xxx_code", "short", "xxx_code", "string"),
("xxx_name", "string", "xxx_name", "string")
]
原因調査の経緯
parquet内のschema情報を確認
parquet-toolsでparquet内のschema情報を参照するとxxx_codeカラムのデータ型はint32
になっていた
...
optional int32 xxx_code (INTEGER(16,true));
...
int32と記載されているがmappingを("xxx_code", "int", "xxx_code", "string"),
に変更しても解消しなかった
DataFrameのschema情報を確認
snapshot_df = ApplyMapping.apply(frame=snapshotDynmc,
mappings = mappings,
transformation_ctx="snapshot_df").toDF()
snapshot_df.schema()
... StructField(xxx_id, IntegertTyepe, true),StructField(xxx_code, StringType, true),StructField(xxx_name, StringType, true)
DataFrameでは正しくstringになっている
DynamicFrameのschema情報を確認
snapshotDynmc = glueContext.create_dynamic_frame.from_options(
connection="s3",
connection_options={"s3://snapshot-output-bucket/targetdata"},
format="parquet",
transformation_ctx="snapshotDynmc"
)
snapshotDynmc.printSchema()
出力結果
# root
# | -- xxx_id: int
# | -- xxx_code: short
# | -- xxx_name: string
shortになっているため、これが原因だった
まとめ
- ApplyMappingの変換前のデータ型はDynamicFrameのschema情報を元に設定する(snapshot取得元のRDSのデータ型を頼りにしない)
- s3 exportで出力されたparquetのデータ型が常に正しいとは限らない
- parquetにGlue Crawlerを実行してtableを作成し、そのtableからDynamicFrameを生成した方が安全そう