storage: refactor to be more OOP

This commit is contained in:
dogeystamp 2023-04-09 17:49:26 -04:00
parent 25dc5b2200
commit 6749391b3c
Signed by: dogeystamp
GPG Key ID: 7225FE3592EFFA38
4 changed files with 153 additions and 139 deletions

View File

@ -1,69 +1,80 @@
class Storage:
"""Generic storage interface.
Raises:
OSError if SACHET_FILE_DIR could not be opened as a directory.
Raises
------
OSError
If the storage could not be initialized.
"""
def create(self, name):
"""Create file along with the metadata.
Raises:
OSError if the file already exists.
"""
pass
def open(self, name, mode="r"):
"""Open a file for reading/writing.
Raises:
OSError if the file does not exist.
Returns:
Stream to access file (similar to open()'s handle.)
"""
pass
def read_metadata(self, name):
"""Get metadata for a file.
Raises:
OSError if the file does not exist.
"""
pass
def write_metadata(self, name, data):
"""Set metadata for a file.
Raises:
OSError if the file does not exist.
"""
pass
def delete(self, name):
"""Delete file and associated metadata.
Raises:
OSError if the file does not exist.
"""
pass
def rename(self, name, new_name):
"""Rename a file.
Raises:
OSError if the file does not exist.
"""
pass
def list_files(self):
"""Lists all files."""
"""Lists all files.
Returns
-------
list of File
"""
pass
def get_file(self, name):
"""Return a File handle for a given file.
The file will be created if it does not exist yet.
Parameters
----------
name : str
Filename to access.
"""
pass
class File:
"""Handle for a file and its metadata.
Do not instantiate this; use `Storage.get_file()`.
Attributes
----------
metadata : _Metadata
All the metadata, accessible via dot attribute notation.
name : str
Filename
"""
def open(self, mode="r"):
"""Open file for reading/writing.
Parameters
----------
mode : str, optional
Mode of access (same as `open()`.)
Returns
-------
_io.TextIOWrapper
Stream to access the file (just like the builtin `open()`.)
"""
pass
def delete(self):
"""Delete file and associated metadata."""
pass
def rename(self, new_name):
"""Rename a file.
Parameters
----------
new_name : str
New name for the file.
"""
pass

View File

