# Source code for pyrate.algorithms.vesselimporter

""" Extracts a subset of clean ships into ais_extended tables

"""
import logging
import time
import threading
import psycopg2
import queue
from pyrate.utils import interpolate_passages, valid_imo, detect_location_outliers

# Commands this algorithm exposes to the pyrate command-line interface:
# (command name, human-readable description) pairs.
EXPORT_COMMANDS = [('run', 'Extract a subset of clean ships into ais_extended tables')]
# Repositories consumed / produced by this algorithm; the importer works
# entirely within the AIS database, so there are no separate inputs.
INPUTS = []
OUTPUTS = ['aisdb']

def run(inp, out, n_threads=2, dropindices=False):
    """Extract a subset of clean ships into the ais_extended tables.

    Finds 'clean' vessels (see :func:`filter_good_ships`), trims each
    (mmsi, imo, start, end) interval to the portion not yet imported, and
    copies the remaining message streams into the extended table.

    :param inp: unused input repositories (this algorithm has no inputs).
    :param out: dict of output repositories; must contain 'aisdb'.
    :param n_threads: number of worker threads used for the copy.
    :param dropindices: if True, drop the extended table's indices before
        the bulk insert and recreate them afterwards (faster bulk load).
    """
    aisdb = out['aisdb']
    valid_imos, imo_mmsi_intervals = filter_good_ships(aisdb)
    logging.info("Got %d valid IMO numbers, using %d MMSI numbers",
                 len(valid_imos), len(imo_mmsi_intervals))

    # pre filter intervals: constrain each one to the work not already done
    def filter_intervals(interval):
        remain = get_remaining_interval(aisdb, *interval)
        if remain is not None:
            return [interval[0], interval[1], remain[0], remain[1]]
        return None

    filtered_intervals = map(filter_intervals, imo_mmsi_intervals)

    # sorting intervals by MMSI improves performance on a clustered table
    sorted_intervals = sorted(filter(None, filtered_intervals),
                              key=lambda x: x[0])

    logging.info("%d intervals to import", len(sorted_intervals))

    # insert ships into extended table.
    if len(sorted_intervals) > 0:
        if dropindices:
            aisdb.extended.drop_indices()
        generate_extended_table(aisdb, sorted_intervals, n_threads=n_threads)
        if dropindices:
            aisdb.extended.create_indices()

    logging.info("Vessel importer done.")
def filter_good_ships(aisdb):
    """Generate a set of imo numbers and (mmsi, imo) validity intervals

    Generate a set of imo numbers and (mmsi, imo) validity intervals for
    ships which are deemed to be 'clean'. A clean ship is defined as one
    which:

    * Has valid MMSI numbers associated with it.
    * For each MMSI number, the period of time it is associated with this
      IMO (via message number 5) overlaps with the period the MMSI number
      was in use.
    * For each MMSI number, its usage period does not overlap with that of
      any other of this ship's MMSI numbers.
    * That none of these MMSI numbers have been used by another ship
      (i.e. another IMO number is also associated with this MMSI)

    Returns
    =======
    valid_imos : A set of valid imo numbers
    imo_mmsi_intervals : A list of (mmsi, imo, start, end) tuples, describing
        the validity intervals of each (mmsi, imo) pair
    """
    with aisdb.conn.cursor() as cur:
        cur.execute("SELECT distinct imo from {}".format(aisdb.imolist.get_name()))
        imo_list = [row[0] for row in cur.fetchall() if valid_imo(row[0])]
        logging.info("Checking %d IMOs", len(imo_list))

        valid_imos = []
        imo_mmsi_intervals = []

        for imo in imo_list:
            # NOTE(review): the table name 'imo_list' is hard-coded here,
            # while the query above uses aisdb.imolist.get_name() — confirm
            # these refer to the same table.
            cur.execute("""select a.mmsi, a.imo,
                           (a.first_seen, a.last_seen) overlaps (b.first_seen, b.last_seen),
                           LEAST(a.first_seen, b.first_seen),
                           GREATEST(a.last_seen, b.last_seen)
                           from imo_list as a
                           join imo_list as b
                           on a.mmsi = b.mmsi and b.imo is null
                           where a.imo = %s
                           ORDER BY LEAST(a.first_seen, b.first_seen) ASC""",
                        [imo])
            mmsi_ranges = cur.fetchall()
            if len(mmsi_ranges) == 0:
                # no MMSI numbers for this IMO
                continue

            # verify each MMSI's message-5 period overlaps its usage period,
            # and that consecutive MMSI usage intervals do not overlap
            valid = True
            last_end = None
            for mmsi, _, overlap, start, end in mmsi_ranges:
                if not overlap:
                    valid = False
                    break
                if last_end is not None and start < last_end:
                    # overlapping MMSI intervals for this IMO
                    valid = False
                    break
                last_end = end

            if valid:
                # check for other users of this mmsi number
                mmsi_list = [row[0] for row in mmsi_ranges]
                cur.execute("""select a.mmsi, a.imo, b.imo
                               from imo_list as a
                               join imo_list as b
                               on a.mmsi = b.mmsi and a.imo < b.imo
                               where a.mmsi IN ({})""".format(
                                   ','.join(['%s' for i in mmsi_list])),
                            mmsi_list)
                if cur.rowcount == 0:
                    # no conflicting IMO shares any of these MMSIs: clean ship
                    valid_imos.append(imo)
                    for mmsi, _, _, start, end in mmsi_ranges:
                        imo_mmsi_intervals.append([mmsi, imo, start, end])
                # else: MMSI reused by another IMO -> reject this ship

        return (valid_imos, imo_mmsi_intervals)
