info = { 'author': 'Jon Bergli Heier', 'title': 'Posten.no tracking', 'description': 'Fetches tracking info from posten.no.', } cfg_section = 'module/tracking' import urllib2, datetime, re, pytz, json, urllib from xml.etree import ElementTree as ET from twisted.internet import reactor from twisted.internet.task import LoopingCall from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, UniqueConstraint from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relation, backref from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.exc import IntegrityError from sqlalchemy.sql import or_, and_ import easypost engine = Session = None Base = declarative_base() class Consignment(Base): __tablename__ = 'consignment' id = Column(Integer, primary_key = True) server = Column(String, nullable = False) nick = Column(String, nullable = False) channel = Column(String) # NULL for private code = Column(String, nullable = False, index = True) label = Column(String, index = True) added = Column(DateTime, nullable = False) last = Column(DateTime) type = Column(String, nullable = False) # Tracking module packages = relation('Package', backref = 'consignment', primaryjoin = 'Package.consignment_id == Consignment.id') __table_args__ = (UniqueConstraint('code', 'type', name = 'consignment_code_type_uc'),) def __init__(self, server, nick, channel, type, code, label, added): self.server = server self.nick = nick self.channel = channel self.type = type self.code = code self.label = label self.added = added def __str__(self): s = '\002%s\002' % self.code.encode('utf-8') if self.label: s += ' (%s)' % self.label.encode('utf-8') return s def __unicode__(self): return str(self).decode('utf-8') class Package(Base): __tablename__ = 'package' id = Column(Integer, primary_key = True) consignment_id = Column(Integer, ForeignKey('consignment.id')) code = Column(String, nullable = False, index = True) last = Column(DateTime) # Last update status = Column(String) # Last status def __init__(self, consignment_id, code): self.consignment_id = consignment_id self.code = code def __str__(self): s = '\002%s\002' % self.code.encode('utf-8') if self.consignment.label: s += ' (%s)' % self.consignment.label.encode('utf-8') return s def __unicode__(self): return str(self).decode('utf-8') class PackageError(Exception): pass class NoPackageFound(PackageError): pass class UnknownTypeError(PackageError): pass class TrackingResult: def __init__(self, code, date, desc, delivered = False, previous_code = None): self.code = code self.date = date self.desc = desc self.delivered = delivered # Set when a package changes tracking code. self.previous_code = previous_code def __str__(self): return '\002{code}\002 {date} - {desc}'.format(code = self.code, date = self.date, desc = self.desc) tracking_modules = {} class TrackingModuleMeta(type): def __init__(cls, name, bases, dict): if name != 'TrackingModule': tracking_modules[cls.name] = cls class TrackingModule(object): __metaclass__ = TrackingModuleMeta config_section = 'module/tracking' def splitcode(self, code): return code_split(code)[-1] class PostenModule(TrackingModule): name = 'posten' url = 'http://sporing.posten.no/sporing.xml?q=%s' def get_xml(self, url): try: u = urllib2.urlopen(url, timeout = config.getfloat(cfg_section, 'timeout')) except urllib2.HTTPError as e: if e.code == 404: raise NoPackageFound('No packages found') raise PackageError(str(e)) except Exception as e: raise PackageError(str(e)) if u.headers['content-type'].startswith('application/xml'): xml = ET.parse(u) else: xml = None u.close() return xml def get_url(self, code = None): url = 'http://sporing.posten.no/' if code: url += 'sporing.html?q=' + code return url def track(self, code): code = self.splitcode(code) xml = self.get_xml(self.url % code) if not xml: return ns = 'http://www.bring.no/sporing/1.0' packages = xml.find('.//{%s}PackageSet' % (ns,)) if packages is None: raise NoPackageFound('No packages found for \002%s\002.' % code) results = [] for package in packages: code = package.attrib['packageId'] if 'previousPackageId' in package.attrib: previous_code = package.attrib['previousPackageId'] else: previous_code = None eventset = package.find('{%s}EventSet' % ns) if not len(eventset): continue last = eventset[0] desc = last.find('{%s}Description' % ns).text.replace('<', '<').replace('
', ' ') desc = re.sub(r'<[^>]*?>', '', desc).encode('utf8').strip() isodate = last.find('{%s}OccuredAtIsoDateTime' % ns).text[:-10] isodate = datetime.datetime.strptime(isodate, '%Y-%m-%dT%H:%M:%S') city = last.find('{%s}City' % ns).text status = last.find('{%s}Status' % ns).text.encode('utf-8') if city: desc = '%s (%s)' % (desc, city.encode('utf8')) date = last.find('{%s}OccuredAtDisplayDate' % ns).text + ' ' + last.find('{%s}OccuredAtDisplayTime' % ns).text results.append(TrackingResult(code, isodate, desc, status == 'DELIVERED', previous_code = previous_code)) return results class FedexModule(TrackingModule): name = 'fedex' def get_url(self, code = None): url = 'https://www.fedex.com/fedextrack/index.html' if code: url += '?tracknumbers=' + code return url def track(self, code): code = self.splitcode(code) data = {'TrackPackagesRequest': {'appType': 'wtrk', 'uniqueKey': '', 'processingParameters': {'anonymousTransaction': True, 'clientId': 'WTRK', 'returnDetailedErrors': True, 'returnLocalizedDateTime': False}, 'trackingInfoList': [{'trackNumberInfo': {'trackingNumber': code, 'trackingQualifier': '', 'trackingCarrier': ''}}]}} url = 'https://www.fedex.com/trackingCal/track?' + urllib.urlencode({'data': json.dumps(data), 'action': 'trackpackages', 'locale': 'en_US', 'format': 'json', 'version': 99}) try: u = urllib2.urlopen(url) except urllib2.HTTPError as e: raise PackageError(str(e)) data = u.read() u.close() for m in re.findall(r'(\\x[a-fA-F\d]{2})', data): data = data.replace(m, chr(int(m[-2:], 16))) data = json.loads(data) response = data['TrackPackagesResponse'] packages = response['packageList'] results = [] for package in packages: lastevent = package['scanEventList'][0] # Ignore events with missing date or time, this seems to be a temporary thing. if not lastevent['date'] or not lastevent['time']: continue date = datetime.datetime.strptime('%s %s' % (lastevent['date'], lastevent['time']), '%Y-%m-%d %H:%M:%S') if lastevent['gmtOffset']: tz = pytz.timezone('Etc/GMT' + ('+' if lastevent['gmtOffset'][0] == '-' else '-') + str(int(lastevent['gmtOffset'][1:3]))) date = tz.localize(date).astimezone(pytz.timezone(config.get(cfg_section, 'local_timezone'))).replace(tzinfo = None) status = lastevent['status'].encode('utf-8') if lastevent['scanDetails']: status += ' - %s' % lastevent['scanDetails'].encode('utf-8') if lastevent['scanLocation']: status += ' (%s)' % lastevent['scanLocation'].encode('utf-8') results.append(TrackingResult(code, date, status, lastevent['isDelivered'])) return results class PostNordModule(TrackingModule): name = 'postnord' def get_url(self, code = None): url = 'http://www.postnordlogistics.no/minside/SOPS/' if code: url += code return url def track(self, code): code = self.splitcode(code) url = 'http://www.postnordlogistics.no/XMLServer/rest/trackandtrace?' + urllib.urlencode({'q': code}) try: u = urllib2.urlopen(url) except urllib2.HTTPError as e: raise PackageError(str(e)) data = json.load(u) u.close() response = data['TrackingInformationResponse'] results = [] for shipment in response['shipments']: for item in shipment['items']: code = item['itemId'] delivered = (item.get('status') == 'DELIVERED') lastevent = item['events'][-1] date = datetime.datetime.strptime(lastevent['eventTime'], '%Y-%m-%dT%H:%M:%S') event = lastevent['eventDescription'] location = lastevent.get('location') if location: location = lastevent['location'].get('displayName') or lastevent['location'].get('city') if location: status = '%s (%s)' % (event, location) else: status = event results.append(TrackingResult(code, date, status.encode('utf-8'), delivered)) return results class UpsModule(TrackingModule): name = 'ups' def get_url(self, code = None): if code is None: url = 'http://www.ups.com/WebTracking/track' else: data = { 'TypeOfInquiryNumber': 'T', 'InquiryNumber1': code, } url = 'http://www.ups.com/WebTracking/processInputRequest?' + urllib.urlencode(data) return url def track(self, code): code = self.splitcode(code) access_key = config.get(self.config_section, 'ups_access_key') username = config.get(self.config_section, 'ups_username') password = config.get(self.config_section, 'ups_password') url = 'https://wwwcie.ups.com/ups.app/xml/Track' request = """ %s %s %s Track %s """ % (access_key, username, password, code) try: u = urllib2.urlopen(url, request) except urllib2.HTTPError as e: raise PackageError(str(e)) xml = ET.parse(u) packages = xml.findall('Shipment/Package') if packages is None: raise NoPackageFound('No packages found for \002%s\002.' % code) results = [] for package in packages: code = package.find('TrackingNumber').text activity = package.find('Activity') if activity is None: continue address = activity.find('ActivityLocation/Address') location_text = "%s, %s" % (address.find('City').text, address.find('CountryCode').text) statuscode = activity.find('Status/StatusType/Code').text event_text = activity.find('Status/StatusType/Description').text date = activity.find('Date').text time = activity.find('Time').text year, month, day = int(date[0:4]), int(date[4:6]), int(date[6:8]) hour, minute, second = int(time[0:2]), int(time[2:4]), int(time[4:6]) isodate = datetime.datetime(year, month, day, hour, minute, second) desc = "%s (%s)" % (event_text, location_text) results.append(TrackingResult(code, isodate, desc, statuscode == 'D')) return results class EasypostModule: def track(self, code): code = self.splitcode(code) try: tracker = (t for t in easypost.Tracker.all() if t.tracking_code == code).next() except StopIteration: tracker = easypost.Tracker.create(tracking_code = code, carrier = self.easypost_carrier) results = [] if tracker.tracking_details: delivered = tracker.status == 'delivered' lastevent = tracker.tracking_details[-1] date = datetime.datetime.strptime(lastevent.datetime, '%Y-%m-%dT%H:%M:%SZ') status = lastevent.message location = [] for f in ('city', 'state', 'country'): if lastevent['tracking_location'][f]: location.append(lastevent['tracking_location'][f]) if location: status += ' (%s)' % ', '.join(location) results.append(TrackingResult(code, date, status.encode('utf-8'), delivered)) return results class DHLModule(TrackingModule, EasypostModule): name = 'dhl' easypost_carrier = 'DHLExpress' def get_url(self, code = None): url = 'http://www.dhl.no/no/express/sporing.html?AWB=' if code: url += code return url def code_split(code): if ':' in code: return code.split(':') else: return 'posten', code def get_tracking_module(arg): type, code = code_split(arg) if not type in tracking_modules: raise UnknownTypeError('Unknown type "%s"' % type) return tracking_modules[type]() class Module: def __init__(self, bot): easypost.api_key = config.get(cfg_section, 'easypost_api_key') self.setup_db() self.irc = bot self.tracking = [] self.lc = LoopingCall(self.lc_callback) if bot: self.lc.start(config.getfloat(cfg_section, 'interval'), False) self.irc.register_keyword('!track', self) def stop(self): try: self.lc.stop() except: pass def setup_db(self): global engine, Session if engine and Session: return engine = create_engine(config.get(cfg_section, 'db_path')) Base.metadata.create_all(engine) Session = sessionmaker(bind = engine, autoflush = True, autocommit = False) def track_start(self, code, label, nick, channel): type, code = code_split(code) if not type in tracking_modules: return '\002%s\002 is an invalid type.' % type if not len(code): return 'Missing tracking number.' msg = None try: session = Session() consignment = Consignment(self.irc.factory.server, nick, channel, type, code, label.decode('utf8'), datetime.datetime.utcnow()) session.add(consignment) session.commit() msg = 'Now tracking \002%s\002.' % code reactor.callLater(1, self.track_update, type, code, True) except IntegrityError as e: msg = 'Already tracking \002%s\002.' % code finally: session.close() return msg def track_stop(self, code, nick, channel): type, code = code_split(code) if not len(code): return 'Missing tracking number or label.' msg = None try: session = Session() consignments = session.query(Consignment).outerjoin(Consignment.packages) \ .filter(and_(Consignment.server == self.irc.factory.server, or_(Consignment.code == code, Consignment.label == code, Package.code == code))) if nick: consignments = consignments.filter(Consignment.nick == nick) deleted = [] consignments = consignments.all() if not len(consignments): raise NoResultFound # we should have exactly one consignment, but need to handle cases when there are more for consignment in consignments: code = str(consignment) for p in consignment.packages: session.delete(p) deleted.append(code) session.delete(consignment) session.commit() msg = 'No longer tracking %s' % ', '.join(deleted) except NoResultFound: msg = '\002%s\002 is not being tracked.' % code.decode('utf8') finally: session.close() return msg def track_status(self, code, nick, channel): # don't code_split() here; this is done later if no results are returned msg = [] try: session = Session() consignments = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server, or_(Consignment.code.like(code + '%'), Consignment.label.like((code + '%'))))) if nick: consignments = consignments.filter(Consignment.nick == nick) results = [] for row in consignments: try: self.track_update(row.type, row.code, propagate_error = True) except NoPackageFound: results.append('No packages found for %s' % unicode(row)) continue except PackageError as e: results.append('Failed to fetch data for %s.' % unicode(row)) continue i = 0 for package in row.packages: i += 1 date = package.last desc = package.status if date and desc: s = '%s %s - %s' % (unicode(package), date, desc) else: s = 'No tracking info found for %s' % unicode(package) results.append(s) if i == 0: results.append('No packages found for %s' % unicode(row)) if len(results): msg = results elif len(code): try: # type:code is handled by get_tracking_module() tm = get_tracking_module(code) data = tm.track(code) if data: for package in data: msg.append('\002%s\002 %s - %s' % (package.code, package.date, package.desc.decode('utf8'))) elif code: msg = 'Failed to fetch tracking data for \002%s\002.' % code.decode('utf8') except PackageError as e: msg = str(e) else: msg = 'No tracking number given or registered.' finally: session.close() return msg def track_label(self, code, label, nick, channel): type, code = code_split(code) if not len(code): return 'Missing tracking number or label.' msg = None try: session = Session() consignment = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server, Consignment.code == code, Consignment.nick == nick)).one() consignment.label = label.decode('utf8') session.add(consignment) session.commit() msg = '\002%s\002 now labelled \002%s\002' % (code, label.decode('utf8')) finally: session.close() return msg def track_list(self, nick): msg = None try: session = Session() trackings = [] consignments = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server, Consignment.nick == nick)) for row in consignments: trackings.append(str(row)) if len(trackings): msg = 'Trackings for %s: %s' % (nick, ', '.join(trackings)) else: msg = 'No trackings for %s' % nick except NoResultFound: msg = 'No trackings registered for %s.' finally: session.close() return msg def track_url(self, code, nick, channel): msg = [] try: session = Session() consignments = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server, or_(Consignment.code.like(code + '%'), Consignment.label.like(code+ '%')))) if nick: consignments = consignments.filter(Consignment.nick == nick) results = [] for consignment in consignments: m = get_tracking_module('%s:%s' % (consignment.type, consignment.code)) msg.append(m.get_url(consignment.code)) if not len(msg): # Rreturn an error only if we got a code to filter with. if code: msg.append('No packages found.') else: m = get_tracking_module('') msg = [m.get_url()] finally: session.close() return msg def keyword(self, nick, channel, kw, msg): usage = 'Usage: !track (start TRACKINGNO [LABEL])|(stop|status TRACKINGNO|LABEL) - See !track help for full list' args = msg.split() if len(args) < 1: self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], usage) return mode = args[0] if mode.lower() in ('start', 'add', 'stop', 'status'): if len(args) < 2 and mode.lower() != 'status': self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], usage) return if mode.lower() in ('start', 'add'): code = args[1] label = ' '.join(args[2:]) if len(args) > 2 else '' else: code = ' '.join(args[1:]) elif mode.lower() == 'label': if len(args) < 3: self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], usage) return code = args[1] label = ' '.join(args[2:]) elif mode.lower() == 'url': code = ' '.join(args[1:]) if len(args) > 1 else '' elif mode.lower() == 'help': self.irc.msg(nick.split('!')[0], ''' start TRACKINGNO [LABEL] - Start tracking package with optional label (alias: add) stop TRACKINGNO|LABEL - Stop tracking package given by either tracking number or label status TRACKINGNO|LABEL - Show status for registered matches, or look up tracking number if no matches (implicitly updates status) list - List all registered tracking numbers without fetching status label TRACKINGNO LABEL - Changes the label for the given tracking number url [TRACKINGNO] - Lists URLs for matches to the corresponding website''') return msg = None if mode.lower() in ('start', 'add'): msg = self.track_start(code, label, nick.split('!')[0], channel if not channel == self.irc.nickname else None) elif mode.lower() == 'stop': msg = self.track_stop(code, nick.split('!')[0], channel if not channel == self.irc.nickname else None) elif mode.lower() == 'status': msg = self.track_status(code, nick.split('!')[0], channel if not channel == self.irc.nickname else None) elif mode.lower() == 'list': msg = self.track_list(nick.split('!')[0]) elif mode.lower() == 'label': msg = self.track_label(code, label, nick.split('!')[0], channel if not channel == self.irc.nickname else None) elif mode.lower() == 'url': msg = self.track_url(code, nick.split('!')[0], channel if not channel == self.irc.nickname else None) else: msg = 'Invalid mode "%s".' % mode if msg: if type(msg) == list: for i in msg: if isinstance(i, unicode): i = i.encode('utf-8') self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], i) else: if isinstance(msg, unicode): msg = msg.encode('utf-8') self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], msg) else: self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], 'No data returned (this is a bug).') # called by start and status def track_update(self, type, code, announce = False, propagate_error = False): try: session = Session() consignment = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server, Consignment.type == type, Consignment.code == code)).one() self.update_consignment(session, consignment, announce, propagate_error) session.commit() finally: session.close() def update_consignment(self, session, consignment, announce = True, propagate_error = False): now = datetime.datetime.utcnow() td = datetime.timedelta(seconds = config.getint(cfg_section, 'cache_time')) # return if consignment was cached within cache_time seconds if consignment.last and consignment.last > now - td: return consignment.last = now target = consignment.channel or consignment.nick target = target.encode('utf-8') removed = False tm = tracking_modules[consignment.type]() try: package_results = tm.track(consignment.code) or [] except PackageError as e: if propagate_error: raise e # ignore, set results to an empty list so that we can reach the stale check at the end package_results = [] for data in package_results: try: package = session.query(Package).filter(Package.consignment_id == consignment.id) if data.previous_code: package = package.filter(Package.code.in_((data.code, data.previous_code))) else: package = package.filter(Package.code == data.code) package = package.one() except NoResultFound: # don't add delivered packages if data.delivered: continue try: conflicts = session.query(Consignment).join(Consignment.packages) \ .filter(and_(Package.code == data.code, Consignment.type != consignment.type)).all() except NoResultFound: pass else: if len(conflicts): self.irc.msg(target, '%s: %s conflicts with another consignment' % consignment.nick.encode('utf-8'), consignment) removed = True break package = Package(consignment.id, data.code) session.add(package) try: session.commit() except IntegrityError: # assume several packets within the same consignment was added as a consignment session.rollback() removed = True self.irc.msg(target, '%s: %s conflicts with another consignment' % (consignment.nick.encode('utf-8'), consignment)) break package = session.query(Package).filter(and_(Package.consignment_id == consignment.id, Package.code == data.code)).one() # This happens when the package changes code. if package.code != data.code: # Always announce. msg = '%s: %s has changed code to \002%s\002' % (consignment.nick.encode('utf-8'), package, data.code) self.irc.msg(target, msg) package.code = data.code session.add(package) if package.last == None or data.date > package.last: code = data.code last = data.date desc = data.desc msg = '%s: %s %s - %s' % (consignment.nick.encode('utf-8'), package, last, desc) if data.delivered: session.delete(package) msg += ' (Package delivered - tracking stopped)' removed = True else: package.last = last package.status = desc session.add(package) if announce or removed: self.irc.msg(target, msg) elif package.last != None and package.last < datetime.datetime.utcnow() - datetime.timedelta(hours = 24*30): msg = '%s: Removing stale package %s' % (consignment.nick.encode('utf-8'), consignment) self.irc.msg(target, msg) session.delete(package) removed = True # empty consignment and 30 days old if len(consignment.packages) == 0 and consignment.added < datetime.datetime.utcnow() - datetime.timedelta(hours = 24*30): msg = '%s: Removing stale consignment %s' % (consignment.nick.encode('utf-8'), consignment) self.irc.msg(target, msg) session.delete(consignment) if removed and len(consignment.packages) == 0: msg = '%s: %s is no longer being tracked' % (consignment.nick.encode('utf-8'), consignment) self.irc.msg(target, msg) session.delete(consignment) def lc_callback(self): try: session = Session() consignments = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server, or_(Consignment.channel.in_(config.get(self.irc.factory.server, 'channels').split()), Consignment.channel == None))) for row in consignments: self.update_consignment(session, row) session.commit() except Exception: import traceback print 'Exception as %s' % datetime.datetime.now() traceback.print_exc() finally: session.close() if __name__ == '__main__': import sys, ConfigParser, os config = ConfigParser.ConfigParser() config.read([os.path.expanduser('~/.fot')]) m = Module(None) for code in sys.argv[1:]: tm = get_tracking_module(code) tracking = tm.track(code) if tracking: for track in tracking: if type(track) == list: print '\n'.join(track) else: print track # vim: noet ts=4