PySpark Read.Parquet()

Pyspark Read Parquet



ב-PySpark, הפונקציה write.parquet() כותבת את ה-DataFrame לקובץ הפרקט וה-read.parquet() קורא את קובץ הפרקט ל-PySpark DataFrame או כל DataSource אחר. כדי לעבד את העמודות ב- Apache Spark במהירות וביעילות, עלינו לדחוס את הנתונים. דחיסת נתונים חוסכת את הזיכרון שלנו וכל העמודות מומרות לרמה שטוחה. זה אומר שקיים אחסון בגובה העמודה השטוח. הקובץ המאחסן את אלה ידוע כקובץ PARQUET.

במדריך זה נתמקד בעיקר בקריאה/טעינה של קובץ הפרקט לתוך PySpark DataFrame/SQL באמצעות הפונקציה read.parquet() הזמינה במחלקה pyspark.sql.DataFrameReader.

נושא התוכן:







קבלו את קובץ הפרקט



קרא את קובץ הפרקט ל-PySpark DataFrame



קרא את קובץ הפרקט ל- PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

פונקציה זו משמשת לקריאת קובץ הפרקט ולטעינתו לתוך PySpark DataFrame. זה לוקח את הנתיב/שם הקובץ של קובץ הפרקט. אנחנו יכולים פשוט להשתמש בפונקציה read.parquet() מכיוון שזו הפונקציה הגנרית.

תחביר:



בוא נראה את התחביר של read.parquet():

spark_app.read.parquet(file_name.parquet/path)

ראשית, התקן את מודול PySpark באמצעות הפקודה pip:

pip להתקין pyspark

קבלו את קובץ הפרקט

כדי לקרוא קובץ פרקט צריך את הנתונים שבהם נוצר קובץ הפרקט מאותם נתונים. בחלק זה נראה כיצד ליצור קובץ פרקט מה-PySpark DataFrame.

בואו ניצור PySpark DataFrame עם 5 רשומות ונכתוב את זה לקובץ הפרקט 'industry_parquet'.

ייבוא ​​pyspark

מ-pyspark.sql ייבוא ​​SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

# צור את מסגרת הנתונים המאחסנת את פרטי התעשייה

industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'חַקלָאוּת' ,אזור= 'ארה'ב' ,
דירוג= 'חַם' ,סה'כ_עובדים= 100 ),

Row(Type= 'חַקלָאוּת' ,אזור= 'הוֹדוּ' ,דירוג= 'חַם' ,סה'כ_עובדים= 200 ),

Row(Type= 'התפתחות' ,אזור= 'ארה'ב' ,דירוג= 'נעים' ,סה'כ_עובדים= 100 ),

Row(Type= 'חינוך' ,אזור= 'ארה'ב' ,דירוג= 'מגניב' ,סה'כ_עובדים= 400 ),

Row(Type= 'חינוך' ,אזור= 'ארה'ב' ,דירוג= 'נעים' ,סה'כ_עובדים= עשרים )

])

# מסגרת נתונים בפועל

industry_df.show()

# כתוב את industry_df לקובץ הפרקט

industry_df.coalesce( 1 ).write.parquet( 'פרקט_תעשייה' )

תְפוּקָה:

זהו ה-DataFrame שמחזיק 5 רשומות.

נוצר קובץ פרקט עבור ה-DataFrame הקודם. כאן, שם הקובץ שלנו עם סיומת הוא 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'. אנו משתמשים בקובץ זה בכל המדריך.

קרא את קובץ הפרקט ל-PySpark DataFrame

יש לנו את קובץ הפרקט. בואו נקרא את הקובץ הזה באמצעות הפונקציה read.parquet() ונטען אותו לתוך PySpark DataFrame.

ייבוא ​​pyspark

מ-pyspark.sql ייבוא ​​SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

# קרא את קובץ הפרקט לתוך אובייקט dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# הצג את ה-dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

תְפוּקָה:

אנו מציגים את ה-DataFrame באמצעות שיטת show() אשר נוצרה מקובץ הפרקט.

שאילתות SQL עם קובץ פרקט

