Pyspark.sql.DataFrameWriter.saveAsTable()
ראשית, נראה כיצד לכתוב את PySpark DataFrame הקיים לטבלה באמצעות הפונקציה write.saveAsTable() . צריך את שם הטבלה ופרמטרים אופציונליים אחרים כמו modes, partionBy וכו' כדי לכתוב את ה-DataFrame לטבלה. הוא מאוחסן כקובץ פרקט.
תחביר:
dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,...)
- Table_name הוא השם של הטבלה שנוצרה מה-dataframe_obj.
- אנו יכולים לצרף/להחליף את הנתונים של הטבלה באמצעות פרמטר ה-mode.
- ה-partitionBy לוקח את העמודות הבודדות/מרובות כדי ליצור מחיצות המבוססות על ערכים בעמודות שסופקו.
דוגמה 1:
צור PySpark DataFrame עם 5 שורות ו-4 עמודות. כתוב Dataframe זה לטבלה בשם 'Agri_Table1'.
ייבוא pyspark
מ-pyspark.sql ייבוא SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()
# נתוני חקלאות עם 5 שורות ו-5 עמודות
אגרי =[{ 'סוג_קרקע' : 'שָׁחוֹר' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 2500 , 'מצב_קרקע' : 'יָבֵשׁ' ,
'מדינה' : 'ארה'ב' },
{ 'סוג_קרקע' : 'שָׁחוֹר' , 'זמינות_השקיה' : 'כן' , 'אקרים' : 3500 , 'מצב_קרקע' : 'רָטוֹב' ,
'מדינה' : 'הוֹדוּ' },
{ 'סוג_קרקע' : 'אָדוֹם' , 'זמינות_השקיה' : 'כן' , 'אקרים' : 210 , 'מצב_קרקע' : 'יָבֵשׁ' ,
'מדינה' : 'בְּרִיטַנִיָה' },
{ 'סוג_קרקע' : 'אַחֵר' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 1000 , 'מצב_קרקע' : 'רָטוֹב' ,
'מדינה' : 'ארה'ב' },
{ 'סוג_קרקע' : 'חוֹל' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 500 , 'מצב_קרקע' : 'יָבֵשׁ' ,
'מדינה' : 'הוֹדוּ' }]
# צור את מסגרת הנתונים מהנתונים שלמעלה
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# כתוב את ה-DataFrame לעיל לטבלה.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
תְפוּקָה:
אנו יכולים לראות שקובץ פרקט אחד נוצר עם ה-PySpark Data הקודם.
דוגמה 2:
שקול את ה-DataFrame הקודם וכתוב את 'Agri_Table2' לטבלה על ידי חלוקת הרשומות על סמך הערכים בעמודה 'מדינה'.
# כתוב את ה-DataFrame לעיל לטבלה עם הפרמטר partitionByagri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'מדינה' ])
תְפוּקָה:
ישנם שלושה ערכים ייחודיים בעמודה 'מדינה' - 'הודו', 'בריטניה' ו'ארה'ב. אז נוצרות שלוש מחיצות. כל מחיצה מכילה את קבצי הפרקט.
Pyspark.sql.DataFrameReader.table()
בואו נטען את הטבלה לתוך PySpark DataFrame באמצעות הפונקציה spark.read.table(). זה דורש רק פרמטר אחד שהוא הנתיב/שם הטבלה. זה טוען ישירות את הטבלה לתוך PySpark DataFrame וניתן להחיל את כל פונקציות SQL שמופעלות על PySpark DataFrame גם על DataFrame טעון זה.
תחביר:
spark_app.read.table(path/'Table_name')בתרחיש זה, אנו משתמשים בטבלה הקודמת שנוצרה מ-PySpark DataFrame. ודא שאתה צריך ליישם את קטעי הקוד של התרחיש הקודם בסביבה שלך.
דוגמא:
טען את הטבלה 'Agri_Table1' ל-DataFrame בשם 'loaded_data'.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
תְפוּקָה:
אנו יכולים לראות שהטבלה נטענת לתוך PySpark DataFrame.
ביצוע שאילתות SQL
כעת, אנו מבצעים כמה שאילתות SQL על ה-DataFrame הנטען באמצעות הפונקציה spark.sql().
# השתמש בפקודה SELECT כדי להציג את כל העמודות מהטבלה לעיל.linuxhint_spark_app.sql( 'בחר * מ-Agri_Table1' ).הופעה()
# סעיף היכן
linuxhint_spark_app.sql( 'בחר * מ-Agri_Table1 WHERE Soil_status='Dry' ' ).הופעה()
linuxhint_spark_app.sql( 'בחר * מ-Agri_Table1 WHERE Acres > 2000 ' ).הופעה()
תְפוּקָה:
- השאילתה הראשונה מציגה את כל העמודות והרשומות מה-DataFrame.
- השאילתה השנייה מציגה את הרשומות על סמך העמודה 'מצב_קרקע'. יש רק שלושה תקליטים עם האלמנט 'יבש'.
- השאילתה האחרונה מחזירה שתי רשומות עם 'אקרים' שגדולות מ-2000.
Pyspark.sql.DataFrameWriter.insertInto()
באמצעות הפונקציה insertInto() אנו יכולים לצרף את ה-DataFrame לטבלה הקיימת. אנו יכולים להשתמש בפונקציה זו יחד עם selectExpr() כדי להגדיר את שמות העמודות ולאחר מכן להכניס אותה לטבלה. פונקציה זו לוקחת גם את tableName כפרמטר.
תחביר:
DataFrame_obj.write.insertInto('Table_name')בתרחיש זה, אנו משתמשים בטבלה הקודמת שנוצרה מ-PySpark DataFrame. ודא שאתה צריך ליישם את קטעי הקוד של התרחיש הקודם בסביבה שלך.
דוגמא:
צור DataFrame חדש עם שתי רשומות והכנס אותן לטבלה 'Agri_Table1'.
ייבוא pysparkמ-pyspark.sql ייבוא SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()
# נתוני חקלאות עם 2 שורות
אגרי =[{ 'סוג_קרקע' : 'חוֹל' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 2500 , 'מצב_קרקע' : 'יָבֵשׁ' ,
'מדינה' : 'ארה'ב' },
{ 'סוג_קרקע' : 'חוֹל' , 'זמינות_השקיה' : 'לא' , 'אקרים' : 1200 , 'מצב_קרקע' : 'רָטוֹב' ,
'מדינה' : 'יפן' }]
# צור את מסגרת הנתונים מהנתונים שלמעלה
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'אקרים' , 'מדינה' , 'זמינות_השקיה' , 'סוג_קרקע' ,
'מצב_קרקע' ).write.insertInto( 'Agri_Table1' )
# הצג את Agri_Table1 הסופי
linuxhint_spark_app.sql( 'בחר * מ-Agri_Table1' ).הופעה()
תְפוּקָה:
כעת, המספר הכולל של שורות הקיימות ב-DataFrame הוא 7.
סיכום
כעת אתה מבין כיצד לכתוב את PySpark DataFrame לטבלה באמצעות הפונקציה write.saveAsTable() . זה לוקח את שם הטבלה ופרמטרים אופציונליים אחרים. לאחר מכן, טענו את הטבלה הזו לתוך PySpark DataFrame באמצעות הפונקציה spark.read.table() . זה דורש רק פרמטר אחד שהוא הנתיב/שם הטבלה. אם ברצונך לצרף את ה-DataFrame החדש לטבלה הקיימת, השתמש בפונקציה insertInto() .