Skip to content

Commit e023a49

Browse files
committed
add untracked wikidata monitor file
1 parent 9256077 commit e023a49

File tree

1 file changed

+310
-0
lines changed

1 file changed

+310
-0
lines changed

monitor_websocket_wikidata.py

+310
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
# -*- coding: utf-8 -*-
2+
3+
# some code from Twisted Matrix's irc_test.py
4+
from twisted.words.protocols import irc
5+
from twisted.internet import reactor, protocol
6+
from twisted.internet.protocol import ReconnectingClientFactory
7+
from twisted.web.client import getPage
8+
from autobahn.websocket import (WebSocketServerFactory,
9+
WebSocketServerProtocol,
10+
listenWS)
11+
import re
12+
import socket
13+
14+
import wapiti
15+
from json import dumps, loads
16+
17+
DEBUG = False
18+
19+
LOCAL_GEOIP = 'http://localhost:7999'
20+
21+
import logging
22+
logging.basicConfig(level=logging.DEBUG,
23+
format='%(asctime)s\t%(name)s\t %(message)s',
24+
datefmt='%Y-%m-%d %H:%M:%S')
25+
bcast_log = logging.getLogger('bcast_log')
26+
irc_log = logging.getLogger('irc_log')
27+
api_log = logging.getLogger('api_log')
28+
29+
30+
DEFAULT_LANG = 'en'
31+
DEFAULT_PROJECT = 'wikipedia'
32+
DEFAULT_BCAST_PORT = 9000
33+
34+
COLOR_RE = re.compile(r"\x03(?:\d{1,2}(?:,\d{1,2})?)?",
35+
re.UNICODE) # remove IRC color codes
36+
PARSE_EDIT_RE = re.compile(r'(\[\[(?P<page_title>.*?)\]\])'
37+
r' +((?P<flags>[A-Z\!]+) )?'
38+
r'(?P<url>\S*)'
39+
r' +\* (?P<user>.*?)'
40+
r' \* (\((?P<change_size>[\+\-][0-9]+)\))?'
41+
r' ?(?P<summary>.+)?')
42+
HASHTAG_RE = re.compile("(?:^|\s)[##]{1}(\w+)", re.UNICODE)
43+
44+
NON_MAIN_NS = ['Talk',
45+
'User',
46+
'User talk',
47+
'Wikipedia',
48+
'Wikipedia talk',
49+
'File',
50+
'File talk',
51+
'MediaWiki',
52+
'MediaWiki talk',
53+
'Template',
54+
'Template talk',
55+
'Help',
56+
'Help talk',
57+
'Category',
58+
'Category talk',
59+
'Portal',
60+
'Portal talk',
61+
'Book',
62+
'Book talk',
63+
'Education Program',
64+
'Education Program talk',
65+
'TimedText',
66+
'TimedText talk',
67+
'Module',
68+
'Module talk',
69+
'Special',
70+
'Media']
71+
72+
73+
def is_ip(addr):
74+
try:
75+
socket.inet_aton(addr)
76+
except Exception:
77+
return False
78+
return True
79+
80+
81+
def process_message(message, non_main_ns=NON_MAIN_NS, bcast_callback=None):
82+
no_color = COLOR_RE.sub('', message)
83+
ret = PARSE_EDIT_RE.match(no_color)
84+
msg_dict = {'is_new': False,
85+
'is_bot': False,
86+
'is_unpatrolled': False,
87+
'is_anon': False}
88+
if ret:
89+
msg_dict.update(ret.groupdict())
90+
else:
91+
msg_dict = {}
92+
'''
93+
Special logs:
94+
- Special:Log/abusefilter
95+
- Special:Log/block
96+
- Special:Log/newusers
97+
- Special:Log/move
98+
- Special:Log/pagetriage-curation
99+
- Special:Log/delete
100+
- Special:Log/upload
101+
- Special:Log/patrol
102+
'''
103+
ns, _, title = msg_dict['page_title'].rpartition(':')
104+
if ns not in non_main_ns:
105+
msg_dict['ns'] = 'Main'
106+
else:
107+
msg_dict['ns'] = ns
108+
flags = msg_dict.get('flags') or ''
109+
110+
msg_dict['is_new'] = 'N' in flags
111+
msg_dict['is_bot'] = 'B' in flags
112+
msg_dict['is_minor'] = 'M' in flags
113+
msg_dict['is_unpatrolled'] = '!' in flags
114+
115+
username = msg_dict.get('user')
116+
is_anon = is_ip(username)
117+
118+
summary = msg_dict.get('summary')
119+
120+
if summary:
121+
# This is missing some of the features from wikimon/parsers.py
122+
msg_dict['hashtags'] = HASHTAG_RE.findall(summary)
123+
else:
124+
msg_dict['hashtags'] = []
125+
126+
def broadcast(geo_json=None):
127+
if geo_json is not None:
128+
geo_dict = loads(geo_json)
129+
msg_dict['geo_ip'] = geo_dict
130+
bcast_callback(msg_dict)
131+
132+
def report_failure_broadcast(error):
133+
bcast_log.debug("could not fetch from local geoip: %s", error)
134+
broadcast()
135+
136+
if is_anon:
137+
msg_dict['is_anon'] = True
138+
139+
if bcast_callback:
140+
try:
141+
geo_url = str(LOCAL_GEOIP + '/json/' + username)
142+
except UnicodeError:
143+
pass
144+
else:
145+
getPage(geo_url).addCallbacks(callback=broadcast,
146+
errback=report_failure_broadcast)
147+
elif bcast_callback:
148+
broadcast()
149+
return msg_dict
150+
151+
152+
class Monitor(irc.IRCClient):
153+
def __init__(self, bsf, nmns, factory):
154+
self.broadcaster = bsf
155+
self.non_main_ns = nmns
156+
self.factory = factory
157+
irc_log.info('created IRC monitor...')
158+
159+
def connectionMade(self):
160+
irc.IRCClient.connectionMade(self)
161+
irc_log.info('connected to IRC server...')
162+
163+
def signedOn(self):
164+
self.join(self.factory.channel)
165+
irc_log.info('joined %s ...', self.factory.channel)
166+
167+
def privmsg(self, user, channel, msg):
168+
try:
169+
msg = msg.decode('utf-8')
170+
except UnicodeError as ue:
171+
bcast_log.warn('UnicodeError: %r on IRC message %r', ue, msg)
172+
return
173+
process_message(msg, self.non_main_ns, self._bc_callback)
174+
175+
def _bc_callback(self, msg_dict):
176+
# Which revisions to broadcast?
177+
json_msg_dict = dumps(msg_dict)
178+
self.broadcaster.broadcast(json_msg_dict)
179+
180+
181+
class MonitorFactory(ReconnectingClientFactory):
182+
def __init__(self, channel, bsf, nmns=NON_MAIN_NS):
183+
self.channel = channel
184+
self.bsf = bsf
185+
self.nmns = nmns
186+
187+
def buildProtocol(self, addr):
188+
irc_log.info('monitor IRC connected to %s', self.channel)
189+
self.resetDelay()
190+
return Monitor(self.bsf, self.nmns, self)
191+
192+
def startConnecting(self, connector):
193+
irc_log.info('monitor IRC starting connection to %s', self.channel)
194+
protocol.startConnecting(self, connector)
195+
196+
def clientConnectionLost(self, connector, reason):
197+
irc_log.error('lost monitor IRC connection: %s', reason)
198+
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
199+
200+
def clientConnectionFailed(self, connector, reason):
201+
irc_log.error('failed monitor IRC connection: %s', reason)
202+
ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
203+
204+
205+
class BroadcastServerProtocol(WebSocketServerProtocol):
206+
def onOpen(self):
207+
self.factory.register(self)
208+
209+
def onMessage(self, msg, binary):
210+
if not binary:
211+
self.factory.broadcast("'%s' from %s" % (msg, self.peerstr))
212+
213+
def connectionLost(self, reason):
214+
WebSocketServerProtocol.connectionLost(self, reason)
215+
self.factory.unregister(self)
216+
217+
218+
class BroadcastServerFactory(WebSocketServerFactory):
219+
def __init__(self, url, lang, project, *a, **kw):
220+
WebSocketServerFactory.__init__(self, url, *a, **kw)
221+
self.clients = set()
222+
self.tickcount = 0
223+
224+
start_monitor(self, lang, project) # blargh
225+
226+
def tick(self):
227+
self.tickcount += 1
228+
self.broadcast("'tick %d' from server" % self.tickcount)
229+
reactor.callLater(1, self.tick)
230+
231+
def register(self, client):
232+
if not client in self.clients:
233+
bcast_log.info("registered client %s", client.peerstr)
234+
self.clients.add(client)
235+
236+
def unregister(self, client):
237+
try:
238+
self.clients.remove(client)
239+
bcast_log.info("unregistered client %s", client.peerstr)
240+
except KeyError:
241+
pass
242+
243+
def broadcast(self, msg):
244+
bcast_log.info("broadcasting message %r", msg)
245+
for c in self.clients:
246+
c.sendMessage(msg)
247+
bcast_log.debug("message sent to %s", c.peerstr)
248+
249+
250+
class BroadcastPreparedServerFactory(BroadcastServerFactory):
251+
def broadcast(self, msg):
252+
preparedMsg = self.prepareMessage(msg)
253+
for c in self.clients:
254+
c.sendPreparedMessage(preparedMsg)
255+
bcast_log.info("prepared message sent to %s", c.peerstr)
256+
257+
258+
def start_monitor(broadcaster, lang=DEFAULT_LANG, project=DEFAULT_PROJECT):
259+
channel = '%s.%s' % (lang, project)
260+
try:
261+
api_url = 'http://%s.%s.org/w/api.php' % (lang, project)
262+
api_log.info('fetching namespaces from %r', api_url)
263+
wc = wapiti.WapitiClient('wikimon@hatnote.com', api_url=api_url)
264+
api_log.info('successfully fetched namespaces from %r', api_url)
265+
page_info = wc.get_source_info()
266+
nmns = [ns.title for ns in page_info[0].namespace_map if ns.title]
267+
irc_log.info('connecting to %s...', channel)
268+
f = MonitorFactory(channel, broadcaster, nmns)
269+
except:
270+
irc_log.info('connecting to %s without ns', channel)
271+
f = MonitorFactory(channel, broadcaster)
272+
reactor.connectTCP("irc.wikimedia.org", 6667, f)
273+
274+
275+
def create_parser():
276+
from argparse import ArgumentParser
277+
desc = "broadcast realtime edits to a Mediawiki project over websockets"
278+
prs = ArgumentParser(description=desc)
279+
prs.add_argument('--project', default=DEFAULT_PROJECT)
280+
prs.add_argument('--lang', default=DEFAULT_LANG)
281+
prs.add_argument('--port', default=DEFAULT_BCAST_PORT, type=int,
282+
help='listen port for websocket connections')
283+
prs.add_argument('--debug', default=DEBUG, action='store_true')
284+
prs.add_argument('--loglevel', default='WARN',
285+
help='e.g., DEBUG, INFO, WARN, etc.')
286+
return prs
287+
288+
289+
def main():
290+
parser = create_parser()
291+
args = parser.parse_args()
292+
try:
293+
bcast_log.setLevel(getattr(logging, args.loglevel.upper()))
294+
except:
295+
print 'warning: invalid log level'
296+
bcast_log.setLevel(logging.WARN)
297+
ws_listen_addr = 'ws://localhost:%d' % (args.port,)
298+
ServerFactory = BroadcastServerFactory
299+
factory = ServerFactory(ws_listen_addr,
300+
project=args.project,
301+
lang=args.lang,
302+
debug=DEBUG,
303+
debugCodePaths=DEBUG)
304+
factory.protocol = BroadcastServerProtocol
305+
factory.setProtocolOptions(allowHixie76=True)
306+
listenWS(factory)
307+
reactor.run()
308+
309+
if __name__ == '__main__':
310+
main()

0 commit comments

Comments
 (0)