Upgrade to Pro — share decks privately, control downloads, hide ads and more …

My summer internship result at Treasure Data 20...

My summer internship result at Treasure Data 2018 #td_intern

Kazuhiro Serizawa

September 21, 2018
Tweet

More Decks by Kazuhiro Serizawa

Other Decks in Programming

Transcript

  1. About me • Kazuhiro Serizawa • Graduated school student in

    the 1st year at master’s course • Department of Computer Science, Graduate School of Systems and Information Engineering,
 University of Tsukuba • Studying implementation of Deep Learning system on HPC
  2. How did I worked ? • I have worked as

    a Software Engineer with @muga san and @mitsu san • I have worked at 2 places • 1st half of the duration ( August 13th - 31st / September 25th-28th ) • I have worked at Tokyo office • 2nd half of the duration ( September 4th - 21st ) • I have worked at Mountain View office
  3. What did I do in this internship ? • 1.

    Developing some new features on Digdag • td_result_export> (tentative name) • param_get> / param_set> • Small enhancements on Digdag • 2. Some researches about Kubernetes for Digdag new features
  4. What did I do in this internship ? • 1.

    Developing some new features on Digdag • td_result_export> (tentative name) • param_get> / param_set> • Small enhancements on Digdag • 2. Some researches about Kubernetes for Digdag new features
  5. td_result_export> (tentative name) • Abstract • “td_result_export>” is a new

    operator for Digdag • But the name is tentative, so it may be changed before release • This operator enables user to export a their TD job result to specified multiple connectors (e.g. to TD tables, customer’s data stores, and so on)
  6. td_result_export> (tentative name) • Background • Existing “td>” operator also

    can export its result to specified connector in the same task • But user CANNOT • export a previous job result after finished it • export to multiple connectors • “td_result_export>” can solve this problem _export: td: database: www_access +result_with_connection_with_settings: td>: queries/select_sql.sql result_connection: my_s3_connection result_settings: bucket: my_bucket path: /logs/
  7. td_result_export> (tentative name) _export: td: database: www_access +result_with_connection_with_settings: td>: queries/select_sql.sql

    +export_query_result_to_td_table: td_result_export>: job_id: ${td.last_job_id} # job id what user wants to export result result_url: td://${secret:td.apikey}@/${td.database}/table +export_query_result_to_mysql: td_result_export>: job_id: ${td.last_job_id} # job id what user wants to export result result_url: mysql://${secret:user}:${secret:password}@${secret:host}/database/table • Sample
  8. • Limitations • We can specify the connector only by

    url, not connector name and settings, like td operator td_result_export> (tentative name) result_connection: my_s3_connection result_settings: bucket: my_bucket path: /logs/ • But, it seems that the API endpoint of result_export(/v3/job/result_export) works with specifying connector id and connector settings ( as far as I tried ) • It seems that this is an implicit behavior, so some researches of API specification is needed
  9. td_result_export> (tentative name) • Related work • Support result_export on

    td- client-java • https://github.com/treasure-data/ td-client-java/pull/107
  10. param_set> / param_get> • Abstract • “param_set>” and “param_get>” are

    new operators on Digdag • The couple of operators enables users to set/get params to/from persistent data • This means that users do not need to write a custom scripts (by using py>, rb>, sh>) to achieve this • We support PostgreSQL and Redis as DataStore of persistent data, for now
  11. • Background • Digdag can share parameters within the same

    workflow as an existing function, like this: param_set> / param_get> _export: my_key: my_value +t1: sh>: echo ${my_key} #=> my_value • But Digdag CANNOT share them across other projects or sessions • param_set/get operators can solve this problem by using persistent data
  12. param_set> / param_get> DataStore schedule: daily>: 21:00:00 +fetch_values: param_get>: last_executed_time:

    last_executed_time +execute_job: sh>: job.sh ${last_executed_time} +save_result: param_set>: last_executed_time: ${finished_time} Key Value last_executed_time 2018/09/01 21:30:00
  13. param_set> / param_get> DataStore schedule: daily>: 21:00:00 +fetch_values: param_get>: last_executed_time:

    last_executed_time +execute_job: sh>: job.sh ${last_executed_time} +save_result: param_set>: last_executed_time: ${finished_time} Get persistent data whose key is “last_executed_time” from ParamStore, and store it 
 to ParamStore as ${last_executed_time} Key Value last_executed_time 2018/09/01 21:30:00
  14. param_set> / param_get> DataStore schedule: daily>: 21:00:00 +fetch_values: param_get>: last_executed_time:

    last_executed_time +execute_job: sh>: job.sh ${last_executed_time} +save_result: param_set>: last_executed_time: ${finished_time} Using the gotten data by next task Key Value last_executed_time 2018/09/01 21:30:00 2018/09/01 21:30:00
  15. param_set> / param_get> DataStore schedule: daily>: 21:00:00 +fetch_values: param_get>: last_executed_time:

    last_executed_time +execute_job: sh>: job.sh ${last_executed_time} +save_result: param_set>: last_executed_time: ${finished_time} Set ${finished_time} to DataStore with the key “last_executed_time” Key Value last_executed_time 2018/09/02 21:31:12 2018/09/02 21:31:12
  16. param_set> / param_get> • Supporting datastore • We support PostgreSQL

    or Redis for now • Digdag server loads Connection information of DataStore from system config file param_server.database.type=postgresql # or redis param_server.database.user=serizawa param_server.database.password=pAssw0d param_server.database.host=localhost param_server.database.database=digdag_param_database
  17. param_set> / param_get> • Limitations • The scope of persistent

    data is “site id” level • So users can share persistent data within their projects or sessions • Even with parallel task execution, both of param_set> and param_get> can work. But their behaviors are different from each other: • param_set> immediately updates persistent data, so the last result executed chronologically is the final result • param_get> sets a persistent data to ParamStore independently and parallelly, but the final state of ParamStore is the state of the task whose task id is the most biggest within parallel executed tasks
  18. param_set> / param_get> • Limitations +loop_task: for_each>: value: [value1, value2,

    value3, value4, value5] # => for_each> creates 5 tasks _parallel: true _do: param_get>: key: ${value} +show_value: sh>: echo ${value} # what string is appeared here ?
  19. param_set> / param_get> • Limitations • Currently support only “String

    like value” as a persistent data type • e.g. • Supported: `key: value` • Not supported `key: [1,2,3,4,5]` • But I implemented to enable to be added other type (“String” is used internal as default data type, for now) • So hopefully someone may implement array type support…
  20. Enhancements on Digdag • I introduced two enhancements on Digdag:

    • “num_records” parameter at TD operator • change of log-view on digdag-ui experimentally
  21. Introduce “num_records” parameter at TD operator • This feature is

    requested by @yoshiken san • After executing a query on TD platform, the number of records of the result is stored to ParamStore as ${td.last_job.num_records} • So far, user needed to use TD API directly, in order to know the number of record of job timezone: UTC +run: td>: data: SELECT 1 as a, 2 as b database: sample_datasets +show_num_records: sh>: echo ‘${td.last_job.num_records} #=> 1
  22. Enhancement of Digdag-ui log view • In the log-view of

    the latest version Digdag, all task logs in Digdag-ui is displayed in one place • So it is not easy to find a error message of failed task on log-view and distinguish each line by task • In Treasure Data workflow, some customers have over 100 tasks, so for such customers this is a serious problem
  23. Enhancement of Digdag-ui log view • To solve this problem,

    I modified log- view to split by tasks
  24. Enhancement of Digdag-ui log view • And I also added

    short-cut link to each logs at task-view • If user clicks the ID on task-view, jump to the log whose task id is clicked
  25. What did I in this internship ? • 1. Developing

    some new features on Digdag • td_result_export> (tentative name) • param_get> / param_set> • Small enhancements on Digdag • 2. Some researches about Kubernetes for Digdag new features
  26. Some researches about Kubernetes • Background • We are working

    on a new feature of Digdag, which enables to user to execute custom scripts(included in docker image) on Kubernetes containers • Why did we choose Kubernetes ? • Requirements of customer: • Need to execute some pre-processes on customer’s data with customers’ scripts before importing to TD • Requirements of TreasureData: • Need to achieve customer’s processes isolation, resource management, but Docker is not enough to achieve them • To meet these requirements, we decided to hire Kubernetes 1. Create a container and execute custom script in it 2. Confirm result and update params
  27. Some researches about Kubernetes • 2 tasks to achieve this

    feature for us: • To limit container local storage usage so as not to consume a lot of disk spaces • To limit network access from inside of container, especially access to AWS API • So I researched how to achieve it 1. Create a container and execute custom script in it 2. Confirm result and update params
  28. How to limit container local storage usage ? • Conclusion

    • We decided to limit local storage usage by specifying “ephemeral-storage” limit • This setting causes pod termination if the container uses more than 1GiB apiVersion: v1 kind: Pod metadata: name: ephemeral-strage-test spec: containers: - name: nginx image: nginx resources: requests: ephemeral-storage: "1Gi" limits: ephemeral-storage: "1Gi"
  29. How to limit container local storage usage ? • Experiment

    • I created a pod, and created a dummy file of 1.1GB inside the container • And then, the pod is terminated soon by Kubernetes root@ephemeral-strage-test:/tmp# dd if=/dev/zero of=1.1G.dummy bs=1M count=1100 
 command terminated with exit code 137 Warning Evicted 52s kubelet, ip-172-21-198-121.ec2.internal pod ephemeral local storage usage exceeds the total limit of containers {{1073741824 0} {<nil>} 1Gi BinarySI} Normal Killing 52s kubelet, ip-172-21-198-121.ec2.internal Killing container with id docker://nginx:Need to kill Pod
  30. How to limit container local storage usage ? • I

    tried 2 other options, but they did not work well in our use-case • Option1: Using EBS • Option2: Using PersistentVolume with local storage • To use these storage as a local storage, there are 2 problems we faced • We must specify EC2 instance id with pod specification. This prevent our cluster to scale out • We must delete all the files in the volumes after pod is terminated. On the other hand, ephemeral-storage is automatically cleared after pod is terminated.
  31. How to limit network access from inside of container ?

    • Conclusion • Set a rule to reject to packet to EC2 metadata API host(169.254.169.254) on iptables on EC2 instance of Kubernetes node • As a result of this rule, if some process about to access to EC2 metadata API host from container, the access is blocked
  32. How to limit network access from inside of container ?

    • Conclusion • Kubernetes has a function of “NetworkPolicy”, which enable users to control network traffic inside pod, but I could not limit the access from inside pod to outside • But, our requirements is only to permit access to EC2 metadata API, which returns some sensitive informations of EC2 instance for now • So we chosed using iptables directly
  33. How to limit network access from inside of container ?

    • How to add an iptables rule on EC2 instance of Kubernetes node? • AWS is publishing a script to make AMI image for EKS by using packer • https://github.com/awslabs/amazon-eks-ami • So we can fork this repository, and edit provisioning script to insert a rule of iptables definition
  34. Summary • I have worked on 2 themes in this

    internship: • developing some new features of Digdag • td_result_export> (tentative name) • param_get> / param_set> • Small enhancements on Digdag • researches of Kubernetes • I appreciate all Treasure Data people for helping me a lot
  35. • Digdag operator • https://github.com/treasure-data/Digdag/pull/843 • https://github.com/treasure-data/Digdag/pull/845 • Digdag ui

    • https://github.com/treasure-data/Digdag/pull/850 • Digdag introduce `num_records` • https://github.com/treasure-data/digdag/pull/870 • td-client-java • https://github.com/treasure-data/td-client-java/pull/107 • https://github.com/treasure-data/td-client-java/pull/112 • https://github.com/treasure-data/td-client-java/pull/113 • https://github.com/treasure-data/td-client-java/pull/114 Appendix 1 Pull Requests