PySpark Pandas_Udf()

Pyspark Pandas Udf



שינוי ה-PySpark DataFrame אפשרי באמצעות הפונקציה pandas_udf()‎. זוהי פונקציה המוגדרת על ידי משתמש המוחלת על PySpark DataFrame עם חץ. אנו יכולים לבצע את הפעולות הווקטוריות באמצעות ה- pandas_udf(). ניתן ליישם אותו על ידי העברת פונקציה זו כדקורטור. בואו נצלול למדריך זה כדי להכיר את התחביר, הפרמטרים והדוגמאות השונות.

נושא התוכן:

אם אתה רוצה לדעת על PySpark DataFrame והתקנת המודול, עבור על זה מאמר .







Pyspark.sql.functions.pandas_udf()‎

ה-pandas_udf () זמין במודול sql.functions ב-PySpark שניתן לייבא באמצעות מילת המפתח 'מאת'. הוא משמש לביצוע הפעולות הווקטוריות ב-PySpark DataFrame שלנו. פונקציה זו מיושמת כמו דקורטור על ידי העברת שלושה פרמטרים. לאחר מכן, נוכל ליצור פונקציה מוגדרת על ידי משתמש המחזירה את הנתונים בפורמט הוקטור (כמו שאנו משתמשים ב-series/NumPy עבור זה) באמצעות חץ. בתוך פונקציה זו, אנו מסוגלים להחזיר את התוצאה.



מבנה ותחביר:



ראשית, בואו נסתכל על המבנה והתחביר של פונקציה זו:

@pandas_udf(datatype)
def function_name(פעולה) -> convert_format:
הצהרת החזרה

כאן, ה-function_name הוא השם של הפונקציה המוגדרת שלנו. סוג הנתונים מציין את סוג הנתונים שמוחזר על ידי פונקציה זו. נוכל להחזיר את התוצאה באמצעות מילת המפתח 'החזרה'. כל הפעולות מבוצעות בתוך הפונקציה עם הקצאת החצים.





Pandas_udf (פונקציה ו-ReturType)

  1. הפרמטר הראשון הוא הפונקציה המוגדרת על ידי המשתמש שמועברת אליו.
  2. הפרמטר השני משמש לציון סוג נתוני ההחזרה מהפונקציה.

נתונים:

בכל המדריך הזה, אנו משתמשים רק PySpark DataFrame אחד להדגמה. כל הפונקציות המוגדרות על ידי המשתמש שאנו מגדירים מיושמות על PySpark DataFrame זה. ודא שאתה יוצר DataFrame זה בסביבה שלך תחילה לאחר ההתקנה של PySpark.



ייבוא ​​pyspark

מ-pyspark.sql ייבוא ​​SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'רמז לינוקס' ).getOrCreate()

מ-pyspark.sql.functions ייבוא ​​pandas_udf

מ-pyspark.sql.types ייבוא ​​*

לייבא פנדות בתור פנדה

# פרטי ירק

