Source code for pyrate.utils

import datetime
from geographiclib.geodesic import Geodesic
from geopy.distance import distance

[docs]def valid_mmsi(mmsi): """Checks if a given MMSI number is valid. Arguments --------- mmsi : int An MMSI number Returns ------- Returns True if the MMSI number is 9 digits long. """ return not mmsi is None and len(str(int(mmsi))) == 9
VALID_MESSAGE_IDS = range(1, 28)
[docs]def valid_message_id(message_id): return message_id in VALID_MESSAGE_IDS
VALID_NAVIGATIONAL_STATUSES = set([0, 1, 2, 3, 4, 5, 6, 7, 8, 11, 12, 14, 15])
[docs]def valid_navigational_status(status): return status in VALID_NAVIGATIONAL_STATUSES
[docs]def valid_longitude(lon): """Check valid longitude. Arguments --------- lon : integer A longitude Returns ------- True if the longitude is valid """ return lon != None and lon >= -180 and lon <= 180
[docs]def valid_latitude(lat): """Check valid latitude. Arguments --------- lon : integer A latitude Returns ------- True if the latitude is valid """ return lat != None and lat >= -90 and lat <= 90
[docs]def valid_imo(imo=0): """Check valid IMO using checksum. Arguments --------- imo : integer An IMO ship identifier Returns ------- True if the IMO number is valid Notes ----- Taken from Eoin O'Keeffe's `checksum_valid` function in pyAIS """ try: str_imo = str(int(imo)) if len(str_imo) != 7: return False sum_val = 0 for ii, chk in enumerate(range(7, 1, -1)): sum_val += chk * int(str_imo[ii]) if str_imo[6] == str(sum_val)[len(str(sum_val))-1]: return True except: return False return False
[docs]def is_valid_sog(sog): """Validates speed over ground Arguments --------- sog : float Speed over ground Returns ------- True if speed over ground is greater than zero and less than 102.2 """ return sog >= 0 and sog <= 102.2
[docs]def is_valid_cog(cog): """Validates course over ground Arguments --------- cog : float Course over ground Returns ------- True if course over ground is greater than zero and less than 360 degrees """ return cog >= 0 and cog < 360
[docs]def is_valid_heading(heading): """Validates heading Arguments --------- heading : float The heading of the ship in degrees Returns ------- True if heading is greater than zero and less than 360 degrees """ try: return (heading >= 0 and heading < 360) or heading == 511 except: #getting errors on none type for heading return False
[docs]def speed_calc(msg_stream, index1, index2): """Computes the speed between two messages in the message stream Parameters ---------- msg_stream : A list of dictionaries representing AIS messages for a single MMSI number. Dictionary keys correspond to the column names from the `ais_clean` table. The list of messages should be ordered by timestamp in ascending order. index1 : int The index of the first message index2 : int The index of the second message Returns ------- timediff : datetime The difference in time between the two messages in datetime dist : float The distance between messages in nautical miles speed : float The speed in knots """ timediff = abs(msg_stream[index2]['Time'] - msg_stream[index1]['Time']) try: dist = distance((msg_stream[index1]['Latitude'], msg_stream[index1]['Longitude']),\ (msg_stream[index2]['Latitude'], msg_stream[index2]['Longitude'])).m except ValueError: dist = Geodesic.WGS84.Inverse(msg_stream[index1]['Latitude'], msg_stream[index1]['Longitude'],\ msg_stream[index2]['Latitude'], msg_stream[index2]['Longitude'])['s12'] #in metres if timediff > datetime.timedelta(0): convert_metres_to_nautical_miles = 0.0005399568 speed = (dist * convert_metres_to_nautical_miles) / (timediff.days * 24 + timediff.seconds / 3600) else: speed = 102.2 return timediff, dist, speed
[docs]def detect_location_outliers(msg_stream, as_df=False): """Detects outlier messages by submitting messages to a speed test The algorithm proceeds as follows: #. Create a linked list of all messages with non-null locations (pointing to next message) #. Loop through linked list and check for location outliers: * A location outlier is who does not pass the speed test (<= 50kn; link is 'discarded' when not reached in time) * No speed test is performed when: * distance too small (< 0.054nm ~ 100m; catches most positioning inaccuracies) => no outlier * time gap too big (>= 215h ~ 9d; time it takes to get anywhere on the globe at 50kn not respecting land) => next message is new 'start' If an alledged outlier is found its link is set to be the current message's link #. The start of a linked list becomes special attention: if speed check fails, the subsequent link is tested Line of thinking is: Can I get to the next message in time? If not 'next' must be an outlier, go to next but one. Parameters ---------- msg_stream : A list of dictionaries representing AIS messages for a single MMSI number. Dictionary keys correspond to the column names from the `ais_clean` table. The list of messages should be ordered by timestamp in ascending order. as_df : bool, optional Set to True if `msg_stream` are passed as a pandas DataFrame Returns ------- outlier_rows : The rows in the message stream which are outliers """ if as_df: raise NotImplementedError('msg_stream cannot be DataFrame, as_df=True does not work yet.') # 1) linked list linked_rows = [None]*len(msg_stream) link = None has_location_count = 0 for row_index in reversed(range(len(msg_stream))): if msg_stream[row_index]['Longitude'] is not None and msg_stream[row_index]['Latitude'] is not None: linked_rows[row_index] = link link = row_index has_location_count = has_location_count + 1 # 2) outlier_rows = [False] * len(msg_stream) if has_location_count < 2: # no messages that could be outliers available return outlier_rows index = next((i for i, j in enumerate(linked_rows) if j), None) at_start = True while linked_rows[index] is not None: # time difference, distance and speed beetween rows of index and corresponding link timediff, dist, speed = speed_calc(msg_stream, index, linked_rows[index]) if timediff > datetime.timedelta(hours=215): index = linked_rows[index] at_start = True #restart elif dist < 100: index = linked_rows[index] #skip over gap (at_start remains same) elif speed > 50: if at_start is False: #for now just skip single outliers, i.e. test current index with next but one outlier_rows[linked_rows[index]] = True linked_rows[index] = linked_rows[linked_rows[index]] elif at_start is True and linked_rows[linked_rows[index]] is None: #no subsequent message outlier_rows[index] = True outlier_rows[linked_rows[index]] = True index = linked_rows[index] else: # explore first three messages A, B, C (at_start is True) indexA = index indexB = linked_rows[index] indexC = linked_rows[indexB] timediffAC, distAC, speedAC = speed_calc(msg_stream, indexA, indexC) timediffBC, distBC, speedBC = speed_calc(msg_stream, indexB, indexC) #if speedtest A->C ok or distance small, then B is outlier, next index is C if timediffAC <= datetime.timedelta(hours=215) and (distAC < 100 or speedAC <= 50): outlier_rows[indexB] = True index = indexC at_start = False #elif speedtest B->C ok or distance, then A is outlier, next index is C elif timediffBC <= datetime.timedelta(hours=215) and (distBC < 100 or speedBC <= 50): outlier_rows[indexA] = True index = indexC at_start = False #else bot not ok, dann A and B outlier, set C to new start else: outlier_rows[indexA] = True outlier_rows[indexB] = True index = indexC at_start = True else: #all good index = linked_rows[index] at_start = False return outlier_rows
[docs]def interpolate_passages(msg_stream): """Interpolate far apart points in an ordered stream of messages. Parameters ---------- msg_stream : A list of dictionaries representing AIS messages for a single MMSI number. Dictionary keys correspond to the column names from the `ais_clean` table. The list of messages should be ordered by timestamp in ascending order. Returns ------- artificial_messages : list A list of artificial messages to fill in gaps/navigate around land. """ artificial_messages = [] return artificial_messages