"""AWS Lambda handler that forwards Kinesis records to a BigQuery table."""

import base64
import datetime
import json
import os
import sys

import boto3

# Make the bundled ``vendor`` directory (next to this file) importable before
# pulling in the third-party packages below.
sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'vendor'))
from gcloud import bigquery
from oauth2client.service_account import ServiceAccountCredentials

# Base64-encoded, KMS-encrypted JSON keyfile of the service account
# (it is base64-decoded, KMS-decrypted and JSON-parsed in ``_load_table``).
BQ_CREDENTIALS = os.environ['BQ_CREDENTIALS']
BQ_PROJECT = os.environ['BQ_PROJECT']
BQ_DATASET = os.environ['BQ_DATASET']
BQ_TABLE = os.environ['BQ_TABLE']


def _to_row(data):
    """Convert one decoded record into a ``(time, tag, value)`` tuple.

    ``data['time']`` is passed to ``utcfromtimestamp`` and so must be a
    numeric POSIX timestamp.  A missing key raises ``KeyError``; a bad
    timestamp raises whatever ``utcfromtimestamp`` raises.
    """
    # utcfromtimestamp (not fromtimestamp) keeps the result independent of
    # the runtime's local timezone.
    # NOTE(review): this assumes the BigQuery client interprets naive
    # datetimes as UTC -- confirm against the vendored gcloud version.
    return (
        datetime.datetime.utcfromtimestamp(data['time']),
        data['tag'],
        data['value'],
    )


def _load_table():
    """Return the reloaded BigQuery table described by the BQ_* settings."""
    kms = boto3.client('kms')
    decrypted = kms.decrypt(CiphertextBlob=base64.b64decode(BQ_CREDENTIALS))
    keyfile_dict = json.loads(decrypted['Plaintext'])
    credentials = ServiceAccountCredentials.from_json_keyfile_dict(keyfile_dict)

    client = bigquery.Client(credentials=credentials, project=BQ_PROJECT)
    table = client.dataset(BQ_DATASET).table(BQ_TABLE)
    # presumably required so that insert_data knows the table schema -- verify
    table.reload()
    return table


def handler(event, context):
    """Insert the Kinesis records carried by ``event`` into BigQuery.

    Each entry of ``event['Records']`` must hold a base64-encoded JSON
    object under ``['kinesis']['data']`` with ``time``, ``tag`` and
    ``value`` keys.  Records that cannot be decoded or converted are
    reported on stdout and skipped.  If no valid row remains, nothing is
    sent.  Otherwise the rows are inserted in one call and the result of
    ``insert_data`` is printed.

    Args:
        event: Lambda event dict with a ``Records`` list.
        context: Lambda context object (unused).

    Returns:
        None.
    """
    rows = []
    for record in event['Records']:
        payload = record['kinesis']['data']
        try:
            rows.append(_to_row(json.loads(base64.b64decode(payload))))
        except Exception as e:
            # Deliberate best effort: one malformed record must not abort
            # the whole batch, so log it and move on.
            print('Invalid data "{0}": {1}'.format(payload, e))

    if not rows:
        return

    res = _load_table().insert_data(rows)
    print(res)