Hive SCD Type II implementation based on a stage table in MySQL (using Sqoop incremental load)

--------------------
MySQL table

mysql> describe pt_adt_info_stg;
(pt_adt_info_stg is the stage table of the SCD Type 2 implementation in MySQL)

pt_adt_info_stg_id       int         (primary key of the MySQL table)
pt_adt_admission_date    timestamp   (admission date of the patient)
pt_adt_inventory_id      int
pt_adt_patient_id        int
pt_adt_discharge_date    timestamp   (discharge date of the patient)
pt_adt_staff_id          int
pt_adt_last_update       timestamp   (time at which the record was last modified in the table)

------------------------------------------
To add the current timestamp as an extra column to any result set:
hive> select t.*, from_unixtime(unix_timestamp()) from temp_filmdet t;

// from_unixtime(bigint unixtime[, string format]) is a built-in Hive date function; it returns a string
// unix_timestamp() is a built-in Hive function; it returns the current UNIX timestamp in seconds
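
The scdtype2 partition values used in the later steps have the same 'yyyy-MM-dd HH:mm:ss' shape that from_unixtime returns by default. When the load is driven from a shell script, a value of that shape can be generated outside Hive. This is a minimal sketch; the function name current_ts is hypothetical and not part of the original procedure:

```shell
#!/bin/sh
# Hypothetical helper: prints the current local time as 'yyyy-MM-dd HH:mm:ss',
# the same layout as Hive's from_unixtime(unix_timestamp()).
current_ts() {
  date '+%Y-%m-%d %H:%M:%S'
}

# Example: capture the value once and reuse it for every insert of one load run.
load_ts=$(current_ts)
echo "scdtype2 partition value for this run: $load_ts"
```

Capturing the value once per run keeps all three loading stages in the same partition.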

STEP 1:

Sqoop incremental import in append mode (finds the new records based on pt_adt_info_stg_id).

Go to $SQOOP_HOME/bin:

$SQOOP_HOME/bin> ./sqoop import --connect jdbc:mysql://191.160.463.1/ptdb \
  --table pt_adt_info_stg \
  --username biadmin --password biadmin \
  --target-dir /user/hadoop/admissionnew \
  --check-column pt_adt_info_stg_id \
  --incremental append \
  --last-value 2000000 -m 3

Note: the new admission data is written to /user/hadoop/admissionnew. Whenever a new admission occurs, a row with a new pt_adt_info_stg_id is inserted into the MySQL table, and its pt_adt_discharge_date is still open (not yet set).
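
Each append run needs the --last-value left by the previous run. Sqoop prints the value to use next when an incremental import finishes; as a cross-check, the highest ID can also be read back from the imported files. This is a minimal sketch, assuming the import wrote comma-delimited text with pt_adt_info_stg_id as the first field; the function name next_last_value is hypothetical:

```shell
#!/bin/sh
# Hypothetical helper: reads comma-delimited rows on stdin and prints the
# largest value found in the first field (pt_adt_info_stg_id).
# Prints nothing when the input is empty.
next_last_value() {
  awk -F',' '
    NR == 1 || $1 + 0 > max { max = $1 + 0 }   # track the highest ID seen so far
    END { if (NR > 0) printf "%d\n", max }
  '
}

# Example usage against the directory written in STEP 1 (needs a Hadoop client):
#   hdfs dfs -cat /user/hadoop/admissionnew/part-m-* | next_last_value
```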

STEP 2:

Sqoop incremental import in lastmodified mode (finds the existing records that were updated since the last load).

$SQOOP_HOME/bin> ./sqoop import --connect jdbc:mysql://192.168.56.1/ptdb \
  --table pt_adt_info_stg \
  --username biadmin --password biadmin \
  --target-dir /user/hadoop/admissionlastmod \
  --check-column pt_adt_last_update \
  --incremental lastmodified \
  --last-value "2016-01-25 11:51:25" \
  --where "pt_adt_admission_date is not null and pt_adt_discharge_date is not null" -m 3
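
Because the --where clause is meant to exclude rows that lack an admission or discharge date, the imported files can be spot-checked for such rows. This is a minimal sketch, assuming comma-delimited text in the column order of pt_adt_info_stg and Sqoop's default text rendering of SQL NULL as the string null; the function name count_open_rows is hypothetical:

```shell
#!/bin/sh
# Hypothetical helper: reads comma-delimited rows on stdin and counts the rows
# whose admission date (field 2) or discharge date (field 5) is the string "null".
count_open_rows() {
  awk -F',' '
    $2 == "null" || $5 == "null" { n++ }
    END { print n + 0 }   # n + 0 prints 0 when no row matched
  '
}

# Example usage against the directory written in STEP 2 (needs a Hadoop client);
# a result of 0 means every imported row has both dates set:
#   hdfs dfs -cat /user/hadoop/admissionlastmod/part-m-* | count_open_rows
```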

Note: 1) this import picks up the records whose discharge date was set after the admission date; 2) the data is written to the HDFS path /user/hadoop/admissionlastmod.

STEP 3:

The previous history data must be updated for the existing IDs whose records changed. Those changed records were imported by Sqoop into the HDFS directory /user/hadoop/admissionlastmod in STEP 2. Create an external staging table over that directory:

hive> create external table pt_adt_stage
(
  pt_adt_info_stg_id       int,
  pt_adt_admission_date    timestamp,
  pt_adt_inventory_id      int,
  pt_adt_patient_id        int,
  pt_adt_discharge_date    timestamp,
  pt_adt_staff_id          int,
  pt_adt_last_update       timestamp
)
row format delimited
fields terminated by ','
stored as textfile
location '/user/hadoop/admissionlastmod';

STEP 4:

The original Hive table with the SCD Type 2 implementation is pt_adt_info. It already exists in Hive and holds the history data.

$HIVE_HOME/bin> hive          (opens the Hive shell)

hive> describe pt_adt_info;   (shows the structure of the existing table)

The details are:
pt_adt_info_stg_id       int
pt_adt_admission_date    timestamp
pt_adt_inventory_id      int
pt_adt_patient_id        int
pt_adt_discharge_date    timestamp
pt_adt_staff_id          int
pt_adt_last_update       timestamp

The table is partitioned by (scdtype2 timestamp), stored as textfile, with location '/user/hadoop/pt_adt_info_org/'.

STEP 5:

Create a new table with the SCD Type 2 implementation:

hive> create external table pt_adt_info_20150120
(
  pt_adt_info_stg_id       int,
  pt_adt_admission_date    timestamp,
  pt_adt_inventory_id      int,
  pt_adt_patient_id        int,
  pt_adt_discharge_date    timestamp,
  pt_adt_staff_id          int,
  pt_adt_last_update       timestamp
)
partitioned by (scdtype2 timestamp)
row format delimited
fields terminated by ','
stored as textfile
location '/user/hadoop/pt_adt_info_20150120/';
------------------------------------------------------------

STEP 6:

Load the history rows from the original table (pt_adt_info) that do not have any modifications into pt_adt_info_20150120 (workaround). These are the rows whose ID does not appear in the staging table pt_adt_stage.

hive> insert into table pt_adt_info_20150120
      partition (scdtype2='2015-01-19 11:51:25')
      select pto.pt_adt_info_stg_id,
             pto.pt_adt_admission_date,
             pto.pt_adt_inventory_id,
             pto.pt_adt_patient_id,
             pto.pt_adt_discharge_date,
             pto.pt_adt_staff_id,
             pto.pt_adt_last_update
      from pt_adt_info pto
      where pto.pt_adt_info_stg_id not in
            (select pstg.pt_adt_info_stg_id from pt_adt_stage pstg);
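
The NOT IN subquery acts as an anti-join: it keeps only the rows of the original table whose ID does not appear in the staging table. The same selection can be sketched on small local copies of the data to check the expected row set before running the Hive insert. This assumes comma-delimited files with the ID in the first field; the function name unchanged_rows and its file arguments are hypothetical:

```shell
#!/bin/sh
# Hypothetical helper: unchanged_rows STAGE_FILE ORIGINAL_FILE
# Prints the rows of ORIGINAL_FILE whose first field (the ID) does not
# occur as a first field in STAGE_FILE.
unchanged_rows() {
  awk -F',' '
    FILENAME == ARGV[1] { changed[$1] = 1; next }   # first file: remember changed IDs
    !($1 in changed)                                 # second file: keep untouched rows
  ' "$1" "$2"
}

# Example usage with local samples of the two tables:
#   unchanged_rows stage_sample.csv original_sample.csv
```

Comparing FILENAME with ARGV[1], rather than using the common NR == FNR test, keeps the result correct when the staging file is empty.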

STEP 7:

Load the changed (updated) records into pt_adt_info_20150120. This data is already available in the staging table pt_adt_stage.

hive> insert into table pt_adt_info_20150120
      partition (scdtype2='2015-01-19 11:51:25')
      select stg.pt_adt_info_stg_id,
             stg.pt_adt_admission_date,
             stg.pt_adt_inventory_id,
             stg.pt_adt_patient_id,
             stg.pt_adt_discharge_date,
             stg.pt_adt_staff_id,
             stg.pt_adt_last_update
      from pt_adt_stage stg;

STEP 8:

Load the new admission data from /user/hadoop/admissionnew (the new admissions imported in STEP 1, which still have an open pt_adt_discharge_date).

hive> load data inpath '/user/hadoop/admissionnew' into table pt_adt_info_20150120
      partition (scdtype2='2015-01-19 11:51:25');

-----------------------------------
All the data is now loaded into pt_adt_info_20150120 in three stages:
 1) Loading the records that did not have any changes (STEP 6)
 2) Loading only the updated records from the staging table (STEP 7)
 3) Loading the new records (STEP 8)
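
The three stages can be checked end to end on small local samples. The sketch below rebuilds the target as unchanged history rows, then updated rows, then new rows, mirroring STEP 6 to STEP 8. It assumes comma-delimited files with the ID in the first field; the function name build_target and its arguments are hypothetical, and it models only the row selection, not Hive partitioning:

```shell
#!/bin/sh
# Hypothetical helper: build_target ORIGINAL_FILE STAGE_FILE NEW_FILE
# Prints the rows the rebuilt table is expected to contain.
build_target() {
  orig=$1
  stage=$2
  new=$3
  # Stage 1 (STEP 6): history rows whose ID is not in the staging file.
  awk -F',' '
    FILENAME == ARGV[1] { changed[$1] = 1; next }
    !($1 in changed)
  ' "$stage" "$orig"
  # Stage 2 (STEP 7): the updated rows, taken from the staging file as they are.
  cat "$stage"
  # Stage 3 (STEP 8): the newly admitted rows.
  cat "$new"
}

# Example usage with local samples:
#   build_target original_sample.csv stage_sample.csv new_sample.csv
```

Each ID should appear exactly once in the output: an updated row replaces its old version instead of being added next to it.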