This ensures `isort` is applied to all files in the repo. Change-Id: Ib7ced1c924ef1639542bf0d1a01c5737f6ba43e9
245 lines
9.0 KiB
Python
245 lines
9.0 KiB
Python
# Copyright (c) 2023 The Regents of the University of California
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are
|
|
# met: redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer;
|
|
# redistributions in binary form must reproduce the above copyright
|
|
# notice, this list of conditions and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution;
|
|
# neither the name of the copyright holders nor the names of its
|
|
# contributors may be used to endorse or promote products derived from
|
|
# this software without specific prior written permission.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
import json
|
|
from typing import (
|
|
Dict,
|
|
List,
|
|
)
|
|
|
|
import pymongo
|
|
from api.client import Client
|
|
from bson import json_util
|
|
from pymongo import MongoClient
|
|
from pymongo.errors import (
|
|
ConfigurationError,
|
|
ConnectionFailure,
|
|
)
|
|
|
|
|
|
class DatabaseConnectionError(Exception):
    """
    Error raised when a usable MongoDB connection cannot be obtained.

    Within this module it signals that the client could not be reached or
    configured, or that the requested database or collection is missing.
    """
|
|
|
|
|
|
class MongoDBClient(Client):
    """
    Client implementation backed by a single MongoDB collection.

    The collection is resolved (and its existence verified) once, at
    construction time, and is then used by every query method.
    """

    def __init__(
        self, mongo_uri: str, database_name: str, collection_name: str
    ) -> None:
        """
        Connect to MongoDB and bind this client to one collection.

        :param mongo_uri: URI of the MongoDB instance
        :param database_name: Name of the database
        :param collection_name: Name of the collection
        :raises DatabaseConnectionError: if the connection cannot be
            established or the database/collection does not exist
        """
        super().__init__()
        self.mongo_uri = mongo_uri
        self.collection_name = collection_name
        self.database_name = database_name
        self.collection = self._get_database(
            mongo_uri, database_name, collection_name
        )

    def _get_database(
        self,
        mongo_uri: str,
        database_name: str,
        collection_name: str,
    ) -> pymongo.collection.Collection:
        """
        Return the MongoDB collection object for the given database and
        collection names, after verifying that the server is reachable and
        that both the database and the collection exist.

        :param mongo_uri: URI of the MongoDB instance
        :param database_name: Name of the database
        :param collection_name: Name of the collection
        :return: collection: MongoDB collection object
        :raises DatabaseConnectionError: if the server cannot be reached,
            the URI is misconfigured, or the database/collection is missing
        """
        # `client` stays None if the constructor itself raises, so the
        # handlers below must not assume it is bound to a real client.
        client = None
        try:
            client = MongoClient(mongo_uri)
            client.admin.command("ping")
        except ConnectionFailure as e:
            if client is not None:
                client.close()
            raise DatabaseConnectionError(
                "Could not connect to MongoClient with given URI!"
            ) from e
        except ConfigurationError as e:
            if client is not None:
                client.close()
            raise DatabaseConnectionError(e) from e

        try:
            database = client[database_name]
            if database.name not in client.list_database_names():
                raise DatabaseConnectionError("Database Does not Exist!")

            collection = database[collection_name]
            if collection.name not in database.list_collection_names():
                raise DatabaseConnectionError("Collection Does not Exist!")
        except Exception:
            # The client is unreachable by callers once we raise, so release
            # its connections instead of leaking them.
            client.close()
            raise

        return collection

    def find_resource(self, query: Dict) -> Dict:
        """
        Find a resource in the database.

        If 'resource_version' is absent from the query or is an empty
        string, the first document when sorted by 'resource_version' in
        descending order is returned; otherwise the document with exactly
        that version is looked up.

        :param query: JSON object with id and (optionally) resource_version
        :return: json_resource: JSON object with the requested resource, or
            {"exists": False} if no document matches
        """
        criteria = {"id": query["id"]}
        if "resource_version" in query and query["resource_version"] != "":
            criteria["resource_version"] = query["resource_version"]

        resource = (
            self.collection.find(criteria, {"_id": 0})
            .sort("resource_version", -1)
            .limit(1)
        )
        # Round-trip through bson's json_util so BSON-specific values are
        # converted into plain JSON-compatible Python objects.
        json_resource = json_util.dumps(resource)
        res = json.loads(json_resource)
        if res == []:
            return {"exists": False}
        return res[0]

    def update_resource(self, query: Dict) -> Dict[str, str]:
        """
        Replace the stored document whose 'id' and 'resource_version' match
        those of the original resource with the modified resource.

        If no stored document matches, nothing is written and the update is
        reported as rejected.

        :param query: JSON object with 'original_resource' and the updated
            'resource'
        :return: json_response: JSON object with status message
        """
        original_resource = query["original_resource"]
        modified_resource = query["resource"]
        try:
            result = self.collection.replace_one(
                {
                    "id": original_resource["id"],
                    "resource_version": original_resource["resource_version"],
                },
                modified_resource,
            )
        except Exception as e:
            print(e)
            return {"status": "Resource does not exist"}
        # replace_one does not raise when nothing matches; it reports that
        # through matched_count (only readable for acknowledged writes).
        if result.acknowledged and result.matched_count == 0:
            return {"status": "Resource does not exist"}
        return {"status": "Updated"}

    def get_versions(self, query: Dict) -> List[Dict]:
        """
        Retrieve the versions of the resource with the given ID.

        Each returned entry contains only the 'resource_version' field, and
        entries are sorted by 'resource_version' in descending order.

        :param query: JSON object with id
        :return: json_resource: list of JSON objects, one per stored version
        """
        versions = self.collection.find(
            {"id": query["id"]}, {"resource_version": 1, "_id": 0}
        ).sort("resource_version", -1)
        # convert to json
        res = json_util.dumps(versions)
        return json_util.loads(res)

    def delete_resource(self, query: Dict) -> Dict[str, str]:
        """
        Delete the document whose 'id' and 'resource_version' match the
        query.

        The returned status is "Deleted" whether or not a matching document
        was present.

        :param query: JSON object with id and resource_version
        :return: json_response: JSON object with status message
        """
        self.collection.delete_one(
            {"id": query["id"], "resource_version": query["resource_version"]}
        )
        return {"status": "Deleted"}

    def insert_resource(self, query: Dict) -> Dict[str, str]:
        """
        Insert a new resource into the collection using 'insert_one'.

        Any exception raised by the insert is reported as the resource
        already existing.

        :param query: JSON object representing the new resource to be
            inserted
        :return: json_response: JSON object with status message
        """
        try:
            self.collection.insert_one(query)
        except Exception:
            return {"status": "Resource already exists"}
        return {"status": "Inserted"}

    def check_resource_exists(self, query: Dict) -> Dict:
        """
        Check whether a resource with a matching 'id' and
        'resource_version' exists in the collection.

        :param query: JSON object with id and resource_version
        :return: json_response: JSON object with boolean 'exists' key
        """
        resource = (
            self.collection.find(
                {
                    "id": query["id"],
                    "resource_version": query["resource_version"],
                },
                {"_id": 0},
            )
            .sort("resource_version", -1)
            .limit(1)
        )
        json_resource = json_util.dumps(resource)
        res = json.loads(json_resource)
        if res == []:
            return {"exists": False}
        return {"exists": True}

    def save_session(self) -> Dict:
        """
        Save the client session to a dictionary.

        :return: A dictionary with the client type, URI, database name and
            collection name of this client.
        """
        session = {
            "client": "mongodb",
            "uri": self.mongo_uri,
            "database": self.database_name,
            "collection": self.collection_name,
        }
        return session
|