תחי ישראל - אין לנו ארץ אחרת

תחי ישראל -אין לנו ארץ אחרת

שליחת מייל עם FastApi עבודה עם פונקציה א-סינכרונית

מחבר:
בתאריך:

השתמשתי באפליקציה מבוססת FastApi לשליחת מייל (הסבר תהליך השליחה). הפונקציה לשליחת מייל היא א-סינכרונית. הפונקציה testAsync קוראת לפונקציה sendAsync:

from fastapi import FastAPI
from starlette.responses import JSONResponse
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig, MessageType
 
async def sendAsync(to, id):
   now = datetime.now() # current date and time
   now_str = now.strftime("%Y-%m-%d %H:%M:%S")
 
   title = f'test email : {now_str}, id : {id}'
 
   message = MessageSchema(
       subject=title,
       recipients=[to],
       body=title,
       subtype=MessageType.html)
 
   fm = FastMail(conf_email)
   await fm.send_message(message)
   
@app.get('/test-async')
def testAsync():
   sendAsync("mail@test.com", 1)

הרצתי את הקוד, וקיבלתי הודעה אזהרה:

$ RuntimeWarning: coroutine 'sendAsync' was never awaited

הסיבה להודעה היא שאנחנו צריכים להפעיל event loop שישאר בהאזנה לפונקציה הא-סינכרונית עד שתסיים. event loop הוא תהליך שבודק האם יש תהליכים או פונקציות א-סינכרוניות שסיימו לרוץ. כדי להתחיל את תהליך ה-event loop אנחנו צריכים להוסיף לקריאה לפונקציה א-סינכרונית את הקוד שפותח event loop, ממתין לו עד שיסיים, ובסוף סוגר את ה-event.

עושים את זה באמצעות ספרייה של פייתון:

import asyncio

אפשר להשתמש בקוד הבא כדי להפעיל event loop שיאזין לפונקציה עד שתסיים:

loop = asyncio.get_event_loop()
loop.run_until_complete(call_to_func())
loop.close()

אבל עדיף שלא כי עד כמה שהוא מפורט הוא לא תמיד עובד.

דרך טובה יותר היא להשתמש במתודה asyncio.run() ומתוכה לקרוא לפונקציה:

@app.get('/test-async')
def testAsync():
    asyncio.run(sendAsync("mail@gmail.com", 1))
    # print(type(response))
    # print(dir(response))
    msg = json.loads(response.body)
    print(msg["message"])

התוצאה:

email 1 has been sent

 

אפשר לשלוח לרשימה של מיילים באמצעות לולאה:

@app.get('/test-async')
def testAsync():
    start_time = time.process_time()
    
    emails = ["mail@mail.com", "mail2@mail.com","mail3@mail.com"]
    for idx, email in enumerate(emails):
        response = asyncio.run(sendAsync(email, idx))
        msg = json.loads(response.body)
        print(msg["message"])
    
    end_time = time.process_time()
    
    print("Ran %s seconds" % (end_time - start_time))

תוצאה:

email 0 has been sent
email 1 has been sent
email 2 has been sent
Ran 0.09052418200000001 seconds

מדריכים נוספים בסדרה ללימוד FastApi

  1. FastAPI - היכרות עם ספרית הקוד הטובה ביותר של פיתון להקמת אפליקציות אינטרנט
  2. אפליקצית אינטרנט עם FastApi - הקמת מסד הנתונים
  3. קלאס לניהול משתמשים באפליקציית FastApi

 

לכל המדריכים בסדרה ללימוד פייתון

 

אהבתם? לא אהבתם? דרגו!

0 הצבעות, ממוצע 0 מתוך 5 כוכבים

 

 

המדריכים באתר עוסקים בנושאי תכנות ופיתוח אישי. הקוד שמוצג משמש להדגמה ולצרכי לימוד. התוכן והקוד המוצגים באתר נבדקו בקפידה ונמצאו תקינים. אבל ייתכן ששימוש במערכות שונות, דוגמת דפדפן או מערכת הפעלה שונה ולאור השינויים הטכנולוגיים התכופים בעולם שבו אנו חיים יגרום לתוצאות שונות מהמצופה. בכל מקרה, אין בעל האתר נושא באחריות לכל שיבוש או שימוש לא אחראי בתכנים הלימודיים באתר.

למרות האמור לעיל, ומתוך רצון טוב, אם נתקלת בקשיים ביישום הקוד באתר מפאת מה שנראה לך כשגיאה או כחוסר עקביות נא להשאיר תגובה עם פירוט הבעיה באזור התגובות בתחתית המדריכים. זה יכול לעזור למשתמשים אחרים שנתקלו באותה בעיה ואם אני רואה שהבעיה עקרונית אני עשוי לערוך התאמה במדריך או להסיר אותו כדי להימנע מהטעיית הציבור.

שימו לב! הסקריפטים במדריכים מיועדים למטרות לימוד בלבד. כשאתם עובדים על הפרויקטים שלכם אתם צריכים להשתמש בספריות וסביבות פיתוח מוכחות, מהירות ובטוחות.

המשתמש באתר צריך להיות מודע לכך שאם וכאשר הוא מפתח קוד בשביל פרויקט הוא חייב לשים לב ולהשתמש בסביבת הפיתוח המתאימה ביותר, הבטוחה ביותר, היעילה ביותר וכמובן שהוא צריך לבדוק את הקוד בהיבטים של יעילות ואבטחה. מי אמר שלהיות מפתח זו עבודה קלה ?

השימוש שלך באתר מהווה ראייה להסכמתך עם הכללים והתקנות שנוסחו בהסכם תנאי השימוש.

הוסף תגובה חדשה

 

 

ענה על השאלה הפשוטה הבאה כתנאי להוספת תגובה:

דג למים הוא כמו ציפור ל...?