-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfunctions.py
More file actions
142 lines (107 loc) · 4.81 KB
/
functions.py
File metadata and controls
142 lines (107 loc) · 4.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import subprocess
from http.client import responses
import requests
import asyncio
from loguru import logger
from playwright.async_api import async_playwright
from playwright_stealth import stealth_async
from bs4 import BeautifulSoup
from config import SERPER_API_KEY
def save_code2(code: str, filename: str) -> str:
logger.debug(f"Создаю файл в директории {filename}")
try:
os.makedirs("./ai", exist_ok=True)
filepath = os.path.join("./ai", filename)
with open(filepath, "w", encoding="UTF-8") as f:
f.write(code)
return f"Файл {filename} успешно был создан"
except Exception as e:
logger.error(f"Какая-то ошибка: {str(e)}")
return f"Ошибка создания файла: {str(e)[:5000]}"
def save_code(code: str, filepath: str) -> str:
logger.debug(f"Создаю файл в директории {filepath}")
try:
# Если в пути нет слешей, используем "./ai" как базовую директорию
if os.path.sep not in filepath:
filepath = os.path.join("./ai", filepath)
# Создаем директорию по указанному пути
os.makedirs(os.path.dirname(filepath), exist_ok=True)
# Записываем код в файл
with open(filepath, "w", encoding="UTF-8") as f:
f.write(code)
return f"Файл {os.path.basename(filepath)} успешно был создан"
except Exception as e:
logger.error(f"Ошибка создания файла: {str(e)}")
return f"Ошибка создания файла: {str(e)[:5000]}"
def run_command(command: str, input_str: str | None = None) -> str:
logger.debug(f"Выполняю команду {command}, входные данные: {str(input_str)}")
os.makedirs("./ai", exist_ok=True)
try:
process = subprocess.Popen(
command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd="./ai/"
)
if input_str:
stdout, stderr = process.communicate(input=input_str)
else:
stdout, stderr = process.communicate()
return stdout[:16000] or stderr[:16000]
except Exception as e:
logger.error(f"Ошибка при выполнении команды: {str(e)}")
return f"Ошибка при выполнении команды: {str(e)[:5000]}"
def search(query: str) -> str:
logger.info(f"Ищу в интернете {query}")
try:
response = requests.post(
"https://google.serper.dev/search",
headers={
"X-API-KEY": SERPER_API_KEY,
"Content-type": "application/json"
},
json={
"q": query
}
)
if response.status_code in [200, 201]:
data_json = response.json()
data = "\n".join(
f"{item['title']}: {item['snippet']}"
for item in data_json.get("organic", [])[:3]
)
return data
except Exception as e:
logger.error(f"Ошибка при поиски инфы в инете: {str(e)}")
return f"Ошибка при поиски инфы в инете: {str(e)[:5000]}"
async def fetch_page(url: str) -> str:
logger.info(f"Получаю исходный код страницы: {url}")
try:
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=False,
args=["--disable-blink-features=AutomationControleed"]
)
context = await browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.3",
viewport={"width": 1280, "height": 800}
)
page = await context.new_page()
await stealth_async(page)
await page.goto(url, wait_until="networkidle")
await asyncio.sleep(3)
page_content = await page.content()
await page.screenshot(path="screenshot.png")
soup = BeautifulSoup(page_content, "html.parser")
for tag in soup(["script", "style", "svg", "iframe"]):
tag.decompose()
for tag in soup.find_all(style=True):
del tag["style"]
return str(soup.body)[:26000] if soup.body else page_content[:26000]
except Exception as e:
logger.error(f"Ошибка при посещении веб сайта: {str(e)}")
return f"Ошибка при посещении веб сайта: {str(e)[:5000]}"