提交 8dc73b9f authored 作者: 宋宏伟's avatar 宋宏伟

update

上级 0a6d0920
import time
from datetime import datetime, timedelta
import logging
from ETL_diagnosis import *
from ETL_drug import *
from ETL_lab import *
from ETL_patient import *
from ETL_visit import *
class BeijingTimeFormatter(logging.Formatter):
    """Logging formatter that renders timestamps as Beijing time (UTC+8)."""

    def formatTime(self, record, datefmt=None):
        """Return ``record.created`` formatted as a Beijing-time string.

        ``record.created`` is an absolute POSIX timestamp.  Epoch-based
        arithmetic interprets it as UTC before adding the +8h offset, so the
        result is correct regardless of the host's local timezone (the
        original ``fromtimestamp(...) + 8h`` double-shifted on any host not
        running in UTC).
        """
        ct = datetime(1970, 1, 1) + timedelta(seconds=record.created, hours=8)
        if datefmt:
            s = ct.strftime(datefmt)
        else:
            try:
                s = ct.isoformat(timespec='milliseconds')
            except TypeError:
                # Very old Pythons (<3.6) lack the timespec argument.
                s = ct.isoformat()
        return s

    def format(self, record):
        # Pre-populate asctime so %(asctime)s in the format string uses the
        # Beijing-time value instead of the default local time.
        record.asctime = self.formatTime(record, self.datefmt)
        return super(BeijingTimeFormatter, self).format(record)
# Build the log sinks: a file handler plus console output.
file_handler = logging.FileHandler('etl_run.log')
stream_handler = logging.StreamHandler()
# Attach the Beijing-time formatter to both handlers.
formatter = BeijingTimeFormatter('%(asctime)s - %(levelname)s - %(message)s')
for _handler in (file_handler, stream_handler):
    _handler.setFormatter(formatter)
# Configure the root logger with the custom formatter's handlers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[file_handler, stream_handler],
)
def parse_date(date_string):
    """Parse a ``YYYY-MM-DD`` string into a datetime, re-raising on bad input."""
    try:
        parsed = datetime.strptime(date_string, "%Y-%m-%d")
    except ValueError as e:
        print(f"日期格式错误: {date_string}. 错误信息: {e}")
        raise  # propagate upward so processing stops
    return parsed
def run_etl(pv_id, table_name, data_start_time, data_end_time):
    """Run one ETL job for a (provider, table, date-window) combination.

    Unknown ``table_name`` values are silently ignored, matching the
    original elif chain's behavior.
    """
    start = parse_date(data_start_time)
    end = parse_date(data_end_time)
    # Widened window: 30 days before the start, 60 days after the end.
    start_wide = start - timedelta(days=30)
    end_wide = end + timedelta(days=60)

    def _utc(dt):
        # Render a naive datetime as a UTC-suffixed timestamp string.
        return dt.strftime("%Y-%m-%d %H:%M:%S") + "+00:00"

    call_args = (pv_id, data_start_time, data_end_time,
                 _utc(start), _utc(end), _utc(start_wide), _utc(end_wide))
    dispatch = {
        'patient': etl_patient,
        'visit': etl_visit,
        'prescribing': etl_prescribing,
        'diagnosis': etl_diagnosis,
        'lab': etl_lab_result_cm,
    }
    try:
        log_message = f"开始执行 ETL 任务,数据时间范围: {data_start_time} 至 {data_end_time}..."
        print(log_message)
        logging.info(log_message)
        handler = dispatch.get(table_name)
        if handler is not None:
            handler(*call_args)
    except Exception as e:
        print(f"ETL 任务执行失败: {e}")
        logging.error(f"ETL 任务执行失败: {e}")
# Half-year windows covering 2021-01-01 through 2024-10-01.
date_ranges = [
    ["2021-01-01", "2021-07-01"],
    ["2021-07-01", "2022-01-01"],
    ["2022-01-01", "2022-07-01"],
    ["2022-07-01", "2023-01-01"],
    ["2023-01-01", "2023-07-01"],
    ["2023-07-01", "2024-01-01"],
    ["2024-01-01", "2024-07-01"],
    ["2024-07-01", "2024-10-01"],
]
# Provider (organization) ids to extract.
pv_ids = ['320106426090445', '320104466002630', '320106466000838']
# Target tables, processed in this order for each provider.
tables = ['patient', 'visit', 'prescribing', 'diagnosis', 'lab']

etl_start_time = time.time()
# Run every (provider, table, window) combination.
for pv_id in pv_ids:
    for table_name in tables:
        for start_date, end_date in date_ranges:
            run_etl(pv_id, table_name, start_date, end_date)
