כיצד לקרוא ולכתוב נתוני טבלה ב- PySpark

Kyzd Lqrw Wlktwb Ntwny Tblh B Pyspark



עיבוד הנתונים ב- PySpark מהיר יותר אם הנתונים נטענים בצורה של טבלה. עם זה, באמצעות ה-SQl Expressions, העיבוד יהיה מהיר. לכן, המרת PySpark DataFrame/RDD לטבלה לפני שליחתו לעיבוד היא הגישה הטובה יותר. היום נראה כיצד לקרוא את נתוני הטבלה לתוך PySpark DataFrame, לכתוב את PySpark DataFrame לטבלה ולהכניס את DataFrame חדש לטבלה הקיימת באמצעות הפונקציות המובנות. בוא נלך!

Pyspark.sql.DataFrameWriter.saveAsTable()

ראשית, נראה כיצד לכתוב את PySpark DataFrame הקיים לטבלה באמצעות הפונקציה write.saveAsTable() . צריך את שם הטבלה ופרמטרים אופציונליים אחרים כמו modes, partionBy וכו' כדי לכתוב את ה-DataFrame לטבלה. הוא מאוחסן כקובץ פרקט.

תחביר:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,...)
  1. Table_name הוא השם של הטבלה שנוצרה מה-dataframe_obj.
  2. אנו יכולים לצרף/להחליף את הנתונים של הטבלה באמצעות פרמטר ה-mode.
  3. ה-partitionBy לוקח את העמודות הבודדות/מרובות כדי ליצור מחיצות המבוססות על ערכים בעמודות שסופקו.

דוגמה 1:

צור PySpark DataFrame עם 5 שורות ו-4 עמודות. כתוב Dataframe זה לטבלה בשם 'Agri_Table1'.



ייבוא ​​pyspark

מ-pyspark.sql ייבוא ​​SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

# נתוני חקלאות עם 5 שורות ו-5 עמודות

