Source code for pyrate.repositories.aisdb

from pyrate.repositories import sql
import psycopg2
import logging
try:
    import pandas as pd
except ImportError:
    logging.warning("pandas not found; DataFrame support is unavailable")
    pd = None

EXPORT_COMMANDS = [('status', 'report status of this repository.'),
                   ('create', 'create the repository.'),
                   ('truncate', 'delete all data in this repository.'),
                   ('update', 'update the database schema.')]


def load(options, readonly=False):
    return AISdb(options, readonly)
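
# Usage sketch (not part of the original module): open the repository and run
# one of the EXPORT_COMMANDS, each of which names an AISdb method ('status',
# 'create', 'truncate', 'update'). The option keys are whatever
# sql.PgsqlRepository expects, and the `with` block assumes the base
# repository opens self.conn as a context manager, which is how the methods
# below use it.
def _example_run_command(options, command='status'):
    repo = load(options)
    with repo:  # assumed: PgsqlRepository.__enter__ opens repo.conn
        getattr(repo, command)()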

class AISdb(sql.PgsqlRepository):

    double_type = 'double precision'

    clean_db_spec = {
        'cols': [
            ('MMSI', 'integer'),
            ('Time', 'timestamp without time zone'),
            ('Message_ID', 'integer'),
            ('Navigational_status', 'integer'),
            ('SOG', double_type),
            ('Longitude', double_type),
            ('Latitude', double_type),
            ('COG', double_type),
            ('Heading', double_type),
            ('IMO', 'integer null'),
            ('Draught', double_type),
            ('Destination', 'character varying(255)'),
            ('Vessel_Name', 'character varying(255)'),
            ('ETA_month', 'integer'),
            ('ETA_day', 'integer'),
            ('ETA_hour', 'integer'),
            ('ETA_minute', 'integer'),
            ('source', 'smallint'),
            ('ID', 'BIGSERIAL PRIMARY KEY')
        ],
        'indices': [
            ('dt_idx', ['Time']),
            ('imo_idx', ['IMO']),
            ('lonlat_idx', ['Longitude', 'Latitude']),
            ('mmsi_idx', ['MMSI']),
            ('msg_idx', ['Message_ID']),
            ('source_idx', ['source']),
            ('mmsi_imo_idx', ['MMSI', 'IMO'])
        ]
    }

    # identical to clean_db_spec except that MMSI is widened to bigint
    dirty_db_spec = {
        'cols': [
            ('MMSI', 'bigint'),
            ('Time', 'timestamp without time zone'),
            ('Message_ID', 'integer'),
            ('Navigational_status', 'integer'),
            ('SOG', double_type),
            ('Longitude', double_type),
            ('Latitude', double_type),
            ('COG', double_type),
            ('Heading', double_type),
            ('IMO', 'integer null'),
            ('Draught', double_type),
            ('Destination', 'character varying(255)'),
            ('Vessel_Name', 'character varying(255)'),
            ('ETA_month', 'integer'),
            ('ETA_day', 'integer'),
            ('ETA_hour', 'integer'),
            ('ETA_minute', 'integer'),
            ('source', 'smallint'),
            ('ID', 'BIGSERIAL PRIMARY KEY')
        ],
        'indices': [
            ('dt_idx', ['Time']),
            ('imo_idx', ['IMO']),
            ('lonlat_idx', ['Longitude', 'Latitude']),
            ('mmsi_idx', ['MMSI']),
            ('msg_idx', ['Message_ID']),
            ('source_idx', ['source']),
            ('mmsi_imo_idx', ['MMSI', 'IMO'])
        ]
    }

    sources_db_spec = {
        'cols': [
            ('ID', 'SERIAL PRIMARY KEY'),
            ('timestamp', 'timestamp without time zone DEFAULT now()'),
            ('filename', 'TEXT'),
            ('ext', 'TEXT'),
            ('invalid', 'integer'),
            ('clean', 'integer'),
            ('dirty', 'integer'),
            ('source', 'integer')
        ]
    }

    imolist_db_spec = {
        'cols': [
            ('mmsi', 'integer NOT NULL'),
            ('imo', 'integer NULL'),
            ('first_seen', 'timestamp without time zone'),
            ('last_seen', 'timestamp without time zone')
        ],
        'constraint': ['CONSTRAINT imo_list_key UNIQUE (mmsi, imo)']
    }

    clean_imo_list = {
        'cols': imolist_db_spec['cols'],
        'constraint': ['CONSTRAINT imo_list_pkey PRIMARY KEY (mmsi, imo)']
    }

    action_log_spec = {
        'cols': [
            ('timestamp', 'timestamp without time zone DEFAULT now()'),
            ('action', 'TEXT'),
            ('mmsi', 'integer NOT NULL'),
            ('ts_from', 'timestamp without time zone'),
            ('ts_to', 'timestamp without time zone'),
            ('count', 'integer NULL')
        ],
        'indices': [
            ('ts_idx', ['timestamp']),
            ('action_idx', ['action']),
            ('mmsi_idx', ['mmsi'])
        ],
        'constraint': ['CONSTRAINT action_log_pkey PRIMARY KEY (timestamp, action, mmsi)']
    }

    def __init__(self, options, readonly=False):
        super(AISdb, self).__init__(options, readonly)
        self.clean = sql.Table(self, 'ais_clean', self.clean_db_spec['cols'],
                               self.clean_db_spec['indices'])
        self.dirty = sql.Table(self, 'ais_dirty', self.dirty_db_spec['cols'],
                               self.dirty_db_spec['indices'])
        self.sources = sql.Table(self, 'ais_sources',
                                 self.sources_db_spec['cols'])
        self.imolist = sql.Table(self, 'imo_list', self.imolist_db_spec['cols'],
                                 constraint=self.imolist_db_spec['constraint'])
        if self.postgis == 'yes':
            self.extended = AISExtendedTable(self)
        self.clean_imolist = sql.Table(self, 'imo_list_clean',
                                       self.clean_imo_list['cols'],
                                       constraint=self.clean_imo_list['constraint'])
        self.action_log = sql.Table(self, 'action_log',
                                    self.action_log_spec['cols'],
                                    self.action_log_spec['indices'],
                                    constraint=self.action_log_spec['constraint'])
        if self.postgis == 'yes':
            self.tables = [self.clean, self.dirty, self.sources, self.imolist,
                           self.extended, self.clean_imolist, self.action_log]
        else:
            self.tables = [self.clean, self.dirty, self.sources, self.imolist,
                           self.clean_imolist, self.action_log]

    def status(self):
        """Report the status (row count) of each table in this repository."""
        print("Status of PGSql database " + self.db + ":")
        for tb in self.tables:
            s = tb.status()
            if s >= 0:
                print("Table {}: {} rows.".format(tb.get_name(), s))
            else:
                print("Table {}: not yet created.".format(tb.get_name()))

    def create(self):
        """Create the tables for the AIS data."""
        for tb in self.tables:
            tb.create()

    def truncate(self):
        """Delete all data in the AIS tables."""
        for tb in self.tables:
            tb.truncate()

    def update(self):
        """Update (non-destructively) existing tables to the new schema."""
        for db in [self.clean, self.dirty]:
            table_name = db.get_name()
            alter_sql = "ALTER TABLE {} ALTER COLUMN id SET DATA TYPE BIGINT;".format(table_name)
            with self.conn.cursor() as cur:
                logging.debug("Updating the database schema for table {}".format(table_name))
                logging.debug(cur.mogrify(alter_sql))
                try:
                    cur.execute(alter_sql)
                    self.conn.commit()
                except psycopg2.ProgrammingError as error:
                    logging.error("Error updating database schema for table {}".format(table_name))
                    logging.error(error.pgerror)

    def ship_info(self, imo):
        """Print the vessel names, MMSI numbers and active periods seen for
        an IMO number."""
        with self.conn.cursor() as cur:
            cur.execute("SELECT vessel_name, MIN(time), MAX(time) FROM ais_clean "
                        "WHERE message_id = 5 AND imo = %s GROUP BY vessel_name",
                        [imo])
            for row in cur:
                print("Vessel: {} ({} - {})".format(*row))
            cur.execute("SELECT mmsi, first_seen, last_seen FROM imo_list "
                        "WHERE imo = %s", [imo])
            for row in cur:
                print("MMSI = {} ({} - {})".format(*row))
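
    # Usage sketch (not part of the original module), with a hypothetical
    # IMO number:
    #
    #     aisdb.ship_info(9321483)
    #
    # prints each reported vessel name with its active period, then every
    # MMSI recorded for that IMO number in imo_list.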

    def get_messages_for_vessel(self, imo, from_ts=None, to_ts=None,
                                use_clean_db=False, as_df=False):
        """Get every message for the ship with this IMO number, across all
        of the MMSI numbers it has used."""
        if use_clean_db:
            imo_list = self.imolist
        else:
            imo_list = self.clean_imolist

        where = ["imo = %s"]
        params = [imo]
        # Amended EOK - no time field in this table
        # if from_ts is not None:
        #     where.append("time >= %s")
        #     params.append(from_ts)
        # if to_ts is not None:
        #     where.append("time <= %s")
        #     params.append(to_ts)

        with self.conn.cursor() as cur:
            cur.execute("SELECT mmsi, first_seen, last_seen FROM {} WHERE {}".format(
                imo_list.name, ' AND '.join(where)), params)
            msg_stream = None
            # get data for each of this ship's mmsi numbers, and concatenate
            for mmsi, first, last in cur:
                stream = self.get_message_stream(mmsi, from_ts=first, to_ts=last,
                                                 use_clean_db=use_clean_db,
                                                 as_df=as_df)
                if msg_stream is None:
                    msg_stream = stream
                elif as_df:
                    # DataFrames need concat; '+' would add them element-wise
                    msg_stream = pd.concat([msg_stream, stream])
                else:
                    msg_stream = msg_stream + stream
        return msg_stream
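
    # Usage sketch (not part of the original module), with a hypothetical
    # IMO number:
    #
    #     track = aisdb.get_messages_for_vessel(9321483, as_df=True)
    #
    # returns one time-indexed DataFrame spanning every MMSI the ship has
    # broadcast under.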

    def get_message_stream(self, mmsi, from_ts=None, to_ts=None,
                           use_clean_db=False, as_df=False):
        """Get the stream of messages for the given MMSI, ordered by
        timestamp ascending."""
        # construct db query
        if use_clean_db:
            db = self.clean
        else:
            db = self.extended
        where = ["mmsi = %s"]
        params = [mmsi]
        if from_ts is not None:
            where.append("time >= %s")
            params.append(from_ts)
        if to_ts is not None:
            where.append("time <= %s")
            params.append(to_ts)

        cols_list = ','.join([c[0].lower() for c in db.cols])
        where_clause = ' AND '.join(where)
        sql_query = "SELECT {} FROM {} WHERE {} ORDER BY time ASC".format(
            cols_list, db.get_name(), where_clause)

        if as_df:
            if pd is None:
                raise RuntimeError("Pandas not found, cannot create dataframe")
            # create pandas dataframe
            with self.conn.cursor() as cur:
                full_sql = cur.mogrify(sql_query, params).decode('ascii')
            return pd.read_sql(full_sql, self.conn, index_col='time',
                               parse_dates=['time'])
        else:
            with self.conn.cursor() as cur:
                cur.execute(sql_query, params)
                # convert tuples from db cursor into dicts
                msg_stream = []
                for row in cur:
                    message = {}
                    for i, col in enumerate(db.cols):
                        message[col[0]] = row[i]
                    msg_stream.append(message)
            return msg_stream
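
# Usage sketch (not part of the original module): fetch a single vessel's
# track, both as a list of dicts and as a time-indexed DataFrame. The MMSI
# and date range are hypothetical placeholders.
def _example_track(aisdb):
    from datetime import datetime
    start, end = datetime(2016, 1, 1), datetime(2016, 2, 1)
    # list-of-dicts form: one dict per message, keyed by column name
    messages = aisdb.get_message_stream(235009802, from_ts=start, to_ts=end)
    # DataFrame form, indexed and parsed on the 'time' column (needs pandas)
    track = aisdb.get_message_stream(235009802, from_ts=start, to_ts=end,
                                     as_df=True)
    return messages, track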

class AISExtendedTable(sql.Table):

    def __init__(self, db):
        super(AISExtendedTable, self).__init__(
            db, 'ais_extended',
            AISdb.clean_db_spec['cols'] + [('location', 'geography(POINT, 4326)')],
            AISdb.clean_db_spec['indices'])

    def create(self):
        with self.db.conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS postgis")
        super(AISExtendedTable, self).create()
        with self.db.conn.cursor() as cur:
            # trigger for GIS location generation
            try:
                cur.execute("""
                    CREATE OR REPLACE FUNCTION location_insert() RETURNS trigger AS '
                    BEGIN
                        NEW."location" := ST_SetSRID(ST_MakePoint(NEW.longitude, NEW.latitude), 4326);
                        RETURN NEW;
                    END;
                    ' LANGUAGE plpgsql;
                    CREATE TRIGGER {0}_gis_insert
                        BEFORE INSERT OR UPDATE ON {0}
                        FOR EACH ROW EXECUTE PROCEDURE location_insert();
                    """.format(self.name))
            except psycopg2.ProgrammingError:
                logging.info("{}_gis_insert already exists".format(self.name))
                self.db.conn.rollback()
        self.db.conn.commit()

    def create_indices(self):
        with self.db.conn.cursor() as cur:
            idxn = self.name.lower() + "_location_idx"
            try:
                logging.info("CREATING GIST INDEX " + idxn + " on table " + self.name)
                cur.execute("CREATE INDEX \"" + idxn + "\" ON \"" + self.name +
                            "\" USING GIST(\"location\")")
            except psycopg2.ProgrammingError:
                logging.info("Index " + idxn + " already exists")
                self.db.conn.rollback()
        super(AISExtendedTable, self).create_indices()

    def drop_indices(self):
        with self.db.conn.cursor() as cur:
            tbl = self.name
            idxn = tbl.lower() + "_location_idx"
            logging.info("Dropping index: " + idxn + " on table " + tbl)
            cur.execute("DROP INDEX IF EXISTS \"" + idxn + "\"")
        super(AISExtendedTable, self).drop_indices()
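
# Usage sketch (not part of the original module): the location_insert trigger
# installed by AISExtendedTable.create() keeps ais_extended.location in step
# with longitude/latitude, so PostGIS can filter by distance. The point and
# radius are hypothetical placeholders; ST_DWithin on geography operates in
# metres.
def _example_messages_near(aisdb, lon, lat, metres=10000):
    with aisdb.conn.cursor() as cur:
        cur.execute(
            "SELECT mmsi, time FROM ais_extended "
            "WHERE ST_DWithin(location, "
            "ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography, %s)",
            [lon, lat, metres])
        return cur.fetchall()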