etl_end_time = time.time()
total_time = etl_end_time - etl_start_time
print(f"所有 ETL 任务共耗时: {total_time:.2f} 秒")
logging.info(f"所有 ETL 任务共耗时: {total_time:.2f} 秒")
from data_query import *
def etl_diagnosis(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
    """Extract diagnosis rows for one provider's diabetes cohort to ./diagnosis.csv.

    Two queries are run and appended: the first covers outpatient/emergency
    visits (iceberg.cdm.outpatient_record), the second inpatient stays
    (iceberg.cdm.inpat_record).  The cohort is every patient of organization
    ``pv_id`` with at least one 糖尿病 (diabetes) diagnosis in the widened
    window; emitted rows are their diagnoses whose names match a battery of
    diabetes-complication regex patterns.

    BUG FIX: the quantifiers ``.{0,4}`` / ``.{0,3}`` in the regex literals
    were previously written unescaped inside the f-string, so Python parsed
    them as replacement fields (``f"{0,4}"`` evaluates to ``"(0, 4)"``) and
    corrupted the emitted SQL.  They are now escaped as ``{{0,4}}`` so the
    SQL contains the literal ``.{0,4}`` as intended.

    :param pv_id: organization id filtering iceberg.cdm.visit_record
    :param d_start_time: window start 'YYYY-MM-DD' (outpatient date filter)
    :param d_end_time: window end 'YYYY-MM-DD' (outpatient date filter)
    :param d_start_time_utc: window start 'YYYY-MM-DD HH:MM:SS+00:00'
        (inpatient discharge_time filter)
    :param d_end_time_utc: window end in the same format
    :param d_start_time2_utc: widened start (-30d) for diagnosis_time filters
    :param d_end_time2_utc: widened end (+60d) for diagnosis_time filters
    """
    output_path = r'./diagnosis.csv'
    # NOTE(review): the SQL relies on the PostgreSQL-style case-insensitive
    # regex operator `~*` — confirm the backing engine supports it.
    queries = [
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'
AND diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
date_of_visiting AS admission_datetime,
NULL AS discharge_datetime,
specialty_name AS specialty,
CASE
WHEN is_emergency = TRUE THEN '急诊'
ELSE '门诊'
END AS patient_type
FROM iceberg.cdm.outpatient_record
WHERE CAST(date_of_visiting AS DATE) >= DATE '{d_start_time}'
AND CAST(date_of_visiting AS DATE) <= DATE '{d_end_time}'),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id),
t7 AS (
SELECT DISTINCT
*
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
AND (diagnosis_name ~* '白内障|视网膜|眼|黄斑|玻璃体|玻血|网脱|失明|弱视|视力'
OR diagnosis_name ~* '黄斑水肿|失明|眼球?萎缩|眼球?缺失|盲目(3|三)|(视力|视觉)重度|神经'
OR diagnosis_name ~* '截肢|切断|截断|截|蛋白尿|蛋白尿|肾?移植|透析?|尿毒症|CKD(5|Ⅴ|五)|肾.{{0,4}}终末|终末.{{0,4}}肾'
OR diagnosis_name ~* '(颈|髂总|髂内|肾脏|肢端|腹主|肢|肾小)?主?动脉(粥样硬|痉挛|坏疽|硬|瘤|炎|栓塞|血栓)?化?|间歇性?跛行|红斑性肢痛|(伯|柏)格|雷诺氏|周围血管疾?病|动脉(肌纤维发育异|坏疽|痉挛)?|主动脉(瘤|炎)?|主动脉(粥样)?硬化|(静脉)?曲张|血栓性静脉|下肢(深静脉血栓|静脉曲张|动脉闭塞|血栓性静脉炎|静脉功能不全|静脉炎|(血管|动脉)闭塞症|静脉肌间血栓形成)?|周围循环'
OR diagnosis_name ~* '冠(心病|状|脉)|旁路移植|搭桥|多支|PCI|心绞痛|动脉硬化.{{0,3}}心脏病|心(肌|脏)(缺|供)血|缺血性心(脏|肌)病|心肌?梗'
OR diagnosis_name ~* '心梗|心肌梗死|心痛|陈旧(性|型)?(心|ST|非ST|Q|前|侧|下|高|间|广泛|(左|右)心室)|心肌梗塞|胸痹'
OR diagnosis_name ~* '脑.{{0,3}}(梗|塞|死)|卒中|中风'
OR diagnosis_name ~* '心(力|室|房)?衰(竭)?|心功能不全|心功能.*级|心源性哮喘|低心排综合征|KILLIP.*级|HBP|高血压'
OR diagnosis_name ~* '血脂异常|(胆固醇|高脂|甘油三?(脂|酯))血症|高血脂|高粘血症|高(密度)?(酯|脂)蛋白|低(密度)?(酯|脂)蛋白|高?三酰甘油'
OR diagnosis_name ~* '糖尿病')
)
select distinct
b.patient_id,
b.visit_id,
b.patient_type,
a.diagnosis_time as diagnosis_datetime,
b.specialty,
a.diagnosis_code as dx,
a.diagnosis_name as dx_desc,
a.diagnosis_code as dicd10_code,
a.diagnosis_type_name as dx_source,
a.is_primary as pdx,
a.diagnosis_name as icd10_name,
b.provider_id
from t7 a
join t6 b on a.visit_record_id = b.visit_id;
""",
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'
AND diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
admission_time AS admission_datetime,
discharge_time AS discharge_datetime,
admission_specialty_name AS specialty,
'住院' AS patient_type
FROM iceberg.cdm.inpat_record
WHERE discharge_time >= TIMESTAMP '{d_start_time_utc}'
AND discharge_time < TIMESTAMP '{d_end_time_utc}'
),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.admission_datetime,
b.discharge_datetime,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id),
t7 AS (
SELECT DISTINCT
*
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
AND (diagnosis_name ~* '白内障|视网膜|眼|黄斑|玻璃体|玻血|网脱|失明|弱视|视力'
OR diagnosis_name ~* '黄斑水肿|失明|眼球?萎缩|眼球?缺失|盲目(3|三)|(视力|视觉)重度|神经'
OR diagnosis_name ~* '截肢|切断|截断|截|蛋白尿|蛋白尿|肾?移植|透析?|尿毒症|CKD(5|Ⅴ|五)|肾.{{0,4}}终末|终末.{{0,4}}肾'
OR diagnosis_name ~* '(颈|髂总|髂内|肾脏|肢端|腹主|肢|肾小)?主?动脉(粥样硬|痉挛|坏疽|硬|瘤|炎|栓塞|血栓)?化?|间歇性?跛行|红斑性肢痛|(伯|柏)格|雷诺氏|周围血管疾?病|动脉(肌纤维发育异|坏疽|痉挛)?|主动脉(瘤|炎)?|主动脉(粥样)?硬化|(静脉)?曲张|血栓性静脉|下肢(深静脉血栓|静脉曲张|动脉闭塞|血栓性静脉炎|静脉功能不全|静脉炎|(血管|动脉)闭塞症|静脉肌间血栓形成)?|周围循环'
OR diagnosis_name ~* '冠(心病|状|脉)|旁路移植|搭桥|多支|PCI|心绞痛|动脉硬化.{{0,3}}心脏病|心(肌|脏)(缺|供)血|缺血性心(脏|肌)病|心肌?梗'
OR diagnosis_name ~* '心梗|心肌梗死|心痛|陈旧(性|型)?(心|ST|非ST|Q|前|侧|下|高|间|广泛|(左|右)心室)|心肌梗塞|胸痹'
OR diagnosis_name ~* '脑.{{0,3}}(梗|塞|死)|卒中|中风'
OR diagnosis_name ~* '心(力|室|房)?衰(竭)?|心功能不全|心功能.*级|心源性哮喘|低心排综合征|KILLIP.*级|HBP|高血压'
OR diagnosis_name ~* '血脂异常|(胆固醇|高脂|甘油三?(脂|酯))血症|高血脂|高粘血症|高(密度)?(酯|脂)蛋白|低(密度)?(酯|脂)蛋白|高?三酰甘油'
OR diagnosis_name ~* '糖尿病')
)
select distinct
b.patient_id,
b.visit_id,
b.patient_type,
a.diagnosis_time as diagnosis_datetime,
b.specialty,
a.diagnosis_code as dx,
a.diagnosis_name as dx_desc,
a.diagnosis_code as dicd10_code,
a.diagnosis_type_name as dx_source,
a.is_primary as pdx,
a.diagnosis_name as icd10_name,
b.provider_id
from t7 a
join t6 b on a.visit_record_id = b.visit_id;
"""
    ]
    # Run both queries and append their rows to the CSV.
    execute_queries_and_write_to_csv(queries, output_path)
