Commit 3abbf97a authored by 宋宏伟

update

Parent 8dc73b9f
codes/etl/data-integration
**/*.csv
**/*.parquet
venv
*.csv
*.parquet
*.log
codes/etl/data-integration/
codes/transform/__pycache__/
.idea
.Rproj.user
## Image version with nginx added
FROM rocker/shiny:4.3.2
# Configure environment variables
ENV PYTHON_VER=3.8.18
ENV BASE_PATH='/opt'
ENV JAVA_HOME=${BASE_PATH}/jdk11
ENV PATH=${JAVA_HOME}/bin:$PATH
ENV KETTLE_HOME=${BASE_PATH}/in2-t2dm/config/etl
# Copy base files
WORKDIR ${BASE_PATH}
COPY . ${BASE_PATH}/in2-t2dm/
# Add system apt repo mirror (Aliyun)
RUN echo "deb http://mirrors.aliyun.com/ubuntu/ jammy main restricted universe multiverse\n\
deb-src http://mirrors.aliyun.com/ubuntu/ jammy main restricted universe multiverse\n\
deb http://mirrors.aliyun.com/ubuntu/ jammy-security main restricted universe multiverse\n\
deb-src http://mirrors.aliyun.com/ubuntu/ jammy-security main restricted universe multiverse\n\
deb http://mirrors.aliyun.com/ubuntu/ jammy-updates main restricted universe multiverse\n\
deb-src http://mirrors.aliyun.com/ubuntu/ jammy-updates main restricted universe multiverse\n\
deb http://mirrors.aliyun.com/ubuntu/ jammy-proposed main restricted universe multiverse\n\
deb-src http://mirrors.aliyun.com/ubuntu/ jammy-proposed main restricted universe multiverse\n\
deb http://mirrors.aliyun.com/ubuntu/ jammy-backports main restricted universe multiverse\n\
deb-src http://mirrors.aliyun.com/ubuntu/ jammy-backports main restricted universe multiverse" > /etc/apt/sources.list
# Update the package list, install Python build dependencies and nginx, then clean the apt cache
RUN apt-get update && \
apt-get install -y xz-utils build-essential zlib1g-dev libncurses5-dev libgdbm-dev libnss3-dev libssl-dev libsqlite3-dev libreadline-dev libffi-dev curl libbz2-dev pkg-config make nginx cron && \
rm -rf /var/lib/apt/lists/* && \
mkdir -p ${BASE_PATH}/in2-t2dm/
# Build and install Python from source
COPY --from=registry.cn-hangzhou.aliyuncs.com/palan/in2-dependency:1.0 ${BASE_PATH}/Python-${PYTHON_VER}.tar.xz ${BASE_PATH}/
RUN tar -xf Python-${PYTHON_VER}.tar.xz && \
cd Python-${PYTHON_VER} && \
./configure --enable-optimizations --enable-shared && \
make -j 4 && \
make install && \
ldconfig ${BASE_PATH}/Python${PYTHON_VER} && \
pip3 install --no-cache-dir -i https://mirrors.aliyun.com/pypi/simple -r ${BASE_PATH}/in2-t2dm/codes/transform/requirements.txt && \
rm -rf ${BASE_PATH}/Python-${PYTHON_VER}.tar.xz ${BASE_PATH}/Python-${PYTHON_VER}
# Install Java (Kettle's runtime dependency) and unpack Kettle
COPY --from=registry.cn-hangzhou.aliyuncs.com/palan/in2-dependency:1.0 ${BASE_PATH}/OpenJDK11U-jdk_x64_linux_hotspot_11.0.9.1_1.tar.gz ${BASE_PATH}/
COPY --from=registry.cn-hangzhou.aliyuncs.com/palan/in2-dependency:1.0 ${BASE_PATH}/pdi-ce-9.4.0.0-343.zip ${BASE_PATH}/
RUN unzip -d ${BASE_PATH}/in2-t2dm/codes/etl/ pdi-ce-9.4.0.0-343.zip && \
rm -f ${BASE_PATH}/pdi-ce-9.4.0.0-343.zip && \
tar -zxvf ${BASE_PATH}/OpenJDK11U-jdk_x64_linux_hotspot_11.0.9.1_1.tar.gz -C ${BASE_PATH}/ && \
mv ${BASE_PATH}/jdk-11.0.9.1+1 ${BASE_PATH}/jdk11 && \
rm -rf ${BASE_PATH}/OpenJDK11U-jdk_x64_linux_hotspot_11.0.9.1_1.tar.gz
# Install R packages
RUN R -e "install.packages(c('pacman', 'here', 'rio', 'sp', 'shiny', 'shinydashboard', 'webshot', 'png', 'plotly', 'lubridate', 'showtext'),repos='https://mirrors.tuna.tsinghua.edu.cn/CRAN/')" && \
R -e "install.packages(c('textshaping','ragg','rvest','xml2','gtsummary','gt','arrow', 'tidyverse'))"
# Relocate nginx config and log directories into the project tree
RUN mv /etc/nginx/* ${BASE_PATH}/in2-t2dm/codes/nginx/ && \
rm -rf /etc/nginx && \
ln -s ${BASE_PATH}/in2-t2dm/codes/nginx /etc/nginx && \
mv ${BASE_PATH}/in2-t2dm/codes/nginx/nginx.conf ${BASE_PATH}/in2-t2dm/config/nginx/nginx.conf && \
ln -s ${BASE_PATH}/in2-t2dm/config/nginx/nginx.conf ${BASE_PATH}/in2-t2dm/codes/nginx/nginx.conf && \
mv /var/log/nginx/* ${BASE_PATH}/in2-t2dm/logs/nginx/ && \
rm -rf /var/log/nginx && \
ln -s ${BASE_PATH}/in2-t2dm/logs/nginx /var/log/nginx
.PHONY: fridaytools fridaytools-ali
fridaytools:
docker build . -t fridaytools:0.44
fridaytools-ali:
docker build . -t registry.cn-hangzhou.aliyuncs.com/palan/fridaytools:0.44
## Project structure
```
in2-t2dm
├── codes [code directories]
│   ├── admin [admin console code; the password can be configured in an external compose file]
│   ├── bash [invocation and cleanup scripts]
│   ├── etl
│   ├── nginx [installed but not yet configured; in-container nginx code, 3837:/shiny/, 80:/admin/]
│   ├── preprocess
│   ├── shiny
│   └── transform
├── config [configuration files]
│   ├── crontab
│   ├── etl [kettle files; passwords are not stored in plain text]
│   ├── nginx
│   ├── shiny
│   └── transform
├── data [data directory]
│   ├── cleandata [data produced by the standardization code]
│   ├── preprocessed [summary data generated by the preprocess code]
│   └── rawdata [data extracted via kettle]
├── docker-compose.yml
├── Dockerfile
├── docs
├── logs [log files]
│   ├── admin
│   ├── crontab
│   ├── etl
│   ├── nginx
│   ├── preprocess
│   ├── shiny
│   └── transform
├── Makefile
├── README.md
└── shinyserver
```
## Running the code
```
# Run the ETL job
docker-compose up etl
# Run the transform job
docker-compose up transform
# Run the data preprocessing job
docker-compose up preprocess
# Run the shiny app (detached)
docker-compose up -d shiny
```
These commands invoke the scripts under ./codes/bash/; any shell commands that need to be run or adjusted can be edited in those scripts.
## Update and deployment workflow
### Merging code
1. Every branch of in2-t2dm mirrors the project structure above; development happens on the individual branches.
2. Code changes are submitted to the master branch and merged after review.
3. Pull master in the environment to be deployed.
### Updating the image
- Running make fridaytools in the project directory executes the docker build command configured in the Makefile;
- docker build builds the image according to the Dockerfile;
- The build pulls the registry.cn-hangzhou.aliyuncs.com/palan/in2-dependency:1.0 image, which packages the JDK, Kettle, and Python installers. See the in2-dependency project for details.
## Notes
### Software versions and packages
Software versions:
- R: 4.3.2
- shiny server: 1.5.22.1014
- Python: 3.8.18
- kettle: 9.4
- openjdk: 11.0.9.1
- ubuntu: 22.04
Python packages:
- pandas==2.0.3
- pyarrow==14.0.2
R packages:
- pacman
- here
- rio
- sp
- shiny
- shinydashboard
- webshot
- png
- plotly
- lubridate
- showtext
- textshaping
- ragg
- rvest
- xml2
- gtsummary
- gt
- arrow
- tidyverse
### Encrypting the kettle database password
The encryption command:
```
[root@develop in2-t2dm]# docker run --rm fridaytools:0.43 bash -c "/opt/in2-t2dm/codes/etl//data-integration/encr.sh 123"
Encrypted 2be98afc86aa7f2e4cb79ce10bec3fd89
```
A fragment of the resulting kettle.properties configuration:
```
dt_postgresql_password=Encrypted 2be98afc86aa7f2e4cb79ce10bec3fd89
```
## Other conventions
* File naming follows the [Google Style Guides](https://zh-google-styleguide.readthedocs.io/en/latest/contents/) and well-known open-source projects on GitHub.
* File and directory names should be all lowercase by default and may contain underscores (`_`) or hyphens (`-`) according to project convention; absent a convention, prefer `_`. All names must use half-width (ASCII) characters.
* Special files may follow industry-standard capitalization, e.g. 'README.md, Dockerfile, Makefile, \*.R'.
* Within one project, files of the same type must use consistent casing and separators; for example, multiple generated data files must share one naming style.
* No other symbols, including spaces, are allowed in file or directory names.
* Keep deployability in mind: avoid absolute paths in the project; when using relative paths, define them at the top of the code file so they are easy to adjust.
* Provide dependency files so deployments know exactly which packages and versions the project uses: a 'DESCRIPTION' file in R, a 'requirements.txt' file in Python; these files are usually generated with tooling.
* When managing code with git, use a '.gitignore' file so local data files and sensitive files are not pushed to the remote.
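As a minimal sketch of how a Python dependency file can be generated (the helper name `pin_requirements` is illustrative, not part of this project), the standard-library `importlib.metadata` can pin installed versions in the same `name==version` form that `pip freeze` emits:

```python
# Sketch: pin the currently installed versions of the packages a project
# imports, producing requirements.txt-style lines such as 'pandas==2.0.3'.
from importlib import metadata


def pin_requirements(packages):
    """Return requirement lines for installed packages; comment out missing ones."""
    lines = []
    for name in packages:
        try:
            lines.append(f"{name}=={metadata.version(name)}")
        except metadata.PackageNotFoundError:
            lines.append(f"# {name} is not installed")
    return lines


print("\n".join(pin_requirements(["pandas", "pyarrow"])))
```

Writing the returned lines to requirements.txt gives a reproducible dependency file; R projects can record the same information in a DESCRIPTION file.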
# in2dm Nanjing
#!/bin/bash
cd /opt/in2-t2dm/codes/etl/python/
python3 ETL_all.py
#!/bin/bash
cd /opt/in2-t2dm/codes/preprocess/R/
Rscript dataset_summary.R 2>&1 | tee -a /opt/in2-t2dm/logs/preprocess/preprocess_$(date +%Y%m%d).log
#!/bin/bash
service cron start
cd /opt/in2-t2dm/codes/shiny/
mv in2_t2dm_shiny_v0.1.R app.R
/init
#!/bin/bash
cd /opt/in2-t2dm/codes/transform/; export PYTHONPATH="/opt/in2-t2dm:$PYTHONPATH"
python3 data_governance.py
import time
from datetime import datetime, timedelta
import logging
from logging_config import setup_logging
from configparser import RawConfigParser
from ETL_diagnosis import *
from ETL_drug import *
from ETL_lab import *
from ETL_patient import *
from ETL_visit import *
from de_weight_csv import *
class BeijingTimeFormatter(logging.Formatter):
"""Custom logging formatter to adjust log timestamps to Beijing time (UTC+8)."""
def formatTime(self, record, datefmt=None):
ct = datetime.fromtimestamp(record.created) + timedelta(hours=8) # Adjust for Beijing time
if datefmt:
s = ct.strftime(datefmt)
else:
try:
s = ct.isoformat(timespec='milliseconds')
except TypeError:
s = ct.isoformat()
return s
# Call setup_logging to configure logging
setup_logging()
def format(self, record):
record.asctime = self.formatTime(record, self.datefmt)
return super(BeijingTimeFormatter, self).format(record)
# Create log handlers
file_handler = logging.FileHandler('etl_run.log')
stream_handler = logging.StreamHandler()
# Attach the custom time formatter
formatter = BeijingTimeFormatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# Configure logging with the custom formatter
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[file_handler, stream_handler]
)
def parse_date(date_string):
"""解析日期字符串为 datetime 对象,捕获并处理可能的格式错误"""
......@@ -48,6 +21,7 @@ def parse_date(date_string):
print(f"日期格式错误: {date_string}. 错误信息: {e}")
raise # 向上抛出异常以停止处理
def run_etl(pv_id, table_name, data_start_time, data_end_time):
data_start_times = parse_date(data_start_time)
data_end_times = parse_date(data_end_time)
......@@ -66,47 +40,46 @@ def run_etl(pv_id, table_name, data_start_time, data_end_time):
logging.info(log_message)
if table_name == 'patient':
etl_patient(pv_id, data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc)
etl_patient(data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc, pv_id)
elif table_name == 'visit':
etl_visit(pv_id, data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc)
etl_visit(data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc, pv_id)
elif table_name == 'prescribing':
etl_prescribing(pv_id, data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc)
etl_prescribing(data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc, pv_id)
elif table_name == 'diagnosis':
etl_diagnosis(pv_id, data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc)
etl_diagnosis(data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc, pv_id)
elif table_name == 'lab':
etl_lab_result_cm(pv_id, data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc)
etl_lab_result_cm(data_start_time, data_end_time, data_start_times_utc, data_end_times_utc,
data_start_times_2_utc, data_end_times_2_utc, pv_id)
except Exception as e:
print(f"ETL 任务执行失败: {e}")
logging.error(f"ETL 任务执行失败: {e}")
date_ranges = [
["2021-01-01", "2021-07-01"],
["2021-07-01", "2022-01-01"],
["2022-01-01", "2022-07-01"],
["2022-07-01", "2023-01-01"],
["2023-01-01", "2023-07-01"],
["2023-07-01", "2024-01-01"],
["2024-01-01", "2024-07-01"],
["2024-07-01", "2024-10-01"],
]
pv_ids = ['320106426090445', '320104466002630', '320106466000838']
tables = ['patient', 'visit', 'prescribing', 'diagnosis', 'lab']
etl_start_time = time.time()
for pv_id in pv_ids:
for table_name in tables:
for start_date, end_date in date_ranges:
run_etl(pv_id, table_name, start_date, end_date)
etl_end_time = time.time()
total_time = etl_end_time - etl_start_time
print(f"所有 ETL 任务共耗时: {total_time:.2f} 秒")
logging.info(f"所有 ETL 任务共耗时: {total_time:.2f} 秒")
if __name__ == "__main__":
conf_path = '../../../config/etl/etl_config.ini' # data-extraction configuration file
config = RawConfigParser()
config.optionxform = str # keep option keys case-sensitive
config.read(conf_path, encoding='utf-8')
import ast # literal_eval parses the list literals without eval's code-execution risk
# Read the organization id list
pv_ids = ast.literal_eval(config.get('pv_ids', 'pv_ids'))
# Read the date-range list
date_ranges = ast.literal_eval(config.get('date_ranges', 'date_ranges'))
# Read the list of tables to extract
tables = ast.literal_eval(config.get('tables', 'tables'))
etl_start_time = time.time()
for pv_id in pv_ids:
for table_name in tables:
for start_date, end_date in date_ranges:
run_etl(pv_id, table_name, start_date, end_date)
# Deduplicate the data
deduplicate_csv_files()
etl_end_time = time.time()
total_time = etl_end_time - etl_start_time
print(f"All ETL tasks finished in {total_time:.2f} s")
logging.info(f"All ETL tasks finished in {total_time:.2f} s")
from test2 import *
from de_weight_csv import *
create_and_save_csv()
deduplicate_csv_files()
from data_query import *
def etl_diagnosis(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
output_path = r'./diagnosis.csv'
queries = [
f"""
def etl_diagnosis(d_start_time, d_end_time, d_start_time_utc, d_end_time_utc, d_start_time2_utc, d_end_time2_utc,
pv_id):
output_path = 'diagnosis.csv'
base_queries = [
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
......@@ -53,11 +54,11 @@ def etl_diagnosis(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
AND (diagnosis_name ~* '白内障|视网膜|眼|黄斑|玻璃体|玻血|网脱|失明|弱视|视力'
OR diagnosis_name ~* '黄斑水肿|失明|眼球?萎缩|眼球?缺失|盲目(3|三)|(视力|视觉)重度|神经'
OR diagnosis_name ~* '截肢|切断|截断|截|蛋白尿|蛋白尿|肾?移植|透析?|尿毒症|CKD(5|Ⅴ|五)|肾.{0,4}终末|终末.{0,4}肾'
OR diagnosis_name ~* '截肢|切断|截断|截|蛋白尿|蛋白尿|肾?移植|透析?|尿毒症|CKD(5|Ⅴ|五)|肾.{{0,4}}终末|终末.{{0,4}}肾'
OR diagnosis_name ~* '(颈|髂总|髂内|肾脏|肢端|腹主|肢|肾小)?主?动脉(粥样硬|痉挛|坏疽|硬|瘤|炎|栓塞|血栓)?化?|间歇性?跛行|红斑性肢痛|(伯|柏)格|雷诺氏|周围血管疾?病|动脉(肌纤维发育异|坏疽|痉挛)?|主动脉(瘤|炎)?|主动脉(粥样)?硬化|(静脉)?曲张|血栓性静脉|下肢(深静脉血栓|静脉曲张|动脉闭塞|血栓性静脉炎|静脉功能不全|静脉炎|(血管|动脉)闭塞症|静脉肌间血栓形成)?|周围循环'
OR diagnosis_name ~* '冠(心病|状|脉)|旁路移植|搭桥|多支|PCI|心绞痛|动脉硬化.{0,3}心脏病|心(肌|脏)(缺|供)血|缺血性心(脏|肌)病|心肌?梗'
OR diagnosis_name ~* '冠(心病|状|脉)|旁路移植|搭桥|多支|PCI|心绞痛|动脉硬化.{{0,3}}心脏病|心(肌|脏)(缺|供)血|缺血性心(脏|肌)病|心肌?梗'
OR diagnosis_name ~* '心梗|心肌梗死|心痛|陈旧(性|型)?(心|ST|非ST|Q|前|侧|下|高|间|广泛|(左|右)心室)|心肌梗塞|胸痹'
OR diagnosis_name ~* '脑.{0,3}(梗|塞|死)|卒中|中风'
OR diagnosis_name ~* '脑.{{0,3}}(梗|塞|死)|卒中|中风'
OR diagnosis_name ~* '心(力|室|房)?衰(竭)?|心功能不全|心功能.*级|心源性哮喘|低心排综合征|KILLIP.*级|HBP|高血压'
OR diagnosis_name ~* '血脂异常|(胆固醇|高脂|甘油三?(脂|酯))血症|高血脂|高粘血症|高(密度)?(酯|脂)蛋白|低(密度)?(酯|脂)蛋白|高?三酰甘油'
OR diagnosis_name ~* '糖尿病')
......@@ -79,7 +80,7 @@ def etl_diagnosis(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,
join t6 b on a.visit_record_id = b.visit_id;
""",
f"""
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
......@@ -129,11 +130,11 @@ def etl_diagnosis(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,
AND diagnosis_time < TIMESTAMP '{d_end_time2_utc}'
AND (diagnosis_name ~* '白内障|视网膜|眼|黄斑|玻璃体|玻血|网脱|失明|弱视|视力'
OR diagnosis_name ~* '黄斑水肿|失明|眼球?萎缩|眼球?缺失|盲目(3|三)|(视力|视觉)重度|神经'
OR diagnosis_name ~* '截肢|切断|截断|截|蛋白尿|蛋白尿|肾?移植|透析?|尿毒症|CKD(5|Ⅴ|五)|肾.{0,4}终末|终末.{0,4}肾'
OR diagnosis_name ~* '截肢|切断|截断|截|蛋白尿|蛋白尿|肾?移植|透析?|尿毒症|CKD(5|Ⅴ|五)|肾.{{0,4}}终末|终末.{{0,4}}肾'
OR diagnosis_name ~* '(颈|髂总|髂内|肾脏|肢端|腹主|肢|肾小)?主?动脉(粥样硬|痉挛|坏疽|硬|瘤|炎|栓塞|血栓)?化?|间歇性?跛行|红斑性肢痛|(伯|柏)格|雷诺氏|周围血管疾?病|动脉(肌纤维发育异|坏疽|痉挛)?|主动脉(瘤|炎)?|主动脉(粥样)?硬化|(静脉)?曲张|血栓性静脉|下肢(深静脉血栓|静脉曲张|动脉闭塞|血栓性静脉炎|静脉功能不全|静脉炎|(血管|动脉)闭塞症|静脉肌间血栓形成)?|周围循环'
OR diagnosis_name ~* '冠(心病|状|脉)|旁路移植|搭桥|多支|PCI|心绞痛|动脉硬化.{0,3}心脏病|心(肌|脏)(缺|供)血|缺血性心(脏|肌)病|心肌?梗'
OR diagnosis_name ~* '冠(心病|状|脉)|旁路移植|搭桥|多支|PCI|心绞痛|动脉硬化.{{0,3}}心脏病|心(肌|脏)(缺|供)血|缺血性心(脏|肌)病|心肌?梗'
OR diagnosis_name ~* '心梗|心肌梗死|心痛|陈旧(性|型)?(心|ST|非ST|Q|前|侧|下|高|间|广泛|(左|右)心室)|心肌梗塞|胸痹'
OR diagnosis_name ~* '脑.{0,3}(梗|塞|死)|卒中|中风'
OR diagnosis_name ~* '脑.{{0,3}}(梗|塞|死)|卒中|中风'
OR diagnosis_name ~* '心(力|室|房)?衰(竭)?|心功能不全|心功能.*级|心源性哮喘|低心排综合征|KILLIP.*级|HBP|高血压'
OR diagnosis_name ~* '血脂异常|(胆固醇|高脂|甘油三?(脂|酯))血症|高血脂|高粘血症|高(密度)?(酯|脂)蛋白|低(密度)?(酯|脂)蛋白|高?三酰甘油'
OR diagnosis_name ~* '糖尿病')
......@@ -154,9 +155,12 @@ def etl_diagnosis(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,
from t7 a
join t6 b on a.visit_record_id = b.visit_id;
"""
]
# Run the queries and write the CSV
execute_queries_and_write_to_csv(queries, output_path)
]
# If pv_id is empty, strip the organization filter from every query
if not pv_id:
queries = [query.replace(f"WHERE organization_id = '{pv_id}'", "") for query in base_queries]
else:
queries = base_queries
# Run the queries and write the CSV
execute_queries_and_write_to_csv(queries, output_path)
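The pv_id guard above removes the organization filter by plain text replacement: when pv_id is empty, each formatted query contains the literal text `WHERE organization_id = ''`, which is then deleted. A small self-contained sketch of that behavior (the query text is a toy stand-in for the real statements):

```python
def strip_org_filter(query: str, pv_id: str) -> str:
    """Mirror the replace-based guard: drop the organization filter
    when no pv_id is supplied, otherwise leave the query untouched."""
    if not pv_id:
        return query.replace(f"WHERE organization_id = '{pv_id}'", "")
    return query


sql = "SELECT visit_id FROM visit_record WHERE organization_id = ''"
print(strip_org_filter(sql, ""))  # the WHERE clause is removed
```

Note this only works because the queries are f-strings formatted before the replacement runs, so an empty pv_id produces exactly the text being searched for.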
from data_query import *
def etl_prescribing(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
output_path = r'./prescribing.csv'
queries = [
f"""
def etl_prescribing(d_start_time, d_end_time, d_start_time_utc, d_end_time_utc, d_start_time2_utc, d_end_time2_utc,
pv_id):
output_path = 'prescribing.csv'
base_queries = [
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
......@@ -73,7 +74,8 @@ def etl_prescribing(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_ut
null as rx_end_datetime,
a.dose as dosage_qty,
a.dose_unit_name as dosage_unit,
a.frequency_code as frequency,
a.frequency_code,
a.frequency_name,
a.qty as quantity,
null as quantity_uom,
a.route_name as roa,
......@@ -84,7 +86,7 @@ def etl_prescribing(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_ut
join t6 b on a.visit_record_id = b.visit_id;
""",
f"""
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
......@@ -154,7 +156,8 @@ def etl_prescribing(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_ut
a.end_time as rx_end_datetime,
a.dose as dosage_qty,
a.dose_unit_name as dosage_unit,
a.frequency_code as frequency,
a.frequency_code,
a.frequency_name,
a.qty as quantity,
null as quantity_uom,
a.route_name as roa,
......@@ -164,7 +167,11 @@ def etl_prescribing(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_ut
from t7 a
join t6 b on a.visit_record_id = b.visit_id;
"""
]
# Run the queries and write the CSV
execute_queries_and_write_to_csv(queries, output_path)
]
# If pv_id is empty, strip the organization filter from every query
if not pv_id:
queries = [query.replace(f"WHERE organization_id = '{pv_id}'", "") for query in base_queries]
else:
queries = base_queries
# Run the queries and write the CSV
execute_queries_and_write_to_csv(queries, output_path)
from data_query import *
def etl_lab_result_cm(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
output_path = r'./lab_result_cm.csv'
queries = [
f"""
def etl_lab_result_cm(d_start_time, d_end_time, d_start_time_utc, d_end_time_utc, d_start_time2_utc, d_end_time2_utc,
pv_id):
output_path = 'lab_result_cm.csv'
base_queries = [
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
......@@ -49,7 +50,8 @@ def etl_lab_result_cm(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_
select DISTINCT * from iceberg.cdm.lab_report_result
where (test_item_name ~* 'C肽|C-PR' and test_item_name ~* '空腹|1|60|2|120|3|180')
or (test_item_name ~* '空腹|FPG|空腹血糖' and test_item_name ~* '血')
or (test_item_name ~* 'OGTT|耐量|负荷' and test_item_name ~* '2|120')
or (test_item_name ~* 'OGTT|耐量|负荷' and test_item_name ~* '2|120'
or (test_item_name ~* 'HbA1c|糖化血红蛋白')
)
select DISTINCT
b.patient_id,
......@@ -57,7 +59,6 @@ def etl_lab_result_cm(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_
b.patient_type,
a.report_name as lab_name,
c.test_item_name as lab_item_name,
null as std_lab_item_name,
-- a.specimen_name as specimen_source,
null as specimen_source,
null as lab_order_datetime,
......@@ -78,7 +79,7 @@ def etl_lab_result_cm(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_
join t7 c on c.result_source_id = a.report_source_id;
""",
f"""
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
......@@ -124,7 +125,8 @@ def etl_lab_result_cm(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_
select DISTINCT * from iceberg.cdm.lab_report_result
where (test_item_name ~* 'C肽|C-PR' and test_item_name ~* '空腹|1|60|2|120|3|180')
or (test_item_name ~* '空腹|FPG|空腹血糖' and test_item_name ~* '血')
or (test_item_name ~* 'OGTT|耐量|负荷' and test_item_name ~* '2|120')
or (test_item_name ~* 'OGTT|耐量|负荷' and test_item_name ~* '2|120'
or (test_item_name ~* 'HbA1c|糖化血红蛋白')
)
select DISTINCT
b.patient_id,
......@@ -132,7 +134,6 @@ def etl_lab_result_cm(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_
b.patient_type,
a.report_name as lab_name,
c.test_item_name as lab_item_name,
null as std_lab_item_name,
a.specimen_name as specimen_source,
null as lab_order_datetime,
a.specimen_collected_time as specimen_datetime,
......@@ -151,7 +152,13 @@ def etl_lab_result_cm(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_
on a.visit_record_id = b.visit_id
join t7 c on c.result_source_id = a.report_source_id;
"""
]
]
# If pv_id is empty, strip the organization filter from every query
if not pv_id:
queries = [query.replace(f"WHERE organization_id = '{pv_id}'", "") for query in base_queries]
else:
queries = base_queries
# Run the queries and write the CSV
execute_queries_and_write_to_csv(queries, output_path)
from data_query import *
def etl_patient(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
output_path = r'./patient.csv'
queries = [
f"""
def etl_patient(d_start_time, d_end_time, d_start_time_utc, d_end_time_utc, d_start_time2_utc, d_end_time2_utc,
pv_id):
output_path = 'patient.csv'
base_queries = [
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
......@@ -63,14 +64,14 @@ def etl_patient(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_
and a.patient_id = b.pat_base_id;
""",
f"""
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
visit_record_id AS visit_id,
organization_id AS provider_id
FROM iceberg.cdm.visit_record
WHERE organization_id = '{pv_id}'
WHERE organization_id = '{pv_id}'
),
t2 AS (SELECT DISTINCT
visit_record_id AS visit_id
......@@ -118,7 +119,11 @@ def etl_patient(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_
on a.provider_id = b.organization_id
and a.patient_id = b.pat_base_id;
"""
]
# Run the queries and write the CSV
execute_queries_and_write_to_csv(queries, output_path)
]
# If pv_id is empty, strip the organization filter from every query
if not pv_id:
queries = [query.replace(f"WHERE organization_id = '{pv_id}'", "") for query in base_queries]
else:
queries = base_queries
# Run the queries and write the CSV
execute_queries_and_write_to_csv(queries, output_path)
from data_query import *
def etl_visit(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_start_time2_utc,d_end_time2_utc):
output_path = r'./visit.csv'
queries = [
f"""
def etl_visit(d_start_time, d_end_time, d_start_time_utc, d_end_time_utc, d_start_time2_utc, d_end_time2_utc,
pv_id):
output_path = 'visit.csv'
base_queries = [
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
......@@ -48,7 +49,7 @@ def etl_visit(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_st
JOIN t3 b ON a.visit_id = b.visit_id)
select * from t6;
""",
f"""
f"""
WITH
t1 AS (SELECT DISTINCT
pat_base_id AS patient_id,
......@@ -93,7 +94,11 @@ def etl_visit(pv_id,d_start_time,d_end_time,d_start_time_utc,d_end_time_utc,d_st
select * from t6;
"""
]
# Run the queries and write the CSV
execute_queries_and_write_to_csv(queries, output_path)
]
# If pv_id is empty, strip the organization filter from every query
if not pv_id:
queries = [query.replace(f"WHERE organization_id = '{pv_id}'", "") for query in base_queries]
else:
queries = base_queries
# Run the queries and write the CSV
execute_queries_and_write_to_csv(queries, output_path)
......@@ -3,38 +3,21 @@ from datetime import datetime, timedelta
from flightsql import FlightSQLClient
import pandas as pd
import os
from logging_config import setup_logging
class BeijingTimeFormatter(logging.Formatter):
"""自定义日志格式器,将日志时间戳调整为北京时间(UTC+8)。"""
def formatTime(self, record, datefmt=None):
bj_time = datetime.fromtimestamp(record.created) + timedelta(hours=8)
return bj_time.strftime('%Y-%m-%d %H:%M:%S')
# Create log handlers
file_handler = logging.FileHandler('etl_run.log')
stream_handler = logging.StreamHandler()
# Attach the custom formatter
formatter = BeijingTimeFormatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# Configure logging with the custom formatter
logging.basicConfig(
level=logging.INFO,
handlers=[file_handler, stream_handler]
)
# Call setup_logging to configure logging
setup_logging()
def execute_query(sql_query):
"""
Execute a SQL query and return the result as a pandas DataFrame.
:param sql_query: str, the SQL statement
:return: pandas DataFrame containing the query result
"""
try:
# Create a FlightSQLClient instance
client = FlightSQLClient(host='192.168.101.45', port=50802,
insecure=True, disable_server_verification=True, token=True)
# Execute the SQL query and fetch the result info
......@@ -56,30 +39,36 @@ def execute_query(sql_query):
except Exception as e:
logging.error(f"An error occurred: {e}")
print(f"An error occurred: {e}")
return None
def execute_queries_and_write_to_csv(queries, output_path):
def execute_queries_and_write_to_csv(queries, output_paths):
"""
执行多个查询并将结果追加到CSV文件。
:param queries: SQL查询语句列表
:param output_path: 输出CSV文件路径
:param output_paths: 输出CSV文件路径
"""
base_paths = os.path.dirname(os.path.abspath(__file__))
output_path = os.path.join(base_paths, '..', '..', '..', 'data', 'rawdata', output_paths)
for i, query in enumerate(queries):
df = execute_query(query)
# Determine the query type from the index
query_type = "outpatient" if i == 0 else "inpatient"
# Skip if the DataFrame is empty
if df is None or df.empty:
log_message = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {output_path} {query_type} query returned no rows; skipping write."
logging.info(log_message)
print(log_message)
continue # skip this iteration
# Record the row count in the log
log_message = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {output_path} {query_type} query rows: {len(df)}"
logging.info(log_message)
print(log_message)
# Append to the CSV file; check whether it exists to decide on writing the header
if not os.path.exists(output_path):
......@@ -90,9 +79,11 @@ def execute_queries_and_write_to_csv(queries, output_path):
# Log the successful write
log_message = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - wrote {query_type} query results to {output_path}, rows: {len(df)}"
logging.info(log_message)
print(log_message)
# Example usage
if __name__ == "__main__":
queries = ["SELECT * FROM iceberg.cdm.outpatient_record LIMIT 10;"]
output_path = "./patient.csv"
execute_queries_and_write_to_csv(queries, output_path)
output_path = "patient.csv"
execute_queries_and_write_to_csv(queries, output_path)
\ No newline at end of file
import pandas as pd
import os
import logging
from logging_config import setup_logging
setup_logging()
logger = logging.getLogger(__name__)
def deduplicate_csv_files():
# Absolute path of this file
base_path = os.path.dirname(os.path.abspath(__file__))
# Directory containing the raw data
data_dir = os.path.join(base_path, '..', '..', '..', 'data', 'rawdata')
try:
# Collect all .csv files in the directory
csv_files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.csv')]
logger.info(f"Found {len(csv_files)} CSV files under {data_dir}.")
for file in csv_files:
try:
# Read the CSV file
df = pd.read_csv(file, low_memory=False)
# Drop duplicate rows
df_deduplicated = df.drop_duplicates()
logger.info(f"Rows before dedup ({file}): {len(df)}. Rows after dedup ({file}): {len(df_deduplicated)}")
# Overwrite the original file
df_deduplicated.to_csv(file, encoding='utf-8-sig', index=False)
logger.info(f"Deduplicated data written back to {file}")
except Exception as e:
logger.error(f"Error while processing file {file}: {e}", exc_info=True)
except Exception as e:
logger.error(f"Error while listing CSV files: {e}", exc_info=True)
import logging
from datetime import datetime, timedelta
import os
class BeijingTimeFormatter(logging.Formatter):
"""Custom logging formatter that shifts timestamps to Beijing time (UTC+8)."""
def formatTime(self, record, datefmt=None):
bj_time = datetime.fromtimestamp(record.created) + timedelta(hours=8)
return bj_time.strftime('%Y-%m-%d %H:%M:%S')
def setup_logging():
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO) # log level
# Absolute path of this file
base_path = os.path.dirname(os.path.abspath(__file__))
# Create log handlers
log_path = os.path.join(base_path, '..', '..', '..', 'logs', 'etl', 'etl_run.log')
file_handler = logging.FileHandler(log_path)
stream_handler = logging.StreamHandler()
# Create the formatter
formatter = BeijingTimeFormatter('%(asctime)s - %(levelname)s - %(message)s')
# Attach the formatter to the handlers
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# Avoid adding duplicate handlers; compare exact types because
# FileHandler subclasses StreamHandler, so an isinstance check on
# StreamHandler would also match the file handler
if not any(type(handler) is logging.FileHandler for handler in root_logger.handlers):
root_logger.addHandler(file_handler)
if not any(type(handler) is logging.StreamHandler for handler in root_logger.handlers):
root_logger.addHandler(stream_handler)
import pandas as pd
import logging
from logging_config import setup_logging
import os
setup_logging()
logger = logging.getLogger(__name__)
def create_and_save_csv():
# Build a DataFrame of mock data
data = {
'Name': ['Alice', 'Alice', 'Alice', 'Bob', 'Charlie'],
'Age': [25, 25, 25, 30, 35],
'City': ['New York', 'New York', 'New York', 'Los Angeles', 'Chicago']
}
df = pd.DataFrame(data)
# Absolute path of this file
base_path = os.path.dirname(os.path.abspath(__file__))
# Target path for the CSV file
csv_path = os.path.join(base_path, '..', '..', '..', 'data', 'rawdata', 'output2.csv')
print(csv_path)
df.to_csv(csv_path, index=False)
logger.info("Execution succeeded")
import pandas as pd
# Read the CSV file
file_path = 'lab_result_cm.csv' # replace with your CSV file path
df = pd.read_csv(file_path)
# Drop the std_lab_item_name column
# axis=1 selects a column rather than a row
# inplace=True modifies the DataFrame in place instead of creating a new one
df.drop('std_lab_item_name', axis=1, inplace=True)
# Write the modified data back to the original CSV file
# index=False omits the row index, which is usually not needed
df.to_csv(file_path, index=False)
from data_query import *
queries = """
CREATE VIEW palan_view_procedure1 as
select DISTINCT b.visit_record_id as visit_id,a.operation_time from(
SELECT
case_base_id,
procedure_name,
operation_time
FROM iceberg.cdm.case_operation
where procedure_name ~* '鼻息肉摘除|鼻内窥镜|鼻窦手' -- adjust against real data; relax this filter to survey which nasal procedure names exist
and operation_time >= TIMESTAMP '2019-01-01 00:00:00+00:00'
and operation_time < TIMESTAMP '2021-12-31 00:00:00+00:00') a
join
(select visit_record_id,case_base_id from iceberg.cdm.case_base
where discharge_time >= TIMESTAMP '2018-11-01 00:00:00+00:00'
AND discharge_time < TIMESTAMP '2022-02-28 00:00:00+00:00') b
on a.case_base_id = b.case_base_id;
[sql:query]
-- patient id for each visit and the earliest (2019-2021) operation time.
CREATE VIEW palan_view_procedure2 as
SELECT DISTINCT
a.pat_base_id AS patient_id,
min(b.operation_time) as min_operation_time
FROM iceberg.cdm.visit_record a join palan_view_procedure1 b on a.visit_record_id = b.visit_id
group by a.pat_base_id;
[sql:query]
-- join the patient table to restrict age at operation
CREATE VIEW palan_view_procedure3 as
select distinct a.pat_base_id AS patient_id,b.min_operation_time from
iceberg.cdm.patient_base_info a join palan_view_procedure2 b on a.pat_base_id = b.patient_id
WHERE (DATEDIFF(day, a.date_of_birth, b.min_operation_time) / 365.25) >= 12; -- adjust as needed
select * from palan_view_procedure3;
"""
# Run the query
a = execute_query(queries)
print(a)
# Global options
options(scipen = 1000)
options(scipen = 1, digits = 2)
options(encoding = 'UTF-8')
library(tidyverse)
library(here)
library(rio)
library(readxl)
library(janitor)
library(gtsummary)
library(survival)
library(officer)
library(officedown)
library(flextable)
library(zoo)
# library(eoffice)
library(tableone)
library(plotly)
library(reticulate)
calc_age <- function(birthDate, refDate = Sys.Date(), unit = "year") {
require(lubridate)
if (grepl(x = unit, pattern = "year")) {
as.period(interval(birthDate, refDate), unit = 'year')$year
} else if (grepl(x = unit, pattern = "month")) {
as.period(interval(birthDate, refDate), unit = 'month')$month
} else if (grepl(x = unit, pattern = "week")) {
floor(as.period(interval(birthDate, refDate), unit = 'day')$day / 7)
} else if (grepl(x = unit, pattern = "day")) {
as.period(interval(birthDate, refDate), unit = 'day')$day
} else {
print("Argument 'unit' must be one of 'year', 'month', 'week', or 'day'")
NA
}
}
center_par <- fp_par(text.align = "center",padding = 10)
bold_face <- shortcuts$fp_bold(font.size = 20)
toc <- fpar(
ftext("目 录", prop = bold_face ),
fp_p = center_par)
LANDSCAPE_STOP <-
block_section(
prop_section(
page_size = page_size(orient = "landscape"),
type = "continuous"
)
)
theme_gtsummary_language(
language = "zh-cn",
decimal.mark = NULL,
set_theme = TRUE
)
bold_face1 <- fp_text(font.size = 25, bold = TRUE,
font.family = "Times New Roman")
bold_face2 <- fp_text(font.size = 20, font.family = "宋体")
# Global options
options(scipen = 1000)
options(scipen = 1, digits = 2)
options(encoding = 'UTF-8')
library(here)
source(here("codes","preprocess","R","common.R"))
source(here("codes","preprocess","R","wrangling.R"))
patient_clean <- arrow::read_parquet("../../../data/preprocessed/patient_clean.parquet")
visit_clean <- arrow::read_parquet("../../../data/preprocessed/visit_clean.parquet")
diag_clean <- arrow::read_parquet("../../../data/preprocessed/diag_clean.parquet")
rx_clean <- arrow::read_parquet("../../../data/preprocessed/rx_clean.parquet")
lab_clean <- arrow::read_parquet("../../../data/preprocessed/lab_clean.parquet")
vital_clean <- arrow::read_parquet("../../../data/preprocessed/vital_clean.parquet")
# Data window: 2021-01-01 ~ 2024-12-31
# Index date: first "type 2 diabetes" diagnosis: 2022-01-01 ~ 2024-12-31
# Baseline: index date - 3 * 30.5 days
# Follow-up: quarterly
# Subgroups:
# (newly diagnosed) fasting glucose > 7.0 mmol/L and/or HbA1c > 7%, excluding prior diabetes diagnosis and prior glucose-lowering medication
# (revisit) fasting glucose > 7.0 mmol/L and/or HbA1c > 7%, with prior diabetes diagnosis and glucose-lowering medication
# uncontrolled (lab):
# revisit, oral, uncontrolled: HbA1c >= 7% within 2 months of treatment (use the last result if there are several)
# revisit, injectable, uncontrolled: HbA1c >= 7% within 2 months of treatment
# Regimen switch: the regimen at the first visit after loss of control
# Discontinuation: a prescription gap longer than 90 days counts as stopping
# Restart: medication resumed after discontinuation
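The discontinuation rule above (a gap of more than 90 days between prescriptions counts as stopping; a later prescription is a restart) can be sketched in pandas. The column names patient_id and order_datetime follow the cleaned prescription table's conventions, but this helper is an illustration under those assumptions, not part of the pipeline:

```python
import pandas as pd


def flag_discontinuation(rx: pd.DataFrame, gap_days: int = 90) -> pd.DataFrame:
    """Flag per-patient prescription gaps longer than `gap_days`.
    Expects columns: patient_id, order_datetime."""
    rx = rx.sort_values(["patient_id", "order_datetime"]).copy()
    rx["prev_order"] = rx.groupby("patient_id")["order_datetime"].shift()
    gap = (rx["order_datetime"] - rx["prev_order"]).dt.days
    # True means this order follows a >90-day gap, i.e. it is a restart
    rx["discontinued_before"] = gap > gap_days
    return rx


df = pd.DataFrame({
    "patient_id": [1, 1, 1],
    "order_datetime": pd.to_datetime(["2022-01-01", "2022-02-01", "2022-06-01"]),
})
out = flag_discontinuation(df)
print(out["discontinued_before"].tolist())  # [False, False, True]
```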
# Index date (enrollment)
diag_index <- diag_clean %>%
# filter(raw_dx_desc == "2型糖尿病") %>%
filter(T2DM == 1) %>%
filter(diagnosis_datetime >= "2018-01-01" & diagnosis_datetime <= "2022-12-31") %>%
# filter(diagnosis_datetime >= "2022-01-01" & diagnosis_datetime <= "2024-12-31") %>%
group_by(patient_id) %>%
arrange(diagnosis_datetime) %>%
slice_head(n = 1) %>%
ungroup() %>%
select(patient_id, index_visit_id = visit_id, index_date = diagnosis_datetime)
# left_join(diag_subgroup) %>%
# mutate(index_ym = format(as.Date(diagnosis_datetime), "%Y-%m")) %>%
# Missing age and sex variables
demo_anti <- diag_index %>%
left_join(patient_clean) %>%
drop_na(sex, birth_date) %>%
filter(grepl("男|女", sex)) %>%
select(patient_id) %>%
distinct()
# Gestational diabetes and type 1 diabetes
diag_anti <- diag_index %>%
left_join(diag_clean) %>%
filter(std_dx_desc == "1型糖尿病"|std_dx_desc == "妊娠糖尿病") %>%
select(patient_id) %>%
distinct()
# Prior diabetes
t2dm_past <- diag_index %>%
left_join(diag_clean) %>%
# filter(raw_dx_desc == "2型糖尿病" & diagnosis_datetime < index_date) %>%
filter(T2DM == 1 & diagnosis_datetime < index_date) %>%
mutate(past_t2dm = 1) %>%
select(patient_id, past_t2dm) %>%
distinct()
# Prior glucose-lowering medication
rx_past <- diag_index %>%
left_join(rx_clean) %>%
drop_na(rx_desc) %>%
filter(order_datetime < index_date) %>%
mutate(past_rx = 1) %>%
select(patient_id, past_rx) %>%
distinct()
# Fasting glucose > 7.0 mmol/L and/or HbA1c > 7% within the identification period
# a1c_fpg_bl <- diag_index %>%
# left_join(lab_clean) %>%
# # filter(result_datetime >= index_date - 90 & result_datetime <= index_date) %>%
# # group_by(patient_id, lab_item_name) %>%
# # arrange(result_datetime) %>%
# # slice_tail(n = 1) %>%
# # ungroup()
# mutate(lab_if = case_when(lab_item_name == "HbA1c" & lab_result > 7.0 ~ 1,
# lab_item_name == "FPG" & lab_result > 7.0 ~ 1)) %>%
# filter(lab_if == 1) %>%
# select(patient_id) %>%
# distinct()
# Newly diagnosed & revisit
# diag_subgroup <- diag_index %>%
# left_join(diag_clean) %>%
# filter(raw_dx_desc == "2型糖尿病") %>%
# group_by(patient_id) %>%
# arrange(diagnosis_datetime) %>%
# slice_head(n = 1) %>%
# ungroup() %>%
# mutate(subgroup = case_when(diagnosis_datetime >= "2018-01-01" & diagnosis_datetime <= "2020-12-31" ~ "Drug Naive", #identification period
# diagnosis_datetime < "2018-01-01" ~ "Revisit")) %>% #identification period
# mutate(subgroup = case_when(diagnosis_datetime >= "2022-01-01" & diagnosis_datetime <= "2024-12-31" ~ "Drug Naive", # identification period
# diagnosis_datetime < "2022-01-01" ~ "Revisit")) %>% # identification period
# select(patient_id, subgroup) %>%
# drop_na()
diag_subgroup <- diag_index %>%
# inner_join(a1c_fpg_bl) %>%
left_join(t2dm_past) %>%
left_join(rx_past) %>%
mutate(subgroup = case_when(is.na(past_t2dm) & is.na(past_rx) ~ "Drug Naive", # identification period
past_t2dm == 1|past_rx == 1 ~ "Revisit")) %>% # identification period
select(patient_id, subgroup) %>%
drop_na()
# Inclusion and exclusion
target_group <- diag_index %>%
inner_join(demo_anti) %>%
anti_join(diag_anti) %>%
inner_join(diag_subgroup) %>%
select(patient_id, index_visit_id, index_date, subgroup) %>%
mutate(index_ym = format(as.Date(index_date), "%Y-%m"))
write_csv(target_group, "../../../data/preprocessed/target_group.csv")