Python 算法交易实验73 QTV200第二步: 数据清洗并写入ClickHouse

说明

先检查一下昨天启动的worker是否正常工作,然后做一些简单的清洗,存入clickhouse。

内容

1 检查数据

from Basefuncs import * 
# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')
    return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
target_stream_name = 'xxx'
qm.stream_len(target_stream_name)
2804

获取数据(使用单worker,模式比较简单且性能足够)

data = qm.xrange(target_stream_name)['data']
data_df = pd.DataFrame(data)
keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']
data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])

# 第一次操作,把之前无关的数据删掉
data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']

在这里插入图片描述
向clickhouse发起query,请求每个etf的最大时间,之后要使得新增的数据大于这个时间,另外目标表的字段形如
在这里插入图片描述
这是之前做的设计,因为隔的时间有点久都有点忘了。不过这个设计是合理的,后面会看到。

要做的转换也很简单:

  • 1 将时间字符转为时间戳
  • 2 从日期中分解出shard、part、block和brick

转换段

import time

data_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)

data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)
data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])
data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])
data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])

data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)

keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
data_df2 =data_df1[keep_cols1]

在这里插入图片描述

今天就到这里吧,明晚接着写。

Go on …

昨天疏忽了,数据不应该直接存库,而是应该整理好之后送到队列。然后由默认的worker将数据搬到clickhouse.

2 存数规则

第二步的输入队列BUFF.xxxstream_in,输出队列BUFF.xxx.stream_out
第一次需要确保对应数据表的存在。clickhouse对数值的要求比较严格,为了避免麻烦,统一设置成Float32。(这样可以用统一的同步worker)。另外clickhouse不支持删除数据,这点倒是比较特别。
在这里插入图片描述
但可以支持全部删除数据(保留数据结构) TRUNCATE table market_data_v2

create_table_sql = '''
CREATE TABLE market_data_v2
(
    data_dt String,
    open Float32,
    close Float32,
    high Float32,
    low Float32,
    vol Float32,
    amt Float32,
    brick String,
    block String,
    part String,
    shard String,
    code String,
    ts Float32,
    pid String
)
ENGINE = MergeTree
ORDER BY (ts )
'''

click_para = gb.getx('sp_global.buffer.lan.xxx.xxx.para')
chc = CHClient(**click_para)
chc._exe_sql(create_table_sql)
chc._exe_sql('show tables')
[('market_data',), ('market_data_v2',)]

etl_worker.py

# 0 记录日志
import logging
from logging.handlers import RotatingFileHandler

logger = logging.getLogger('MyLogger')
handler = RotatingFileHandler('/var/log/workers.log', maxBytes=1024*1024*100, backupCount=5)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# ---------------------------------------- 设置日志

from Basefuncs import * 
def tuple_list2dict(tuple_list):
    """
    将包含三个元素的tuple列表转换为字典。
    
    参数:
    tuple_list (List[Tuple[K, V1, V2]]): 包含键和两个值的tuple的列表。
    
    返回:
    Dict[K, Tuple[V1, V2]]: 转换后的字典,其中值是包含两个元素的tuple。
    """
    return {key:value1 for key, value1 in tuple_list}

# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')
    return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# ---------------------------------------- 基本函数

# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
source_stream_name ='stream_in'
target_stream_name ='stream_out'
source_stream_len =  qm.stream_len(source_stream_name)
target_stream_len = qm.stream_len(target_stream_name)
print('source',source_stream_len)
print('target', target_stream_len)
# qm.ensure_group(target_stream_name)
cur_dt_str = get_time_str1()
if source_stream_len:
    is_source_recs = True
else:
    is_source_recs = False
    logger.info('%s %s source No Recs' %(cur_dt_str,'etl_worker'))
# 获取数据(使用单worker,模式比较简单且性能足够)