from data_query import *
def etl_prescribing(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
    """Extract antidiabetic drug orders for one provider's diabetes cohort.

    Two queries are appended to ./prescribing.csv: outpatient prescriptions
    (iceberg.cdm.outpat_recipe_detail) and inpatient drug orders
    (iceberg.cdm.inpat_drug_order).  The cohort is every patient of
    organization ``pv_id`` with at least one 糖尿病 (diabetes) diagnosis in
    the widened window; rows are kept when drug_name matches a battery of
    antidiabetic drug-name regex patterns (sulfonylureas, metformin,
    glinides, glitazones, gliptins, gliflozins, insulins, GLP-1 agonists,
    plus brand names).

    :param pv_id: organization id filtering iceberg.cdm.visit_record
    :param d_start_time: window start 'YYYY-MM-DD' (outpatient date filter)
    :param d_end_time: window end 'YYYY-MM-DD' (outpatient date filter)
    :param d_start_time_utc: window start 'YYYY-MM-DD HH:MM:SS+00:00'
        (inpatient discharge_time filter)
    :param d_end_time_utc: window end in the same format
    :param d_start_time2_utc: widened start (-30d) for diagnosis/prescription
        time filters
    :param d_end_time2_utc: widened end (+60d)
    """
    output_path = r'./prescribing.csv'
    # NOTE(review): the `~*` case-insensitive regex operator is
    # PostgreSQL-style — confirm the backing SQL engine supports it.
    queries = [
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'
AND diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
date_of_visiting AS admission_datetime,
NULL AS discharge_datetime,
specialty_name AS specialty,
CASE
WHEN is_emergency = TRUE THEN '急诊'
ELSE '门诊'
END AS patient_type
FROM iceberg.cdm.outpatient_record
WHERE CAST(date_of_visiting AS DATE) >= DATE '{d_start_time}'
AND CAST(date_of_visiting AS DATE) <= DATE '{d_end_time}'),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id),
t7 AS (
SELECT *
FROM iceberg.cdm.outpat_recipe_detail
WHERE prescription_time >= TIMESTAMP '{d_start_time2_utc}'
AND prescription_time < TIMESTAMP '{d_end_time2_utc}'
AND (
drug_name ~* '格列(本脲|吡嗪|喹酮|齐特|美脲|波脲)|甲苯磺丁脲|氯磺丙脲|优降糖|达安疗|美吡达|瑞易宁|秦苏|迪沙|依吡达|优哒灵|元坦|麦林格|唐贝克|曼迪宝|美吡达|糖适平|捷适|达美康|弗莱因|弘旭阳|谐尔平|亚莫利|万苏平|佑苏|力贻苹|迪北|安多美|科德平|伊瑞|佳和洛|普仁平|克糖利'
OR drug_name ~* '(瑞|那|米)格列奈|诺和龙|弗来迪|唐力|唐瑞|贝加|快如妥'
OR drug_name ~* '二甲双胍|甲福明|格华止|奈达|泰白|至力|倍顺|麦克罗辛|麦特美|唐必呋|亿恒|仁欣|悦达宁|力乐尔|卜可|迪化唐锭|美迪康|君士达新|唐落|山姆士|君力达'
OR drug_name ~* '(罗|吡)格列酮|文迪雅|奥洛华|爱能|太罗|维戈洛|宜力喜|圣敏|耐迪|安瑞宁|艾可拓|卡司平|顿灵|贝唐宁|佳普喜|安可妥|凯宝维元|艾汀|卡司平|瑞彤|列洛|夷友'
OR drug_name ~* '(阿卡|伏格列)波糖|米格列醇|拜唐苹|卡博平|贝希|倍欣|华怡平|德赛天|米格尼醇|Glyset|奥恬苹|瑞舒'
OR drug_name ~* '(西|维|沙|利|阿)格列汀'
OR drug_name ~* '(达|恩|坎|伊|托)格列净'
OR drug_name ~* '(西|沙|维|利)格列汀|恩格列净|欧唐静'
OR drug_name ~* '胰岛素|(重和|万苏)林|甘舒霖|优泌(林|乐)|NPH|艾倍得|糖德仕|来得时|优(思|乐)灵|(速|长)秀霖|诺和(锐|平|佳|达|灵)'
OR drug_name ~* '(艾塞那|利司那|贝那鲁|利拉鲁|聚乙二醇洛塞那|司美格鲁|度拉糖)肽|百泌达|百达扬|利时敏|谊生泰|诺和力|弗来美|诺和泰|度易达')
)
select distinct
b.patient_id,
b.visit_id,
b.patient_type,
b.specialty,
a.drug_name as rx_desc,
a.prescription_time as order_datetime,
null as rx_start_datetime,
null as rx_end_datetime,
a.dose as dosage_qty,
a.dose_unit_name as dosage_unit,
a.frequency_code as frequency,
a.qty as quantity,
null as quantity_uom,
a.route_name as roa,
a.specs as drug_spec,
a.day_num as days_supply,
b.provider_id
from t7 a
join t6 b on a.visit_record_id = b.visit_id;
""",
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'
AND diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
admission_time AS admission_datetime,
discharge_time AS discharge_datetime,
admission_specialty_name AS specialty,
'住院' AS patient_type
FROM iceberg.cdm.inpat_record
WHERE discharge_time >= TIMESTAMP '{d_start_time_utc}'
AND discharge_time < TIMESTAMP '{d_end_time_utc}'
),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.admission_datetime,
b.discharge_datetime,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id),
t7 AS (
SELECT *
FROM iceberg.cdm.inpat_drug_order
WHERE input_time >= TIMESTAMP '{d_start_time2_utc}'
AND input_time < TIMESTAMP '{d_end_time2_utc}'
AND (
drug_name ~* '格列(本脲|吡嗪|喹酮|齐特|美脲|波脲)|甲苯磺丁脲|氯磺丙脲|优降糖|达安疗|美吡达|瑞易宁|秦苏|迪沙|依吡达|优哒灵|元坦|麦林格|唐贝克|曼迪宝|美吡达|糖适平|捷适|达美康|弗莱因|弘旭阳|谐尔平|亚莫利|万苏平|佑苏|力贻苹|迪北|安多美|科德平|伊瑞|佳和洛|普仁平|克糖利'
OR drug_name ~* '(瑞|那|米)格列奈|诺和龙|弗来迪|唐力|唐瑞|贝加|快如妥'
OR drug_name ~* '二甲双胍|甲福明|格华止|奈达|泰白|至力|倍顺|麦克罗辛|麦特美|唐必呋|亿恒|仁欣|悦达宁|力乐尔|卜可|迪化唐锭|美迪康|君士达新|唐落|山姆士|君力达'
OR drug_name ~* '(罗|吡)格列酮|文迪雅|奥洛华|爱能|太罗|维戈洛|宜力喜|圣敏|耐迪|安瑞宁|艾可拓|卡司平|顿灵|贝唐宁|佳普喜|安可妥|凯宝维元|艾汀|卡司平|瑞彤|列洛|夷友'
OR drug_name ~* '(阿卡|伏格列)波糖|米格列醇|拜唐苹|卡博平|贝希|倍欣|华怡平|德赛天|米格尼醇|Glyset|奥恬苹|瑞舒'
OR drug_name ~* '(西|维|沙|利|阿)格列汀'
OR drug_name ~* '(达|恩|坎|伊|托)格列净'
OR drug_name ~* '(西|沙|维|利)格列汀|恩格列净|欧唐静'
OR drug_name ~* '胰岛素|(重和|万苏)林|甘舒霖|优泌(林|乐)|NPH|艾倍得|糖德仕|来得时|优(思|乐)灵|(速|长)秀霖|诺和(锐|平|佳|达|灵)'
OR drug_name ~* '(艾塞那|利司那|贝那鲁|利拉鲁|聚乙二醇洛塞那|司美格鲁|度拉糖)肽|百泌达|百达扬|利时敏|谊生泰|诺和力|弗来美|诺和泰|度易达')
)
select DISTINCT
b.patient_id,
b.visit_id,
b.patient_type,
b.specialty,
a.drug_name as rx_desc,
a.input_time as order_datetime,
a.begin_time as rx_start_datetime,
a.end_time as rx_end_datetime,
a.dose as dosage_qty,
a.dose_unit_name as dosage_unit,
a.frequency_code as frequency,
a.qty as quantity,
null as quantity_uom,
a.route_name as roa,
a.specs as drug_spec,
null as days_supply,
b.provider_id
from t7 a
join t6 b on a.visit_record_id = b.visit_id;
"""
    ]
    # Run both queries and append their rows to the CSV.
    execute_queries_and_write_to_csv(queries, output_path)
