はじめに


Google Cloud Platform Advent Calendar 2019: 19日目の記事です

この記事では比較的新しい分析組織という中で実装した,GCPサービスでコストを抑えながら重いバッチ処理をスケジュールで回す仕組みについて記します.
インフラ環境が整っていなかったり,予算の都合などから金額を気にしている方には今の所オススメの方法です.
これよりオススメの方法があったら教えてくださると幸いです.

対象ではない人


以下の人にはこの記事をオススメしません.

  • インフラ・サーバーエンジニアがインフラ運用してくれるチーム
  • デプロイまでの綺麗なインフラ環境を目指している人
  • Cloudの料金を気にしなくて良い人
  • Cloud Functionsで事足りる人
  • そもそもGCPやアーキテクチャ周り詳しい人

構築した設計


アーキテクチャ図

今回は一番簡単に実装できそうだったGCPのチュートリアルを参考に上記の設計でバッチを組みました.

サービス 役割
Cloud Scheduler cron形式でスケジュールの発火.
Cloud Pub/Sub メッセージ送信に用いる. httpを使わない理由は先ほど貼ったチュートリアルのリンクを参照.
Cloud Functions ComputeEngineのインスタンス起動スクリプトを実行.
Compute Engine バッチ処理.
BigQuery データ連携

他に考えた構成


とりあえず以下の構成を考えました.

  • CloudFunctions

    • メモリ上限が2GでGCSをうまく利用しても綺麗に作るのは難しそうだった.無理やり分解すれば可能だったがそのやり方はGCFとGCSの往復を何回もすることになるので可能なら避けたかった.Scheduler → GCF → BigQuery → GCS → GCF → GCS → GCF → BigQueryのようにそれぞれ一次集計・二次集計とわかりにくくなりそうだった.他のタスクで分かりやす買ったらCloudFunctionsをモノレポで管理していたりします.
  • CloudRun・KubeFlow

    • 0インスタンスからオートスケール設定をすれば安い料金で構築できるが,分析組織でGKEを使うにはまだ早いフェーズだったので後回しにした.今後組織的に余裕ができるか,バッチが乱雑するようなら本格的に取り組む予定.とりあえず一人でお試しコンテナ運用で様子見を行いフェーズに合ってきたら布教.
  • DataFlow

    • 正直今回のアーキテクチャでバッチの内容が単純ならDataFlowも合っている気がしている.しかしながらApacheBeamの学習コスト的にそれをチームに強いるのは考えものというのと,DataFlowではシステム領域で必要になる外部ライブラリやレコードのストリーム内での外部連携に適していないため今回の実装では見送り.(機械学習での前処理など単純なものにはピカイチ.個人的にはscalaかつApacheBeamの存在を匂わせないので scio が好き.)

上記のような理由から費用的・学習的コストを考え今のフェーズに適していると考えた構成を取り入れました.

構築


CloudScheduler


ジョブの作成から適当な名前をつけ,頻度はcron形式で発火タイミングを指定します.
CloudSchedulerからCloudFunctionsへPub/Subで呼ぶのでターゲットをPub/Subに指定し,トピックは適当に名前をつけます.
本構成にあたり,いくつか別タスクのインスタンスを起動できるようにするためにペイロードの値によってCloudFunctions側で起動するインスタンスを分けられるようにします.
そのためzoneとinstanceを指定し,2種類のインスタンスを建てたい場合は複数のSchedulerでペイロードの値を変えて切り分けました.

CloudScheduler

CloudFunctions


インスタンス起動用のスクリプトを動くようにします.
名前を適当につけ,割り当てメモリは最小の128Mで十分かと思います.
トリガーをPub/Subに設定し,トピックは先ほど指定したtest-topicを 新しいtopicを作成から 作ります.
今回はテスト的に行うのでソースコードはインラインエディタでデプロイします.
そして実行する関数を以下のソースコードだったら start_up_instance に設定し,詳細の環境変数でprojectを設定しました.
projectをまたいでインスタンスを起動することはないと思うので環境変数に設定しましたが,payloadで渡してあげても良いです.

CloudFunctions
import base64
import json
import os
import logging

from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build


def start_up_instance(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    message = base64.b64decode(event['data']).decode('utf-8')
    payload = json.loads(message)

    project = os.environ['project']
    instance = payload['instance']
    zone = payload['zone']

    credentials = GoogleCredentials.get_application_default()
    compute = build('compute', 'v1', credentials=credentials, cache_discovery=False)
    instance = compute.instances().get(project=project, zone=zone, instance=instance).execute()
    if instance['status'] == 'TERMINATED':
        compute.instances().start(project=project, zone=zone, instance=instance['name']).execute()

google-api-python-client==1.7.11
oauth2client==4.1.3

GCE


ここからが邪道です.
インスタンスの起動時にインスタンス内のcrontabか,GCEのstart_up_scriptにバッチ実行シェルを仕込むという方法です.
今回は前者で行いました.
実際に動かしているバッチ処理の重さは使いたい人の一日のデータ量の設定や,ログの量によって可変で,短ければ数分・長ければ1時間以上と可変になってしまっています.そのため,cronの処理終了時に少し余裕を持たせてからシャットダウンを行うようにしています.シャットダウンのコマンドはsudo権限が必要なのでsudo権限がついているユーザーでcronを作成する必要があります.

$ crontab -e # sudo権限があるユーザーで

# ↓↓↓ 記入するのはこれだけ ↓↓↓
@reboot sh /{パス}/run.sh > /{パス}/$(date '+%Y%m%d').log
#!/bin/sh
/usr/local/bin/python3 1.py; # 動かしたいバッチ
/usr/local/bin/python3 2.py; # シェルなら1,2,3と分けることも可能
/usr/local/bin/python3 3.py;

sleep 5m; # もしもバッチで処理したいデータが無かった場合,すぐにシャットダウンしてしまうので5分余裕を持たせている.
/sbin/shutdown -h now;
exit 0;

バッチの中でBigQueryやGCSを呼んでいますが今回の話とはやや逸れるので省きます.

何が良いのか


CPUやメモリ,GPUなども自由に選択できるので開発のスピードを求められる時などは有用かと思います.
しかしこれらが乱雑してしまうと管理コストが大きくなると思うのでコンテナ管理する方をお勧めします.
初期のテスト段階として検証用に使う分には問題はないかなと思っています.

終わりに


分析チームにもインフラを管理してくれるエンジニアが一人でもいると非常に助かるなと思いました.