100 lines
3.1 KiB
Python
100 lines
3.1 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
import requests
|
||
|
import hashlib
|
||
|
import re
|
||
|
from json import JSONDecoder
|
||
|
from bs4 import BeautifulSoup
|
||
|
|
||
|
import time
|
||
|
|
||
|
import asyncio
|
||
|
import logging
|
||
|
from slixmpp import ClientXMPP, JID
|
||
|
|
||
|
# SHA-256 hex digest that checkHTML() compares the fetched landing page
# against; any other digest is reported as a new index.html.
currentHTMLHash = (
    "268482119524892ebf8a1efa6c2d066e08a4d7fa42d1d703cb00c25f1ef96c5b"
)

# SHA-256 hex digest that checkJS() compares every fetched script against.
currentJSHash = (
    "96e0c20d1da2808dcb76a3e6824441fb377fbc533b4c240f811ff17abb8178b9"
)

# "value" entries that are already known; checkJS() treats any other match
# of its value pattern as a newly offered option.
jsValues = [
    'de',
    'en',
    'hb',
    'bhv',
    'none',
    'jj',
    'az',
]
|
||
|
|
||
|
# Module-level XMPP client shared by notify(), session_start() and the
# script entry point.
# NOTE(review): the account and password are hard-coded here; "PASSWORD"
# looks like a placeholder - confirm how the real secret is supplied.
client = ClientXMPP("account@chat.sum7.eu", "PASSWORD")
|
||
|
|
||
|
# Everyone the bot talks to.  Each entry needs a "jid"; entries that also
# carry a "data" dict are registered automatically by handleVac(), the
# others only receive notifications.  The first entry additionally gets the
# startup/stop messages sent from the script entry point.
listSubscriber = [
    {
        "jid": JID("genofire@chat.sum7.eu"),
        "data": {
            "dateOfBirth": "1970-01-02",
            "email": "a@example.org",
            "repeatEmail": "a@example.org",
            "firstName": "Geno",
            "lastName": "Fire",
            "location": "hb",
            "zip": "28000",
        },
    },
    {
        "jid": JID("genofire@meckerspace.de"),
    },
]
|
||
|
|
||
|
def checkHTML(notify=print, jsFile=print):
    """Fetch the landing page, report a changed hash and scan its scripts.

    The page text is hashed with SHA-256 and compared against
    ``currentHTMLHash``; on a mismatch ``notify`` is called with the new
    digest.  Every ``<script>`` element that has a ``src`` attribute is then
    handed to ``jsFile``.

    Args:
        notify: Callable that receives the message text.
        jsFile: Callable invoked as ``jsFile(src, notify)`` for each script.

    A failed or timed-out request is logged and the check is skipped, so a
    polling caller can simply try again on its next round.
    """
    try:
        # Without a timeout a stalled connection would block forever.
        res = requests.get("https://impfzentrum.bremen.de/", timeout=30)
    except requests.RequestException as err:
        logging.warning("could not fetch index.html: %s", err)
        return

    hashV = hashlib.sha256(res.text.encode()).hexdigest()
    if hashV != currentHTMLHash:
        notify("new index.html found with hash {}".format(hashV))

    # NOTE(review): the original indentation was lost; the script scan is
    # assumed to run on every call, not only when the page hash changed -
    # confirm against the upstream file.
    soup = BeautifulSoup(res.text, 'html.parser')
    for s in soup.find_all('script'):
        src = s.get("src")
        if not src:
            # Inline <script> elements have no src; there is nothing to fetch.
            continue
        jsFile(src, notify)
