#!/usr/bin/env python3 import requests import hashlib import re from json import JSONDecoder from bs4 import BeautifulSoup import time import asyncio import logging from slixmpp import ClientXMPP, JID currentHTMLHash = "268482119524892ebf8a1efa6c2d066e08a4d7fa42d1d703cb00c25f1ef96c5b" currentJSHash = "96e0c20d1da2808dcb76a3e6824441fb377fbc533b4c240f811ff17abb8178b9" jsValues = ['de', 'en', 'hb', 'bhv', 'none', 'jj', 'az'] client = ClientXMPP("account@chat.sum7.eu", "PASSWORD") listSubscriber = [ { "jid": JID("genofire@chat.sum7.eu"), "data": { "dateOfBirth":"1970-01-02", "email":"a@example.org", "repeatEmail":"a@example.org", "firstName":"Geno", "lastName":"Fire", "location":"hb", "zip":"28000" } }, {"jid": JID("genofire@meckerspace.de") } ] def checkHTML(notify = print, jsFile = print): res = requests.get("https://impfzentrum.bremen.de/") hashV = hashlib.sha256(res.text.encode()).hexdigest() if hashV != currentHTMLHash: notify("new index.html found with hash {}".format(hashV)) soup = BeautifulSoup(res.text, 'html.parser') for s in soup.find_all('script'): jsFile(s.get("src"), notify) def checkJS(file, notify = print): res = requests.get("https://impfzentrum.bremen.de/"+file) hashV = hashlib.sha256(res.text.encode()).hexdigest() if hashV != currentJSHash: notify("new js with path {} and hash {} found".format(file, hashV)) p = re.findall(r".?value.?:\s*\"(.{1,5})\"", res.text, re.M) newP = [x for x in p if x not in jsValues] if len(newP) == 1 : handleVac(newP[0], notify) elif len(newP) > 1: notify("error on finding new vacation {} - please go yourself to https://impfzentrum.bremen.de/registration/new", p) def handleVac(v, notify = print): notify("new vacation is {}".format(v)) for req in listSubscriber: if "data" not in req: continue data = req["data"] data["preference"] = v res = requests.post("https://api.schedule-bremen.dgshost.de/api/frontend/registration", data) notify("try of registration - statuscode {}, if correct result should be curly brackets and statuscode 201 else please go yourself to https://impfzentrum.bremen.de/registration/new: {}".format(res.status_code, res.text), jids=[req["jid"]]) def notify(text, jids=[x["jid"] for x in listSubscriber]): global client for jid in jids: #print("{}: {}".format(jid, text)) client.send_message(jid, text, mtype="chat") def session_start(event): global client client.send_presence() client.get_roster() if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(levelname)-8s %(message)s') client.add_event_handler("session_start", session_start) client.connect() client.process(timeout=1) notify("startup hb vecation bot", jids=[listSubscriber[0]["jid"]]) while True: checkHTML(jsFile = checkJS, notify=notify) client.process(timeout=1) #time.sleep(1) notify("stop hb vecation bot", jids=[listSubscriber[0]["jid"]])