# ---------------------------------------- 队列取数,有数据才执行下面
if is_source_recs:

	
	# ---------------------------------------- 取数,取出消息列表和需要的列
    # worker 30 秒启动一次
    data = qm.xrange(source_stream_name)['data']

    data_df = pd.DataFrame(data)
    msg_id_list = list(data_df['_msg_id'])
    keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']
    data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])

    # 第一次操作,把之前无关的数据删掉
    # data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']

    import time

    data_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)

    data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)
    data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])
    data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])
    data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])

    data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)

    keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
    data_df2 =data_df1[keep_cols1]
	
	# ------------------------------------- 获取当前数据库已有的数据

    # 获取各code最大值
    click_para = {'database': 'xx',
        'host': '192.168.0.4',
        'name': 'xx',
        'password': 'xx',
        'port': xxx,
        'user': 'xx'}
    chc = CHClient(**click_para)

    '''
    这个 SQL 语句的作用是按照 `code` 分组,并为每个 `code` 找到对应的最新日期(`data_dt`),这个最新日期是基于 `ts` 字段的最大值来确定的。`argMax` 函数在这里用于找到每个分组中 `ts` 值最大时对应的 `data_dt` 值。

    具体来说,`argMax(data_dt, ts)` 会返回每个 `code` 分组中使得 `ts` 达到最大值的 `data_dt` 值。这意味着对于每个 `code`,查询会找到 `ts` 字段的最大值,并返回对应的 `data_dt` 值,即每个 `code` 的最新数据日期。

    最终,这个查询会返回一个结果集,其中包含每个 `code` 以及对应的最新数据日期(`last_data_dt`)。这对于分析每个代码的最新市场数据非常有用。
    '''

    latest_sql = '''
    SELECT
        code,
        argMax(data_dt, ts) AS last_data_dt
    FROM
        market_data_v2
    GROUP BY
        code
    '''
    # 更新时
    latest_date_tuple_list = chc._exe_sql(latest_sql)
    latest_date_dict = tuple_list2dict(latest_date_tuple_list)
	# ------------------------------------- 使用时间进行过滤
    # 筛选新数据
    data_df2['existed_dt'] = data_df2['code'].map(latest_date_dict).fillna('')

    output_sel = data_df2['data_dt'] > data_df2['existed_dt']
    output_df = data_df2[output_sel][keep_cols1]
    output_data_listofdict = output_df.to_dict(orient='records')
    output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)
    for some_data_listofdict in output_data_listofdict2:
        qm.parrallel_write_msg(target_stream_name, some_data_listofdict)

    del_msg = qm.xdel(source_stream_name, msg_id_list)
    logger.info('%s %s del source %s Recs' %(cur_dt_str,'etl_worker',del_msg['data'] ))

将该脚本发布为任务,30秒执行一次同步。

exe_qtv200_etl_worker.sh

#!/bin/bash

# 记录
# sh /home/test_exe.sh com_info_change_pattern running

# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh

#conda init
conda activate base

cd  /home/workers && python3 etl_worker.py

存数成功,后续就自动运行了。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/751010.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【日记】软考居然一次过了(620 字)

正文 早上空闲的时候,上 QQ 看了一下,许久不见动静的系统架构设计师群有人说出分了。我想高级都出分了,中级应该也出来了,于是用手机查了一下。看到分数几乎快要泪从中来。为什么软考能一次过,银行从业资格证考了两三…

放烟花短视频素材去哪里找?去哪里下载?烟花素材网分享

在当代社会,短视频凭借其独有的魅力成为大众传递情感、记录生活、分享快乐的新兴方式。特别是在庆祝节日和特殊时刻时,烟花的绚丽效果常常被用来吸引观众的目光,成为视频作品中的亮点。然而,对于短视频制作者来说,寻找…

【SCAU操作系统】期末复习简答及计算题例题解析

一、写出下列英文缩写词在计算机系统中的英文或中文全名。 OS: Operating System 操作系统PSW: Program Status Word 程序状态字FCFS: First Come First Serve 先来先服务PCB: Process Control Block 进程控制块DMA: Direct Memory Access 直接存储器存取MMU: Memory Manageme…

Does a vector database maintain pre-vector chunked data for RAG systems?

题意:一个向量数据库是否为RAG系统维护预向量化分块数据? 问题背景: I believe that when using an LLM with a Retrieval-Augmented Generation (RAG) approach, the results retrieved from a vector search must ultimately be presented…

从源码到上线:直播带货系统与短视频商城APP开发全流程

很多人问小编,一个完整的直播带货系统和短视频商城APP是如何从源码开发到最终上线的呢?今天,笔者将详细介绍这一全过程。 一、需求分析与规划 1.市场调研与需求分析:首先需要进行市场调研,了解当前市场的需求和竞争情…

Flutter学习:从搭建环境到运行

