HIVE SCD Type 2 implementation based on a stage table in MySQL (using SQOOP incremental load) - explanation
-------------------- In the MySQL table
mysql> describe pt_adt_info_stg; (this is the stage table of the SCD Type 2 implementation in MySQL)
pt_adt_info_stg_id int (primary key in the MySQL table)
pt_adt_admission_date timestamp (admission date of the patient)
pt_adt_inventory_id int
pt_adt_patient_id int
pt_adt_discharge_date timestamp (discharge date of the patient)
pt_adt_staff_id int
pt_adt_last_update timestamp (set whenever the record is modified in the table)
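For reference, a minimal sketch of how such a stage table could be declared in MySQL; the column list comes from the describe output above, while the DEFAULT/ON UPDATE clause on pt_adt_last_update is an assumption about how the modification timestamp is maintained:
mysql> create table pt_adt_info_stg (
pt_adt_info_stg_id int primary key,
pt_adt_admission_date timestamp null,
pt_adt_inventory_id int,
pt_adt_patient_id int,
pt_adt_discharge_date timestamp null,
pt_adt_staff_id int,
-- assumption: MySQL refreshes this column on every modification,
-- which is what the lastmodified import in STEP 2 relies on
pt_adt_last_update timestamp default current_timestamp on update current_timestamp
);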
------------------------------------------
To attach the current timestamp to any field selection:
hive> select t.*, from_unixtime(unix_timestamp()) from temp_filmdet t;
// from_unixtime(bigint unixtime[, string format]) is a built-in date function in Hive; it returns a string
// unix_timestamp() is a built-in Hive function that returns the current Unix timestamp in seconds
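For example, the same pair of functions can render the current time in an explicit format (runs as-is in the Hive shell; the format string is just an illustration):
hive> select from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss');
// returns the current time as a string, e.g. '2016-01-25 11:51:25'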
STEP 1:
Sqoop incremental import (to find the new records based on pt_adt_info_stg_id). Go to $SQOOP_HOME:
$SQOOP_HOME/bin> ./sqoop import --connect jdbc:mysql://192.168.56.1/ptdb \
--table pt_adt_info_stg \
--username biadmin --password biadmin \
--target-dir /user/hadoop/admissionnew \
--check-column "pt_adt_info_stg_id" \
--incremental append \
--last-value 2000000 -m 3
Note: the new admission data lands in "/user/hadoop/admissionnew". (Whenever a new admission incident occurs, a new row with a new id is inserted into the table, and the pt_adt_discharge_date field is left open.)
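If the last imported id is not tracked elsewhere, one way to derive the --last-value for the next append run is to ask the Hive history table for its maximum id (a sketch, assuming pt_adt_info from STEP 4 already holds everything imported so far):
hive> select max(pt_adt_info_stg_id) from pt_adt_info;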
STEP 2:
$SQOOP_HOME/bin> ./sqoop import --connect jdbc:mysql://192.168.56.1/ptdb --table pt_adt_info_stg --username biadmin --password biadmin \
--target-dir /user/hadoop/admissionlastmod \
--check-column "pt_adt_last_update" --incremental lastmodified \
--last-value "2016-01-25 11:51:25" \
--where "pt_adt_admission_date is not null and pt_adt_discharge_date is not null" -m 3
Note: this picks up rows where the discharge_date was set after the admission_date; the data lands in the HDFS path "/user/hadoop/admissionlastmod".
STEP 3:
The previous history data that changed must be updated in the table (updating previous ids where changes did happen). In STEP 2 above we imported those records into the HDFS directory "admissionlastmod"; now create an external table over it:
hive> create external table pt_adt_stage
(
pt_adt_info_stg_id int,
pt_adt_admission_date timestamp,
pt_adt_inventory_id int,
pt_adt_patient_id int,
pt_adt_discharge_date timestamp,
pt_adt_staff_id int,
pt_adt_last_update timestamp
)
row format delimited
fields terminated by ','
stored as textfile
location '/user/hadoop/admissionlastmod';
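A quick sanity check that the external table really picked up the files Sqoop wrote (row counts and timestamps depend on your data):
hive> select count(*), min(pt_adt_last_update), max(pt_adt_last_update) from pt_adt_stage;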
STEP 4:
The original table in Hive with the SCD Type 2 implementation is pt_adt_info (it already exists in Hive with the history of data stored in it).
$HIVE_HOME/bin> hive (to enter the Hive shell)
hive> describe pt_adt_info; (shows the structure of the table that already exists with data in Hive)
The details are:
pt_adt_info_stg_id int
pt_adt_admission_date timestamp
pt_adt_inventory_id int
pt_adt_patient_id int
pt_adt_discharge_date timestamp
pt_adt_staff_id int
pt_adt_last_update timestamp
partitioned by (scdtype2 timestamp), stored as textfile, location '/user/hadoop/pt_adt_info_org/';
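Since pt_adt_info is partitioned by the scdtype2 load timestamp, the partitions accumulated so far can be listed with:
hive> show partitions pt_adt_info;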
STEP 5:
Creating a new table with the SCD Type 2 implementation:
hive> create external table pt_adt_info_20150120
(
pt_adt_info_stg_id int,
pt_adt_admission_date timestamp,
pt_adt_inventory_id int,
pt_adt_patient_id int,
pt_adt_discharge_date timestamp,
pt_adt_staff_id int,
pt_adt_last_update timestamp
)
partitioned by(scdtype2 timestamp)
row format delimited
fields terminated by ','
stored as textfile
location '/user/hadoop/pt_adt_info_20150120/';
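Because the table is external with an explicit location, the inserts in the next steps will write a partition subdirectory under /user/hadoop/pt_adt_info_20150120/; the table definition can be double-checked with:
hive> describe formatted pt_adt_info_20150120;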
------------------------------------------------------------
STEP 6:
Load the history rows from the original table (pt_adt_info) that do not have any modifications into pt_adt_info_20150120 (workaround):
hive> insert into table pt_adt_info_20150120
partition(scdtype2='2015-01-19 11:51:25')
select pto.pt_adt_info_stg_id,
pto.pt_adt_admission_date,
pto.pt_adt_inventory_id, pto.pt_adt_patient_id,
pto.pt_adt_discharge_date, pto.pt_adt_staff_id,
pto.pt_adt_last_update
from pt_adt_info pto where
pto.pt_adt_info_stg_id not in (select pstg.pt_adt_info_stg_id from pt_adt_stage pstg);
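Older Hive versions do not accept NOT IN with a subquery; on those, the same filter can be expressed as a left outer join (an equivalent sketch, not the original query):
hive> insert into table pt_adt_info_20150120
partition(scdtype2='2015-01-19 11:51:25')
select pto.pt_adt_info_stg_id, pto.pt_adt_admission_date,
pto.pt_adt_inventory_id, pto.pt_adt_patient_id,
pto.pt_adt_discharge_date, pto.pt_adt_staff_id,
pto.pt_adt_last_update
from pt_adt_info pto
left outer join pt_adt_stage pstg
on (pto.pt_adt_info_stg_id = pstg.pt_adt_info_stg_id)
where pstg.pt_adt_info_stg_id is null;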
STEP 7:
Load the changed (i.e. updated) records into pt_adt_info_20150120 (this data was loaded into the staging table pt_adt_stage):
hive>
insert into table pt_adt_info_20150120
partition(scdtype2='2015-01-19 11:51:25')
select stg.pt_adt_info_stg_id, stg.pt_adt_admission_date,
stg.pt_adt_inventory_id, stg.pt_adt_patient_id,
stg.pt_adt_discharge_date, stg.pt_adt_staff_id,
stg.pt_adt_last_update from pt_adt_stage stg;
STEP 8:
Load the new admission data from "/user/hadoop/admissionnew" (where the new admission data exists with an open pt_adt_discharge_date):
hive> load data inpath '/user/hadoop/admissionnew' into table pt_adt_info_20150120
partition(scdtype2='2015-01-19 11:51:25');
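Note that load data inpath moves the files out of /user/hadoop/admissionnew rather than copying them, so the source directory will be empty afterwards. A quick check that all three loads landed in the partition:
hive> select count(*) from pt_adt_info_20150120 where scdtype2='2015-01-19 11:51:25';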
-----------------------------------
Finally, all the data was loaded into pt_adt_info_20150120 in three stages:
1) loading the records that did not have any changes (STEP 6)
2) loading only the updated records from the staging table (STEP 7)
3) loading the new records (STEP 8)
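As a final reconciliation (a sketch; exact counts depend on the data), the total in the new table should equal the unchanged history plus the updated rows plus the new admissions:
hive> select count(*) from pt_adt_info_20150120;
hive> select count(*) from pt_adt_info;
hive> select count(*) from pt_adt_stage;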