BigQuery –匯入資料 part 3

Data Transfer and Exports

這一篇我們繼續來說明BQ的資料轉移與資料匯出,之前篇章我們提到的資料載入都是一次性的工作。但若你的資料是要定期性的匯入資料進入 BQ呢?
BQ提供了 “Data transfer Servie” ,讓你定期的自動將資料匯入到 BQ中。這一類的工作BQ提供了Web UI介面/bq command 及REST API的方式來執行這種重複性的工作。
BQ的 transfer servicet除了本身就支援Cloud Storage 之外還支援多種資料來源,例如Google Ads, Google Play, AWS Redshift, YouTube Channel。另外需要注意一點,因爲我們之前提到過BQ是Region Level的服務,Cloud storage也是。所以在設計上時會建議Cloud storage及BQ是在同一個Region(例如台灣),哪麼 data transfer service不會有任何的收費,若是跨區傳輸 data transfer將會有費用產生。在使用Data transfer servicef確認這個 Service 要有BQ的Admin權限,這樣transfer service才能把資料寫入BQ中,且這個服務沒有辦法create data或自動偵測table schema。它只是把資料寫入BQ中,所以有關寫資料以外的DDL作業都要預先完成。

若我們要從既有的table中copy 它的table schema可以用如下的SQL語法

Create or Replace table
your_date_name
as
select * from your_source_table
limit 0

以下是用 bq command 產生一個transfer job

bq mk –transfer_config –data_source=google_cloud_storage \
–target_dataset=your-dataset –display_name your-display-name \
–params={“data_path_template”:”gs://bucket-name/yourfile_*.csv” , “destination_table_name_template”:”your-des-table”, “file_format”:”CSV”, “max_bad_records”:”10″, “skip_leading_rows”:”1″, “allow_jagged_rows”:”true”

上面這個範例的source是在cloud storage,相關的參數在WEB UI上也都可以看到,在BQ中選擇”Transfers”(如下圖)

可以選擇多種來源形式(如下圖)

在定期工作這一段,BQ目前最小的單位是以“天”。也就是說transfer service每天最多傳輸一次資料寫入到BQ。會這麼做的原因是為BQ有Quota的限制,當然若你的資料有大到需要每天傳輸多次現有的Quota不夠用,哪麼可以聯絡Google增加這個Quota。另外若你需要在不同的GCP Region之間的BQ做資料傳輸,哪你的Data Source就需要選擇Dataset copy。

另外,定期載入資料當然免不了也需要定期Query的狀況會發生。例如每個月需要有月報,這時可以把SQL語法寫好把這個語法定期執行。如下圖

Query的定期排程最小可以到“小時”,

當然比較建議的方式是用Dataflow來建立你的Data pileline.這是一個全託管式的ETL服務,這一個服務支援的來源比transfer service來的豐富而且除了批次作業外,Dataflow還支援資料串流(即時)的資料匯入。
假設今天我們需要從MySQL將資料寫入BQ中,我們選擇Dataflow服務並點選create job from template.這是Dataflow提供的template.

template提供了很多種的模組可以直接使用,我們選擇 Jdbc to bigquery

填入相關的MySQL資料後就可以直接將MySQL資料匯入了

若是Dataflow的template job也滿足不了你的需求,哪麼也可以在Dataflow中使用你自己的程式例如用java/Go/Python等來做data的parsing你的資料。DataFlow使用Apache Beam API來呼叫,例如以下Python的範例

INPATTERNS = ‘gs://your-bucket-name/your-csv-file_*.csv’
RUNNER = ‘DataflowRunner’
with beam.Pipeline(RUNNER, options = opts) as p:
(p
| ‘read’ >> beam.io.ReadFromText(INPATTERNS, skip_header_lines=1)
| ‘parse_csv’ >> beam.FlatMap(parse_csv)
| ‘pull_fields’ >> beam.FlatMap(pull_fields)
| ‘write_bq’ >> beam.io.gcp.bigquery.WriteToBigQuery(bqtable, bqdataset, schema=get_output_schema())
)

上面這一段範例我們create 一個Beam的 pipeline,使用DataFlow的服務。這一段主要在呼叫 GCP啟用DataProc(就是Hadoop服務)。若是讀取資料並定義資料的patterns,請參考如下Python的範例

def parse_csv(line):
try:
values = line.split(‘ , ‘)
rowdict = {}
for colname, value in zip(COLNAMES, values):
rowdict[colname] = value
yield rowdict
except:
logging.warn(‘Ignoring line …’)parse_csvpull_fieldsINSTNMdef pull_fields(rowdict):
result = {}

以上就是關於BQ載入資料簡單的介紹。