一、开发环境的搭建 本文所示内容都是在Windows系统下进行的。 1、下载 Flutter SDK Flutter 官网(https://docs.flutter.cn/release/archive?tabwindows) 或者通过 git clone -b master https://github.com/flutter/flutter.git 下载 2、配置环境…

VMware 最新的安全漏洞公告VMSA-2024-0013

#深度好文计划# 一、摘要 2024年6月26日,VMware 发布了最新的安全漏洞公告 VMSA-2024-0013,修复了 VMware ESXi 和 VMware vCenter 中的多个安全漏洞。 VMSA-2024-0013:VMware ESXi 和 vCenter Server 更新修正了多个安全性漏洞 &#xff…

datax入门(datax的安装与简单使用)——01

datax入门(datax的安装与简单使用)——01 1. 官网2. 工具部署(通过下载DataX工具包)2.1 下载、解压2.2 配置2.2.1 查看配置模版2.2.2 根据模版配置json2.2.3 启动DataX 3. datax的简单使用3.1 mysql2stream3.2 mysql2mysql3.2.1 拼…

评估大型语言模型生成文章的能力

1. AI解读 1.1. 总体概要 本文探讨了大型语言模型(LLMs)如GPT-4在生成特定领域(如计算机科学中的自然语言处理NLP)教育调查文章方面的能力和局限性。研究发现,尽管GPT-4能够根据特定指导生成高质量的调查文章&#x…

使用jupyter打开本地ipynb文件的方法

常用方法: 先启动jupyter,然后在打开的页面点击upload,选择想要打开的文件上传然后打开,但是这样其实是先复制了一份到jupyter中,然后打开运行。而我不想复制。 方法二 先打开项目文件所在文件夹,文件夹…

背靠广汽、小马智行,如祺出行打得过滴滴和百度吗?

©自象限原创 作者丨艾AA 编辑丨薛黎 北京时间6月14日凌晨,在特斯拉股东大会上,马斯克阐述了对Robotaxi(自动驾驶出租车)商业模式的构想——特斯拉不仅会运营自己的无人驾驶出租车车队,还可以让特斯拉车主们的爱…

Flutter学习目录

学习Dart语言 官网:https://dart.cn/ 快速入门:Dart 语言开发文档(dart.cn/guides) 学习Flutter Flutter生命周期 点击跳转Flutter更换主题 点击跳转StatelessWidget和StatefulWidget的区别 点击跳转学习Flutter中新的Navigato…

一文入门CMake

我们前几篇文章已经入门了gcc和Makefile,现在可以来玩玩CMake了。 CMake和Makefile是差不多的,基本上是可以相互替换使用的。CMAke可以生成Makefile,所以本质上我们还是用的Makefile,只不过用了CMake就不用再写Makefile了&#x…

Spring底层原理之bean的加载方式四 @import 注解

bean的加载方式四 import 第四种bean的导入方式 是import导入的方式 在配置类上面加上注解就行 package com.bigdata1421.config;import com.bigdata1421.bean.Dog; import org.springframework.context.annotation.Import;Import(Dog.class) public class SpringConfig4 {…

Linux高级编程——进程

1.进程的含义? 进程是一个程序执行的过程,会去分配内存资源,cpu的调度 PID, 进程标识符 当前工作路径 chdir umask 0002 进程打开的文件列表 文件IO中有提到 (类似于标准输入 标准输出的编号,系统给0,1&#xf…

点云处理实战 点云平面拟合

目录 一、什么是平拟合 二、拟合步骤 三、数学原理 1、平面拟合 2、PCA过程 四、代码 一、什么是平拟合 平面拟合是指在三维空间中找到一个平面,使其尽可能接近给定的点云。最小二乘法是一种常用的拟合方法,通过最小化误差平方和来找到最优的拟合平面。 二、拟合步骤…

1.1章节print输出函数语法八种 使用和示例

1.打印变量和字符串 2-4.三种使用字符串格式化 5.输出ASCLL码的值和中文字符 6.打印到文件或其他对象(而不是控制台) 7.自定义分隔符、和换行符和结束符 8.连接符加号连接字符串 在Python中,print() 函数用于在控制台上输出信息。这是一个非常…

设置日历程序

目录 一 设计原型 二 后台源码 一 设计原型 二 后台源码 namespace 设置日历 {public partial class Form1 : Form{public Form1(){InitializeComponent();}private void dateTimePicker1_ValueChanged(object sender, EventArgs e){richTextBox1.Text dateTimePicker1.T…

React@16.x(42)路由v5.x(7)常见应用场景(4)- 路由切换动画

目录 1,实现路由切换基础样式 2,使用 CSSTransition 添加动画1,自定义动画组件 *TransitionRoute.jsx*2,*App.jsx*3,样式改动 3,注意点 通过一个例子来说明如何实现。 1,实现路由切换 基础样式…