Source code for gwrappy.drive.drive

import os

from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from gwrappy.service import get_service
from gwrappy.utils import iterate_list
from gwrappy.errors import HttpError
from gwrappy.drive.utils import DriveResponse


class DriveUtility:
    """
    Thin convenience wrapper around the Google Drive v3 API client.

    Every request is executed with ``num_retries=self._max_retries`` and
    uploads are sent in chunks of ``self._chunksize`` bytes.
    """

    def __init__(self, json_credentials_path, client_id, **kwargs):
        """
        Initializes object for interacting with the Google Drive API.

        All keyword arguments are forwarded unchanged to ``get_service``.

        :param json_credentials_path: File path for automatically generated credentials.
        :param client_id: Credentials are stored as a key-value pair per client_id to facilitate multiple clients using the same credentials file. For simplicity, using one's email address is sufficient.
        :keyword client_secret_path: File path for client secret JSON file. Only required if credentials are invalid or unavailable.
        :keyword max_retries: Argument specified with each API call to natively handle retryable errors. Defaults to 3.
        :type max_retries: integer
        :keyword chunksize: Upload/Download chunk size in bytes. Defaults to 2 MiB.
        :type chunksize: integer
        """

        self._service = get_service(
            'drive',
            json_credentials_path=json_credentials_path,
            client_id=client_id,
            **kwargs
        )

        self._max_retries = kwargs.get('max_retries', 3)

        # Number of bytes to send/receive in each request.
        self._chunksize = kwargs.get('chunksize', 2 * 1024 * 1024)

    @staticmethod
    def _join_fields(fields, default=None, required=()):
        """
        Normalise a ``fields`` argument into the ", " delimited string sent to the API.

        :param fields: None, a list/tuple of field names, or an already delimited string.
        :param default: Value used in place of ``fields`` when ``fields`` is None.
        :param required: Field names appended to a list/tuple input when missing. Strings are passed through untouched.
        :return: ", " delimited string, the untouched input string, or None.
        """

        if fields is None:
            fields = default

        if isinstance(fields, (list, tuple)):
            # Work on a copy so the caller's sequence is never mutated.
            names = list(fields)
            for name in required:
                if name not in names:
                    names.append(name)
            return ', '.join(names)

        return fields

    def get_account_info(self, fields=None):
        """
        Abstraction of about().get() method. [https://developers.google.com/drive/v3/reference/about/get]

        :param fields: Available properties can be found here: https://developers.google.com/drive/v3/reference/about. Defaults to kind, storageQuota and user.
        :type fields: list or ", " delimited string
        :return: Dictionary object representation of About resource.
        """

        fields = self._join_fields(
            fields,
            default=['kind', 'storageQuota', 'user']
        )

        return self._service.about().get(
            fields=fields
        ).execute(num_retries=self._max_retries)

    def list_files(self, max_results=None, **kwargs):
        """
        Abstraction of files().list() method with inbuilt iteration functionality. [https://developers.google.com/drive/v3/reference/files/list]

        :param max_results: If None, all results are iterated over and returned.
        :type max_results: integer
        :keyword fields: Available properties can be found here: https://developers.google.com/drive/v3/reference/files/list. When given as a list, 'nextPageToken' is added automatically if missing; a string is sent as-is.
        :keyword spaces: A comma-separated list of spaces to query within the corpus. Supported values are 'drive', 'appDataFolder' and 'photos'.
        :keyword q: A query for filtering the file results. Reference here: https://developers.google.com/drive/v3/web/search-parameters
        :return: List of dictionary objects representing file resources.
        """

        # The helper copies list input, so the caller's ``fields`` list is left intact.
        fields = self._join_fields(
            kwargs.get('fields', None),
            required=('nextPageToken',)
        )

        return iterate_list(
            self._service.files(),
            'files',
            max_results,
            self._max_retries,
            filter_exp=None,
            fields=fields,
            q=kwargs.get('q', None),
            spaces=kwargs.get('spaces', None)
        )

    def get_file(self, file_id, fields=None):
        """
        Get file metadata.

        :param file_id: Unique file id. Check on UI or by list_files().
        :type file_id: string
        :param fields: Available properties can be found here: https://developers.google.com/drive/v3/reference/files. Defaults to name, id, mimeType, modifiedTime and size.
        :type fields: list or ", " delimited string
        :return: Dictionary object representing file resource.
        """

        fields = self._join_fields(
            fields,
            default=['name', 'id', 'mimeType', 'modifiedTime', 'size']
        )

        return self._service.files().get(
            fileId=file_id,
            fields=fields
        ).execute(num_retries=self._max_retries)

    def download_file(self, file_id, write_path, page_num=None, output_type=None):
        """
        Downloads object.

        Google Sheets are exported as CSV for a single sheet (``page_num``); every other
        file type is streamed to ``write_path`` in chunks.

        :param file_id: Unique file id. Check on UI or by list_files().
        :type file_id: string
        :param write_path: Local path to write object to. Not used when a Google Sheet is returned in memory via output_type.
        :type write_path: string
        :param page_num: Only applicable to Google Sheets, where it is required. Check **gid** param in URL.
        :type page_num: integer
        :param output_type: Only applicable to Google Sheets. Can be directly downloaded as list or Pandas dataframe.
        :type output_type: string. 'list' or 'dataframe'
        :returns: If Google Sheet and output_type specified: result in selected type, DriveResponse object. Else DriveResponse object.
        :raises: HttpError if the Google Sheet export does not return HTTP 200. AssertionError if page_num is missing for a Google Sheet or output_type is not supported.
        """

        drive_resp = DriveResponse('downloaded')
        file_metadata = self.get_file(file_id)

        if file_metadata['mimeType'] == 'application/vnd.google-apps.spreadsheet':
            assert page_num is not None, 'page_num (gid) is required to download a Google Sheet'

            download_url = 'https://docs.google.com/spreadsheets/d/%s/export?format=csv&gid=%i' % (file_id, page_num)
            resp, content = self._service._http.request(download_url)

            if resp.status != 200:
                raise HttpError(resp, content)

            if output_type is not None:
                assert output_type in ('dataframe', 'list'), \
                    "output_type must be 'dataframe' or 'list', got %r" % (output_type,)

                from io import BytesIO

                # Parse the CSV payload in memory; nothing is written to write_path here.
                with BytesIO(content) as file_buffer:
                    if output_type == 'list':
                        import unicodecsv as csv

                        drive_resp.load_resp(file_metadata, True)
                        return list(csv.reader(file_buffer)), drive_resp

                    import pandas as pd

                    drive_resp.load_resp(file_metadata, True)
                    return pd.read_csv(file_buffer), drive_resp

            with open(write_path, 'wb') as write_file:
                write_file.write(content)
        else:
            req = self._service.files().get_media(fileId=file_id)

            with open(write_path, 'wb') as write_file:
                downloader = MediaIoBaseDownload(write_file, req)

                done = False
                while not done:
                    # The progress status is not used; only the completion flag matters.
                    _, done = downloader.next_chunk(num_retries=self._max_retries)

        drive_resp.load_resp(
            file_metadata,
            is_download=True
        )

        return drive_resp

    def upload_file(self, read_path, overwrite_existing=True, **kwargs):
        """
        Creates file if it doesn't exist, updates if it does.

        An existing file is looked up by name (and by parent, when given) through list_files().

        :param read_path: Local path of object to upload.
        :type read_path: string
        :param overwrite_existing: Safety flag, would raise ValueError if object exists and overwrite_existing=False
        :type overwrite_existing: boolean
        :keyword name: Name of the file on Drive. Defaults to the base name of read_path.
        :keyword parents: Id of a single parent folder.
        :type parents: string
        :return: DriveResponse object.
        :raises: ValueError if a matching file exists and overwrite_existing is False. AssertionError if parents is not a string or more than one existing file matches.
        """

        drive_resp = DriveResponse('uploaded')

        file_name = os.path.basename(read_path)
        target_name = kwargs.get('name', file_name)

        request_body = {
            'name': target_name
        }

        # check for existing file
        q = 'name="%s"' % target_name

        if 'parents' in kwargs:
            assert isinstance(kwargs['parents'], str), 'parents must be a single folder id string'
            q += ' and "%s" in parents' % kwargs['parents']

        existing_files = self.list_files(q=q)
        assert len(existing_files) <= 1, 'More than one file matches %s' % target_name

        # Reject a disallowed overwrite before the upload media is prepared.
        if existing_files and not overwrite_existing:
            raise ValueError('Existing file found, set overwrite_existing=True to overwrite file')

        media = MediaFileUpload(read_path, chunksize=self._chunksize, resumable=True)

        if not existing_files:
            if 'parents' in kwargs:
                request_body['parents'] = [kwargs['parents']]

            resp = self._service.files().create(
                media_body=media,
                body=request_body,
                fields='id, name, size, modifiedTime, parents'
            ).execute(num_retries=self._max_retries)
        else:
            resp = self._service.files().update(
                fileId=existing_files[0]['id'],
                media_body=media,
                body=request_body,
                fields='id, name, size, modifiedTime, parents'
            ).execute(num_retries=self._max_retries)

        drive_resp.load_resp(
            resp,
            is_download=False
        )

        return drive_resp