Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 118 additions & 26 deletions src/ckan_to_bigquery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os, sys
import base64
import json

from google.cloud import bigquery
from api_tracker import tables_in_query
Expand All @@ -16,6 +18,7 @@
import ckan.plugins.toolkit as tk
import logging
import datetime

from user_agents import parse

import requests
Expand Down Expand Up @@ -365,7 +368,73 @@ def search_sql_normal(self, sql):
"fields": []
}
}

def _get_table_last_modified_time(self, table_id):
    '''
    Get the last modified time of a table.

    Reads ``last_modified_time`` for ``table_id`` from the ``__TABLES__``
    view of ``self.project_id`` / ``self.dataset`` and converts it with
    ``TIMESTAMP_MILLIS``.

    :param table_id: id of the table to look up
    :returns: the ``last_modified_time`` value of the first matching row
    :raises LookupError: if no row exists for ``table_id``
    '''
    # table_id ends up inside a single-quoted SQL string literal, and the
    # caller in this file derives it from the submitted SQL. Escape
    # backslashes first, then single quotes, so the value cannot terminate
    # the literal.
    safe_table_id = '{0}'.format(table_id).replace('\\', '\\\\').replace("'", "\\'")
    query = '''
        SELECT
        TIMESTAMP_MILLIS(last_modified_time) AS last_modified_time
        FROM `{0}.{1}.__TABLES__`
        WHERE table_id = '{2}' ;
    '''.format(self.project_id, self.dataset, safe_table_id)
    query_job = self.bqclient.query(query, job_config=self.job_config)
    # An unknown table yields no rows; report that explicitly instead of
    # leaking a bare StopIteration out of next().
    first_row = next(iter(query_job.result()), None)
    if first_row is None:
        raise LookupError(
            "No __TABLES__ entry found for table: {}".format(table_id)
        )
    return dict(first_row)['last_modified_time']

def _get_query_history(self, table_id, query, modified_time):
    '''
    Look up query history, return result if already exists.

    Selects every row of ``_query_history_lookup`` whose ``table_id``,
    ``query`` and ``modified_time`` columns equal the given values.

    :param table_id: id of the table the stored query ran against
    :param query: lookup key for the query (the caller in this file passes
        the base64-encoded SQL)
    :param modified_time: last modified time of ``table_id``
    :returns: list of matching rows, each converted to a ``dict``
        (empty when nothing matches)
    '''
    def _escape(value):
        # Values are placed inside double-quoted SQL string literals.
        # Decode bytes first so they are not rendered as "b'...'", then
        # escape backslashes and double quotes so the value stays inside
        # the literal.
        if isinstance(value, bytes) and not isinstance(value, str):
            value = value.decode('utf-8')
        return '{0}'.format(value).replace('\\', '\\\\').replace('"', '\\"')

    # Keep the SQL text in its own name rather than overwriting the
    # ``query`` parameter.
    lookup_sql = """
        SELECT * FROM `{0}.{1}._query_history_lookup`
        WHERE table_id = "{2}"
        AND query = "{3}"
        AND modified_time = "{4}"
    """.format(
        self.project_id,
        self.dataset,
        _escape(table_id),
        _escape(query),
        _escape(modified_time),
    )
    query_job = self.bqclient.query(lookup_sql, job_config=self.job_config)
    return [dict(row) for row in query_job.result()]

def _insert_query_into_history(self, table_id, query, modified_time, result):
    '''
    Store a query result in the ``_query_history_lookup`` table.

    Inserts one row with the given values; ``created_at`` is filled with
    ``CURRENT_TIMESTAMP()``.

    :param table_id: id of the table the query ran against
    :param query: lookup key for the query (the caller in this file passes
        the base64-encoded SQL)
    :param modified_time: last modified time of ``table_id``
    :param result: result to store (the caller in this file passes a JSON
        string)
    :returns: the finished query job
    '''
    def _escape(value):
        # Values are placed inside single-quoted SQL string literals.
        # Decode bytes first so they are not rendered as "b'...'", then
        # escape backslashes and single quotes. Escaping backslashes also
        # keeps JSON escape sequences in ``result`` from being
        # re-interpreted by the SQL lexer, so the stored text round-trips.
        if isinstance(value, bytes) and not isinstance(value, str):
            value = value.decode('utf-8')
        return '{0}'.format(value).replace('\\', '\\\\').replace("'", "\\'")

    insert_sql = '''
        INSERT INTO `{0}.{1}._query_history_lookup` (query, modified_time, created_at, table_id, result)
        VALUES ('{2}', '{3}', CURRENT_TIMESTAMP(), '{4}', '{5}')
    '''.format(
        self.project_id,
        self.dataset,
        _escape(query),
        _escape(modified_time),
        _escape(table_id),
        _escape(result),
    )
    query_job = self.bqclient.query(insert_sql, job_config=self.job_config)
    # Wait for the insert to finish so failures surface here.
    query_job.result()
    return query_job

def _query_history_lookup(self, sql_initial):
    '''
    Get query history if available.

    Creates the ``_query_history_lookup`` table when it does not exist yet,
    then looks up a stored result for ``sql_initial`` keyed by table id,
    base64-encoded SQL and the table's last modified time.

    :param sql_initial: the SQL statement to look up
    :returns: tuple ``(query_history, encoded_query, table_modified_time)``
        where ``query_history`` is the list of matching history rows
    '''
    table_ref = self.bqclient.dataset(self.dataset).table('_query_history_lookup')
    table = bigquery.Table(table_ref)
    table.schema = [
        bigquery.SchemaField('query', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('modified_time', 'TIMESTAMP'),
        bigquery.SchemaField('created_at', 'TIMESTAMP'),
        bigquery.SchemaField('table_id', 'STRING'),
        bigquery.SchemaField('result', 'STRING'),
    ]
    # exists_ok=True makes this a no-op when the table is already there.
    self.bqclient.create_table(table, exists_ok=True)

    # Get the table id
    table_id = tables_in_query(sql_initial).replace('`', '')

    # Convert the sql to base64. b64encode needs bytes, so encode text
    # first, and decode the result so the key is plain text rather than a
    # bytes object that would format as "b'...'".
    raw_sql = sql_initial
    if not isinstance(raw_sql, bytes):
        raw_sql = raw_sql.encode('utf-8')
    encoded_query = base64.b64encode(raw_sql)
    if not isinstance(encoded_query, str):
        encoded_query = encoded_query.decode('ascii')

    table_modified_time = self._get_table_last_modified_time(table_id)
    query_history = self._get_query_history(table_id, encoded_query, table_modified_time)
    return query_history, encoded_query, table_modified_time

def search_sql(self, data_dict):
# default is_bulk export value
is_bulk = False
Expand Down Expand Up @@ -409,33 +478,56 @@ def get_destination_table(self, destination_table):
log.warning("Fetching destination table: {}".format(destination_table))
return self.bqclient.get_table(destination_table)


def _get_gcs_url(self, sql_initial):
    '''
    Run ``sql_initial``, extract its result and return the extract output.

    Fills ``self.log_data`` with the query, egress size, job id, job
    statistics and API call type along the way, then writes the egress log.

    :param sql_initial: the SQL statement to run
    :returns: tuple ``(destination_urls, table_id)`` where
        ``destination_urls`` is what ``extract_query_to_gcs`` returned and
        ``table_id`` is ``tables_in_query(sql_initial)`` with backticks
        removed
    '''
    self.log_data['query'] = sql_initial
    job = self.bqclient.query(sql_initial, job_config=self.job_config)

    # get temp table containing query result
    temp_table = job.destination
    result_bytes = self.get_destination_table(temp_table).num_bytes
    # The same byte count is logged for both egress figures.
    self.log_data['bigquery_egress'] = result_bytes
    self.log_data['storage_egress'] = result_bytes
    log.warning("destination table: {}".format(temp_table))

    urls = self.extract_query_to_gcs(temp_table, sql_initial)
    log.warning("extract job result: {}".format(urls))

    self.log_data['bigquery_job_id'] = job.job_id
    self.log_data['job_details'] = job._properties.get('statistics')
    # Default call type; replaced below when the user-agent check fails.
    self.log_data['api_call_type'] = "dataexplorer-bulk-download"
    user_agent_ok = self.checkUserAgent()
    if not user_agent_ok:
        self.log_data['api_call_type'] = 'direct-api'
    self.create_egress_log()

    queried_table = tables_in_query(sql_initial).replace('`', '')
    return urls, queried_table



def bulk_export(self, sql_initial):
    '''
    Export the result of ``sql_initial`` and return the response payload.

    A stored result from the query history is reused when one exists.
    Otherwise the query is run and extracted via ``_get_gcs_url`` and the
    outcome is recorded in the history. History lookup and history insert
    are best-effort: their failures are logged and never prevent or repeat
    the export itself.

    :param sql_initial: the SQL statement to export
    :returns: dict with ``help``, ``success``, ``records_truncated`` and
        ``gc_urls`` keys; ``gc_urls`` stays empty when the export fails
    '''
    result = {
        "help": "https://demo.ckan.org/api/3/action/help_show?name=datastore_search_sql",
        "success": "true",
        "records_truncated": "true",
        "gc_urls": []
    }

    # Step 1: best-effort history lookup.
    query_history = None
    encoded_query = None
    table_modified_time = None
    history_available = False
    try:
        query_history, encoded_query, table_modified_time = self._query_history_lookup(sql_initial)
        history_available = True
    except Exception as e:
        log.error("An error occurred while looking up query history or inserting into history: {}".format(e))

    # Step 2: reuse the stored result when there is one.
    if query_history:
        try:
            log.warning("History Exist: returning datastore query result from history")
            result['gc_urls'] = json.loads(query_history[0]['result'])
            return result
        except Exception as e:
            # Unreadable history entry: fall through to a fresh export.
            log.error("An error occurred while looking up query history or inserting into history: {}".format(e))

    # Step 3: run the export exactly once.
    try:
        destination_urls, table_id = self._get_gcs_url(sql_initial)
    except Exception as e:
        log.error("An error occurred while getting GCS URL: {}".format(e))
        return result
    result['gc_urls'] = destination_urls

    # Step 4: record the outcome. This has its own handler so that a failed
    # insert cannot trigger a second BigQuery query and extract.
    if history_available:
        try:
            self._insert_query_into_history(
                table_id, encoded_query, table_modified_time, json.dumps(destination_urls)
            )
        except Exception as e:
            log.error("An error occurred while looking up query history or inserting into history: {}".format(e))
    return result


@retry.Retry(predicate=if_exception_type(exceptions.NotFound))
def extract_query_to_gcs(self, table_ref, sql):
Expand Down Expand Up @@ -639,4 +731,4 @@ def table_schema(self, table):
rows = query_job.result()
schema = rows.schema
return schema
'''
'''