במדריך זה נתמקד בעיקר בקריאה/טעינה של קובץ הפרקט לתוך PySpark DataFrame/SQL באמצעות הפונקציה read.parquet() הזמינה במחלקה pyspark.sql.DataFrameReader.
נושא התוכן:
קרא את קובץ הפרקט ל-PySpark DataFrame
קרא את קובץ הפרקט ל- PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
פונקציה זו משמשת לקריאת קובץ הפרקט ולטעינתו לתוך PySpark DataFrame. זה לוקח את הנתיב/שם הקובץ של קובץ הפרקט. אנחנו יכולים פשוט להשתמש בפונקציה read.parquet() מכיוון שזו הפונקציה הגנרית.
תחביר:
בוא נראה את התחביר של read.parquet():
spark_app.read.parquet(file_name.parquet/path)ראשית, התקן את מודול PySpark באמצעות הפקודה pip:
pip להתקין pyspark
קבלו את קובץ הפרקט
כדי לקרוא קובץ פרקט צריך את הנתונים שבהם נוצר קובץ הפרקט מאותם נתונים. בחלק זה נראה כיצד ליצור קובץ פרקט מה-PySpark DataFrame.
בואו ניצור PySpark DataFrame עם 5 רשומות ונכתוב את זה לקובץ הפרקט 'industry_parquet'.
ייבוא pysparkמ-pyspark.sql ייבוא SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()
# צור את מסגרת הנתונים המאחסנת את פרטי התעשייה
industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'חַקלָאוּת' ,אזור= 'ארה'ב' ,
דירוג= 'חַם' ,סה'כ_עובדים= 100 ),
Row(Type= 'חַקלָאוּת' ,אזור= 'הוֹדוּ' ,דירוג= 'חַם' ,סה'כ_עובדים= 200 ),
Row(Type= 'התפתחות' ,אזור= 'ארה'ב' ,דירוג= 'נעים' ,סה'כ_עובדים= 100 ),
Row(Type= 'חינוך' ,אזור= 'ארה'ב' ,דירוג= 'מגניב' ,סה'כ_עובדים= 400 ),
Row(Type= 'חינוך' ,אזור= 'ארה'ב' ,דירוג= 'נעים' ,סה'כ_עובדים= עשרים )
])
# מסגרת נתונים בפועל
industry_df.show()
# כתוב את industry_df לקובץ הפרקט
industry_df.coalesce( 1 ).write.parquet( 'פרקט_תעשייה' )
תְפוּקָה:
זהו ה-DataFrame שמחזיק 5 רשומות.
נוצר קובץ פרקט עבור ה-DataFrame הקודם. כאן, שם הקובץ שלנו עם סיומת הוא 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'. אנו משתמשים בקובץ זה בכל המדריך.
קרא את קובץ הפרקט ל-PySpark DataFrame
יש לנו את קובץ הפרקט. בואו נקרא את הקובץ הזה באמצעות הפונקציה read.parquet() ונטען אותו לתוך PySpark DataFrame.
ייבוא pysparkמ-pyspark.sql ייבוא SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()
# קרא את קובץ הפרקט לתוך אובייקט dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# הצג את ה-dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
תְפוּקָה:
אנו מציגים את ה-DataFrame באמצעות שיטת show() אשר נוצרה מקובץ הפרקט.
שאילתות SQL עם קובץ פרקט
לאחר טעינה ל-DataFrame, ניתן יהיה ליצור את טבלאות SQL ולהציג את הנתונים הקיימים ב-DataFrame. עלינו ליצור TEMPORARY VIEW ולהשתמש בפקודות SQL כדי להחזיר את הרשומות מה-DataFrame שנוצר מקובץ הפרקט.
דוגמה 1:
צור תצוגה זמנית בשם 'Sectors' והשתמש בפקודה SELECT כדי להציג את הרשומות ב-DataFrame. אתה יכול להתייחס לזה הדרכה שמסביר כיצד ליצור VIEW ב-Spark – SQL.
ייבוא pysparkמ-pyspark.sql ייבוא SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()
# קרא את קובץ הפרקט לתוך אובייקט dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# צור תצוגה מקובץ הפרקט הנ'ל בשם - 'מגזרים'
dataframe_from_parquet.createOrReplaceTempView( 'מגזרים' )
# שאילתה להצגת כל הרשומות מהמגזרים
linuxhint_spark_app.sql( 'בחר * ממגזרים' ).הופעה()
תְפוּקָה:
דוגמה 2:
באמצעות התצוגה הקודמת, כתוב את שאילתת SQL:
- כדי להציג את כל הרשומות מהמגזרים השייכים ל'הודו'.
- להציג את כל הרשומות מהמגזרים עם עובד שגדול מ-100.
linuxhint_spark_app.sql( 'בחר * ממגזרים שבהם Area='India'' ).הופעה()
# שאילתה להציג את כל הרשומות מהמגזרים עם עובדים מעל 100
linuxhint_spark_app.sql( 'בחר * מתוך סקטורים שבהם Total_employees>100' ).הופעה()
תְפוּקָה:
יש רק שיא אחד עם אזור שהוא 'הודו' ושני רשומות עם עובדים שהוא יותר מ-100.
קרא את קובץ הפרקט ל- PySpark SQL
ראשית, עלינו ליצור VIEW באמצעות הפקודה CREATE. באמצעות מילת המפתח 'נתיב' בתוך שאילתת SQL, נוכל לקרוא את קובץ הפרקט ל-Spark SQL. לאחר הנתיב, עלינו לציין את שם הקובץ/מיקום הקובץ.
תחביר:
spark_app.sql( 'צור תצוגה זמנית view_name באמצעות אפשרויות פרקט (נתיב ' file_name.parquet ')' )דוגמה 1:
צור תצוגה זמנית בשם 'Sector2' וקרא את קובץ הפרקט לתוכו. באמצעות הפונקציה sql(), כתוב את שאילתת הבחירה כדי להציג את כל הרשומות הקיימות בתצוגה.
ייבוא pysparkמ-pyspark.sql ייבוא SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()
# קרא את קובץ הפרקט לתוך Spark- SQL
linuxhint_spark_app.sql( 'צור תצוגה זמנית Sector2 באמצעות אפשרויות פרקט (נתיב ' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# שאילתה להצגת כל הרשומות מ-Sector2
linuxhint_spark_app.sql( 'בחר * מ-Sector2' ).הופעה()
תְפוּקָה:
דוגמה 2:
השתמש בתצוגה הקודמת וכתוב את השאילתה כדי להציג את כל הרשומות עם הדירוג 'חם' או 'מגניב'.
# שאילתה להציג את כל הרשומות מ-Sector2 עם דירוג - חם או מגניב.linuxhint_spark_app.sql( 'בחר * מ-Sector2 שבו Rating='Hot' OR Rating='Cool'' ).הופעה()
תְפוּקָה:
ישנם שלושה תקליטים עם הדירוג 'חם' או 'מגניב'.
סיכום
ב-PySpark, הפונקציה write.parquet() כותבת את ה-DataFrame לקובץ הפרקט. הפונקציה read.parquet() קוראת את קובץ הפרקט ל-PySpark DataFrame או לכל DataSource אחר. למדנו איך לקרוא את קובץ הפרקט לתוך PySpark DataFrame ולטבלת PySpark. כחלק ממדריך זה, דנו גם כיצד ליצור את הטבלאות מ-PySpark DataFrame ולסנן את הנתונים באמצעות סעיף WHERE.