storage: added tests for renaming
This commit is contained in:
parent
95ed51dd64
commit
25dc5b2200
@ -78,14 +78,14 @@ class FileSystem(Storage):
|
|||||||
if not path.exists():
|
if not path.exists():
|
||||||
raise OSError(f"Path {path} does not exist.")
|
raise OSError(f"Path {path} does not exist.")
|
||||||
new_path = self._get_path(new_name)
|
new_path = self._get_path(new_name)
|
||||||
if path.exists():
|
if new_path.exists():
|
||||||
raise OSError(f"Path {path} already exists.")
|
raise OSError(f"Path {path} already exists.")
|
||||||
|
|
||||||
meta_path = self._get_meta_path(name)
|
meta_path = self._get_meta_path(name)
|
||||||
if not path.exists():
|
if not meta_path.exists():
|
||||||
raise OSError(f"Path {meta_path} does not exist.")
|
raise OSError(f"Path {meta_path} does not exist.")
|
||||||
new_meta_path = self._get_meta_path(name)
|
new_meta_path = self._get_meta_path(new_name)
|
||||||
if path.exists():
|
if new_meta_path.exists():
|
||||||
raise OSError(f"Path {path} already exists.")
|
raise OSError(f"Path {path} already exists.")
|
||||||
|
|
||||||
path.rename(new_path)
|
path.rename(new_path)
|
||||||
|
@ -20,6 +20,18 @@ def rand():
|
|||||||
r.seed(0)
|
r.seed(0)
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
def clear_filesystem():
|
||||||
|
if app.config["SACHET_STORAGE"] == "filesystem":
|
||||||
|
for file in itertools.chain(
|
||||||
|
storage._meta_directory.iterdir(),
|
||||||
|
storage._files_directory.iterdir(),
|
||||||
|
):
|
||||||
|
if file.is_relative_to(Path(app.instance_path)) and file.is_file():
|
||||||
|
file.unlink()
|
||||||
|
else:
|
||||||
|
raise OSError(
|
||||||
|
f"Attempted to delete {file}: please delete it yourself."
|
||||||
|
)
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def client(config={}):
|
def client(config={}):
|
||||||
@ -32,20 +44,11 @@ def client(config={}):
|
|||||||
db.drop_all()
|
db.drop_all()
|
||||||
db.create_all()
|
db.create_all()
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
clear_filesystem()
|
||||||
yield client
|
yield client
|
||||||
|
clear_filesystem()
|
||||||
db.session.remove()
|
db.session.remove()
|
||||||
db.drop_all()
|
db.drop_all()
|
||||||
if app.config["SACHET_STORAGE"] == "filesystem":
|
|
||||||
for file in itertools.chain(
|
|
||||||
storage._meta_directory.iterdir(),
|
|
||||||
storage._files_directory.iterdir(),
|
|
||||||
):
|
|
||||||
if file.is_relative_to(Path(app.instance_path)) and file.is_file():
|
|
||||||
file.unlink()
|
|
||||||
else:
|
|
||||||
raise OSError(
|
|
||||||
f"Attempted to delete {file}: please delete it yourself."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
@ -10,7 +10,7 @@ from uuid import UUID
|
|||||||
@pytest.mark.parametrize("client", [{"SACHET_STORAGE": "filesystem"}], indirect=True)
|
@pytest.mark.parametrize("client", [{"SACHET_STORAGE": "filesystem"}], indirect=True)
|
||||||
class TestSuite:
|
class TestSuite:
|
||||||
def test_creation(self, client, rand):
|
def test_creation(self, client, rand):
|
||||||
"""Test the process of creating, writing, then reading files with metadata."""
|
"""Test the process of creating, writing, then reading files with metadata, and also listing files."""
|
||||||
|
|
||||||
files = [
|
files = [
|
||||||
dict(
|
dict(
|
||||||
@ -38,3 +38,37 @@ class TestSuite:
|
|||||||
assert saved_data == file["data"]
|
assert saved_data == file["data"]
|
||||||
saved_meta = storage.read_metadata(file["name"])
|
saved_meta = storage.read_metadata(file["name"])
|
||||||
assert saved_meta == file["metadata"]
|
assert saved_meta == file["metadata"]
|
||||||
|
|
||||||
|
assert sorted([f.name for f in storage.list_files()]) == sorted(
|
||||||
|
[f["name"] for f in files]
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_rename(self, client, rand):
|
||||||
|
files = [
|
||||||
|
dict(
|
||||||
|
name=str(UUID(bytes=rand.randbytes(16))),
|
||||||
|
new_name=str(UUID(bytes=rand.randbytes(16))),
|
||||||
|
data=rand.randbytes(4000),
|
||||||
|
metadata=dict(
|
||||||
|
sdljkf=dict(abc="def", aaa="bbb"),
|
||||||
|
lkdsjf=dict(ld="sdlfj", sdljf="sdlkjf"),
|
||||||
|
sdlfkj="sjdlkfsldk",
|
||||||
|
ssjdklf=rand.randint(-1000, 1000),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for i in range(25)
|
||||||
|
]
|
||||||
|
|
||||||
|
for file in files:
|
||||||
|
storage.create(file["name"])
|
||||||
|
with storage.open(file["name"], mode="wb") as f:
|
||||||
|
f.write(file["data"])
|
||||||
|
storage.write_metadata(file["name"], file["metadata"])
|
||||||
|
storage.rename(file["name"], file["new_name"])
|
||||||
|
|
||||||
|
for file in files:
|
||||||
|
with storage.open(file["new_name"], mode="rb") as f:
|
||||||
|
saved_data = f.read()
|
||||||
|
assert saved_data == file["data"]
|
||||||
|
saved_meta = storage.read_metadata(file["new_name"])
|
||||||
|
assert saved_meta == file["metadata"]
|
||||||
|
Loading…
Reference in New Issue
Block a user