info = {
'author': 'Jon Bergli Heier',
'title': 'Posten.no tracking',
'description': 'Fetches tracking info from posten.no.',
}
cfg_section = 'module/tracking'
import urllib2, datetime, re, pytz, json, urllib
from xml.etree import ElementTree as ET
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relation, backref
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import or_, and_
import easypost
engine = Session = None
Base = declarative_base()
class Consignment(Base):
__tablename__ = 'consignment'
id = Column(Integer, primary_key = True)
server = Column(String, nullable = False)
nick = Column(String, nullable = False)
channel = Column(String) # NULL for private
code = Column(String, nullable = False, index = True)
label = Column(String, index = True)
added = Column(DateTime, nullable = False)
last = Column(DateTime)
type = Column(String, nullable = False) # Tracking module
packages = relation('Package', backref = 'consignment', primaryjoin = 'Package.consignment_id == Consignment.id')
__table_args__ = (UniqueConstraint('code', 'type', name = 'consignment_code_type_uc'),)
def __init__(self, server, nick, channel, type, code, label, added):
self.server = server
self.nick = nick
self.channel = channel
self.type = type
self.code = code
self.label = label
self.added = added
def __str__(self):
s = '\002%s\002' % self.code.encode('utf-8')
if self.label:
s += ' (%s)' % self.label.encode('utf-8')
return s
def __unicode__(self):
return str(self).decode('utf-8')
class Package(Base):
__tablename__ = 'package'
id = Column(Integer, primary_key = True)
consignment_id = Column(Integer, ForeignKey('consignment.id'))
code = Column(String, nullable = False, index = True)
last = Column(DateTime) # Last update
status = Column(String) # Last status
def __init__(self, consignment_id, code):
self.consignment_id = consignment_id
self.code = code
def __str__(self):
s = '\002%s\002' % self.code.encode('utf-8')
if self.consignment.label:
s += ' (%s)' % self.consignment.label.encode('utf-8')
return s
def __unicode__(self):
return str(self).decode('utf-8')
class PackageError(Exception): pass
class NoPackageFound(PackageError): pass
class UnknownTypeError(PackageError): pass
class TrackingResult:
def __init__(self, code, date, desc, delivered = False, previous_code = None):
self.code = code
self.date = date
self.desc = desc
self.delivered = delivered
# Set when a package changes tracking code.
self.previous_code = previous_code
def __str__(self):
return '\002{code}\002 {date} - {desc}'.format(code = self.code, date = self.date, desc = self.desc)
tracking_modules = {}
class TrackingModuleMeta(type):
def __init__(cls, name, bases, dict):
if name != 'TrackingModule':
tracking_modules[cls.name] = cls
class TrackingModule(object):
__metaclass__ = TrackingModuleMeta
config_section = 'module/tracking'
def splitcode(self, code):
return code_split(code)[-1]
class PostenModule(TrackingModule):
name = 'posten'
url = 'http://sporing.posten.no/sporing.xml?q=%s'
def get_xml(self, url):
try:
u = urllib2.urlopen(url, timeout = config.getfloat(cfg_section, 'timeout'))
except urllib2.HTTPError as e:
if e.code == 404:
raise NoPackageFound('No packages found')
raise PackageError(str(e))
except Exception as e:
raise PackageError(str(e))
if u.headers['content-type'].startswith('application/xml'):
xml = ET.parse(u)
else:
xml = None
u.close()
return xml
def get_url(self, code = None):
url = 'http://sporing.posten.no/'
if code:
url += 'sporing.html?q=' + code
return url
def track(self, code):
code = self.splitcode(code)
xml = self.get_xml(self.url % code)
if not xml:
return
ns = 'http://www.bring.no/sporing/1.0'
packages = xml.find('.//{%s}PackageSet' % (ns,))
if packages is None:
raise NoPackageFound('No packages found for \002%s\002.' % code)
results = []
for package in packages:
code = package.attrib['packageId']
if 'previousPackageId' in package.attrib:
previous_code = package.attrib['previousPackageId']
else:
previous_code = None
eventset = package.find('{%s}EventSet' % ns)
if not len(eventset):
continue
last = eventset[0]
desc = last.find('{%s}Description' % ns).text.replace('<', '<').replace('
', ' ')
desc = re.sub(r'<[^>]*?>', '', desc).encode('utf8').strip()
isodate = last.find('{%s}OccuredAtIsoDateTime' % ns).text[:-10]
isodate = datetime.datetime.strptime(isodate, '%Y-%m-%dT%H:%M:%S')
city = last.find('{%s}City' % ns).text
status = last.find('{%s}Status' % ns).text.encode('utf-8')
if city:
desc = '%s (%s)' % (desc, city.encode('utf8'))
date = last.find('{%s}OccuredAtDisplayDate' % ns).text + ' ' + last.find('{%s}OccuredAtDisplayTime' % ns).text
results.append(TrackingResult(code, isodate, desc, status == 'DELIVERED', previous_code = previous_code))
return results
class FedexModule(TrackingModule):
name = 'fedex'
def get_url(self, code = None):
url = 'https://www.fedex.com/fedextrack/index.html'
if code:
url += '?tracknumbers=' + code
return url
def track(self, code):
code = self.splitcode(code)
data = {'TrackPackagesRequest': {'appType': 'wtrk', 'uniqueKey': '', 'processingParameters': {'anonymousTransaction': True, 'clientId': 'WTRK', 'returnDetailedErrors': True, 'returnLocalizedDateTime': False}, 'trackingInfoList': [{'trackNumberInfo': {'trackingNumber': code, 'trackingQualifier': '', 'trackingCarrier': ''}}]}}
url = 'https://www.fedex.com/trackingCal/track?' + urllib.urlencode({'data': json.dumps(data), 'action': 'trackpackages', 'locale': 'en_US', 'format': 'json', 'version': 99})
try:
u = urllib2.urlopen(url)
except urllib2.HTTPError as e:
raise PackageError(str(e))
data = u.read()
u.close()
for m in re.findall(r'(\\x[a-fA-F\d]{2})', data):
data = data.replace(m, chr(int(m[-2:], 16)))
data = json.loads(data)
response = data['TrackPackagesResponse']
packages = response['packageList']
results = []
for package in packages:
lastevent = package['scanEventList'][0]
# Ignore events with missing date or time, this seems to be a temporary thing.
if not lastevent['date'] or not lastevent['time']:
continue
date = datetime.datetime.strptime('%s %s' % (lastevent['date'], lastevent['time']), '%Y-%m-%d %H:%M:%S')
if lastevent['gmtOffset']:
tz = pytz.timezone('Etc/GMT' + ('+' if lastevent['gmtOffset'][0] == '-' else '-') + str(int(lastevent['gmtOffset'][1:3])))
date = tz.localize(date).astimezone(pytz.timezone(config.get(cfg_section, 'local_timezone'))).replace(tzinfo = None)
status = lastevent['status'].encode('utf-8')
if lastevent['scanDetails']:
status += ' - %s' % lastevent['scanDetails'].encode('utf-8')
if lastevent['scanLocation']:
status += ' (%s)' % lastevent['scanLocation'].encode('utf-8')
results.append(TrackingResult(code, date, status, lastevent['isDelivered']))
return results
class PostNordModule(TrackingModule):
name = 'postnord'
def get_url(self, code = None):
url = 'http://www.postnordlogistics.no/minside/SOPS/'
if code:
url += code
return url
def track(self, code):
code = self.splitcode(code)
url = 'http://www.postnordlogistics.no/XMLServer/rest/trackandtrace?' + urllib.urlencode({'q': code})
try:
u = urllib2.urlopen(url)
except urllib2.HTTPError as e:
raise PackageError(str(e))
data = json.load(u)
u.close()
response = data['TrackingInformationResponse']
results = []
for shipment in response['shipments']:
for item in shipment['items']:
code = item['itemId']
delivered = (item.get('status') == 'DELIVERED')
lastevent = item['events'][-1]
date = datetime.datetime.strptime(lastevent['eventTime'], '%Y-%m-%dT%H:%M:%S')
event = lastevent['eventDescription']
location = lastevent.get('location')
if location:
location = lastevent['location'].get('displayName') or lastevent['location'].get('city')
if location:
status = '%s (%s)' % (event, location)
else:
status = event
results.append(TrackingResult(code, date, status.encode('utf-8'), delivered))
return results
class UpsModule(TrackingModule):
name = 'ups'
def get_url(self, code = None):
if code is None:
url = 'http://www.ups.com/WebTracking/track'
else:
data = {
'TypeOfInquiryNumber': 'T',
'InquiryNumber1': code,
}
url = 'http://www.ups.com/WebTracking/processInputRequest?' + urllib.urlencode(data)
return url
def track(self, code):
code = self.splitcode(code)
access_key = config.get(self.config_section, 'ups_access_key')
username = config.get(self.config_section, 'ups_username')
password = config.get(self.config_section, 'ups_password')
url = 'https://wwwcie.ups.com/ups.app/xml/Track'
request = """
%s
%s
%s
Track
%s
""" % (access_key, username, password, code)
try:
u = urllib2.urlopen(url, request)
except urllib2.HTTPError as e:
raise PackageError(str(e))
xml = ET.parse(u)
packages = xml.findall('Shipment/Package')
if packages is None:
raise NoPackageFound('No packages found for \002%s\002.' % code)
results = []
for package in packages:
code = package.find('TrackingNumber').text
activity = package.find('Activity')
if activity is None:
continue
address = activity.find('ActivityLocation/Address')
location_text = "%s, %s" % (address.find('City').text, address.find('CountryCode').text)
statuscode = activity.find('Status/StatusType/Code').text
event_text = activity.find('Status/StatusType/Description').text
date = activity.find('Date').text
time = activity.find('Time').text
year, month, day = int(date[0:4]), int(date[4:6]), int(date[6:8])
hour, minute, second = int(time[0:2]), int(time[2:4]), int(time[4:6])
isodate = datetime.datetime(year, month, day, hour, minute, second)
desc = "%s (%s)" % (event_text, location_text)
results.append(TrackingResult(code, isodate, desc, statuscode == 'D'))
return results
class EasypostModule:
def track(self, code):
code = self.splitcode(code)
try:
tracker = (t for t in easypost.Tracker.all() if t.tracking_code == code).next()
except StopIteration:
tracker = easypost.Tracker.create(tracking_code = code, carrier = self.easypost_carrier)
results = []
if tracker.tracking_details:
delivered = tracker.status == 'delivered'
lastevent = tracker.tracking_details[-1]
date = datetime.datetime.strptime(lastevent.datetime, '%Y-%m-%dT%H:%M:%SZ')
status = lastevent.message
location = []
for f in ('city', 'state', 'country'):
if lastevent['tracking_location'][f]:
location.append(lastevent['tracking_location'][f])
if location:
status += ' (%s)' % ', '.join(location)
results.append(TrackingResult(code, date, status.encode('utf-8'), delivered))
return results
class DHLModule(TrackingModule, EasypostModule):
name = 'dhl'
easypost_carrier = 'DHLExpress'
def get_url(self, code = None):
url = 'http://www.dhl.no/no/express/sporing.html?AWB='
if code:
url += code
return url
def code_split(code):
if ':' in code:
return code.split(':')
else:
return 'posten', code
def get_tracking_module(arg):
type, code = code_split(arg)
if not type in tracking_modules:
raise UnknownTypeError('Unknown type "%s"' % type)
return tracking_modules[type]()
class Module:
def __init__(self, bot):
easypost.api_key = config.get(cfg_section, 'easypost_api_key')
self.setup_db()
self.irc = bot
self.tracking = []
self.lc = LoopingCall(self.lc_callback)
if bot:
self.lc.start(config.getfloat(cfg_section, 'interval'), False)
self.irc.register_keyword('!track', self)
def stop(self):
try:
self.lc.stop()
except:
pass
def setup_db(self):
global engine, Session
if engine and Session:
return
engine = create_engine(config.get(cfg_section, 'db_path'))
Base.metadata.create_all(engine)
Session = sessionmaker(bind = engine, autoflush = True, autocommit = False)
def track_start(self, code, label, nick, channel):
type, code = code_split(code)
if not type in tracking_modules:
return '\002%s\002 is an invalid type.' % type
if not len(code):
return 'Missing tracking number.'
msg = None
try:
session = Session()
consignment = Consignment(self.irc.factory.server, nick, channel, type, code, label.decode('utf8'), datetime.datetime.utcnow())
session.add(consignment)
session.commit()
msg = 'Now tracking \002%s\002.' % code
reactor.callLater(1, self.track_update, type, code, True)
except IntegrityError as e:
msg = 'Already tracking \002%s\002.' % code
finally:
session.close()
return msg
def track_stop(self, code, nick, channel):
type, code = code_split(code)
if not len(code):
return 'Missing tracking number or label.'
msg = None
try:
session = Session()
consignments = session.query(Consignment).outerjoin(Consignment.packages) \
.filter(and_(Consignment.server == self.irc.factory.server,
or_(Consignment.code == code, Consignment.label == code, Package.code == code)))
if nick:
consignments = consignments.filter(Consignment.nick == nick)
deleted = []
consignments = consignments.all()
if not len(consignments):
raise NoResultFound
# we should have exactly one consignment, but need to handle cases when there are more
for consignment in consignments:
code = str(consignment)
for p in consignment.packages:
session.delete(p)
deleted.append(code)
session.delete(consignment)
session.commit()
msg = 'No longer tracking %s' % ', '.join(deleted)
except NoResultFound:
msg = '\002%s\002 is not being tracked.' % code.decode('utf8')
finally:
session.close()
return msg
def track_status(self, code, nick, channel):
# don't code_split() here; this is done later if no results are returned
msg = []
try:
session = Session()
consignments = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server,
or_(Consignment.code.like(code + '%'), Consignment.label.like((code + '%')))))
if nick:
consignments = consignments.filter(Consignment.nick == nick)
results = []
for row in consignments:
try:
self.track_update(row.type, row.code, propagate_error = True)
except NoPackageFound:
results.append('No packages found for %s' % unicode(row))
continue
except PackageError as e:
results.append('Failed to fetch data for %s.' % unicode(row))
continue
i = 0
for package in row.packages:
i += 1
date = package.last
desc = package.status
if date and desc:
s = '%s %s - %s' % (unicode(package), date, desc)
else:
s = 'No tracking info found for %s' % unicode(package)
results.append(s)
if i == 0:
results.append('No packages found for %s' % unicode(row))
if len(results):
msg = results
elif len(code):
try:
# type:code is handled by get_tracking_module()
tm = get_tracking_module(code)
data = tm.track(code)
if data:
for package in data:
msg.append('\002%s\002 %s - %s' % (package.code, package.date, package.desc.decode('utf8')))
elif code:
msg = 'Failed to fetch tracking data for \002%s\002.' % code.decode('utf8')
except PackageError as e:
msg = str(e)
else:
msg = 'No tracking number given or registered.'
finally:
session.close()
return msg
def track_label(self, code, label, nick, channel):
type, code = code_split(code)
if not len(code):
return 'Missing tracking number or label.'
msg = None
try:
session = Session()
consignment = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server, Consignment.code == code, Consignment.nick == nick)).one()
consignment.label = label.decode('utf8')
session.add(consignment)
session.commit()
msg = '\002%s\002 now labelled \002%s\002' % (code, label.decode('utf8'))
finally:
session.close()
return msg
def track_list(self, nick):
msg = None
try:
session = Session()
trackings = []
consignments = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server, Consignment.nick == nick))
for row in consignments:
trackings.append(str(row))
if len(trackings):
msg = 'Trackings for %s: %s' % (nick, ', '.join(trackings))
else:
msg = 'No trackings for %s' % nick
except NoResultFound:
msg = 'No trackings registered for %s.'
finally:
session.close()
return msg
def track_url(self, code, nick, channel):
msg = []
try:
session = Session()
consignments = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server,
or_(Consignment.code.like(code + '%'), Consignment.label.like(code+ '%'))))
if nick:
consignments = consignments.filter(Consignment.nick == nick)
results = []
for consignment in consignments:
m = get_tracking_module('%s:%s' % (consignment.type, consignment.code))
msg.append(m.get_url(consignment.code))
if not len(msg):
# Rreturn an error only if we got a code to filter with.
if code:
msg.append('No packages found.')
else:
m = get_tracking_module('')
msg = [m.get_url()]
finally:
session.close()
return msg
def keyword(self, nick, channel, kw, msg):
usage = 'Usage: !track (start TRACKINGNO [LABEL])|(stop|status TRACKINGNO|LABEL) - See !track help for full list'
args = msg.split()
if len(args) < 1:
self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], usage)
return
mode = args[0]
if mode.lower() in ('start', 'add', 'stop', 'status'):
if len(args) < 2 and mode.lower() != 'status':
self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], usage)
return
if mode.lower() in ('start', 'add'):
code = args[1]
label = ' '.join(args[2:]) if len(args) > 2 else ''
else:
code = ' '.join(args[1:])
elif mode.lower() == 'label':
if len(args) < 3:
self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], usage)
return
code = args[1]
label = ' '.join(args[2:])
elif mode.lower() == 'url':
code = ' '.join(args[1:]) if len(args) > 1 else ''
elif mode.lower() == 'help':
self.irc.msg(nick.split('!')[0], '''
start TRACKINGNO [LABEL] - Start tracking package with optional label (alias: add)
stop TRACKINGNO|LABEL - Stop tracking package given by either tracking number or label
status TRACKINGNO|LABEL - Show status for registered matches, or look up tracking number if no matches (implicitly updates status)
list - List all registered tracking numbers without fetching status
label TRACKINGNO LABEL - Changes the label for the given tracking number
url [TRACKINGNO] - Lists URLs for matches to the corresponding website''')
return
msg = None
if mode.lower() in ('start', 'add'):
msg = self.track_start(code, label, nick.split('!')[0], channel if not channel == self.irc.nickname else None)
elif mode.lower() == 'stop':
msg = self.track_stop(code, nick.split('!')[0], channel if not channel == self.irc.nickname else None)
elif mode.lower() == 'status':
msg = self.track_status(code, nick.split('!')[0], channel if not channel == self.irc.nickname else None)
elif mode.lower() == 'list':
msg = self.track_list(nick.split('!')[0])
elif mode.lower() == 'label':
msg = self.track_label(code, label, nick.split('!')[0], channel if not channel == self.irc.nickname else None)
elif mode.lower() == 'url':
msg = self.track_url(code, nick.split('!')[0], channel if not channel == self.irc.nickname else None)
else:
msg = 'Invalid mode "%s".' % mode
if msg:
if type(msg) == list:
for i in msg:
if isinstance(i, unicode):
i = i.encode('utf-8')
self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], i)
else:
if isinstance(msg, unicode):
msg = msg.encode('utf-8')
self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], msg)
else:
self.irc.msg(channel if not channel == self.irc.nickname else nick.split('!')[0], 'No data returned (this is a bug).')
# called by start and status
def track_update(self, type, code, announce = False, propagate_error = False):
try:
session = Session()
consignment = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server, Consignment.type == type, Consignment.code == code)).one()
self.update_consignment(session, consignment, announce, propagate_error)
session.commit()
finally:
session.close()
def update_consignment(self, session, consignment, announce = True, propagate_error = False):
now = datetime.datetime.utcnow()
td = datetime.timedelta(seconds = config.getint(cfg_section, 'cache_time'))
# return if consignment was cached within cache_time seconds
if consignment.last and consignment.last > now - td:
return
consignment.last = now
target = consignment.channel or consignment.nick
target = target.encode('utf-8')
removed = False
tm = tracking_modules[consignment.type]()
try:
package_results = tm.track(consignment.code) or []
except PackageError as e:
if propagate_error:
raise e
# ignore, set results to an empty list so that we can reach the stale check at the end
package_results = []
for data in package_results:
try:
package = session.query(Package).filter(Package.consignment_id == consignment.id)
if data.previous_code:
package = package.filter(Package.code.in_((data.code, data.previous_code)))
else:
package = package.filter(Package.code == data.code)
package = package.one()
except NoResultFound:
# don't add delivered packages
if data.delivered:
continue
try:
conflicts = session.query(Consignment).join(Consignment.packages) \
.filter(and_(Package.code == data.code, Consignment.type != consignment.type)).all()
except NoResultFound:
pass
else:
if len(conflicts):
self.irc.msg(target, '%s: %s conflicts with another consignment' % consignment.nick.encode('utf-8'), consignment)
removed = True
break
package = Package(consignment.id, data.code)
session.add(package)
try:
session.commit()
except IntegrityError:
# assume several packets within the same consignment was added as a consignment
session.rollback()
removed = True
self.irc.msg(target, '%s: %s conflicts with another consignment' % (consignment.nick.encode('utf-8'), consignment))
break
package = session.query(Package).filter(and_(Package.consignment_id == consignment.id, Package.code == data.code)).one()
# This happens when the package changes code.
if package.code != data.code:
# Always announce.
msg = '%s: %s has changed code to \002%s\002' % (consignment.nick.encode('utf-8'), package, data.code)
self.irc.msg(target, msg)
package.code = data.code
session.add(package)
if package.last == None or data.date > package.last:
code = data.code
last = data.date
desc = data.desc
msg = '%s: %s %s - %s' % (consignment.nick.encode('utf-8'), package, last, desc)
if data.delivered:
session.delete(package)
msg += ' (Package delivered - tracking stopped)'
removed = True
else:
package.last = last
package.status = desc
session.add(package)
if announce or removed:
self.irc.msg(target, msg)
elif package.last != None and package.last < datetime.datetime.utcnow() - datetime.timedelta(hours = 24*30):
msg = '%s: Removing stale package %s' % (consignment.nick.encode('utf-8'), consignment)
self.irc.msg(target, msg)
session.delete(package)
removed = True
# empty consignment and 30 days old
if len(consignment.packages) == 0 and consignment.added < datetime.datetime.utcnow() - datetime.timedelta(hours = 24*30):
msg = '%s: Removing stale consignment %s' % (consignment.nick.encode('utf-8'), consignment)
self.irc.msg(target, msg)
session.delete(consignment)
if removed and len(consignment.packages) == 0:
msg = '%s: %s is no longer being tracked' % (consignment.nick.encode('utf-8'), consignment)
self.irc.msg(target, msg)
session.delete(consignment)
def lc_callback(self):
try:
session = Session()
consignments = session.query(Consignment).filter(and_(Consignment.server == self.irc.factory.server,
or_(Consignment.channel.in_(config.get(self.irc.factory.server, 'channels').split()), Consignment.channel == None)))
for row in consignments:
self.update_consignment(session, row)
session.commit()
except Exception:
import traceback
print 'Exception as %s' % datetime.datetime.now()
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
import sys, ConfigParser, os
config = ConfigParser.ConfigParser()
config.read([os.path.expanduser('~/.fot')])
m = Module(None)
for code in sys.argv[1:]:
tm = get_tracking_module(code)
tracking = tm.track(code)
if tracking:
for track in tracking:
if type(track) == list:
print '\n'.join(track)
else:
print track
# vim: noet ts=4