Using Spark for Affiliating Healthcare Data to Organizations

by Shahid Ashraf

At Applied, we use Spark to process large open data sets such as NIH RePORTER, PubMed, and ClinicalTrials.gov, and to affiliate records to organizations. These organizations publish millions of records yearly and update them monthly or weekly. Before we can use these open data sets to infer information, they have to pass through several steps: downloading, extraction, cleaning, and transformation into an appropriate format. (Read our blogs on ETL and the PubMed API to learn more about these processes.) The processed data is stored in MySQL, with a tuple count of 19 million and a size of around 1 terabyte. Traditional scripts do not scale to this much data; they would usually take more than a week to finish processing and write the results back to MySQL. We need a distributed approach, so, following the big data notion, we use Spark to process these records.

Let's take a look at how we affiliate the top twenty organizations and associate our records with them.

Step 1: Set up a Spark standalone cluster

Use the spark-ec2 script to create a 10-node Spark cluster.
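The spark-ec2 script ships in the ec2/ directory of the Spark 1.x distribution. A typical launch looks like the following sketch; the key-pair name, identity-file path, region, and cluster name here are placeholders you would replace with your own:

```shell
# Launch a standalone cluster with 10 slave nodes on EC2
# (key pair, identity file, and cluster name are placeholders)
./spark-ec2 --key-pair=my-keypair \
            --identity-file=/path/to/my-keypair.pem \
            --slaves=10 \
            --region=us-east-1 \
            launch affiliation-cluster
```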

Step 2: Load data into the Spark cluster

Use Sqoop to load data from MySQL into the HDFS of the Spark standalone cluster.

./sqoop import --connect jdbc:mysql://xx.yy.pp.zz/select_star --username root --password pswd -m 2 --query 'SELECT record_id, affiliation FROM select_star_core_hcp WHERE affiliation IS NOT NULL AND affiliation != "" AND affiliation != " " AND $CONDITIONS ORDER BY affiliation' --target-dir /selectaff
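Each line Sqoop writes to /selectaff is a comma-separated record_id and affiliation. Since affiliations themselves often contain commas, only the first comma should split the pair. A minimal sketch of the parse (the tagging script later does the equivalent in its tuple_json method):

```python
def tuple_json(line):
    """Split a Sqoop-exported line into record_id and affiliation.
    Only the first comma separates the two fields; the affiliation
    text may itself contain commas."""
    record_id, _, affiliation = line.partition(',')
    return {"RECORD_ID": record_id, "AFFILIATION": affiliation}

row = tuple_json("12345,Department of Medicine, NYU Langone Medical Center")
# row["RECORD_ID"]   -> "12345"
# row["AFFILIATION"] -> "Department of Medicine, NYU Langone Medical Center"
```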

Step 3: Create a pattern config file (in JSON) for the twenty organizations, as shown below.

{
 "affiliation_tag": {
 "NYU": {
 "AFFILIATION_PATTERNS": "^NYU +| +NYU +| +NYU$|,NYU +|NEW YORK UNIVERSITY|Ronald O. Perelman",
 "DEPARTMENT_LIST": [
 "Department of Anesthesiology",
 "Department of Biochemistry & Molecular Pharmacology",
 "Department of Cardiothoracic Surgery",
 "Division of Cardiac Surgery",
 "Division of Thoracic Surgery",
 "Division of Pediatric and Adult Congenital Cardiac Surgery",
 "Department of Cell Biology",
 "Department of Child & Adolescent Psychiatry",
 "Ronald O. Perelman Department of Dermatology",
 "Ronald O. Perelman Department of Emergency Medicine",
 "Department of Environmental Medicine",
 "Department of Forensic Medicine",
 "Department of Medicine",
 "Leon H. Charney Division of Cardiology",
 "Division of Endocrinology",
 "Division of Gastroenterology",
 "Division of General Internal Medicine & Clinical Innovation",
 "Division of Geriatrics Medicine & Palliative Care",
 "Division of Hematology & Medical Oncology",
 "Division of Medical Humanities",
 "Division of Infectious Disease & Immunology",
 "Division of Nephrology",
 "Division of Pulmonary, Critical Care & Sleep Medicine",
 "Division of Rheumatology",
 "Division of Translational Medicine",
 "Department of Microbiology",
 "Department of Neurology",
 "Division of Neuromuscular Diseases",
 "Department of Neuroscience & Physiology",
 "Department of Neurosurgery",
 "Department of Obstetrics & Gynecology",
 "Department of Ophthalmology",
 "Department of Orthopaedic Surgery",
 "Division of Pediatric Orthopaedic Surgery",
 "Department of Otolaryngology",
 "Laboratory for Translational Auditory Research",
 "Department of Pathology",
 "Department of Pediatrics",
 "Department of Plastic Surgery",
 "Department of Population Health",
 "Department of Psychiatry",
 "Department of Radiation Oncology",
 "Department of Radiology",
 "Department of Rehabilitation Medicine",
 "Department of Surgery",
 "Acute Care Surgery",
 "Adrenal Surgery",
 "Bariatric Surgery",
 "Colon & Rectal Surgery",
 "Endocrine Surgery",
 "General Surgery",
 "Oral & Maxillofacial Surgery",
 "Pancreatitis Program",
 "Pediatric Surgery",
 "Surgical Oncology",
 "Transplant Surgery",
 "Vascular & Endovascular Surgery",
 "Department of Urology"
 ]
 },
 "UPEN": {
 "AFFILIATION_PATTERNS": "university of pennsylvania|^UPEN +| +UPEN +|,UPEN +| +UPEN$|university of pennsylvania PERELMAN SCHOOL OF MEDICINE|University of Pennsylvania School of Medicine|Penn Presbyterian Medical Center|Hospital of the University of Pennsylvania|Pennsylvania Hospital|Chester County Hospital",
 "DEPARTMENT_LIST": [
 "Biochemistry and Biophysics",
 "Biostatistics and Epidemiology",
 "Cancer Biology",
 "Cell and Developmental Biology",
 "Genetics",
 "Medical Ethics and Health Policy",
 "Microbiology",
 "Neuroscience",
 "Pharmacology",
 "Physiology",
 "Anesthesiology and Critical Care",
 "Dermatology",
 "Emergency Medicine",
 "Family Medicine and Community Health",
 "Medicine",
 "Cardiovascular Medicine",
 "Endocrinology, Diabetes and Metabolism",
 "Gastroenterology",
 "General Internal Medicine",
 "Geriatrics",
 "Hematology/Oncology",
 "Infectious Diseases",
 "Pulmonary and Critical Care Medicine",
 "Renal-Electrolyte and Hypertension",
 "Rheumatology",
 "Sleep Medicine",
 "Translational Medicine and Human Genetics",
 "Neurology",
 "Neurosurgery",
 "Obstetrics and Gynecology",
 "Ophthalmology",
 "Orthopaedic Surgery",
 "Otorhinolaryngology - Head and Neck Surgery",
 "Pathology and Laboratory Medicine",
 "Pediatrics",
 "Physical Medicine and Rehabilitation",
 "Psychiatry",
 "Radiation Oncology",
 "Radiology",
 "Surgery",
 "Cardiovascular Surgery",
 "Colon and Rectal Surgery",
 "Endocrine and Oncologic Surgery",
 "Gastrointestinal Surgery",
 "Pediatric Surgery",
 "Plastic Surgery",
 "Thoracic Surgery",
 "Transplant Surgery",
 "Traumatology, Surgical Critical Care and Emergency Surgery",
 "Urology",
 "Vascular Surgery & Endovascular Therapy"
 ]
 },
 "RIC": {
 "AFFILIATION_PATTERNS": "^RIC +| +RIC +| +RIC$|,RIC +|Rehabilitation Institute of Chicago|Advocate Illinois Masonic Medical Center|Alexian Rehabilitation Hospital|Franciscan St. Anthony Health|Franciscan St. Margaret Mercy Health|Silver Cross Hospital",
 "DEPARTMENT_LIST": [
 "Stroke Recovery",
 "Specialized Services",
 "Chronic Pain",
 "Spinal Cord Injuries",
 "Spine & Sports Rehabilitation",
 "Brain Injuries",
 "Pediatric & Adolescent Rehabilitation",
 "Prosthetics & Orthotics",
 "Vestibular Disorders",
 "Women's Health",
 "Neurological Disorders",
 "Amputation & Limb Deficiencies",
 "Parkinson's Disease",
 "Arthritis and Joint Conditions",
 "Gait (Walking) Disorders",
 "Cancer Rehabilitation",
 "Cerebral Palsy",
 "Orthopedic Conditions",
 "Back Injuries and Back Pain",
 "Osteoporosis"
 ]
 },

...

}

This step is somewhat time-consuming, as we have to create these patterns and organization names manually. In the future, the aim is to learn new patterns automatically from the existing ones.
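The regex patterns above can be sanity-checked without a cluster. Here is a minimal sketch using a trimmed-down, inline stand-in for the config file (only the NYU and UPEN patterns, with UPEN's shortened):

```python
import re

# Trimmed-down stand-in for the "affiliation_tag" config shown above
ENRICH_MAP = {
    "NYU": {
        "AFFILIATION_PATTERNS": "^NYU +| +NYU +| +NYU$|,NYU +|NEW YORK UNIVERSITY|Ronald O. Perelman",
    },
    "UPEN": {
        "AFFILIATION_PATTERNS": "university of pennsylvania|^UPEN +| +UPEN +|,UPEN +| +UPEN$",
    },
}

def tag_affiliation(affiliation):
    """Return the first canonical organization whose pattern matches,
    or UNTAGGED if none does (mirroring the tagging script's fallback)."""
    for canon, cfg in ENRICH_MAP.items():
        if re.search(cfg["AFFILIATION_PATTERNS"], affiliation, re.IGNORECASE):
            return canon
    return "UNTAGGED"

print(tag_affiliation("Department of Medicine, New York University"))           # NYU
print(tag_affiliation("Perelman School of Medicine, University of Pennsylvania"))  # UPEN
print(tag_affiliation("Stanford University"))                                   # UNTAGGED
```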

Step 4: The actual Spark application, which runs on the Spark cluster.

We used Spark 1.3.1 for the tagging. Check out our introduction to Spark applications to get started with PySpark. After loading the data into HDFS in Step 2, we process it with a Spark application written in Python. We point the application at the data by giving the HDFS URL of the file. The results are written back to MySQL by the same Spark script; we could also write them back to HDFS. For string matching we use our fuzzy string matching functions, which need to be shipped to the cluster. We also set the cluster URL in the application using the setMaster option on the Spark context:

setMaster(
    "spark://ec2-54-167-11-106.compute-1.amazonaws.com:7077")

then submit the application to the cluster with the following command on the driver node:

$ spark-submit aff_tagger.py

# aff_tagger.py
import re
import math
import json
import simplejson
import MySQLdb
import MySQLdb.cursors
from time import strftime, strptime
from pyspark import SparkConf, SparkContext
# TokenMatch and the DB settings come from the helper modules
# shipped to the cluster via the pyFiles argument below
from token_matcher import TokenMatch
from settings import (SELECT_STAR_DB_SERVER_IP, SELECT_STAR_DB_PORT,
                      SELECT_STAR_DB_USER, SELECT_STAR_DB_PASSWORD)

class AffiliationFilter():
    """docstring for AffiliationFilter"""
    def __init__(self, **kwargs):
        self.source = kwargs.get("source", None)
        with open("affiliation_tagging_map.py") as json_data:
            self.rmap = simplejson.load(json_data)
            self.enrich_map = self.rmap['affiliation_tag']

    def find_match(self, jsonl):
        """ find the match using pattern in config file"""
        match = True
        for aff_canon in self.enrich_map.keys():
            aff_search_pattern = self.enrich_map[aff_canon]['AFFILIATION_PATTERNS']
            p = re.search(aff_search_pattern, jsonl['AFFILIATION'], re.IGNORECASE)
            if match and p:
                jsonl.update({"AFFILIATION_CANON": aff_canon})
                aff_dept_list = self.enrich_map[aff_canon]['DEPARTMENT_LIST']
                t = {}
                for dept in aff_dept_list:
                    if dept in jsonl["AFFILIATION"]:
                        t["AFFILIATION_DEPT"] = dept
                    elif TokenMatch().site_name_match(dept, jsonl["AFFILIATION"])[0] > 58:
                        t["AFFILIATION_DEPT"] = dept
                if not t:
                    t["AFFILIATION_DEPT"] = ""
                jsonl.update(t)
                match = False
            if not match:
                return jsonl
        if match:
            jsonl.update({"AFFILIATION_CANON": "UNTAGGED", "AFFILIATION_DEPT": ""})
        return jsonl

    def convert_time(self,string,format = "%Y-%m-%d"):
        """converts to Y-m-d format"""
        return strftime("%Y-%m-%d %H:%M:00",strptime(string, format))

    def get_month_from_date(self,date_obj):
        return date_obj.strftime('%B')

    def get_quarter(self, month):
        """ gets quarter from month"""
        return math.ceil(month/3)

    def jsonize(self, data):
        data =  json.loads(data)
        return data

    def get_affiliation(self, jsonl):
        # if "new york university" in jsonl["AFFILIATION"].lower() or "nyu" in jsonl["AFFILIATION"].lower():
            # print jsonl["YEAR"],jsonl["AFFILIATION"]
            return jsonl

    def update_mysql(self, iterator):
        """ updates the mysql tuple back with the tag of institute"""
        conn = MySQLdb.connect(host=SELECT_STAR_DB_SERVER_IP, port=SELECT_STAR_DB_PORT, user=SELECT_STAR_DB_USER, passwd=SELECT_STAR_DB_PASSWORD, db="select_star",
                                           charset='utf8', use_unicode=True)
        conn.autocommit(True)
        cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
        print "partitions...",
        f = open("/root/mysqlf.txt","w+")
        k = 0
        i = -1  # guard against empty partitions
        for i, jsonl in enumerate(iterator):
            sql = "update select_star_core_hcp set  aff_tag = '%s' where record_id = '%s';"
            sql = sql %(jsonl["AFFILIATION_CANON"],jsonl["RECORD_ID"])
            f.write(sql+ "\n")
            print cur.execute(sql)
            sql1 = """SELECT pre_enriched from raw_records WHERE record_id = '%s';""" %jsonl["RECORD_ID"]
            f.write(sql1+ "\n")
            r = cur.execute(sql1)
            r = cur.fetchall()
            if len(r) > 0:
                if r[0]["pre_enriched"]:
                    ujsonl = simplejson.loads(r[0]["pre_enriched"])
                    ujsonl["AFF_TAG"] = jsonl["AFFILIATION_CANON"]
                    ujsonl["AFFILIATION_DEPT"] = jsonl["AFFILIATION_DEPT"]
                    query = """UPDATE  raw_records SET pre_enriched = %s WHERE  record_id = %s;"""
                    cur.execute(query,[simplejson.dumps(ujsonl),jsonl["RECORD_ID"]])
            else:
                k+=1
        print "no of row in this partition %s %s" %(i,k)
        conn.close()

    def f(self, iterator):
        """debug helper: touch one element of each partition"""
        print "partition"
        for x in iterator:
            break

    def tuple_json(self,tupl):
        tupl = tupl.split(',')
        return {"RECORD_ID":tupl[0],"AFFILIATION":','.join(tupl[1:])}

if __name__ == '__main__':
    conf = SparkConf().setAppName(
    "select_star_filter_affilition_v1").setMaster(
    "spark://ec2-54-167-xx-xx6.compute-1.amazonaws.com:7077")
    # .set("spark.speculation","true")
    # sc = SparkContext(conf=conf)
    sc = SparkContext(conf=conf,pyFiles=["/root/metrics/token_matcher.py","/root/metrics/normalize.py","/root/metrics/settings.py","/root/metrics/token_constants.py"])
    path = "hdfs://ec2-54-167-xx-xx.compute-1.amazonaws.com:9000/selectaff"
    A = AffiliationFilter()
    select_star_RecordRDD = sc.textFile(path).repartition(40)
    select_star_RecordRDD = select_star_RecordRDD.filter(lambda l: l).map(A.tuple_json).filter(lambda l: l.get("AFFILIATION",None))
    filtered_rdd = select_star_RecordRDD.map(A.find_match)
    col = filtered_rdd.filter(lambda l: l["AFFILIATION_CANON"] not in ["NYU","UPEN"]) # in ["NYU","UPEN","RIC","MFMER"]
    # col.saveAsTextFile("hdfs://ec2-54-147-63-178.compute-1.amazonaws.com:9000/ric.txt")
    print "%s " %("*"*500)
    #todo use foreach on each rdd...
    #filtered_rdd.foreach(a.update_mysql)
    print col.foreachPartition(A.update_mysql)
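The TokenMatch().site_name_match call in find_match is our internal fuzzy matcher. Its role can be sketched with only the standard library; site_name_match and best_department here are hypothetical stand-ins (difflib's SequenceMatcher instead of our matcher), with the same 0-100 scoring scale and the 58 threshold used in the script:

```python
from difflib import SequenceMatcher

def site_name_match(dept, affiliation):
    """Hypothetical stand-in for our internal fuzzy matcher: score 0-100
    of how closely `dept` resembles the affiliation string."""
    a, b = dept.lower(), affiliation.lower()
    if a in b:
        return 100  # exact (case-insensitive) substring match
    return int(SequenceMatcher(None, a, b).ratio() * 100)

def best_department(affiliation, dept_list, threshold=58):
    """Pick the best-scoring department above the threshold, else ''."""
    scored = [(site_name_match(d, affiliation), d) for d in dept_list]
    score, dept = max(scored)
    return dept if score > threshold else ""

depts = ["Department of Medicine", "Department of Surgery"]
print(best_department("Dept. of Surgery, NYU Langone", depts))
# prints: Department of Surgery
```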

So, this is how we can process large healthcare data sets with Apache Spark and assign records to organizations based on string matching. In upcoming blogs, I will discuss how we at Applied use Spark for record linkage across large datasets.

Follow me on Twitter and Applied for more cool stuff we do.