from data_query import *
def etl_lab_result_cm(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
    """Extract diabetes-related lab results for one provider's cohort.

    Two queries are appended to ./lab_result_cm.csv: one joined against
    outpatient visits, one against inpatient stays.  Lab items are kept when
    the test name matches C-peptide, fasting glucose, or OGTT patterns; rows
    come from iceberg.cdm.lab_report joined to iceberg.cdm.lab_report_result.

    :param pv_id: organization id filtering iceberg.cdm.visit_record
    :param d_start_time: window start 'YYYY-MM-DD' (outpatient date filter)
    :param d_end_time: window end 'YYYY-MM-DD' (outpatient date filter)
    :param d_start_time_utc: window start 'YYYY-MM-DD HH:MM:SS+00:00'
        (inpatient discharge_time filter)
    :param d_end_time_utc: window end in the same format
    :param d_start_time2_utc: widened start (-30d) for diagnosis/report time
    :param d_end_time2_utc: widened end (+60d)
    """
    output_path = r'./lab_result_cm.csv'
    # NOTE(review): t7 filters only on test_item_name with no time bound —
    # the time filter is applied to lab_report in the final join instead.
    queries = [
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'
AND diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
date_of_visiting AS admission_datetime,
NULL AS discharge_datetime,
specialty_name AS specialty,
CASE
WHEN is_emergency = TRUE THEN '急诊'
ELSE '门诊'
END AS patient_type
FROM iceberg.cdm.outpatient_record
WHERE CAST(date_of_visiting AS DATE) >= DATE '{d_start_time}'
AND CAST(date_of_visiting AS DATE) <= DATE '{d_end_time}'),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id),
t7 AS (
select DISTINCT * from iceberg.cdm.lab_report_result
where (test_item_name ~* 'C肽|C-PR' and test_item_name ~* '空腹|1|60|2|120|3|180')
or (test_item_name ~* '空腹|FPG|空腹血糖' and test_item_name ~* '血')
or (test_item_name ~* 'OGTT|耐量|负荷' and test_item_name ~* '2|120')
)
select DISTINCT
b.patient_id,
b.visit_id,
b.patient_type,
a.report_name as lab_name,
c.test_item_name as lab_item_name,
null as std_lab_item_name,
-- a.specimen_name as specimen_source,
null as specimen_source,
null as lab_order_datetime,
a.specimen_collected_time as specimen_datetime,
a.report_time as result_datetime,
c.numerical_value as result_num,
c.unit_name as result_unit,
c.reference_range as result_range,
c.text_value as result_qual,
c.critical_flag as abnormal_ind,
b.provider_id
from (select *
from iceberg.cdm.lab_report
where report_time >= TIMESTAMP '{d_start_time2_utc}'
AND report_time < TIMESTAMP '{d_end_time2_utc}') a
join t6 b
on a.visit_record_id = b.visit_id
join t7 c on c.result_source_id = a.report_source_id;
""",
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'
AND diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
admission_time AS admission_datetime,
discharge_time AS discharge_datetime,
admission_specialty_name AS specialty,
'住院' AS patient_type
FROM iceberg.cdm.inpat_record
WHERE discharge_time >= TIMESTAMP '{d_start_time_utc}'
AND discharge_time < TIMESTAMP '{d_end_time_utc}'
),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.admission_datetime,
b.discharge_datetime,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id),
t7 AS (
select DISTINCT * from iceberg.cdm.lab_report_result
where (test_item_name ~* 'C肽|C-PR' and test_item_name ~* '空腹|1|60|2|120|3|180')
or (test_item_name ~* '空腹|FPG|空腹血糖' and test_item_name ~* '血')
or (test_item_name ~* 'OGTT|耐量|负荷' and test_item_name ~* '2|120')
)
select DISTINCT
b.patient_id,
b.visit_id,
b.patient_type,
a.report_name as lab_name,
c.test_item_name as lab_item_name,
null as std_lab_item_name,
a.specimen_name as specimen_source,
null as lab_order_datetime,
a.specimen_collected_time as specimen_datetime,
a.report_time as result_datetime,
c.numerical_value as result_num,
c.unit_name as result_unit,
c.reference_range as result_range,
c.text_value as result_qual,
c.critical_flag as abnormal_ind,
b.provider_id
from (select *
from iceberg.cdm.lab_report
where report_time >= TIMESTAMP '{d_start_time2_utc}'
AND report_time < TIMESTAMP '{d_end_time2_utc}') a
join t6 b
on a.visit_record_id = b.visit_id
join t7 c on c.result_source_id = a.report_source_id;
"""
    ]
    # Run both queries and append their rows to the CSV.
    execute_queries_and_write_to_csv(queries, output_path)
