Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Pythonでパイプライン処理

Avatar for yujikawa yujikawa
November 22, 2017

 Pythonでパイプライン処理

Pythonを使ったパイプライン処理を紹介します

Avatar for yujikawa

yujikawa

November 22, 2017
Tweet

More Decks by yujikawa

Other Decks in Technology

Transcript

  1. 自己紹介 class Person(object): """ Person Class """ def __init__(self): self.name

    = "川上祐司" self.job = "エンジニア" self.twitter = "@yujikawa_py" yujikawa.py
  2. Luigiの使い方 実装編 import luigi import time class TaskA(luigi.Task): """ TaskA

    class """ def requires(self): return [] def output(self): return luigi.LocalTarget('tmp/sample1/TaskA') def run(self): time.sleep(10) with self.output().open('w') as out_file: out_file.write("") sample1.py 最初に実行するタスクを実装する。 Requiresは依存を設定する関数 Outputは依存関連ファイルを出力する関数 Runはタスクの処理を書く関数
  3. Luigiの使い方 実装編 class TaskA(luigi.Task): ・・・・・・ class TaskB(luigi.Task): """ TaskB class

    """ def requires(self): return [TaskA(), ] def output(self): return luigi.LocalTarget('tmp/sample1/TaskB') def run(self): time.sleep(10) with self.output().open('w') as out_file: out_file.write("") sample1.py 次に実行するタスクを定義する。 RequiresにはTaskAの後に動かしたいのでクラ ス名を設定する OutputはTaskBの依存ファイルを出力するに する
  4. Luigiの使い方 実行編 $ luigid Defaulting to basic logging; consider specifying

    logging_conf_file in luigi.cfg. 2017-11-20 16:37:24,026 luigi.scheduler[5689] INFO: No prior state file exists at /var/lib/luigi-server/state.pickle. Starting with empty state2017-11-20 16:37:24,031 luigi.server[5689] INFO: Scheduler starting up console まずluigidコマンドでluigiサーバを起動します。 http://localhost:8082 にアクセスするとタスク状況が確認できます。
  5. Luigiの使い方 実行編 $ python sample1.py TaskEnd -–worker 1 console --workerオプション

    プロセスの起動数の設定で複数にする ことで一気に並列処理できます 次に自分で作ったluigiプログラムを実行します。引数にクラス名 を指定する。ここで指定したクラスのところまで実行します。ま たworkerオプションをつけることでプロセス数をコントロールで きます。
  6. • 多くの機能が実装されています。例えば・・ その他便利機能 •luigi.contrib.bigquery module •luigi.contrib.bigquery_avro module •luigi.contrib.dataproc module •luigi.contrib.ecs

    module •luigi.contrib.esindex module •luigi.contrib.external_program module •luigi.contrib.ftp module •luigi.contrib.gcp module •luigi.contrib.gcs module •luigi.contrib.hadoop module •luigi.contrib.hadoop_jar module •luigi.contrib.hive module •luigi.contrib.kubernetes module •luigi.contrib.mongodb module •luigi.contrib.mrrunner module •luigi.contrib.mssqldb module •luigi.contrib.mysqldb module •luigi.contrib.opener module •luigi.contrib.pig module •luigi.contrib.postgres module •luigi.contrib.pyspark_runner module •luigi.contrib.rdbms module •luigi.contrib.redis_store module •luigi.contrib.redshift module やったことはないですが、依存関係の管理をredisでできる模様
  7. その他便利機能 •luigi.contrib.s3 module •luigi.contrib.salesforce module •luigi.contrib.scalding module •luigi.contrib.sge module •luigi.contrib.sge_runner

    module •luigi.contrib.simulate module •luigi.contrib.spark module •luigi.contrib.sparkey module •luigi.contrib.sqla module •luigi.contrib.ssh module •luigi.contrib.target module •luigi.contrib.webhdfs module https://luigi.readthedocs.io/en/stable/api/luigi.contrib.html • まだまだあります。
  8. その他便利機能 from luigi.contrib.ssh import RemoteFileSystem ssh = RemoteFileSystem(host=host, port=port, username=username,

    key_file=key_file) ssh.put(local_path, remote_dir) sample.py • 例えばsshモジュールは下記のような形で簡単にput処理ができる