המרת PySpark DataFrame ל-JSON

Hmrt Pyspark Dataframe L Json



העברת נתונים מובנית באמצעות JSON אפשרית וגם גוזלת זיכרון נמוך. בהשוואה ל-PySpark RDD או PySpark DataFrame, JSON צורך זיכרון נמוך וסידרה מה שאפשר עם JSON. אנו מסוגלים להמיר את PySpark DataFrame ל-JSON באמצעות שיטת pyspark.sql.DataFrameWriter.json() . מלבדו, ישנן שתי דרכים נוספות להמיר את ה-DataFrame ל-JSON.

נושא התוכן:

בואו נשקול PySpark DataFrame פשוט בכל הדוגמאות ונמיר אותו ל-JSON באמצעות הפונקציות שהוזכרו.







מודול נדרש:

התקן את ספריית PySpark בסביבה שלך אם היא עדיין לא מותקנת. אתה יכול לעיין בפקודה הבאה כדי להתקין אותה:



pip להתקין pyspark

PySpark DataFrame ל-JSON באמצעות To_json() עם ToPandas()

השיטה to_json() זמינה במודול Pandas הממיר את Pandas DataFrame ל-JSON. נוכל להשתמש בשיטה זו אם נמיר את PySpark DataFrame שלנו ל-Pandas DataFrame. על מנת להמיר את PySpark DataFrame ל-Pandas DataFrame, נעשה שימוש בשיטת toPandas() . בוא נראה את התחביר של to_json() יחד עם הפרמטרים שלו.



תחביר:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Orient משמש להצגת ה-JSON שהומר כפורמט הרצוי. זה דורש 'רשומות', 'טבלה', 'ערכים', 'עמודות', 'אינדקס', 'פיצול'.
  2. אינדקס משמש לכלול/הסרה של האינדקס ממחרוזת ה-JSON שהומרה. אם הוא מוגדר ל'True', המדדים יוצגו. אחרת, המדדים לא יוצגו אם הכיוון הוא 'מפוצל' או 'טבלה'.

דוגמה 1: כיוון כ'תקליטים'

צור 'skills_df' PySpark DataFrame עם 3 שורות ו-4 עמודות. המר את ה-DataFrame הזה ל-JSON על-ידי ציון הפרמטר orient בתור 'רשומות'.

ייבוא ​​pyspark

לייבא פנדות

מ-pyspark.sql ייבוא ​​SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

# נתוני כישורים עם 3 שורות ו-4 עמודות

כישורים =[{ 'תְעוּדַת זֶהוּת' : 123 , 'אדם' : 'דבש' , 'מְיוּמָנוּת' : 'צִיוּר' , 'פרס' : 25,000 },

{ 'תְעוּדַת זֶהוּת' : 112 , 'אדם' : 'מוני' , 'מְיוּמָנוּת' : 'לִרְקוֹד' , 'פרס' : 2000 },

{ 'תְעוּדַת זֶהוּת' : 153 , 'אדם' : 'טולסי' , 'מְיוּמָנוּת' : 'קריאה' , 'פרס' : 1200 }

]

# צור את מסגרת הנתונים של כישורים מהנתונים שלעיל

skills_df = linuxhint_spark_app.createDataFrame(skills)

# נתוני כישורים בפועל

skills_df.show()

# המר ל-JSON באמצעות to_json() עם orient בתור 'רשומות'

json_skills_data = skills_df.toPandas().to_json(orient= 'רשומות' )

print(json_skills_data)

תְפוּקָה:



+---+------+-----+--------+

| id|person|prize| מיומנות|

+---+------+-----+--------+

| 123 | מותק| 25,000 |ציור|

| 112 | מוני| 2000 | לרקוד|

| 153 |טולסי| 1200 | קריאה|

+---+------+-----+--------+

