Source code for pyrate.algorithms.imolist

import logging
import time

EXPORT_COMMANDS = [('run', 'create or update the imo list table.')]
INPUTS = []
OUTPUTS = ["aisdb"]

[docs]def run(_, out): create_imo_list(out['aisdb'])
[docs]def create_imo_list(aisdb): """Create the imo list table from MMSI, IMO pairs in clean and dirty tables. This method collects the unique MMSI, IMO pairs from a table, and the time intervals over-which they have been seen in the data. These tuples are then upserted into the `imo_list` table. Removes cases where ships have clashing MMSI numbers within a time threshold. On the clean table pairs with no IMO number are also collected to get the activity intervals of MMSI numbers. On the dirty table only messages specifying an IMO are collected. Arguments --------- aisdb : postgresdb The database upon which to operate """ with aisdb.conn.cursor() as cur: start = time.time() # collect existing set of mmsi, imo tuples in imo_list cur.execute("SELECT mmsi, imo FROM {}".format(aisdb.imolist.get_name())) existing_tuples = set(cur.fetchall()) logging.info("Existing mmsi, imo pairs = %d (%fs)", len(existing_tuples), time.time()-start) # query for mmsi, imo, interval tuples from clean db, and then upsert into # imo_list table. logging.info("Getting mmsi, imo pairs from clean db") start = time.time() cur.execute("SELECT mmsi, imo, MIN(time), MAX(time) FROM {} GROUP BY mmsi, imo".format(aisdb.clean.get_name())) logging.info("Got new mmsi, imo pairs list (%fs)", time.time()-start) _upsert_imo_tuples(aisdb, cur, existing_tuples) # query for mmsi, imo, interval tuples from dirty db, and then upsert into # imo_list table. logging.info("Getting mmsi, imo pairs from dirty db") start = time.time() cur.execute("SELECT mmsi, imo, MIN(time), MAX(time) FROM {} WHERE message_id = 5 GROUP BY mmsi, imo".format(aisdb.dirty.get_name())) logging.info("Got new mmsi, imo pairs list (%fs)", time.time()-start) _upsert_imo_tuples(aisdb, cur, existing_tuples) aisdb.conn.commit()
def _upsert_imo_tuples(aisdb, result_cursor, existing_tuples): """Inserts or updates rows in the imo_list table depending on the mmsi, imo pair's presence in the table. Arguments --------- result_cursor : An iterator of (mmsi, imo, start, end) tuples. existing_tuples : A set of (mmsi, imo) pairs which should be updated rather than inserted. """ with aisdb.conn.cursor() as insert_cur: start = time.time() insert_ctr = 0 update_ctr = 0 for mmsi, imo, min_time, max_time in result_cursor: if (mmsi, imo) in existing_tuples: insert_cur.execute("""UPDATE {} SET first_seen = LEAST(first_seen, %s), last_seen = GREATEST(last_seen, %s) WHERE mmsi = %s AND imo = %s""".format(aisdb.imolist.get_name()), [min_time, max_time, mmsi, imo]) update_ctr = update_ctr + insert_cur.rowcount else: insert_cur.execute("INSERT INTO {} (mmsi, imo, first_seen, last_seen) VALUES (%s,%s,%s,%s)".format(aisdb.imolist.get_name()), [mmsi, imo, min_time, max_time]) insert_ctr = insert_ctr + 1 logging.info("Inserted %d new rows, updated %d rows (%f)", insert_ctr, update_ctr, time.time()-start)