diff --git a/README.md b/README.md index f14f414..c462700 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,8 @@ This repo contains the samples for [Keploy's](https://keploy.io) integration wit 5. [Flask-Redis](https://github.com/keploy/samples-python/tree/main/flask-redis) - This Flask-based application provides a book management system utilizing Redis for caching and storage. It supports adding, retrieving, updating, and deleting book records, with optimized search functionality and cache management for improved performance. The API endpoints ensure efficient data handling and quick access to book information. +6. [FastAPI-Mongo](www.github.com/keploy/samples-python/tree/main/fastapi-mongo) - This application is for a Quiz Platform that generated Quizzes using LLMs. This project demonstrates how to use Keploy API Test Generator to automatically generate API tests for a FastAPI application that interacts with a MongoDB database. + ## Community Support ❤️ ### 🤔 Questions? diff --git a/fastapi-mongo/.gitignore b/fastapi-mongo/.gitignore new file mode 100644 index 0000000..503e3f5 --- /dev/null +++ b/fastapi-mongo/.gitignore @@ -0,0 +1,4 @@ +**/__pycache__/** +**/__init__.py/** +__init__.py +.DS_Store \ No newline at end of file diff --git a/fastapi-mongo/Dockerfile b/fastapi-mongo/Dockerfile new file mode 100644 index 0000000..ddd7fa5 --- /dev/null +++ b/fastapi-mongo/Dockerfile @@ -0,0 +1,32 @@ +# 1. Use a lightweight Python 3.10 image +FROM python:3.10-slim + +# 2. Set environment variables +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +# 3. Set working directory +WORKDIR /app + +# 4. Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + gcc \ + libpq-dev \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# 5. Copy requirements.txt from the root folder +COPY ../requirements.txt . + +# 6. Install Python dependencies +RUN pip install --upgrade pip && pip install -r requirements.txt + +# 7. Copy everything from quizzly folder (backend code) +COPY . . + +# 8. Expose port +EXPOSE 8000 + +# 9. Run FastAPI app with uvicorn +CMD ["uvicorn", "quizzly.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/fastapi-mongo/auth/jwt_handler.py b/fastapi-mongo/auth/jwt_handler.py new file mode 100644 index 0000000..8105f45 --- /dev/null +++ b/fastapi-mongo/auth/jwt_handler.py @@ -0,0 +1,20 @@ +from jose import jwt +from datetime import datetime, timedelta +from dotenv import load_dotenv +import os + +load_dotenv() + +SECRET_KEY = os.getenv("SECRET_KEY") +ALGORITHM = "HS256" + + +def create_access_token(data: dict, expires_delta: timedelta = timedelta(days=1)): + to_encode = data.copy() + expire = datetime.utcnow() + expires_delta + to_encode.update({"exp": expire}) + return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + + +def decode_access_token(token: str): + return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) diff --git a/fastapi-mongo/core/config.py b/fastapi-mongo/core/config.py new file mode 100644 index 0000000..7904b4a --- /dev/null +++ b/fastapi-mongo/core/config.py @@ -0,0 +1,17 @@ +# app/core/config.py +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + MONGODB_URI: str + SECRET_KEY: str + FRONTEND_URL: str + GEMINI_API_KEY: str + YOUTUBE_API_KEY: str + CORS_ORIGINS: str + + class Config: + env_file = ".env" + + +settings = Settings() diff --git a/fastapi-mongo/db/mongodb.py b/fastapi-mongo/db/mongodb.py new file mode 100644 index 0000000..1c7205e --- /dev/null +++ b/fastapi-mongo/db/mongodb.py @@ -0,0 +1,6 @@ +# app/db/mongodb.py +from motor.motor_asyncio import AsyncIOMotorClient +from quizzly.core.config import settings + +client = AsyncIOMotorClient(settings.MONGODB_URI) +db = client.quiz_app \ No newline at end of file diff --git a/fastapi-mongo/keploy-api-test-generator/README.md b/fastapi-mongo/keploy-api-test-generator/README.md new file mode 100644 index 0000000..514d7d2 --- /dev/null +++ b/fastapi-mongo/keploy-api-test-generator/README.md @@ -0,0 +1,9 @@ +### Keploy API Test Generator for FastAPI with MongoDB + +This project demonstrates how to use Keploy to automatically generate API tests for a FastAPI application that interacts with a MongoDB database. Keploy captures the API requests and responses, allowing you to create comprehensive test cases effortlessly. + +Keploy API Test Generator: https://keploy.io/docs/running-keploy/api-test-generator/ + +### About the Project + +This is FastAPI + MongoDB backend written for a Quiz Application. The application allows users to perform CRUD operations on quiz data stored in a MongoDB database. \ No newline at end of file diff --git a/fastapi-mongo/keploy-api-test-generator/Test-run-report.png b/fastapi-mongo/keploy-api-test-generator/Test-run-report.png new file mode 100644 index 0000000..77239f2 Binary files /dev/null and b/fastapi-mongo/keploy-api-test-generator/Test-run-report.png differ diff --git a/fastapi-mongo/keploy-api-test-generator/Test-suites.png b/fastapi-mongo/keploy-api-test-generator/Test-suites.png new file mode 100644 index 0000000..6c8baf9 Binary files /dev/null and b/fastapi-mongo/keploy-api-test-generator/Test-suites.png differ diff --git a/fastapi-mongo/main.py b/fastapi-mongo/main.py new file mode 100644 index 0000000..43e290c --- /dev/null +++ b/fastapi-mongo/main.py @@ -0,0 +1,28 @@ +# app/main.py +from fastapi import FastAPI +from quizzly.routes import parent, quiz, content +from fastapi.middleware.cors import CORSMiddleware +from fastapi.middleware.cors import CORSMiddleware +from quizzly.routes import parent, quiz +from quizzly.core.config import settings + + +app = FastAPI() +# Read and parse CORS origins +origins_str = settings.CORS_ORIGINS +origins = [origin.strip() for origin in origins_str.split(",") if origin.strip()] + +print(origins) + +# Enable CORS +app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +app.include_router(parent.router, prefix="/parent", tags=["Parent"]) +app.include_router(quiz.router, prefix="/quiz", tags=["Quiz"]) +app.include_router(content.router, prefix="/content", tags=["Content"]) diff --git a/fastapi-mongo/models/parent.py b/fastapi-mongo/models/parent.py new file mode 100644 index 0000000..92474a5 --- /dev/null +++ b/fastapi-mongo/models/parent.py @@ -0,0 +1,17 @@ +# app/models/parent.py +from pydantic import BaseModel, Field +from typing import Optional +from bson import ObjectId + + +class ParentCreate(BaseModel): + username: str + password: str + name: Optional[str] + + +class Parent(BaseModel): + id: Optional[str] = Field(default_factory=lambda: str(ObjectId()), alias="_id") + name: str + username: str + password: str diff --git a/fastapi-mongo/models/quiz.py b/fastapi-mongo/models/quiz.py new file mode 100644 index 0000000..f8797a7 --- /dev/null +++ b/fastapi-mongo/models/quiz.py @@ -0,0 +1,82 @@ +# app/models/quiz.py + +from pydantic import BaseModel, Field, HttpUrl +from typing import List, Optional +from bson import ObjectId +from datetime import datetime +import pytz + +IST = pytz.timezone("Asia/Kolkata") + + +class EndQuizRequest(BaseModel): + name: str + + +class QuizStart(BaseModel): + password: int + name: str + + +class AnswerSubmission(BaseModel): + name: str + answers: List[str] + + +# Question model to represent individual question details +class Question(BaseModel): + question: str + choice_A: str + choice_B: str + choice_C: str + choice_D: str + answer: str # Should be one of "A", "B", "C", or "D" + is_correct: Optional[bool] = False + + +# QuizCreate model – input from client +class QuizCreate(BaseModel): + name: str + subject: str + num_questions: int + # trigger_link: HttpUrl + # questions: List[Question] + topic: str + difficulty_level: int + + +class QuestionUpdate(BaseModel): + question: str + choice_A: str + choice_B: str + choice_C: str + choice_D: str + answer: str + is_correct: bool + + +# Main Quiz model – includes fields auto-populated by backend +class Quiz(BaseModel): + id: str = Field(default_factory=lambda: str(ObjectId()), alias="_id") + name: str + subject: str + created_by: str + created_at: datetime = Field(default_factory=datetime.now(IST)) + metadata_fields: Optional[dict] = Field(default_factory=dict) + trigger_link: HttpUrl + taken_by: List[str] + num_questions: int + questions: List[Question] + user_responses: List[dict] + is_started: bool = False + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None + exec_time: Optional[float] = None # in seconds + topic: str + difficulty_level: int + password: int + is_executed: bool = False + + class Config: + json_encoders = {ObjectId: str, datetime: lambda v: v.isoformat()} + populate_by_name = True diff --git a/fastapi-mongo/routes/content.py b/fastapi-mongo/routes/content.py new file mode 100644 index 0000000..3a83e6a --- /dev/null +++ b/fastapi-mongo/routes/content.py @@ -0,0 +1,73 @@ +from fastapi import APIRouter, HTTPException +from typing import List +import json +from googleapiclient.discovery import build +from duckduckgo_search import DDGS +from quizzly.core.config import settings +import os +from pydantic import BaseModel + +router = APIRouter() + +class TopicsRequest(BaseModel): + topics: List[str] + num_results: int = 5 + +@router.post("/articles") +async def get_articles(request: TopicsRequest): + """ + Get articles based on provided topics using DuckDuckGo search + """ + try: + results = {} + with DDGS() as ddgs: + for topic in request.topics: + search_results = ddgs.text(topic, max_results=request.num_results) + articles = [] + for res in search_results: + title = res.get('title') + url = res.get('href') + if title and url: + articles.append({'title': title, 'url': url}) + results[topic] = articles + return results + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/youtube") +async def get_youtube_videos(request: TopicsRequest): + """ + Get YouTube videos with thumbnails based on provided topics + """ + if not settings.YOUTUBE_API_KEY: + raise HTTPException(status_code=500, detail="YouTube API key not configured") + + try: + youtube = build('youtube', 'v3', developerKey=settings.YOUTUBE_API_KEY) + results = {} + + for topic in request.topics: + search_response = youtube.search().list( + q=topic, + part='snippet', + type='video', + maxResults=request.num_results + ).execute() + + youtube_data = [] + for item in search_response['items']: + video_title = item['snippet']['title'] + video_url = f"https://www.youtube.com/watch?v={item['id']['videoId']}" + video_id = item['id']['videoId'] + thumbnail_url = item['snippet']['thumbnails']['high']['url'] + youtube_data.append({ + 'title': video_title, + 'video_url': video_url, + 'thumbnail_url': thumbnail_url + }) + + results[topic] = youtube_data + + return results + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/fastapi-mongo/routes/parent.py b/fastapi-mongo/routes/parent.py new file mode 100644 index 0000000..26a4dbb --- /dev/null +++ b/fastapi-mongo/routes/parent.py @@ -0,0 +1,36 @@ +# app/routes/parent.py +import bcrypt +from fastapi import APIRouter, HTTPException +from quizzly.models.parent import Parent, ParentCreate +from quizzly.db.mongodb import db +from quizzly.utils.jwt import create_token +from pydantic import BaseModel + + +class LoginRequest(BaseModel): + username: str + password: str + + +router = APIRouter() + + +@router.post("/register") +async def register_parent(data: ParentCreate): + parent_data = Parent(**data.dict()) + parent_data.password = bcrypt.hashpw(parent_data.password.encode(), bcrypt.gensalt()).decode() + await db.parents.insert_one(parent_data.model_dump(by_alias=True)) + parent = await db.parents.find_one({"username": parent_data.username}) + token = create_token({"id": str(parent["_id"])}) + return {"message": "Parent registered successfully", "access_token": token} + + +@router.post("/login") +async def login(data: LoginRequest): + parent = await db.parents.find_one({"username": data.username}) + if not parent: + raise HTTPException(status_code=404, detail="User not found") + if not bcrypt.checkpw(data.password.encode(), parent["password"].encode()): + raise HTTPException(status_code=401, detail="Invalid password") + token = create_token({"id": str(parent["_id"])}) + return {"access_token": token} diff --git a/fastapi-mongo/routes/quiz.py b/fastapi-mongo/routes/quiz.py new file mode 100644 index 0000000..c491336 --- /dev/null +++ b/fastapi-mongo/routes/quiz.py @@ -0,0 +1,346 @@ +# app/routes/quiz.py +import pytz +from datetime import datetime +from fastapi import APIRouter, Depends, HTTPException +from quizzly.models.quiz import ( + Quiz, + QuizCreate, + QuizStart, + AnswerSubmission, + EndQuizRequest, + QuestionUpdate +) +from quizzly.db.mongodb import db +from quizzly.utils.jwt import get_current_user +from quizzly.utils.gemini import generate_questions +from bson import ObjectId +import random +from quizzly.core.config import settings + +IST = pytz.timezone("Asia/Kolkata") +router = APIRouter() + + +@router.post("/quizzes/create", response_model=Quiz) +async def create_quiz(data: QuizCreate, current_user: dict = Depends(get_current_user)): + frontend_url = settings.FRONTEND_URL + quiz_id = str(ObjectId()) + password = random.randint(1000, 9999) + num_questions = data.num_questions + subject = data.subject + topic = data.topic + difficulty_level = data.difficulty_level + questions = generate_questions(subject, topic, num_questions, difficulty_level) + if not questions: + raise HTTPException(status_code=500, detail="Failed to generate questions") + quiz_doc = { + "_id": quiz_id, + "name": data.name, + "subject": data.subject, + "trigger_link": f"{frontend_url}/take-quiz/{quiz_id}", + "num_questions": num_questions, + "questions": questions, + "user_responses": [], + "taken_by": [], + "topic": data.topic, + "difficulty_level": data.difficulty_level, + "created_by": current_user["id"], + "created_at": datetime.now(IST).replace(tzinfo=None), + "start_time": None, + "end_time": None, + "exec_time": None, + "is_started": False, + "is_executed": False, + "metadata_fields": {}, + "password": password + } + await db.quizzes.insert_one(quiz_doc) + return quiz_doc + + +@router.get("/quizzes") +async def get_quizzes(): + quizzes = await db.quizzes.find().to_list(100) + for quiz in quizzes: + quiz["_id"] = str(quiz["_id"]) + return quizzes + + +@router.get("/quizzes/{quiz_id}") +async def get_quiz(quiz_id: str): + quiz = await db.quizzes.find_one({"_id": quiz_id}) + if not quiz: + raise HTTPException(status_code=404, detail="Quiz not found") + return quiz + + +@router.put("/quizzes/{quiz_id}") +async def update_quiz(quiz_id: str, quiz: Quiz): + await db.quizzes.update_one({"_id": quiz_id}, {"$set": quiz.dict(by_alias=True)}) + return await db.quizzes.find_one({"_id": quiz_id}) + + +@router.delete("/quizzes/{quiz_id}") +async def delete_quiz(quiz_id: str): + await db.quizzes.delete_one({"_id": quiz_id}) + return {"msg": "Quiz deleted"} + + +@router.put("/quizzes/{quiz_id}/questions/{index}") +async def update_question(quiz_id: str, index: int, question: QuestionUpdate, current_user: dict = Depends(get_current_user)): + # Find the quiz + quiz = await db.quizzes.find_one({"_id": quiz_id}) + + # Check if quiz exists + if not quiz: + raise HTTPException(status_code=404, detail="Quiz not found") + + # Check if index is valid + if index < 0 or index >= len(quiz["questions"]): + raise HTTPException(status_code=404, detail="Question index out of range") + + # Check if user is the creator of the quiz + if quiz["created_by"] != current_user["id"]: + raise HTTPException(status_code=403, detail="You don't have permission to update this quiz") + + # Update question + question_data = question.dict(exclude_unset=True) + quiz["questions"][index].update(question_data) + + # Save updated quiz + await db.quizzes.update_one( + {"_id": quiz_id}, + {"$set": {"questions": quiz["questions"]}} + ) + + return {"message": "Question updated successfully", "quiz": quiz} + + +@router.post("/quizzes/start/{quiz_id}") +async def start_quiz(quiz_id: str, request: QuizStart): + quiz = await db.quizzes.find_one({"_id": quiz_id}) + if not quiz: + raise HTTPException(status_code=404, detail="Quiz not found") + + if quiz.get("password") != request.password: + raise HTTPException(status_code=401, detail="Invalid password") + + if quiz.get("is_started", False): + return {"message": "Quiz already started", "quiz": quiz} + + start_time = datetime.now(pytz.timezone("Asia/Kolkata")).replace(tzinfo=None) + await db.quizzes.update_one( + {"_id": quiz_id}, + { + "$set": { + "is_started": True, + "start_time": start_time, + "is_executed": False + }, + "$addToSet": {"taken_by": request.name} + } + ) + + quiz["start_time"] = start_time + quiz["is_started"] = True + return {"message": "Quiz started", "quiz": quiz} + + +@router.post("/quizzes/{quiz_id}/submit_answers") +async def submit_answers( + quiz_id: str, + submission: AnswerSubmission +): + quiz = await db.quizzes.find_one({"_id": quiz_id}) + if not quiz: + raise HTTPException(status_code=404, detail="Quiz not found") + + if not quiz.get("is_started", False): + raise HTTPException(status_code=400, detail="Quiz not started yet") + + if submission.name not in quiz.get("taken_by", []): + raise HTTPException(status_code=403, detail="User not registered for this quiz") + + questions = quiz["questions"] + if len(submission.answers) != len(questions): + raise HTTPException(status_code=400, detail="Number of answers does not match questions") + + # Check if user has already submitted + if any(resp["name"] == submission.name for resp in quiz.get("user_responses", [])): + raise HTTPException(status_code=400, detail="Answers already submitted") + + # Evaluate answers + evaluated_answers = [] + score = 0 + for i, user_ans in enumerate(submission.answers): + correct = questions[i]["answer"] == user_ans + evaluated_answers.append({ + "question_index": i, + "answer": user_ans, + "is_correct": correct + }) + if correct: + score += 1 + + # Calculate time taken + end_time = datetime.now(IST).replace(tzinfo=None) + start_time = quiz.get("start_time") + exec_time = (end_time - start_time).total_seconds() if start_time else 0 + exec_time = round(exec_time, 2) + + user_response = { + "name": submission.name, + "answers": evaluated_answers, + "score": score + } + + await db.quizzes.update_one( + {"_id": quiz_id}, + { + "$push": {"user_responses": user_response}, + "$set": { + "is_executed": True, + "is_started": False, + "end_time": end_time, + "exec_time": exec_time + } + } + ) + + return { + "message": "Answers submitted", + "score": score, + "total": len(questions) + } + + +@router.post("/quizzes/{quiz_id}/end") +async def end_quiz(quiz_id: str, request: EndQuizRequest): + quiz = await db.quizzes.find_one({"_id": quiz_id}) + if not quiz: + raise HTTPException(status_code=404, detail="Quiz not found") + + if not quiz.get("is_started", False): + raise HTTPException(status_code=400, detail="Quiz has not started yet") + + if quiz.get("is_executed", False): + raise HTTPException(status_code=400, detail="Quiz already ended") + + user_responses = quiz.get("user_responses", []) + user_scores = [{"name": user.get("name"), "score": user.get("score", 0)} for user in user_responses ] + + user_response_entry = next( + (resp for resp in user_responses if resp["name"] == request.name), + None + ) + + if not user_response_entry: + raise HTTPException(status_code=400, detail="No responses found for this user") + + questions = quiz.get("questions", []) + wrong_questions = [] + right_questions = [] + + for ans in user_response_entry["answers"]: + q_idx = ans["question_index"] + question = questions[q_idx].get("question") + correct_answer = questions[q_idx].get("answer") + entry = { + "question_index": q_idx, + "question": question, + "your_answer": ans.get("answer"), + "correct_answer": correct_answer + } + if ans.get("is_correct"): + right_questions.append(entry) + else: + wrong_questions.append(entry) + + score = len(right_questions) + num_wrong = len(wrong_questions) + end_time = datetime.now(pytz.timezone("Asia/Kolkata")).replace(tzinfo=None) + start_time = quiz.get("start_time") + exec_time = (end_time - start_time).total_seconds() if start_time else 0 + exec_time = round(exec_time, 2) + + await db.quizzes.update_one( + {"_id": quiz_id, "user_responses.name": request.name}, + { + "$set": { + "is_executed": True, + "is_started": False, + "end_time": end_time, + "exec_time": exec_time, + "user_responses.$.score": score + } + } + ) + + return { + "message": "Quiz ended", + "user_scores": user_scores, + "exec_time": exec_time, + "score": score, + "number_of_wrong_answers": num_wrong, + "right_questions": right_questions, + "wrong_questions": wrong_questions + } + + +@router.get("/quizzes/{quiz_id}/leaderboard") +async def get_leaderboard(quiz_id: str): + quiz = await db.quizzes.find_one({"_id": quiz_id}) + if not quiz: + raise HTTPException(status_code=404, detail="Quiz not found") + + user_responses = quiz.get("user_responses", []) + total_questions = len(quiz.get("questions", [])) + + leaderboard_data = [] + for response in user_responses: + score = response.get("score", 0) + percentage = round((score / total_questions) * 100) if total_questions > 0 else 0 + answers = response.get("answers", []) + + # Calculate correct and incorrect answers + correct_answers = sum(1 for ans in answers if ans.get("is_correct", False)) + incorrect_answers = len(answers) - correct_answers + + # Calculate time taken + time_taken = "N/A" + exec_time = quiz.get("exec_time") + if exec_time is not None: + minutes = int(exec_time // 60) + seconds = int(exec_time % 60) + time_taken = f"{minutes}m {seconds}s" + else: + end_time = quiz.get("end_time") + start_time = quiz.get("start_time") + if end_time is not None and start_time is not None: + time_diff = (end_time - start_time).total_seconds() + minutes = int(time_diff // 60) + seconds = int(time_diff % 60) + time_taken = f"{minutes}m {seconds}s" + + # Handle end_time properly + end_time = quiz.get("end_time") + if end_time is None: + end_time = datetime.now(IST).replace(tzinfo=None) + else: + end_time = end_time.replace(tzinfo=None) + + leaderboard_data.append({ + "id": str(ObjectId()), # Generate a unique ID for each entry + "name": response.get("name", "Anonymous"), + "score": score, + "percentage": percentage, + "correctAnswers": correct_answers, + "incorrectAnswers": incorrect_answers, + "timeTaken": time_taken, + "attemptedAt": end_time.isoformat(), + "answers": answers # Include detailed answer information + }) + + # Sort by score (highest first), then by time taken (fastest first) + leaderboard_data.sort(key=lambda x: (-x["score"], x["timeTaken"])) + return leaderboard_data diff --git a/fastapi-mongo/utils/gemini.py b/fastapi-mongo/utils/gemini.py new file mode 100644 index 0000000..0a678db --- /dev/null +++ b/fastapi-mongo/utils/gemini.py @@ -0,0 +1,70 @@ +import requests +import json +import re +from quizzly.core.config import settings + +GEMINI_API_KEY = settings.GEMINI_API_KEY + + +def generate_questions(subject: str, topic: str, num_questions: int, difficulty_level: int): + PROMPT_TEMPLATE = f""" + You are a teaching assistant tasked with creating {num_questions} Multiple choice questions on the subject {subject} and topic {topic} with 4 choices (A,B,C,D) in the format:\n + "questions": [ + {{ + "question": "What is the output of print(type([]))?", + "choice_A": "", + "choice_B": "", + "choice_C": "", + "choice_D": "", + "answer": "A", + "is_correct": false + }}, + {{ + "question": "Which keyword is used to create a function in Python?", + "choice_A": "func", + "choice_B": "def", + "choice_C": "function", + "choice_D": "lambda", + "answer": "B", + "is_correct": false + }} + ] + Keep the difficulty according to {difficulty_level} level, where level 1 -> class 1 student should answer them and level -> 10 should be class 10 student should answer them. + I want {num_questions} generated questions on the topic {topic} only and they should strictly be in the format as shown in the example above (as a list of dictionaries). The default value of is_correct should be false. Only include the questions in your answer, nothing else. + """ + url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key=" + GEMINI_API_KEY + + payload = { + "contents": [ + { + "parts": [ + { + "text": PROMPT_TEMPLATE + } + ] + } + ] + } + + # Headers to define content type and API key + headers = { + "Content-Type": "application/json" + } + + try: + response = requests.post(url, headers=headers, json=payload) + except Exception as e: + raise e + + if response.status_code == 200: + resp = extract_questions_from_gemini_response(response.content) + return resp + + +def extract_questions_from_gemini_response(response_content: bytes): + response_str = response_content.decode('utf-8') + response_json = json.loads(response_str) + model_text = response_json['candidates'][0]['content']['parts'][0]['text'] + model_text = re.sub(r'^```json|```$', '', model_text.strip(), flags=re.MULTILINE).strip("`") + parsed_output = json.loads(model_text) + return parsed_output.get("questions", []) diff --git a/fastapi-mongo/utils/jwt.py b/fastapi-mongo/utils/jwt.py new file mode 100644 index 0000000..f114f19 --- /dev/null +++ b/fastapi-mongo/utils/jwt.py @@ -0,0 +1,44 @@ +from datetime import datetime, timezone, timedelta +from jose import JWTError, jwt +from typing import Optional +from quizzly.core.config import settings + +SECRET_KEY = settings.SECRET_KEY +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 300 + + +def create_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + + +def verify_token(token: str) -> dict: + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + exp = datetime.fromtimestamp(payload["exp"], tz=timezone.utc) + return payload if exp >= datetime.now(timezone.utc) else None + except JWTError: + return None + + +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +def get_current_user(token: str = Depends(oauth2_scheme)) -> dict: + user = verify_token(token) + if user is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or expired token", + headers={"WWW-Authenticate": "Bearer"}, + ) + return user