app.py 88 KB


  1. import binascii
  2. import json
  3. import logging
  4. import mimetypes
  5. import os
  6. import traceback
  7. import urllib
  8. from datetime import datetime
  9. from datetime import timedelta
  10. from datetime import timezone
  11. from functools import wraps
  12. from io import BytesIO
  13. from typing import Any
  14. from typing import Dict
  15. from typing import Optional
  16. from typing import Tuple
  17. from urllib.parse import urlencode
  18. from urllib.parse import urlparse
  19. from requests.exceptions import HTTPError
  20. import requests
  21. import bleach
  22. import mf2py
  23. import pymongo
  24. import timeago
  25. from bson.objectid import ObjectId
  26. from dateutil import parser
  27. from flask import Flask
  28. from flask import make_response
  29. from flask import Response
  30. from flask import abort
  31. from flask import jsonify as flask_jsonify
  32. from flask import redirect
  33. from flask import render_template
  34. from flask import request
  35. from flask import session
  36. from flask import url_for
  37. from flask_wtf.csrf import CSRFProtect
  38. from html2text import html2text
  39. from itsdangerous import BadSignature
  40. from little_boxes import activitypub as ap
  41. from little_boxes.activitypub import ActivityType
  42. from little_boxes.activitypub import _to_list
  43. from little_boxes.activitypub import clean_activity
  44. from little_boxes.activitypub import format_datetime
  45. from little_boxes.activitypub import get_backend
  46. from little_boxes.content_helper import parse_markdown
  47. from little_boxes.linked_data_sig import generate_signature
  48. from little_boxes.errors import ActivityGoneError
  49. from little_boxes.errors import NotAnActivityError
  50. from little_boxes.errors import BadActivityError
  51. from little_boxes.errors import ActivityNotFoundError
  52. from little_boxes.errors import Error
  53. from little_boxes.errors import NotFromOutboxError
  54. from little_boxes.httpsig import HTTPSigAuth
  55. from little_boxes.httpsig import verify_request
  56. from little_boxes.webfinger import get_actor_url
  57. from little_boxes.webfinger import get_remote_follow_template
  58. from utils import opengraph
  59. from passlib.hash import bcrypt
  60. from u2flib_server import u2f
  61. from werkzeug.utils import secure_filename
  62. import activitypub
  63. import config
  64. from activitypub import Box
  65. from activitypub import embed_collection
  66. from activitypub import _answer_key
  67. from config import USER_AGENT
  68. from config import ADMIN_API_KEY
  69. from config import BASE_URL
  70. from config import DB
  71. from config import DEBUG_MODE
  72. from config import DOMAIN
  73. from config import EMOJIS
  74. from config import HEADERS
  75. from config import ICON_URL
  76. from config import ID
  77. from config import JWT
  78. from config import KEY
  79. from config import ME
  80. from config import MEDIA_CACHE
  81. from config import PASS
  82. from config import USERNAME
  83. from config import VERSION
  84. from config import PUBLIC_DOMAIN
  85. from config import ACTOR_URL
  86. from config import PORT
  87. from config import PREVIEW_LIMIT
  88. from config import _drop_db
  89. from utils.key import get_secret_key
  90. from utils.lookup import lookup
  91. from utils.media import Kind
  92. from poussetaches import PousseTaches
  93. p = PousseTaches(
  94. os.getenv("MICROBLOGPUB_POUSSETACHES_HOST", "http://localhost:7991"),f"http://localhost:{PORT}",
  95. )
  96. # p = PousseTaches("http://localhost:7991", "http://localhost:5000")
  97. back = activitypub.MicroblogPubBackend()
  98. ap.use_backend(back)
  99. MY_PERSON = ap.Person(**ME)
  100. app = Flask(__name__)
  101. app.secret_key = get_secret_key("flask")
  102. app.config.update(WTF_CSRF_CHECK_DEFAULT=False)
  103. csrf = CSRFProtect(app)
  104. logger = logging.getLogger(__name__)
  105. # Hook up Flask logging with gunicorn
  106. root_logger = logging.getLogger()
  107. if os.getenv("FLASK_DEBUG"):
  108. logger.setLevel(logging.DEBUG)
  109. root_logger.setLevel(logging.DEBUG)
  110. else:
  111. gunicorn_logger = logging.getLogger("gunicorn.error")
  112. root_logger.handlers = gunicorn_logger.handlers
  113. root_logger.setLevel(gunicorn_logger.level)
  114. SIG_AUTH = HTTPSigAuth(KEY)
  115. def verify_pass(pwd):
  116. return bcrypt.verify(pwd, PASS)
  117. @app.context_processor
  118. def inject_config():
  119. q = {
  120. "type": "Create",
  121. "activity.object.inReplyTo": None,
  122. "meta.deleted": False,
  123. "meta.public": True,
  124. }
  125. notes_count = DB.activities.find(
  126. {"box": Box.OUTBOX.value, "$or": [q, {"type": "Announce", "meta.undo": False}]}
  127. ).count()
  128. with_replies_count = DB.activities.find(
  129. {
  130. "box": Box.OUTBOX.value,
  131. "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
  132. "meta.undo": False,
  133. "meta.deleted": False,
  134. "meta.public": True,
  135. }
  136. ).count()
  137. liked_count = DB.activities.count(
  138. {
  139. "box": Box.OUTBOX.value,
  140. "meta.deleted": False,
  141. "meta.undo": False,
  142. "type": ActivityType.LIKE.value,
  143. }
  144. )
  145. followers_q = {
  146. "box": Box.INBOX.value,
  147. "type": ActivityType.FOLLOW.value,
  148. "meta.undo": False,
  149. }
  150. following_q = {
  151. "box": Box.OUTBOX.value,
  152. "type": ActivityType.FOLLOW.value,
  153. "meta.undo": False,
  154. }
  155. return dict(
  156. microblogpub_version=VERSION,
  157. config=config,
  158. logged_in=session.get("logged_in", False),
  159. followers_count=DB.activities.count(followers_q),
  160. following_count=DB.activities.count(following_q),
  161. notes_count=notes_count,
  162. liked_count=liked_count,
  163. with_replies_count=with_replies_count,
  164. me=ME,
  165. base_url=config.BASE_URL,
  166. )
  167. @app.after_request
  168. def set_x_powered_by(response):
  169. response.headers["X-Powered-By"] = "microblog.pub"
  170. return response
  171. # HTML/templates helper
  172. ALLOWED_TAGS = [
  173. "a",
  174. "abbr",
  175. "acronym",
  176. "b",
  177. "br",
  178. "blockquote",
  179. "code",
  180. "pre",
  181. "em",
  182. "i",
  183. "li",
  184. "ol",
  185. "strong",
  186. "ul",
  187. "span",
  188. "div",
  189. "p",
  190. "h1",
  191. "h2",
  192. "h3",
  193. "h4",
  194. "h5",
  195. "h6",
  196. ]
  197. def clean_html(html):
  198. try:
  199. return bleach.clean(html, tags=ALLOWED_TAGS)
  200. except Exception:
  201. return ""
  202. _GRIDFS_CACHE: Dict[Tuple[Kind, str, Optional[int]], str] = {}
  203. def _get_file_url(url, size, kind):
  204. k = (kind, url, size)
  205. cached = _GRIDFS_CACHE.get(k)
  206. if cached:
  207. return cached
  208. doc = MEDIA_CACHE.get_file(url, size, kind)
  209. if doc:
  210. u = f"/media/{str(doc._id)}"
  211. _GRIDFS_CACHE[k] = u
  212. return u
  213. # MEDIA_CACHE.cache(url, kind)
  214. app.logger.error(f"cache not available for {url}/{size}/{kind}")
  215. return url
  216. @app.template_filter()
  217. def gtone(n):
  218. return n > 1
  219. @app.template_filter()
  220. def gtnow(dtstr):
  221. return format_datetime(datetime.now().astimezone()) > dtstr
  222. @app.template_filter()
  223. def remove_mongo_id(dat):
  224. if isinstance(dat, list):
  225. return [remove_mongo_id(item) for item in dat]
  226. if "_id" in dat:
  227. dat["_id"] = str(dat["_id"])
  228. for k, v in dat.items():
  229. if isinstance(v, dict):
  230. dat[k] = remove_mongo_id(dat[k])
  231. return dat
  232. @app.template_filter()
  233. def get_video_link(data):
  234. for link in data:
  235. if link.get("mimeType", "").startswith("video/"):
  236. return link.get("href")
  237. return None
  238. @app.template_filter()
  239. def get_actor_icon_url(url, size):
  240. return _get_file_url(url, size, Kind.ACTOR_ICON)
  241. @app.template_filter()
  242. def get_attachment_url(url, size):
  243. return _get_file_url(url, size, Kind.ATTACHMENT)
  244. @app.template_filter()
  245. def get_og_image_url(url, size=100):
  246. try:
  247. return _get_file_url(url, size, Kind.OG_IMAGE)
  248. except Exception:
  249. return ""
  250. @app.template_filter()
  251. def permalink_id(val):
  252. return str(hash(val))
  253. @app.template_filter()
  254. def quote_plus(t):
  255. return urllib.parse.quote_plus(t)
  256. @app.template_filter()
  257. def is_from_outbox(t):
  258. return t.startswith(ID)
  259. @app.template_filter()
  260. def clean(html):
  261. return clean_html(html)
  262. @app.template_filter()
  263. def html2plaintext(body):
  264. return html2text(body)
  265. @app.template_filter()
  266. def domain(url):
  267. return urlparse(url).netloc
  268. @app.template_filter()
  269. def url_or_id(d):
  270. if isinstance(d, dict):
  271. if ("url" in d) and isinstance(d["url"], str):
  272. return d["url"]
  273. else:
  274. return d["id"]
  275. return ""
  276. @app.template_filter()
  277. def get_url(u):
  278. print(f"GET_URL({u!r})")
  279. if isinstance(u, list):
  280. for l in u:
  281. if l.get("mimeType") == "text/html":
  282. u = l
  283. if isinstance(u, dict):
  284. return u["href"]
  285. elif isinstance(u, str):
  286. return u
  287. else:
  288. return u
  289. @app.template_filter()
  290. def get_actor(url):
  291. if not url:
  292. return None
  293. if isinstance(url, list):
  294. url = url[0]
  295. if isinstance(url, dict):
  296. url = url.get("id")
  297. print(f"GET_ACTOR {url}")
  298. try:
  299. return get_backend().fetch_iri(url)
  300. except (ActivityNotFoundError, ActivityGoneError):
  301. return f"Deleted<{url}>"
  302. except Exception as exc:
  303. return f"Error<{url}/{exc!r}>"
  304. @app.template_filter()
  305. def format_time(val):
  306. if val:
  307. dt = parser.parse(val)
  308. return datetime.strftime(dt, "%B %d, %Y, %H:%M %p")
  309. return val
  310. @app.template_filter()
  311. def format_timeago(val):
  312. if val:
  313. dt = parser.parse(val)
  314. return timeago.format(dt, datetime.now(timezone.utc))
  315. return val
  316. @app.template_filter()
  317. def has_type(doc, _types):
  318. for _type in _to_list(_types):
  319. if _type in _to_list(doc["type"]):
  320. return True
  321. return False
  322. @app.template_filter()
  323. def has_actor_type(doc):
  324. for t in ap.ACTOR_TYPES:
  325. if has_type(doc, t.value):
  326. return True
  327. return False
  328. def _is_img(filename):
  329. filename = filename.lower()
  330. if (
  331. filename.endswith(".png")
  332. or filename.endswith(".jpg")
  333. or filename.endswith(".jpeg")
  334. or filename.endswith(".gif")
  335. or filename.endswith(".svg")
  336. ):
  337. return True
  338. return False
  339. @app.template_filter()
  340. def not_only_imgs(attachment):
  341. for a in attachment:
  342. if isinstance(a, dict) and not _is_img(a["url"]):
  343. return True
  344. if isinstance(a, str) and not _is_img(a):
  345. return True
  346. return False
  347. @app.template_filter()
  348. def is_img(filename):
  349. return _is_img(filename)
  350. @app.template_filter()
  351. def get_answer_count(choice, meta):
  352. print(choice, meta)
  353. return meta.get("question_answers", {}).get(_answer_key(choice), 0)
  354. def add_response_headers(headers={}):
  355. """This decorator adds the headers passed in to the response"""
  356. def decorator(f):
  357. @wraps(f)
  358. def decorated_function(*args, **kwargs):
  359. resp = make_response(f(*args, **kwargs))
  360. h = resp.headers
  361. for header, value in headers.items():
  362. h[header] = value
  363. return resp
  364. return decorated_function
  365. return decorator
  366. def noindex(f):
  367. """This decorator passes X-Robots-Tag: noindex, nofollow"""
  368. return add_response_headers({"X-Robots-Tag": "noindex, nofollow"})(f)
  369. def login_required(f):
  370. @wraps(f)
  371. def decorated_function(*args, **kwargs):
  372. if not session.get("logged_in"):
  373. return redirect(url_for("admin_login", next=request.url))
  374. return f(*args, **kwargs)
  375. return decorated_function
  376. def _api_required():
  377. if session.get("logged_in"):
  378. if request.method not in ["GET", "HEAD"]:
  379. # If a standard API request is made with a "login session", it must havw a CSRF token
  380. csrf.protect()
  381. return
  382. # Token verification
  383. token = request.headers.get("Authorization", "").replace("Bearer ", "")
  384. if not token:
  385. # IndieAuth token
  386. token = request.form.get("access_token", "")
  387. # Will raise a BadSignature on bad auth
  388. payload = JWT.loads(token)
  389. logger.info(f"api call by {payload}")
  390. def api_required(f):
  391. @wraps(f)
  392. def decorated_function(*args, **kwargs):
  393. try:
  394. _api_required()
  395. except BadSignature:
  396. abort(401)
  397. return f(*args, **kwargs)
  398. return decorated_function
  399. def jsonify(**data):
  400. if "@context" not in data:
  401. data["@context"] = config.DEFAULT_CTX
  402. return Response(
  403. response=json.dumps(data),
  404. headers={
  405. "Content-Type": "application/json"
  406. if app.debug
  407. else "application/activity+json"
  408. },
  409. )
  410. def is_api_request():
  411. h = request.headers.get("Accept")
  412. if h is None:
  413. return False
  414. h = h.split(",")[0]
  415. if h in HEADERS or h == "application/json":
  416. return True
  417. return False
  418. @app.errorhandler(ValueError)
  419. def handle_value_error(error):
  420. logger.error(
  421. f"caught value error: {error!r}, {traceback.format_tb(error.__traceback__)}"
  422. )
  423. response = flask_jsonify(message=error.args[0])
  424. response.status_code = 400
  425. return response
  426. @app.errorhandler(Error)
  427. def handle_activitypub_error(error):
  428. logger.error(
  429. f"caught activitypub error {error!r}, {traceback.format_tb(error.__traceback__)}"
  430. )
  431. response = flask_jsonify(error.to_dict())
  432. response.status_code = error.status_code
  433. return response
  434. class TaskError(Exception):
  435. """Raised to log the error for poussetaches."""
  436. def __init__(self):
  437. self.message = traceback.format_exc()
  438. @app.errorhandler(TaskError)
  439. def handle_task_error(error):
  440. logger.error(
  441. f"caught activitypub error {error!r}, {traceback.format_tb(error.__traceback__)}"
  442. )
  443. response = flask_jsonify({"traceback": error.message})
  444. response.status_code = 500
  445. return response
  446. # @app.errorhandler(Exception)
  447. # def handle_other_error(error):
  448. # logger.error(
  449. # f"caught error {error!r}, {traceback.format_tb(error.__traceback__)}"
  450. # )
  451. # response = flask_jsonify({})
  452. # response.status_code = 500
  453. # return response
  454. # App routes
  455. ROBOTS_TXT = """User-agent: *
  456. Disallow: /login
  457. Disallow: /admin/
  458. Disallow: /static/
  459. Disallow: /media/
  460. Disallow: /uploads/"""
  461. @app.route("/robots.txt")
  462. def robots_txt():
  463. return Response(response=ROBOTS_TXT, headers={"Content-Type": "text/plain"})
  464. @app.route("/media/<media_id>")
  465. @noindex
  466. def serve_media(media_id):
  467. f = MEDIA_CACHE.fs.get(ObjectId(media_id))
  468. resp = app.response_class(f, direct_passthrough=True, mimetype=f.content_type)
  469. resp.headers.set("Content-Length", f.length)
  470. resp.headers.set("ETag", f.md5)
  471. resp.headers.set(
  472. "Last-Modified", f.uploadDate.strftime("%a, %d %b %Y %H:%M:%S GMT")
  473. )
  474. resp.headers.set("Cache-Control", "public,max-age=31536000,immutable")
  475. resp.headers.set("Content-Encoding", "gzip")
  476. return resp
  477. @app.route("/uploads/<oid>/<fname>")
  478. def serve_uploads(oid, fname):
  479. f = MEDIA_CACHE.fs.get(ObjectId(oid))
  480. resp = app.response_class(f, direct_passthrough=True, mimetype=f.content_type)
  481. resp.headers.set("Content-Length", f.length)
  482. resp.headers.set("ETag", f.md5)
  483. resp.headers.set(
  484. "Last-Modified", f.uploadDate.strftime("%a, %d %b %Y %H:%M:%S GMT")
  485. )
  486. resp.headers.set("Cache-Control", "public,max-age=31536000,immutable")
  487. resp.headers.set("Content-Encoding", "gzip")
  488. return resp
  489. #######
  490. # Login
  491. @app.route("/admin/logout")
  492. @login_required
  493. def admin_logout():
  494. session["logged_in"] = False
  495. return redirect("/")
  496. @app.route("/login", methods=["POST", "GET"])
  497. @noindex
  498. def admin_login():
  499. if session.get("logged_in") is True:
  500. return redirect(url_for("admin_notifications"))
  501. devices = [doc["device"] for doc in DB.u2f.find()]
  502. u2f_enabled = True if devices else False
  503. if request.method == "POST":
  504. csrf.protect()
  505. # 1. Check regular password login flow
  506. pwd = request.form.get("pass")
  507. if pwd:
  508. if verify_pass(pwd):
  509. session["logged_in"] = True
  510. return redirect(
  511. request.args.get("redirect") or url_for("admin_notifications")
  512. )
  513. else:
  514. abort(403)
  515. # 2. Check for U2F payload, if any
  516. elif devices:
  517. resp = json.loads(request.form.get("resp"))
  518. try:
  519. u2f.complete_authentication(session["challenge"], resp)
  520. except ValueError as exc:
  521. print("failed", exc)
  522. abort(403)
  523. return
  524. finally:
  525. session["challenge"] = None
  526. session["logged_in"] = True
  527. return redirect(
  528. request.args.get("redirect") or url_for("admin_notifications")
  529. )
  530. else:
  531. abort(401)
  532. payload = None
  533. if devices:
  534. payload = u2f.begin_authentication(ID, devices)
  535. session["challenge"] = payload
  536. return render_template("login.html", u2f_enabled=u2f_enabled, payload=payload)
  537. @app.route("/remote_follow", methods=["GET", "POST"])
  538. def remote_follow():
  539. if request.method == "GET":
  540. return render_template("remote_follow.html")
  541. csrf.protect()
  542. profile = request.form.get("profile")
  543. if not profile.startswith("@"):
  544. profile = f"@{profile}"
  545. return redirect(
  546. get_remote_follow_template(profile).format(uri=f"{USERNAME}@{DOMAIN}")
  547. )
  548. @app.route("/authorize_follow", methods=["GET", "POST"])
  549. @login_required
  550. def authorize_follow():
  551. if request.method == "GET":
  552. return render_template(
  553. "authorize_remote_follow.html", profile=request.args.get("profile")
  554. )
  555. actor = get_actor_url(request.form.get("profile"))
  556. if not actor:
  557. abort(500)
  558. q = {
  559. "box": Box.OUTBOX.value,
  560. "type": ActivityType.FOLLOW.value,
  561. "meta.undo": False,
  562. "activity.object": actor,
  563. }
  564. if DB.activities.count(q) > 0:
  565. return redirect("/following")
  566. follow = ap.Follow(actor=MY_PERSON.id, object=actor)
  567. post_to_outbox(follow)
  568. return redirect("/following")
  569. @app.route("/u2f/register", methods=["GET", "POST"])
  570. @login_required
  571. def u2f_register():
  572. # TODO(tsileo): ensure no duplicates
  573. if request.method == "GET":
  574. payload = u2f.begin_registration(ID)
  575. session["challenge"] = payload
  576. return render_template("u2f.html", payload=payload)
  577. else:
  578. resp = json.loads(request.form.get("resp"))
  579. device, device_cert = u2f.complete_registration(session["challenge"], resp)
  580. session["challenge"] = None
  581. DB.u2f.insert_one({"device": device, "cert": device_cert})
  582. session["logged_in"] = False
  583. return redirect("/login")
  584. #######
  585. # Activity pub routes
  586. @app.route("/drop_cache")
  587. @login_required
  588. def drop_cache():
  589. DB.actors.drop()
  590. return "Done"
  591. def paginated_query(db, q, limit=25, sort_key="_id"):
  592. older_than = newer_than = None
  593. query_sort = -1
  594. first_page = not request.args.get("older_than") and not request.args.get(
  595. "newer_than"
  596. )
  597. query_older_than = request.args.get("older_than")
  598. query_newer_than = request.args.get("newer_than")
  599. if query_older_than:
  600. q["_id"] = {"$lt": ObjectId(query_older_than)}
  601. elif query_newer_than:
  602. q["_id"] = {"$gt": ObjectId(query_newer_than)}
  603. query_sort = 1
  604. outbox_data = list(db.find(q, limit=limit + 1).sort(sort_key, query_sort))
  605. outbox_len = len(outbox_data)
  606. outbox_data = sorted(
  607. outbox_data[:limit], key=lambda x: str(x[sort_key]), reverse=True
  608. )
  609. if query_older_than:
  610. newer_than = str(outbox_data[0]["_id"])
  611. if outbox_len == limit + 1:
  612. older_than = str(outbox_data[-1]["_id"])
  613. elif query_newer_than:
  614. older_than = str(outbox_data[-1]["_id"])
  615. if outbox_len == limit + 1:
  616. newer_than = str(outbox_data[0]["_id"])
  617. elif first_page and outbox_len == limit + 1:
  618. older_than = str(outbox_data[-1]["_id"])
  619. return outbox_data, older_than, newer_than
  620. CACHING = True
  621. def _get_cached(type_="html", arg=None):
  622. if not CACHING:
  623. return None
  624. logged_in = session.get("logged_in")
  625. if not logged_in:
  626. cached = DB.cache2.find_one({"path": request.path, "type": type_, "arg": arg})
  627. if cached:
  628. app.logger.info("from cache")
  629. return cached["response_data"]
  630. return None
  631. def _cache(resp, type_="html", arg=None):
  632. if not CACHING:
  633. return None
  634. logged_in = session.get("logged_in")
  635. if not logged_in:
  636. DB.cache2.update_one(
  637. {"path": request.path, "type": type_, "arg": arg},
  638. {"$set": {"response_data": resp, "date": datetime.now(timezone.utc)}},
  639. upsert=True,
  640. )
  641. return None
  642. @app.route("/")
  643. def index():
  644. if is_api_request():
  645. return jsonify(**ME)
  646. cache_arg = (
  647. f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}"
  648. )
  649. cached = _get_cached("html", cache_arg)
  650. if cached:
  651. return cached
  652. q = {
  653. "box": Box.OUTBOX.value,
  654. "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
  655. "activity.object.inReplyTo": None,
  656. "meta.deleted": False,
  657. "meta.undo": False,
  658. "$or": [{"meta.pinned": False}, {"meta.pinned": {"$exists": False}}],
  659. }
  660. print(list(DB.activities.find(q)))
  661. pinned = []
  662. # Only fetch the pinned notes if we're on the first page
  663. if not request.args.get("older_than") and not request.args.get("newer_than"):
  664. q_pinned = {
  665. "box": Box.OUTBOX.value,
  666. "type": ActivityType.CREATE.value,
  667. "meta.deleted": False,
  668. "meta.undo": False,
  669. "meta.pinned": True,
  670. }
  671. pinned = list(DB.activities.find(q_pinned))
  672. outbox_data, older_than, newer_than = paginated_query(
  673. DB.activities, q, limit=config.LIMIT
  674. )
  675. resp = render_template(
  676. "index.html",
  677. outbox_data=outbox_data,
  678. older_than=older_than,
  679. newer_than=newer_than,
  680. pinned=pinned,
  681. )
  682. _cache(resp, "html", cache_arg)
  683. return resp
  684. @app.route("/with_replies")
  685. @login_required
  686. def with_replies():
  687. q = {
  688. "box": Box.OUTBOX.value,
  689. "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
  690. "meta.deleted": False,
  691. "meta.public": True,
  692. "meta.undo": False,
  693. }
  694. outbox_data, older_than, newer_than = paginated_query(DB.activities, q, limit=config.LIMIT)
  695. return render_template(
  696. "index.html",
  697. outbox_data=outbox_data,
  698. older_than=older_than,
  699. newer_than=newer_than,
  700. )
  701. def _build_thread(data, include_children=True): # noqa: C901
  702. data["_requested"] = True
  703. app.logger.info(f"_build_thread({data!r})")
  704. root_id = data["meta"].get("thread_root_parent", data["activity"]["object"]["id"])
  705. query = {
  706. "$or": [{"meta.thread_root_parent": root_id}, {"activity.object.id": root_id}],
  707. "meta.deleted": False,
  708. }
  709. replies = [data]
  710. for dat in DB.activities.find(query):
  711. if dat["type"][0] == ActivityType.CREATE.value:
  712. replies.append(dat)
  713. else:
  714. # Make a Note/Question/... looks like a Create
  715. dat = {
  716. "activity": {"object": dat["activity"]},
  717. "meta": dat["meta"],
  718. "_id": dat["_id"],
  719. }
  720. replies.append(dat)
  721. replies = sorted(replies, key=lambda d: d["activity"]["object"]["published"])
  722. # Index all the IDs in order to build a tree
  723. idx = {}
  724. replies2 = []
  725. for rep in replies:
  726. rep_id = rep["activity"]["object"]["id"]
  727. if rep_id in idx:
  728. continue
  729. idx[rep_id] = rep.copy()
  730. idx[rep_id]["_nodes"] = []
  731. replies2.append(rep)
  732. # Build the tree
  733. for rep in replies2:
  734. rep_id = rep["activity"]["object"]["id"]
  735. if rep_id == root_id:
  736. continue
  737. reply_of = ap._get_id(rep["activity"]["object"]["inReplyTo"])
  738. try:
  739. idx[reply_of]["_nodes"].append(rep)
  740. except KeyError:
  741. app.logger.info(f"{reply_of} is not there! skipping {rep}")
  742. # Flatten the tree
  743. thread = []
  744. def _flatten(node, level=0):
  745. node["_level"] = level
  746. thread.append(node)
  747. for snode in sorted(
  748. idx[node["activity"]["object"]["id"]]["_nodes"],
  749. key=lambda d: d["activity"]["object"]["published"],
  750. ):
  751. _flatten(snode, level=level + 1)
  752. try:
  753. _flatten(idx[root_id])
  754. except KeyError:
  755. app.logger.info(f"{root_id} is not there! skipping")
  756. return thread
  757. @app.route("/note/<note_id>")
  758. def note_by_id(note_id):
  759. if is_api_request():
  760. return redirect(url_for("outbox_activity", item_id=note_id))
  761. data = DB.activities.find_one(
  762. {"box": Box.OUTBOX.value, "remote_id": back.activity_url(note_id)}
  763. )
  764. if not data:
  765. abort(404)
  766. if data["meta"].get("deleted", False):
  767. abort(410)
  768. thread = _build_thread(data)
  769. app.logger.info(f"thread={thread!r}")
  770. raw_likes = list(
  771. DB.activities.find(
  772. {
  773. "meta.undo": False,
  774. "meta.deleted": False,
  775. "type": ActivityType.LIKE.value,
  776. "$or": [
  777. # FIXME(tsileo): remove all the useless $or
  778. {"activity.object.id": data["activity"]["object"]["id"]},
  779. {"activity.object": data["activity"]["object"]["id"]},
  780. ],
  781. }
  782. )
  783. )
  784. likes = []
  785. for doc in raw_likes:
  786. try:
  787. likes.append(doc["meta"]["actor"])
  788. except Exception:
  789. app.logger.exception(f"invalid doc: {doc!r}")
  790. app.logger.info(f"likes={likes!r}")
  791. raw_shares = list(
  792. DB.activities.find(
  793. {
  794. "meta.undo": False,
  795. "meta.deleted": False,
  796. "type": ActivityType.ANNOUNCE.value,
  797. "$or": [
  798. {"activity.object.id": data["activity"]["object"]["id"]},
  799. {"activity.object": data["activity"]["object"]["id"]},
  800. ],
  801. }
  802. )
  803. )
  804. shares = []
  805. for doc in raw_shares:
  806. try:
  807. shares.append(doc["meta"]["actor"])
  808. except Exception:
  809. app.logger.exception(f"invalid doc: {doc!r}")
  810. app.logger.info(f"shares={shares!r}")
  811. return render_template(
  812. "note.html", likes=likes, shares=shares, thread=thread, note=data
  813. )
  814. @app.route("/nodeinfo")
  815. def nodeinfo():
  816. response = _get_cached("api")
  817. cached = True
  818. if not response:
  819. cached = False
  820. q = {
  821. "box": Box.OUTBOX.value,
  822. "meta.deleted": False, # TODO(tsileo): retrieve deleted and expose tombstone
  823. "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
  824. }
  825. response = json.dumps(
  826. {
  827. "version": "2.0",
  828. "software": {
  829. "name": "microblogpub",
  830. "version": f"Microblog.pub {VERSION}",
  831. },
  832. "protocols": ["activitypub"],
  833. "services": {"inbound": [], "outbound": []},
  834. "openRegistrations": False,
  835. "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)},
  836. "metadata": {
  837. "sourceCode": config.SOURCE_URL,
  838. "nodeName": f"@{USERNAME}@{DOMAIN}",
  839. },
  840. }
  841. )
  842. if not cached:
  843. _cache(response, "api")
  844. return Response(
  845. headers={
  846. "Content-Type": "application/json; profile=http://nodeinfo.diaspora.software/ns/schema/2.0#"
  847. },
  848. response=response,
  849. )
  850. @app.route("/.well-known/nodeinfo")
  851. def wellknown_nodeinfo():
  852. return flask_jsonify(
  853. links=[
  854. {
  855. "rel": "http://nodeinfo.diaspora.software/ns/schema/2.0",
  856. "href": f"{ID}/nodeinfo",
  857. }
  858. ]
  859. )
  860. @app.route("/.well-known/webfinger")
  861. def wellknown_webfinger():
  862. """Enable WebFinger support, required for Mastodon interopability."""
  863. # TODO(tsileo): move this to little-boxes?
  864. resource = request.args.get("resource")
  865. if resource not in [f"acct:{USERNAME}@{DOMAIN}",f"acct:{USERNAME}@{PUBLIC_DOMAIN}",ID,ACTOR_URL]:
  866. abort(404)
  867. out = {
  868. "subject": f"acct:{USERNAME}@{DOMAIN}",
  869. "aliases": [ID],
  870. "links": [
  871. {
  872. "rel": "http://webfinger.net/rel/profile-page",
  873. "type": "text/html",
  874. "href": BASE_URL,
  875. },
  876. {"rel": "self", "type": "application/activity+json", "href": ID},
  877. {
  878. "rel": "http://ostatus.org/schema/1.0/subscribe",
  879. "template": BASE_URL + "/authorize_follow?profile={uri}",
  880. },
  881. {"rel": "magic-public-key", "href": KEY.to_magic_key()},
  882. {
  883. "href": ICON_URL,
  884. "rel": "http://webfinger.net/rel/avatar",
  885. "type": mimetypes.guess_type(ICON_URL)[0],
  886. },
  887. ],
  888. }
  889. return Response(
  890. response=json.dumps(out),
  891. headers={
  892. "Content-Type": "application/jrd+json; charset=utf-8"
  893. if not app.debug
  894. else "application/json"
  895. },
  896. )
  897. def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]:
  898. if raw_doc["activity"]["type"] != ActivityType.CREATE.value:
  899. return raw_doc
  900. raw_doc["activity"]["object"]["replies"] = embed_collection(
  901. raw_doc.get("meta", {}).get("count_direct_reply", 0),
  902. f'{raw_doc["remote_id"]}/replies',
  903. )
  904. raw_doc["activity"]["object"]["likes"] = embed_collection(
  905. raw_doc.get("meta", {}).get("count_like", 0), f'{raw_doc["remote_id"]}/likes'
  906. )
  907. raw_doc["activity"]["object"]["shares"] = embed_collection(
  908. raw_doc.get("meta", {}).get("count_boost", 0), f'{raw_doc["remote_id"]}/shares'
  909. )
  910. return raw_doc
  911. def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]:
  912. if "@context" in activity:
  913. del activity["@context"]
  914. return activity
  915. def _add_answers_to_questions(raw_doc: Dict[str, Any]) -> None:
  916. activity = raw_doc["activity"]
  917. if (
  918. ap._has_type(activity["type"], ActivityType.CREATE)
  919. and "object" in activity
  920. and ap._has_type(activity["object"]["type"], ActivityType.QUESTION)
  921. ):
  922. for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")):
  923. choice["replies"] = {
  924. "type": ActivityType.COLLECTION.value,
  925. "totalItems": raw_doc["meta"]
  926. .get("question_answers", {})
  927. .get(_answer_key(choice["name"]), 0),
  928. }
  929. now = datetime.now().astimezone()
  930. if format_datetime(now) > activity["object"]["endTime"]:
  931. activity["object"]["closed"] = activity["object"]["endTime"]
  932. def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, Any]:
  933. raw_doc = add_extra_collection(raw_doc)
  934. activity = clean_activity(raw_doc["activity"])
  935. # Handle Questions
  936. # TODO(tsileo): what about object embedded by ID/URL?
  937. _add_answers_to_questions(raw_doc)
  938. if embed:
  939. return remove_context(activity)
  940. return activity
  941. @app.route("/outbox", methods=["GET", "POST"])
  942. def outbox():
  943. if request.method == "GET":
  944. if not is_api_request():
  945. abort(404)
  946. # TODO(tsileo): returns the whole outbox if authenticated
  947. q = {
  948. "box": Box.OUTBOX.value,
  949. "meta.deleted": False,
  950. "meta.undo": False,
  951. "meta.public": True,
  952. "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
  953. }
  954. return jsonify(
  955. **activitypub.build_ordered_collection(
  956. DB.activities,
  957. q=q,
  958. cursor=request.args.get("cursor"),
  959. map_func=lambda doc: activity_from_doc(doc, embed=True),
  960. col_name="outbox",
  961. )
  962. )
  963. # Handle POST request
  964. try:
  965. _api_required()
  966. except BadSignature:
  967. abort(401)
  968. data = request.get_json(force=True)
  969. print(data)
  970. activity = ap.parse_activity(data)
  971. activity_id = post_to_outbox(activity)
  972. return Response(status=201, headers={"Location": activity_id})
  973. @app.route("/outbox/<item_id>")
  974. def outbox_detail(item_id):
  975. doc = DB.activities.find_one(
  976. {"box": Box.OUTBOX.value, "remote_id": back.activity_url(item_id)}
  977. )
  978. if not doc:
  979. abort(404)
  980. if doc["meta"].get("deleted", False):
  981. obj = ap.parse_activity(doc["activity"])
  982. resp = jsonify(**obj.get_tombstone().to_dict())
  983. resp.status_code = 410
  984. return resp
  985. return jsonify(**activity_from_doc(doc))
  986. @app.route("/outbox/<item_id>/activity")
  987. def outbox_activity(item_id):
  988. data = DB.activities.find_one(
  989. {"box": Box.OUTBOX.value, "remote_id": back.activity_url(item_id)}
  990. )
  991. if not data:
  992. abort(404)
  993. obj = activity_from_doc(data)
  994. if data["meta"].get("deleted", False):
  995. obj = ap.parse_activity(data["activity"])
  996. resp = jsonify(**obj.get_object().get_tombstone().to_dict())
  997. resp.status_code = 410
  998. return resp
  999. if obj["type"] != ActivityType.CREATE.value:
  1000. abort(404)
  1001. return jsonify(**obj["object"])
  1002. @app.route("/outbox/<item_id>/replies")
  1003. def outbox_activity_replies(item_id):
  1004. if not is_api_request():
  1005. abort(404)
  1006. data = DB.activities.find_one(
  1007. {
  1008. "box": Box.OUTBOX.value,
  1009. "remote_id": back.activity_url(item_id),
  1010. "meta.deleted": False,
  1011. }
  1012. )
  1013. if not data:
  1014. abort(404)
  1015. obj = ap.parse_activity(data["activity"])
  1016. if obj.ACTIVITY_TYPE != ActivityType.CREATE:
  1017. abort(404)
  1018. q = {
  1019. "meta.deleted": False,
  1020. "type": ActivityType.CREATE.value,
  1021. "activity.object.inReplyTo": obj.get_object().id,
  1022. }
  1023. return jsonify(
  1024. **activitypub.build_ordered_collection(
  1025. DB.activities,
  1026. q=q,
  1027. cursor=request.args.get("cursor"),
  1028. map_func=lambda doc: doc["activity"]["object"],
  1029. col_name=f"outbox/{item_id}/replies",
  1030. first_page=request.args.get("page") == "first",
  1031. )
  1032. )
  1033. @app.route("/outbox/<item_id>/likes")
  1034. def outbox_activity_likes(item_id):
  1035. if not is_api_request():
  1036. abort(404)
  1037. data = DB.activities.find_one(
  1038. {
  1039. "box": Box.OUTBOX.value,
  1040. "remote_id": back.activity_url(item_id),
  1041. "meta.deleted": False,
  1042. }
  1043. )
  1044. if not data:
  1045. abort(404)
  1046. obj = ap.parse_activity(data["activity"])
  1047. if obj.ACTIVITY_TYPE != ActivityType.CREATE:
  1048. abort(404)
  1049. q = {
  1050. "meta.undo": False,
  1051. "type": ActivityType.LIKE.value,
  1052. "$or": [
  1053. {"activity.object.id": obj.get_object().id},
  1054. {"activity.object": obj.get_object().id},
  1055. ],
  1056. }
  1057. return jsonify(
  1058. **activitypub.build_ordered_collection(
  1059. DB.activities,
  1060. q=q,
  1061. cursor=request.args.get("cursor"),
  1062. map_func=lambda doc: remove_context(doc["activity"]),
  1063. col_name=f"outbox/{item_id}/likes",
  1064. first_page=request.args.get("page") == "first",
  1065. )
  1066. )
  1067. @app.route("/outbox/<item_id>/shares")
  1068. def outbox_activity_shares(item_id):
  1069. if not is_api_request():
  1070. abort(404)
  1071. data = DB.activities.find_one(
  1072. {
  1073. "box": Box.OUTBOX.value,
  1074. "remote_id": back.activity_url(item_id),
  1075. "meta.deleted": False,
  1076. }
  1077. )
  1078. if not data:
  1079. abort(404)
  1080. obj = ap.parse_activity(data["activity"])
  1081. if obj.ACTIVITY_TYPE != ActivityType.CREATE:
  1082. abort(404)
  1083. q = {
  1084. "meta.undo": False,
  1085. "type": ActivityType.ANNOUNCE.value,
  1086. "$or": [
  1087. {"activity.object.id": obj.get_object().id},
  1088. {"activity.object": obj.get_object().id},
  1089. ],
  1090. }
  1091. return jsonify(
  1092. **activitypub.build_ordered_collection(
  1093. DB.activities,
  1094. q=q,
  1095. cursor=request.args.get("cursor"),
  1096. map_func=lambda doc: remove_context(doc["activity"]),
  1097. col_name=f"outbox/{item_id}/shares",
  1098. first_page=request.args.get("page") == "first",
  1099. )
  1100. )
  1101. @app.route("/admin", methods=["GET"])
  1102. @login_required
  1103. def admin():
  1104. q = {
  1105. "meta.deleted": False,
  1106. "meta.undo": False,
  1107. "type": ActivityType.LIKE.value,
  1108. "box": Box.OUTBOX.value,
  1109. }
  1110. col_liked = DB.activities.count(q)
  1111. return render_template(
  1112. "admin.html",
  1113. instances=list(DB.instances.find()),
  1114. inbox_size=DB.activities.count({"box": Box.INBOX.value}),
  1115. outbox_size=DB.activities.count({"box": Box.OUTBOX.value}),
  1116. col_liked=col_liked,
  1117. col_followers=DB.activities.count(
  1118. {
  1119. "box": Box.INBOX.value,
  1120. "type": ActivityType.FOLLOW.value,
  1121. "meta.undo": False,
  1122. }
  1123. ),
  1124. col_following=DB.activities.count(
  1125. {
  1126. "box": Box.OUTBOX.value,
  1127. "type": ActivityType.FOLLOW.value,
  1128. "meta.undo": False,
  1129. }
  1130. ),
  1131. )
  1132. @app.route("/admin/tasks", methods=["GET"])
  1133. @login_required
  1134. def admin_tasks():
  1135. return render_template(
  1136. "admin_tasks.html",
  1137. success=p.get_success(),
  1138. dead=p.get_dead(),
  1139. waiting=p.get_waiting(),
  1140. cron=p.get_cron(),
  1141. )
  1142. @app.route("/admin/lookup", methods=["GET", "POST"])
  1143. @login_required
  1144. def admin_lookup():
  1145. data = None
  1146. meta = None
  1147. if request.method == "POST":
  1148. if request.form.get("url"):
  1149. data = lookup(request.form.get("url"))
  1150. if data.has_type(ActivityType.ANNOUNCE):
  1151. meta = dict(
  1152. object=data.get_object().to_dict(),
  1153. object_actor=data.get_object().get_actor().to_dict(),
  1154. actor=data.get_actor().to_dict(),
  1155. )
  1156. print(data)
  1157. app.logger.debug(data.to_dict())
  1158. return render_template(
  1159. "lookup.html", data=data, meta=meta, url=request.form.get("url")
  1160. )
  1161. @app.route("/admin/thread")
  1162. @login_required
  1163. def admin_thread():
  1164. data = DB.activities.find_one(
  1165. {
  1166. "type": ActivityType.CREATE.value,
  1167. "activity.object.id": request.args.get("oid"),
  1168. }
  1169. )
  1170. if not data:
  1171. abort(404)
  1172. if data["meta"].get("deleted", False):
  1173. abort(410)
  1174. thread = _build_thread(data)
  1175. tpl = "note.html"
  1176. if request.args.get("debug"):
  1177. tpl = "note_debug.html"
  1178. return render_template(tpl, thread=thread, note=data)
  1179. @app.route("/admin/new", methods=["GET"])
  1180. @login_required
  1181. def admin_new():
  1182. reply_id = None
  1183. content = ""
  1184. thread = []
  1185. print(request.args)
  1186. if request.args.get("reply"):
  1187. data = DB.activities.find_one({"activity.object.id": request.args.get("reply")})
  1188. if data:
  1189. reply = ap.parse_activity(data["activity"])
  1190. else:
  1191. data = dict(
  1192. meta={},
  1193. activity=dict(
  1194. object=get_backend().fetch_iri(request.args.get("reply"))
  1195. ),
  1196. )
  1197. reply = ap.parse_activity(data["activity"]["object"])
  1198. reply_id = reply.id
  1199. if reply.ACTIVITY_TYPE == ActivityType.CREATE:
  1200. reply_id = reply.get_object().id
  1201. actor = reply.get_actor()
  1202. domain = urlparse(actor.id).netloc
  1203. # FIXME(tsileo): if reply of reply, fetch all participants
  1204. content = f"@{actor.preferredUsername}@{domain} "
  1205. thread = _build_thread(data)
  1206. return render_template(
  1207. "new.html",
  1208. reply=reply_id,
  1209. content=content,
  1210. thread=thread,
  1211. emojis=EMOJIS.split(" "),
  1212. )
  1213. @app.route("/admin/notifications")
  1214. @login_required
  1215. def admin_notifications():
  1216. # Setup the cron for deleting old activities
  1217. # FIXME(tsileo): put back to 12h
  1218. p.push({}, "/task/cleanup", schedule="@every 1h")
  1219. # Trigger a cleanup if asked
  1220. if request.args.get("cleanup"):
  1221. p.push({}, "/task/cleanup")
  1222. # FIXME(tsileo): show unfollow (performed by the current actor) and liked???
  1223. mentions_query = {
  1224. "type": ActivityType.CREATE.value,
  1225. "activity.object.tag.type": "Mention",
  1226. "activity.object.tag.name": f"@{USERNAME}@{DOMAIN}",
  1227. "meta.deleted": False,
  1228. }
  1229. replies_query = {
  1230. "type": ActivityType.CREATE.value,
  1231. "activity.object.inReplyTo": {"$regex": f"^{BASE_URL}"},
  1232. }
  1233. announced_query = {
  1234. "type": ActivityType.ANNOUNCE.value,
  1235. "activity.object": {"$regex": f"^{BASE_URL}"},
  1236. }
  1237. new_followers_query = {"type": ActivityType.FOLLOW.value}
  1238. unfollow_query = {
  1239. "type": ActivityType.UNDO.value,
  1240. "activity.object.type": ActivityType.FOLLOW.value,
  1241. }
  1242. likes_query = {
  1243. "type": ActivityType.LIKE.value,
  1244. "activity.object": {"$regex": f"^{BASE_URL}"},
  1245. }
  1246. followed_query = {"type": ActivityType.ACCEPT.value}
  1247. q = {
  1248. "box": Box.INBOX.value,
  1249. "$or": [
  1250. mentions_query,
  1251. announced_query,
  1252. replies_query,
  1253. new_followers_query,
  1254. followed_query,
  1255. unfollow_query,
  1256. likes_query,
  1257. ],
  1258. }
  1259. inbox_data, older_than, newer_than = paginated_query(DB.activities, q)
  1260. return render_template(
  1261. "stream.html",
  1262. inbox_data=inbox_data,
  1263. older_than=older_than,
  1264. newer_than=newer_than,
  1265. )
  1266. @app.route("/api/key")
  1267. @login_required
  1268. def api_user_key():
  1269. return flask_jsonify(api_key=ADMIN_API_KEY)
  1270. def _user_api_arg(key: str, **kwargs):
  1271. """Try to get the given key from the requests, try JSON body, form data and query arg."""
  1272. if request.is_json:
  1273. oid = request.json.get(key)
  1274. else:
  1275. oid = request.args.get(key) or request.form.get(key)
  1276. if not oid:
  1277. if "default" in kwargs:
  1278. app.logger.info(f'{key}={kwargs.get("default")}')
  1279. return kwargs.get("default")
  1280. raise ValueError(f"missing {key}")
  1281. app.logger.info(f"{key}={oid}")
  1282. return oid
  1283. def _user_api_get_note(from_outbox: bool = False):
  1284. oid = _user_api_arg("id")
  1285. app.logger.info(f"fetching {oid}")
  1286. note = ap.parse_activity(get_backend().fetch_iri(oid))
  1287. if from_outbox and not note.id.startswith(ID):
  1288. raise NotFromOutboxError(
  1289. f"cannot load {note.id}, id must be owned by the server"
  1290. )
  1291. return note
  1292. def _user_api_response(**kwargs):
  1293. _redirect = _user_api_arg("redirect", default=None)
  1294. if _redirect:
  1295. return redirect(_redirect)
  1296. resp = flask_jsonify(**kwargs)
  1297. resp.status_code = 201
  1298. return resp
  1299. @app.route("/api/note/delete", methods=["POST"])
  1300. @api_required
  1301. def api_delete():
  1302. """API endpoint to delete a Note activity."""
  1303. note = _user_api_get_note(from_outbox=True)
  1304. delete = ap.Delete(actor=ID, object=ap.Tombstone(id=note.id).to_dict(embed=True))
  1305. delete_id = post_to_outbox(delete)
  1306. return _user_api_response(activity=delete_id)
  1307. @app.route("/api/boost", methods=["POST"])
  1308. @api_required
  1309. def api_boost():
  1310. note = _user_api_get_note()
  1311. announce = note.build_announce(MY_PERSON)
  1312. announce_id = post_to_outbox(announce)
  1313. return _user_api_response(activity=announce_id)
  1314. @app.route("/api/vote", methods=["POST"])
  1315. @api_required
  1316. def api_vote():
  1317. oid = _user_api_arg("id")
  1318. app.logger.info(f"fetching {oid}")
  1319. note = ap.parse_activity(get_backend().fetch_iri(oid))
  1320. choice = _user_api_arg("choice")
  1321. raw_note = dict(
  1322. attributedTo=MY_PERSON.id,
  1323. cc=[],
  1324. to=note.get_actor().id,
  1325. name=choice,
  1326. tag=[],
  1327. inReplyTo=note.id,
  1328. )
  1329. note = ap.Note(**raw_note)
  1330. create = note.build_create()
  1331. create_id = post_to_outbox(create)
  1332. return _user_api_response(activity=create_id)
  1333. @app.route("/api/like", methods=["POST"])
  1334. @api_required
  1335. def api_like():
  1336. note = _user_api_get_note()
  1337. like = note.build_like(MY_PERSON)
  1338. like_id = post_to_outbox(like)
  1339. return _user_api_response(activity=like_id)
  1340. @app.route("/api/note/pin", methods=["POST"])
  1341. @api_required
  1342. def api_pin():
  1343. note = _user_api_get_note(from_outbox=True)
  1344. DB.activities.update_one(
  1345. {"activity.object.id": note.id, "box": Box.OUTBOX.value},
  1346. {"$set": {"meta.pinned": True}},
  1347. )
  1348. return _user_api_response(pinned=True)
  1349. @app.route("/api/note/unpin", methods=["POST"])
  1350. @api_required
  1351. def api_unpin():
  1352. note = _user_api_get_note(from_outbox=True)
  1353. DB.activities.update_one(
  1354. {"activity.object.id": note.id, "box": Box.OUTBOX.value},
  1355. {"$set": {"meta.pinned": False}},
  1356. )
  1357. return _user_api_response(pinned=False)
  1358. @app.route("/api/undo", methods=["POST"])
  1359. @api_required
  1360. def api_undo():
  1361. oid = _user_api_arg("id")
  1362. doc = DB.activities.find_one(
  1363. {
  1364. "box": Box.OUTBOX.value,
  1365. "$or": [{"remote_id": back.activity_url(oid)}, {"remote_id": oid}],
  1366. }
  1367. )
  1368. if not doc:
  1369. raise ActivityNotFoundError(f"cannot found {oid}")
  1370. obj = ap.parse_activity(doc.get("activity"))
  1371. # FIXME(tsileo): detect already undo-ed and make this API call idempotent
  1372. undo = obj.build_undo()
  1373. undo_id = post_to_outbox(undo)
  1374. return _user_api_response(activity=undo_id)
  1375. @app.route("/admin/stream")
  1376. @login_required
  1377. def admin_stream():
  1378. q = {"meta.stream": True, "meta.deleted": False}
  1379. tpl = "stream.html"
  1380. if request.args.get("debug"):
  1381. tpl = "stream_debug.html"
  1382. if request.args.get("debug_inbox"):
  1383. q = {}
  1384. inbox_data, older_than, newer_than = paginated_query(
  1385. DB.activities, q, limit=int(request.args.get("limit", 25))
  1386. )
  1387. return render_template(
  1388. tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than
  1389. )
  1390. @app.route("/inbox", methods=["GET", "POST"]) # noqa: C901
  1391. def inbox():
  1392. if request.method == "GET":
  1393. if not is_api_request():
  1394. abort(404)
  1395. try:
  1396. _api_required()
  1397. except BadSignature:
  1398. abort(404)
  1399. return jsonify(
  1400. **activitypub.build_ordered_collection(
  1401. DB.activities,
  1402. q={"meta.deleted": False, "box": Box.INBOX.value},
  1403. cursor=request.args.get("cursor"),
  1404. map_func=lambda doc: remove_context(doc["activity"]),
  1405. col_name="inbox",
  1406. )
  1407. )
  1408. data = request.get_json(force=True)
  1409. logger.debug(f"req_headers={request.headers}")
  1410. logger.debug(f"raw_data={data}")
  1411. try:
  1412. if not verify_request(
  1413. request.method, request.path, request.headers, request.data
  1414. ):
  1415. raise Exception("failed to verify request")
  1416. except Exception:
  1417. logger.exception(
  1418. "failed to verify request, trying to verify the payload by fetching the remote"
  1419. )
  1420. try:
  1421. data = get_backend().fetch_iri(data["id"])
  1422. except ActivityGoneError:
  1423. # XXX Mastodon sends Delete activities that are not dereferencable, it's the actor url with #delete
  1424. # appended, so an `ActivityGoneError` kind of ensure it's "legit"
  1425. if data["type"] == ActivityType.DELETE.value and data["id"].startswith(
  1426. data["object"]
  1427. ):
  1428. logger.info(f"received a Delete for an actor {data!r}")
  1429. if get_backend().inbox_check_duplicate(MY_PERSON, data["id"]):
  1430. # The activity is already in the inbox
  1431. logger.info(f"received duplicate activity {data!r}, dropping it")
  1432. DB.activities.insert_one(
  1433. {
  1434. "box": Box.INBOX.value,
  1435. "activity": data,
  1436. "type": _to_list(data["type"]),
  1437. "remote_id": data["id"],
  1438. "meta": {"undo": False, "deleted": False},
  1439. }
  1440. )
  1441. # TODO(tsileo): write the callback the the delete external actor event
  1442. return Response(status=201)
  1443. except Exception:
  1444. logger.exception(f'failed to fetch remote id at {data["id"]}')
  1445. return Response(
  1446. status=422,
  1447. headers={"Content-Type": "application/json"},
  1448. response=json.dumps(
  1449. {
  1450. "error": "failed to verify request (using HTTP signatures or fetching the IRI)"
  1451. }
  1452. ),
  1453. )
  1454. print(data)
  1455. activity = ap.parse_activity(data)
  1456. logger.debug(f"inbox activity={activity}/{data}")
  1457. post_to_inbox(activity)
  1458. return Response(status=201)
  1459. def without_id(l):
  1460. out = []
  1461. for d in l:
  1462. if "_id" in d:
  1463. del d["_id"]
  1464. out.append(d)
  1465. return out
  1466. @app.route("/api/debug", methods=["GET", "DELETE"])
  1467. @api_required
  1468. def api_debug():
  1469. """Endpoint used/needed for testing, only works in DEBUG_MODE."""
  1470. if not DEBUG_MODE:
  1471. return flask_jsonify(message="DEBUG_MODE is off")
  1472. if request.method == "DELETE":
  1473. _drop_db()
  1474. return flask_jsonify(message="DB dropped")
  1475. return flask_jsonify(
  1476. inbox=DB.activities.count({"box": Box.INBOX.value}),
  1477. outbox=DB.activities.count({"box": Box.OUTBOX.value}),
  1478. outbox_data=without_id(DB.activities.find({"box": Box.OUTBOX.value})),
  1479. )
  1480. @app.route("/api/new_note", methods=["POST"])
  1481. @api_required
  1482. def api_new_note():
  1483. source = _user_api_arg("content")
  1484. if not source:
  1485. raise ValueError("missing content")
  1486. _reply, reply = None, None
  1487. try:
  1488. _reply = _user_api_arg("reply")
  1489. except ValueError:
  1490. pass
  1491. content, tags = parse_markdown(source)
  1492. to = request.args.get("to")
  1493. cc = [ID + "/followers"]
  1494. if _reply:
  1495. reply = ap.fetch_remote_activity(_reply)
  1496. cc.append(reply.attributedTo)
  1497. for tag in tags:
  1498. if tag["type"] == "Mention":
  1499. cc.append(tag["href"])
  1500. raw_note = dict(
  1501. attributedTo=MY_PERSON.id,
  1502. cc=list(set(cc)),
  1503. to=[to if to else ap.AS_PUBLIC],
  1504. content=content,
  1505. tag=tags,
  1506. source={"mediaType": "text/markdown", "content": source},
  1507. inReplyTo=reply.id if reply else None,
  1508. )
  1509. if "file" in request.files and request.files["file"].filename:
  1510. file = request.files["file"]
  1511. rfilename = secure_filename(file.filename)
  1512. with BytesIO() as buf:
  1513. file.save(buf)
  1514. oid = MEDIA_CACHE.save_upload(buf, rfilename)
  1515. mtype = mimetypes.guess_type(rfilename)[0]
  1516. raw_note["attachment"] = [
  1517. {
  1518. "mediaType": mtype,
  1519. "name": rfilename,
  1520. "type": "Document",
  1521. "url": f"{BASE_URL}/uploads/{oid}/{rfilename}",
  1522. }
  1523. ]
  1524. note = ap.Note(**raw_note)
  1525. create = note.build_create()
  1526. create_id = post_to_outbox(create)
  1527. return _user_api_response(activity=create_id)
  1528. @app.route("/api/new_question", methods=["POST"])
  1529. @api_required
  1530. def api_new_question():
  1531. source = _user_api_arg("content")
  1532. if not source:
  1533. raise ValueError("missing content")
  1534. content, tags = parse_markdown(source)
  1535. cc = [ID + "/followers"]
  1536. for tag in tags:
  1537. if tag["type"] == "Mention":
  1538. cc.append(tag["href"])
  1539. answers = []
  1540. for i in range(4):
  1541. a = _user_api_arg(f"answer{i}", default=None)
  1542. if not a:
  1543. break
  1544. answers.append({"type": ActivityType.NOTE.value, "name": a})
  1545. choices = {
  1546. "endTime": ap.format_datetime(
  1547. datetime.now().astimezone()
  1548. + timedelta(minutes=int(_user_api_arg("open_for")))
  1549. )
  1550. }
  1551. of = _user_api_arg("of")
  1552. if of == "anyOf":
  1553. choices["anyOf"] = answers
  1554. else:
  1555. choices["oneOf"] = answers
  1556. raw_question = dict(
  1557. attributedTo=MY_PERSON.id,
  1558. cc=list(set(cc)),
  1559. to=[ap.AS_PUBLIC],
  1560. content=content,
  1561. tag=tags,
  1562. source={"mediaType": "text/markdown", "content": source},
  1563. inReplyTo=None,
  1564. **choices,
  1565. )
  1566. question = ap.Question(**raw_question)
  1567. create = question.build_create()
  1568. create_id = post_to_outbox(create)
  1569. return _user_api_response(activity=create_id)
  1570. @app.route("/api/stream")
  1571. @api_required
  1572. def api_stream():
  1573. return Response(
  1574. response=json.dumps(
  1575. activitypub.build_inbox_json_feed("/api/stream", request.args.get("cursor"))
  1576. ),
  1577. headers={"Content-Type": "application/json"},
  1578. )
  1579. @app.route("/api/block", methods=["POST"])
  1580. @api_required
  1581. def api_block():
  1582. actor = _user_api_arg("actor")
  1583. existing = DB.activities.find_one(
  1584. {
  1585. "box": Box.OUTBOX.value,
  1586. "type": ActivityType.BLOCK.value,
  1587. "activity.object": actor,
  1588. "meta.undo": False,
  1589. }
  1590. )
  1591. if existing:
  1592. return _user_api_response(activity=existing["activity"]["id"])
  1593. block = ap.Block(actor=MY_PERSON.id, object=actor)
  1594. block_id = post_to_outbox(block)
  1595. return _user_api_response(activity=block_id)
  1596. @app.route("/api/follow", methods=["POST"])
  1597. @api_required
  1598. def api_follow():
  1599. actor = _user_api_arg("actor")
  1600. q = {
  1601. "box": Box.OUTBOX.value,
  1602. "type": ActivityType.FOLLOW.value,
  1603. "meta.undo": False,
  1604. "activity.object": actor,
  1605. }
  1606. existing = DB.activities.find_one(q)
  1607. if existing:
  1608. return _user_api_response(activity=existing["activity"]["id"])
  1609. follow = ap.Follow(actor=MY_PERSON.id, object=actor)
  1610. follow_id = post_to_outbox(follow)
  1611. return _user_api_response(activity=follow_id)
  1612. @app.route("/followers")
  1613. def followers():
  1614. q = {"box": Box.INBOX.value, "type": ActivityType.FOLLOW.value, "meta.undo": False}
  1615. if is_api_request():
  1616. return jsonify(
  1617. **activitypub.build_ordered_collection(
  1618. DB.activities,
  1619. q=q,
  1620. cursor=request.args.get("cursor"),
  1621. map_func=lambda doc: doc["activity"]["actor"],
  1622. col_name="followers",
  1623. )
  1624. )
  1625. raw_followers, older_than, newer_than = paginated_query(DB.activities, q)
  1626. followers = [
  1627. doc["meta"]["actor"] for doc in raw_followers if "actor" in doc.get("meta", {})
  1628. ]
  1629. return render_template(
  1630. "followers.html",
  1631. followers_data=followers,
  1632. older_than=older_than,
  1633. newer_than=newer_than,
  1634. )
  1635. @app.route("/following")
  1636. def following():
  1637. q = {"box": Box.OUTBOX.value, "type": ActivityType.FOLLOW.value, "meta.undo": False}
  1638. if is_api_request():
  1639. return jsonify(
  1640. **activitypub.build_ordered_collection(
  1641. DB.activities,
  1642. q=q,
  1643. cursor=request.args.get("cursor"),
  1644. map_func=lambda doc: doc["activity"]["object"],
  1645. col_name="following",
  1646. )
  1647. )
  1648. if config.HIDE_FOLLOWING and not session.get("logged_in", False):
  1649. abort(404)
  1650. following, older_than, newer_than = paginated_query(DB.activities, q)
  1651. following = [
  1652. (doc["remote_id"], doc["meta"]["object"])
  1653. for doc in following
  1654. if "remote_id" in doc and "object" in doc.get("meta", {})
  1655. ]
  1656. return render_template(
  1657. "following.html",
  1658. following_data=following,
  1659. older_than=older_than,
  1660. newer_than=newer_than,
  1661. )
  1662. @app.route("/tags/<tag>")
  1663. def tags(tag):
  1664. if not DB.activities.count(
  1665. {
  1666. "box": Box.OUTBOX.value,
  1667. "activity.object.tag.type": "Hashtag",
  1668. "activity.object.tag.name": "#" + tag,
  1669. }
  1670. ):
  1671. abort(404)
  1672. if not is_api_request():
  1673. return render_template(
  1674. "tags.html",
  1675. tag=tag,
  1676. outbox_data=DB.activities.find(
  1677. {
  1678. "box": Box.OUTBOX.value,
  1679. "type": ActivityType.CREATE.value,
  1680. "meta.deleted": False,
  1681. "activity.object.tag.type": "Hashtag",
  1682. "activity.object.tag.name": "#" + tag,
  1683. }
  1684. ),
  1685. )
  1686. q = {
  1687. "box": Box.OUTBOX.value,
  1688. "meta.deleted": False,
  1689. "meta.undo": False,
  1690. "type": ActivityType.CREATE.value,
  1691. "activity.object.tag.type": "Hashtag",
  1692. "activity.object.tag.name": "#" + tag,
  1693. }
  1694. return jsonify(
  1695. **activitypub.build_ordered_collection(
  1696. DB.activities,
  1697. q=q,
  1698. cursor=request.args.get("cursor"),
  1699. map_func=lambda doc: doc["activity"]["object"]["id"],
  1700. col_name=f"tags/{tag}",
  1701. )
  1702. )
  1703. @app.route("/featured")
  1704. def featured():
  1705. if not is_api_request():
  1706. abort(404)
  1707. q = {
  1708. "box": Box.OUTBOX.value,
  1709. "type": ActivityType.CREATE.value,
  1710. "meta.deleted": False,
  1711. "meta.undo": False,
  1712. "meta.pinned": True,
  1713. }
  1714. data = [clean_activity(doc["activity"]["object"]) for doc in DB.activities.find(q)]
  1715. return jsonify(**activitypub.simple_build_ordered_collection("featured", data))
  1716. @app.route("/liked")
  1717. def liked():
  1718. if not is_api_request():
  1719. q = {
  1720. "box": Box.OUTBOX.value,
  1721. "type": ActivityType.LIKE.value,
  1722. "meta.deleted": False,
  1723. "meta.undo": False,
  1724. }
  1725. liked, older_than, newer_than = paginated_query(DB.activities, q)
  1726. return render_template(
  1727. "liked.html", liked=liked, older_than=older_than, newer_than=newer_than
  1728. )
  1729. q = {"meta.deleted": False, "meta.undo": False, "type": ActivityType.LIKE.value}
  1730. return jsonify(
  1731. **activitypub.build_ordered_collection(
  1732. DB.activities,
  1733. q=q,
  1734. cursor=request.args.get("cursor"),
  1735. map_func=lambda doc: doc["activity"]["object"],
  1736. col_name="liked",
  1737. )
  1738. )
  1739. #######
  1740. # IndieAuth
  1741. def build_auth_resp(payload):
  1742. if request.headers.get("Accept") == "application/json":
  1743. return Response(
  1744. status=200,
  1745. headers={"Content-Type": "application/json"},
  1746. response=json.dumps(payload),
  1747. )
  1748. return Response(
  1749. status=200,
  1750. headers={"Content-Type": "application/x-www-form-urlencoded"},
  1751. response=urlencode(payload),
  1752. )
  1753. def _get_prop(props, name, default=None):
  1754. if name in props:
  1755. items = props.get(name)
  1756. if isinstance(items, list):
  1757. return items[0]
  1758. return items
  1759. return default
  1760. def get_client_id_data(url):
  1761. data = mf2py.parse(url=url)
  1762. for item in data["items"]:
  1763. if "h-x-app" in item["type"] or "h-app" in item["type"]:
  1764. props = item.get("properties", {})
  1765. print(props)
  1766. return dict(
  1767. logo=_get_prop(props, "logo"),
  1768. name=_get_prop(props, "name"),
  1769. url=_get_prop(props, "url"),
  1770. )
  1771. return dict(logo=None, name=url, url=url)
  1772. @app.route("/indieauth/flow", methods=["POST"])
  1773. @login_required
  1774. def indieauth_flow():
  1775. auth = dict(
  1776. scope=" ".join(request.form.getlist("scopes")),
  1777. me=request.form.get("me"),
  1778. client_id=request.form.get("client_id"),
  1779. state=request.form.get("state"),
  1780. redirect_uri=request.form.get("redirect_uri"),
  1781. response_type=request.form.get("response_type"),
  1782. )
  1783. code = binascii.hexlify(os.urandom(8)).decode("utf-8")
  1784. auth.update(code=code, verified=False)
  1785. print(auth)
  1786. if not auth["redirect_uri"]:
  1787. abort(500)
  1788. DB.indieauth.insert_one(auth)
  1789. # FIXME(tsileo): fetch client ID and validate redirect_uri
  1790. red = f'{auth["redirect_uri"]}?code={code}&state={auth["state"]}&me={auth["me"]}'
  1791. return redirect(red)
  1792. @app.route("/indieauth", methods=["GET", "POST"])
  1793. def indieauth_endpoint():
  1794. if request.method == "GET":
  1795. if not session.get("logged_in"):
  1796. return redirect(url_for("admin_login", next=request.url))
  1797. me = request.args.get("me")
  1798. # FIXME(tsileo): ensure me == ID
  1799. client_id = request.args.get("client_id")
  1800. redirect_uri = request.args.get("redirect_uri")
  1801. state = request.args.get("state", "")
  1802. response_type = request.args.get("response_type", "id")
  1803. scope = request.args.get("scope", "").split()
  1804. print("STATE", state)
  1805. return render_template(
  1806. "indieauth_flow.html",
  1807. client=get_client_id_data(client_id),
  1808. scopes=scope,
  1809. redirect_uri=redirect_uri,
  1810. state=state,
  1811. response_type=response_type,
  1812. client_id=client_id,
  1813. me=me,
  1814. )
  1815. # Auth verification via POST
  1816. code = request.form.get("code")
  1817. redirect_uri = request.form.get("redirect_uri")
  1818. client_id = request.form.get("client_id")
  1819. auth = DB.indieauth.find_one_and_update(
  1820. {
  1821. "code": code,
  1822. "redirect_uri": redirect_uri,
  1823. "client_id": client_id,
  1824. }, # }, # , 'verified': False},
  1825. {"$set": {"verified": True}},
  1826. sort=[("_id", pymongo.DESCENDING)],
  1827. )
  1828. print(auth)
  1829. print(code, redirect_uri, client_id)
  1830. if not auth:
  1831. abort(403)
  1832. return
  1833. session["logged_in"] = True
  1834. me = auth["me"]
  1835. state = auth["state"]
  1836. scope = " ".join(auth["scope"])
  1837. print("STATE", state)
  1838. return build_auth_resp({"me": me, "state": state, "scope": scope})
  1839. @app.route("/token", methods=["GET", "POST"])
  1840. def token_endpoint():
  1841. if request.method == "POST":
  1842. code = request.form.get("code")
  1843. me = request.form.get("me")
  1844. redirect_uri = request.form.get("redirect_uri")
  1845. client_id = request.form.get("client_id")
  1846. auth = DB.indieauth.find_one(
  1847. {
  1848. "code": code,
  1849. "me": me,
  1850. "redirect_uri": redirect_uri,
  1851. "client_id": client_id,
  1852. }
  1853. )
  1854. if not auth:
  1855. abort(403)
  1856. scope = " ".join(auth["scope"])
  1857. payload = dict(
  1858. me=me, client_id=client_id, scope=scope, ts=datetime.now().timestamp()
  1859. )
  1860. token = JWT.dumps(payload).decode("utf-8")
  1861. return build_auth_resp({"me": me, "scope": scope, "access_token": token})
  1862. # Token verification
  1863. token = request.headers.get("Authorization").replace("Bearer ", "")
  1864. try:
  1865. payload = JWT.loads(token)
  1866. except BadSignature:
  1867. abort(403)
  1868. # TODO(tsileo): handle expiration
  1869. return build_auth_resp(
  1870. {
  1871. "me": payload["me"],
  1872. "scope": payload["scope"],
  1873. "client_id": payload["client_id"],
  1874. }
  1875. )
  1876. #################
  1877. # Feeds
  1878. @app.route("/feed.json")
  1879. def json_feed():
  1880. return Response(
  1881. response=json.dumps(activitypub.json_feed("/feed.json")),
  1882. headers={"Content-Type": "application/json"},
  1883. )
  1884. @app.route("/feed.atom")
  1885. def atom_feed():
  1886. return Response(
  1887. response=activitypub.gen_feed().atom_str(),
  1888. headers={"Content-Type": "application/atom+xml"},
  1889. )
  1890. @app.route("/feed.rss")
  1891. def rss_feed():
  1892. return Response(
  1893. response=activitypub.gen_feed().rss_str(),
  1894. headers={"Content-Type": "application/rss+xml"},
  1895. )
  1896. ###########
  1897. # Tasks
  1898. class Tasks:
  1899. @staticmethod
  1900. def cache_object(iri: str) -> None:
  1901. p.push(iri, "/task/cache_object")
  1902. @staticmethod
  1903. def cache_actor(iri: str, also_cache_attachments: bool = True) -> None:
  1904. p.push(
  1905. {"iri": iri, "also_cache_attachments": also_cache_attachments},
  1906. "/task/cache_actor",
  1907. )
  1908. @staticmethod
  1909. def post_to_remote_inbox(payload: str, recp: str) -> None:
  1910. p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox")
  1911. @staticmethod
  1912. def forward_activity(iri: str) -> None:
  1913. p.push(iri, "/task/forward_activity")
  1914. @staticmethod
  1915. def fetch_og_meta(iri: str) -> None:
  1916. p.push(iri, "/task/fetch_og_meta")
  1917. @staticmethod
  1918. def process_new_activity(iri: str) -> None:
  1919. p.push(iri, "/task/process_new_activity")
  1920. @staticmethod
  1921. def cache_attachments(iri: str) -> None:
  1922. p.push(iri, "/task/cache_attachments")
  1923. @staticmethod
  1924. def finish_post_to_inbox(iri: str) -> None:
  1925. p.push(iri, "/task/finish_post_to_inbox")
  1926. @staticmethod
  1927. def finish_post_to_outbox(iri: str) -> None:
  1928. p.push(iri, "/task/finish_post_to_outbox")
  1929. @app.route("/task/fetch_og_meta", methods=["POST"])
  1930. def task_fetch_og_meta():
  1931. task = p.parse(request)
  1932. app.logger.info(f"task={task!r}")
  1933. iri = task.payload
  1934. try:
  1935. activity = ap.fetch_remote_activity(iri)
  1936. app.logger.info(f"activity={activity!r}")
  1937. if activity.has_type(ap.ActivityType.CREATE):
  1938. note = activity.get_object()
  1939. links = opengraph.links_from_note(note.to_dict())
  1940. og_metadata = opengraph.fetch_og_metadata(USER_AGENT, links)
  1941. for og in og_metadata:
  1942. if not og.get("image"):
  1943. continue
  1944. MEDIA_CACHE.cache_og_image2(og["image"], iri)
  1945. app.logger.debug(f"OG metadata {og_metadata!r}")
  1946. DB.activities.update_one(
  1947. {"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}}
  1948. )
  1949. app.logger.info(f"OG metadata fetched for {iri}")
  1950. except (ActivityGoneError, ActivityNotFoundError):
  1951. app.logger.exception(f"dropping activity {iri}, skip OG metedata")
  1952. return ""
  1953. except requests.exceptions.HTTPError as http_err:
  1954. if 400 <= http_err.response.status_code < 500:
  1955. app.logger.exception("bad request, no retry")
  1956. return ""
  1957. app.logger.exception("failed to fetch OG metadata")
  1958. raise TaskError() from http_err
  1959. except Exception as err:
  1960. app.logger.exception(f"failed to fetch OG metadata for {iri}")
  1961. raise TaskError() from err
  1962. return ""
  1963. @app.route("/task/cache_object", methods=["POST"])
  1964. def task_cache_object():
  1965. task = p.parse(request)
  1966. app.logger.info(f"task={task!r}")
  1967. iri = task.payload
  1968. try:
  1969. activity = ap.fetch_remote_activity(iri)
  1970. app.logger.info(f"activity={activity!r}")
  1971. obj = activity.get_object()
  1972. DB.activities.update_one(
  1973. {"remote_id": activity.id},
  1974. {
  1975. "$set": {
  1976. "meta.object": obj.to_dict(embed=True),
  1977. "meta.object_actor": activitypub._actor_to_meta(obj.get_actor()),
  1978. }
  1979. },
  1980. )
  1981. except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
  1982. DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
  1983. app.logger.exception(f"flagging activity {iri} as deleted, no object caching")
  1984. except Exception as err:
  1985. app.logger.exception(f"failed to cache object for {iri}")
  1986. raise TaskError() from err
  1987. return ""
  1988. @app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901
  1989. def task_finish_post_to_outbox():
  1990. task = p.parse(request)
  1991. app.logger.info(f"task={task!r}")
  1992. iri = task.payload
  1993. try:
  1994. activity = ap.fetch_remote_activity(iri)
  1995. app.logger.info(f"activity={activity!r}")
  1996. recipients = activity.recipients()
  1997. if activity.has_type(ap.ActivityType.DELETE):
  1998. back.outbox_delete(MY_PERSON, activity)
  1999. elif activity.has_type(ap.ActivityType.UPDATE):
  2000. back.outbox_update(MY_PERSON, activity)
  2001. elif activity.has_type(ap.ActivityType.CREATE):
  2002. back.outbox_create(MY_PERSON, activity)
  2003. elif activity.has_type(ap.ActivityType.ANNOUNCE):
  2004. back.outbox_announce(MY_PERSON, activity)
  2005. elif activity.has_type(ap.ActivityType.LIKE):
  2006. back.outbox_like(MY_PERSON, activity)
  2007. elif activity.has_type(ap.ActivityType.UNDO):
  2008. obj = activity.get_object()
  2009. if obj.has_type(ap.ActivityType.LIKE):
  2010. back.outbox_undo_like(MY_PERSON, obj)
  2011. elif obj.has_type(ap.ActivityType.ANNOUNCE):
  2012. back.outbox_undo_announce(MY_PERSON, obj)
  2013. elif obj.has_type(ap.ActivityType.FOLLOW):
  2014. back.undo_new_following(MY_PERSON, obj)
  2015. app.logger.info(f"recipients={recipients}")
  2016. activity = ap.clean_activity(activity.to_dict())
  2017. DB.cache2.remove()
  2018. payload = json.dumps(activity)
  2019. for recp in recipients:
  2020. app.logger.debug(f"posting to {recp}")
  2021. Tasks.post_to_remote_inbox(payload, recp)
  2022. except (ActivityGoneError, ActivityNotFoundError):
  2023. app.logger.exception(f"no retry")
  2024. except Exception as err:
  2025. app.logger.exception(f"failed to post to remote inbox for {iri}")
  2026. raise TaskError() from err
  2027. return ""
  2028. @app.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901
  2029. def task_finish_post_to_inbox():
  2030. task = p.parse(request)
  2031. app.logger.info(f"task={task!r}")
  2032. iri = task.payload
  2033. try:
  2034. activity = ap.fetch_remote_activity(iri)
  2035. app.logger.info(f"activity={activity!r}")
  2036. if activity.has_type(ap.ActivityType.DELETE):
  2037. back.inbox_delete(MY_PERSON, activity)
  2038. elif activity.has_type(ap.ActivityType.UPDATE):
  2039. back.inbox_update(MY_PERSON, activity)
  2040. elif activity.has_type(ap.ActivityType.CREATE):
  2041. back.inbox_create(MY_PERSON, activity)
  2042. elif activity.has_type(ap.ActivityType.ANNOUNCE):
  2043. back.inbox_announce(MY_PERSON, activity)
  2044. elif activity.has_type(ap.ActivityType.LIKE):
  2045. back.inbox_like(MY_PERSON, activity)
  2046. elif activity.has_type(ap.ActivityType.FOLLOW):
  2047. # Reply to a Follow with an Accept
  2048. accept = ap.Accept(actor=ID, object=activity.to_dict(embed=True))
  2049. post_to_outbox(accept)
  2050. elif activity.has_type(ap.ActivityType.UNDO):
  2051. obj = activity.get_object()
  2052. if obj.has_type(ap.ActivityType.LIKE):
  2053. back.inbox_undo_like(MY_PERSON, obj)
  2054. elif obj.has_type(ap.ActivityType.ANNOUNCE):
  2055. back.inbox_undo_announce(MY_PERSON, obj)
  2056. elif obj.has_type(ap.ActivityType.FOLLOW):
  2057. back.undo_new_follower(MY_PERSON, obj)
  2058. try:
  2059. invalidate_cache(activity)
  2060. except Exception:
  2061. app.logger.exception("failed to invalidate cache")
  2062. except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
  2063. app.logger.exception(f"no retry")
  2064. except Exception as err:
  2065. app.logger.exception(f"failed to cache attachments for {iri}")
  2066. raise TaskError() from err
  2067. return ""
  2068. def post_to_outbox(activity: ap.BaseActivity) -> str:
  2069. if activity.has_type(ap.CREATE_TYPES):
  2070. activity = activity.build_create()
  2071. # Assign create a random ID
  2072. obj_id = back.random_object_id()
  2073. activity.set_id(back.activity_url(obj_id), obj_id)
  2074. back.save(Box.OUTBOX, activity)
  2075. Tasks.cache_actor(activity.id)
  2076. Tasks.finish_post_to_outbox(activity.id)
  2077. return activity.id
  2078. def post_to_inbox(activity: ap.BaseActivity) -> None:
  2079. # Check for Block activity
  2080. actor = activity.get_actor()
  2081. if back.outbox_is_blocked(MY_PERSON, actor.id):
  2082. app.logger.info(
  2083. f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
  2084. )
  2085. return
  2086. if back.inbox_check_duplicate(MY_PERSON, activity.id):
  2087. # The activity is already in the inbox
  2088. app.logger.info(f"received duplicate activity {activity!r}, dropping it")
  2089. back.save(Box.INBOX, activity)
  2090. Tasks.process_new_activity(activity.id)
  2091. app.logger.info(f"spawning task for {activity!r}")
  2092. Tasks.finish_post_to_inbox(activity.id)
  2093. def invalidate_cache(activity):
  2094. if activity.has_type(ap.ActivityType.LIKE):
  2095. if activity.get_object().id.startswith(BASE_URL):
  2096. DB.cache2.remove()
  2097. elif activity.has_type(ap.ActivityType.ANNOUNCE):
  2098. if activity.get_object().id.startswith(BASE_URL):
  2099. DB.cache2.remove()
  2100. elif activity.has_type(ap.ActivityType.UNDO):
  2101. DB.cache2.remove()
  2102. elif activity.has_type(ap.ActivityType.DELETE):
  2103. # TODO(tsileo): only invalidate if it's a delete of a reply
  2104. DB.cache2.remove()
  2105. elif activity.has_type(ap.ActivityType.UPDATE):
  2106. DB.cache2.remove()
  2107. elif activity.has_type(ap.ActivityType.CREATE):
  2108. note = activity.get_object()
  2109. in_reply_to = note.get_in_reply_to()
  2110. if not in_reply_to or in_reply_to.startswith(ID):
  2111. DB.cache2.remove()
  2112. # FIXME(tsileo): check if it's a reply of a reply
  2113. @app.route("/task/cache_attachments", methods=["POST"])
  2114. def task_cache_attachments():
  2115. task = p.parse(request)
  2116. app.logger.info(f"task={task!r}")
  2117. iri = task.payload
  2118. try:
  2119. activity = ap.fetch_remote_activity(iri)
  2120. app.logger.info(f"activity={activity!r}")
  2121. # Generates thumbnails for the actor's icon and the attachments if any
  2122. actor = activity.get_actor()
  2123. # Update the cached actor
  2124. DB.actors.update_one(
  2125. {"remote_id": iri},
  2126. {"$set": {"remote_id": iri, "data": actor.to_dict(embed=True)}},
  2127. upsert=True,
  2128. )
  2129. if actor.icon:
  2130. MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
  2131. if activity.has_type(ap.ActivityType.CREATE):
  2132. for attachment in activity.get_object()._data.get("attachment", []):
  2133. if (
  2134. attachment.get("mediaType", "").startswith("image/")
  2135. or attachment.get("type") == ap.ActivityType.IMAGE.value
  2136. ):
  2137. try:
  2138. MEDIA_CACHE.cache_attachment2(attachment["url"], iri)
  2139. except ValueError:
  2140. app.logger.exception(f"failed to cache {attachment}")
  2141. app.logger.info(f"attachments cached for {iri}")
  2142. except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
  2143. app.logger.exception(f"dropping activity {iri}, no attachment caching")
  2144. except Exception as err:
  2145. app.logger.exception(f"failed to cache attachments for {iri}")
  2146. raise TaskError() from err
  2147. return ""
  2148. @app.route("/task/cache_actor", methods=["POST"])
  2149. def task_cache_actor() -> str:
  2150. task = p.parse(request)
  2151. app.logger.info(f"task={task!r}")
  2152. iri, also_cache_attachments = (
  2153. task.payload["iri"],
  2154. task.payload.get("also_cache_attachments", True),
  2155. )
  2156. try:
  2157. activity = ap.fetch_remote_activity(iri)
  2158. app.logger.info(f"activity={activity!r}")
  2159. if activity.has_type(ap.ActivityType.CREATE):
  2160. Tasks.fetch_og_meta(iri)
  2161. if activity.has_type([ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]):
  2162. Tasks.cache_object(iri)
  2163. actor = activity.get_actor()
  2164. cache_actor_with_inbox = False
  2165. if activity.has_type(ap.ActivityType.FOLLOW):
  2166. if actor.id != ID:
  2167. # It's a Follow from the Inbox
  2168. cache_actor_with_inbox = True
  2169. else:
  2170. # It's a new following, cache the "object" (which is the actor we follow)
  2171. DB.activities.update_one(
  2172. {"remote_id": iri},
  2173. {
  2174. "$set": {
  2175. "meta.object": activitypub._actor_to_meta(
  2176. activity.get_object()
  2177. )
  2178. }
  2179. },
  2180. )
  2181. # Cache the actor info
  2182. DB.activities.update_one(
  2183. {"remote_id": iri},
  2184. {
  2185. "$set": {
  2186. "meta.actor": activitypub._actor_to_meta(
  2187. actor, cache_actor_with_inbox
  2188. )
  2189. }
  2190. },
  2191. )
  2192. app.logger.info(f"actor cached for {iri}")
  2193. if also_cache_attachments and activity.has_type(ap.ActivityType.CREATE):
  2194. Tasks.cache_attachments(iri)
  2195. except (ActivityGoneError, ActivityNotFoundError):
  2196. DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
  2197. app.logger.exception(f"flagging activity {iri} as deleted, no actor caching")
  2198. except Exception as err:
  2199. app.logger.exception(f"failed to cache actor for {iri}")
  2200. raise TaskError() from err
  2201. return ""
  2202. @app.route("/task/process_new_activity", methods=["POST"]) # noqa:c901
  2203. def task_process_new_activity():
  2204. """Process an activity received in the inbox"""
  2205. task = p.parse(request)
  2206. app.logger.info(f"task={task!r}")
  2207. iri = task.payload
  2208. try:
  2209. activity = ap.fetch_remote_activity(iri)
  2210. app.logger.info(f"activity={activity!r}")
  2211. # Is the activity expected?
  2212. # following = ap.get_backend().following()
  2213. should_forward = False
  2214. should_delete = False
  2215. should_keep = False
  2216. tag_stream = False
  2217. if activity.has_type(ap.ActivityType.ANNOUNCE):
  2218. # FIXME(tsileo): Ensure it's follower and store into a "dead activities" DB
  2219. try:
  2220. activity.get_object()
  2221. tag_stream = True
  2222. if activity.get_object_id().startswith(BASE_URL):
  2223. should_keep = True
  2224. except (NotAnActivityError, BadActivityError):
  2225. app.logger.exception(f"failed to get announce object for {activity!r}")
  2226. # Most likely on OStatus notice
  2227. tag_stream = False
  2228. should_delete = True
  2229. except (ActivityGoneError, ActivityNotFoundError):
  2230. # The announced activity is deleted/gone, drop it
  2231. should_delete = True
  2232. elif activity.has_type(ap.ActivityType.FOLLOW):
  2233. # FIXME(tsileo): ensure it's a follow where the server is the object
  2234. should_keep = True
  2235. elif activity.has_type(ap.ActivityType.CREATE):
  2236. note = activity.get_object()
  2237. in_reply_to = note.get_in_reply_to()
  2238. # Make the note part of the stream if it's not a reply, or if it's a local reply
  2239. if not in_reply_to or in_reply_to.startswith(ID):
  2240. tag_stream = True
  2241. # FIXME(tsileo): check for direct addressing in the to, cc, bcc... fields
  2242. if (in_reply_to and in_reply_to.startswith(ID)) or note.has_mention(ID):
  2243. should_keep = True
  2244. if in_reply_to:
  2245. try:
  2246. reply = ap.fetch_remote_activity(note.get_in_reply_to())
  2247. if (
  2248. reply.id.startswith(ID) or reply.has_mention(ID)
  2249. ) and activity.is_public():
  2250. # The reply is public "local reply", forward the reply (i.e. the original activity) to the
  2251. # original recipients
  2252. should_forward = True
  2253. should_keep = True
  2254. except NotAnActivityError:
  2255. # Most likely a reply to an OStatus notce
  2256. should_delete = True
  2257. # (partial) Ghost replies handling
  2258. # [X] This is the first time the server has seen this Activity.
  2259. should_forward = False
  2260. local_followers = ID + "/followers"
  2261. for field in ["to", "cc"]:
  2262. if field in activity._data:
  2263. if local_followers in activity._data[field]:
  2264. # [X] The values of to, cc, and/or audience contain a Collection owned by the server.
  2265. should_forward = True
  2266. # [X] The values of inReplyTo, object, target and/or tag are objects owned by the server
  2267. if not (in_reply_to and in_reply_to.startswith(ID)):
  2268. should_forward = False
  2269. elif activity.has_type(ap.ActivityType.DELETE):
  2270. note = DB.activities.find_one(
  2271. {"activity.object.id": activity.get_object().id}
  2272. )
  2273. if note and note["meta"].get("forwarded", False):
  2274. # If the activity was originally forwarded, forward the delete too
  2275. should_forward = True
  2276. elif activity.has_type(ap.ActivityType.LIKE):
  2277. if activity.get_object_id().startswith(BASE_URL):
  2278. should_keep = True
  2279. else:
  2280. # We only want to keep a like if it's a like for a local activity
  2281. # (Pleroma relay the likes it received, we don't want to store them)
  2282. should_delete = True
  2283. if should_forward:
  2284. app.logger.info(f"will forward {activity!r} to followers")
  2285. Tasks.forward_activity(activity.id)
  2286. if should_delete:
  2287. app.logger.info(f"will soft delete {activity!r}")
  2288. app.logger.info(f"{iri} tag_stream={tag_stream}")
  2289. DB.activities.update_one(
  2290. {"remote_id": activity.id},
  2291. {
  2292. "$set": {
  2293. "meta.keep": should_keep,
  2294. "meta.stream": tag_stream,
  2295. "meta.forwarded": should_forward,
  2296. "meta.deleted": should_delete,
  2297. }
  2298. },
  2299. )
  2300. app.logger.info(f"new activity {iri} processed")
  2301. if not should_delete and not activity.has_type(ap.ActivityType.DELETE):
  2302. Tasks.cache_actor(iri)
  2303. except (ActivityGoneError, ActivityNotFoundError):
  2304. app.logger.exception(f"dropping activity {iri}, skip processing")
  2305. return ""
  2306. except Exception as err:
  2307. app.logger.exception(f"failed to process new activity {iri}")
  2308. raise TaskError() from err
  2309. return ""
  2310. @app.route("/task/forward_activity", methods=["POST"])
  2311. def task_forward_activity():
  2312. task = p.parse(request)
  2313. app.logger.info(f"task={task!r}")
  2314. iri = task.payload
  2315. try:
  2316. activity = ap.fetch_remote_activity(iri)
  2317. recipients = back.followers_as_recipients()
  2318. app.logger.debug(f"Forwarding {activity!r} to {recipients}")
  2319. activity = ap.clean_activity(activity.to_dict())
  2320. payload = json.dumps(activity)
  2321. for recp in recipients:
  2322. app.logger.debug(f"forwarding {activity!r} to {recp}")
  2323. Tasks.post_to_remote_inbox(payload, recp)
  2324. except Exception as err:
  2325. app.logger.exception("task failed")
  2326. raise TaskError() from err
  2327. return ""
  2328. @app.route("/task/post_to_remote_inbox", methods=["POST"])
  2329. def task_post_to_remote_inbox():
  2330. """Post an activity to a remote inbox."""
  2331. task = p.parse(request)
  2332. app.logger.info(f"task={task!r}")
  2333. payload, to = task.payload["payload"], task.payload["to"]
  2334. try:
  2335. app.logger.info("payload=%s", payload)
  2336. app.logger.info("generating sig")
  2337. signed_payload = json.loads(payload)
  2338. # Don't overwrite the signature if we're forwarding an activity
  2339. if "signature" not in signed_payload:
  2340. generate_signature(signed_payload, KEY)
  2341. app.logger.info("to=%s", to)
  2342. resp = requests.post(
  2343. to,
  2344. data=json.dumps(signed_payload),
  2345. auth=SIG_AUTH,
  2346. headers={
  2347. "Content-Type": HEADERS[1],
  2348. "Accept": HEADERS[1],
  2349. "User-Agent": USER_AGENT,
  2350. },
  2351. )
  2352. app.logger.info("resp=%s", resp)
  2353. app.logger.info("resp_body=%s", resp.text)
  2354. resp.raise_for_status()
  2355. except HTTPError as err:
  2356. app.logger.exception("request failed")
  2357. if 400 >= err.response.status_code >= 499:
  2358. app.logger.info("client error, no retry")
  2359. return ""
  2360. raise TaskError() from err
  2361. except Exception as err:
  2362. app.logger.exception("task failed")
  2363. raise TaskError() from err
  2364. return ""
  2365. @app.route("/task/update_question", methods=["POST"])
  2366. def task_update_question():
  2367. """Post an activity to a remote inbox."""
  2368. task = p.parse(request)
  2369. app.logger.info(f"task={task!r}")
  2370. iri = task.payload
  2371. try:
  2372. app.logger.info(f"Updating question {iri}")
  2373. # TODO(tsileo): sends an Update with the question/iri as an actor, with the updated stats (LD sig will fail?)
  2374. # but to who? followers and people who voted? but this must not be visible right?
  2375. # also sends/trigger a notification when a poll I voted for ends like Mastodon?
  2376. except HTTPError as err:
  2377. app.logger.exception("request failed")
  2378. if 400 >= err.response.status_code >= 499:
  2379. app.logger.info("client error, no retry")
  2380. return ""
  2381. raise TaskError() from err
  2382. except Exception as err:
  2383. app.logger.exception("task failed")
  2384. raise TaskError() from err
  2385. return ""
  2386. @app.route("/task/cleanup", methods=["POST"])
  2387. def task_cleanup():
  2388. task = p.parse(request)
  2389. app.logger.info(f"task={task!r}")
  2390. p.push({}, "/task/cleanup_part_1")
  2391. return ""
  2392. @app.route("/task/cleanup_part_1", methods=["POST"])
  2393. def task_cleanup_part_1():
  2394. task = p.parse(request)
  2395. app.logger.info(f"task={task!r}")
  2396. d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
  2397. # (We keep Follow and Accept forever)
  2398. # Announce and Like cleanup
  2399. for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
  2400. # Migrate old (before meta.keep activities on the fly)
  2401. DB.activities.update_many(
  2402. {
  2403. "box": Box.INBOX.value,
  2404. "type": ap_type.value,
  2405. "meta.keep": {"$exists": False},
  2406. "activity.object": {"$regex": f"^{BASE_URL}"},
  2407. },
  2408. {"$set": {"meta.keep": True}},
  2409. )
  2410. DB.activities.update_many(
  2411. {
  2412. "box": Box.INBOX.value,
  2413. "type": ap_type.value,
  2414. "meta.keep": {"$exists": False},
  2415. "activity.object.id": {"$regex": f"^{BASE_URL}"},
  2416. },
  2417. {"$set": {"meta.keep": True}},
  2418. )
  2419. DB.activities.update_many(
  2420. {
  2421. "box": Box.INBOX.value,
  2422. "type": ap_type.value,
  2423. "meta.keep": {"$exists": False},
  2424. },
  2425. {"$set": {"meta.keep": False}},
  2426. )
  2427. # End of the migration
  2428. # Delete old activities
  2429. DB.activities.delete_many(
  2430. {
  2431. "box": Box.INBOX.value,
  2432. "type": ap_type.value,
  2433. "meta.keep": False,
  2434. "activity.published": {"$lt": d},
  2435. }
  2436. )
  2437. # And delete the soft-deleted one
  2438. DB.activities.delete_many(
  2439. {
  2440. "box": Box.INBOX.value,
  2441. "type": ap_type.value,
  2442. "meta.keep": False,
  2443. "meta.deleted": True,
  2444. }
  2445. )
  2446. # Create cleanup (more complicated)
  2447. # The one that mention our actor
  2448. DB.activities.update_many(
  2449. {
  2450. "box": Box.INBOX.value,
  2451. "meta.keep": {"$exists": False},
  2452. "activity.object.tag.href": {"$regex": f"^{BASE_URL}"},
  2453. },
  2454. {"$set": {"meta.keep": True}},
  2455. )
  2456. DB.activities.update_many(
  2457. {
  2458. "box": Box.REPLIES.value,
  2459. "meta.keep": {"$exists": False},
  2460. "activity.tag.href": {"$regex": f"^{BASE_URL}"},
  2461. },
  2462. {"$set": {"meta.keep": True}},
  2463. )
  2464. # The replies of the outbox
  2465. DB.activities.update_many(
  2466. {"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}},
  2467. {"$set": {"meta.keep": True}},
  2468. )
  2469. # Track all the threads we participated
  2470. keep_threads = []
  2471. for data in DB.activities.find(
  2472. {
  2473. "box": Box.OUTBOX.value,
  2474. "type": ActivityType.CREATE.value,
  2475. "meta.thread_root_parent": {"$exists": True},
  2476. }
  2477. ):
  2478. keep_threads.append(data["meta"]["thread_root_parent"])
  2479. for root_parent in set(keep_threads):
  2480. DB.activities.update_many(
  2481. {"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}}
  2482. )
  2483. DB.activities.update_many(
  2484. {
  2485. "box": {"$in": [Box.REPLIES.value, Box.INBOX.value]},
  2486. "meta.keep": {"$exists": False},
  2487. },
  2488. {"$set": {"meta.keep": False}},
  2489. )
  2490. DB.activities.update_many(
  2491. {
  2492. "box": Box.OUTBOX.value,
  2493. "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
  2494. "meta.public": {"$exists": False},
  2495. },
  2496. {"$set": {"meta.public": True}},
  2497. )
  2498. p.push({}, "/task/cleanup_part_2")
  2499. return "OK"
  2500. @app.route("/task/cleanup_part_2", methods=["POST"])
  2501. def task_cleanup_part_2():
  2502. task = p.parse(request)
  2503. app.logger.info(f"task={task!r}")
  2504. d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
  2505. # Go over the old Create activities
  2506. for data in DB.activities.find(
  2507. {
  2508. "box": Box.INBOX.value,
  2509. "type": ActivityType.CREATE.value,
  2510. "meta.keep": False,
  2511. "activity.published": {"$lt": d},
  2512. }
  2513. ).limit(5000):
  2514. # Delete the cached attachment/
  2515. for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}):
  2516. MEDIA_CACHE.fs.delete(grid_item._id)
  2517. DB.activities.delete_one({"_id": data["_id"]})
  2518. p.push({}, "/task/cleanup_part_3")
  2519. return "OK"
  2520. @app.route("/task/cleanup_part_3", methods=["POST"])
  2521. def task_cleanup_part_3():
  2522. task = p.parse(request)
  2523. app.logger.info(f"task={task!r}")
  2524. d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
  2525. # Delete old replies we don't care about
  2526. DB.activities.delete_many(
  2527. {"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}}
  2528. )
  2529. # Remove all the attachments no tied to a remote_id (post celery migration)
  2530. for grid_item in MEDIA_CACHE.fs.find(
  2531. {"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}}
  2532. ):
  2533. MEDIA_CACHE.fs.delete(grid_item._id)
  2534. # TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task
  2535. return "OK"