אגרי =[{ 'סוג_קרקע' : 'שָׁחוֹר' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 2500 , 'מצב_קרקע' : 'יָבֵשׁ' ,
'מדינה' : 'ארה'ב' },

{ 'סוג_קרקע' : 'שָׁחוֹר' , 'זמינות_השקיה' : 'כן' , 'אקרים' : 3500 , 'מצב_קרקע' : 'רָטוֹב' ,
'מדינה' : 'הוֹדוּ' },

{ 'סוג_קרקע' : 'אָדוֹם' , 'זמינות_השקיה' : 'כן' , 'אקרים' : 210 , 'מצב_קרקע' : 'יָבֵשׁ' ,
'מדינה' : 'בְּרִיטַנִיָה' },

{ 'סוג_קרקע' : 'אַחֵר' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 1000 , 'מצב_קרקע' : 'רָטוֹב' ,
'מדינה' : 'ארה'ב' },

{ 'סוג_קרקע' : 'חוֹל' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 500 , 'מצב_קרקע' : 'יָבֵשׁ' ,
'מדינה' : 'הוֹדוּ' }]



# צור את מסגרת הנתונים מהנתונים שלמעלה

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# כתוב את ה-DataFrame לעיל לטבלה.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

תְפוּקָה:







אנו יכולים לראות שקובץ פרקט אחד נוצר עם ה-PySpark Data הקודם.



דוגמה 2:

שקול את ה-DataFrame הקודם וכתוב את 'Agri_Table2' לטבלה על ידי חלוקת הרשומות על סמך הערכים בעמודה 'מדינה'.

# כתוב את ה-DataFrame לעיל לטבלה עם הפרמטר partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'מדינה' ])

תְפוּקָה:

ישנם שלושה ערכים ייחודיים בעמודה 'מדינה' - 'הודו', 'בריטניה' ו'ארה'ב. אז נוצרות שלוש מחיצות. כל מחיצה מכילה את קבצי הפרקט.

Pyspark.sql.DataFrameReader.table()

בואו נטען את הטבלה לתוך PySpark DataFrame באמצעות הפונקציה spark.read.table(). זה דורש רק פרמטר אחד שהוא הנתיב/שם הטבלה. זה טוען ישירות את הטבלה לתוך PySpark DataFrame וניתן להחיל את כל פונקציות SQL שמופעלות על PySpark DataFrame גם על DataFrame טעון זה.

תחביר:

spark_app.read.table(path/'Table_name')

בתרחיש זה, אנו משתמשים בטבלה הקודמת שנוצרה מ-PySpark DataFrame. ודא שאתה צריך ליישם את קטעי הקוד של התרחיש הקודם בסביבה שלך.

דוגמא:

טען את הטבלה 'Agri_Table1' ל-DataFrame בשם 'loaded_data'.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

תְפוּקָה:

אנו יכולים לראות שהטבלה נטענת לתוך PySpark DataFrame.

ביצוע שאילתות SQL

כעת, אנו מבצעים כמה שאילתות SQL על ה-DataFrame הנטען באמצעות הפונקציה spark.sql().

# השתמש בפקודה SELECT כדי להציג את כל העמודות מהטבלה לעיל.

linuxhint_spark_app.sql( 'בחר * מ-Agri_Table1' ).הופעה()

# סעיף היכן

linuxhint_spark_app.sql( 'בחר * מ-Agri_Table1 WHERE Soil_status='Dry' ' ).הופעה()

linuxhint_spark_app.sql( 'בחר * מ-Agri_Table1 WHERE Acres > 2000 ' ).הופעה()

תְפוּקָה:

  1. השאילתה הראשונה מציגה את כל העמודות והרשומות מה-DataFrame.
  2. השאילתה השנייה מציגה את הרשומות על סמך העמודה 'מצב_קרקע'. יש רק שלושה תקליטים עם האלמנט 'יבש'.
  3. השאילתה האחרונה מחזירה שתי רשומות עם 'אקרים' שגדולות מ-2000.

Pyspark.sql.DataFrameWriter.insertInto()

באמצעות הפונקציה insertInto() אנו יכולים לצרף את ה-DataFrame לטבלה הקיימת. אנו יכולים להשתמש בפונקציה זו יחד עם selectExpr() כדי להגדיר את שמות העמודות ולאחר מכן להכניס אותה לטבלה. פונקציה זו לוקחת גם את tableName כפרמטר.

תחביר:

DataFrame_obj.write.insertInto('Table_name')

בתרחיש זה, אנו משתמשים בטבלה הקודמת שנוצרה מ-PySpark DataFrame. ודא שאתה צריך ליישם את קטעי הקוד של התרחיש הקודם בסביבה שלך.

דוגמא:

צור DataFrame חדש עם שתי רשומות והכנס אותן לטבלה 'Agri_Table1'.

ייבוא ​​pyspark

מ-pyspark.sql ייבוא ​​SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

# נתוני חקלאות עם 2 שורות

אגרי =[{ 'סוג_קרקע' : 'חוֹל' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 2500 , 'מצב_קרקע' : 'יָבֵשׁ' ,
'מדינה' : 'ארה'ב' },

{ 'סוג_קרקע' : 'חוֹל' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 1200 , 'מצב_קרקע' : 'רָטוֹב' ,
'מדינה' : 'יפן' }]

# צור את מסגרת הנתונים מהנתונים שלמעלה

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'אקרים' , 'מדינה' , 'זמינות_השקיה' , 'סוג_קרקע' ,
'מצב_קרקע' ).write.insertInto( 'Agri_Table1' )

# הצג את Agri_Table1 הסופי

linuxhint_spark_app.sql( 'בחר * מ-Agri_Table1' ).הופעה()

תְפוּקָה:

כעת, המספר הכולל של שורות הקיימות ב-DataFrame הוא 7.

סיכום

כעת אתה מבין כיצד לכתוב את PySpark DataFrame לטבלה באמצעות הפונקציה write.saveAsTable() . זה לוקח את שם הטבלה ופרמטרים אופציונליים אחרים. לאחר מכן, טענו את הטבלה הזו לתוך PySpark DataFrame באמצעות הפונקציה spark.read.table() . זה דורש רק פרמטר אחד שהוא הנתיב/שם הטבלה. אם ברצונך לצרף את ה-DataFrame החדש לטבלה הקיימת, השתמש בפונקציה insertInto() .