# coding=utf-8
#
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis-python
#
# Most of this work is copyright (C) 2013-2018 David R. MacIver
# (david@drmaciver.com), but it contains contributions by others. See
# CONTRIBUTING.rst for a full list of people who may hold copyright, and
# consult the git log if you need to determine who owns an individual
# contribution.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
#
# END HEADER
from __future__ import division, print_function, absolute_import
import os
import re
import binascii
import warnings
import threading
from hashlib import sha1
from contextlib import contextmanager
from hypothesis.errors import HypothesisWarning
from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.internal.compat import FileNotFoundError, hbytes, \
b64decode, b64encode
from hypothesis.utils.conventions import not_set
sqlite3 = None
SQLITE_PATH = re.compile(r"\.\(db|sqlite|sqlite3\)$")
def _db_for_path(path=None):
if path is not_set:
path = os.getenv('HYPOTHESIS_DATABASE_FILE')
if path is not None: # pragma: no cover
# Note: we should retain an explicit deprecation warning for a
# further period after this is removed, to ease debugging for
# anyone migrating to a new version.
note_deprecation(
'The $HYPOTHESIS_DATABASE_FILE environment variable is '
'deprecated, and will be ignored by a future version of '
'Hypothesis. Configure your database location via a '
'settings profile instead.'
)
return _db_for_path(path)
# Note: storage_directory attempts to create the dir in question, so
# if os.access fails there *must* be a fatal permissions issue.
path = storage_directory('examples')
if os.access(path, os.R_OK | os.W_OK | os.X_OK):
return _db_for_path(path)
else: # pragma: no cover
warnings.warn(HypothesisWarning(
'The database setting is not configured, and the default '
'location is unusable - falling back to an in-memory '
'database for this session. path=%r' % (path,)
))
return InMemoryExampleDatabase()
if path in (None, ':memory:'):
return InMemoryExampleDatabase()
path = str(path)
if os.path.isdir(path):
return DirectoryBasedExampleDatabase(path)
if os.path.exists(path):
return SQLiteExampleDatabase(path)
if SQLITE_PATH.search(path):
return SQLiteExampleDatabase(path)
else:
return DirectoryBasedExampleDatabase(path)
class EDMeta(type):
def __call__(self, *args, **kwargs):
if self is ExampleDatabase:
return _db_for_path(*args, **kwargs)
return super(EDMeta, self).__call__(*args, **kwargs)
[docs]class ExampleDatabase(
EDMeta('ExampleDatabase', (object,), {}) # type: ignore
):
"""Interface class for storage systems.
A key -> multiple distinct values mapping.
Keys and values are binary data.
"""
[docs] def save(self, key, value):
"""Save ``value`` under ``key``.
If this value is already present for this key, silently do
nothing
"""
raise NotImplementedError('%s.save' % (type(self).__name__))
[docs] def delete(self, key, value):
"""Remove this value from this key.
If this value is not present, silently do nothing.
"""
raise NotImplementedError('%s.delete' % (type(self).__name__))
[docs] def move(self, src, dest, value):
"""Move value from key src to key dest. Equivalent to delete(src,
value) followed by save(src, value) but may have a more efficient
implementation.
Note that value will be inserted at dest regardless of whether
it is currently present at src.
"""
if src == dest:
self.save(src, value)
return
self.delete(src, value)
self.save(dest, value)
[docs] def fetch(self, key):
"""Return all values matching this key."""
raise NotImplementedError('%s.fetch' % (type(self).__name__))
[docs] def close(self):
"""Clear up any resources associated with this database."""
raise NotImplementedError('%s.close' % (type(self).__name__))
[docs]class InMemoryExampleDatabase(ExampleDatabase):
def __init__(self):
self.data = {}
def __repr__(self):
return 'InMemoryExampleDatabase(%r)' % (self.data,)
def fetch(self, key):
for v in self.data.get(key, ()):
yield v
def save(self, key, value):
self.data.setdefault(key, set()).add(hbytes(value))
def delete(self, key, value):
self.data.get(key, set()).discard(hbytes(value))
def close(self):
pass
class SQLiteExampleDatabase(ExampleDatabase):
def __init__(self, path=u':memory:'):
self.path = path
self.db_created = False
self.current_connection = threading.local()
global sqlite3
import sqlite3
if path == u':memory:':
note_deprecation(
'The SQLite database backend has been deprecated. '
'Use InMemoryExampleDatabase or set database_file=":memory:" '
'instead.'
)
else:
note_deprecation(
'The SQLite database backend has been deprecated. '
'Set database_file to some path name not ending in .db, '
'.sqlite or .sqlite3 to get the new directory based database '
'backend instead.'
)
def connection(self):
if not hasattr(self.current_connection, 'connection'):
self.current_connection.connection = sqlite3.connect(self.path)
return self.current_connection.connection
def close(self):
if hasattr(self.current_connection, 'connection'):
try:
self.connection().close()
finally:
del self.current_connection.connection
def __repr__(self):
return u'%s(%s)' % (self.__class__.__name__, self.path)
@contextmanager
def cursor(self):
conn = self.connection()
cursor = conn.cursor()
try:
try:
yield cursor
finally:
cursor.close()
except BaseException:
conn.rollback()
raise
else:
conn.commit()
def save(self, key, value):
self.create_db_if_needed()
with self.cursor() as cursor:
try:
cursor.execute("""
insert into hypothesis_data_mapping(key, value)
values(?, ?)
""", (b64encode(key), b64encode(value)))
except sqlite3.IntegrityError:
pass
def delete(self, key, value):
self.create_db_if_needed()
with self.cursor() as cursor:
cursor.execute("""
delete from hypothesis_data_mapping
where key = ? and value = ?
""", (b64encode(key), b64encode(value)))
def fetch(self, key):
self.create_db_if_needed()
with self.cursor() as cursor:
cursor.execute("""
select value from hypothesis_data_mapping
where key = ?
""", (b64encode(key),))
for (value,) in cursor:
try:
yield b64decode(value)
except (binascii.Error, TypeError):
pass
def create_db_if_needed(self):
if self.db_created:
return
with self.cursor() as cursor:
cursor.execute("""
create table if not exists hypothesis_data_mapping(
key text,
value text,
unique(key, value)
)
""")
self.db_created = True
def mkdirp(path):
try:
os.makedirs(path)
except OSError:
pass
return path
def _hash(key):
return sha1(key).hexdigest()[:16]
[docs]class DirectoryBasedExampleDatabase(ExampleDatabase):
def __init__(self, path):
self.path = path
self.keypaths = {}
def __repr__(self):
return 'DirectoryBasedExampleDatabase(%r)' % (self.path,)
def close(self):
pass
def _key_path(self, key):
try:
return self.keypaths[key]
except KeyError:
pass
directory = os.path.join(self.path, _hash(key))
mkdirp(directory)
self.keypaths[key] = directory
return directory
def _value_path(self, key, value):
return os.path.join(
self._key_path(key),
sha1(value).hexdigest()[:16]
)
def fetch(self, key):
kp = self._key_path(key)
for path in os.listdir(kp):
try:
with open(os.path.join(kp, path), 'rb') as i:
yield hbytes(i.read())
except FileNotFoundError:
pass
def save(self, key, value):
path = self._value_path(key, value)
if not os.path.exists(path):
suffix = binascii.hexlify(os.urandom(16))
if not isinstance(suffix, str): # pragma: no branch
# On Python 3, binascii.hexlify returns bytes
suffix = suffix.decode('ascii')
tmpname = path + '.' + suffix
with open(tmpname, 'wb') as o:
o.write(value)
try:
os.rename(tmpname, path)
except OSError: # pragma: no cover
os.unlink(tmpname)
assert not os.path.exists(tmpname)
def move(self, src, dest, value):
if src == dest:
self.save(src, value)
return
try:
os.rename(
self._value_path(src, value), self._value_path(dest, value))
except OSError:
self.delete(src, value)
self.save(dest, value)
def delete(self, key, value):
try:
os.unlink(self._value_path(key, value))
except OSError:
pass