Hava axını: Bir vuruşda minlərlə vəzifəni planlaşdırmaq üçün necə istifadə olunur

Bu yazıda, bir dağın içərisində minlərlə vəzifəni necə cədvəlləndirə biləcəyimizi müzakirə edəcəyəm. Hava axını nədir və onu necə quraşdıra biləcəyinizə diqqət yetirmək niyyətində deyiləm, amma bunun əvəzinə bir dağın içərisində çox sayda işi necə planlaşdırmağımızı müzakirə edəcəyəm.

Əsasən, hava axını birdən çox DAG-a sahib olmaq üçün nəzərdə tutulmuşdur və DAG içərisində yüzlərlə və ya min vəzifə ola bilər. Beləliklə, çox sayda vəzifəni planlaşdırmaq istədiyimiz zaman 60000 və ya daha çox deyək. Bu blogda izah etdiklərim.

Hava axını ilə işimi avtomatlaşdırmaq üçün çalışıram. Ancaq mənim şirkətimdə həqiqətən çox miqdarda məlumat var və mən Hava axınının müxtəlif versiyalarından istifadə edərək cəhd etdim və həqiqətən çox böyük məlumatlar olduğuna görə bir DAG-da demək olar ki, 70000 vəzifəm var. Hava axınının müxtəlif versiyalarını sınamışam və ən son versiya 5000 tapşırığı planlaşdıra bilər, amma daha çox cədvəl istəsək, planlayıcı vəzifələri planlaşdırmadan işləyən vəziyyətdə qalır. Hər bir problemi tapdım və əsl səbəb nəyin olduğunu və nəhayət bu bloq yazmağın necə olacağını yoxladım.

Bir DAG içərisində minlərlə vəzifəniz olduqda bu hava axınında istifadə hallarından biridir. Bununla başlamaq üçün hava axınının 1.10.3 versiyasından istifadə etməliyik, bundan sonra çox sayda vəzifəyə diqqət yetirməməliyik, buna görə də hava axını 1.10.3 versiyasından istifadə etməliyik. Bu versiyanı quraşdırmaq üçün aşağıdakı addımları izləyin:

  • Əvvəlcə yeni bir mühit yaratmalı və mühiti aşağıdakı əmrdən istifadə edərək aktivləşdirməliyik:
conda yaratmaq -n hava axını_3
conda hava axını aktivləşdirmək_3
  • Hava axını 1.10.3 xüsusi versiyası ilə quraşdırmaq üçün aşağıdakı əmrdən istifadə edin:
conda quraşdırma -c conda-forge hava axını == 1.10.3
  • Bu versiyanın flask≥1.0.9 ilə işləməyəcəyinə dair bəzi xüsusi tələblərdən əmin olmalısınız, buna görə də bu versiyadan daha böyük flakon varsa, aşağıdakı əmrdən istifadə edin:
boru kəməri quraşdırmaq == 1.0.4
pip quraşdırma funcsigs == 1.0.0 (Bu quraşdırılmalı olan başqa bir tələbdir)
  • Bu çox böyük miqdarda tapşırıqla işləyərkən Kərəviz İcraçısından istifadə etmək tövsiyə olunur, çünki bu vəzifələri paralelləşdirməliyik və buna Kərəviz İcraçısından istifadə etməklə nail olmaq olar. Kərəvizi quraşdırmaq üçün aşağıdakı əmrdən istifadə edin:
pip kərəviz quraşdırın
  • Kərəviz icraçısından istifadə etmək üçün işçilərdən istifadə etməli və bir broker təyin etməlisiniz, mən RabbitMQ-dan vasitəçi kimi istifadə edirəm. TO quraşdırma broker URL aşağıdakı quruluşdan istifadə edə bilər:
broker_url = amqp: // "istifadəçi adı": "şifrə" @ "host_name": "port" /

misal üçün

