כיצד ליישם הזרמת נתונים בזמן אמת ב- Python

Kyzd Lyysm Hzrmt Ntwnym Bzmn Mt B Python



שליטה ביישום הזרמת נתונים בזמן אמת ב-Python פועלת כמיומנות חיונית בעולם המעורב בנתונים של ימינו. מדריך זה בוחן את שלבי הליבה והכלים החיוניים לניצול הזרמת נתונים בזמן אמת עם אותנטיות ב- Python. מבחירת מסגרת מתאימה כמו Apache Kafka או Apache Pulsar ועד לכתיבת קוד Python לצריכת נתונים ללא מאמץ, עיבוד והדמיה יעילה, נרכוש את המיומנויות הדרושות לבניית ערוצי נתונים זריזים ויעילים בזמן אמת.

דוגמה 1: יישום הזרמת נתונים בזמן אמת בפייתון

הטמעת הזרמת נתונים בזמן אמת ב- Python היא חיונית בעידן ובעולם מונעי הנתונים של ימינו. בדוגמה מפורטת זו, נעבור על תהליך בניית מערכת הזרמת נתונים בזמן אמת באמצעות Apache Kafka ו- Python ב-Google Colab.







כדי לאתחל את הדוגמה לפני שנתחיל בקידוד, בניית סביבה ספציפית ב-Google Colab היא חיונית. הדבר הראשון שעלינו לעשות הוא להתקין את הספריות הדרושות. אנו משתמשים בספריית 'קפקא-פיתון' לשילוב קפקא.



! צִפצוּף להתקין קפקא-פיתון


פקודה זו מתקינה את ספריית 'kafka-python' אשר מספקת את פונקציות Python ואת ה-bindings עבור Apache Kafka. לאחר מכן, אנו מייבאים את הספריות הנדרשות עבור הפרויקט שלנו. ייבוא ​​הספריות הנדרשות כולל 'KafkaProducer' ו-'KafkaConsumer' הם השיעורים מספריית 'kafka-python' המאפשרים לנו ליצור אינטראקציה עם מתווכים של קפקא. JSON היא ספריית Python לעבודה עם נתוני ה-JSON שבהם אנו משתמשים כדי להסדיר ולבטל את ההודעות בסידרה.



מ-kafka ייבוא ​​KafkaProducer, KafkaConsumer
ייבוא ​​json


יצירת מפיק קפקא





זה חשוב כי מפיק קפקא שולח את הנתונים לנושא קפקא. בדוגמה שלנו, אנו יוצרים מפיק כדי לשלוח נתונים מדומה בזמן אמת לנושא שנקרא 'נושא בזמן אמת'.

אנו יוצרים מופע 'KafkaProducer' המציין את כתובת המתווך של קפקא כ-'localhost:9092'. לאחר מכן, אנו משתמשים ב-'value_serializer', פונקציה שמסדרת את הנתונים לפני שליחתם לקפקא. במקרה שלנו, פונקציית למבדה מקודדת את הנתונים כ-JSON מקודד UTF-8. כעת, בואו נדמה כמה נתונים בזמן אמת ונשלח אותם לנושא קפקא.



מפיק = קפקא מפיק ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( ב ) .לְהַצְפִּין ( 'utf-8' ) )
# נתוני זמן אמת מדומים
נתונים = { 'זיהוי_חיישן' : 1 , 'טֶמפֶּרָטוּרָה' : 25.5 , 'לחות' : 60.2 }
# שליחת נתונים לנושא
producer.send ( 'נושא בזמן אמת' , נתונים )


בשורות אלו, אנו מגדירים מילון 'נתונים' המייצג נתוני חיישן מדומה. לאחר מכן אנו משתמשים בשיטת 'שלח' כדי לפרסם את הנתונים הללו ל'נושא בזמן אמת'.

