Source code for sqlbatis.connection

from sqlalchemy import text
from .row import Row, RowSet, SQLAlchemyResultProxy
from .container import SQLBatisContainer


[docs]class Connection: """The wrapper of the sqlalchemy raw connection """ def __init__(self, conn): self.conn = conn self.transaction = None
[docs] def close(self): """Close the connection """ local = SQLBatisContainer.__local__ self.conn.close() del local.connection
@property def closed(self): """Check the connection status :return: the boolean which indicate the connection status :rtype: boolean """ return self.conn.closed
[docs] def in_transaction(self): """Check if the connection is in transaction :return: the status of the connection transaction :rtype: boolean """ return self.conn.in_transaction()
[docs] def query(self, sql, fetch_all, **params): """The basic query based on the sqlalchemy, it will accept the raw sql, and execute will raw sqlalchemy connection :param sql: the raw sql that will be executed :type sql: str :param fetch_all: determine if consume all the iterator immediately instead of lazy loading :type fetch_all: bool :return: the row or rowset of the sql result :rtype: Row or RowSet """ result_proxy = self.conn.execute(text(sql), **params) return self._result_proxy_to_rowset(result_proxy, fetch_all)
[docs] def bulk_query(self, sql, *params): """Bulk update or insert :param sql: the raw sql that will be executed :type sql: str :return: the row or rowset of the sql result :rtype: Row or RowSet """ result_proxy = self.conn.execute(text(sql), *params) return self._result_proxy_to_rowset(result_proxy, False)
[docs] def execute(self, sql, fetch_all=False, inserted_primary_key=False, **params): """The raw execute function of the sqlalchemy, and main difference between this func with the query is it can accept the sqlalchemy sql expression as the first parameter :param sql: the sqlalchemy sql expression need to be executed :type sql: sqlalchemy sql expression :param fetch_all: determine if consume all the iterator immediately instead of lazy loading, defaults to False :type fetch_all: bool, optional :param inserted_primary_key: if return the primary key when do the create func, defaults to False :type inserted_primary_key: bool, optional :return: the result of the query :rtype: Row or RowSet or int(if inserted_primary_key) """ result_proxy = self.conn.execute(sql, **params) if inserted_primary_key: return result_proxy.inserted_primary_key[0] return self._result_proxy_to_rowset(result_proxy, fetch_all)
[docs] def begin(self): """Start a transaction :return: a transaction :rtype: TBI """ self.transaction = self.conn.begin() return self.transaction
[docs] def begin_nested(self): return self.conn.begin_nested()
def _result_proxy_to_rowset(self, result_proxy, fetch_all): """Convert the ResultProxy object of the query result RowSet which defined in the SQLBatis :param result_proxy: the query result of the sqlalchemy :type result_proxy: ResultProxy :param fetch_all: consumen all the iterator :type fetch_all: bool :return: the rowset of the sql result :rtype: RowSet """ if result_proxy.returns_rows: keys = result_proxy.keys() rows = (Row(keys, values) for values in result_proxy) results = RowSet(rows, SQLAlchemyResultProxy(result_proxy)) if fetch_all: results.all() return results else: return RowSet(iter([]), SQLAlchemyResultProxy(result_proxy)) def __enter__(self): return self def __exit__(self, exc, val, traceback): # if the current connection is in transaction will not close immediately if not self.in_transaction(): self.close()