Akka Axını və Alpakka CSV ilə bir CSV faylındakı sahələrin sayını necə təsdiqləmək olar

Alpakka, Akka Axınlarının üstündə qurulan Java və Scala üçün axın bilən boru kəmərlərini həyata keçirmək üçün məşhur bir mənbə layihəsidir. Məlumatları emal etmək və bir sıra saxlama sistemlərində (məsələn, AWS S3) saxlamaq üçün bir çox "bağlayıcı" təmin edir.

Bu yazıda CSV (vergüllə ayrılmış dəyərlər) məlumatlarına diqqət yetirəcəyik və hər satırdakı sahələrin sayının necə doğrulanacağını göstərəcəyik.

Vikipediyanın şəkli

CSV nə deməkdir?

CSV vergüllə ayrılmış dəyərlərin tərəfdarıdır və məlumatların cədvəl formatında saxlanmasına imkan verən bir növdür. CSV-lər elektron mətn sənədləridir, cədvəllərdən daha asan yaradılması və işlənməsi asandır. Əslində onların yalnız bir vərəqi var və düsturlara icazə vermirlər.

Rəsmi bir spesifikasiya (RFC4180) mövcud olsa da, fərqli parsers bunu fərqli şəkildə şərh edir.

Sahələrin qiymətləndirilməsi

Akka Axınları, mənbələri, qaynaqları və axını ilə tanış olmadığınızı hiss edirsinizsə, bu məqaləni mövzuya əla giriş üçün oxumağı məsləhət görürəm: Akka axınlarına giriş.

Bu hissədə CSK sənədinin məzmununu oxumaq üçün Akka Axınlarından istifadə edəcəyik, hər sətir üçün bir xəritə [String, String] alırıq, burada düymələr başlıqları təmsil edir (yəni fayldakı ilk sıra) və dəyərlər müəyyən bir sətirdə həqiqi dəyərləri təmsil edir. Əvvəlcədən təyin edilmiş bağlayıcılar bu Xəritəni yaratmağa imkan versələr də, sahələrin sayına dair əsas yoxlamaları aparmağımıza kömək etmir.

Koddan başlayaq və necə davranacağını görək.

Sətir 1 sadəcə üç sütun və iki sıra ilə əsas CSV faylını yaradır. 8-ci sətir sonra bu sətri Mənbə [ByteString, NotUsed] halına gətirir, burada NotUsed sadəcə mənbənin heç bir dəyər yaratmadığı anlamına gəlir, sadəcə ByteString axını istehsal edir. Daha sonra ByteString bu cür axını iki daxili axını istifadə edərək işləyirik (bu funksiya kimi düşünmək olar). Əvvəlcə lineScanner () ilə bir xətt ayırıcı olaraq \ n istifadə edərək onu xətlərə ayırdıq. İkincisi, hər bir sətri toMapAsStrings () istifadə edərək Xəritəyə [String, String] çeviririk. Daha sonra axını yalnız hər bir satırı (xəritə şəklində təqdim edən) bir funksiya ilə işə salırıq. RunWith () metodu axın bitdikdə tamamlanan Gələcəyə [Seq [Xəritə [Sistem, Sətir]]] qaytarır.

İşlədikdə, bu sadə proqram aşağıdakı çıxışı yazdırır:

Vektor (Xəritə (sütun1 -> dəyər1, sütun2 -> 2, sütun3 -> dəyər3), Xəritə (sütun1 -> 1, sütun2 -> dəyər2, sütun3 -> 3))

Bu kod bir problemdən əziyyət çəkir: yalnız hər bir sətir üçün sahələrin sayı sütunların sayına uyğun gəldiyi təqdirdə düzgün işləyir (birinci cərgədəki dəyərlərin sayı olaraq təyin olunur).

Aşağıdakı CSV-lərlə nəyi çap etdiyini görək (müvafiq olaraq başlıqlardan daha az və daha çox sahə):

val csvFewer = "" "sütun1, sütun2, sütun3 | dəyər1,2 | 1, dəyər2,3 |" "". stripMargin
val csvMore = "" "sütun1, sütun2, sütun3 | dəyər1,2, dəyər3, başqa bir dəyər | 1, dəyər2,3 |" "". stripMargin

İki çıxış aşağıdakı kimidir:

Vektor (Xəritə (sütun1 -> dəyər1, sütun2 -> 2), Xəritə (sütun1 -> 1, sütun2 -> dəyər2, sütun3 -> 3))

Vektor (Xəritə (sütun1 -> dəyər1, sütun2 -> 2, sütun3 -> 3), Xəritə (sütun1 -> 1, sütun2 -> dəyər2, sütun3 -> 3))