def cluster_table(aisdb, table):
    """Performs a clustering of the postgresql table on the MMSI index.

    This process significantly improves the runtime of extended table
    generation.
    """
    idx = table.name.lower() + "_mmsi_idx"
    with aisdb.conn.cursor() as cur:
        logging.info("Clustering table %s on index %s. This may take a while...",
                     table.name, idx)
        cur.execute("CLUSTER {} USING {}".format(table.name, idx))
def generate_extended_table(aisdb, intervals, n_threads=2):
    """Copy the given (mmsi, imo, start, end) intervals into ais_extended.

    Fills a work queue with the intervals (sorted by MMSI for clustered-table
    locality), spawns ``n_threads`` daemon worker threads running
    :func:`interval_copier`, and logs progress until the queue drains.

    :param aisdb: AIS database repository (its ``options`` are passed to the
        workers, which each open their own connection).
    :param intervals: list of [mmsi, imo, start, end] work items.
    :param n_threads: number of copier threads.
    """
    logging.info("Inserting %d squeaky clean MMSIs", len(intervals))

    interval_q = queue.Queue()
    for interval in sorted(intervals, key=lambda x: x[0]):
        interval_q.put(interval)

    pool = [threading.Thread(target=interval_copier, daemon=True,
                             args=(aisdb.options, interval_q))
            for i in range(n_threads)]
    # start threads with a plain loop (not a throwaway list comprehension)
    for t in pool:
        t.start()

    total = len(intervals)
    remain = total
    # single timestamp for throughput reporting (the original assigned it
    # twice; the first assignment was dead)
    start = time.time()
    while not interval_q.empty():
        q_size = interval_q.qsize()
        if remain > q_size:
            logging.info("%d/%d MMSIs completed, %f/s.", total - q_size, total,
                         (total - q_size) / (time.time() - start))
            remain = q_size
        time.sleep(5)

    # wait for workers to finish their in-flight items
    interval_q.join()
def interval_copier(db_options, interval_q):
    """Worker thread: drain intervals from the queue and import each one.

    Opens its own database connection from ``db_options`` (connections are
    not shared between threads) and processes items until the queue is empty.
    """
    from pyrate.repositories import aisdb as db
    aisdb = db.load(db_options)

    logging.debug("Start interval copier task")
    with aisdb:
        # Use get_nowait()/queue.Empty rather than empty()+get(): with
        # multiple workers, another thread may take the last item between
        # our empty() check and get(), which would block this thread forever.
        while True:
            try:
                interval = interval_q.get_nowait()
            except queue.Empty:
                break
            process_interval_series(aisdb, interval)
            interval_q.task_done()
