שאלה ב Producer Consumer pattern

Sea Bass

New member
שאלה ב Producer Consumer pattern

היי
צריך התייעצות לגבי Producer Consumer pattern.
אני עובד על מערכת שמקבלת "משימות \ Items" וצריכה לעבוד עליהם.
כל Item עובד בטרד ולוקח שניה לעבוד עליו. יכול להיות שיגיעו לי 4 Items בשניה ויכול 200 Items בשניה. מה שאני רוצה לשאול זה במנגנון Producer Consumer איך אני מחליט כמה Consumers להרים? אם אני שם 200 Consumers אז כשנכנסים לי 200 items זה טס לי ותוך שניה באמת גומר הכל. אבל לפעמים מספיק רק 2 Cons. יש מצב גם שבמשך דקה לא נכנס כלום אז אפשר "לשתק את כל המנגנון"
אז האם אפשר ב RunTime לקבוע את מספר ה Consumers (כמובן עד גבול מסויים)? האם זו הדרך? האם ניתן להסתכל ב RunTime על התור ולבצע שינויים כמו להוסיף או להוריד?
תודה רבה.
 

מiטקה

New member
אתה יכול לעבוד עם Queue

מי שרוצה יכול לדחוף items לתוך ה queue ואתה מחליט ב RunTime כמב אתה מושך בו זמנית.
 

Sea Bass

New member
כמובן שאני עובד עם QUEUE

אני בהתלבטות בין Data flow TP לבין BlockingCollection.
אז לקבוע את מספר ה Consumers ב Runtime זה דבר מקובל?
מה הבעיות שעלולות לצוץ? האם כשאני מזהה שאין יותר Items אני אבצע Complete וככה אשחרר את ה Consumers?.. זה אומר שב Item הבא אני מרים את זה מחדש... כי אחרי Complete ה queue "ננעל".
 

nocgod

New member
לא יודע באיזה מהם עדיף, כנראה במי שנוח יותר

כן מקובל - ואפילו נהוג (לא בטוח כמה זה נהוג בin-process)

בעיה - בעיקר יציאה מסנכרון ו starvation, במקרה יותר גרוע race conditions והכי גרוע deadlocks - בגדול כל מה שיכול לצוץ בתוכנה שהיא multi-threaded והמשימה לא מוגדרת היטב או לא מוגנת היטב מבעיות כאלו.

לרוב עושים scale-out במקרה שהconsumers הם multi-process לפי CPU, במקרה של עיבוד in-process כנראה הייתי עושה על כמות עבודה בתור. בכל מקרה - לא הייתי הורג אל כל העובדים, תמיד להשאיר לפחות אחד.
 

Sea Bass

New member
לא הבנתי..

את המשפט:
לרוב עושים scale-out במקרה שהconsumers הם multi-process לפי CPU, במקרה של עיבוד in-process כנראה הייתי עושה על כמות עבודה בתור.
<font> לפי ה CPU? אתה יכול להסביר טוב יותר?</font>
<font>ולפי מה שכתבת והבנתי אז זה מקובל להגדיל את ה Consumers ב Runtime?</font>
<font>אם כן, אז לפי מה? זמן ממוצע של ITEM בתור?</font>
<font>ומתי אני מבצע Complete? אין לי מערך ידוע מראש שאני יודע מתי אני צריך לבצע Comlete.</font>
 

nocgod

New member
אנסה להסביר את עצמי

בעולם של microservices קרוב לוודאי המצב שיהיה שיש לך producers ששולחים מידע מעל service-bus או מערכת queuing/messaging אחרת.
הconsumers הם לרוב worker services שמאזינים למערכות queuing האלו, כאשר כל consumer צורך לרוב הודעה אחת אחרי השניה ומטפל בהן.
במקרה של עומס במערכת הqueuing שאתה משתמש בה, או לחליפין עומס בממוצע על ה CPU במכונה שמריצה את ה consumers שלך גבוהה מידי נהוג להוסיף עוד מכונה עם עוד consumers. זה בעולם של microservices וcloud ושאר הבאזז ורדס.

במקרה שלך, אתה מבקש להגדיל את כמות ה threads בprocess שמטפלים בעבודות (זהות), כאשר ה producer יושב באותו ה process יחד עם ה consumers. כלומר כולם יושבים באותו ה process אבל ב threads שונים.
במקרה כזה קשה לדעת איך להגדיל את כמות הצרכנים, מהסיבה הפשוטה שלא תמיד הוספה של threads תפתור את הבעיה (זה לא פלסטר קסם).

אם ה consumers חנוקים על IO - תדאג שהconsumer יהיה async וידע לשחרר את הthread שהוא עובד עליו (async await על task) ולהתחיל לטפל בהודעה נוספת.
במקרה והconsumers שלך חנוקים על CPU - הוספה של עוד threads רק תרע את המצב שלך.
 

nocgod

New member
לגבי איך להגדיל

הייתי שם עוד thread שיהיה מעין monitor, הוא כל שניה ידגום את גודל התור עבודה שלך.
תקבע לעצמך שאם יש בתור 100 עבודות (נניח) אתה מוסיף עוד consumer
אם יש פחות מ 50 אתה מוריד 1.
איך לבצע הורדה שלך consumer? באמצעות הכנסה של הודעת poison לתור.
אחד העובדים יקרא את הודעת ה poison, ויעשה return כדי לסיים את עבודתו.

אני בטוח שיש פתרונות יותר אלגנטיים לבעיה הזאת
 

Sea Bass

New member
אלו בדיוק הדברים שאני חשבתי עליהם

ולא ידעתי אם אני עושה משהו קטסטרופלי מבחינת קוד או משהו שהוא לגיטימי.
חשבתי על Monitor שמנהל את כל ה Consumers אך לא מצאתי כלום באינטרנט לגבי זה.
 

nocgod

New member
אם זה משהו לימודי/אקדמי

לך על זה, תכתוב לבד, תשים github ותבקש ממישהו עם ניסיון בפיתוח וראש על הכתפיים (לא תמיד בעל ניסיון הוא בעל ראש פעיל) לעשות לך code review.

אם זה משהו לעבודה, תבדוק אם זה הפיתרון שאתה באמת רוצה ללכת אליו.
אולי עדיף לא למקבל עבודה אלא לפתור את הבעיה שיוצרת לך את הצוואר בקבוק:
א. אם זה עומס ב IO אז לנסות ולהשתמש ב async IO
ב. אם זה עומס ב CPU אז הוספה של threads בכל מקרה לא תועיל אז תוסיף עוד מחשבים לקלחת שמריצים את הconsumer שלך. במקרה הזה אתה לא תוכל להשתמש כבר ב queue פנימיים אלא תצטרך מערכת חיצונית שתנהל לך את הqueue
 
למעלה