from data_query import *
def etl_patient(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
    """Extract demographics (sex, birth date) for one provider's diabetes cohort.

    Two queries are appended to ./patient.csv: one for patients with
    outpatient/emergency visits in the window, one for inpatients.  Both
    join the cohort to iceberg.cdm.patient_base_info and normalize
    gender_name ('男性'/'女性') to '男'/'女'.

    NOTE(review): unlike the other ETL modules, the second (inpatient)
    query's t2 has no diagnosis_time filter, and its discharge window uses
    the widened ``*2_utc`` bounds instead of ``d_start_time_utc`` /
    ``d_end_time_utc`` — confirm both differences are intentional.

    :param pv_id: organization id filtering iceberg.cdm.visit_record
    :param d_start_time: window start 'YYYY-MM-DD' (outpatient date filter)
    :param d_end_time: window end 'YYYY-MM-DD' (outpatient date filter)
    :param d_start_time_utc: window start 'YYYY-MM-DD HH:MM:SS+00:00'
        (unused by these queries)
    :param d_end_time_utc: window end in the same format (unused)
    :param d_start_time2_utc: widened start (-30d)
    :param d_end_time2_utc: widened end (+60d)
    """
    output_path = r'./patient.csv'
    queries = [
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'
AND diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
date_of_visiting AS admission_datetime,
NULL AS discharge_datetime,
specialty_name AS specialty,
CASE
WHEN is_emergency = TRUE THEN '急诊'
ELSE '门诊'
END AS patient_type
FROM iceberg.cdm.outpatient_record
WHERE CAST(date_of_visiting AS DATE) >= DATE '{d_start_time}'
AND CAST(date_of_visiting AS DATE) <= DATE '{d_end_time}'),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.admission_datetime,
b.discharge_datetime,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id)
SELECT DISTINCT
a.patient_id,
b.gender_name as raw_sex,
CASE
WHEN b.gender_name = '男性' THEN '男'
WHEN b.gender_name = '女性' THEN '女'
ELSE null END AS sex,
b.date_of_birth as birth_date,
a.provider_id
FROM t6 a
join (select * from iceberg.cdm.patient_base_info
WHERE organization_id = '{pv_id}'
) b
on a.provider_id = b.organization_id
and a.patient_id = b.pat_base_id;
""",
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
admission_time AS admission_datetime,
discharge_time AS discharge_datetime,
admission_specialty_name AS specialty,
'住院' AS patient_type
FROM iceberg.cdm.inpat_record
WHERE discharge_time >= TIMESTAMP '{d_start_time2_utc}'
AND discharge_time < TIMESTAMP '{d_end_time2_utc}'
),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.admission_datetime,
b.discharge_datetime,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id)
SELECT DISTINCT
a.patient_id,
b.gender_name as raw_sex,
CASE
WHEN b.gender_name = '男性' THEN '男'
WHEN b.gender_name = '女性' THEN '女'
ELSE null END AS sex,
b.date_of_birth as birth_date,
a.provider_id
FROM t6 a
join (select * from iceberg.cdm.patient_base_info
WHERE organization_id = '{pv_id}'
) b
on a.provider_id = b.organization_id
and a.patient_id = b.pat_base_id;
"""
    ]
    # Run both queries and append their rows to the CSV.
    execute_queries_and_write_to_csv(queries, output_path)
