Unverified Commit 824b3aad authored by Kristian Tan's avatar Kristian Tan Committed by GitHub

Log time (#1)

Save record to the database for each day's usage and display it on the website
parent f57eac82
from .daily_usage import DailyUsage
\ No newline at end of file
from flask import Flask, render_template
import RPi.GPIO as GPIO
from flask_sqlalchemy import SQLAlchemy
import os
from datetime import datetime, date, timedelta
# from daily_usage import DailyUsage
app = Flask(__name__)
os.environ['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///energyUsage'
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ['SQLALCHEMY_DATABASE_URI']
# To suppress warnings
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
db = SQLAlchemy(app)
# TODO: Move this into daily_usage class file
class DailyUsage(db.Model):
__tablename__ = 'daily_usage'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
date = db.Column(db.DateTime, unique=True, nullable=False)
kwhUsed = db.Column(db.Float, unique=False)
def __init__(self, date, kwhUsed):
self.date = date
self.kwhUsed = kwhUsed
def __repr__(self):
return '<DailyUsage %r, %r, %r>' % (self.id, self.date, self.kwhUsed)
db.create_all()
# TODO: be able to query db by date
daily_total = 0
latest_entry = db.session.query(DailyUsage).order_by(DailyUsage.id.desc()).first()
if latest_entry:
latest_entry_date = date(latest_entry.date.year, latest_entry.date.month, latest_entry.date.day)
if latest_entry_date == datetime.today().date():
daily_total = format(latest_entry.kwhUsed, '.7f')
todays_cost = format(float(daily_total) * 0.1622, '0.5f')
# Create dictionary to store pin info
pins = {
25: {'name': 'Light', 'state': GPIO.LOW}
25: {'name': 'Light', 'state': GPIO.LOW, 'on_time': None, 'on_date': None, 'Wattage': 15}
}
# Setup each pin
......@@ -24,39 +64,75 @@ def main():
# Set the template data for the HTML template
template_data = {
'pins': pins
'pins': pins,
'daily_total': daily_total,
'todays_cost': todays_cost
}
return render_template('main.html', **template_data)
@app.route("/<change_pin>")
@app.route("/toggle/<change_pin>")
def toggle_pin(change_pin):
if change_pin == 'favicon.ico':
pass
change_pin = int(change_pin)
device_name = pins[change_pin]['name']
# Toggle the selected pin
GPIO.output(change_pin, not GPIO.input(change_pin))
message = "Turned " + device_name
if GPIO.input(change_pin) == 0:
message += " off."
if pins[change_pin]['on_time'] is not None:
create_entry(change_pin)
else:
message += " on."
pins[change_pin]['on_time'] = datetime.now()
pins[change_pin]['on_date'] = date.today()
for pin in pins:
pins[pin]['state'] = GPIO.input(pin)
latest_entry = db.session.query(DailyUsage).order_by(DailyUsage.id.desc()).first()
if latest_entry:
latest_entry_date = date(latest_entry.date.year, latest_entry.date.month, latest_entry.date.day)
if latest_entry_date == datetime.today().date():
daily_total = format(latest_entry.kwhUsed, '.7f')
else:
daily_total = 0
todays_cost = format(float(daily_total) * 0.1622, '0.5f')
template_data = {
'message': message,
'pins': pins
'pins': pins,
'daily_total': daily_total,
'todays_cost': todays_cost
}
return render_template('main.html', **template_data)
def create_entry(change_pin):
latest_entry = db.session.query(DailyUsage).order_by(DailyUsage.id.desc()).first()
start_time = pins[change_pin]['on_time']
# Get the elapsed time and strip away milliseconds
elapsed = int((datetime.now() - start_time).total_seconds())
start_date = pins[change_pin]['on_date']
# Formula to calculate kWh based on time and wattage
kwh = pins[change_pin]['Wattage'] * (elapsed / 3600) / 1000
# If there is already an entry for today, update on time
# if latest_entry:
if latest_entry:
latest_entry_date = date(latest_entry.date.year, latest_entry.date.month, latest_entry.date.day)
if latest_entry_date == start_date:
latest_entry.kwhUsed += kwh
else:
# If no entry for today, make one
entry = DailyUsage(date=start_date, kwhUsed=kwh)
db.session.add(entry)
db.session.commit()
pins[change_pin]['on_time'] = None
pins[change_pin]['on_date'] = None
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8090)
from app import db
class DailyUsage(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
date = db.Column(db.String(80), unique=True, nullable=False)
hours = db.Column(db.Integer(120), unique=False)
def __repr__(self):
return '<DailyUsage %r>' % self.id
\ No newline at end of file
File added
body{
body {
font-family: "Roboto Thin";
}
.heading {
text-decoration: underline;
text-align: center;
background-color: aqua;
margin-bottom: 5%;
/*background-color: aqua;*/
}
.container{
.container {
display: flex;
flex-direction: row;
background-color: red;
/*background-color: darkkhaki;*/
height: 100%;
}
.device_list{
.device_list {
width: 25%;
background-color: red;
/*background-color: red;*/
margin-left: 5%;
text-align: left;
}
.spacer{
width: 5%;
background-color: green;
}
.smart_meter{
width: 70%;
background-color: coral;
.device_list_title {
text-decoration: underline;
}
.display {
width: 80%;
height: 80%;
margin: 5%;
.smart_meter {
width: 65%;
margin-right: 5%;
margin-left: 5%;
/*background-color: coral;*/
border-style: solid;
border-color: black;
border-width: 1px;
}
.smart_meter_info {
margin-left: 5%;
}
......@@ -11,30 +11,34 @@
<body>
<div class="container">
<div class="device_list">
<h3>Device Listing and Status</h3>
<h3 class="device_list_title">Device Listing and Status</h3>
{% for pin in pins %}
<p>The {{ pins[pin].name }}
<p>{{ pins[pin].name }} :
{% if pins[pin].state == true %}
is currently on (<a href="/{{pin}}">turn off</a>)
(<a href="/toggle/{{pin}}">turn off</a>)
{% else %}
is currently off (<a href="/{{pin}}">turn on</a>)
(<a href="/toggle/{{pin}}">turn on</a>)
{% endif %}
</p>
{% endfor %}
{% if message %}
<h2>{{ message }}</h2>
{% endif %}
</div>
<div class="spacer"></div>
</div>
<div class="smart_meter">
<div class="display">
<div class="smart_meter_info">
<p> Devices currently in use:
{% for pin in pins %}
{% if pins[pin].state == true %}
{{ pins[pin].name }}
{% endif %}
{% endfor %}
</p>
{# Implement some way to display "No devices" message if none are in use#}
<p> Total energy used today: {{ daily_total }} kWh.</p>
<p> Total cost of today's energy usage: £{{ todays_cost }}</p>
</div>
</div>
</div>
</body>
......
Copyright 2010 Pallets
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Metadata-Version: 2.1
Name: Flask-SQLAlchemy
Version: 2.4.1
Summary: Adds SQLAlchemy support to your Flask application.
Home-page: https://github.com/pallets/flask-sqlalchemy
Author: Armin Ronacher
Author-email: armin.ronacher@active-4.com
Maintainer: Pallets
Maintainer-email: contact@palletsprojects.com
License: BSD-3-Clause
Project-URL: Documentation, https://flask-sqlalchemy.palletsprojects.com/
Project-URL: Code, https://github.com/pallets/flask-sqlalchemy
Project-URL: Issue tracker, https://github.com/pallets/flask-sqlalchemy/issues
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: BSD License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Internet :: WWW/HTTP :: Dynamic Content
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >= 2.7, != 3.0.*, != 3.1.*, != 3.2.*, != 3.3.*
Requires-Dist: Flask (>=0.10)
Requires-Dist: SQLAlchemy (>=0.8.0)
Flask-SQLAlchemy
================
Flask-SQLAlchemy is an extension for `Flask`_ that adds support for
`SQLAlchemy`_ to your application. It aims to simplify using SQLAlchemy
with Flask by providing useful defaults and extra helpers that make it
easier to accomplish common tasks.
Installing
----------
Install and update using `pip`_:
.. code-block:: text
$ pip install -U Flask-SQLAlchemy
A Simple Example
----------------
.. code-block:: python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///example.sqlite"
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String, unique=True, nullable=False)
email = db.Column(db.String, unique=True, nullable=False)
db.session.add(User(name="Flask", email="example@example.com"))
db.session.commit()
users = User.query.all()
Links
-----
- Documentation: https://flask-sqlalchemy.palletsprojects.com/
- Releases: https://pypi.org/project/Flask-SQLAlchemy/
- Code: https://github.com/pallets/flask-sqlalchemy
- Issue tracker: https://github.com/pallets/flask-sqlalchemy/issues
- Test status: https://travis-ci.org/pallets/flask-sqlalchemy
- Test coverage: https://codecov.io/gh/pallets/flask-sqlalchemy
.. _Flask: https://palletsprojects.com/p/flask/
.. _SQLAlchemy: https://www.sqlalchemy.org
.. _pip: https://pip.pypa.io/en/stable/quickstart/
Flask_SQLAlchemy-2.4.1.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
Flask_SQLAlchemy-2.4.1.dist-info/LICENSE.rst,sha256=SJqOEQhQntmKN7uYPhHg9-HTHwvY-Zp5yESOf_N9B-o,1475
Flask_SQLAlchemy-2.4.1.dist-info/METADATA,sha256=SO2Yy86hBglL9QIQxNdZqKPPBaS-3LrvuYbMG6wHuKI,3128
Flask_SQLAlchemy-2.4.1.dist-info/RECORD,,
Flask_SQLAlchemy-2.4.1.dist-info/WHEEL,sha256=8zNYZbwQSXoB9IfXOjPfeNwvAsALAjffgk27FqvCWbo,110
Flask_SQLAlchemy-2.4.1.dist-info/top_level.txt,sha256=w2K4fNNoTh4HItoFfz2FRQShSeLcvHYrzU_sZov21QU,17
flask_sqlalchemy/__init__.py,sha256=qaMQKMcAVz3et6XhUqOyjzpn8V7NUghH5hHSZvyEJUw,39027
flask_sqlalchemy/__pycache__/__init__.cpython-36.pyc,,
flask_sqlalchemy/__pycache__/_compat.cpython-36.pyc,,
flask_sqlalchemy/__pycache__/model.cpython-36.pyc,,
flask_sqlalchemy/__pycache__/utils.cpython-36.pyc,,
flask_sqlalchemy/_compat.py,sha256=yua0ZSgVWwi56QpEgwaPInzkNQ9PFb7YQdvEk3dImXo,821
flask_sqlalchemy/model.py,sha256=9jBoPU1k0c4nqz2-KyYnfoE55n-1G8Zxfo2Z-ZHV0v4,4992
flask_sqlalchemy/utils.py,sha256=4eHqAbYElnJ3NbSAHhuINckoAHDABoxjleMJD0iKgyg,1390
Wheel-Version: 1.0
Generator: bdist_wheel (0.33.6)
Root-Is-Purelib: true
Tag: py2-none-any
Tag: py3-none-any
SQLAlchemy
==========
The Python SQL Toolkit and Object Relational Mapper
Introduction
-------------
SQLAlchemy is the Python SQL toolkit and Object Relational Mapper
that gives application developers the full power and
flexibility of SQL. SQLAlchemy provides a full suite
of well known enterprise-level persistence patterns,
designed for efficient and high-performing database
access, adapted into a simple and Pythonic domain
language.
Major SQLAlchemy features include:
* An industrial strength ORM, built
from the core on the identity map, unit of work,
and data mapper patterns. These patterns
allow transparent persistence of objects
using a declarative configuration system.
Domain models
can be constructed and manipulated naturally,
and changes are synchronized with the
current transaction automatically.
* A relationally-oriented query system, exposing
the full range of SQL's capabilities
explicitly, including joins, subqueries,
correlation, and most everything else,
in terms of the object model.
Writing queries with the ORM uses the same
techniques of relational composition you use
when writing SQL. While you can drop into
literal SQL at any time, it's virtually never
needed.
* A comprehensive and flexible system
of eager loading for related collections and objects.
Collections are cached within a session,
and can be loaded on individual access, all
at once using joins, or by query per collection
across the full result set.
* A Core SQL construction system and DBAPI
interaction layer. The SQLAlchemy Core is
separate from the ORM and is a full database
abstraction layer in its own right, and includes
an extensible Python-based SQL expression
language, schema metadata, connection pooling,
type coercion, and custom types.
* All primary and foreign key constraints are
assumed to be composite and natural. Surrogate
integer primary keys are of course still the
norm, but SQLAlchemy never assumes or hardcodes
to this model.
* Database introspection and generation. Database
schemas can be "reflected" in one step into
Python structures representing database metadata;
those same structures can then generate
CREATE statements right back out - all within
the Core, independent of the ORM.
SQLAlchemy's philosophy:
* SQL databases behave less and less like object
collections the more size and performance start to
matter; object collections behave less and less like
tables and rows the more abstraction starts to matter.
SQLAlchemy aims to accommodate both of these
principles.
* An ORM doesn't need to hide the "R". A relational
database provides rich, set-based functionality
that should be fully exposed. SQLAlchemy's
ORM provides an open-ended set of patterns
that allow a developer to construct a custom
mediation layer between a domain model and
a relational schema, turning the so-called
"object relational impedance" issue into
a distant memory.
* The developer, in all cases, makes all decisions
regarding the design, structure, and naming conventions
of both the object model as well as the relational
schema. SQLAlchemy only provides the means
to automate the execution of these decisions.
* With SQLAlchemy, there's no such thing as
"the ORM generated a bad query" - you
retain full control over the structure of
queries, including how joins are organized,
how subqueries and correlation is used, what
columns are requested. Everything SQLAlchemy
does is ultimately the result of a developer-
initiated decision.
* Don't use an ORM if the problem doesn't need one.
SQLAlchemy consists of a Core and separate ORM
component. The Core offers a full SQL expression
language that allows Pythonic construction
of SQL constructs that render directly to SQL
strings for a target database, returning
result sets that are essentially enhanced DBAPI
cursors.
* Transactions should be the norm. With SQLAlchemy's
ORM, nothing goes to permanent storage until
commit() is called. SQLAlchemy encourages applications
to create a consistent means of delineating
the start and end of a series of operations.
* Never render a literal value in a SQL statement.
Bound parameters are used to the greatest degree
possible, allowing query optimizers to cache
query plans effectively and making SQL injection
attacks a non-issue.
Documentation
-------------
Latest documentation is at:
http://www.sqlalchemy.org/docs/
Installation / Requirements
---------------------------
Full documentation for installation is at
`Installation <http://www.sqlalchemy.org/docs/intro.html#installation>`_.
Getting Help / Development / Bug reporting
------------------------------------------
Please refer to the `SQLAlchemy Community Guide <http://www.sqlalchemy.org/support.html>`_.
Code of Conduct
---------------
Above all, SQLAlchemy places great emphasis on polite, thoughtful, and
constructive communication between users and developers.
Please see our current Code of Conduct at
`Code of Conduct <http://www.sqlalchemy.org/codeofconduct.html>`_.
License
-------
SQLAlchemy is distributed under the `MIT license
<http://www.opensource.org/licenses/mit-license.php>`_.
Copyright 2005-2019 SQLAlchemy authors and contributors <see AUTHORS file>.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Metadata-Version: 2.0
Name: SQLAlchemy
Version: 1.3.11
Summary: Database Abstraction Library
Home-page: http://www.sqlalchemy.org
Author: Mike Bayer
Author-email: mike_mp@zzzcomputing.com
License: MIT
Project-URL: Documentation, https://docs.sqlalchemy.org
Project-URL: Issue Tracker, https://github.com/sqlalchemy/sqlalchemy/
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Database :: Front-Ends
Classifier: Operating System :: OS Independent
Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*
Provides-Extra: mssql
Provides-Extra: mssql_pymssql
Provides-Extra: mssql_pyodbc
Provides-Extra: mysql
Provides-Extra: oracle
Provides-Extra: postgresql
Provides-Extra: postgresql_pg8000
Provides-Extra: postgresql_psycopg2binary
Provides-Extra: postgresql_psycopg2cffi
Provides-Extra: pymysql
Provides-Extra: mssql
Requires-Dist: pyodbc; extra == 'mssql'
Provides-Extra: mssql_pymssql
Requires-Dist: pymssql; extra == 'mssql_pymssql'
Provides-Extra: mssql_pyodbc
Requires-Dist: pyodbc; extra == 'mssql_pyodbc'
Provides-Extra: mysql
Requires-Dist: mysqlclient; extra == 'mysql'
Provides-Extra: oracle
Requires-Dist: cx-oracle; extra == 'oracle'
Provides-Extra: postgresql
Requires-Dist: psycopg2; extra == 'postgresql'
Provides-Extra: postgresql_pg8000
Requires-Dist: pg8000; extra == 'postgresql_pg8000'
Provides-Extra: postgresql_psycopg2binary
Requires-Dist: psycopg2-binary; extra == 'postgresql_psycopg2binary'
Provides-Extra: postgresql_psycopg2cffi
Requires-Dist: psycopg2cffi; extra == 'postgresql_psycopg2cffi'
Provides-Extra: pymysql
Requires-Dist: pymysql; extra == 'pymysql'
SQLAlchemy
==========
The Python SQL Toolkit and Object Relational Mapper
Introduction
-------------
SQLAlchemy is the Python SQL toolkit and Object Relational Mapper
that gives application developers the full power and
flexibility of SQL. SQLAlchemy provides a full suite
of well known enterprise-level persistence patterns,
designed for efficient and high-performing database
access, adapted into a simple and Pythonic domain
language.
Major SQLAlchemy features include:
* An industrial strength ORM, built
from the core on the identity map, unit of work,
and data mapper patterns. These patterns
allow transparent persistence of objects
using a declarative configuration system.
Domain models
can be constructed and manipulated naturally,
and changes are synchronized with the
current transaction automatically.
* A relationally-oriented query system, exposing
the full range of SQL's capabilities
explicitly, including joins, subqueries,
correlation, and most everything else,
in terms of the object model.
Writing queries with the ORM uses the same
techniques of relational composition you use
when writing SQL. While you can drop into
literal SQL at any time, it's virtually never
needed.
* A comprehensive and flexible system
of eager loading for related collections and objects.
Collections are cached within a session,
and can be loaded on individual access, all
at once using joins, or by query per collection
across the full result set.
* A Core SQL construction system and DBAPI
interaction layer. The SQLAlchemy Core is
separate from the ORM and is a full database
abstraction layer in its own right, and includes
an extensible Python-based SQL expression
language, schema metadata, connection pooling,
type coercion, and custom types.
* All primary and foreign key constraints are
assumed to be composite and natural. Surrogate
integer primary keys are of course still the
norm, but SQLAlchemy never assumes or hardcodes
to this model.
* Database introspection and generation. Database
schemas can be "reflected" in one step into
Python structures representing database metadata;
those same structures can then generate
CREATE statements right back out - all within
the Core, independent of the ORM.
SQLAlchemy's philosophy:
* SQL databases behave less and less like object
collections the more size and performance start to
matter; object collections behave less and less like
tables and rows the more abstraction starts to matter.
SQLAlchemy aims to accommodate both of these
principles.
* An ORM doesn't need to hide the "R". A relational
database provides rich, set-based functionality
that should be fully exposed. SQLAlchemy's
ORM provides an open-ended set of patterns
that allow a developer to construct a custom
mediation layer between a domain model and
a relational schema, turning the so-called
"object relational impedance" issue into
a distant memory.
* The developer, in all cases, makes all decisions
regarding the design, structure, and naming conventions
of both the object model as well as the relational
schema. SQLAlchemy only provides the means
to automate the execution of these decisions.
* With SQLAlchemy, there's no such thing as
"the ORM generated a bad query" - you
retain full control over the structure of
queries, including how joins are organized,
how subqueries and correlation is used, what
columns are requested. Everything SQLAlchemy
does is ultimately the result of a developer-
initiated decision.
* Don't use an ORM if the problem doesn't need one.
SQLAlchemy consists of a Core and separate ORM
component. The Core offers a full SQL expression
language that allows Pythonic construction
of SQL constructs that render directly to SQL
strings for a target database, returning
result sets that are essentially enhanced DBAPI
cursors.
* Transactions should be the norm. With SQLAlchemy's
ORM, nothing goes to permanent storage until
commit() is called. SQLAlchemy encourages applications
to create a consistent means of delineating
the start and end of a series of operations.
* Never render a literal value in a SQL statement.
Bound parameters are used to the greatest degree
possible, allowing query optimizers to cache
query plans effectively and making SQL injection
attacks a non-issue.
Documentation
-------------
Latest documentation is at:
http://www.sqlalchemy.org/docs/
Installation / Requirements
---------------------------
Full documentation for installation is at
`Installation <http://www.sqlalchemy.org/docs/intro.html#installation>`_.
Getting Help / Development / Bug reporting
------------------------------------------
Please refer to the `SQLAlchemy Community Guide <http://www.sqlalchemy.org/support.html>`_.
Code of Conduct
---------------
Above all, SQLAlchemy places great emphasis on polite, thoughtful, and
constructive communication between users and developers.
Please see our current Code of Conduct at
`Code of Conduct <http://www.sqlalchemy.org/codeofconduct.html>`_.
License
-------
SQLAlchemy is distributed under the `MIT license
<http://www.opensource.org/licenses/mit-license.php>`_.
Wheel-Version: 1.0
Generator: bdist_wheel (0.30.0)
Root-Is-Purelib: false
Tag: cp36-cp36m-linux_x86_64
{"classifiers": ["Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python", "Programming Language :: Python :: 2", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", "Topic :: Database :: Front-Ends", "Operating System :: OS Independent"], "extensions": {"python.details": {"contacts": [{"email": "mike_mp@zzzcomputing.com", "name": "Mike Bayer", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst", "license": "LICENSE.txt"}, "project_urls": {"Home": "http://www.sqlalchemy.org"}}}, "extras": ["mssql", "mssql_pymssql", "mssql_pyodbc", "mysql", "oracle", "postgresql", "postgresql_pg8000", "postgresql_psycopg2binary", "postgresql_psycopg2cffi", "pymysql"], "generator": "bdist_wheel (0.30.0)", "license": "MIT", "metadata_version": "2.0", "name": "SQLAlchemy", "project_url": "Documentation, https://docs.sqlalchemy.org", "requires_python": ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*", "run_requires": [{"extra": "oracle", "requires": ["cx-oracle"]}, {"extra": "mysql", "requires": ["mysqlclient"]}, {"extra": "postgresql_pg8000", "requires": ["pg8000"]}, {"extra": "postgresql_psycopg2binary", "requires": ["psycopg2-binary"]}, {"extra": "postgresql", "requires": ["psycopg2"]}, {"extra": "postgresql_psycopg2cffi", "requires": ["psycopg2cffi"]}, {"extra": "mssql_pymssql", "requires": ["pymssql"]}, {"extra": "pymysql", "requires": ["pymysql"]}, {"extra": "mssql", "requires": ["pyodbc"]}, {"extra": "mssql_pyodbc", "requires": ["pyodbc"]}], "summary": "Database Abstraction Library", "test_requires": [{"requires": ["mock", "pytest (!=3.9.1,!=3.9.2,>=2.5.2)", "pytest-xdist"]}], "version": "1.3.11"}
\ No newline at end of file
This diff is collapsed.
import sys
PY2 = sys.version_info[0] == 2
if PY2:
def iteritems(d):
return d.iteritems()
def itervalues(d):
return d.itervalues()
xrange = xrange
string_types = (unicode, bytes)
def to_str(x, charset='utf8', errors='strict'):
if x is None or isinstance(x, str):
return x
if isinstance(x, unicode):
return x.encode(charset, errors)
return str(x)
else:
def iteritems(d):
return iter(d.items())
def itervalues(d):
return iter(d.values())
xrange = range
string_types = (str,)
def to_str(x, charset='utf8', errors='strict'):
if x is None or isinstance(x, str):
return x
if isinstance(x, bytes):
return x.decode(charset, errors)
return str(x)
import re
import sqlalchemy as sa
from sqlalchemy import inspect
from sqlalchemy.ext.declarative import DeclarativeMeta, declared_attr
from sqlalchemy.schema import _get_table_key
from ._compat import to_str
def should_set_tablename(cls):
"""Determine whether ``__tablename__`` should be automatically generated
for a model.
* If no class in the MRO sets a name, one should be generated.
* If a declared attr is found, it should be used instead.
* If a name is found, it should be used if the class is a mixin, otherwise
one should be generated.
* Abstract models should not have one generated.
Later, :meth:`._BoundDeclarativeMeta.__table_cls__` will determine if the
model looks like single or joined-table inheritance. If no primary key is
found, the name will be unset.
"""
if (
cls.__dict__.get('__abstract__', False)
or not any(isinstance(b, DeclarativeMeta) for b in cls.__mro__[1:])
):
return False
for base in cls.__mro__:
if '__tablename__' not in base.__dict__:
continue
if isinstance(base.__dict__['__tablename__'], declared_attr):
return False
return not (
base is cls
or base.__dict__.get('__abstract__', False)
or not isinstance(base, DeclarativeMeta)
)
return True
camelcase_re = re.compile(r'([A-Z]+)(?=[a-z0-9])')
def camel_to_snake_case(name):
def _join(match):
word = match.group()
if len(word) > 1:
return ('_%s_%s' % (word[:-1], word[-1])).lower()
return '_' + word.lower()
return camelcase_re.sub(_join, name).lstrip('_')
class NameMetaMixin(object):
def __init__(cls, name, bases, d):
if should_set_tablename(cls):
cls.__tablename__ = camel_to_snake_case(cls.__name__)
super(NameMetaMixin, cls).__init__(name, bases, d)
# __table_cls__ has run at this point
# if no table was created, use the parent table
if (
'__tablename__' not in cls.__dict__
and '__table__' in cls.__dict__
and cls.__dict__['__table__'] is None
):
del cls.__table__
def __table_cls__(cls, *args, **kwargs):
"""This is called by SQLAlchemy during mapper setup. It determines the
final table object that the model will use.
If no primary key is found, that indicates single-table inheritance,
so no table will be created and ``__tablename__`` will be unset.
"""
# check if a table with this name already exists
# allows reflected tables to be applied to model by name
key = _get_table_key(args[0], kwargs.get('schema'))
if key in cls.metadata.tables:
return sa.Table(*args, **kwargs)
# if a primary key or constraint is found, create a table for
# joined-table inheritance
for arg in args:
if (
(isinstance(arg, sa.Column) and arg.primary_key)
or isinstance(arg, sa.PrimaryKeyConstraint)
):
return sa.Table(*args, **kwargs)
# if no base classes define a table, return one
# ensures the correct error shows up when missing a primary key
for base in cls.__mro__[1:-1]:
if '__table__' in base.__dict__:
break
else:
return sa.Table(*args, **kwargs)
# single-table inheritance, use the parent tablename
if '__tablename__' in cls.__dict__:
del cls.__tablename__
class BindMetaMixin(object):
def __init__(cls, name, bases, d):
bind_key = (
d.pop('__bind_key__', None)
or getattr(cls, '__bind_key__', None)
)
super(BindMetaMixin, cls).__init__(name, bases, d)
if bind_key is not None and getattr(cls, '__table__', None) is not None:
cls.__table__.info['bind_key'] = bind_key
class DefaultMeta(NameMetaMixin, BindMetaMixin, DeclarativeMeta):
pass
class Model(object):
"""Base class for SQLAlchemy declarative base model.
To define models, subclass :attr:`db.Model <SQLAlchemy.Model>`, not this
class. To customize ``db.Model``, subclass this and pass it as
``model_class`` to :class:`SQLAlchemy`.
"""
#: Query class used by :attr:`query`. Defaults to
# :class:`SQLAlchemy.Query`, which defaults to :class:`BaseQuery`.
query_class = None
#: Convenience property to query the database for instances of this model
# using the current session. Equivalent to ``db.session.query(Model)``
# unless :attr:`query_class` has been changed.
query = None
def __repr__(self):
identity = inspect(self).identity
if identity is None:
pk = "(transient {0})".format(id(self))
else:
pk = ', '.join(to_str(value) for value in identity)
return '<{0} {1}>'.format(type(self).__name__, pk)
import warnings
import sqlalchemy
def parse_version(v):
"""
Take a string version and conver it to a tuple (for easier comparison), e.g.:
"1.2.3" --> (1, 2, 3)
"1.2" --> (1, 2, 0)
"1" --> (1, 0, 0)
"""
parts = v.split(".")
# Pad the list to make sure there is three elements so that we get major, minor, point
# comparisons that default to "0" if not given. I.e. "1.2" --> (1, 2, 0)
parts = (parts + 3 * ['0'])[:3]
return tuple(int(x) for x in parts)
def sqlalchemy_version(op, val):
sa_ver = parse_version(sqlalchemy.__version__)
target_ver = parse_version(val)
assert op in ('<', '>', '<=', '>=', '=='), 'op {} not supported'.format(op)
if op == '<':
return sa_ver < target_ver
if op == '>':
return sa_ver > target_ver
if op == '<=':
return sa_ver <= target_ver
if op == '>=':
return sa_ver >= target_ver
return sa_ver == target_ver
def engine_config_warning(config, version, deprecated_config_key, engine_option):
if config[deprecated_config_key] is not None:
warnings.warn(
'The `{}` config option is deprecated and will be removed in'
' v{}. Use `SQLALCHEMY_ENGINE_OPTIONS[\'{}\']` instead.'
.format(deprecated_config_key, version, engine_option),
DeprecationWarning
)
# sqlalchemy/__init__.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from . import util as _util # noqa
from .inspection import inspect # noqa
from .schema import BLANK_SCHEMA # noqa
from .schema import CheckConstraint # noqa
from .schema import Column # noqa
from .schema import ColumnDefault # noqa
from .schema import Computed # noqa
from .schema import Constraint # noqa
from .schema import DDL # noqa
from .schema import DefaultClause # noqa
from .schema import FetchedValue # noqa
from .schema import ForeignKey # noqa
from .schema import ForeignKeyConstraint # noqa
from .schema import Index # noqa
from .schema import MetaData # noqa
from .schema import PassiveDefault # noqa
from .schema import PrimaryKeyConstraint # noqa
from .schema import Sequence # noqa
from .schema import Table # noqa
from .schema import ThreadLocalMetaData # noqa
from .schema import UniqueConstraint # noqa
from .sql import alias # noqa
from .sql import all_ # noqa
from .sql import and_ # noqa
from .sql import any_ # noqa
from .sql import asc # noqa
from .sql import between # noqa
from .sql import bindparam # noqa
from .sql import case # noqa
from .sql import cast # noqa
from .sql import collate # noqa
from .sql import column # noqa
from .sql import delete # noqa
from .sql import desc # noqa
from .sql import distinct # noqa
from .sql import except_ # noqa
from .sql import except_all # noqa
from .sql import exists # noqa
from .sql import extract # noqa
from .sql import false # noqa
from .sql import func # noqa
from .sql import funcfilter # noqa
from .sql import insert # noqa
from .sql import intersect # noqa
from .sql import intersect_all # noqa
from .sql import join # noqa
from .sql import lateral # noqa
from .sql import literal # noqa
from .sql import literal_column # noqa
from .sql import modifier # noqa
from .sql import not_ # noqa
from .sql import null # noqa
from .sql import nullsfirst # noqa
from .sql import nullslast # noqa
from .sql import or_ # noqa
from .sql import outerjoin # noqa
from .sql import outparam # noqa
from .sql import over # noqa
from .sql import select # noqa
from .sql import subquery # noqa
from .sql import table # noqa
from .sql import tablesample # noqa
from .sql import text # noqa
from .sql import true # noqa
from .sql import tuple_ # noqa
from .sql import type_coerce # noqa
from .sql import union # noqa
from .sql import union_all # noqa
from .sql import update # noqa
from .sql import within_group # noqa
from .types import ARRAY # noqa
from .types import BIGINT # noqa
from .types import BigInteger # noqa
from .types import BINARY # noqa
from .types import Binary # noqa
from .types import BLOB # noqa
from .types import BOOLEAN # noqa
from .types import Boolean # noqa
from .types import CHAR # noqa
from .types import CLOB # noqa
from .types import DATE # noqa
from .types import Date # noqa
from .types import DATETIME # noqa
from .types import DateTime # noqa
from .types import DECIMAL # noqa
from .types import Enum # noqa
from .types import FLOAT # noqa
from .types import Float # noqa
from .types import INT # noqa
from .types import INTEGER # noqa
from .types import Integer # noqa
from .types import Interval # noqa
from .types import JSON # noqa
from .types import LargeBinary # noqa
from .types import NCHAR # noqa
from .types import NUMERIC # noqa
from .types import Numeric # noqa
from .types import NVARCHAR # noqa
from .types import PickleType # noqa
from .types import REAL # noqa
from .types import SMALLINT # noqa
from .types import SmallInteger # noqa
from .types import String # noqa
from .types import TEXT # noqa
from .types import Text # noqa
from .types import TIME # noqa
from .types import Time # noqa
from .types import TIMESTAMP # noqa
from .types import TypeDecorator # noqa
from .types import Unicode # noqa
from .types import UnicodeText # noqa
from .types import VARBINARY # noqa
from .types import VARCHAR # noqa
from .engine import create_engine # noqa nosort
from .engine import engine_from_config # noqa nosort
__version__ = "1.3.11"
def __go(lcls):
global __all__
from . import events # noqa
from . import util as _sa_util
import inspect as _inspect
__all__ = sorted(
name
for name, obj in lcls.items()
if not (name.startswith("_") or _inspect.ismodule(obj))
)
_sa_util.dependencies.resolve_all("sqlalchemy")
__go(locals())
# connectors/__init__.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
class Connector(object):
pass
# connectors/mxodbc.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
Provide a SQLALchemy connector for the eGenix mxODBC commercial
Python adapter for ODBC. This is not a free product, but eGenix
provides SQLAlchemy with a license for use in continuous integration
testing.
This has been tested for use with mxODBC 3.1.2 on SQL Server 2005
and 2008, using the SQL Server Native driver. However, it is
possible for this to be used on other database platforms.
For more info on mxODBC, see http://www.egenix.com/
"""
import re
import sys
import warnings
from . import Connector
class MxODBCConnector(Connector):
driver = "mxodbc"
supports_sane_multi_rowcount = False
supports_unicode_statements = True
supports_unicode_binds = True
supports_native_decimal = True
@classmethod
def dbapi(cls):
# this classmethod will normally be replaced by an instance
# attribute of the same name, so this is normally only called once.
cls._load_mx_exceptions()
platform = sys.platform
if platform == "win32":
from mx.ODBC import Windows as Module
# this can be the string "linux2", and possibly others
elif "linux" in platform:
from mx.ODBC import unixODBC as Module
elif platform == "darwin":
from mx.ODBC import iODBC as Module
else:
raise ImportError("Unrecognized platform for mxODBC import")
return Module
@classmethod
def _load_mx_exceptions(cls):
""" Import mxODBC exception classes into the module namespace,
as if they had been imported normally. This is done here
to avoid requiring all SQLAlchemy users to install mxODBC.
"""
global InterfaceError, ProgrammingError
from mx.ODBC import InterfaceError
from mx.ODBC import ProgrammingError
def on_connect(self):
def connect(conn):
conn.stringformat = self.dbapi.MIXED_STRINGFORMAT
conn.datetimeformat = self.dbapi.PYDATETIME_DATETIMEFORMAT
conn.decimalformat = self.dbapi.DECIMAL_DECIMALFORMAT
conn.errorhandler = self._error_handler()
return connect
def _error_handler(self):
""" Return a handler that adjusts mxODBC's raised Warnings to
emit Python standard warnings.
"""
from mx.ODBC.Error import Warning as MxOdbcWarning
def error_handler(connection, cursor, errorclass, errorvalue):
if issubclass(errorclass, MxOdbcWarning):
errorclass.__bases__ = (Warning,)
warnings.warn(
message=str(errorvalue), category=errorclass, stacklevel=2
)
else:
raise errorclass(errorvalue)
return error_handler
def create_connect_args(self, url):
r"""Return a tuple of \*args, \**kwargs for creating a connection.
The mxODBC 3.x connection constructor looks like this:
connect(dsn, user='', password='',
clear_auto_commit=1, errorhandler=None)
This method translates the values in the provided uri
into args and kwargs needed to instantiate an mxODBC Connection.
The arg 'errorhandler' is not used by SQLAlchemy and will
not be populated.
"""
opts = url.translate_connect_args(username="user")
opts.update(url.query)
args = opts.pop("host")
opts.pop("port", None)
opts.pop("database", None)
return (args,), opts
def is_disconnect(self, e, connection, cursor):
# TODO: eGenix recommends checking connection.closed here
# Does that detect dropped connections ?
if isinstance(e, self.dbapi.ProgrammingError):
return "connection already closed" in str(e)
elif isinstance(e, self.dbapi.Error):
return "[08S01]" in str(e)
else:
return False
def _get_server_version_info(self, connection):
# eGenix suggests using conn.dbms_version instead
# of what we're doing here
dbapi_con = connection.connection
version = []
r = re.compile(r"[.\-]")
# 18 == pyodbc.SQL_DBMS_VER
for n in r.split(dbapi_con.getinfo(18)[1]):
try:
version.append(int(n))
except ValueError:
version.append(n)
return tuple(version)
def _get_direct(self, context):
if context:
native_odbc_execute = context.execution_options.get(
"native_odbc_execute", "auto"
)
# default to direct=True in all cases, is more generally
# compatible especially with SQL Server
return False if native_odbc_execute is True else True
else:
return True
def do_executemany(self, cursor, statement, parameters, context=None):
cursor.executemany(
statement, parameters, direct=self._get_direct(context)
)
def do_execute(self, cursor, statement, parameters, context=None):
cursor.execute(statement, parameters, direct=self._get_direct(context))
# connectors/pyodbc.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
import re
from . import Connector
from .. import util
class PyODBCConnector(Connector):
driver = "pyodbc"
supports_sane_rowcount_returning = False
supports_sane_multi_rowcount = False
supports_unicode_statements = True
supports_unicode_binds = True
supports_native_decimal = True
default_paramstyle = "named"
# for non-DSN connections, this *may* be used to
# hold the desired driver name
pyodbc_driver_name = None
def __init__(self, supports_unicode_binds=None, **kw):
super(PyODBCConnector, self).__init__(**kw)
if supports_unicode_binds is not None:
self.supports_unicode_binds = supports_unicode_binds
@classmethod
def dbapi(cls):
return __import__("pyodbc")
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
opts.update(url.query)
keys = opts
query = url.query
connect_args = {}
for param in ("ansi", "unicode_results", "autocommit"):
if param in keys:
connect_args[param] = util.asbool(keys.pop(param))
if "odbc_connect" in keys:
connectors = [util.unquote_plus(keys.pop("odbc_connect"))]
else:
def check_quote(token):
if ";" in str(token):
token = "'%s'" % token
return token
keys = dict((k, check_quote(v)) for k, v in keys.items())
dsn_connection = "dsn" in keys or (
"host" in keys and "database" not in keys
)
if dsn_connection:
connectors = [
"dsn=%s" % (keys.pop("host", "") or keys.pop("dsn", ""))
]
else:
port = ""
if "port" in keys and "port" not in query:
port = ",%d" % int(keys.pop("port"))
connectors = []
driver = keys.pop("driver", self.pyodbc_driver_name)
if driver is None:
util.warn(
"No driver name specified; "
"this is expected by PyODBC when using "
"DSN-less connections"
)
else:
connectors.append("DRIVER={%s}" % driver)
connectors.extend(
[
"Server=%s%s" % (keys.pop("host", ""), port),
"Database=%s" % keys.pop("database", ""),
]
)
user = keys.pop("user", None)
if user:
connectors.append("UID=%s" % user)
connectors.append("PWD=%s" % keys.pop("password", ""))
else:
connectors.append("Trusted_Connection=Yes")
# if set to 'Yes', the ODBC layer will try to automagically
# convert textual data from your database encoding to your
# client encoding. This should obviously be set to 'No' if
# you query a cp1253 encoded database from a latin1 client...
if "odbc_autotranslate" in keys:
connectors.append(
"AutoTranslate=%s" % keys.pop("odbc_autotranslate")
)
connectors.extend(["%s=%s" % (k, v) for k, v in keys.items()])
return [[";".join(connectors)], connect_args]
def is_disconnect(self, e, connection, cursor):
if isinstance(e, self.dbapi.ProgrammingError):
return "The cursor's connection has been closed." in str(
e
) or "Attempt to use a closed connection." in str(e)
else:
return False
# def initialize(self, connection):
# super(PyODBCConnector, self).initialize(connection)
def _dbapi_version(self):
if not self.dbapi:
return ()
return self._parse_dbapi_version(self.dbapi.version)
def _parse_dbapi_version(self, vers):
m = re.match(r"(?:py.*-)?([\d\.]+)(?:-(\w+))?", vers)
if not m:
return ()
vers = tuple([int(x) for x in m.group(1).split(".")])
if m.group(2):
vers += (m.group(2),)
return vers
def _get_server_version_info(self, connection, allow_chars=True):
# NOTE: this function is not reliable, particularly when
# freetds is in use. Implement database-specific server version
# queries.
dbapi_con = connection.connection
version = []
r = re.compile(r"[.\-]")
for n in r.split(dbapi_con.getinfo(self.dbapi.SQL_DBMS_VER)):
try:
version.append(int(n))
except ValueError:
if allow_chars:
version.append(n)
return tuple(version)
def set_isolation_level(self, connection, level):
# adjust for ConnectionFairy being present
# allows attribute set e.g. "connection.autocommit = True"
# to work properly
if hasattr(connection, "connection"):
connection = connection.connection
if level == "AUTOCOMMIT":
connection.autocommit = True
else:
connection.autocommit = False
super(PyODBCConnector, self).set_isolation_level(connection, level)
# connectors/zxJDBC.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
import sys
from . import Connector
class ZxJDBCConnector(Connector):
driver = "zxjdbc"
supports_sane_rowcount = False
supports_sane_multi_rowcount = False
supports_unicode_binds = True
supports_unicode_statements = sys.version > "2.5.0+"
description_encoding = None
default_paramstyle = "qmark"
jdbc_db_name = None
jdbc_driver_name = None
@classmethod
def dbapi(cls):
from com.ziclix.python.sql import zxJDBC
return zxJDBC
def _driver_kwargs(self):
"""Return kw arg dict to be sent to connect()."""
return {}
def _create_jdbc_url(self, url):
"""Create a JDBC url from a :class:`~sqlalchemy.engine.url.URL`"""
return "jdbc:%s://%s%s/%s" % (
self.jdbc_db_name,
url.host,
url.port is not None and ":%s" % url.port or "",
url.database,
)
def create_connect_args(self, url):
opts = self._driver_kwargs()
opts.update(url.query)
return [
[
self._create_jdbc_url(url),
url.username,
url.password,
self.jdbc_driver_name,
],
opts,
]
def is_disconnect(self, e, connection, cursor):
if not isinstance(e, self.dbapi.ProgrammingError):
return False
e = str(e)
return "connection is closed" in e or "cursor is closed" in e
def _get_server_version_info(self, connection):
# use connection.connection.dbversion, and parse appropriately
# to get a tuple
raise NotImplementedError()
# databases/__init__.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""Include imports from the sqlalchemy.dialects package for backwards
compatibility with pre 0.6 versions.
"""
from ..dialects.firebird import base as firebird
from ..dialects.mssql import base as mssql
from ..dialects.mysql import base as mysql
from ..dialects.oracle import base as oracle
from ..dialects.postgresql import base as postgresql
from ..dialects.sqlite import base as sqlite
from ..dialects.sybase import base as sybase
postgres = postgresql
__all__ = (
"firebird",
"mssql",
"mysql",
"postgresql",
"sqlite",
"oracle",
"sybase",
)
# dialects/__init__.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
__all__ = (
"firebird",
"mssql",
"mysql",
"oracle",
"postgresql",
"sqlite",
"sybase",
)
from .. import util
_translates = {"postgres": "postgresql"}
def _auto_fn(name):
"""default dialect importer.
plugs into the :class:`.PluginLoader`
as a first-hit system.
"""
if "." in name:
dialect, driver = name.split(".")
else:
dialect = name
driver = "base"
if dialect in _translates:
translated = _translates[dialect]
util.warn_deprecated(
"The '%s' dialect name has been "
"renamed to '%s'" % (dialect, translated)
)
dialect = translated
try:
module = __import__("sqlalchemy.dialects.%s" % (dialect,)).dialects
except ImportError:
return None
module = getattr(module, dialect)
if hasattr(module, driver):
module = getattr(module, driver)
return lambda: module.dialect
else:
return None
registry = util.PluginLoader("sqlalchemy.dialects", auto_fn=_auto_fn)
plugins = util.PluginLoader("sqlalchemy.plugins")
# firebird/__init__.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from sqlalchemy.dialects.firebird.base import BIGINT
from sqlalchemy.dialects.firebird.base import BLOB
from sqlalchemy.dialects.firebird.base import CHAR
from sqlalchemy.dialects.firebird.base import DATE
from sqlalchemy.dialects.firebird.base import FLOAT
from sqlalchemy.dialects.firebird.base import NUMERIC
from sqlalchemy.dialects.firebird.base import SMALLINT
from sqlalchemy.dialects.firebird.base import TEXT
from sqlalchemy.dialects.firebird.base import TIME
from sqlalchemy.dialects.firebird.base import TIMESTAMP
from sqlalchemy.dialects.firebird.base import VARCHAR
from . import base # noqa
from . import fdb # noqa
from . import kinterbasdb # noqa
base.dialect = dialect = fdb.dialect
__all__ = (
"SMALLINT",
"BIGINT",
"FLOAT",
"FLOAT",
"DATE",
"TIME",
"TEXT",
"NUMERIC",
"FLOAT",
"TIMESTAMP",
"VARCHAR",
"CHAR",
"BLOB",
"dialect",
)
# firebird/fdb.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: firebird+fdb
:name: fdb
:dbapi: pyodbc
:connectstring: firebird+fdb://user:password@host:port/path/to/db[?key=value&key=value...]
:url: http://pypi.python.org/pypi/fdb/
fdb is a kinterbasdb compatible DBAPI for Firebird.
.. versionchanged:: 0.9 - The fdb dialect is now the default dialect
under the ``firebird://`` URL space, as ``fdb`` is now the official
Python driver for Firebird.
Arguments
----------
The ``fdb`` dialect is based on the
:mod:`sqlalchemy.dialects.firebird.kinterbasdb` dialect, however does not
accept every argument that Kinterbasdb does.
* ``enable_rowcount`` - True by default, setting this to False disables
the usage of "cursor.rowcount" with the
Kinterbasdb dialect, which SQLAlchemy ordinarily calls upon automatically
after any UPDATE or DELETE statement. When disabled, SQLAlchemy's
ResultProxy will return -1 for result.rowcount. The rationale here is
that Kinterbasdb requires a second round trip to the database when
.rowcount is called - since SQLA's resultproxy automatically closes
the cursor after a non-result-returning statement, rowcount must be
called, if at all, before the result object is returned. Additionally,
cursor.rowcount may not return correct results with older versions
of Firebird, and setting this flag to False will also cause the
SQLAlchemy ORM to ignore its usage. The behavior can also be controlled on a
per-execution basis using the ``enable_rowcount`` option with
:meth:`.Connection.execution_options`::
conn = engine.connect().execution_options(enable_rowcount=True)
r = conn.execute(stmt)
print r.rowcount
* ``retaining`` - False by default. Setting this to True will pass the
``retaining=True`` keyword argument to the ``.commit()`` and ``.rollback()``
methods of the DBAPI connection, which can improve performance in some
situations, but apparently with significant caveats.
Please read the fdb and/or kinterbasdb DBAPI documentation in order to
understand the implications of this flag.
.. versionchanged:: 0.9.0 - the ``retaining`` flag defaults to ``False``.
In 0.8 it defaulted to ``True``.
.. seealso::
http://pythonhosted.org/fdb/usage-guide.html#retaining-transactions
- information on the "retaining" flag.
""" # noqa
from .kinterbasdb import FBDialect_kinterbasdb
from ... import util
class FBDialect_fdb(FBDialect_kinterbasdb):
def __init__(self, enable_rowcount=True, retaining=False, **kwargs):
super(FBDialect_fdb, self).__init__(
enable_rowcount=enable_rowcount, retaining=retaining, **kwargs
)
@classmethod
def dbapi(cls):
return __import__("fdb")
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
if opts.get("port"):
opts["host"] = "%s/%s" % (opts["host"], opts["port"])
del opts["port"]
opts.update(url.query)
util.coerce_kw_type(opts, "type_conv", int)
return ([], opts)
def _get_server_version_info(self, connection):
"""Get the version of the Firebird server used by a connection.
Returns a tuple of (`major`, `minor`, `build`), three integers
representing the version of the attached server.
"""
# This is the simpler approach (the other uses the services api),
# that for backward compatibility reasons returns a string like
# LI-V6.3.3.12981 Firebird 2.0
# where the first version is a fake one resembling the old
# Interbase signature.
isc_info_firebird_version = 103
fbconn = connection.connection
version = fbconn.db_info(isc_info_firebird_version)
return self._parse_version_info(version)
dialect = FBDialect_fdb
# firebird/kinterbasdb.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: firebird+kinterbasdb
:name: kinterbasdb
:dbapi: kinterbasdb
:connectstring: firebird+kinterbasdb://user:password@host:port/path/to/db[?key=value&key=value...]
:url: http://firebirdsql.org/index.php?op=devel&sub=python
Arguments
----------
The Kinterbasdb backend accepts the ``enable_rowcount`` and ``retaining``
arguments accepted by the :mod:`sqlalchemy.dialects.firebird.fdb` dialect.
In addition, it also accepts the following:
* ``type_conv`` - select the kind of mapping done on the types: by default
SQLAlchemy uses 200 with Unicode, datetime and decimal support. See
the linked documents below for further information.
* ``concurrency_level`` - set the backend policy with regards to threading
issues: by default SQLAlchemy uses policy 1. See the linked documents
below for further information.
.. seealso::
http://sourceforge.net/projects/kinterbasdb
http://kinterbasdb.sourceforge.net/dist_docs/usage.html#adv_param_conv_dynamic_type_translation
http://kinterbasdb.sourceforge.net/dist_docs/usage.html#special_issue_concurrency
""" # noqa
import decimal
from re import match
from .base import FBDialect
from .base import FBExecutionContext
from ... import types as sqltypes
from ... import util
class _kinterbasdb_numeric(object):
def bind_processor(self, dialect):
def process(value):
if isinstance(value, decimal.Decimal):
return str(value)
else:
return value
return process
class _FBNumeric_kinterbasdb(_kinterbasdb_numeric, sqltypes.Numeric):
pass
class _FBFloat_kinterbasdb(_kinterbasdb_numeric, sqltypes.Float):
pass
class FBExecutionContext_kinterbasdb(FBExecutionContext):
@property
def rowcount(self):
if self.execution_options.get(
"enable_rowcount", self.dialect.enable_rowcount
):
return self.cursor.rowcount
else:
return -1
class FBDialect_kinterbasdb(FBDialect):
driver = "kinterbasdb"
supports_sane_rowcount = False
supports_sane_multi_rowcount = False
execution_ctx_cls = FBExecutionContext_kinterbasdb
supports_native_decimal = True
colspecs = util.update_copy(
FBDialect.colspecs,
{
sqltypes.Numeric: _FBNumeric_kinterbasdb,
sqltypes.Float: _FBFloat_kinterbasdb,
},
)
def __init__(
self,
type_conv=200,
concurrency_level=1,
enable_rowcount=True,
retaining=False,
**kwargs
):
super(FBDialect_kinterbasdb, self).__init__(**kwargs)
self.enable_rowcount = enable_rowcount
self.type_conv = type_conv
self.concurrency_level = concurrency_level
self.retaining = retaining
if enable_rowcount:
self.supports_sane_rowcount = True
@classmethod
def dbapi(cls):
return __import__("kinterbasdb")
def do_execute(self, cursor, statement, parameters, context=None):
# kinterbase does not accept a None, but wants an empty list
# when there are no arguments.
cursor.execute(statement, parameters or [])
def do_rollback(self, dbapi_connection):
dbapi_connection.rollback(self.retaining)
def do_commit(self, dbapi_connection):
dbapi_connection.commit(self.retaining)
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
if opts.get("port"):
opts["host"] = "%s/%s" % (opts["host"], opts["port"])
del opts["port"]
opts.update(url.query)
util.coerce_kw_type(opts, "type_conv", int)
type_conv = opts.pop("type_conv", self.type_conv)
concurrency_level = opts.pop(
"concurrency_level", self.concurrency_level
)
if self.dbapi is not None:
initialized = getattr(self.dbapi, "initialized", None)
if initialized is None:
# CVS rev 1.96 changed the name of the attribute:
# http://kinterbasdb.cvs.sourceforge.net/viewvc/kinterbasdb/
# Kinterbasdb-3.0/__init__.py?r1=1.95&r2=1.96
initialized = getattr(self.dbapi, "_initialized", False)
if not initialized:
self.dbapi.init(
type_conv=type_conv, concurrency_level=concurrency_level
)
return ([], opts)
def _get_server_version_info(self, connection):
"""Get the version of the Firebird server used by a connection.
Returns a tuple of (`major`, `minor`, `build`), three integers
representing the version of the attached server.
"""
# This is the simpler approach (the other uses the services api),
# that for backward compatibility reasons returns a string like
# LI-V6.3.3.12981 Firebird 2.0
# where the first version is a fake one resembling the old
# Interbase signature.
fbconn = connection.connection
version = fbconn.server_version
return self._parse_version_info(version)
def _parse_version_info(self, version):
m = match(
r"\w+-V(\d+)\.(\d+)\.(\d+)\.(\d+)( \w+ (\d+)\.(\d+))?", version
)
if not m:
raise AssertionError(
"Could not determine version from string '%s'" % version
)
if m.group(5) != None:
return tuple([int(x) for x in m.group(6, 7, 4)] + ["firebird"])
else:
return tuple([int(x) for x in m.group(1, 2, 3)] + ["interbase"])
def is_disconnect(self, e, connection, cursor):
if isinstance(
e, (self.dbapi.OperationalError, self.dbapi.ProgrammingError)
):
msg = str(e)
return (
"Error writing data to the connection" in msg
or "Unable to complete network request to host" in msg
or "Invalid connection state" in msg
or "Invalid cursor state" in msg
or "connection shutdown" in msg
)
else:
return False
dialect = FBDialect_kinterbasdb
# mssql/__init__.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from . import adodbapi # noqa
from . import base # noqa
from . import mxodbc # noqa
from . import pymssql # noqa
from . import pyodbc # noqa
from . import zxjdbc # noqa
from .base import BIGINT
from .base import BINARY
from .base import BIT
from .base import CHAR
from .base import DATE
from .base import DATETIME
from .base import DATETIME2
from .base import DATETIMEOFFSET
from .base import DECIMAL
from .base import FLOAT
from .base import IMAGE
from .base import INTEGER
from .base import MONEY
from .base import NCHAR
from .base import NTEXT
from .base import NUMERIC
from .base import NVARCHAR
from .base import REAL
from .base import ROWVERSION
from .base import SMALLDATETIME
from .base import SMALLINT
from .base import SMALLMONEY
from .base import SQL_VARIANT
from .base import TEXT
from .base import TIME
from .base import TIMESTAMP
from .base import TINYINT
from .base import try_cast
from .base import UNIQUEIDENTIFIER
from .base import VARBINARY
from .base import VARCHAR
from .base import XML
base.dialect = dialect = pyodbc.dialect
__all__ = (
"INTEGER",
"BIGINT",
"SMALLINT",
"TINYINT",
"VARCHAR",
"NVARCHAR",
"CHAR",
"NCHAR",
"TEXT",
"NTEXT",
"DECIMAL",
"NUMERIC",
"FLOAT",
"DATETIME",
"DATETIME2",
"DATETIMEOFFSET",
"DATE",
"TIME",
"SMALLDATETIME",
"BINARY",
"VARBINARY",
"BIT",
"REAL",
"IMAGE",
"TIMESTAMP",
"ROWVERSION",
"MONEY",
"SMALLMONEY",
"UNIQUEIDENTIFIER",
"SQL_VARIANT",
"XML",
"dialect",
"try_cast",
)
# mssql/adodbapi.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: mssql+adodbapi
:name: adodbapi
:dbapi: adodbapi
:connectstring: mssql+adodbapi://<username>:<password>@<dsnname>
:url: http://adodbapi.sourceforge.net/
.. note::
The adodbapi dialect is not implemented in SQLAlchemy versions 0.6 and
above at this time.
"""
import datetime
import sys
from sqlalchemy import types as sqltypes
from sqlalchemy import util
from sqlalchemy.dialects.mssql.base import MSDateTime
from sqlalchemy.dialects.mssql.base import MSDialect
class MSDateTime_adodbapi(MSDateTime):
def result_processor(self, dialect, coltype):
def process(value):
# adodbapi will return datetimes with empty time
# values as datetime.date() objects.
# Promote them back to full datetime.datetime()
if type(value) is datetime.date:
return datetime.datetime(value.year, value.month, value.day)
return value
return process
class MSDialect_adodbapi(MSDialect):
supports_sane_rowcount = True
supports_sane_multi_rowcount = True
supports_unicode = sys.maxunicode == 65535
supports_unicode_statements = True
driver = "adodbapi"
@classmethod
def import_dbapi(cls):
import adodbapi as module
return module
colspecs = util.update_copy(
MSDialect.colspecs, {sqltypes.DateTime: MSDateTime_adodbapi}
)
def create_connect_args(self, url):
def check_quote(token):
if ";" in str(token):
token = "'%s'" % token
return token
keys = dict((k, check_quote(v)) for k, v in url.query.items())
connectors = ["Provider=SQLOLEDB"]
if "port" in keys:
connectors.append(
"Data Source=%s, %s" % (keys.get("host"), keys.get("port"))
)
else:
connectors.append("Data Source=%s" % keys.get("host"))
connectors.append("Initial Catalog=%s" % keys.get("database"))
user = keys.get("user")
if user:
connectors.append("User Id=%s" % user)
connectors.append("Password=%s" % keys.get("password", ""))
else:
connectors.append("Integrated Security=SSPI")
return [[";".join(connectors)], {}]
def is_disconnect(self, e, connection, cursor):
return isinstance(
e, self.dbapi.adodbapi.DatabaseError
) and "'connection failure'" in str(e)
dialect = MSDialect_adodbapi
# mssql/information_schema.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
# TODO: should be using the sys. catalog with SQL Server, not information
# schema
from ... import cast
from ... import Column
from ... import MetaData
from ... import Table
from ... import util
from ...ext.compiler import compiles
from ...sql import expression
from ...types import Integer
from ...types import String
from ...types import TypeDecorator
from ...types import Unicode
ischema = MetaData()
class CoerceUnicode(TypeDecorator):
impl = Unicode
def process_bind_param(self, value, dialect):
if util.py2k and isinstance(value, util.binary_type):
value = value.decode(dialect.encoding)
return value
def bind_expression(self, bindvalue):
return _cast_on_2005(bindvalue)
class _cast_on_2005(expression.ColumnElement):
def __init__(self, bindvalue):
self.bindvalue = bindvalue
@compiles(_cast_on_2005)
def _compile(element, compiler, **kw):
from . import base
if (
compiler.dialect.server_version_info is None
or compiler.dialect.server_version_info < base.MS_2005_VERSION
):
return compiler.process(element.bindvalue, **kw)
else:
return compiler.process(cast(element.bindvalue, Unicode), **kw)
schemata = Table(
"SCHEMATA",
ischema,
Column("CATALOG_NAME", CoerceUnicode, key="catalog_name"),
Column("SCHEMA_NAME", CoerceUnicode, key="schema_name"),
Column("SCHEMA_OWNER", CoerceUnicode, key="schema_owner"),
schema="INFORMATION_SCHEMA",
)
tables = Table(
"TABLES",
ischema,
Column("TABLE_CATALOG", CoerceUnicode, key="table_catalog"),
Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
Column("TABLE_NAME", CoerceUnicode, key="table_name"),
Column("TABLE_TYPE", CoerceUnicode, key="table_type"),
schema="INFORMATION_SCHEMA",
)
columns = Table(
"COLUMNS",
ischema,
Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
Column("TABLE_NAME", CoerceUnicode, key="table_name"),
Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
Column("IS_NULLABLE", Integer, key="is_nullable"),
Column("DATA_TYPE", String, key="data_type"),
Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
Column(
"CHARACTER_MAXIMUM_LENGTH", Integer, key="character_maximum_length"
),
Column("NUMERIC_PRECISION", Integer, key="numeric_precision"),
Column("NUMERIC_SCALE", Integer, key="numeric_scale"),
Column("COLUMN_DEFAULT", Integer, key="column_default"),
Column("COLLATION_NAME", String, key="collation_name"),
schema="INFORMATION_SCHEMA",
)
constraints = Table(
"TABLE_CONSTRAINTS",
ischema,
Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
Column("TABLE_NAME", CoerceUnicode, key="table_name"),
Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
Column("CONSTRAINT_TYPE", CoerceUnicode, key="constraint_type"),
schema="INFORMATION_SCHEMA",
)
column_constraints = Table(
"CONSTRAINT_COLUMN_USAGE",
ischema,
Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
Column("TABLE_NAME", CoerceUnicode, key="table_name"),
Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
schema="INFORMATION_SCHEMA",
)
key_constraints = Table(
"KEY_COLUMN_USAGE",
ischema,
Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
Column("TABLE_NAME", CoerceUnicode, key="table_name"),
Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
Column("CONSTRAINT_SCHEMA", CoerceUnicode, key="constraint_schema"),
Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
schema="INFORMATION_SCHEMA",
)
ref_constraints = Table(
"REFERENTIAL_CONSTRAINTS",
ischema,
Column("CONSTRAINT_CATALOG", CoerceUnicode, key="constraint_catalog"),
Column("CONSTRAINT_SCHEMA", CoerceUnicode, key="constraint_schema"),
Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
# TODO: is CATLOG misspelled ?
Column(
"UNIQUE_CONSTRAINT_CATLOG",
CoerceUnicode,
key="unique_constraint_catalog",
),
Column(
"UNIQUE_CONSTRAINT_SCHEMA",
CoerceUnicode,
key="unique_constraint_schema",
),
Column(
"UNIQUE_CONSTRAINT_NAME", CoerceUnicode, key="unique_constraint_name"
),
Column("MATCH_OPTION", String, key="match_option"),
Column("UPDATE_RULE", String, key="update_rule"),
Column("DELETE_RULE", String, key="delete_rule"),
schema="INFORMATION_SCHEMA",
)
views = Table(
"VIEWS",
ischema,
Column("TABLE_CATALOG", CoerceUnicode, key="table_catalog"),
Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
Column("TABLE_NAME", CoerceUnicode, key="table_name"),
Column("VIEW_DEFINITION", CoerceUnicode, key="view_definition"),
Column("CHECK_OPTION", String, key="check_option"),
Column("IS_UPDATABLE", String, key="is_updatable"),
schema="INFORMATION_SCHEMA",
)
# mssql/mxodbc.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: mssql+mxodbc
:name: mxODBC
:dbapi: mxodbc
:connectstring: mssql+mxodbc://<username>:<password>@<dsnname>
:url: http://www.egenix.com/
Execution Modes
---------------
mxODBC features two styles of statement execution, using the
``cursor.execute()`` and ``cursor.executedirect()`` methods (the second being
an extension to the DBAPI specification). The former makes use of a particular
API call specific to the SQL Server Native Client ODBC driver known
SQLDescribeParam, while the latter does not.
mxODBC apparently only makes repeated use of a single prepared statement
when SQLDescribeParam is used. The advantage to prepared statement reuse is
one of performance. The disadvantage is that SQLDescribeParam has a limited
set of scenarios in which bind parameters are understood, including that they
cannot be placed within the argument lists of function calls, anywhere outside
the FROM, or even within subqueries within the FROM clause - making the usage
of bind parameters within SELECT statements impossible for all but the most
simplistic statements.
For this reason, the mxODBC dialect uses the "native" mode by default only for
INSERT, UPDATE, and DELETE statements, and uses the escaped string mode for
all other statements.
This behavior can be controlled via
:meth:`~sqlalchemy.sql.expression.Executable.execution_options` using the
``native_odbc_execute`` flag with a value of ``True`` or ``False``, where a
value of ``True`` will unconditionally use native bind parameters and a value
of ``False`` will unconditionally use string-escaped parameters.
"""
from .base import _MSDate
from .base import _MSDateTime
from .base import _MSTime
from .base import MSDialect
from .base import VARBINARY
from .pyodbc import _MSNumeric_pyodbc
from .pyodbc import MSExecutionContext_pyodbc
from ... import types as sqltypes
from ...connectors.mxodbc import MxODBCConnector
class _MSNumeric_mxodbc(_MSNumeric_pyodbc):
"""Include pyodbc's numeric processor.
"""
class _MSDate_mxodbc(_MSDate):
def bind_processor(self, dialect):
def process(value):
if value is not None:
return "%s-%s-%s" % (value.year, value.month, value.day)
else:
return None
return process
class _MSTime_mxodbc(_MSTime):
def bind_processor(self, dialect):
def process(value):
if value is not None:
return "%s:%s:%s" % (value.hour, value.minute, value.second)
else:
return None
return process
class _VARBINARY_mxodbc(VARBINARY):
"""
mxODBC Support for VARBINARY column types.
This handles the special case for null VARBINARY values,
which maps None values to the mx.ODBC.Manager.BinaryNull symbol.
"""
def bind_processor(self, dialect):
if dialect.dbapi is None:
return None
DBAPIBinary = dialect.dbapi.Binary
def process(value):
if value is not None:
return DBAPIBinary(value)
else:
# should pull from mx.ODBC.Manager.BinaryNull
return dialect.dbapi.BinaryNull
return process
class MSExecutionContext_mxodbc(MSExecutionContext_pyodbc):
"""
The pyodbc execution context is useful for enabling
SELECT SCOPE_IDENTITY in cases where OUTPUT clause
does not work (tables with insert triggers).
"""
# todo - investigate whether the pyodbc execution context
# is really only being used in cases where OUTPUT
# won't work.
class MSDialect_mxodbc(MxODBCConnector, MSDialect):
# this is only needed if "native ODBC" mode is used,
# which is now disabled by default.
# statement_compiler = MSSQLStrictCompiler
execution_ctx_cls = MSExecutionContext_mxodbc
# flag used by _MSNumeric_mxodbc
_need_decimal_fix = True
colspecs = {
sqltypes.Numeric: _MSNumeric_mxodbc,
sqltypes.DateTime: _MSDateTime,
sqltypes.Date: _MSDate_mxodbc,
sqltypes.Time: _MSTime_mxodbc,
VARBINARY: _VARBINARY_mxodbc,
sqltypes.LargeBinary: _VARBINARY_mxodbc,
}
def __init__(self, description_encoding=None, **params):
super(MSDialect_mxodbc, self).__init__(**params)
self.description_encoding = description_encoding
dialect = MSDialect_mxodbc
# mssql/zxjdbc.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: mssql+zxjdbc
:name: zxJDBC for Jython
:dbapi: zxjdbc
:connectstring: mssql+zxjdbc://user:pass@host:port/dbname[?key=value&key=value...]
:driverurl: http://jtds.sourceforge.net/
.. note:: Jython is not supported by current versions of SQLAlchemy. The
zxjdbc dialect should be considered as experimental.
""" # noqa
from .base import MSDialect
from .base import MSExecutionContext
from ... import engine
from ...connectors.zxJDBC import ZxJDBCConnector
class MSExecutionContext_zxjdbc(MSExecutionContext):
_embedded_scope_identity = False
def pre_exec(self):
super(MSExecutionContext_zxjdbc, self).pre_exec()
# scope_identity after the fact returns null in jTDS so we must
# embed it
if self._select_lastrowid and self.dialect.use_scope_identity:
self._embedded_scope_identity = True
self.statement += "; SELECT scope_identity()"
def post_exec(self):
if self._embedded_scope_identity:
while True:
try:
row = self.cursor.fetchall()[0]
break
except self.dialect.dbapi.Error:
self.cursor.nextset()
self._lastrowid = int(row[0])
if (
self.isinsert or self.isupdate or self.isdelete
) and self.compiled.returning:
self._result_proxy = engine.FullyBufferedResultProxy(self)
if self._enable_identity_insert:
table = self.dialect.identifier_preparer.format_table(
self.compiled.statement.table
)
self.cursor.execute("SET IDENTITY_INSERT %s OFF" % table)
class MSDialect_zxjdbc(ZxJDBCConnector, MSDialect):
jdbc_db_name = "jtds:sqlserver"
jdbc_driver_name = "net.sourceforge.jtds.jdbc.Driver"
execution_ctx_cls = MSExecutionContext_zxjdbc
def _get_server_version_info(self, connection):
return tuple(
int(x) for x in connection.connection.dbversion.split(".")
)
dialect = MSDialect_zxjdbc
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment