123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547 |
- import sys, os, traceback
- import dateutil.parser
- from time import strftime, localtime
- # returns html-formatted string
def make_buttons(btn_dict, msg_id):
    """Render the row of link buttons shown with one post.

    btn_dict maps a button label to its URL.  A URL that ends in '=' is
    treated as an open query string and msg_id is appended to it; any
    other URL is used unchanged.

    Returns one HTML string: a '<div class="buttons">' element holding
    one anchor per entry, in the dictionary's iteration order.
    """
    fmt = "<a href=\"%s\">[%s]</a>"
    anchors = []
    for label, url in btn_dict.items():
        # endswith() is safe for an empty URL, where url[-1] would raise
        if url.endswith('='):
            # then interpret it as a query string
            url += str(msg_id)
        anchors.append(fmt % (url, label))
    return "<div class=\"buttons\">" + "".join(anchors) + "</div>"
- # apply div classes for use with .css
def make_post(num, timestamp, conf, msg):
    """Fill the post template from the settings with one post's data.

    conf["format"] is a str.format template using the named fields
    __timestamp__, __num__, __msg__ and __btn__.  When conf has a
    "buttons" entry, the buttons are rendered for this post number;
    otherwise __btn__ is the empty string.
    """
    layout = conf["format"]
    button_html = make_buttons(conf["buttons"], num) if "buttons" in conf else ""
    return layout.format(
        __timestamp__=timestamp,
        __num__=num,
        __msg__=msg,
        __btn__=button_html,
    )
def make_gallery(indices, w, conf=None):
    """Build the HTML fragments of an image gallery for one line.

    indices -- positions in w that hold image names
    w       -- the line's word list; every image word is pop()ped out of
               it, so the caller's list is modified
    conf    -- optional gallery settings with "path_to_thumb" and
               "path_to_fullsize"

    Images are taken from the highest index down so earlier positions
    stay valid while popping.  A name starting with '.' or '/' is used
    as-is for both link and thumbnail.  Any other name is prefixed with
    the configured paths, or used bare (with a warning on stderr) when
    no conf is given.

    Returns a list of strings (opening div, one panel per image, closing
    div), or an empty list when indices is empty.
    """
    if indices == []:
        return []
    panel = '''
<div class=\"panel\">
<a href=\"%s\"><img src=\"%s\" class=\"embed\"></a>
</div>
'''
    pieces = ["<div class=\"gallery\">"]
    for position in reversed(indices):
        image = w.pop(position)
        looks_like_path = image[0] in ('.', '/')
        if looks_like_path:
            pieces.append(panel % (image, image))
        elif conf:
            thumb = "%s/%s" % (conf["path_to_thumb"], image)
            full = "%s/%s" % (conf["path_to_fullsize"], image)
            pieces.append(panel % (full, thumb))
        else:
            warning = ("Warning: no path defined for image %s!" % image)
            print(warning, file=sys.stderr)
            pieces.append(panel % (image, image))
    pieces.append("</div>")
    return pieces
- # apply basic HTML formatting - only div class here is gallery
- from html import escape
def markup(message, config):
    """Turn the raw lines of one post into HTML and collect its hashtags.

    message -- list of text lines
    config  -- post settings; reads "tag_paragraphs", "accepted_images",
               and optionally "line_separator" and "gallery"

    Per word: words starting with src= or href= are left untouched,
    words containing https:// become escaped links, words containing '#'
    become hashtag spans (and are recorded), and words whose extension
    is an accepted image format are moved into a gallery that is emitted
    after the line's text.

    Returns (html, tags): the joined output and the list of hashtags in
    order of appearance.
    """
    def is_image(s, image_formats):
        # True when s ends in ".<ext>" with a non-empty, accepted <ext>
        _, dot, extension = s.rpartition('.')
        if not dot or extension == '':
            # no dot at all, or a trailing dot such as "started."
            return False
        return extension in image_formats

    tags = []  # hashtags found, in order
    output = []  # one entry per text line and per gallery
    wrap_paragraphs = config["tag_paragraphs"]
    sep = config["line_separator"] if "line_separator" in config else ""
    for line in message:
        words = line.split()
        image_positions = []
        for i, word in enumerate(words):
            # don't help people click http
            if word.startswith(("src=", "href=")):
                continue
            if "https://" in word:
                safe = escape(word)
                words[i] = "<a href=\"%s\">%s</a>" % (safe, safe)
            elif "#" in word and len(word) > 1:
                # chr(8206) splits the tag from trailing text,
                # which allows tagging such as #fanfic|tion
                pieces = word.split(chr(8206))
                # pieces[0] is the portion closest to the #
                tags.append(pieces[0])
                replacement = "<span class=\"hashtag\">%s</span>" % (pieces[0])
                if len(pieces) > 1:
                    replacement += pieces[1]
                words[i] = replacement
            elif is_image(word, config["accepted_images"]):
                image_positions.append(i)
        gallery = []
        if image_positions:
            # make_gallery pop()s the image words out of 'words'
            gallery_conf = config.get("gallery")
            gallery = make_gallery(image_positions, words, gallery_conf)
        if wrap_paragraphs and len(words) > 0:
            words = ["<p>", *words, "</p>"]
        output.append(" ".join(words))
        # the gallery goes after, not inside, the paragraph
        if gallery:
            output.append("".join(gallery))
    return sep.join(output), tags
class Post:
    """One microblog entry: its timestamp line and its message lines."""

    def __init__(self, ts, msg):
        self.timestamp = ts.strip()  # string, surrounding whitespace removed
        self.message = msg  # list of lines

    def _parsed(self):
        # interpret the stored timestamp text with dateutil
        return dateutil.parser.parse(self.timestamp)

    # format used for sorting
    def get_epoch_time(self):
        """Return the timestamp as whole seconds since the epoch."""
        return int(self._parsed().timestamp())

    # format used for display
    def get_short_time(self):
        """Return the timestamp formatted as "%y %b %d"."""
        return self._parsed().strftime("%y %b %d")
def parse_txt(filename):
    """Read a microblog text file into a list of Post objects.

    Expected layout: the first line is always skipped, then each post is
    one timestamp line followed by its message lines and terminated by a
    blank line (a line of at most one character).

    Compared with a strict reading of that layout, two malformed-input
    cases are tolerated:
      * blank lines where a timestamp is expected are skipped, so extra
        blank lines between posts do not produce an empty timestamp;
      * a final post that is not followed by a blank line is still
        returned instead of being dropped.

    Returns the posts in the same order as they appear in the file.
    """
    with open(filename, 'r') as f:
        content = f.readlines()
    posts = []  # list of posts - same order as file
    message = []  # lines of the post being collected
    timestamp = ""
    expecting_timestamp = True
    # content[0] is the leading line of the file and is never parsed
    for line in content[1:]:
        if expecting_timestamp:
            if not line.strip():
                continue
            timestamp = line
            expecting_timestamp = False
        elif len(line) > 1:
            message.append(line)
        else:
            posts.append(Post(timestamp, message))
            # reset for the next post
            message = []
            expecting_timestamp = True
    # flush a trailing post when the file lacks a terminating blank line
    if not expecting_timestamp and message:
        posts.append(Post(timestamp, message))
    return posts
def get_posts(posts, config):
    """Render every post and index its hashtags.

    Posts are numbered downwards: the first post in the list gets
    len(posts) - 1 and the last gets 0.  That number is passed to
    make_post and is also what is stored per tag.

    Returns (timeline, tagcloud, tagged):
      timeline -- rendered posts, same order as the input
      tagcloud -- dict of tag -> number of occurrences
      tagged   -- dict of tag -> list of post numbers carrying the tag
    """
    timeline = []
    tagcloud = {}  # tag -> count
    tagged = {}  # tag -> post numbers
    number = len(posts)
    for post in posts:
        markedup, tags = markup(post.message, config)
        number -= 1
        timeline.append(
            make_post(number, post.get_short_time(), config, markedup)
        )
        for tag in tags:
            tagcloud[tag] = tagcloud.get(tag, 0) + 1
            tagged.setdefault(tag, []).append(number)
    return timeline, tagcloud, tagged
def make_tagcloud(d, rell):
    """Render the tag cloud as a list of HTML spans, most used tag first.

    d    -- dict of tag (including its leading character) -> count
    rell -- link template with one %s, filled with the tag minus its
            first character

    Tags with equal counts keep the order they have in d.
    """
    fmt = "<span class=\"hashtag\"><a href=\"%s\">%s(%i)</a></span>"
    # iterate the sorted items; looping over d itself ignores the ordering
    by_count = sorted(d.items(), key=lambda item: -item[1])
    return [fmt % (rell % tag[1:], tag, count) for tag, count in by_count]
class Paginator:
    """Writes a timeline as numbered HTML pages inside one directory.

    Only complete pages are produced: TOTAL_PAGES is post_count divided
    by posts-per-page, rounded down.  Every file written by paginate()
    is recorded in self.written.
    """

    def __init__(self, post_count, ppp, loc=None):
        """Create the output directory if needed and compute page counts.

        post_count -- number of posts in the timeline, must be positive
        ppp        -- posts per page
        loc        -- output directory, "pages" when not given

        Raises ValueError for a non-positive post_count and
        ZeroDivisionError when ppp is 0 (after the directory was made).
        """
        if post_count <= 0:
            raise ValueError("no posts to paginate")
        loc = loc or "pages"
        if not os.path.exists(loc):
            os.mkdir(loc)
        self.TOTAL_POSTS = post_count
        self.PPP = ppp
        self.TOTAL_PAGES = int(post_count/self.PPP)
        self.SUBDIR = loc
        self.FILENAME = "%i.html"
        self.written = []

    def toc(self, current_page=None, path=None): #style 1
        """Return the page index: one entry per page, highest number first.

        The entry for current_page is bold text rather than a link.
        Links point into path, or into the paginator's own directory
        when path is None.  Returns "[no pages]" when there are none.
        """
        if self.TOTAL_PAGES < 1:
            return "[no pages]"
        base = self.SUBDIR if path is None else path
        link = "<a href=\"%s\">[%i]</a>"  # (filename, page number)
        entries = []
        for page in reversed(range(self.TOTAL_PAGES)):
            if page == current_page:
                # the page being shown gets no anchor tag
                entries.append("<b>[%i]</b>" % page)
            else:
                target = base + "/" + (self.FILENAME % page)
                entries.append(link % (target, page))
        return "\n".join(entries)

    # makes one page
    def singlepage(self, template, tagcloud, timeline_, i=None, p=None):
        """Fill template's postcount, tags, pages and timeline fields.

        i and p are passed to toc() as the current page and link path.
        """
        return template.format(
            postcount=self.TOTAL_POSTS,
            tags="\n".join(tagcloud),
            pages=self.toc(i, p),
            timeline="\n\n".join(timeline_),
        )

    def paginate(self, template, tagcloud, timeline, is_tagline=False):
        """Write every complete page to <SUBDIR>/<n>.html.

        timeline is reversed IN PLACE and stays reversed afterwards.
        Page n holds entries n*PPP .. (n+1)*PPP - 1 of the reversed
        list, shown in the original direction.  is_tagline is accepted
        but not used.
        """
        timeline.reverse() # reorder from oldest to newest
        for page in range(self.TOTAL_PAGES):
            # format the page number on its own so a '%' in the directory
            # name is not mistaken for a format specifier
            fn = "%s/%s" % (self.SUBDIR, self.FILENAME % page)
            first = self.PPP * page
            sliced = timeline[first:self.PPP * (page + 1)]
            sliced.reverse()
            with open(fn, 'w') as f:
                self.written.append(fn)
                f.write(self.singlepage(template, tagcloud, sliced, page, "."))
        return
- import argparse
- if __name__ == "__main__":
def sort(filename):
    """Write a time-ordered copy of a posts file to "<filename>.sorted".

    Posts are ordered by epoch time and written latest first, in the
    layout parse_txt reads: a leading blank line, then for each post its
    timestamp line followed by its message.
    """
    oldest_first = sorted(parse_txt(filename), key=lambda e: e.get_epoch_time())
    outfile = ("%s.sorted" % filename)
    print("Sorted text written to ", outfile)
    with open(outfile, 'w') as f:
        print(file=f)
        for post in reversed(oldest_first):
            print(post.timestamp, file=f)
            print("".join(post.message), file=f)
