如何設計出好的 data pipeline

  |   Source

Appier 擔任 Data Engineer 一職至今已滿五年,在這過程之中開發與維護 pipeline 一直是我最重要的工作項目之一。 除了開發全新設計的 pipeline 以外,許多舊有的 pipeline 也在不斷的持續改進,也有些 pipeline 由於體質過差,也不得不全面重寫。 有趣的是,許多 pipeline 在改寫或重寫過後,input 與 output 仍然與之前沒有太大差異,那麼,究竟是什麼性質造就了一條好的 pipeline?

Effectiveness

綜觀所有的特性,我認為效率是其中最重要的一項。我這裡指的並不是一條 pipeline 每秒鐘能處理多少資料,或著每天的花費是多少。 之所以需要建立 pipeline,目的就是要讓後續使用 data 能更加有效率,如果不將下游使用一併納入考慮,單純計算 pipeline 本身的效率,我認為意義不大。 事實上我認為整個 Big Data 領域在解決的問題,有很大一部份就是在針對讀取進行最佳化,NoSQL 的崛起正是絕佳的例子。

在納入上述觀點之後,pipeline 的價值就很容易計算了:原先不透過這條 pipeline 讀取資料所需的花費 減去 改成透過讀取 pipeline 產出所需的花費和 pipeline 本身的花費,就是這條 pipeline 帶來的價值。

基於 Cloud Storage 的特性,為了增加讀取的效率,可以考慮進行以下的設計:

  • 使用 Columnar Storage Format 如 Parquet
  • 若使用 Columnar Storage Format,可進一步在檔案內對指定欄位進行排序
  • 將檔案以 partitioned 形式存放
  • 適當控制每一個檔案的大小

然而就像使用 NoSQL 一樣,一個 index 不太可能滿足所有的查詢;只用一種方式存放資料,也不太可能讓所有讀取都有效率。 這時你就會需要輸出第二份資料,它可能是原先資料的另一種 partitioned 擺放方式,也可能是 pre-aggregated 之後的結果。 而實作方式它可能是一條全新獨立的 pipeline、或著是基於原先資料的 downstream pipeline,也可以讓原本的 pipeline 同時輸出多份資料。

在考慮多種輸出格式的情況下,效率的評估就變得極為困難了。如何決定每一種應用該讀取用哪一份資料?什麼時候該為了還沒被最佳化的應用多輸出一種格式?每一份輸出真的有帶來額外價值嗎? 這問題雖然困難,但所幸我們也不需要追求完美解答,只要大方向不要偏離太多即可,尤其是當需求與應用也在不斷隨時間持續改變。

Extensibility & Complexity

Pipeline 是一種需要穩定,卻又免不了根據需求持續演進的專案,而複雜的 pipeline 將會導致演進變得困難。

在設計 pipeline 初期,我總是希望 pipeline 在架構上能夠儘可能的簡單,最好是能只用幾句話就說明清楚。 既然註定了要隨需求持續演化,那一開始就要更加的避免過度設計。

而在 pipeline 上線之後,面對持續演進的需求與急迫的時程,有時也會不得不做出大幅增加複雜度的設計。 面對這樣的問題,我的方案是定期的 refactor,確保複雜度能收斂至合理的範圍。這個作法就像是 Refactor 書中提到的切換兩頂帽子,一個帽子負責功能演進,另一個負責降低複雜度。

Reliability

對於需要處理大量 input 資料的 pipeline 來說,出現異常資料幾乎是無可避免的事,尤其是當上游資料來自於公司外部。

為了避免最終產生錯誤的 output,你可能會想讓 pipeline 在遭遇錯誤時停止運作,直到狀況釐清並完成 hotfix。 但除非你能確認資料來源足夠乾淨,否則頻繁的緊急處理最終會耗光維護人員的心力,於是進入另一個選項。

這個選項當然就是惡名昭彰的 silent exception,將有問題的資料通通丟掉,從此 pipeline 再也不會 crash,大家都很開心。 直到有一天使用者回報,你才發現有大量合法的資料被視為有問題丟掉,而這個狀況已經不知道持續多久了……。

我們針對這個問題的解法是,將有問題的資料輸出到特定的位址並搭配 alert。 因為 pipeline 並沒有被中止,維護人員並不需要第一時間進行處理,疑似有問題的資料也因為集中擺放而容易分析,甚至在 hotfix 過後想重新加回這些資料時,也可以只針對這批資料進行。

Maintainability

當 pipeline 規模變得龐大,各種稀奇古怪的 bug 將變得不再那麼稀有,你可能會遇到故障的機器、各種 library 中的 bug,或著 pipeline 中自己犯的錯,也可能只是來自 input 資料本身的異常。 這聽起來就跟一般的 bug 沒什麼不同,但讓事情變得棘手的是發生的機率。 設想一下你的 pipeline 每天處理十億筆資料,不時接獲使用者反映懷疑資料有問題,你需要從上游的十億筆資料中找出對應的那筆,然後判斷錯誤是否來自資料本身。

如果錯誤來自資料本身,對應的動作可能是通知上游,和使用者說明原因,讓他們相信 pipeline 其實是可靠的,然後祈禱類似的問題不要太頻繁的再次發生。

然而如果問題來自 pipeline 本身,接下來就是殘酷的 debug 考驗。 因為 cost 考量,你不太可能有一個完整規模的 staging 環境,而較小規模的實驗很可能難以複製出問題,特別是當問題只會機率性出現時。 因此你必須將所有能夠獲取的訊息都納入考慮,嘗試從中找出蛛絲馬跡,並對所有的機制都保持懷疑。

我遇過許多讓我印象深刻的 bug,像是某知名雲端供應商,當你開出數百台機器建立 cluster 時,可能會有其中幾台在互相通訊時,會有小機率出現封包中某一個 bit 有 flipping 的情況,導致最終產生的資料數值可能不正確。 另一個是最近才剛找出來的 Spark correctness issue,會在 "GROUP BY" 之後出現重複的資料。

面對這樣的狀況,我的建議是必須要將 pipeline 設計成容易調查,例如紀錄每份 output 是在什麼時間產生、它對應的 input 分別是哪些檔案。 如果 pipeline 有中間產物,或許可以保留一段時間再刪除。如果 pipeline 會需要修改過去資料,將檔案設計為 immutable 而避免 overwrite 也可能有幫助。 這些零碎的小細節,很可能是你面對毫無頭緒的 bug 的唯一救命稻草。

Scalability

隨著資料量的增加,pipeline 免不了需要具備 scalability 的能力。 所幸,現在常見的 big data framework 都能夠處理好這件事,頂多就是需要額外注意一下 data skew 的情況。

Usability

在顧及上述所有特性之後,別忘記,pipeline 的存在目的就是讓後續資料使用能更有效率。 因此,將輸出的資料設計成容易理解、容易使用也是很重要的,簡單的例子像是將相同類型的欄位使用相同的 prefix/suffix,使用適當的型態與預設值,提供適當的文件……等等。

以上就是我認為一個好的 pipeline 需具備的所有特性,只要遵從這些原則,多半就足以讓 pipeline 物有所值。 至於體質真的有問題的 pipeline,有時就沒有辦法只靠 refactor 來改進,這時就需要更完整的 migration plan 來徹底改頭換面,不過這又是另一個故事了。