לאחר מכן, אנו רוצים ליצור צרכן קפקאי, וצרכן קפקאי קורא את הנתונים מתוך נושא קפקאי. אנו יוצרים צרכן לצרוך ולעבד את המסרים ב'נושא בזמן אמת'. אנו יוצרים מופע 'KafkaConsumer', המציין את הנושא שברצוננו לצרוך, למשל, (נושא בזמן אמת) ואת כתובת המתווך של קפקא. לאחר מכן, ה-'value_deserializer' הוא פונקציה המבטלת את הנתונים המתקבלים מקפקא. במקרה שלנו, פונקציית למבדה מפענחת את הנתונים כ-JSON מקודד UTF-8.

צרכן = KafkaConsumer ( 'נושא בזמן אמת' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )


אנו משתמשים בלולאה איטרטיבית כדי לצרוך ולעבד באופן רציף את ההודעות מהנושא.

# קריאה ועיבוד נתונים בזמן אמת
ל הוֹדָעָה ב צרכן:
data = message.value
הדפס ( ו 'נתונים שהתקבלו: {נתונים}' )


אנו מאחזרים את הערך של כל הודעה ואת נתוני החיישן המדומים שלנו בתוך הלולאה ומדפיסים אותם לקונסולה. הפעלת המפיק והצרכן של קפקא כרוכה בהפעלת קוד זה בגוגל קולאב וביצוע תאי הקוד בנפרד. המפיק שולח את הנתונים המדומים לנושא קפקא, והצרכן קורא ומדפיס את הנתונים שהתקבלו.


ניתוח של הפלט בזמן הפעלת הקוד

אנו נצפה בנתון בזמן אמת שמיוצר ונצרך. פורמט הנתונים עשוי להשתנות בהתאם לסימולציה שלנו או למקור הנתונים בפועל. בדוגמה מפורטת זו, אנו מכסים את כל התהליך של הקמת מערכת הזרמת נתונים בזמן אמת באמצעות Apache Kafka ו- Python ב-Google Colab. נסביר כל שורת קוד ואת משמעותה בבניית מערכת זו. הזרמת נתונים בזמן אמת היא יכולת רבת עוצמה, ודוגמה זו משמשת בסיס ליישומים מורכבים יותר בעולם האמיתי.

דוגמה 2: יישום הזרמת נתונים בזמן אמת ב-Python באמצעות נתוני שוק המניות

בואו נעשה עוד דוגמה ייחודית ליישום הזרמת נתונים בזמן אמת ב-Python באמצעות תרחיש אחר; הפעם, נתמקד בנתוני שוק המניות. אנו יוצרים מערכת הזרמת נתונים בזמן אמת אשר לוכדת את השינויים במחיר המניה ומעבדת אותם באמצעות Apache Kafka ו- Python בגוגל קולאב. כפי שהודגם בדוגמה הקודמת, אנו מתחילים בהגדרת הסביבה שלנו ב-Google Colab. ראשית, אנו מתקינים את הספריות הנדרשות:

! צִפצוּף להתקין kafka-python yfinance


כאן, אנו מוסיפים את ספריית 'yfinance' המאפשרת לנו לקבל נתונים על שוק המניות בזמן אמת. לאחר מכן, אנו מייבאים את הספריות הדרושות. אנו ממשיכים להשתמש בכיתות 'KafkaProducer' ו-'KafkaConsumer' מספריית 'kafka-python' לאינטראקציה עם קפקא. אנו מייבאים JSON לעבודה עם נתוני JSON. אנו משתמשים גם ב-'yfinance' כדי לקבל נתונים על שוק המניות בזמן אמת. אנו גם מייבאים את ספריית 'הזמן' כדי להוסיף עיכוב זמן כדי לדמות את העדכונים בזמן אמת.

מ-kafka ייבוא ​​KafkaProducer, KafkaConsumer
ייבוא ​​json
יבוא כספים כפי ש yf
יְבוּא זְמַן


כעת, אנו יוצרים מפיק קפקא לנתוני מניות. מפיק קפקא שלנו מקבל נתוני מלאי בזמן אמת ושולח אותו לנושא קפקא בשם 'מחיר מלאי'.