|
||
|
|
||
|
|
||
|
def checkJS(file, notify=print):
    """Fetch one script, report a changed hash and look for new values.

    The script text is hashed with SHA-256 and compared against
    ``currentJSHash``.  On a mismatch ``notify`` is called, the text is
    searched for short ``value: "..."`` entries and those not listed in
    ``jsValues`` are collected.  Exactly one unknown entry is passed to
    ``handleVac``; more than one is reported through ``notify`` so a human
    can take over.

    Args:
        file: Path of the script, appended to the site's base URL.  Empty or
            ``None`` is ignored.
        notify: Callable that receives the message text.

    A failed or timed-out request is logged and the check is skipped.
    """
    if not file:
        # Nothing to fetch (e.g. a <script> element without src).
        return

    try:
        # Without a timeout a stalled connection would block forever.
        res = requests.get("https://impfzentrum.bremen.de/" + file, timeout=30)
    except requests.RequestException as err:
        logging.warning("could not fetch script %s: %s", file, err)
        return

    hashV = hashlib.sha256(res.text.encode()).hexdigest()
    # NOTE(review): the original indentation was lost; the value scan is
    # assumed to run only for scripts whose hash differs - confirm.
    if hashV == currentJSHash:
        return
    notify("new js with path {} and hash {} found".format(file, hashV))

    p = re.findall(r".?value.?:\s*\"(.{1,5})\"", res.text, re.M)
    newP = [x for x in p if x not in jsValues]
    if len(newP) == 1:
        handleVac(newP[0], notify)
    elif len(newP) > 1:
        # The matches belong in the message text.  Passing them as a second
        # positional argument would hand them to notify() as recipients and
        # leave the placeholder unfilled.
        notify("error on finding new vacation {} - please go yourself to https://impfzentrum.bremen.de/registration/new".format(p))
|
||
|
|
||
|
def handleVac(v, notify=print):
    """Announce a new option and try to register every subscriber for it.

    ``notify`` is first called with the new value.  Then, for each entry of
    ``listSubscriber`` that has a ``"data"`` dict, that data plus
    ``"preference": v`` is POSTed to the registration API and the outcome is
    reported to that subscriber only.

    Args:
        v: The newly found value, sent as the ``preference`` field.
        notify: Callable that receives the message text and must accept a
            ``jids`` keyword argument.  The default ``print`` does not, so
            it only works while no subscriber has ``"data"``.
    """
    notify("new vacation is {}".format(v))
    for req in listSubscriber:
        if "data" not in req:
            continue
        # Build a fresh payload so the shared subscriber record is not
        # modified as a side effect of registering.
        data = dict(req["data"])
        data["preference"] = v
        try:
            # Without a timeout a stalled connection would block forever.
            res = requests.post("https://api.schedule-bremen.dgshost.de/api/frontend/registration", data, timeout=30)
        except requests.RequestException as err:
            # One failed request must not stop the remaining subscribers
            # from being registered; tell this one to register manually.
            notify("registration request failed ({}) - please go yourself to https://impfzentrum.bremen.de/registration/new".format(err), jids=[req["jid"]])
            continue
        notify("try of registration - statuscode {}, if correct result should be curly brackets and statuscode 201 else please go yourself to https://impfzentrum.bremen.de/registration/new: {}".format(res.status_code, res.text), jids=[req["jid"]])
|
||
|
|
||
|
def notify(text, jids=None):
    """Send ``text`` as an XMPP chat message to each recipient.

    Args:
        text: Message body.
        jids: Iterable of recipient JIDs.  ``None`` (the default) means the
            ``"jid"`` of every entry in ``listSubscriber``, looked up at
            call time rather than once when the function is defined.
    """
    if jids is None:
        jids = [x["jid"] for x in listSubscriber]
    for jid in jids:
        client.send_message(jid, text, mtype="chat")
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
def session_start(event):
    """Handle the XMPP ``session_start`` event.

    Sends presence and requests the roster on the module-level ``client``.
    The ``event`` argument is not used.
    """
    client.send_presence()
    client.get_roster()
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
    # Script entry point: connect, announce startup to the first subscriber,
    # then poll the website until the process is interrupted or fails.
    logging.basicConfig(level=logging.DEBUG,
                        format='%(levelname)-8s %(message)s')

    client.add_event_handler("session_start", session_start)

    client.connect()
    client.process(timeout=1)
    notify("startup hb vecation bot", jids=[listSubscriber[0]["jid"]])
    try:
        while True:
            checkHTML(jsFile=checkJS, notify=notify)
            # Gives the XMPP client a slice of time between two polls.
            client.process(timeout=1)
    finally:
        # The loop never ends on its own, so the stop message has to be sent
        # from here to be reachable at all (Ctrl-C, unexpected error).
        notify("stop hb vecation bot", jids=[listSubscriber[0]["jid"]])
        # NOTE(review): presumably needed so the queued message is actually
        # delivered before the process exits - confirm with the slixmpp
        # version in use.
        client.process(timeout=1)
|
||
|
|