Gördüyünüz kimi, əvvəlki vəziyyətdə üçüncü sütun sadəcə atılır və sonuncu çıxışdakı birinci cərgədəki əlavə dəyər. Bu etibarlı bir yanaşmadır, lakin bəzən sahələrin sayı hər bir satır üçün ardıcıl deyilsə, oxumağı dayandırmaq lazımdır.

Bu problemi xüsusi bir axını tətbiq etməklə necə həll edə biləcəyik, növü olacaq: Axın [Siyahı [ByteString], Ya da [WrongNumberOfFields, Xəritə [String, String]], NotUSed]. Bu tip nə deməkdir ki, biz bir siyahı [ByteString] (yəni CSV-də olan bütün sətirlərin siyahısı) daxil edən və ya da [WrongNumberOfFields, Map [String, String]] daxil edən bir funksiyamız var. WrongNumberOfFields, sahələrin sayı cərgələrdə uyğunsuz olduqda bir nümunə qaldırmaq istədiyimiz xüsusi bir növüdür. Xəritə [String, String] digər tərəfdən açarları CSV başlığı olan və dəyərləri verilən cərgədəki dəyərlər olan xəritəni təmsil edir. Qısacası, Bu Axın satırların siyahısını alır və hər sətir üçün bir xəritə və ya səhv barədə bəzi məlumatlar istehsal edir.

Koda nəzər salaq:

prefixAndTail (1) siyahını iki hissəyə bölmək üçün ilk addım olaraq istifadə olunur: başlıqlar və bütün satırlar. Hər bir giriş elementini birləşmə yolu ilə düzəldiləcək çıxış elementlərinin mənbəyinə çevirmək üçün bizə Seq [Siyahı [ByteString]] verir. Qısayol-sintaksis kassasından (hs, satır) istifadə edərək Seq başlıqlarını və satırlarını çıxarırıq. Doğrulama məntiqinə keçməzdən əvvəl başqa bir addım lazımdır: WrongNumberOfFields da xətanın baş verdiyi xəttə ehtiyac duyur, ona görə də satırlardakı zipWithIndex-i işə salmalıyıq. Bu bizə (Siyahı [ByteString], Uzun) verir, burada başlamağın ilk elementi bir sıra, ikincisi isə fayldakı sıra nömrəsidir (indekslər 0-dan başlayan kimi). Bu iki xətt çirkli işləri olduqca çox edir. Funksiyanın qalan hissəsi sadəcə kodu daha oxunaqlı etmək üçün bir neçə ovuc yerli dəyişənləri təqdim edir və gözlənilən və faktiki sahələrin sayının bərabər olduğunu yoxlayır (hər sətir üçün) (Sətir 11). Əgər belədirsə, hər ikisini bükdükdən sonra başlıqlar və sətirləri kəsərək bir xəritə [String, String] yaradırıq. Digər tərəfdən, satırda gözləniləndən daha az və ya daha çox sahə varsa, WrongNumberOfFields nümunəsini geri qaytarırıq.

Gəlin bu Çayın yuxarıdakı üç CSV ilə necə davrandığını görək:

  • etibarlı biri ilə (Xəritəni əhatə edən əlavə sağa diqqət yetirin):
Vektor (Sağ (Xəritə (sütun1 -> dəyər1, sütun2 -> 2, sütun3 -> 3)), sağ (Xəritə (sütun1 -> 1, sütun2 -> dəyər2, sütun3 -> 3)))
  • csvFewer ilə, məzmunda 1 satırda (başlıqlar sətri sayılmır) xətanı siqnal edən Sol (WrongNumberOfFields (1,3,2)) qeydinə diqqət yetirin, burada 3 sahənin gözlənildiyi, ancaq cəmi 2-nin tapıldığı):
Vektor (Sol (SəhvNumberOfFields (1,3,2)), Sağ (Xəritə (sütun1 -> 1, sütun2 -> dəyər2, sütun3 -> 3)))
  • csvMore ilə (əvvəlki kimi, lakin indi 4 sahə tapıldı):
Vektor (Sol (SəhvNumberOfFields (1,3,4)), Sağ (Xəritə (sütun1 -> 1, sütun2 -> dəyər2, sütun3 -> 3))

Axını dayandırmaq üçün bir istisna atmaq kimi düzgün hərəkətlər etmək hesablama sonrakı mərhələyə qədərdir.

Xülasə

Bu yazıda Alpakka CSV-nin daxili funksiyaları ilə birlikdə istifadə ediləcək xüsusi bir axını necə tətbiq edəcəyimizi gördük. Həyata keçirdiyimiz işlər sadəcə sahələrin sayının sıra boyunca ardıcıl olduğunu yoxlayır, lakin bu mərhələdə edə biləcəyimiz yoxlamalarda məhdudiyyət yoxdur.

İstinadlar

  • Alpakka CSV sənədləri: https://doc.akka.io/docs/alpakka/current/data-transformations/csv.html
  • CSV spesifikasiyası: https://tools.ietf.org/html/rfc4180