owned this note
owned this note
Published
Linked with GitHub
# D18 撰寫 Flask Web 程式存取 MongoDB
使用 Flask 撰寫一個會員系統,將會員資料存入 MongoDB,並提供 4 種 API 操作 MongoDB 中的資料,分別是新增、修改、刪除、查詢。
## 1. Create
```python=
@app.route('/members', methods=['POST'])
def create_member():
created_count = 0
# Get data from forms
name = request.form.get('name')
age = request.form.get('age')
phone = request.form.get('phone')
email = request.form.get('email')
# insert_one to create a new member using form data
result = mongo.db.members.insert_one({'name': name, 'age': age, 'phone': phone, 'email': email, 'status': "active"})
if result is not None:
created_count = 1
created_id = str(result.inserted_id)
# 回傳新增資料的 id
return jsonify({"created_count": created_count, "_id": created_id})
```
## 2. Update
```python=
@app.route('/member/<id>', methods = ['PUT'])
def update_member(id):
result = 0
# get data from forms
name = request.form.get('name')
age = request.form.get('age')
phone = request.form.get('phone')
email = request.form.get('email')
status = request.form.get('status')
new_val = { '$set': {"name": name, 'age': age, 'phone': phone, 'email': email, 'status': status} }
update_result = mongo.db.members.update_one({"_id": ObjectId(id)}, new_val)
if update_result is not None:
result = update_result.modified_count
return jsonify({"updated_count": result, '_id': id})
```
## 3. Delete
```python=
@app.route('/member/<id>', methods = ['DELETE'])
def delete_member(id):
result = 0
member = mongo.db.members.find_one({'_id': ObjectId(id)})
if member is not None:
id = ObjectId(member['_id'])
delete_result = mongo.db.members.delete_one({'_id': id})
result = delete_result.deleted_count
return jsonify({"deleted_count": result, '_id': str(id)})
```
## 4. Read
```python=
@app.route('/members')
def get_all_members():
members = mongo.db.members.find({})
results = []
for member in members:
member['_id'] = str(member['_id'])
results.append(member)
return jsonify(results)
@app.route('/member/<id>', methods=["GET"])
def get_member(id=None):
if id is None:
return None
else:
try:
oid = ObjectId(id)
except:
return Response("Invalid bson ObjectID: {}".format(id), status=400)
result = mongo.db.members.find_one({'_id': oid})
if result is not None:
result['_id'] = str(result['_id'])
return jsonify(result)
```
## 5. Test Code
```python=
import requests
base = "http://localhost:5000/member"
# Create a new test member
name = "Testy Tester"
age = 100
phone = "+9919919191"
email = "test@test.com"
update_age = 40
update_status = 'inactive'
data = {"name": name, "age": age, "phone": phone, "email": email}
r = requests.post("http://localhost:5000/members", data=data)
assert r.status_code == 200
assert r.json()['created_count'] == 1
id = r.json()['_id']
try:
# get that member
r = requests.get("{}/{}".format(base, id))
assert r.status_code == 200
assert r.json()['name'] == name
except:
print("Get method error occured ... ")
try:
# update that member
data['age'] = update_age
data['status'] = update_status
r = requests.put("{}/{}".format(base, id), data=data)
assert r.status_code == 200
except:
print("PUT method error occured ... ")
try:
r = requests.get("{}/{}".format(base, id))
assert r.status_code == 200
assert r.json()['age'] == str(update_age)
assert r.json()['status'] == update_status
except:
print("Get error on updated info occured ...\n")
# delete that member
r = requests.delete("{}/{}".format(base, id))
assert r.status_code == 200
assert r.json()['deleted_count'] == 1
r = requests.get("{}/{}".format(base, id))
assert r.status_code == 200
assert r.json() == None
```
## 6. Authentication using environment variables
Uses environment variable to authenticate Mongodb
```bash=
export MONGO_USER=your_mongo_username
export MONGO_PWD=your_mongo_pwd
export DB_NAME=name_of_your_mongo_database
```
## 7. Flask app code
```python=
from flask import Flask, render_template, request
from flask_pymongo import PyMongo
from flask.json import jsonify
from bson.objectid import ObjectId
from urllib.parse import quote_plus
import os
# Get user and pwd from environment variables
username = os.environ['MONGO_USER']
pwd = os.environ['MONGO_PWD']
db_name = os.environ['DB_NAME']
# transform user and pwd to safty string with quote_plus
username = quote_plus(username)
pwd = quote_plus(pwd)
db_name = quote_plus(db_name)
app = Flask(__name__)
app.config["MONGO_URI"] = "mongodb://{}:{}@localhost:27017/{}".format(username, pwd, db_name)
mongo = PyMongo(app)
@app.route('/members', methods=['POST'])
def create_member():
created_count = 0
# Get data from forms
name = request.form.get('name')
age = request.form.get('age')
phone = request.form.get('phone')
email = request.form.get('email')
# insert_one to create a new member using form data
result = mongo.db.members.insert_one({'name': name, 'age': age, 'phone': phone, 'email': email, 'status': "active"})
if result is not None:
created_count = 1
created_id = str(result.inserted_id)
# 回傳新增資料的 id
return jsonify({"created_count": created_count, "_id": created_id})
@app.route('/member/<id>', methods = ['PUT'])
def update_member(id):
result = 0
# get data from forms
name = request.form.get('name')
age = request.form.get('age')
phone = request.form.get('phone')
email = request.form.get('email')
status = request.form.get('status')
new_val = { '$set': {"name": name, 'age': age, 'phone': phone, 'email': email, 'status': status} }
update_result = mongo.db.members.update_one({"_id": ObjectId(id)}, new_val)
if update_result is not None:
result = update_result.modified_count
return jsonify({"updated_count": result, '_id': id})
@app.route('/member/<id>', methods = ['DELETE'])
def delete_member(id):
result = 0
member = mongo.db.members.find_one({'_id': ObjectId(id)})
if member is not None:
id = ObjectId(member['_id'])
delete_result = mongo.db.members.delete_one({'_id': id})
result = delete_result.deleted_count
return jsonify({"deleted_count": result, '_id': str(id)})
@app.route('/members')
def get_all_members():
members = mongo.db.members.find({})
results = []
for member in members:
member['_id'] = str(member['_id'])
results.append(member)
return jsonify(results)
@app.route('/member/<id>', methods=["GET"])
def get_member(id=None):
if id is None:
return None
else:
try:
oid = ObjectId(id)
except:
return Response("Invalid bson ObjectID: {}".format(id), status=400)
result = mongo.db.members.find_one({'_id': oid})
if result is not None:
result['_id'] = str(result['_id'])
return jsonify(result)
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0')
```