@ -1,12 +1,14 @@
from sachet.storage import Storage
from pathlib import Path
from sachet.server import app
from werkzeug.utils import secure_filename
import json
class FileSystem(Storage):
def __init__(self):
# prevent circular import when inspecting this file outside of Flask
from sachet.server import app
config_path = Path(app.config["SACHET_FILE_DIR"])
if config_path.is_absolute():
self._directory = config_path
@ -30,66 +32,65 @@ class FileSystem(Storage):
name = secure_filename(name)
return self._meta_directory / Path(name)
def create(self, name):
path = self._get_path(name)
if path.exists():
raise OSError(f"Path {path} already exists.")
def list_files(self):
return [
self.get_file(x.name)
for x in self._files_directory.iterdir()
if x.is_file()
]
meta_path = self._get_meta_path(name)
if meta_path.exists():
raise OSError(f"Path {meta_path} already exists.")
def get_file(self, name):
return self.File(self, name)
path.touch()
meta_path.touch()
class File:
def __init__(self, storage, name):
self.name = name
self._storage = storage
self._path = self._storage._get_path(name)
self._meta_path = self._storage._get_meta_path(name)
self._path.touch()
self._meta_path.touch()
self.metadata = self._Metadata(self)
def delete(self, name):
path = self._get_path(name)
if not path.exists():
raise OSError(f"Path {path} does not exist.")
path.unlink()
self._path.unlink()
self._meta_path.unlink()
def open(self, name, mode="r"):
path = self._get_path(name)
if not path.exists():
raise OSError(f"Path {path} does not exist.")
def open(self, mode="r"):
return self._path.open(mode=mode)
return path.open(mode=mode)
def read_metadata(self, name):
meta_path = self._get_meta_path(name)
if not meta_path.exists():
raise OSError(f"Path {meta_path} does not exist.")
with meta_path.open() as meta_file:
content = meta_file.read()
return json.loads(content)
def write_metadata(self, name, data):
meta_path = self._get_meta_path(name)
if not meta_path.exists():
raise OSError(f"Path {meta_path} does not exist.")
with meta_path.open("w") as meta_file:
content = json.dumps(data)
meta_file.write(content)
def rename(self, name, new_name):
path = self._get_path(name)
if not path.exists():
raise OSError(f"Path {path} does not exist.")
new_path = self._get_path(new_name)
def rename(self, new_name):
new_path = self._storage._get_path(new_name)
if new_path.exists():
raise OSError(f"Path {path} already exists.")
meta_path = self._get_meta_path(name)
if not meta_path.exists():
raise OSError(f"Path {meta_path} does not exist.")
new_meta_path = self._get_meta_path(new_name)
new_meta_path = self._storage._get_meta_path(new_name)
if new_meta_path.exists():
raise OSError(f"Path {path} already exists.")
path.rename(new_path)
meta_path.rename(new_meta_path)
self._path.rename(new_path)
self._meta_path.rename(new_meta_path)
def list_files(self):
return [x for x in self._files_directory.iterdir() if x.is_file()]
class _Metadata:
def __init__(self, file):
self.__dict__["_file"] = file
@property
def __data(self):
with self._file._meta_path.open() as meta_file:
content = meta_file.read()
if len(content.strip()) == 0:
return {}
return json.loads(content)
# there is no setter for __data because it would cause __setattr__ to infinitely recurse
def __setattr__(self, name, value):
data = self.__data
data[name] = value
with self._file._meta_path.open("w") as meta_file:
content = json.dumps(data)
meta_file.write(content)
def __getattr__(self, name):
return self.__data.get(name, None)

View File

@ -20,6 +20,7 @@ def rand():
r.seed(0)
return r
def clear_filesystem():
if app.config["SACHET_STORAGE"] == "filesystem":
for file in itertools.chain(
@ -29,9 +30,8 @@ def clear_filesystem():
if file.is_relative_to(Path(app.instance_path)) and file.is_file():
file.unlink()
else:
raise OSError(
f"Attempted to delete {file}: please delete it yourself."
)
raise OSError(f"Attempted to delete {file}: please delete it yourself.")
@pytest.fixture
def client(config={}):

View File

@ -27,16 +27,17 @@ class TestSuite:
]
for file in files:
storage.create(file["name"])
with storage.open(file["name"], mode="wb") as f:
handle = storage.get_file(file["name"])
with handle.open(mode="wb") as f:
f.write(file["data"])
storage.write_metadata(file["name"], file["metadata"])
handle.metadata.test_data = file["metadata"]
for file in files:
with storage.open(file["name"], mode="rb") as f:
handle = storage.get_file(file["name"])
with handle.open(mode="rb") as f:
saved_data = f.read()
assert saved_data == file["data"]
saved_meta = storage.read_metadata(file["name"])
saved_meta = handle.metadata.test_data
assert saved_meta == file["metadata"]
assert sorted([f.name for f in storage.list_files()]) == sorted(
@ -60,15 +61,16 @@ class TestSuite:
]
for file in files:
storage.create(file["name"])
with storage.open(file["name"], mode="wb") as f:
handle = storage.get_file(file["name"])
with handle.open(mode="wb") as f:
f.write(file["data"])
storage.write_metadata(file["name"], file["metadata"])
storage.rename(file["name"], file["new_name"])
handle.metadata.test_data = file["metadata"]
handle.rename(file["new_name"])
for file in files:
with storage.open(file["new_name"], mode="rb") as f:
handle = storage.get_file(file["new_name"])
with handle.open(mode="rb") as f:
saved_data = f.read()
assert saved_data == file["data"]
saved_meta = storage.read_metadata(file["new_name"])
saved_meta = handle.metadata.test_data
assert saved_meta == file["metadata"]