# Source code for tableauhyperapi.connection

import enum
import threading
import warnings

from pathlib import PurePath
from typing import Optional, Union, List, Mapping

from .endpoint import Endpoint
from .hyperexception import HyperException, ContextId
from .hyperserviceversion import HyperServiceVersion
from .result import Result
from .sqltype import NullableValue
from .warning import UnclosedObjectWarning
from .impl import hapi
from .impl.dll import ffi, lib
from .impl.dllutil import Error, Parameters, InteropUtil
from .impl.util import check_precondition
from . import catalog


class CreateMode(enum.Enum):
    """
    Enumerates how connecting to a database behaves, depending on whether the database already exists.

    Each member wraps the corresponding native ``hapi.HYPER_*`` constant.
    """

    #: Do not create the database. Method will fail if database does not exist.
    NONE = hapi.HYPER_DO_NOT_CREATE

    #: Create the database. Method will fail if the database already exists.
    CREATE = hapi.HYPER_CREATE

    #: Create the database if it does not exist.
    CREATE_IF_NOT_EXISTS = hapi.HYPER_CREATE_IF_NOT_EXISTS

    #: Create the database. If it already exists, drop the old one first.
    CREATE_AND_REPLACE = hapi.HYPER_CREATE_AND_REPLACE
class Connection:
    """
    Connects to a Hyper server.

    :param endpoint: :any:`Endpoint` which specifies the Hyper instance to connect to.
    :param database: Optional path to the database file.
    :param create_mode: If database path is specified, defines what happens if the database already exists.
        By default it is :any:`CreateMode.NONE`; passing ``None`` is treated the same way.
    :param parameters: Optional dictionary of connection parameters to pass to Hyper. The available parameters are
        documented `in the Tableau Hyper documentation, chapter "Connection Settings"
        <https://tableau.github.io/hyper-db/docs/hyper-api/connection#connection-settings>`__.
        The database may alternatively be given here under the ``'dbname'`` key, but not in addition to
        ``database``. The mapping passed in is never modified.
    :raises ValueError: if the database is given both as ``database`` and as ``parameters['dbname']``.

    If the database is not specified, then it connects to the main database. This is useful to create and delete
    databases. Note that the main database gets deleted once the :any:`HyperProcess` gets closed.

    No methods of this class are thread-safe, except :any:`cancel()`, which can be called from a different thread.

    .. testsetup:: connection.__init__

        import os
        from tableauhyperapi import *
        hyper = HyperProcess(Telemetry.SEND_USAGE_DATA_TO_TABLEAU, 'myapp')

    .. testcode:: connection.__init__

        # Connect and create the database. If it already exists, replace it.
        with Connection(hyper.endpoint, 'mydb.hyper', CreateMode.CREATE_AND_REPLACE) as connection:
            schema = TableDefinition('table', [
                TableDefinition.Column('text', SqlType.text()),
                TableDefinition.Column('int', SqlType.int()),
            ])
            connection.catalog.create_table(schema)

    .. testcleanup:: connection.__init__

        hyper.close()
        if os.path.exists('mydb.hyper'):
            os.remove('mydb.hyper')
    """

    def __init__(self,
                 endpoint: Endpoint,
                 database: Optional[Union[str, PurePath]] = None,
                 create_mode: Optional[CreateMode] = CreateMode.NONE,
                 parameters: Optional[Mapping[str, str]] = None):
        # Set first, before anything can raise: __del__ reads this attribute unconditionally.
        self.__cdata = None
        # Reference lib for correct gc order
        self.__lib_ref = lib
        # Lock to serialize cancel() and close() calls. Created before connecting so the object is
        # internally consistent even if the connection attempt below raises.
        self.__cancel_lock = threading.Lock()

        check_precondition(isinstance(endpoint, Endpoint), "'endpoint' must be an Endpoint instance")
        self.__endpoint = endpoint

        # The signature allows None; fall back to the documented default instead of failing on `.value`.
        if create_mode is None:
            create_mode = CreateMode.NONE

        if isinstance(database, PurePath):
            database = str(database)

        if parameters and 'dbname' in parameters:
            if database:
                raise ValueError("Database name cannot be provided as a 'database' parameter in addition to setting "
                                 "'dbname' in the parameters dictionary")
            database = parameters['dbname']
            # Forward a filtered copy rather than deleting the key in place: the caller's mapping must not be
            # modified, and it may be a read-only Mapping that does not support deletion at all.
            parameters = {key: value for key, value in parameters.items() if key != 'dbname'}

        self.__cdata = self.__create_connection(endpoint, database, create_mode, parameters)

    @staticmethod
    def __base_connection_parameters(endpoint: Endpoint):
        """ Builds the native parameter set shared by every connection attempt to ``endpoint``. """
        native_params = Parameters.create_connection_parameters()
        native_params.set_value('endpoint', endpoint.connection_descriptor)
        if endpoint.user_agent:
            native_params.set_value('user_agent', endpoint.user_agent)
        native_params.set_value('api_language', 'Python')
        return native_params

    @staticmethod
    def __create_connection(endpoint: Endpoint,
                            database: Optional[str],
                            create_mode: CreateMode,
                            parameters: Optional[Mapping[str, str]]):
        """
        Opens the native connection and returns its handle.

        The returned cdata is registered with ``ffi.gc`` so that ``hapi.hyper_disconnect`` is invoked on it when
        it is released or garbage-collected.
        """
        native_params = Connection.__base_connection_parameters(endpoint)
        if database:
            native_params.set_value('dbname', database)
        if parameters:
            for key, value in parameters.items():
                native_params.set_value(key, value)

        pp = ffi.new('hyper_connection_t**')
        Error.check(hapi.hyper_connect(native_params.cdata, pp, create_mode.value))
        return ffi.gc(pp[0], hapi.hyper_disconnect)

    @property
    def _cdata(self):
        """ Native connection handle; raises ``RuntimeError`` if the connection has been closed. """
        if self.__cdata is None:
            raise RuntimeError('Connection is closed')
        return self.__cdata

    @property
    def _endpoint(self) -> Endpoint:
        """ The :any:`Endpoint` this connection was created with. """
        return self.__endpoint

    @property
    def is_open(self) -> bool:
        """
        Returns ``True`` if the connection has not been closed yet.
        """
        return self.__cdata is not None

    @property
    def is_ready(self) -> bool:
        """
        Checks whether the connection is ready, i.e., it is not processing a query.
        An open :any:`Inserter` or :any:`Result` keeps the connection busy.
        """
        # bool() keeps the annotated return type regardless of what the native binding hands back.
        return self.__cdata is not None and bool(hapi.hyper_connection_is_ready(self.__cdata))

    def close(self):
        """
        Closes the connection. Note that this has no effect if there is an active result or data inserter. These
        need to be closed before the connection to the server will be actually dropped.
        """
        # Serialized with cancel() so the handle is never released while a cancel request is using it.
        with self.__cancel_lock:
            if self.__cdata is not None:
                ffi.release(self.__cdata)
                self.__cdata = None

    def cancel(self):
        """
        Cancels the current SQL command or query of this connection (if any). This method may be safely called from
        any thread. After this method was called, the current SQL command or query may fail with a cancellation error
        at any point during its execution. However, there are no guarantees if and when it will fail.
        """
        with self.__cancel_lock:
            if self.__cdata is not None:
                try:
                    Error.check(hapi.hyper_cancel(self.__cdata))
                except HyperException:
                    # Cancellation is best-effort: a failure to cancel is deliberately not propagated.
                    # TODO TFSID 921655: log it
                    pass

    @property
    def catalog(self) -> 'catalog.Catalog':
        """
        Gets the :any:`Catalog` for this connection.
        """
        return catalog.Catalog(self)

    def execute_query(self, query, text_as_bytes=False) -> Result:
        """
        Executes a SQL query and returns the result as a :any:`Result` object.

        :param query: SQL query to execute.
        :param text_as_bytes: optional, if ``True`` then string values read from the database will be returned as
            UTF-8-encoded ``bytearray`` objects. By default string values are returned as ``str`` objects.
        :return: A :any:`Result` instance. Use this method in a ``with`` statement to automatically close the result
            when done reading from it, or call its :any:`close()<Result.close>` method. No queries can be executed
            or tables created/opened while the result is open.
        :raises RuntimeError: if the connection is closed.
        """
        pp_result = ffi.new('hyper_rowset_t**')
        Error.check(hapi.hyper_execute_query(self._cdata, InteropUtil.string_to_char_p(query), pp_result))
        return Result(text_as_bytes, self, pp_result[0])

    def execute_list_query(self, query, text_as_bytes=False) -> List[List[NullableValue]]:
        """
        Executes a SQL query and returns the result as list of rows of data, each represented by a list of objects.

        :param query: SQL query to execute.
        :param text_as_bytes: optional, if ``True`` then string values read from the database will be returned as
            UTF-8-encoded ``bytearray`` objects. By default string values are returned as ``str`` objects.
        :return: A list of rows, each represented by a list of objects. See :any:`TypeTag` documentation for
            how database values are represented by Python objects.
        """
        with self.execute_query(query, text_as_bytes) as result:
            # Note, it is tempting to return an iterable with yield, but that would make it easy to leak the
            # result object until it's garbage-collected (if the iteration stops in the middle).
            return list(result)

    def execute_command(self, command) -> Optional[int]:
        """
        Executes a SQL statement and returns the affected row count if the statement has one.

        :param command: SQL statement to execute.
        :return: Count of affected rows if available, ``None`` otherwise.
        :raises RuntimeError: if the connection is closed.
        """
        row_count_cdata = ffi.new('int*')
        Error.check(hapi.hyper_execute_command(self._cdata, InteropUtil.string_to_char_p(command), row_count_cdata))
        row_count = row_count_cdata[0]
        # A negative count is reported to the caller as "no row count available".
        return row_count if row_count >= 0 else None

    def execute_scalar_query(self, query, text_as_bytes=False) -> NullableValue:
        """
        Executes a scalar query, i.e. a query that returns exactly one row with one column, and returns the value
        from the result.

        :param query: SQL query to execute.
        :param text_as_bytes: optional, if ``True`` then a string value read from the database will be returned as
            UTF-8-encoded ``bytearray`` objects. By default string values are returned as ``str`` objects.
        :return: the value from the result. A NULL database value is returned as ``None``. See :any:`TypeTag`
            documentation for how database values are represented by Python objects.
        :raises HyperException: if the result does not consist of exactly one column and exactly one row.
        """
        with self.execute_query(query, text_as_bytes) as result:
            if len(result.schema.columns) != 1:
                raise HyperException(ContextId(0xA1B8BBEC6D), 'Query result must have exactly one column')

            if not result.next_row():
                raise HyperException(ContextId(0xB8BBEC6DA1), 'Query returned zero rows')

            value = result.get_value(0)

            # Advance once more to make sure there is no second row.
            if result.next_row():
                raise HyperException(ContextId(0xBEC6DA1B8B), 'Query returned more than one row')

            return value

    def hyper_service_version(self) -> HyperServiceVersion:
        """
        Returns the Hyper Service version of this connection

        :return: The Hyper Service version of this connection
        :raises RuntimeError: if the connection is closed.
        """
        version = ffi.new('hyper_service_version_t*')
        Error.check(hapi.hyper_connection_get_hyper_service_version(self._cdata, version))
        return HyperServiceVersion(version.major, version.minor)

    def is_capability_active(self, capability_flag: str) -> bool:
        """
        Returns ``True`` if the capability flag is active on this connection.

        :param capability_flag: The capability flag to check. It is prefixed with `capability_`.
        :return: ``True`` if the capability flag is active on this connection.
        :raises RuntimeError: if the connection is closed.
        """
        # bool() keeps the annotated return type regardless of what the native binding hands back.
        return bool(hapi.hyper_connection_is_capability_active(self._cdata,
                                                               InteropUtil.string_to_char_p(capability_flag)))

    @staticmethod
    def query_supported_hyper_service_version_range(endpoint: Endpoint) -> List[HyperServiceVersion]:
        """
        Connects to the Hyper endpoint and determines which Hyper Service version numbers are common
        between the Hyper API and the Hyper server.

        :param endpoint: Endpoint to connect to.
        :return: List of Hyper Service versions that are supported by both this Hyper API and the endpoint.
        """
        native_params = Connection.__base_connection_parameters(endpoint)

        version_array_ptr = ffi.new('hyper_service_version_t**')
        elements_cdata = ffi.new('size_t*')
        Error.check(hapi.hyper_query_supported_hyper_service_version_range(native_params.cdata,
                                                                           version_array_ptr,
                                                                           elements_cdata))

        versions = version_array_ptr[0]
        if versions == ffi.NULL:
            return []
        return [HyperServiceVersion(versions[i].major, versions[i].minor) for i in range(elements_cdata[0])]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        if self.__cdata is not None:
            warnings.warn('Connection has not been closed. Use Connection object in a with statement or call its '
                          'close() method when done.', UnclosedObjectWarning)
            # it is closed by cffi, self.__cdata is a gc'ed pointer

    def __repr__(self):
        # 'open' means ready for a new query, 'busy' means open but not ready, 'closed' means close() was called.
        status = 'open' if self.is_ready else ('busy' if self.is_open else 'closed')
        return f"<Connection object at {id(self):#x}; status: {status}; endpoint: {self._endpoint!r}>"