from data_query import *
def etl_visit(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
    """Extract visit dimension rows for one provider's diabetes cohort.

    Two queries are appended to ./visit.csv: outpatient/emergency visits
    (iceberg.cdm.outpatient_record) and inpatient stays
    (iceberg.cdm.inpat_record), each restricted to patients of organization
    ``pv_id`` who have at least one 糖尿病 (diabetes) diagnosis in the
    widened window.  Output columns are the t6 projection: patient_id,
    visit_id, admission/discharge datetimes, specialty, patient_type,
    provider_id.

    :param pv_id: organization id filtering iceberg.cdm.visit_record
    :param d_start_time: window start 'YYYY-MM-DD' (outpatient date filter)
    :param d_end_time: window end 'YYYY-MM-DD' (outpatient date filter)
    :param d_start_time_utc: window start 'YYYY-MM-DD HH:MM:SS+00:00'
        (inpatient discharge_time filter)
    :param d_end_time_utc: window end in the same format
    :param d_start_time2_utc: widened start (-30d) for diagnosis_time filters
    :param d_end_time2_utc: widened end (+60d)
    """
    output_path = r'./visit.csv'
    # NOTE(review): the first query's t6 selects b.admission_datetime /
    # b.discharge_datetime from the outpatient t3, where discharge is NULL.
    queries = [
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'
AND diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
date_of_visiting AS admission_datetime,
NULL AS discharge_datetime,
specialty_name AS specialty,
CASE
WHEN is_emergency = TRUE THEN '急诊'
ELSE '门诊'
END AS patient_type
FROM iceberg.cdm.outpatient_record
WHERE CAST(date_of_visiting AS DATE) >= DATE '{d_start_time}'
AND CAST(date_of_visiting AS DATE) <= DATE '{d_end_time}'),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.admission_datetime,
b.discharge_datetime,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id)
select * from t6;
""",
        f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
FROM iceberg.cdm.patient_diagnosis
WHERE diagnosis_name LIKE '%糖尿病%'
AND diagnosis_time >= TIMESTAMP '{d_start_time2_utc}'
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
),
t3 AS (SELECT DISTINCT
visit_record_id AS visit_id,
admission_time AS admission_datetime,
discharge_time AS discharge_datetime,
admission_specialty_name AS specialty,
'住院' AS patient_type
FROM iceberg.cdm.inpat_record
WHERE discharge_time >= TIMESTAMP '{d_start_time_utc}'
AND discharge_time < TIMESTAMP '{d_end_time_utc}'
),
t4 AS (SELECT DISTINCT a.patient_id
FROM t1 a
JOIN t2 b ON a.visit_id = b.visit_id),
t5 AS (SELECT b.patient_id, b.visit_id, b.provider_id
FROM t4 a
JOIN t1 b ON a.patient_id = b.patient_id),
t6 AS (SELECT DISTINCT a.patient_id,
a.visit_id,
b.admission_datetime,
b.discharge_datetime,
b.specialty,
b.patient_type,
a.provider_id
FROM t5 a
JOIN t3 b ON a.visit_id = b.visit_id)
select * from t6;
"""
    ]
    # Run both queries and append their rows to the CSV.
    execute_queries_and_write_to_csv(queries, output_path)
import logging
from datetime import datetime, timedelta
from flightsql import FlightSQLClient
import pandas as pd
import os
class BeijingTimeFormatter(logging.Formatter):
    """Logging formatter that renders timestamps as Beijing time (UTC+8)."""

    def formatTime(self, record, datefmt=None):
        """Return ``record.created`` formatted in Beijing time.

        Epoch-based arithmetic interprets the POSIX timestamp as UTC before
        the +8h shift, so the result is host-timezone independent (the
        original ``fromtimestamp(...) + 8h`` was only right on UTC hosts).
        The ``datefmt`` argument is now honoured instead of being ignored;
        the default output format is unchanged.
        """
        bj_time = datetime(1970, 1, 1) + timedelta(seconds=record.created, hours=8)
        return bj_time.strftime(datefmt or '%Y-%m-%d %H:%M:%S')
# Build the log sinks: a file handler plus console output.
file_handler = logging.FileHandler('etl_run.log')
stream_handler = logging.StreamHandler()
# Attach the Beijing-time formatter to both handlers.
formatter = BeijingTimeFormatter('%(asctime)s - %(levelname)s - %(message)s')
for _handler in (file_handler, stream_handler):
    _handler.setFormatter(formatter)