ירק =[{ 'סוּג' : 'ירקות' , 'שֵׁם' : 'עגבנייה' , 'locate_country' : 'ארה'ב' , 'כַּמוּת' : 800 },

{ 'סוּג' : 'פרי' , 'שֵׁם' : 'בננה' , 'locate_country' : 'חרסינה' , 'כַּמוּת' : עשרים },

{ 'סוּג' : 'ירקות' , 'שֵׁם' : 'עגבנייה' , 'locate_country' : 'ארה'ב' , 'כַּמוּת' : 800 },

{ 'סוּג' : 'ירקות' , 'שֵׁם' : 'מנגו' , 'locate_country' : 'יפן' , 'כַּמוּת' : 0 },

{ 'סוּג' : 'פרי' , 'שֵׁם' : 'לימון' , 'locate_country' : 'הוֹדוּ' , 'כַּמוּת' : 1700 },

{ 'סוּג' : 'ירקות' , 'שֵׁם' : 'עגבנייה' , 'locate_country' : 'ארה'ב' , 'כַּמוּת' : 1200 },

{ 'סוּג' : 'ירקות' , 'שֵׁם' : 'מנגו' , 'locate_country' : 'יפן' , 'כַּמוּת' : 0 },

{ 'סוּג' : 'פרי' , 'שֵׁם' : 'לימון' , 'locate_country' : 'הוֹדוּ' , 'כַּמוּת' : 0 }

]

# צור את מסגרת הנתונים של השוק מהנתונים שלעיל

market_df = linuxhint_spark_app.createDataFrame(ירקות)

market_df.show()

תְפוּקָה:

כאן אנו יוצרים DataFrame עם 4 עמודות ו-8 שורות. כעת, אנו משתמשים ב- pandas_udf() כדי ליצור את הפונקציות המוגדרות על ידי המשתמש ולהחיל אותן על העמודות הללו.

Pandas_udf() עם סוגי נתונים שונים

בתרחיש זה, אנו יוצרים כמה פונקציות המוגדרות על ידי המשתמש עם pandas_udf() ומחילים אותן על עמודות ומציגים את התוצאות באמצעות שיטת select() . בכל מקרה, אנו משתמשים ב-pandas.Series כאשר אנו מבצעים את הפעולות הווקטוריות. זה מחשיב את ערכי העמודה כמערך חד-ממדי והפעולה מוחלת על העמודה. בדקורטור עצמו, אנו מציינים את סוג החזרת הפונקציה.

דוגמה 1: Pandas_udf() עם סוג מחרוזת

כאן, אנו יוצרים שתי פונקציות המוגדרות על ידי המשתמש עם סוג החזרת מחרוזת כדי להמיר את ערכי העמודה מסוג מחרוזת לאותיות גדולות ואותיות קטנות. לבסוף, אנו מיישמים את הפונקציות הללו על עמודות 'סוג' ו-'locate_country'.

# המר את עמודת הסוג לאותיות גדולות עם pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

החזר i.str.upper()

# המר את העמודה locate_country לאותיות קטנות עם pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

החזר i.str.lower()

# הצג את העמודות באמצעות select()

market_df.select( 'סוּג' ,סוג_אותיות_גדולות( 'סוּג' ), 'locate_country' ,
מדינה_אותיות קטנות( 'locate_country' )).הופעה()

תְפוּקָה:

הֶסבֵּר:

הפונקציה StringType() זמינה במודול pyspark.sql.types. כבר ייבאנו את המודול הזה בזמן יצירת ה-PySpark DataFrame.

  1. ראשית, UDF (פונקציה מוגדרת על ידי משתמש) מחזירה את המחרוזות באותיות רישיות באמצעות הפונקציה str.upper() . ה-str.upper() זמין ב-Series Data Structure (כפי שאנו ממירים לסדרה עם חץ בתוך הפונקציה) הממיר את המחרוזת הנתונה לאותיות רישיות. לבסוף, פונקציה זו מוחלת על העמודה 'סוג' אשר צוינה בתוך שיטת select() . בעבר, כל המחרוזות בעמודת הסוג הן באותיות קטנות. כעת, הם שונו לאותיות רישיות.
  2. שנית, UDF מחזיר את המחרוזות באותיות רישיות באמצעות הפונקציה str.lower()‎. ה-str.lower() זמין ב-Seer Data Structure הממיר את המחרוזת הנתונה לאותיות קטנות. לבסוף, פונקציה זו מוחלת על העמודה 'סוג' אשר צוינה בתוך שיטת select() . בעבר, כל המחרוזות בעמודת הסוג מופיעות באותיות רישיות. כעת, הם שונו לאותיות קטנות.

דוגמה 2: Pandas_udf() עם סוג מספר שלם

בואו ניצור UDF שממיר את עמודת המספרים השלמים של PySpark DataFrame לסדרת Pandas ונוסיף 100 לכל ערך. העבר את העמודה 'כמות' לפונקציה זו בתוך שיטת select() .

# הוסף 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

החזר i+ 100

# העבירו את עמודת הכמות לפונקציה שלמעלה והצג.

market_df.select( 'כַּמוּת' ,add_100( 'כַּמוּת' )).הופעה()

תְפוּקָה:

הֶסבֵּר:

בתוך ה-UDF, אנו חוזרים על כל הערכים וממירים אותם ל-Series. לאחר מכן, נוסיף 100 לכל ערך בסדרה. לבסוף, אנו מעבירים את העמודה 'כמות' לפונקציה זו ונוכל לראות ש-100 מתווסף לכל הערכים.

Pandas_udf() עם סוגי נתונים שונים באמצעות Groupby() ו-Agg()

בואו נסתכל על הדוגמאות להעברת ה-UDF לעמודות המצטברות. כאן, ערכי העמודות מקובצים תחילה באמצעות הפונקציה groupby() והצבירה מתבצעת באמצעות הפונקציה agg() . אנו מעבירים את ה-UDF שלנו בתוך הפונקציה המצטברת הזו.

תחביר:

pyspark_dataframe_object.groupby( 'קבוצת_עמודה' ).agg(UDF
(pyspark_dataframe_object[ 'טור' ]))

כאן, הערכים בעמודת הקיבוץ מקובצים תחילה. לאחר מכן, הצבירה מתבצעת על כל נתונים מקובצים ביחס ל-UDF שלנו.

דוגמה 1: Pandas_udf() עם ממוצע מצטבר()

כאן, אנו יוצרים פונקציה מוגדרת על ידי משתמש עם ציפה מסוג החזרה. בתוך הפונקציה, אנו מחשבים את הממוצע באמצעות הפונקציה mean() . UDF זה מועבר לעמודת 'כמות' כדי לקבל את הכמות הממוצעת עבור כל סוג.

# החזר את הממוצע/הממוצע

@pandas_udf( 'לָצוּף' )

def average_function(i: panda.Series) -> float:

החזר i.mean()

# העבירו את עמודת הכמות לפונקציה על ידי קיבוץ עמודת הסוג.

market_df.groupby( 'סוּג' ).agg(average_function(market_df[ 'כַּמוּת' ])).הופעה()

תְפוּקָה:

אנו מקבצים על סמך אלמנטים בעמודה 'סוג'. נוצרות שתי קבוצות - 'פירות' ו'ירקות'. עבור כל קבוצה, הממוצע מחושב ומוחזר.

דוגמה 2: Pandas_udf() עם מאגר מקס() ו-Min()

כאן, אנו יוצרים שתי פונקציות המוגדרות על ידי משתמש עם סוג ההחזרה השלם (int). ה-UDF הראשון מחזיר את הערך המינימלי וה-UDF השני מחזיר את הערך המקסימלי.

# pandas_udf שמחזירים את הערך המינימלי

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

החזר i.min()

# pandas_udf שמחזירים את הערך המקסימלי

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

החזר i.max()

# העבר את עמודת הכמות ל-min_ pandas_udf על ידי קיבוץ locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'כַּמוּת' ])).הופעה()

# העבר את עמודת הכמות ל-max_ pandas_udf על ידי קיבוץ locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'כַּמוּת' ])).הופעה()

תְפוּקָה:

כדי להחזיר ערכי מינימום ומקסימום, אנו משתמשים בפונקציות min() ו-max() בסוג ההחזרה של UDFs. כעת, אנו מקבצים את הנתונים בעמודה 'locate_country'. נוצרות ארבע קבוצות ('סין', 'הודו', 'יפן', 'ארה'ב'). עבור כל קבוצה, אנו מחזירים את הכמות המקסימלית. באופן דומה, אנו מחזירים את הכמות המינימלית.

סיכום

בעיקרון, ה- pandas_udf () משמש לביצוע הפעולות הווקטוריות ב-PySpark DataFrame שלנו. ראינו כיצד ליצור את () pandas_udf ולהחיל אותו על PySpark DataFrame. להבנה טובה יותר, דנו בדוגמאות השונות על ידי התחשבות בכל סוגי הנתונים (מחרוזת, צף ומספר שלם). אפשר להשתמש ב- pandas_udf() עם groupby() דרך הפונקציה agg().