Source code for pyrate.algorithms.aisparser

""" Parses the AIS data from csv of xml files and populates the AIS database
"""

import os
import csv
import logging
import queue
import threading
import time
import sys
from datetime import datetime
from xml.etree import ElementTree
from pyrate import utils

EXPORT_COMMANDS = [('run', 'parse messages from csv into the database.')]
# Repository used for input to the algorithm
INPUTS = ["aiscsv"]
# Repositories used for output from the algorithm
OUTPUTS = ["aisdb", "baddata"]

def parse_timestamp(s):
    return datetime.strptime(s, '%Y%m%d_%H%M%S')

def int_or_null(s):
    if len(s) == 0:
        return None
    else:
        return int(s)

def float_or_null(s):
    if len(s) == 0 or s == 'None':
        return None
    else:
        return float(s)

def imostr(s):
    if len(s) > 20:
        return None
    return s

def longstr(s):
    # truncate strings longer than 255 characters so they fit the database column
    if len(s) > 255:
        return s[:255]
    return s

def set_null_on_fail(row, col, test):
    """ Helper function which sets the column in a row of data to null on fail

    Arguments
    ---------
    row : dict
        A dictionary of the fields
    col : str
        The column to check
    test : func
        One of the validation functions in pyrate.utils
    """
    if row[col] is not None and not test(row[col]):
        row[col] = None

def check_imo(imo):
    return imo is None or utils.valid_imo(imo)

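# Illustrative examples of the converter helpers above (not part of the
# original module): empty CSV fields map to None, timestamps use the raw AIS
# format, and over-long strings are truncated.
#
#     >>> parse_timestamp('20140115_103200')
#     datetime.datetime(2014, 1, 15, 10, 32)
#     >>> int_or_null('') is None
#     True
#     >>> float_or_null('None') is None
#     True
#     >>> len(longstr('x' * 300))
#     255
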
# column name constants
MMSI = 'MMSI'
TIME = 'Time'
MESSAGE_ID = 'Message_ID'
NAV_STATUS = 'Navigational_status'
SOG = 'SOG'
LONGITUDE = 'Longitude'
LATITUDE = 'Latitude'
COG = 'COG'
HEADING = 'Heading'
IMO = 'IMO'
DRAUGHT = 'Draught'
DEST = 'Destination'
VESSEL_NAME = 'Vessel_Name'
ETA_MONTH = 'ETA_month'
ETA_DAY = 'ETA_day'
ETA_HOUR = 'ETA_hour'
ETA_MINUTE = 'ETA_minute'

# specifies columns to take from raw data, and functions to convert them into
# suitable types for the database.
AIS_CSV_COLUMNS = [MMSI, TIME, MESSAGE_ID, NAV_STATUS, SOG, LONGITUDE,
                   LATITUDE, COG, HEADING, IMO, DRAUGHT, DEST, VESSEL_NAME,
                   ETA_MONTH, ETA_DAY, ETA_HOUR, ETA_MINUTE]

# xml names differ from csv. This array describes the names in this file which
# correspond to the csv column names
AIS_XML_COLNAMES = ['mmsi', 'date_time', 'msg_type', 'nav_status', 'sog',
                    'lon', 'lat', 'cog', 'heading', 'imo', 'draught',
                    'destination', 'vessel_name', 'eta_month', 'eta_day',
                    'eta_hour', 'eta_minute']

def xml_name_to_csv(name):
    """Converts a tag name from an XML file to the corresponding name from CSV."""
    return AIS_CSV_COLUMNS[AIS_XML_COLNAMES.index(name)]

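# For example (illustrative), the XML tag 'msg_type' maps to the CSV column
# 'Message_ID':
#
#     >>> xml_name_to_csv('msg_type')
#     'Message_ID'
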
def parse_raw_row(row):
    """Parse values from row, returning a new dict with converted values

    Parse values from row, returning a new dict with values converted into
    appropriate types. Throws an exception to reject the row.

    Arguments
    ---------
    row : dict
        A dictionary of headers and values from the csv file

    Returns
    -------
    converted_row : dict
        A dictionary of headers and values converted using the helper functions
    """
    converted_row = {}
    converted_row[MMSI] = int_or_null(row[MMSI])
    converted_row[TIME] = parse_timestamp(row[TIME])
    converted_row[MESSAGE_ID] = int_or_null(row[MESSAGE_ID])
    converted_row[NAV_STATUS] = int_or_null(row[NAV_STATUS])
    converted_row[SOG] = float_or_null(row[SOG])
    converted_row[LONGITUDE] = float_or_null(row[LONGITUDE])
    converted_row[LATITUDE] = float_or_null(row[LATITUDE])
    converted_row[COG] = float_or_null(row[COG])
    converted_row[HEADING] = float_or_null(row[HEADING])
    converted_row[IMO] = int_or_null(row[IMO])
    converted_row[DRAUGHT] = float_or_null(row[DRAUGHT])
    converted_row[DEST] = longstr(row[DEST])
    converted_row[VESSEL_NAME] = longstr(row[VESSEL_NAME])
    converted_row[ETA_MONTH] = int_or_null(row[ETA_MONTH])
    converted_row[ETA_DAY] = int_or_null(row[ETA_DAY])
    converted_row[ETA_HOUR] = int_or_null(row[ETA_HOUR])
    converted_row[ETA_MINUTE] = int_or_null(row[ETA_MINUTE])
    return converted_row

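# A minimal sketch of parse_raw_row on a hand-built row (illustrative values
# only): empty numeric fields become None, and a malformed field raises
# ValueError, which parse_file treats as an invalid message.
#
#     >>> raw = dict.fromkeys(AIS_CSV_COLUMNS, '')
#     >>> raw.update({MMSI: '235099999', TIME: '20140115_103200',
#     ...             MESSAGE_ID: '1', LONGITUDE: '-1.40', LATITUDE: '50.90'})
#     >>> msg = parse_raw_row(raw)
#     >>> msg[MMSI], msg[LONGITUDE], msg[SOG]
#     (235099999, -1.4, None)
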
CONTAINS_LAT_LON = set([1, 2, 3, 4, 9, 11, 17, 18, 19, 21, 27])
def validate_row(row):
    # validate MMSI, message_id and IMO
    if not utils.valid_mmsi(row[MMSI]) \
            or not utils.valid_message_id(row[MESSAGE_ID]) \
            or not check_imo(row[IMO]):
        raise ValueError("Row invalid")
    # check lat long for messages which should contain it
    if row[MESSAGE_ID] in CONTAINS_LAT_LON:
        if not (utils.valid_longitude(row[LONGITUDE]) and
                utils.valid_latitude(row[LATITUDE])):
            raise ValueError("Row invalid (lat,lon)")
    # otherwise set them to None
    else:
        row[LONGITUDE] = None
        row[LATITUDE] = None
    # validate other columns
    set_null_on_fail(row, NAV_STATUS, utils.valid_navigational_status)
    set_null_on_fail(row, SOG, utils.is_valid_sog)
    set_null_on_fail(row, COG, utils.is_valid_cog)
    set_null_on_fail(row, HEADING, utils.is_valid_heading)
    return row

def get_data_source(name):
    """Guesses data source from file name.

    If the name contains 'terr' then we guess terrestrial data, otherwise we
    assume satellite.

    Arguments
    ---------
    name : str
        File name

    Returns
    -------
    int
        0 if satellite, 1 if terrestrial
    """
    if name.find('terr') != -1:
        # terrestrial
        return 1
    else:
        # assume satellite
        return 0

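# For example (illustrative file names):
#
#     >>> get_data_source('terr_201401.csv')
#     1
#     >>> get_data_source('sat_201401.csv')
#     0
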
def run(inp, out, dropindices=True, source=0):
    """Populate the AIS_Raw database with messages from the AIS csv files

    Arguments
    ---------
    inp : str
        The name of the repositor(-y/-ies) as defined in the global variable
        `INPUTS`
    out : str
        The name of the repositor(-y/-ies) as defined in the global variable
        `OUTPUTS`
    dropindices : bool, optional, default=True
        Drop indexes for faster insert
    source : int, optional, default=0
        Indicates terrestrial (1) or satellite data (0)
    """
    files = inp['aiscsv']
    db = out['aisdb']
    log = out['baddata']

    # drop indexes for faster insert
    if dropindices:
        db.clean.drop_indices()
        db.dirty.drop_indices()

    def sqlworker(q, table):
        """ Worker thread

        Takes batches of tuples from the queue to be inserted into the database
        """
        while True:
            msgs = [q.get()]
            while not q.empty():
                msgs.append(q.get(timeout=0.5))

            n = len(msgs)
            if n > 0:
                # logging.debug("Inserting {} rows into {}".format(n, table.name))
                try:
                    table.insert_rows_batch(msgs)
                except Exception as e:
                    logging.warning("Error executing query: " + repr(e))
            # mark this task as done
            for _ in range(n):
                q.task_done()

    # queues for messages to be inserted into db
    dirtyq = queue.Queue(maxsize=1000000)
    cleanq = queue.Queue(maxsize=1000000)

    # set up processing pipeline threads
    clean_thread = threading.Thread(target=sqlworker, daemon=True,
                                    args=(cleanq, db.clean))
    dirty_thread = threading.Thread(target=sqlworker, daemon=True,
                                    args=(dirtyq, db.dirty))
    dirty_thread.start()
    clean_thread.start()

    start = time.time()

    for fp, name, ext in files.iterfiles():
        # check if we've already parsed this file
        with db.conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM " + db.sources.name +
                        " WHERE filename = %s AND source = %s", [name, source])
            if cur.fetchone()[0] > 0:
                logging.info("Already parsed " + name + ", skipping...")
                continue

        # parse file
        try:
            log_path = os.path.join(log.root, os.path.basename(name))
            invalid_ctr, clean_ctr, dirty_ctr, duration = \
                parse_file(fp, name, ext, log_path, cleanq, dirtyq,
                           source=source)
            dirtyq.join()
            cleanq.join()
            db.sources.insert_row({'filename': name, 'ext': ext,
                                   'invalid': invalid_ctr, 'clean': clean_ctr,
                                   'dirty': dirty_ctr, 'source': source})
            db.conn.commit()
            logging.info("Completed " + name +
                         ": %d clean, %d dirty, %d invalid messages, %fs",
                         clean_ctr, dirty_ctr, invalid_ctr, duration)
        except RuntimeError as error:
            logging.warning("Error parsing file %s: %s", name, repr(error))
            db.conn.rollback()

    # wait for queued tasks to finish
    dirtyq.join()
    cleanq.join()

    db.conn.commit()
    logging.info("Parsing complete, time elapsed = %fs", time.time() - start)

    if dropindices:
        start = time.time()
        logging.info("Rebuilding table indices...")
        db.clean.create_indices()
        db.dirty.create_indices()
        logging.info("Finished building indices, time elapsed = %fs",
                     time.time() - start)

def parse_file(fp, name, ext, baddata_logfile, cleanq, dirtyq, source=0):
    """ Parses a file containing AIS data, placing rows of data onto queues

    Arguments
    ---------
    fp : file
        An open file object for the file to be parsed
    name : str
        Name of file to be parsed
    ext : str
        Extension, either '.csv' or '.xml'
    baddata_logfile : str
        Name of the logfile
    cleanq : Queue
        Queue for messages to be inserted into the clean table
    dirtyq : Queue
        Queue for messages to be inserted into the dirty table
    source : int, optional, default=0
        0 is satellite, 1 is terrestrial

    Returns
    -------
    invalid_ctr : int
        Number of invalid rows
    clean_ctr : int
        Number of clean rows
    dirty_ctr : int
        Number of dirty rows
    time_elapsed : time
        The time elapsed since starting the parse_file procedure
    """
    filestart = time.time()
    logging.info("Parsing " + name)

    # open error log csv file
    with open(baddata_logfile, 'w') as errorlog:
        logwriter = csv.writer(errorlog, delimiter=',', quotechar='"')

        # message counters
        clean_ctr = 0
        dirty_ctr = 0
        invalid_ctr = 0

        # select a file iterator based on file extension
        if ext == '.csv':
            iterator = readcsv
        elif ext == '.xml':
            iterator = readxml
        else:
            raise RuntimeError("Cannot parse file with extension %s" % ext)

        # infer the data source from the file name
        # source = get_data_source(name)

        # parse and iterate lines from the current file
        for row in iterator(fp):
            converted_row = {}
            try:
                # parse raw data
                converted_row = parse_raw_row(row)
                converted_row['source'] = source
            except ValueError as e:
                # invalid data in row. Write it to error log
                if not 'raw' in row:
                    row['raw'] = [row[c] for c in AIS_CSV_COLUMNS]
                logwriter.writerow(row['raw'] + ["{}".format(e)])
                invalid_ctr = invalid_ctr + 1
                continue
            except KeyError:
                # missing data in row.
                if not 'raw' in row:
                    row['raw'] = [row[c] for c in AIS_CSV_COLUMNS]
                logwriter.writerow(row['raw'] + ["Bad row length"])
                invalid_ctr = invalid_ctr + 1
                continue

            # validate parsed row and add to appropriate queue
            try:
                validated_row = validate_row(converted_row)
                cleanq.put(validated_row)
                clean_ctr = clean_ctr + 1
            except ValueError:
                dirtyq.put(converted_row)
                dirty_ctr = dirty_ctr + 1

    if invalid_ctr == 0:
        os.remove(baddata_logfile)

    return (invalid_ctr, clean_ctr, dirty_ctr, time.time() - filestart)

def readcsv(fp):
    """ Returns a dictionary of the subset of columns required

    Reads each line in the CSV file, checks if all columns are available, and
    yields a dictionary of the subset of columns required (as per
    AIS_CSV_COLUMNS). If a row is invalid (wrong number of columns), only the
    raw row data is yielded.

    Arguments
    ---------
    fp : file
        An open file object

    Yields
    ------
    rowsubset : dict
        A dictionary of the subset of columns as per `AIS_CSV_COLUMNS`
    """
    # fix for large field error. Specify max field size to the maximum
    # convertable int value.
    # source: http://stackoverflow.com/questions/15063936/csv-error-field-larger-than-field-limit-131072
    max_int = sys.maxsize
    decrement = True
    while decrement:
        # decrease the max_int value by factor 10
        # as long as the OverflowError occurs.
        decrement = False
        try:
            csv.field_size_limit(max_int)
        except OverflowError:
            max_int = int(max_int / 10)
            decrement = True

    # First line is column headers.
    # Use to extract indices of columns we are extracting
    cols = fp.readline().rstrip('\r\n').split(',')
    indices = {}
    n_cols = len(cols)
    try:
        for col in AIS_CSV_COLUMNS:
            indices[col] = cols.index(col)
    except Exception as e:
        raise RuntimeError("Missing columns in file header: {}".format(e))

    try:
        for row in csv.reader(fp, delimiter=',', quotechar='"'):
            rowsubset = {}
            rowsubset['raw'] = row
            if len(row) == n_cols:
                for col in AIS_CSV_COLUMNS:
                    try:
                        rowsubset[col] = row[indices[col]]  # raw column data
                    except IndexError:
                        # not enough columns, just blank missing data.
                        rowsubset[col] = ''
            yield rowsubset
    except UnicodeDecodeError as e:
        raise RuntimeError(e)
    except csv.Error as e:
        raise RuntimeError(e)

def readxml(fp):
    """Parses an AIS XML file, yielding a row dictionary per aismessage element."""
    current = _empty_row()
    # iterate xml 'end' events
    for _, elem in ElementTree.iterparse(fp):
        # end of aismessage
        if elem.tag == 'aismessage':
            yield current
            current = _empty_row()
        else:
            if elem.tag in AIS_XML_COLNAMES and elem.text is not None:
                current[xml_name_to_csv(elem.tag)] = elem.text

def _empty_row():
    row = {}
    for col in AIS_CSV_COLUMNS:
        row[col] = ''
    return row
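
# A rough usage sketch outside of the pyrate pipeline (illustrative; the file
# name 'ais_sample.csv' is hypothetical): readcsv can be driven directly and
# combined with parse_raw_row/validate_row to keep only clean messages.
#
#     with open('ais_sample.csv') as fp:
#         for raw in readcsv(fp):
#             try:
#                 msg = validate_row(parse_raw_row(raw))
#             except (ValueError, KeyError):
#                 continue  # invalid or dirty message
#             print(msg[MMSI], msg[TIME])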