[{ 'תְעוּדַת זֶהוּת' : 123 , 'אדם' : 'דבש' , 'פרס' : 25,000 , 'מְיוּמָנוּת' : 'צִיוּר' },{ 'תְעוּדַת זֶהוּת' : 112 , 'אדם' : 'מוני' , 'פרס' : 2000 , 'מְיוּמָנוּת' : 'לִרְקוֹד' },{ 'תְעוּדַת זֶהוּת' : 153 , 'אדם' : 'טולסי' , 'פרס' : 1200 , 'מְיוּמָנוּת' : 'קריאה' }]

אנו יכולים לראות שה-PySpark DataFrame מומר למערך JSON עם מילון ערכים. כאן, המפתחות מייצגים את שם העמודה והערך מייצג את ערך השורה/תא ב-PySpark DataFrame.

דוגמה 2: כיוון כ'פיצול'

פורמט ה-JSON שמוחזר על ידי ה-'פיצול' כולל את שמות העמודות שיש להם רשימה של עמודות, רשימת אינדקס ורשימת נתונים. להלן הפורמט של האוריינט ה'מפוצל'.

# המר ל-JSON באמצעות to_json() עם orient בתור 'פיצול'

json_skills_data = skills_df.toPandas().to_json(orient= 'לְפַצֵל' )

print(json_skills_data)

תְפוּקָה:

{ 'עמודות' :[ 'תְעוּדַת זֶהוּת' , 'אדם' , 'פרס' , 'מְיוּמָנוּת' ], 'אינדקס' :[ 0 , 1 , 2 ], 'נתונים' :[[ 123 , 'דבש' , 25,000 , 'צִיוּר' ],[ 112 , 'מוני' , 2000 , 'לִרְקוֹד' ],[ 153 , 'טולסי' , 1200 , 'קריאה' ]]}

דוגמה 3: כיוון כ'אינדקס'

כאן, כל שורה מ-PySpark DataFrame מושבתת בצורה של מילון עם המפתח כשם העמודה. עבור כל מילון, מיקום האינדקס מצוין כמפתח.

# המר ל-JSON באמצעות to_json() עם orient בתור 'index'

json_skills_data = skills_df.toPandas().to_json(orient= 'אינדקס' )

print(json_skills_data)

תְפוּקָה:

{ '0' :{ 'תְעוּדַת זֶהוּת' : 123 , 'אדם' : 'דבש' , 'פרס' : 25,000 , 'מְיוּמָנוּת' : 'צִיוּר' }, '1' :{ 'תְעוּדַת זֶהוּת' : 112 , 'אדם' : 'מוני' , 'פרס' : 2000 , 'מְיוּמָנוּת' : 'לִרְקוֹד' }, '2' :{ 'תְעוּדַת זֶהוּת' : 153 , 'אדם' : 'טולסי' , 'פרס' : 1200 , 'מְיוּמָנוּת' : 'קריאה' }}

דוגמה 4: כיוון כ'עמודות'

עמודות הן המפתח לכל רשומה. כל עמודה מכילה מילון שלוקח את ערכי העמודות עם מספרי אינדקס.

# המר ל-JSON באמצעות to_json() עם orient בתור 'עמודות'

json_skills_data = skills_df.toPandas().to_json(orient= 'עמודות' )

print(json_skills_data)

תְפוּקָה:

{ 'תְעוּדַת זֶהוּת' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'אדם' :{ '0' : 'דבש' , '1' : 'מוני' , '2' : 'טולסי' }, 'פרס' :{ '0' : 25,000 , '1' : 2000 , '2' : 1200 }, 'מְיוּמָנוּת' :{ '0' : 'צִיוּר' , '1' : 'לִרְקוֹד' , '2' : 'קריאה' }}

דוגמה 5: כיוון כ'ערכים'

אם אתה צריך רק את הערכים ב-JSON, אתה יכול ללכת על הכיוון 'ערכים'. הוא מציג כל שורה ברשימה. לבסוף, כל הרשימות מאוחסנות ברשימה. JSON זה הוא מסוג רשימה מקוננת.