broker_url = amqp: // qonaq: qonaq @ yerlihost: 5672 /
  • Kərəviz İcraçısının UI-sini görmək üçün Çiçəkdən istifadə edə bilərik, bu əmrdən istifadə etmək üçün:
conda install -c conda-forge çiçək
  • Bunu etdikdən sonra paralel olaraq minlərlə vəzifəni yerinə yetirmək və minlərlə işi bir vuruşda cədvəlləşdirmək üçün bəzi konfiqurasiyanı dəyişdirməliyik.
[əsas]
icraçı = CeleryExector paralellik = 200000 qeyri_pooled_task_slot_count = 100000 dag_concurrency = 100000 max_active_runs_per_dag = 2
[planlayıcı]
max_thread = 10 (Proqramı artırmaq və ya azaltmaqla mövzuları istifadə edə bilər)

Minlərlə işi bir vuruşda planlaşdırmaq istəyəndə bunlar əsas parametrlərdir. Paralel olaraq nə qədər çalışmaq istədiyinizi və bir DAG içərisində neçə vəzifənin olduğuna görə onu tənzimləməlisiniz.

Əsas parametr hava axınının 1.10.4 versiyasından çıxarılan "Non_pooled_task_slot_count" dir, buna görə də 1.10.3 istifadə edirəm, çünki bu parametr tapşırıqların planlaşdırılmasında çox vacib rol oynayır.

"Non_pooled_task_slot_count" çıxarıldıqdan sonra əsas fərq, default olaraq 128 olaraq təyin olunan default_pool istifadə edir (tələbə görə artıra bilər). "Non_pooled_task_slot_count" ın əsas işi tapşırıqları cədvəlləşdirməkdir və bu default_pool ya da verilənlər bazasından hər hansı digər əlaqə bağlantısı deyil ki, bu sayı istədiyiniz qədər artıraq, ancaq "default_pool" da yuvaların sayını artırsanız onda bu da olduğunuz verilənlər bazası bağlantılarına qoşulur və paralel olaraq bir anda 100000 verilənlər bazası bağlantısına sahib ola bilməzsiniz. Əsasən, "Non_pooled_task_slot_count" "default_pool" lehinə çıxarıldı.

Bu yazıda planlaşdırıcının niyə çətin vəziyyətə düşdüyü, ilişib qaldığı, çox sayda vəzifə planlaşdırmadığı və ya heç bir şey etmədən bütün gün işlədiyi sualına cavab var. Bütün bu cavabların hava axını versiyası 1.10.3 istifadə etmək üçün bir cavabı var.

Hava axını 1.10.3 istifadə edərkən DAG tərəfindən hansı hovuzun istifadə olunacağını təyin etməliyik, çünki default olaraq "default_pool" istifadə etmir, buna görə də tapşırıqlar yaratarkən para mater hovuzunu keçməliyik = 'defautl_pool'. UI (Admin -> hovuzlar) istifadə edərək 'default_pool' yarada bilərsiniz və ya əmr xətti ilə edilə bilər:

hava axını hovuzu -s default_pool 128 'default hovuz'.

Budur DAG nümunəsi:

datetime idxal datetime idxal, timedelta hava axını idxal DAG hava axını.operators.dummy_operator idxal DummyOperator
default_args = {'sahibi': 'Hava axını', 'asılı_on_past': Yanlış, 'başlanğıc günü': hava axını.utils.dates.days_ago (2), 'təkrar cəhd': 1, 'təkrarlama': timedelta (dəqiqə = 1),}
dag = DAG ('dummy_try1', default_args = default_args, cədvəl_interval = Yox)
(50000) aralığındakı i üçün: vəzifələr = DummyOperator (task_id = '{}'. format (i), dag = dag, hovuz = 'default_pool)

Aşağıdakı linkdən bütün versiyalar arasındakı fərqi yoxlaya bilərsiniz:

  • https://github.com/apache/airflow/blob/master/UPDATING.md#airflow-1104