Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 114 additions & 21 deletions src/ckan_to_bigquery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import base64
import json

from google.cloud import bigquery
from api_tracker import tables_in_query
Expand Down Expand Up @@ -52,12 +54,13 @@ def search(self, data_dict):
total = self.get_total_num_of_query_rows(fields, data_dict)
else:
total = len(results)

out = {
"include_total": include_total,
"resource_id": data_dict['resource_id'],
"fields": fields,
"records_format": "objects",
"records": results,
"records_format": "objects",
"_links": {
"start": "/api/3/action/datastore_search?resource_id="+data_dict['resource_id'],
"next": "/api/3/action/datastore_search?offset=100&resource_id="+data_dict['resource_id']
Expand Down Expand Up @@ -205,20 +208,23 @@ def search_sql_normal(self, sql):
rows_max = int(config.get('ckan.datastore.search.rows_max', 32000))
sql_initial = sql
# limit the number of results to return by rows_max
sql = 'SELECT * FROM ({0}) AS blah LIMIT {1} ;'.format(sql, rows_max+1)
sql = 'SELECT COUNT(*) AS total FROM ({0}) AS blah LIMIT {1} ;'.format(sql, rows_max+1)
query_job = self.bqclient_readonly.query(sql, job_config=self.job_config)
rows = query_job.result()
records = [dict(row) for row in rows]
total_rows = dict(next(iter(rows)))['total']
# check if results truncated ...
if len(records) == rows_max + 1:
if total_rows > rows_max:
return self.bulk_export(sql_initial)
else:
# do normal
sql = 'SELECT * FROM ({0}) AS blah LIMIT {1} ;'.format(sql_initial, rows_max+1)
query_job = self.bqclient_readonly.query(sql, job_config=self.job_config)
rows = query_job.result()
records = [dict(row) for row in rows]
return {
"help":"https://demo.ckan.org/api/3/action/help_show?name=datastore_search_sql",
"success": "true",
"result":{
"records": records,
"result": {
"records": {},
"fields": []
}
}
Expand All @@ -234,23 +240,110 @@ def search_sql(self, data_dict):
else:
log.warning("do standard search_sql")
return self.search_sql_normal(data_dict['sql'])

def _get_table_last_modified_time(self, table_id):
    """Return the last-modified timestamp of *table_id* in this dataset.

    Reads BigQuery's ``__TABLES__`` metadata view, where the
    modification time is stored as epoch milliseconds, and converts it
    to a TIMESTAMP in the query itself.

    :param table_id: bare table name (no project/dataset qualifier)
    :returns: the table's last-modified time from the single result row
    """
    sql = '''
    SELECT
        TIMESTAMP_MILLIS(last_modified_time) AS last_modified_time
    FROM `{0}.{1}.__TABLES__`
    WHERE table_id = '{2}' ;
    '''.format(self.project_id, self.dataset, table_id)
    job = self.bqclient.query(sql, job_config=self.job_config)
    # The query matches exactly one metadata row; take it directly.
    first_row = next(iter(job.result()))
    return dict(first_row)['last_modified_time']

def _get_query_history(self, table_id, query, modified_time):
    """Return cached history rows matching (table_id, query, modified_time).

    Looks up the ``_query_history_lookup`` table for a previously
    stored result of the same (base64-encoded) query against the same
    table snapshot; an empty list means a cache miss.

    :param table_id: name of the table the query ran against
    :param query: base64-encoded SQL text used as the cache key
    :param modified_time: table last-modified timestamp the cached
        entry must match, so entries for a stale snapshot are ignored
    :returns: list of matching rows as dicts (possibly empty)
    """
    # Use a distinct local name: the previous code reassigned `query`,
    # shadowing the parameter — it only worked because .format()'s
    # arguments are evaluated before the assignment takes effect.
    lookup_sql = """
    SELECT * FROM `{0}.{1}._query_history_lookup`
    WHERE table_id = "{2}"
    AND query = "{3}"
    AND modified_time = "{4}"
    """.format(
        self.project_id, self.dataset, table_id, query, modified_time
    )
    query_job = self.bqclient.query(lookup_sql, job_config=self.job_config)
    rows = query_job.result()
    return [dict(row) for row in rows]

def _insert_query_into_history(self, table_id, query, modified_time, result):
    """Record a completed query in the ``_query_history_lookup`` cache table.

    :param table_id: name of the table the query ran against
    :param query: base64-encoded SQL text used as the cache key
    :param modified_time: last-modified timestamp of the table snapshot
    :param result: serialized result payload to store alongside the key
    :returns: the finished BigQuery query job for the INSERT
    """
    insert_sql = '''
    INSERT INTO `{0}.{1}._query_history_lookup` (query, modified_time, created_at, table_id, result)
    VALUES ('{2}', '{3}', CURRENT_TIMESTAMP(), '{4}', '{5}')
    '''.format(self.project_id, self.dataset, query, modified_time, table_id, result)
    job = self.bqclient.query(insert_sql, job_config=self.job_config)
    # Block until the INSERT finishes before handing the job back.
    job.result()
    return job

def _query_history_lookup(self, sql_initial):
    """Look up *sql_initial* in the query-history cache.

    Ensures the ``_query_history_lookup`` cache table exists, derives
    the id of the table the query targets, and checks whether the same
    query has already been run against the current snapshot of that
    table.

    :param sql_initial: raw user SQL to look up
    :returns: tuple of (cached history rows — empty list on a miss,
        base64-encoded query string used as the cache key,
        last-modified timestamp of the queried table)
    """
    # Create the cache table on first use; exists_ok makes this idempotent.
    table_ref = self.bqclient.dataset(self.dataset).table('_query_history_lookup')
    table = bigquery.Table(table_ref)
    table.schema = [
        bigquery.SchemaField('query', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('modified_time', 'TIMESTAMP'),
        bigquery.SchemaField('created_at', 'TIMESTAMP'),
        bigquery.SchemaField('table_id', 'STRING'),
        bigquery.SchemaField('result', 'STRING'),
    ]
    table = self.bqclient.create_table(table, exists_ok=True)

    # Get the table id (strip backtick quoting so it matches history rows).
    table_id = tables_in_query(sql_initial).replace('`', '')

    # BUG FIX: base64.b64encode() requires bytes — the previous code
    # passed the str directly, raising TypeError on Python 3. It also
    # returned bytes, which would render as b'...' when formatted into
    # the lookup/insert SQL; decode back to str for a clean cache key.
    encoded_query = base64.b64encode(sql_initial.encode('utf-8')).decode('ascii')
    table_modified_time = self._get_table_last_modified_time(table_id)
    query_history = self._get_query_history(table_id, encoded_query, table_modified_time)
    return query_history, encoded_query, table_modified_time

def _get_gcs_url(self, sql_initial):
    """Run *sql_initial* and export its result to Google Cloud Storage.

    :param sql_initial: raw user SQL to execute
    :returns: tuple of (GCS destination URLs from the extract job,
        id of the table the query ran against)
    """
    job = self.bqclient.query(sql_initial, job_config=self.job_config)
    # BigQuery materializes the result in an anonymous temp table;
    # that temp table is what gets extracted to GCS.
    temp_table = job.destination
    log.warning(u"destination table: {}".format(temp_table))
    urls = self.extract_query_to_gcs(temp_table, sql_initial)

    # The caller uses the table id to key the query-history cache.
    table_id = tables_in_query(sql_initial).replace('`', '')
    return urls, table_id


def bulk_export(self, sql_initial):
    """Export an oversized query result to Google Cloud Storage.

    Used when a datastore_search_sql result exceeds rows_max. The
    query-history cache is consulted first so that an identical query
    against an unchanged table reuses previously exported files; on a
    cache miss the query is run, its result extracted to GCS, and the
    URLs recorded in the cache for next time. The cache is best-effort:
    any cache failure falls back to a direct export.

    NOTE: the block as pasted still contained the pre-change body (a
    duplicate export path with an early return and a second `except`
    clause) left over from the diff view; that dead/invalid residue is
    removed here.

    :param sql_initial: the user's original SQL query
    :returns: CKAN-style response dict whose 'gc_urls' key lists the
        GCS download URLs (empty list if every attempt failed)
    """
    result = {
        "help": "https://demo.ckan.org/api/3/action/help_show?name=datastore_search_sql",
        "success": "true",
        "records_truncated": "true",
        "gc_urls": []
    }
    try:
        query_history, encoded_query, table_modified_time = self._query_history_lookup(sql_initial)
        if query_history:
            log.warning("History Exist: returning datastore query result from history")
            # The cached 'result' column holds a JSON-encoded URL list.
            result['gc_urls'] = json.loads(query_history[0]['result'])
        else:
            destination_urls, table_id = self._get_gcs_url(sql_initial)
            self._insert_query_into_history(table_id, encoded_query, table_modified_time, json.dumps(destination_urls))
            result['gc_urls'] = destination_urls
    except Exception as e:
        log.error("An error occurred while looking up query history or inserting into history: {}".format(e))
        # Cache failed — still try to produce the export directly.
        try:
            destination_urls, table_id = self._get_gcs_url(sql_initial)
            result['gc_urls'] = destination_urls
        except Exception as e:
            log.error("An error occurred while getting GCS URL: {}".format(e))
    return result


@retry.Retry(predicate=if_exception_type(exceptions.NotFound))
def extract_query_to_gcs(self, table_ref, sql):
Expand Down