def get_args():
    """Parse the command line.

    Returns (template, content, skip_fetch).  With --sort the content
    file is sorted and the program exits instead of returning.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("template", help="an html template file")
    parser.add_argument("content", help="text file for microblog content")
    parser.add_argument(
        "--sort",
        action="store_true",
        help="sorts content from oldest to newest"
        " (this is a separate operation from page generation)",
    )
    parser.add_argument(
        "--skip-fetch",
        action="store_true",
        help="skips fetching profile data from remote sources;"
        " has no effect if webring is not enabled",
    )
    parsed = parser.parse_args()
    if parsed.sort:
        sort(parsed.content)
        exit()
    return parsed.template, parsed.content, parsed.skip_fetch
- # assume relative path
def demote_css(template, css_list, level=1):
    """Re-point relative stylesheet paths for a page in a sub-directory.

    Every occurrence of each entry of css_list in template gets a prefix:
    a single '.' when level is 1, otherwise '../' repeated level times.
    Returns the rewritten template text.
    """
    prefix = '.' if level == 1 else "../" * level
    rewritten = template
    for css in css_list:
        rewritten = rewritten.replace(css, prefix + css)
    return rewritten
def writepage(template, timeline, tagcloud, config, subdir = None):
    """Generate the landing page and numbered pages for one timeline.

    template -- path of the HTML template file
    timeline -- rendered posts; paginate() reverses this list in place
    tagcloud -- dict of tag -> count
    config   -- page settings; reads "postsperpage" and "relative_css"
    subdir   -- None for the main timeline (the landing page is printed
                to stdout), "webring", or a per-tag directory (the
                landing page is written to <subdir>/latest.html)

    Exits the program when the paginator cannot be created.
    Returns the list of files written.
    """
    count = len(timeline)
    with open(template,'r') as f:
        html = f.read()
    try:
        p = config["postsperpage"]
        pagectrl = Paginator(count, p, subdir)
    except ZeroDivisionError as e:
        print("error: ",e, ". check 'postsperpage' in config", file=sys.stderr)
        exit()
    except Exception as e:
        print("error: ",e, ("(number of posts = %i)" % count), file=sys.stderr)
        exit()
    latest = timeline[:pagectrl.PPP]
    link_from_top = "./tags/%s/latest.html"
    link_from_subdir = "../tags/%s/latest.html"
    link_from_tagdir = "../%s/latest.html"
    if subdir is None: # top level page
        level = 1
        is_tagline = False
        # the printed landing page links from the top directory ...
        print(pagectrl.singlepage(
            html, make_tagcloud(tagcloud, link_from_top), latest))
        # ... while the numbered pages live one directory below it
        cloud = make_tagcloud(tagcloud, link_from_subdir)
    else:
        is_tagline = subdir != "webring" # timelines per tag
        if is_tagline:
            level = 2
            cloud = make_tagcloud(tagcloud, link_from_tagdir)
        else:
            level = 1
            cloud = make_tagcloud(tagcloud, link_from_subdir)
        demoted = demote_css(html, config["relative_css"], level)
        filename = "%s/latest.html" % subdir
        with open(filename, 'w') as f: # landing page for tag
            pagectrl.written.append(filename)
            f.write(pagectrl.singlepage(demoted, cloud, latest, p="."))
    pagectrl.paginate(
        demote_css(html, config["relative_css"], level),
        cloud, timeline, is_tagline)
    return pagectrl.written
- import toml
def load_settings():
    """Load settings.toml from the current directory.

    Returns the parsed settings, or None when the file does not exist.
    """
    filename = "settings.toml"
    if not os.path.exists(filename):
        return None
    with open(filename, 'r') as f:
        return toml.loads(f.read())
- import json
def export_profile(post_count, last_update, config):
    """Write this blog's webring profile as JSON.

    Does nothing when config has no "profile" table.  Otherwise the
    table is updated in place with "post-count" and "last-updated" and,
    provided it has both "username" and "url", dumped as JSON to the
    file named by config["file_output"].  A warning is printed to stderr
    when either key is missing.
    """
    if "profile" not in config:
        return
    profile = config["profile"]
    profile["post-count"] = post_count
    profile["last-updated"] = last_update
    if not ("username" in profile and "url" in profile):
        print("Warning: no profile exported", file=sys.stderr)
        return
    with open(config["file_output"], 'w') as f:
        print(json.dumps(profile), file=f)
def get_webring(f_cfg):
    """Fetch the followed profiles and render each one as HTML.

    f_cfg is the "following" table of the webring settings.  Keys read:
      "list"             -- URLs of remote profile JSON documents
      "format"           -- str.format template for one profile
      "internal-avatars" -- table with "enabled", "local_path_to_avatars"
                            and "path_to_avatars"

    Remote data is untrusted: a download that fails, a body that is not
    a JSON object, or a profile with missing or malformed fields is
    reported on stderr and skipped.  All diagnostics go to stderr so
    they never mix with the page printed on stdout.

    Returns the rendered profiles, most recently updated first, or []
    when nothing could be loaded.
    """
    urls = f_cfg["list"]
    if not urls:
        # nothing to download, so pycurl is not needed at all
        print("no remote profiles loaded", file=sys.stderr)
        return []

    import pycurl
    from io import BytesIO

    def get_proxy():
        # Returns (host, port, is_socks) from http_proxy / https_proxy,
        # or None when neither variable holds a value.
        proxy = os.environ.get("http_proxy") or os.environ.get("https_proxy") or ""
        proxy = proxy.strip().rstrip('/')
        if not proxy:
            return None
        host = proxy[proxy.rfind('/') + 1: proxy.rfind(':')]
        port = proxy[proxy.rfind(':') + 1:]
        # a bare str.find() result is -1 (truthy) when absent and 0
        # (falsy) for a match at the start, so test membership instead
        is_socks = "socks://" in proxy or "socks5h://" in proxy
        return host, int(port), is_socks

    def fetch(url_list):
        # Downloads every URL; returns (bodies, content_types) holding
        # only the successful transfers, index-aligned with each other.
        curl = pycurl.Curl()
        datum = []
        meta = []
        try:
            proxy = get_proxy()
            if proxy is not None:
                hostname, port_no, is_socks = proxy
                curl.setopt(pycurl.PROXY, hostname)
                curl.setopt(pycurl.PROXYPORT, port_no)
                if is_socks:
                    curl.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5_HOSTNAME)
            for url in url_list:
                buf = BytesIO()
                curl.setopt(curl.WRITEDATA, buf)
                curl.setopt(pycurl.URL, url)
                try:
                    curl.perform()
                    content_type = curl.getinfo(curl.CONTENT_TYPE)
                except pycurl.error as e:
                    print(e,": ", url, file=sys.stderr)
                    continue
                # append both together so the two lists cannot drift apart
                datum.append(buf)
                meta.append(content_type)
        finally:
            curl.close()
        return datum, meta

    def to_json(curl_outs):
        # Decodes each body; keeps only JSON objects (dicts).
        json_objs = []
        for buf in curl_outs:
            try:
                obj = json.loads(buf.getvalue())
            except Exception as e:
                # best effort on untrusted input: report and move on
                print(e, file=sys.stderr)
                continue
            if isinstance(obj, dict):
                json_objs.append(obj)
            else:
                print("remote profile is not a JSON object", file=sys.stderr)
        return json_objs

    def render(profiles, template):
        # Fills the template once per profile; bad profiles are skipped.
        rendered = []
        SHORT_BIO_LIMIT = 150
        for profile in profiles:
            try:
                epoch_timestamp = profile["last-updated"]
                if not isinstance(epoch_timestamp, int):
                    epoch_timestamp = 0
                post_count = profile["post-count"]
                if not isinstance(post_count, int):
                    post_count = 0
                self_desc = profile["short-bio"]
                if len(self_desc) >= SHORT_BIO_LIMIT:
                    self_desc = self_desc[:SHORT_BIO_LIMIT] + "..."
                rendered.append(template.format(
                    __avatar__=escape(profile["avatar"]),
                    __handle__=escape(profile["username"]),
                    __url__=escape(profile["url"]),
                    __post_count__ = post_count,
                    __shortbio__= escape(self_desc),
                    __lastupdated__= strftime(
                        "%Y %b %d", localtime(epoch_timestamp)) ))
            except KeyError as e:
                print("remote profile is missing key: ", e, file=sys.stderr)
                print("\tsource: ", profile, file=sys.stderr)
            except (TypeError, AttributeError, OverflowError, OSError) as e:
                # e.g. a non-string bio or an out-of-range timestamp
                print("remote profile has a malformed field: ", e, file=sys.stderr)
                print("\tsource: ", profile, file=sys.stderr)
        return rendered

    def get_avatars(profiles, save_path, img_src):
        # Saves each avatar under save_path, named by the SHA-1 of its
        # bytes, and points the profile's "avatar" at img_src instead.
        # All-or-nothing: if any download is missing, nothing is changed.
        import hashlib
        try:
            avatar_urls = [p["avatar"] for p in profiles]
        except KeyError as e:
            print("remote profile is missing key: ", e, file=sys.stderr)
            return
        imgs, info = fetch(avatar_urls)
        length = len(imgs)
        if length != len(profiles) or length == 0:
            print("error in retrieving images", file=sys.stderr)
            return
        for profile, img, content_type in zip(profiles, imgs, info):
            # content_type is None when no header was received; any
            # ";parameter" suffix is not part of the media type
            parts = (content_type or "").split(';')[0].strip().split('/')
            if len(parts) != 2 or parts[0] != "image" or not parts[1]:
                print("\tskip: not an image", file=sys.stderr)
                continue
            ext = parts[1]
            data = img.getvalue()
            h = hashlib.sha1(data).hexdigest()
            filename = "%s.%s" % (h, ext)
            path = "%s/%s" % (save_path, filename)
            profile["avatar"] = "%s/%s" % (img_src, filename)
            if not os.path.isfile(path):
                with open(path, "wb") as f:
                    f.write(data)

    def last_updated(profile):
        # sort key; anything that is not an int counts as 0, as in render
        value = profile.get("last-updated")
        return value if isinstance(value, int) else 0

    j, m = fetch(urls)
    list_of_json_objs = to_json(j)
    if list_of_json_objs == []:
        print("no remote profiles loaded", file=sys.stderr)
        return []
    if f_cfg["internal-avatars"]["enabled"]:
        a = f_cfg["internal-avatars"]["local_path_to_avatars"]
        b = f_cfg["internal-avatars"]["path_to_avatars"]
        get_avatars(list_of_json_objs, a, b)
    list_of_json_objs.sort(key=last_updated, reverse=True)
    return render(list_of_json_objs, f_cfg["format"])
def main():
    """Generate the whole site from the command-line arguments.

    Steps: read settings.toml, parse the posts, write the main timeline,
    write one timeline per hashtag under tags/, handle the webring, and
    finally list every generated file in updatedfiles.txt.
    Returns early (after a message on stderr) when the settings file or
    one of its required tables is missing, or when there are no posts.
    """
    tpl, content, skip_fetch = get_args()
    cfg = load_settings()
    if cfg is None:
        print("exit: no settings.toml found.", file=sys.stderr)
        return
    if "post" not in cfg:
        print("exit: table 'post' absent in settings.toml", file=sys.stderr)
        return
    if "page" not in cfg:
        print("exit: table 'page' absent in settings.toml", file=sys.stderr)
        return
    p = parse_txt(content)
    tl, tc, tg = get_posts(p, cfg["post"])
    if not tl:
        return
    # main timeline
    updated = list(writepage(tpl, tl, tc, cfg["page"]))
    # timeline per tag
    # tg stores post numbers that count down from the first post; they
    # index tl correctly only because the writepage call above leaves tl
    # reversed (Paginator.paginate reverses its timeline in place).
    if tc and tg:
        if not os.path.exists("tags"):
            os.mkdir("tags")
        for key in tg:
            tagline = [tl[index] for index in tg[key]]
            # [1:] means to omit hashtag from dir name
            updated += writepage(
                tpl, tagline, tc, cfg["page"],
                subdir="tags/%s" % key[1:]
            )
    if "webring" in cfg:
        if cfg["webring"]["enabled"] == True:
            export_profile(
                len(p), p[0].get_epoch_time(), cfg["webring"])
            # NOTE(review): the original indentation was lost; fetching is
            # assumed to be gated on webring.enabled because the
            # --skip-fetch help says it "has no effect if webring is not
            # enabled" -- confirm against the upstream file.
            if not skip_fetch:
                fellows = get_webring(cfg["webring"]["following"])
                if fellows != []:
                    updated += writepage(
                        tpl, fellows, tc, cfg["page"], subdir="webring")
    with open("updatedfiles.txt", 'w') as f:
        for filename in updated:
            print(filename, file=f)
        if "latestpage" in cfg:
            print(cfg["latestpage"], file=f)
        if "latestpages" in cfg:
            for page in cfg["latestpages"]:
                print(page, file=f)
# Script entry point: run main() and turn the expected failures into a
# traceback plus a hint.  Every hint goes to stderr, because stdout
# carries the generated landing page.
if __name__ == "__main__":
    try:
        main()
    except KeyError:
        traceback.print_exc()
        print("\n\tA key may be missing from your settings file.", file=sys.stderr)
    except dateutil.parser._parser.ParserError:
        traceback.print_exc()
        print("\n\tFailed to interpret a date from string..",
            "\n\tYour file of posts may be malformed.",
            "\n\tCheck if your file starts with a line break.", file=sys.stderr)
    except toml.decoder.TomlDecodeError:
        traceback.print_exc()
        print("\n\tYour configuration file is malformed.", file=sys.stderr)
    except FileNotFoundError:
        traceback.print_exc()
        print("\n\tA potential cause is attempting to save a file to a folder that does not exist.", file=sys.stderr)
|