Implementing HIPAA Compliant Audit Trails using SQLAlchemy ORM Queries

by Aamir Bhatt

An audit log is a record of events in an information technology (IT) system. Instead of keeping it as a document, we store it in a database. Audit logging can be added to every entity, so you can easily track entity views, inserts, updates and deletes. Audit logs play a vital role in securing electronic health records (EHR) and in implementing the HIPAA Security Rule. Alteration of data has been a major problem in recent times. Audit logs protect the integrity of data by recording who accessed it, which values were changed, what the old values were, and when the change happened. If any information is altered accidentally, the audit trail can help with recovery too.

SQLAlchemy Model

In the past, programmers built database backends by writing raw SQL statements, passing them to the database engine and parsing the returned results as plain arrays of records. Nowadays, programmers can use object-relational mapping (ORM) to avoid writing tedious and error-prone raw SQL statements that are inflexible and hard to maintain.
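
To make the contrast concrete, here is a small sketch that stores and reads one row twice: once with raw SQL through the standard sqlite3 module, and once through the SQLAlchemy ORM. It assumes SQLAlchemy 1.4 or newer and an in-memory SQLite database; the trimmed-down User model is for illustration only.

```python
import sqlite3

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

# Raw SQL: statements are written by hand and results come back as tuples.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tbl_user (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("INSERT INTO tbl_user (username) VALUES (?)", ("bhattaamir",))
raw_rows = conn.execute("SELECT id, username FROM tbl_user").fetchall()
conn.close()

# ORM: the table is described once as a class and rows become objects.
Base = declarative_base()


class User(Base):
    __tablename__ = "tbl_user"
    id = Column(Integer, primary_key=True)
    username = Column(String(255))


engine = create_engine("sqlite://")  # separate in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(username="bhattaamir"))
    session.commit()
    orm_user = session.query(User).filter_by(username="bhattaamir").one()
    orm_result = (orm_user.id, orm_user.username)
```

Both paths end up with the same data, but the ORM version never spells out the CREATE, INSERT or SELECT statements.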

The Model declarative base class behaves like a regular Python class, but it has a query attribute attached that can be used to query the model. Each column attribute of the model represents a database field. In the example below, the User class is a subclass of db.Model. To learn more about models and the ORM, please refer to http://docs.sqlalchemy.org/en/rel_1_0/orm/tutorial.html

In order to log all User model queries, we create another model, UserAuditLog, which replicates the User model and adds a few audit-specific columns.

from datetime import datetime

# db is assumed to be the application's Flask-SQLAlchemy object
class User(db.Model):
    __tablename__ = 'tbl_user'

    id = db.Column(db.Integer, primary_key=True)
    first_name = db.Column(db.String(255))
    last_name = db.Column(db.String(255))
    password = db.Column(db.String(127))
    last_login = db.Column(db.DateTime)
    is_active = db.Column(db.Boolean, default=False)
    created_on = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    modified_on = db.Column(db.DateTime, onupdate=datetime.utcnow)
    username = db.Column(db.String(255), unique=True, nullable=False)
    email = db.Column(db.String(255), nullable=False, unique=True)

The equivalent schema for the above User class is shown below.
[Screenshot: schema of the tbl_user table]

class UserAuditLog(AuditBase):
    __tablename__ = 'tbl_user_log'

    first_name = db.Column(db.String(255))
    last_name = db.Column(db.String(255))
    password = db.Column(db.String(127))
    created_on = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    modified_on = db.Column(db.DateTime, onupdate=datetime.utcnow)
    # No unique constraints here: the same user is logged once per operation
    username = db.Column(db.String(255), nullable=False)
    email = db.Column(db.String(255), nullable=False)
    operation = db.Column(db.Enum('INSERT', 'DELETE', 'UPDATE'))  # kind of change
    user_id = db.Column(db.Integer)
    table_id = db.Column(db.Integer)  # primary key of the audited tbl_user row
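
AuditBase itself is not defined in this post. The following is a minimal sketch of what such a mixin might look like, written against plain SQLAlchemy 1.4 or newer (rather than Flask-SQLAlchemy) so that it runs on its own. The columns id and logged_on are assumptions made for illustration, not the author's code.

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class AuditBase(object):
    """Hypothetical mixin: columns that every audit table would share."""

    # Each log row gets its own primary key, independent of the audited row.
    id = Column(Integer, primary_key=True)
    # When the log entry was written (assumed column, not in the post).
    logged_on = Column(DateTime, nullable=False, default=datetime.utcnow)


class UserAuditLog(AuditBase, Base):
    __tablename__ = "tbl_user_log"

    table_id = Column(Integer)      # primary key of the audited row
    operation = Column(String(10))  # 'INSERT', 'UPDATE' or 'DELETE'


column_names = set(UserAuditLog.__table__.columns.keys())
```

Declarative copies the mixin's columns onto each subclass, so every audit table built this way gets its own id and logged_on columns.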

Adding Event Listener

SQLAlchemy comes with a large set of events that are triggered at particular points, for example after a row is inserted. The full list of ORM events is documented here:

http://docs.sqlalchemy.org/en/rel_1_0/orm/events.html

Whenever we insert a record through the User model, the event listener below is notified and calls the associated function. We add this to the same file in which we declared the models above. Here, my_after_insert_listener is associated with inserts on User.

class UserAuditLog(UserTableBase, AuditBase, db.Model):
    __tablename__ = 'tbl_user_log'
    email = db.Column(db.String(255), nullable=False)
    username = db.Column(db.String(255), nullable=False)

..

def my_after_insert_listener(mapper, connection, target):
    # Copy the values of the User instance that was just inserted
    data = target.__dict__.copy()
    table_name = target.__tablename__
    data['user_id'] = None
    data['operation'] = 'INSERT'
    # The user's primary key becomes table_id; the log row gets its own id
    data['table_id'] = data.get('id')
    data['id'] = None
    log_name = table_name + '_log'
    auditexecute(log_name, data)

# Register the listener once the function is defined
# (requires: from sqlalchemy import event)
event.listen(User, 'after_insert', my_after_insert_listener)

In the above function, target is the User instance that was just inserted, so it carries the data that was passed to the query. We copy that data into the data dictionary and add the audit fields to it, as shown above. Finally, we call the auditexecute function.

def auditexecute(log_name, data):
    for c in db.Model._decl_class_registry.values():
        if hasattr(c, '__tablename__') and c.__tablename__ == log_name:
            # Keep only real columns (drops SQLAlchemy's _sa_instance_state)
            row = dict((k, v) for k, v in data.items() if k in c.__table__.columns.keys())
            db.session.execute(c.__table__.insert(), row)
            db.session.commit()

auditexecute iterates over db.Model._decl_class_registry.values(), which holds every mapped model class.
We check whether a model for <table>_log is present. If it is, we run an insert against its table with the data dictionary.
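
The lookup above goes through _decl_class_registry, which is a private attribute. One possible alternative, sketched here with plain SQLAlchemy Core, is the public MetaData.tables dictionary, which maps table names to Table objects. The helper name find_log_table and the reduced tables are hypothetical; with Flask-SQLAlchemy the metadata object would presumably be db.metadata.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table

metadata = MetaData()

# Reduced versions of the two tables from this post.
Table("tbl_user", metadata,
      Column("id", Integer, primary_key=True),
      Column("email", String(255)))

Table("tbl_user_log", metadata,
      Column("id", Integer, primary_key=True),
      Column("table_id", Integer),
      Column("email", String(255)))


def find_log_table(metadata, table_name):
    """Return the Table named <table_name>_log, or None if there is none."""
    return metadata.tables.get(table_name + "_log")


log_table = find_log_table(metadata, "tbl_user")     # the tbl_user_log Table
missing = find_log_table(metadata, "tbl_user_log")   # no tbl_user_log_log exists
```

A dictionary lookup also avoids scanning every registered class on each insert.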

Querying:

>>> user = User(first_name='aamir', last_name='bhat', email='bhatt@trialx.com', username='bhattaamir')
>>> db.session.add(user)
>>> db.session.commit()
>>> User.query.count()
1
>>> UserAuditLog.query.count()
1
>>> user = User.query.filter_by(id=1).first()
>>> user.email
'bhatt@trialx.com'
>>> log = UserAuditLog.query.filter_by(table_id=1).first()
>>> log.email
'bhatt@trialx.com'

This is a basic example of how to create an audit log for a single table. In the same way, we can add more log tables and log more events, such as updating or deleting an object. We can even write the log in the background using Celery, which keeps the logging work out of the query's run time.
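
Extending the log to updates and deletes could look like the sketch below. It assumes SQLAlchemy 1.4 or newer and an in-memory SQLite database; make_listener and the reduced models are hypothetical names, not the author's code. Unlike auditexecute, it emits the log INSERT on the connection that the listener receives, which is how the SQLAlchemy documentation suggests running SQL from mapper-level events, so the log row belongs to the same transaction as the audited row. It stores the values as they are after each operation; recording the previous values as well would need extra work.

```python
from sqlalchemy import Column, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "tbl_user"
    id = Column(Integer, primary_key=True)
    email = Column(String(255), nullable=False)


class UserAuditLog(Base):
    __tablename__ = "tbl_user_log"
    id = Column(Integer, primary_key=True)
    table_id = Column(Integer)      # primary key of the audited row
    operation = Column(String(10))
    email = Column(String(255))


def make_listener(operation):
    """Build a mapper-event listener that logs one kind of operation."""
    def listener(mapper, connection, target):
        # Read only mapped column values, not SQLAlchemy's internal state.
        row = {attr.key: getattr(target, attr.key)
               for attr in mapper.column_attrs}
        row["table_id"] = row.pop("id")
        row["operation"] = operation
        # Use the flush's own connection so both rows share a transaction.
        connection.execute(UserAuditLog.__table__.insert(), row)
    return listener


for event_name, operation in [("after_insert", "INSERT"),
                              ("after_update", "UPDATE"),
                              ("after_delete", "DELETE")]:
    event.listen(User, event_name, make_listener(operation))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

# expire_on_commit=False keeps attribute values loaded between commits,
# so the listeners never need an extra SELECT.
with Session(engine, expire_on_commit=False) as session:
    user = User(email="old@example.com")
    session.add(user)
    session.commit()
    user.email = "new@example.com"
    session.commit()
    session.delete(user)
    session.commit()
    history = [(log.operation, log.email)
               for log in session.query(UserAuditLog).order_by(UserAuditLog.id)]
```

After the three commits, history holds one entry per operation, in the order in which they happened.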