# המר ל-JSON באמצעות to_json() עם orient בתור 'values'

json_skills_data = skills_df.toPandas().to_json(orient= 'ערכים' )

print(json_skills_data)

תְפוּקָה:

[[ 123 , 'דבש' , 25,000 , 'צִיוּר' ],[ 112 , 'מוני' , 2000 , 'לִרְקוֹד' ],[ 153 , 'טולסי' , 1200 , 'קריאה' ]]

דוגמה 6: כיוון כ'שולחן'

כיוון ה'טבלה' מחזיר את ה-JSON הכולל את הסכימה עם שמות השדות יחד עם סוגי הנתונים של העמודות, האינדקס כמפתח ראשי וגרסת Pandas. שמות העמודות עם הערכים מוצגים כ'נתונים'.

# המר ל-JSON באמצעות to_json() עם orient בתור 'table'

json_skills_data = skills_df.toPandas().to_json(orient= 'שולחן' )

print(json_skills_data)

תְפוּקָה:

{ 'סכֵימָה' :{ 'שדות' :[{ 'שֵׁם' : 'אינדקס' , 'סוּג' : 'מספר שלם' },{ 'שֵׁם' : 'תְעוּדַת זֶהוּת' , 'סוּג' : 'מספר שלם' },{ 'שֵׁם' : 'אדם' , 'סוּג' : 'חוּט' },{ 'שֵׁם' : 'פרס' , 'סוּג' : 'מספר שלם' },{ 'שֵׁם' : 'מְיוּמָנוּת' , 'סוּג' : 'חוּט' }], 'מפתח ראשי' :[ 'אינדקס' ], 'גרסה_פנדה' : '1.4.0' }, 'נתונים' :[{ 'אינדקס' : 0 , 'תְעוּדַת זֶהוּת' : 123 , 'אדם' : 'דבש' , 'פרס' : 25,000 , 'מְיוּמָנוּת' : 'צִיוּר' },{ 'אינדקס' : 1 , 'תְעוּדַת זֶהוּת' : 112 , 'אדם' : 'מוני' , 'פרס' : 2000 , 'מְיוּמָנוּת' : 'לִרְקוֹד' },{ 'אינדקס' : 2 , 'תְעוּדַת זֶהוּת' : 153 , 'אדם' : 'טולסי' , 'פרס' : 1200 , 'מְיוּמָנוּת' : 'קריאה' }]}

דוגמה 7: עם פרמטר אינדקס

ראשית, אנו מעבירים את פרמטר האינדקס על ידי הגדרתו ל'True'. תראה עבור כל ערך עמודה שמיקום האינדקס מוחזר כמפתח במילון.

בפלט השני, רק שמות העמודות ('עמודות') והרשומות ('נתונים') מוחזרים ללא מיקומי האינדקס מכיוון שהאינדקס מוגדר כ'שקר'.

# המר ל-JSON באמצעות to_json() עם index=True

json_skills_data = skills_df.toPandas().to_json(index=True)

print(json_skills_data, ' \n ' )

# המר ל-JSON באמצעות to_json() עם index=False

json_skills_data= skills_df.toPandas().to_json(index=False,orient= 'לְפַצֵל' )

print(json_skills_data)

תְפוּקָה:

{ 'תְעוּדַת זֶהוּת' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'אדם' :{ '0' : 'דבש' , '1' : 'מוני' , '2' : 'טולסי' }, 'פרס' :{ '0' : 25,000 , '1' : 2000 , '2' : 1200 }, 'מְיוּמָנוּת' :{ '0' : 'צִיוּר' , '1' : 'לִרְקוֹד' , '2' : 'קריאה' }}

