# main.py
  1. # -*- coding: utf-8 -*-
  2. import json
  3. from secrets import token_hex
  4. from fastapi import FastAPI, Request
  5. from fastapi.middleware.cors import CORSMiddleware
  6. from fastapi.responses import JSONResponse
  7. from database import db, cur
  8. from config import ADMIN_SECRET
  9. from errors import Error
  10. from models import CreateUser, Event
  11. app = FastAPI()
  12. app.add_middleware(
  13. CORSMiddleware,
  14. allow_methods=['*'],
  15. allow_headers=['*'],
  16. allow_origins=['*'],
  17. allow_credentials=True
  18. )
  19. async def set_body(req: Request, body: bytes):
  20. async def receive():
  21. return {"type": "http.request", "body": body}
  22. req._receive = receive
  23. @app.middleware('http')
  24. async def middleware(req: Request, call_next):
  25. method = req.method
  26. if method == 'POST':
  27. body = await req.body()
  28. await set_body(req, body)
  29. body = json.loads(body)
  30. if 'access_token' in body:
  31. u = cur.execute('SELECT * FROM user WHERE access_token = ?', (body['access_token'],)).fetchone()
  32. if u[4] != 1:
  33. return JSONResponse(content=Error.ACCEPT_DENIED)
  34. elif 'secret' not in body or body['secret'] != ADMIN_SECRET:
  35. return JSONResponse(content=Error.ACCEPT_DENIED)
  36. return await call_next(req)
  37. @app.post('/user')
  38. async def create_user(user: CreateUser):
  39. u = cur.execute('SELECT * FROM user WHERE login = ?', (user.login,)).fetchone()
  40. if u is not None:
  41. return Error.LOGIN_IS_EXISTS
  42. roles = [i[0] for i in cur.execute('SELECT * FROM role').fetchall()]
  43. if user.role not in roles:
  44. return Error.ROLE_IS_NOT_EXISTS
  45. token = token_hex(32)
  46. cur.execute(
  47. 'INSERT INTO user (name, role, login, password, access_token) VALUES (?, ?, ?, ?, ?)',
  48. (user.name, user.role, user.login, user.password, token)
  49. )
  50. db.commit()
  51. return {
  52. 'response': {
  53. 'id': cur.lastrowid,
  54. 'access_token': token
  55. }
  56. }
  57. @app.get('/user{user_id}')
  58. async def get_user_data(user_id: int):
  59. u = cur.execute('SELECT * FROM user WHERE id = ?', (user_id,)).fetchone()
  60. if u is None:
  61. return Error.USER_IS_NOT_EXISTS
  62. role = cur.execute('SELECT * FROM role WHERE id = ?', (u[4],)).fetchone()
  63. return {'response': {
  64. 'id': u[0],
  65. 'name': u[1],
  66. 'login': u[2],
  67. 'role': role[1]
  68. }}
  69. @app.post('/event')
  70. async def create_event(event: Event):
  71. u = cur.execute('SELECT * FROM user WHERE id = ?', (event.author,)).fetchone()
  72. if u is None:
  73. return Error.USER_IS_NOT_EXISTS
  74. cur.execute(
  75. 'INSERT INTO event (title, author, date) VALUES (?, ?, ?)',
  76. (event.title, event.author, event.date)
  77. )
  78. db.commit()
  79. return {'response': {
  80. 'id': cur.lastrowid,
  81. }}
  82. @app.get('/event{event_id}')
  83. async def get_event_by_id(event_id: int):
  84. event = cur.execute('SELECT * FROM event WHERE id = ?', (event_id,)).fetchone()
  85. if event is None:
  86. return Error.EVENT_IS_NOT_EXISTS
  87. return {'response': {
  88. 'id': event[0],
  89. 'author': event[1],
  90. 'title': event[2],
  91. 'date': event[3]
  92. }}