def process_interval_series(aisdb, interval):
    """Import one (mmsi, imo, start, end) interval into the extended table.

    Trims the interval to the not-yet-imported portion, fetches the clean
    message stream for it, inserts it, and commits.

    :returns: the number of rows read for this interval (0 if nothing to do).
    """
    mmsi, imo, start, end = interval
    t_start = time.time()

    # constrain interval based on previous import
    remaining_work = get_remaining_interval(aisdb, mmsi, imo, start, end)
    if remaining_work is None:
        # interval was already fully inserted
        return 0
    start, end = remaining_work

    # get data for this interval range
    msg_stream = aisdb.get_message_stream(mmsi, from_ts=start, to_ts=end,
                                          use_clean_db=True)
    row_count = len(msg_stream)
    if row_count == 0:
        logging.warning("No rows to insert for interval %s", interval)
        return 0

    insert_message_stream(aisdb, [mmsi, imo, start, end], msg_stream)

    # finished, commit
    aisdb.conn.commit()
    logging.debug("Inserted %d rows for MMSI %d. (%fs)", row_count, mmsi,
                  time.time() - t_start)
    return row_count
def insert_message_stream(aisdb, interval, msg_stream):
    """Takes a stream of messages for an MMSI over an interval, runs it
    through outlier detection and interpolation algorithms, then inserts the
    resulting stream into the ais_extended table."""
    mmsi, imo, start, end = interval

    # call the message filter, then synthesise interpolated passages
    valid, invalid = detect_location_outliers(msg_stream)
    artificial = interpolate_passages(valid)

    aisdb.extended.insert_rows_batch(valid + artificial)

    # mark the work we've done in the action log
    for action, count in [("import", len(valid)),
                          ("outlier detection (noop)", len(invalid)),
                          ("interpolation (noop)", len(artificial))]:
        aisdb.action_log.insert_row({'action': action,
                                     'mmsi': mmsi,
                                     'ts_from': start,
                                     'ts_to': end,
                                     'count': count})

    upsert_interval_to_imolist(aisdb, mmsi, imo, start, end)
def get_remaining_interval(aisdb, mmsi, imo, start, end):
    """Return the sub-interval of (start, end) not yet imported, or None.

    Subtracts the already-imported (first_seen, last_seen) range recorded in
    the clean imo list (widened by 1 second on each side) from the requested
    range using postgres tsrange arithmetic.

    :returns: (lower, upper) of the remaining range; (start, end) if this
        (mmsi, imo) pair has no record yet; None if nothing remains or the
        query fails (e.g. subtraction would produce a discontiguous range).
    """
    with aisdb.conn.cursor() as cur:
        try:
            cur.execute("SELECT tsrange(%s, %s) - tsrange(%s, %s) * tsrange(first_seen - interval '1 second', last_seen + interval '1 second') FROM {} WHERE mmsi = %s AND imo = %s".format(aisdb.clean_imolist.name),
                        [start, end, start, end, mmsi, imo])
            row = cur.fetchone()
            if row is not None:
                sub_interval = row[0]
                if sub_interval.isempty:
                    # fully covered by a previous import
                    return None
                return sub_interval.lower, sub_interval.upper
            # no existing record for this (mmsi, imo): everything remains
            return (start, end)
        except psycopg2.Error as e:
            # fixed typo in the original message ("timetamp")
            logging.warning("Error calculating timestamp intersection for MMSI %d: %s",
                            mmsi, e)
            return None
def upsert_interval_to_imolist(aisdb, mmsi, imo, start, end):
    """Record an imported interval for (mmsi, imo) in the clean imo list.

    Widens the existing row's (first_seen, last_seen) to cover [start, end]
    if one row exists, inserts a new row if none does. If more than one row
    exists (unexpected), nothing is done.
    """
    table = aisdb.clean_imolist
    with aisdb.conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM {} WHERE mmsi = %s AND imo = %s"
                    .format(table.name), [mmsi, imo])
        count = cur.fetchone()[0]
        if count == 1:
            cur.execute("""UPDATE {} SET
                        first_seen = LEAST(first_seen, %s),
                        last_seen = GREATEST(last_seen, %s)
                        WHERE mmsi = %s AND imo = %s""".format(table.name),
                        [start, end, mmsi, imo])
        elif count == 0:
            table.insert_row({'mmsi': mmsi, 'imo': imo,
                              'first_seen': start, 'last_seen': end})