# Configure root logging with the custom-formatted handlers.
logging.basicConfig(
    level=logging.INFO,
    handlers=[file_handler, stream_handler],
)
def execute_query(sql_query):
    """
    Execute a SQL query over Arrow Flight SQL and return a pandas DataFrame.

    :param sql_query: str, the SQL statement to run
    :return: pandas DataFrame with the combined results, or None on any error
    """
    try:
        # Create the FlightSQLClient instance.
        # NOTE(review): host/port are hard-coded, and ``token=True`` looks
        # unusual for an authentication token — confirm against the
        # flightsql-dbapi client documentation.
        client = FlightSQLClient(host='192.168.101.45', port=50802,
        insecure=True, disable_server_verification=True, token=True)
        # Execute the query; returns FlightInfo describing result endpoints.
        info = client.execute(sql_query)
        # Accumulate one DataFrame per endpoint.
        data_frames = []
        # Fetch each endpoint's stream and convert it to pandas.
        for endpoint in info.endpoints:
            reader = client.do_get(endpoint.ticket)
            data_frame = reader.read_all().to_pandas()
            data_frames.append(data_frame)
        # Concatenate the partial results.
        # NOTE(review): pd.concat raises on an empty list, which the handler
        # below turns into a logged error and a None return — confirm
        # ``endpoints`` is never empty for valid queries.
        final_data_frame = pd.concat(data_frames, ignore_index=True)
        return final_data_frame
    except Exception as e:
        logging.error(f"发生错误: {e}")
        return None
def execute_queries_and_write_to_csv(queries, output_path):
    """
    Run each query in turn and append its result rows to a CSV file.

    :param queries: list of SQL statements (index 0 = outpatient, others = inpatient)
    :param output_path: destination CSV file path
    """
    for index, sql in enumerate(queries):
        df = execute_query(sql)
        # The query's position determines its label in the log messages.
        query_type = "门诊" if index == 0 else "住院"
        # Skip empty or failed results.
        if df is None or df.empty:
            log_message = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {output_path} {query_type} 查询无结果,跳过写入。"
            logging.info(log_message)
            continue
        log_message = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {output_path} {query_type} 查询行数: {len(df)}"
        logging.info(log_message)
        # Append to the CSV; write the header only when creating the file.
        exists = os.path.exists(output_path)
        df.to_csv(output_path, index=False, encoding='utf-8-sig',
                  mode='a' if exists else 'w', header=not exists)
        log_message = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 成功将{query_type}查询结果写入{output_path}, 行数: {len(df)}"
        logging.info(log_message)
# Example usage
if __name__ == "__main__":
    demo_queries = ["SELECT * FROM iceberg.cdm.outpatient_record LIMIT 10;"]
    execute_queries_and_write_to_csv(demo_queries, "./patient.csv")
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
import pandas as pd
def execute_query(sql_query):
    """
    Execute a SQL query over Arrow Flight SQL (ADBC) and return a DataFrame.

    :param sql_query: str, the SQL statement to run
    :return: pandas DataFrame with the query result, or None on any error
    """
    conn = None
    cursor = None
    try:
        # Open a connection to the Flight SQL endpoint.
        conn = flight_sql.connect(uri="grpc://192.168.101.45:50802")
        cursor = conn.cursor()
        # Run the query.
        cursor.execute(sql_query)
        # fetchallarrow() returns the complete result as an Arrow table.
        result_arrow = cursor.fetchallarrow()
        print("查询结果的行数:", result_arrow.num_rows)
        # Convert the Arrow table to a pandas DataFrame.
        return result_arrow.to_pandas()
    except Exception as e:
        print(f"发生错误: {e}")
        return None
    finally:
        # Close in reverse order of acquisition.  The explicit None checks
        # replace the fragile `'cursor' in locals()` pattern and make the
        # cleanup path obvious even when connect() itself fails.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
# Usage example
if __name__ == "__main__":
    sample_sql = "SELECT * FROM iceberg.cdm.outpatient_record LIMIT 10;"
    result = execute_query(sample_sql)
    if result is None:
        print("Query failed to execute.")
    else:
        print(result)
import pandas as pd
import os
def deduplicate_csv_files():
    """Drop duplicate rows from every CSV in the current directory, in place."""
    for file in [name for name in os.listdir('.') if name.endswith('.csv')]:
        # Load the file and remove exact duplicate rows.
        df = pd.read_csv(file, low_memory=False)
        df_deduplicated = df.drop_duplicates()
        print(f"原始数据行数({file}): {len(df)}。去重后数据行数({file}): {len(df_deduplicated)}")
        # Overwrite the source file with the deduplicated rows.
        df_deduplicated.to_csv(file, encoding='utf-8-sig', index=False)
        print(f"去重后的数据已覆盖原文件:{file}")