לאחר טעינה ל-DataFrame, ניתן יהיה ליצור את טבלאות SQL ולהציג את הנתונים הקיימים ב-DataFrame. עלינו ליצור TEMPORARY VIEW ולהשתמש בפקודות SQL כדי להחזיר את הרשומות מה-DataFrame שנוצר מקובץ הפרקט.

דוגמה 1:

צור תצוגה זמנית בשם 'Sectors' והשתמש בפקודה SELECT כדי להציג את הרשומות ב-DataFrame. אתה יכול להתייחס לזה הדרכה שמסביר כיצד ליצור VIEW ב-Spark – SQL.

ייבוא ​​pyspark

מ-pyspark.sql ייבוא ​​SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

# קרא את קובץ הפרקט לתוך אובייקט dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# צור תצוגה מקובץ הפרקט הנ'ל בשם - 'מגזרים'

dataframe_from_parquet.createOrReplaceTempView( 'מגזרים' )

# שאילתה להצגת כל הרשומות מהמגזרים

linuxhint_spark_app.sql( 'בחר * ממגזרים' ).הופעה()

תְפוּקָה:

דוגמה 2:

באמצעות התצוגה הקודמת, כתוב את שאילתת SQL:

  1. כדי להציג את כל הרשומות מהמגזרים השייכים ל'הודו'.
  2. להציג את כל הרשומות מהמגזרים עם עובד שגדול מ-100.
# שאילתה להצגת כל הרשומות מהמגזרים השייכים ל'הודו'.

linuxhint_spark_app.sql( 'בחר * ממגזרים שבהם Area='India'' ).הופעה()

# שאילתה להציג את כל הרשומות מהמגזרים עם עובדים מעל 100

linuxhint_spark_app.sql( 'בחר * מתוך סקטורים שבהם Total_employees>100' ).הופעה()

תְפוּקָה:

יש רק שיא אחד עם אזור שהוא 'הודו' ושני רשומות עם עובדים שהוא יותר מ-100.

קרא את קובץ הפרקט ל- PySpark SQL

ראשית, עלינו ליצור VIEW באמצעות הפקודה CREATE. באמצעות מילת המפתח 'נתיב' בתוך שאילתת SQL, נוכל לקרוא את קובץ הפרקט ל-Spark SQL. לאחר הנתיב, עלינו לציין את שם הקובץ/מיקום הקובץ.

תחביר:

spark_app.sql( 'צור תצוגה זמנית view_name באמצעות אפשרויות פרקט (נתיב ' file_name.parquet ')' )

דוגמה 1:

צור תצוגה זמנית בשם 'Sector2' וקרא את קובץ הפרקט לתוכו. באמצעות הפונקציה sql(), כתוב את שאילתת הבחירה כדי להציג את כל הרשומות הקיימות בתצוגה.

ייבוא ​​pyspark

מ-pyspark.sql ייבוא ​​SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

# קרא את קובץ הפרקט לתוך Spark- SQL

linuxhint_spark_app.sql( 'צור תצוגה זמנית Sector2 באמצעות אפשרויות פרקט (נתיב ' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# שאילתה להצגת כל הרשומות מ-Sector2

linuxhint_spark_app.sql( 'בחר * מ-Sector2' ).הופעה()

תְפוּקָה:

דוגמה 2:

השתמש בתצוגה הקודמת וכתוב את השאילתה כדי להציג את כל הרשומות עם הדירוג 'חם' או 'מגניב'.

# שאילתה להציג את כל הרשומות מ-Sector2 עם דירוג - חם או מגניב.

linuxhint_spark_app.sql( 'בחר * מ-Sector2 שבו Rating='Hot' OR Rating='Cool'' ).הופעה()

תְפוּקָה:

ישנם שלושה תקליטים עם הדירוג 'חם' או 'מגניב'.

סיכום

ב-PySpark, הפונקציה write.parquet() כותבת את ה-DataFrame לקובץ הפרקט. הפונקציה read.parquet() קוראת את קובץ הפרקט ל-PySpark DataFrame או לכל DataSource אחר. למדנו איך לקרוא את קובץ הפרקט לתוך PySpark DataFrame ולטבלת PySpark. כחלק ממדריך זה, דנו גם כיצד ליצור את הטבלאות מ-PySpark DataFrame ולסנן את הנתונים באמצעות סעיף WHERE.