Source code for tableauhyperapi.inserter

import datetime
import decimal
import warnings

from typing import Iterable, Union, Any, Optional

from .connection import Connection
from .date import Date
from .interval import Interval
from .sqltype import TypeTag
from .tablename import Name, TableName
from .tabledefinition import TableDefinition, Nullability
from .timestamp import Timestamp
from .warning import UnclosedObjectWarning
from .impl import hapi
from .impl.converter import Converter
from .impl.dll import ffi, lib
from .impl.dllutil import Error, InteropUtil
from .impl.schemaconverter import SchemaConverter
from .impl.util import check_precondition


class Inserter:
    """
    An object which is used to insert data into a table.

    :param connection: the connection to use.
    :param arg: either the table name or the definition of the table to insert data into.
    :param columns: the set of columns into which to insert. The columns have to exist in the table.
        When not ``None``, columns not present in ``columns`` will be set to their default value.
        ``columns`` is an Iterable of either ``str`` or ``Inserter.ColumnMapping``.
        A ``ColumnMapping`` can optionally contain a valid SQL expression.
        The SQL expression specified for the column is used to transform or compute values on the fly
        during insertion.
        The SQL expression can depend on columns that exist in the table.
        The SQL expression can also depend on values or columns that do not exist in the table,
        but must be specified in ``inserter_definition``.
    :param inserter_definition: the definition of the columns to which values are provided.
        ``inserter_definition`` is a keyword-only parameter and is required only when ``columns`` is not ``None``.
        The column definition for all columns without a SQL expression must be specified in
        ``inserter_definition``. For a column without a SQL expression, the column definition provided in
        ``inserter_definition`` must match the actual definition of the column in the table.

    .. note::
        SQL expressions provided during insertion are used without any modification and are hence
        vulnerable to SQL injection attacks. Applications should prevent end-users from providing
        expressions directly during insertion.

    Sample usage:

    .. testsetup:: Inserter.__init__

        from tableauhyperapi import *

        def setup():
            hyper = HyperProcess(Telemetry.SEND_USAGE_DATA_TO_TABLEAU, 'myapp')
            connection = Connection(hyper.endpoint, 'mydb.hyper', CreateMode.CREATE_AND_REPLACE)
            table_def = TableDefinition('foo', [
                TableDefinition.Column('text', SqlType.text()),
                TableDefinition.Column('int', SqlType.int()),
            ])
            connection.catalog.create_table(table_def)
            return hyper, connection

        hyper, connection = setup()

    .. testcode:: Inserter.__init__

        with Inserter(connection, 'foo') as inserter:
            inserter.add_row(['a', 1])
            inserter.add_row(['b', 2])
            inserter.execute()

    .. testcleanup:: Inserter.__init__

        connection.close()
        hyper.close()
        import os
        os.remove('mydb.hyper')
    """
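    # Illustrative sketch (not part of the original module): expression-based insertion via
    # `columns` and `inserter_definition`. A table 'foo' with an integer column "a" declared
    # NOT NULL and an integer column "b" is assumed for illustration only.
    #
    #     column_mappings = [
    #         'a',                                     # plain column, value supplied via add_row()
    #         Inserter.ColumnMapping('b', '2 * "a"'),  # computed on the fly from "a"
    #     ]
    #     # every column without an expression must be described here, and its definition
    #     # must match the table's definition of that column
    #     inserter_definition = [TableDefinition.Column('a', SqlType.int(), Nullability.NOT_NULLABLE)]
    #     with Inserter(connection, 'foo', column_mappings,
    #                   inserter_definition=inserter_definition) as inserter:
    #         inserter.add_row([1])  # only the non-expression column "a" is provided
    #         inserter.execute()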
    class ColumnMapping:
        """
        An object which defines how a target column is mapped to the inserter definition.

        :param column_name: the column name.
        :param expression: the SQL expression for computing the column. If not set, the data for this
            column must be provided using add_row()/add_rows() calls during insertion.
        """

        def __init__(self, column_name: Union[Name, str], expression: Optional[str] = None):
            self.__column_name = Name(column_name)
            self.__expression = expression

        @property
        def column_name(self) -> Name:
            """ The name of the column. """
            return self.__column_name

        @property
        def expression(self) -> Optional[str]:
            """ The SQL expression used to compute the column. """
            return self.__expression

        def _as_select_list_expression(self):
            if self.__expression is None:
                return str(self.__column_name)
            else:
                return f'{self.__expression} AS {str(self.__column_name)}'
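    # Illustrative note (not part of the original module): `_as_select_list_expression()` renders
    # either the quoted column name or '<expression> AS <quoted name>', e.g. (assuming `Name`
    # quotes plain identifiers):
    #
    #     Inserter.ColumnMapping('b')._as_select_list_expression()             # -> '"b"'
    #     Inserter.ColumnMapping('b', '2 * "a"')._as_select_list_expression()  # -> '2 * "a" AS "b"'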
    def __init__(self, connection: Connection, arg: Union[str, Name, TableName, TableDefinition],
                 columns: Iterable[Union[str, ColumnMapping]] = None, *,
                 inserter_definition: Iterable[TableDefinition.Column] = None):
        self._cdata = None
        self._buffer = None
        self._native_stream_definition = None
        # Reference lib for correct gc order
        self.__lib_ref = lib
        check_precondition(isinstance(arg, TableDefinition) or isinstance(arg, str) or
                           isinstance(arg, Name) or isinstance(arg, TableName),
                           'Argument must be a table name or a table definition')
        check_precondition(inserter_definition is None or columns is not None,
                           'Inserter definition is required only when columns are provided')
        # table_def: table definition containing only the columns into which values are inserted
        # stream_def: stream definition used by the bulk inserter
        # self._select_str: comma-separated list of columns retrieved by the select statement during bulk insert
        #   Usage - `INSERT BULK INTO table (columns) SELECT self._select_str FROM EXTERNAL STREAM stream`
        # uses_expr: boolean indicating whether expressions are used in the insertion
        table_def, stream_def, self._select_str, uses_expr = \
            Inserter.__parse_from_arguments(connection, arg, columns, inserter_definition)
        native_table_definition = SchemaConverter.table_definition_to_native(table_def)
        pp_inserter = ffi.new('hyper_inserter_t**')
        Error.check(hapi.hyper_create_inserter(connection._cdata, native_table_definition.cdata, pp_inserter))
        self._cdata = pp_inserter[0]
        self._native_table_definition = native_table_definition
        # Decimal context used for 128-bit numeric transformations
        traps = [decimal.Overflow, decimal.DivisionByZero, decimal.InvalidOperation, decimal.Inexact]
        self._big_numeric_decimal_context = decimal.Context(prec=38, traps=traps)
        # The stream definition is None if all columns are computed by expressions
        if stream_def is not None:
            native_select_str = InteropUtil.string_to_char_p(self._select_str)
            native_stream_definition = SchemaConverter.table_definition_to_native(stream_def)
            # Initialize the bulk insert/stream inserter when expressions are used
            if uses_expr is True:
                Error.check(hapi.hyper_init_bulk_insert(pp_inserter[0], native_stream_definition.cdata,
                                                        native_select_str))
            pp_buffer = ffi.new('hyper_inserter_buffer_t**')
            Error.check(hapi.hyper_create_inserter_buffer(pp_inserter[0], native_stream_definition.cdata,
                                                          native_select_str, pp_buffer))
            self._buffer = ffi.gc(pp_buffer[0], hapi.hyper_inserter_buffer_destroy)
        self.__inline_things(stream_def)

    @staticmethod
    def __parse_from_arguments(connection: Connection, arg: Union[str, Name, TableName, TableDefinition],
                               columns: Iterable[Union[str, ColumnMapping]],
                               inserter_definition: Iterable[TableDefinition.Column]):
        if isinstance(arg, TableDefinition):
            table_definition = arg._clone()
        else:
            table_definition = connection.catalog.get_table_definition(arg)
        # We identify the usage of expressions by iterating through the `columns` argument.
        # Each entry in `columns` must refer to a target column and be of type `str` or `ColumnMapping`.
        # If expressions are used during insertion:
        #   The stream definition is different from the target table definition.
        #   The inserter definition is used to build the stream definition.
        #   For a column that doesn't use an expression, the column definition in the inserter definition
        #     and in the target table definition must be the same.
        #   If all columns use expressions, the stream definition/inserter definition is empty.
        # For example:
        #   Target table: [Column("A", SqlType.int()), Column("B", SqlType.int())]
        #   columns: [ColumnMapping("A", "1"), ColumnMapping("B", "2")]
        #
        # The values are inserted when the `execute()` method is called.
        uses_expression = False
        # List of non-expression columns.
        # The non-expression columns are tracked to ensure that each of them is specified
        # in the inserter definition.
        non_expression_columns = []
        select_list = []
        if columns is not None:
            # The user is only inserting into a subset of columns
            target_columns_list = []
            for column in columns:
                if isinstance(column, str):
                    column = Inserter.ColumnMapping(column)
                elif not isinstance(column, Inserter.ColumnMapping):
                    raise TypeError("Values in 'columns' argument must be a 'ColumnMapping' or 'str'")
                target_column = table_definition.get_column_by_name(column.column_name)
                if target_column is None:
                    raise ValueError(f'Column {column.column_name} does not exist'
                                     f' in table {table_definition.table_name.name}')
                if column.expression is None:
                    non_expression_columns.append(target_column)
                else:
                    uses_expression = True
                select_list.append(column._as_select_list_expression())
                target_columns_list.append(target_column)
            table_definition = TableDefinition(table_definition.table_name, target_columns_list,
                                               table_definition.persistence)
        # If expressions are used, build a stream definition with the help of inserter_definition
        if uses_expression is True:
            # Verify that non-expression columns are specified in the inserter definition exactly once
            # and that their column definition matches the table definition
            if len(non_expression_columns) != 0 and inserter_definition is None:
                raise ValueError('Columns without expressions must be specified in inserter definition')
            for non_expression_col in non_expression_columns:
                matching_columns = list(filter(lambda inserter_column: inserter_column.name == non_expression_col.name,
                                               inserter_definition))
                if len(matching_columns) == 0:
                    raise ValueError(f'Column {str(non_expression_col.name)} must be specified '
                                     f'as a ColumnMapping in columns or in inserter definition')
                elif len(matching_columns) != 1:
                    raise ValueError(f'Column {str(non_expression_col.name)} specified more '
                                     f'than once in inserter definition')
                else:
                    inserter_column = matching_columns[0]
                    if (inserter_column.type != non_expression_col.type or
                            inserter_column.nullability != non_expression_col.nullability):
                        raise ValueError(f'Column definition for {str(non_expression_col.name)} '
                                         f'does not match definition provided in inserter definition')
            if inserter_definition is not None:
                stream_definition = TableDefinition(table_definition.table_name, inserter_definition,
                                                    table_definition.persistence)
            else:
                stream_definition = None
        else:
            stream_definition = table_definition
        # Build the select list for the C API
        if len(select_list) != 0:
            select_list_string = ', '.join(select_list)
        else:
            select_list_string = ', '.join(str(column.name) for column in stream_definition.columns)
        return table_definition, stream_definition, select_list_string, uses_expression

    def __inline_things(self, table_definition: Optional[TableDefinition]):
        # noinspection DuplicatedCode
        insert_functions = {
            TypeTag.BOOL: self.__write_bool,
            TypeTag.BIG_INT: self.__write_big_int,
            TypeTag.SMALL_INT: self.__write_small_int,
            TypeTag.INT: self.__write_int,
            TypeTag.DOUBLE: self.__write_double,
            TypeTag.OID: self.__write_uint,
            TypeTag.BYTES: self.__write_bytes,
            TypeTag.TEXT: self.__write_text,
            TypeTag.VARCHAR: self.__write_text,
            TypeTag.CHAR: self.__write_text,
            TypeTag.JSON: self.__write_text,
            TypeTag.DATE: self.__write_date,
            TypeTag.INTERVAL: self.__write_interval,
            TypeTag.TIME: self.__write_time,
            TypeTag.TIMESTAMP: self.__write_timestamp,
            TypeTag.TIMESTAMP_TZ: self.__write_timestamp_tz,
            TypeTag.GEOGRAPHY: self.__write_bytes,
        }
        validation_functions = {
            TypeTag.BOOL: Inserter.__validate_bool,
            TypeTag.BIG_INT: Inserter.__validate_big_int,
            TypeTag.SMALL_INT: Inserter.__validate_small_int,
            TypeTag.INT: Inserter.__validate_int,
            TypeTag.DOUBLE: Inserter.__validate_double,
            TypeTag.OID: Inserter.__validate_oid,
            TypeTag.BYTES: Inserter.__validate_bytes,
            TypeTag.TEXT: Inserter.__validate_text,
            TypeTag.VARCHAR: Inserter.__validate_varchar,
            TypeTag.CHAR: Inserter.__validate_char,
            TypeTag.JSON: Inserter.__validate_json,
            TypeTag.DATE: Inserter.__validate_date,
            TypeTag.INTERVAL: Inserter.__validate_interval,
            TypeTag.TIME: Inserter.__validate_time,
            TypeTag.TIMESTAMP: Inserter.__validate_timestamp,
            TypeTag.TIMESTAMP_TZ: Inserter.__validate_timestamp_tz,
            TypeTag.GEOGRAPHY: Inserter.__validate_geography,
            TypeTag.NUMERIC: Inserter.__validate_numeric,
        }
        self._column_count = len(table_definition.columns) if table_definition else 0
        self._range_column_count = range(self._column_count)
        self._columns = list(table_definition.columns) if table_definition else []
        self._insert_null_functions = []
        self._insert_value_functions = []
        self._validation_functions = []
        for col in self._columns:
            is_nullable = col.nullability == Nullability.NULLABLE
            if col.type.tag == TypeTag.NUMERIC:
                insert_value = self.__get_write_decimal_function(col)
            else:
                insert_value = insert_functions[col.type.tag]
            self._insert_value_functions.append(insert_value)
            self._validation_functions.append(validation_functions[col.type.tag])
            if is_nullable:
                self._insert_null_functions.append(self.__write_null)
            else:
                self._insert_null_functions.append(
                    lambda column_name=col.name.unescaped: self.__raise_value_is_null(column_name))
        self._hyper_add_null = hapi.hyper_inserter_buffer_add_null
        self._hyper_add_bool = hapi.hyper_inserter_buffer_add_bool
        self._hyper_add_int16 = hapi.hyper_inserter_buffer_add_int16
        self._hyper_add_int32 = hapi.hyper_inserter_buffer_add_int32
        self._hyper_add_int64 = hapi.hyper_inserter_buffer_add_int64
        self._hyper_add_double = hapi.hyper_inserter_buffer_add_double
        self._hyper_add_binary = hapi.hyper_inserter_buffer_add_binary
        self._hyper_add_date = hapi.hyper_inserter_buffer_add_date
        self._hyper_add_raw = hapi.hyper_inserter_buffer_add_raw
        self._ffi_from_buffer = ffi.from_buffer
        self._py_interval = ffi.new('py_interval*')
        self._py_interval_buf = ffi.cast('const uint8_t*', self._py_interval)

    def __get_write_decimal_function(self, column: TableDefinition.Column):
        if column.type.precision <= 18:
            # 64-bit numerics
            write_func = self.__write_big_int
            scale = decimal.Decimal(10) ** column.type.scale

            def write_decimal(v: decimal.Decimal):
                return write_func((v * scale).to_integral_value())

            return write_decimal
        else:
            # 128-bit numerics
            def write_decimal(v: decimal.Decimal):
                i = int(v.scaleb(column.type.scale, context=self._big_numeric_decimal_context))
                num128 = i.to_bytes(16, byteorder='little', signed=True)
                return Error.check(self._hyper_add_raw(self._buffer, self._ffi_from_buffer(num128), len(num128)))

            return write_decimal

    def __raise_value_is_null(self, column_name):
        raise ValueError(f'Got NULL for a non-nullable column "{column_name}"')
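    # Illustrative note (not part of the original module): for a NUMERIC(18, 3) column the
    # 64-bit path above scales Decimal('1.5') by 10**3 and writes the integer 1500; for a
    # precision above 18, the scaled value is instead encoded as a 16-byte little-endian
    # two's-complement integer and written via hyper_inserter_buffer_add_raw.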
    def add_row(self, row: Iterable[object]):
        """
        Inserts a row of data into the table. ``None`` corresponds to the database NULL, see :any:`TypeTag`
        for how Python types are mapped to Hyper data types.

        :param row: a row of data as a list of objects.
        """
        try:
            i = 0
            for v in row:
                if v is None:
                    self._insert_null_functions[i]()
                else:
                    self._insert_value_functions[i](v)
                i += 1
            if i != self._column_count:
                raise ValueError(f'Too few columns provided to `add_row()`. '
                                 f'Provided {i} column(s) but {self._column_count} are required.')
        except ValueError:
            # If a `ValueError` occurred, just re-raise it after closing the inserter and skip the
            # `__validate_row()` pass of the general `Exception` case. Otherwise, `__validate_row()` might
            # raise a different exception and confuse the user. (See Defect 1275968.)
            self.close()
            raise
        except Exception:
            self.close()
            # see if insertion failed because the values are invalid
            self.__validate_row(row)
            # nope, something else happened, re-raise the exception
            raise
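    # Illustrative sketch (not part of the original module), assuming an open inserter on a
    # table with a nullable text column followed by an int column:
    #
    #     inserter.add_row(['a', 1])
    #     inserter.add_row([None, 2])  # None is written as SQL NULL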
    def add_rows(self, rows: Iterable[Iterable[object]]):
        """
        Inserts rows of data into the table. Each row is represented by a sequence of objects.
        ``None`` corresponds to the database NULL, see :any:`TypeTag` for how Python types are mapped to
        Hyper data types.

        This is a convenience method equivalent to::

            for row in rows:
                inserter.add_row(row)

        :param rows: a sequence of rows of data.
        """
        for row in rows:
            self.add_row(row)
    def execute(self):
        """
        Flush remaining data and commit. If this method fails, then the data is discarded and not inserted.
        The inserter is closed after this method returns, even in the case of a failure.

        If ``execute()`` is not called and the inserter is closed, then the inserted data is discarded.
        """
        if self._cdata is None:
            raise RuntimeError('Inserter is already closed')
        self.__close(True)
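    # Illustrative note (not part of the original module): if `execute()` is never called,
    # closing the inserter discards the buffered rows, e.g.
    #
    #     with Inserter(connection, 'foo') as inserter:
    #         inserter.add_row(['a', 1])
    #         # no execute() -> the row above is discarded when the with-block exits
    #     # calling inserter.execute() before the block ends would commit it instead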
    def __close(self, insert_data):
        execute_exception = None
        if self._cdata is not None:
            if insert_data:
                if self._column_count == 0 and self._select_str is not ffi.NULL:
                    try:
                        native_select_str = InteropUtil.string_to_char_p(self._select_str)
                        Error.check(hapi.hyper_insert_computed_expressions(self._cdata, native_select_str))
                    except Exception as ex:
                        if execute_exception is None:
                            execute_exception = ex
                else:
                    try:
                        if self._buffer is not None:
                            Error.check(hapi.hyper_inserter_buffer_flush(self._buffer))
                    except Exception as ex:
                        execute_exception = ex
                try:
                    Error.check(hapi.hyper_close_inserter(self._cdata, execute_exception is None))
                except Exception as ex:
                    if execute_exception is None:
                        execute_exception = ex
            else:
                # do not check the error, it's not going to fail
                hapi.hyper_close_inserter(self._cdata, False)
            self._cdata = None
        if self._native_table_definition is not None:
            ffi.release(self._native_table_definition.cdata)
            self._native_table_definition = None
        if self._native_stream_definition is not None:
            ffi.release(self._native_stream_definition.cdata)
            self._native_stream_definition = None
        if self._buffer is not None:
            ffi.release(self._buffer)
            self._buffer = None
        self.add_row = self.__raise_inserter_is_closed
        if execute_exception is not None:
            raise execute_exception

    @property
    def is_open(self) -> bool:
        """ Returns ``True`` if the inserter has not been closed yet. """
        return self._cdata is not None
    def close(self):
        """ If the inserter is not closed yet, discard all data and close it. """
        self.__close(False)
    @staticmethod
    def __raise_inserter_is_closed(*args, **kwargs):
        raise RuntimeError('The inserter is closed.')

    def __validate_row(self, row: Iterable[object]):
        i = 0
        for v in row:
            if i >= self._column_count:
                raise ValueError(
                    f'The table has {self._column_count} columns, but Inserter.add_row() was called for a row with '
                    f'more than {self._column_count} values.')
            # insert_null will raise an exception if NULL is not allowed
            if v is not None:
                self._validation_functions[i](v, self._columns[i])
            i += 1
        if i != self._column_count:
            raise ValueError(
                f'Inserter.add_row() was called for a row with {i} values, but the table has {self._column_count} '
                f'columns.')

    @staticmethod
    def __format_value(value):
        s = repr(value)
        if len(s) > 20:
            s = s[:20] + '...'
        return s

    @staticmethod
    def __validate_bytes_value(value: Any, column: TableDefinition.Column):
        if not isinstance(value, bytes) and not isinstance(value, bytearray):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be a bytes or a bytearray instance')

    @staticmethod
    def __validate_int_value(value, column: TableDefinition.Column, min_value, max_value):
        if not isinstance(value, int):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be an int instance')
        if value < min_value or value > max_value:
            raise ValueError(f'Got an invalid value {value} for column '
                             f'"{column.name.unescaped}", it must be in the range '
                             f'from {min_value} to {max_value}')

    @staticmethod
    def __validate_text_value(value, column: TableDefinition.Column):
        if not isinstance(value, str) and not isinstance(value, bytes) and not isinstance(value, bytearray):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be an str, bytes, or bytearray instance')

    @staticmethod
    def __validate_bool(value: Any, column: TableDefinition.Column):
        if not isinstance(value, bool):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be a bool instance')

    @staticmethod
    def __validate_big_int(value: Any, column: TableDefinition.Column):
        Inserter.__validate_int_value(value, column, Inserter._MIN_BIG_INT, Inserter._MAX_BIG_INT)

    @staticmethod
    def __validate_small_int(value: Any, column: TableDefinition.Column):
        Inserter.__validate_int_value(value, column, Inserter._MIN_SMALL_INT, Inserter._MAX_SMALL_INT)

    @staticmethod
    def __validate_int(value: Any, column: TableDefinition.Column):
        Inserter.__validate_int_value(value, column, Inserter._MIN_INT, Inserter._MAX_INT)

    @staticmethod
    def __validate_double(value: Any, column: TableDefinition.Column):
        if not isinstance(value, float):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be a float instance')

    @staticmethod
    def __validate_oid(value: Any, column: TableDefinition.Column):
        Inserter.__validate_int_value(value, column, Inserter._MIN_OID, Inserter._MAX_OID)

    @staticmethod
    def __validate_bytes(value: Any, column: TableDefinition.Column):
        Inserter.__validate_bytes_value(value, column)

    @staticmethod
    def __validate_text(value: Any, column: TableDefinition.Column):
        Inserter.__validate_text_value(value, column)

    @staticmethod
    def __validate_varchar(value: Any, column: TableDefinition.Column):
        Inserter.__validate_text_value(value, column)

    @staticmethod
    def __validate_char(value: Any, column: TableDefinition.Column):
        Inserter.__validate_text_value(value, column)

    @staticmethod
    def __validate_json(value: Any, column: TableDefinition.Column):
        Inserter.__validate_text_value(value, column)

    @staticmethod
    def __validate_date(value: Any, column: TableDefinition.Column):
        if (not isinstance(value, datetime.datetime) and not isinstance(value, datetime.date) and
                not isinstance(value, Timestamp) and not isinstance(value, Date)):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be a Date, Timestamp, datetime, or date instance')

    @staticmethod
    def __validate_interval(value: Any, column: TableDefinition.Column):
        if not isinstance(value, Interval):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be an Interval instance')

    @staticmethod
    def __validate_time(value: Any, column: TableDefinition.Column):
        if not isinstance(value, datetime.datetime) and not isinstance(value, datetime.time):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be a datetime or a time instance')

    @staticmethod
    def __validate_timestamp_value(value: Any, column: TableDefinition.Column):
        if not isinstance(value, datetime.datetime) and not isinstance(value, Timestamp):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be a Timestamp or a datetime instance')

    @staticmethod
    def __validate_timestamp(value: Any, column: TableDefinition.Column):
        Inserter.__validate_timestamp_value(value, column)

    @staticmethod
    def __validate_timestamp_tz(value: Any, column: TableDefinition.Column):
        Inserter.__validate_timestamp_value(value, column)

    @staticmethod
    def __validate_geography(value: Any, column: TableDefinition.Column):
        Inserter.__validate_bytes_value(value, column)

    @staticmethod
    def __validate_numeric(value: Any, column: TableDefinition.Column):
        if not isinstance(value, decimal.Decimal):
            raise ValueError(f'Got an invalid value {Inserter.__format_value(value)} for column '
                             f'"{column.name.unescaped}", it '
                             f'must be a Decimal instance')

    _MIN_BIG_INT = -2 ** 63
    _MAX_BIG_INT = 2 ** 63 - 1
    _TWO_TO_THIRTY_TWO = 2 ** 32
    _TWO_TO_SIXTY_FOUR = 2 ** 64
    _MIN_INT = -2 ** 31
    _MAX_INT = 2 ** 31 - 1
    _MIN_SMALL_INT = -2 ** 15
    _MAX_SMALL_INT = 2 ** 15 - 1
    _MIN_OID = 0
    _MAX_OID = 2 ** 32 - 1

    def __write_null(self):
        Error.check(self._hyper_add_null(self._buffer))

    def __write_bool(self, v: bool):
        Error.check(self._hyper_add_bool(self._buffer, v))

    def __write_small_int(self, v: int):
        Error.check(self._hyper_add_int16(self._buffer, v))

    def __write_int(self, v: int):
        Error.check(self._hyper_add_int32(self._buffer, v))

    def __write_uint(self, v: int):
        # cffi verifies that the value is in range for the target type, so we have to convert the unsigned
        # value to an int32_t here. E.g. 0xFFFFFFFF gets transformed into -1.
        v = v if v <= Inserter._MAX_INT else v - Inserter._TWO_TO_THIRTY_TWO
        Error.check(self._hyper_add_int32(self._buffer, v))

    def __write_big_int(self, v: int):
        Error.check(self._hyper_add_int64(self._buffer, v))

    def __write_double(self, v: float):
        Error.check(self._hyper_add_double(self._buffer, v))

    def __write_bytes(self, v: Union[bytes, bytearray]):
        Error.check(self._hyper_add_binary(self._buffer, self._ffi_from_buffer(v), len(v)))

    def __write_text(self, v: Union[str, bytes, bytearray]):
        try:
            v = v.encode()
        except AttributeError:
            pass
        Error.check(self._hyper_add_binary(self._buffer, self._ffi_from_buffer(v), len(v)))

    def __write_date(self, v: Union[datetime.date, datetime.datetime, Date]):
        Error.check(self._hyper_add_date(self._buffer, v.year, v.month, v.day))

    def __write_time(self, v: Union[datetime.time, datetime.datetime]):
        Error.check(self._hyper_add_int64(self._buffer, Converter.time_to_hyper(v)))

    def __write_timestamp(self, v: Union[datetime.datetime, Timestamp]):
        v = Converter.to_hyper_timestamp(v)
        v = v if v <= Inserter._MAX_BIG_INT else v - Inserter._TWO_TO_SIXTY_FOUR
        Error.check(self._hyper_add_int64(self._buffer, v))

    def __write_timestamp_tz(self, v: Union[datetime.datetime, Timestamp]):
        v = Converter.to_hyper_timestamp_tz(v)
        v = v if v <= Inserter._MAX_BIG_INT else v - Inserter._TWO_TO_SIXTY_FOUR
        Error.check(self._hyper_add_int64(self._buffer, v))

    def __write_interval(self, v: Interval):
        p = self._py_interval
        p.microseconds = v.microseconds
        p.days = v.days
        p.months = v.months
        Error.check(self._hyper_add_raw(self._buffer, self._py_interval_buf, 16))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        if self._cdata is not None:
            warnings.warn('Inserter has not been closed. Use Inserter object in a with statement or call its '
                          'close() method when done.', UnclosedObjectWarning)
            hapi.hyper_close_inserter(self._cdata, False)
            self._cdata = None