# Deduplicate every CSV in the working directory when the script runs.
deduplicate_csv_files()
import duckdb
import pyarrow as pa
import pyarrow.flight as fl
import logging
import os
# 设置日志格式和级别,方便记录服务器运行中的相关信息
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class DuckDBFlightServer(fl.FlightServerBase):
    """Arrow Flight server that exposes a DuckDB database over gRPC port 8815."""

    def __init__(self, db_path):
        # "grpc://0.0.0.0:8815" listens on port 8815 of every interface.
        super().__init__(location="grpc://0.0.0.0:8815")
        try:
            self.connection = duckdb.connect(db_path)
            logging.info(f"Successfully connected to DuckDB database at {db_path}")
        except duckdb.Error as e:
            logging.error(f"Failed to connect to DuckDB database: {e}")
            raise  # re-raise so the server never starts without a working connection

    def do_get(self, context, ticket):
        """Execute the SQL carried in *ticket* and stream the result back."""
        try:
            query = ticket.ticket.decode('utf-8')
            table = self.connection.execute(query).fetchdf()
            # Convert the DataFrame to an Arrow Table for streaming.
            arrow_table = pa.Table.from_pandas(table)
            # Return the result as a record-batch stream.
            return pa.flight.RecordBatchStream(arrow_table)
        except duckdb.Error as e:
            logging.error(f"Error executing query in do_get: {e}")
            # NOTE(review): pyarrow.flight servers report errors by raising
            # (e.g. fl.FlightServerError); `context.set_error`, `fl.FlightError`
            # and `fl.FlightStatusCode` do not appear in the documented API —
            # confirm these calls actually exist before relying on them.
            context.set_error(fl.FlightError(
            code=fl.FlightStatusCode.INTERNAL_SERVER_ERROR,
            message="Database query error"
            ))
            return None
        except UnicodeDecodeError as e:
            logging.error(f"Error decoding ticket in do_get: {e}")
            context.set_error(fl.FlightError(
            code=fl.FlightStatusCode.BAD_REQUEST,
            message="Invalid ticket encoding"
            ))
            return None

    def do_put(self, context, descriptor, reader):
        """Insert uploaded record batches into the table named by *descriptor*."""
        try:
            # The table name is taken from descriptor.path[0]; adjust to the
            # caller's conventions as needed.
            table_name = descriptor.path[0].decode('utf-8')
            arrow_table = reader.read_all()  # read every incoming record batch
            df = arrow_table.to_pandas()
            # SECURITY(review): table_name is interpolated directly into SQL —
            # an untrusted client can inject arbitrary statements; validate or
            # quote the identifier.  The `FROM df` reference relies on DuckDB
            # resolving the local pandas variable (replacement scan).
            self.connection.execute(f"INSERT INTO {table_name} SELECT * FROM df")
            logging.info(f"Successfully inserted data into table {table_name}")
            # NOTE(review): `context.set_success` is not in the documented
            # pyarrow.flight server API — confirm.
            context.set_success()
        except duckdb.Error as e:
            logging.error(f"Error executing put operation: {e}")
            context.set_error(fl.FlightError(
            code=fl.FlightStatusCode.INTERNAL_SERVER_ERROR,
            message="Error during data insertion"
            ))
        except UnicodeDecodeError as e:
            logging.error(f"Error decoding table name: {e}")
            context.set_error(fl.FlightError(
            code=fl.FlightStatusCode.BAD_REQUEST,
            message="Invalid table name encoding"
            ))
def main():
    """Start the DuckDB-backed Flight server and serve until interrupted."""
    db_path = os.path.join(os.path.dirname(__file__), 'exported.duckdb')
    flight_server = DuckDBFlightServer(db_path)
    print("Starting Flight Server on port 8815...")
    try:
        flight_server.serve()
    except Exception as e:
        logging.error(f"Error starting the Flight Server: {e}")


if __name__ == "__main__":
    main()
\ No newline at end of file
from flightsql import FlightSQLClient
import socket
import pandas as pd # 确保导入 pandas
from flightsql import __version__ as flightsql_version
print(f"flightsql 版本:{flightsql_version}")
def test_connection(host, port):
    """Probe TCP connectivity to host:port (5s timeout) and print the outcome."""
    try:
        conn = socket.create_connection((host, port), timeout=5)
    except OSError as e:
        print(f"无法连接到 {host}:{port},错误: {e}")
    else:
        with conn:
            print(f"成功连接到 {host}:{port}")
# Create the FlightSQLClient instance.
client = FlightSQLClient(host='192.168.101.45', port=50802, insecure=True, disable_server_verification=True, token=True)
# Probe basic TCP reachability first.
test_connection('192.168.101.45', 50802)
# Execute the SQL query and obtain the result descriptor (FlightInfo).
info = client.execute("SELECT * FROM iceberg.cdm.outpatient_record LIMIT 10000000")
# Accumulate one DataFrame per result endpoint.
data_frames = []
# Fetch each endpoint's stream and convert it to pandas.
for endpoint in info.endpoints:
    reader = client.do_get(endpoint.ticket)
    data_frame = reader.read_all().to_pandas()
    data_frames.append(data_frame)
# Concatenate the partial results.
final_data_frame = pd.concat(data_frames, ignore_index=True)
# Print the combined data.
print(final_data_frame)
import pandas as pd
import os
def deduplicate_csv_files():
    """Deduplicate every CSV in the current directory into a *_unique.csv copy."""
    csv_files = [f for f in os.listdir('.') if f.endswith('.csv')]
    for file in csv_files:
        # Load the file and remove exact duplicate rows.
        df = pd.read_csv(file)
        df_deduplicated = df.drop_duplicates()
        print(f"原始数据行数({file}): {len(df)}。去重后数据行数({file}): {len(df_deduplicated)}")
        # Write the result next to the source as <name>_unique.csv.
        stem = os.path.splitext(file)[0]
        new_filename = f"{stem}_unique.csv"
        df_deduplicated.to_csv(new_filename, index=False)
        print(f"去重后的数据已保存到:{new_filename}")
# Deduplicate every CSV in the working directory when the script runs.
deduplicate_csv_files()
from data_query import *
# Ad-hoc exploration: distribution of 'BP' lab values vs their reference range.
queries = """
select DISTINCT numerical_value,normal_low,normal_high ,count(*) from iceberg.cdm.lab_report_result
where test_item_name = 'BP' group by test_item_name,numerical_value,normal_low,normal_high
"""
# Alternative query kept for reference (previously a no-op bare string literal):
# queries = """
# select DISTINCT test_item_name,count(*) from iceberg.cdm.lab_report_result
# where test_item_name ~* '血压|舒张压|收缩压|BP|SBP|DBP' group by test_item_name
# """
# Run the query and print the resulting DataFrame.
a = execute_query(queries)
print(a)
\ No newline at end of file
差异被折叠。
差异被折叠。
差异被折叠。
差异被折叠。
关于 2, 3 部分 南京内部是以SQL查询服务的方式提供, Schema有文档描述, 访问接口是 Arrow Flight SQL
Flight SQL 有不同语言的Client 实现
https://arrow.apache.org/docs/format/FlightSql.html
https://pypi.org/project/adbc-driver-flightsql/ (python)
https://arrow.apache.org/docs/java/flight_sql_jdbc_driver.html (java)
https://pkg.go.dev/github.com/apache/arrow/go/v12/arrow/flight/flightsql (golang)
-江苏省人民医院 用药化验 50 化验没有提交
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论