{ 'עמודות' :[ 'תְעוּדַת זֶהוּת' , 'אדם' , 'פרס' , 'מְיוּמָנוּת' ], 'נתונים' :[[ 123 , 'דבש' , 25,000 , 'צִיוּר' ],[ 112 , 'מוני' , 2000 , 'לִרְקוֹד' ],[ 153 , 'טולסי' , 1200 , 'קריאה' ]]

PySpark DataFrame ל-JSON באמצעות ToJSON()

השיטה toJSON() משמשת להמרת ה-PySpark DataFrame לאובייקט JSON. בעיקרון, הוא מחזיר מחרוזת JSON המוקפת ברשימה. ה [‘{עמודה:ערך,…}’,…. ] הוא הפורמט שמוחזר על ידי פונקציה זו. כאן, כל שורה מה-PySpark DataFrame מוחזרת כמילון עם שם העמודה כמפתח.

תחביר:

dataframe_object.toJSON()

זה יכול להיות אפשרי להעביר את הפרמטרים כמו האינדקס, תוויות העמודות וסוג הנתונים.

דוגמא:

צור 'skills_df' PySpark DataFrame עם 5 שורות ו-4 עמודות. המרת DataFrame זה ל-JSON באמצעות שיטת toJSON()‎.

ייבוא ​​pyspark

מ-pyspark.sql ייבוא ​​SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

# נתוני כישורים עם 5 שורות ו-4 עמודות

כישורים =[{ 'תְעוּדַת זֶהוּת' : 123 , 'אדם' : 'דבש' , 'מְיוּמָנוּת' : 'צִיוּר' , 'פרס' : 25,000 },

{ 'תְעוּדַת זֶהוּת' : 112 , 'אדם' : 'מוני' , 'מְיוּמָנוּת' : 'מוזיקה/ריקוד' , 'פרס' : 2000 },

{ 'תְעוּדַת זֶהוּת' : 153 , 'אדם' : 'טולסי' , 'מְיוּמָנוּת' : 'קריאה' , 'פרס' : 1200 },

{ 'תְעוּדַת זֶהוּת' : 173 , 'אדם' : 'רץ' , 'מְיוּמָנוּת' : 'מוּסִיקָה' , 'פרס' : 2000 },

{ 'תְעוּדַת זֶהוּת' : 43 , 'אדם' : 'קמאלה' , 'מְיוּמָנוּת' : 'קריאה' , 'פרס' : 10000 }

]

# צור את מסגרת הנתונים של כישורים מהנתונים שלעיל

skills_df = linuxhint_spark_app.createDataFrame(skills)

# נתוני כישורים בפועל

skills_df.show()

# המר למערך JSON

json_skills_data = skills_df.toJSON().collect()

print(json_skills_data)

תְפוּקָה:

+---+------+-----+---------+

| id|person|prize| מיומנות|

+---+------+-----+---------+

| 123 | מותק| 25,000 | ציור|

| 112 | מוני| 2000 |מוזיקה/ריקוד|

| 153 |טולסי| 1200 | קריאה|

| 173 | רץ| 2000 | מוזיקה|

| 43 |קמאלה| 10000 | קריאה|

+---+------+-----+---------+

[ '{'id':123,'person':'מותק','prize':25000,'skill':'painting'}' , '{'id':112,'person':'Mouni','prize':2000,'skill':'music/dance'}' , '{'id':153,'person':'Tulasi','prize':1200,'skill':'reading'}' , '{'id':173,'person':'Ran','prize':2000,'skill':'music'}' , '{'id':43,'person':'קמאלה','prize':10000,'skill':'reading'}' ]

ישנן 5 שורות ב-PySpark DataFrame. כל 5 השורות הללו מוחזרות כמילון של מחרוזות המופרדות בפסיק.

PySpark DataFrame ל-JSON באמצעות Write.json()

השיטה write.json() זמינה ב-PySpark אשר כותבת/שומרת את PySpark DataFrame לקובץ JSON. זה לוקח את שם הקובץ/נתיב כפרמטר. בעיקרון, זה מחזיר את ה-JSON במספר קבצים (קבצים מחולקים). כדי למזג את כולם בקובץ אחד, נוכל להשתמש בשיטת coalesce() .

תחביר:

dataframe_object.coalesce( 1 ).write.json('file_name')
  1. מצב הוספה - dataframe_object.write.mode('append').json('file_name')
  2. מצב כתיבה - dataframe_object.write.mode(‘overwrite’).json(‘file_name’)

זה יכול להיות אפשרי להוסיף/לחליף את ה-JSON הקיים. באמצעות ה-writer.mode(), נוכל להוסיף את הנתונים על ידי העברת 'append' או להחליף את נתוני ה-JSON הקיימים על ידי העברת 'overwrite' לפונקציה זו.

דוגמה 1:

צור 'skills_df' PySpark DataFrame עם 3 שורות ו-4 עמודות. כתוב את ה-DataFrame הזה ל-JSON.

ייבוא ​​pyspark

לייבא פנדות

מ-pyspark.sql ייבוא ​​SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

# נתוני מיומנויות עם 3 שורות ו-4 עמודות

כישורים =[{ 'תְעוּדַת זֶהוּת' : 123 , 'אדם' : 'דבש' , 'מְיוּמָנוּת' : 'צִיוּר' , 'פרס' : 25,000 },

{ 'תְעוּדַת זֶהוּת' : 112 , 'אדם' : 'מוני' , 'מְיוּמָנוּת' : 'לִרְקוֹד' , 'פרס' : 2000 },

{ 'תְעוּדַת זֶהוּת' : 153 , 'אדם' : 'טולסי' , 'מְיוּמָנוּת' : 'קריאה' , 'פרס' : 1200 }

]

# צור את מסגרת הנתונים של כישורים מהנתונים שלעיל

skills_df = linuxhint_spark_app.createDataFrame(skills)

# write.json()

skills_df.coalesce( 1 ).write.json( 'נתוני_מיומנויות' )

קובץ JSON:

אנו יכולים לראות שהתיקיה skills_data כוללת את נתוני ה-JSON המחולקים.

בואו נפתח את קובץ JSON. אנו יכולים לראות שכל השורות מה-PySpark DataFrame מומרות ל-JSON.

ישנן 5 שורות ב-PySpark DataFrame. כל 5 השורות הללו מוחזרות כמילון של מחרוזות המופרדות בפסיק.

דוגמה 2:

צור 'skills2_df' PySpark DataFrame עם שורה אחת. הוסף שורה אחת לקובץ ה-JSON הקודם על-ידי ציון המצב בתור 'הוסף'.

ייבוא ​​pyspark

לייבא פנדות

מ-pyspark.sql ייבוא ​​SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

מיומנויות2 =[{ 'תְעוּדַת זֶהוּת' : 78 , 'אדם' : 'מרי' , 'מְיוּמָנוּת' : 'רכיבה' , 'פרס' : 8960 }

]

# צור את מסגרת הנתונים של כישורים מהנתונים שלעיל

skills2_df = linuxhint_spark_app.createDataFrame(skills2)

# write.json() עם מצב הוספה.

skills2_df.write.mode( 'לְצַרֵף' ).json( 'נתוני_מיומנויות' )

קובץ JSON:

אנו יכולים לראות את קבצי ה-JSON המחולקים למחיצות. הקובץ הראשון מכיל את רשומות ה-DataFrame הראשונות והקובץ השני מכיל את רשומת ה-DataFrame השנייה.

סיכום

ישנן שלוש דרכים שונות להמיר את PySpark DataFrame ל-JSON. ראשית, דנו בשיטת to_json() הממירה ל-JSON על ידי המרת PySpark DataFrame ל-Pandas DataFrame עם דוגמאות שונות על ידי התחשבות בפרמטרים שונים. לאחר מכן, השתמשנו בשיטת toJSON() . לבסוף, למדנו כיצד להשתמש בפונקציה write.json() כדי לכתוב את PySpark DataFrame ל-JSON. הוספה והחלפה אפשריים עם פונקציה זו.