מפיק = קפקא מפיק ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( ב ) .לְהַצְפִּין ( 'utf-8' ) )

בזמן נָכוֹן:
מניה = yf.Ticker ( 'AAPL' ) # דוגמה: מניית Apple Inc
stock_data = stock.history ( פרק זמן = '1d' )
last_price = נתוני_מלאי [ 'סגור' ] .iloc [ - 1 ]
נתונים = { 'סֵמֶל' : 'AAPL' , 'מחיר' : מחיר סופי }
producer.send ( 'מחיר מניה' , נתונים )
זמן שינה ( 10 ) # הדמיית עדכונים בזמן אמת כל 10 שניות


אנו יוצרים מופע 'KafkaProducer' עם הכתובת של ברוקר קפקא בקוד זה. בתוך הלולאה, אנו משתמשים ב-yfinance כדי לקבל את מחיר המניה העדכני ביותר עבור Apple Inc. ('AAPL'). לאחר מכן, אנו מחלצים את מחיר הסגירה האחרון ושולחים אותו לנושא 'מחיר המניה'. בסופו של דבר, אנו מציגים השהיית זמן כדי לדמות את העדכונים בזמן אמת כל 10 שניות.

בואו ניצור צרכן קפקא שיקרא ויעבד את נתוני מחיר המניה מהנושא 'מחיר המניה'.

צרכן = KafkaConsumer ( 'מחיר מניה' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

ל הוֹדָעָה ב צרכן:
stock_data = message.value
הדפס ( ו 'נתוני מלאי שהתקבלו: {stock_data['symbol']} - מחיר: {stock_data['price']}' )


קוד זה דומה להגדרת הצרכן של הדוגמה הקודמת. הוא קורא ומעבד ברציפות את ההודעות מהנושא 'מחיר מלאי' ומדפיס את סמל המניה והמחיר לקונסולה. אנו מבצעים את תאי הקוד ברצף, למשל, אחד אחד ב-Google Colab כדי להפעיל את המפיק והצרכן. היצרן מקבל ושולח את עדכוני מחיר המניה בזמן אמת בזמן שהצרכן קורא ומציג נתונים אלה.

! צִפצוּף להתקין kafka-python yfinance
מ-kafka ייבוא ​​KafkaProducer, KafkaConsumer
ייבוא ​​json
ייבוא ​​כספים כפי ש yf
יְבוּא זְמַן
מפיק = קפקא מפיק ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( ב ) .לְהַצְפִּין ( 'utf-8' ) )

בזמן נָכוֹן:
מניה = yf.Ticker ( 'AAPL' ) # מניית Apple Inc
stock_data = stock.history ( פרק זמן = '1d' )
last_price = נתוני_מלאי [ 'סגור' ] .iloc [ - 1 ]

נתונים = { 'סֵמֶל' : 'AAPL' , 'מחיר' : מחיר סופי }

producer.send ( 'מחיר מניה' , נתונים )

זמן שינה ( 10 ) # הדמיית עדכונים בזמן אמת כל 10 שניות
צרכן = KafkaConsumer ( 'מחיר מניה' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

ל הוֹדָעָה ב צרכן:
stock_data = message.value
הדפס ( ו 'נתוני מלאי שהתקבלו: {stock_data['symbol']} - מחיר: {stock_data['price']}' )


בניתוח הפלט לאחר ריצת הקוד, נצפה עדכוני מחירי המניה בזמן אמת של Apple Inc. המיוצרים ונצרכים.

סיכום

בדוגמה ייחודית זו, הדגמנו יישום של הזרמת נתונים בזמן אמת ב-Python באמצעות Apache Kafka וספריית 'yfinance' כדי ללכוד ולעבד את נתוני שוק המניות. הסברנו ביסודיות כל שורה בקוד. ניתן ליישם הזרמת נתונים בזמן אמת בתחומים שונים כדי לבנות את היישומים בעולם האמיתי בתחום הפיננסים, ה-IoT ועוד.