dataFeed

对应demo地址datakit

datakit提供了录制实时行情的功能,但如果需要从其他数据源中获取历史行情,则需要借助dataFeed的帮助。

dataFeed代码中提供了一个基类Ifeed,Ifeed中实现了代码转换、目录结构处理等功能,但没有实现get_tick与get_bar方法,这个是需要我们自己定义的子类实现的。

import rqdatac as rq
from wtpy.WtCoreDefs import WTSBarStruct, WTSTickStruct
from wtpy.wrapper import WtDataHelper
import pandas as pd
from tqdm import tqdm
import os
class Ifeed(object):
    def __init__(self):
        self.dthelper = WtDataHelper()
        self.period_map = {"m1":"min1","m5":"min5","d":"day","tick":"ticks"}
        self.frequency_map = {
            "m1":"1m",
            "m5":"5m",
            "d":"1d",
        }
    
    def get_tick(self,symbol,start_date=None,end_date=None):
        return
    
    def get_bar(self,symbol,frequency,start_date=None,end_date=None):
        return
    
    def parse_code(self,code):
        items = code.split(".")
        return items[0],items[1],items[2]

    def code_std(self,stdCode:str):
        stdCode = stdCode.upper()
        items = stdCode.split(".")
        exchg = self.exchgStdToRQ(items[0])
        if len(items) == 2:
            # 简单股票代码,格式如SSE.600000
            return items[1] + "." + exchg
        elif items[1] in ["IDX","ETF","STK","OPT"]:
            # 标准股票代码,格式如SSE.IDX.000001
            return items[2] + "." + exchg
        elif len(items) == 3:
            # 标准期货代码,格式如CFFEX.IF.2103
            if items[2] != 'HOT':
                return ''.join(items[1:])
            else:
                return items[1] + "88"
            
    def cover_d_bar(self,df):
        count = len(df)
        BUFFER = WTSBarStruct * count
        buffer = BUFFER()
        for index, row in tqdm(df.iterrows()):
            curBar = buffer[index]
            curBar.date = int(row["date"])
            curBar.open = float(row["open"])
            curBar.high = float(row["high"])
            curBar.low = float(row["low"])
            curBar.close = float(row["close"])
            curBar.vol = float(row["vol"])
            curBar.money = float(row["money"])
            curBar.hold = float(row["hold"])
        return buffer
    
    def cover_m_bar(self,df):
        count = len(df)
        BUFFER = WTSBarStruct * count
        buffer = BUFFER()
        for index, row in tqdm(df.iterrows()):
            curBar = buffer[index]
            curBar.time = int((int(row["date"])-19900000)*10000 + int(row["time"]))
            curBar.open = float(row["open"])
            curBar.high = float(row["high"])
            curBar.low = float(row["low"])
            curBar.close = float(row["close"])
            curBar.vol = float(row["vol"])
            curBar.money = float(row["money"])
            curBar.hold = float(row["hold"])
        return buffer
        
    def cover_tick(self,df):
        count = len(df)
        BUFFER = WTSTickStruct * count
        buffer = BUFFER()
        for index, row in tqdm(df.iterrows()):
            curTick = buffer[index]
            curTick.exchg = bytes(row["exchg"],'utf-8')
            curTick.code = bytes(row["code"],'utf-8')
            curTick.price = float(row["price"])
            curTick.open = float(row["open"])
            curTick.high = float(row["high"])
            curTick.low = float(row["low"])
            curTick.settle_price = float(row["settle_price"])
            curTick.total_volume = float(row["total_volume"])
            curTick.volume = float(row["volume"])
            curTick.total_turnover = float(row["total_turnover"])
            curTick.turn_over = float(row["turn_over"])
            curTick.open_interest = float(row["open_interest"])
            curTick.diff_interest = float(row["diff_interest"])
            curTick.trading_date = int(row["trading_date"])
            curTick.action_date = int(row["action_date"])
            curTick.action_time = int(int(row["action_time"]) / 1000)
            curTick.pre_close = float(row["pre_close"])
            curTick.pre_settle = float(row["pre_settle"])
            curTick.pre_interest = float(0.0)
            for x in range(0,5):
                setattr(curTick,f"bid_price_{x}",float(row["bid_" + str(x+1)]))
                setattr(curTick,f"bid_qty_{x}",float(row["bid_qty_" + str(x+1)]))
                setattr(curTick,f"ask_prices_{x}",float(row["ask_" + str(x+1)]))
                setattr(curTick,f"ask_qty_{x}",float(row["ask_qty_" + str(x+1)]))
        return buffer
        
    def bar_df_to_dsb(self,df,dsb_file,period):
        if "d" in period:
            buffer = self.cover_d_bar(df)
        elif "m" in period:
            buffer = self.cover_m_bar(df)      
        self.dthelper.store_bars(barFile=dsb_file,firstBar=buffer,count=len(buffer),period=period)

    def tick_df_to_dsb(self,df,dsb_file):
        buffer = self.cover_tick(df)
        self.dthelper.store_ticks(tickFile=dsb_file, firstTick=buffer, count=len(buffer))
        
    # 新下的数据会覆盖旧的数据
    def store_bin_bar(self,storage_path,code,start_date=None,end_date=None,frequency="1m",col_map=None):
        df = self.get_bar(code,start_date,end_date,frequency)
        period = self.period_map[frequency]
        save_path = os.path.join(storage_path,"bin",period)
        if not os.path.exists(save_path):
            os.makedirs(save_path)
        dsb_path = os.path.join(save_path,f"{code}_{frequency}.dsb")
        self.bar_df_to_dsb(df,dsb_path,frequency)
        
    def store_bin_tick(self,storage_path,code,start_date=None,end_date=None,col_map=None):
        df = self.get_tick(code,start_date,end_date)
        save_path = os.path.join(storage_path,"bin","ticks")
        if not os.path.exists(save_path):
            os.makedirs(save_path)
        g = df.groupby("trading_date")
        for trading_date,g_df in g:
            g_df = g_df.reset_index()
            dsb_path = os.path.join(save_path,f"{code}_tick_{trading_date}.dsb")
            self.tick_df_to_dsb(g_df,dsb_path)
    
    # 除了转换为dsb格式,还会按照his的格式进行存储
    def store_his_bar(self,storage_path,code,start_date=None,end_date=None,frequency="1m",skip_saved=False):
        print(f"开始转存{code}")
        exchange,pid,month = self.parse_code(code)
        if exchange == "CZCE":
            month = month[-3:]
        if frequency not in self.frequency_map.keys():
            print("周期只能为m1、m5或d,回测或实盘中会自动拼接")
        period = self.period_map[frequency]
        save_path = os.path.join(storage_path,"his",period,exchange)
        if not os.path.exists(save_path):
                try:
                    os.makedirs(save_path)
                except:
                    pass
        dsb_name = f"{pid}{month}.dsb"
        dsb_path = os.path.join(save_path,dsb_name)
        if skip_saved:
            saved_list = os.listdir(save_path)
            if dsb_name in saved_list:
                print(f"重复数据,跳过{dsb_name}")
                return
        df = self.get_bar(code,start_date,end_date,frequency)
        if df is None:
            print(f"{code}没有数据")
            return
        self.bar_df_to_dsb(df,dsb_path,frequency)
        
    def store_his_tick(self,storage_path,code,start_date=None,end_date=None,skip_saved=False):
        print(f"开始转存{code}")
        exchange,pid,month = self.parse_code(code)
        if exchange == "CZCE":
            month = month[-3:]
        # 分天下载,避免内存超出
        for date in pd.date_range(start_date,end_date):
            save_path = os.path.join(storage_path,"his","ticks",exchange,date.strftime('%Y%m%d'))
            if not os.path.exists(save_path):
                try:
                    os.makedirs(save_path)
                except:
                    pass
            dsb_name = f"{pid}{month}.dsb"
            if skip_saved:
                saved_list = os.listdir(save_path)
                if dsb_name in saved_list:
                    print(f"重复数据,跳过{dsb_name}")
                    continue
            t_day = date.strftime('%Y.%m.%d')
            df = self.get_tick(code,t_day,t_day)
            if (df is None) or (df.empty):
                print(f"{date}:{code}没有数据")
                continue
            dsb_path = os.path.join(save_path,f"{pid}{month}.dsb")
            self.tick_df_to_dsb(df,dsb_path)
            
    def bar_dsb_to_csv(self,binFolder,csvFolder):
        self.dthelper.dump_bars(binFolder, csvFolder)
    
    def tick_dsb_to_csv(self,binFolder,csvFolder):
        self.dthelper.dump_ticks(binFolder, csvFolder)

下面以从米筐获取历史数据为例,讲解如何通过继承Ifeed来实现历史数据获取。

class RqFeed(Ifeed):
    def __init__(self,user=None,passwd=None):
        super().__init__()
        # 米筐的一些初始化
        self.rq = rq
        self.rq.init(user,passwd)
        # 因为每个数据源中各个字段的名称可能不同,因此需要我们自己统一转换为wt数据中的字段
        # bar数据字段映射,key为数据源中的字段,value为wt数据中对应的字段
        self.bar_col_map = {
            "date":"date",
            "time":"time",
            "open":"open",
            "high":"high",
            "low":"low",
            "close":"close",
            "total_turnover":"money",
            "volume":"vol",
            "open_interest":"hold",
        }
        # tick数据字段映射
        self.tick_col_map = {
            "code":"code",
            "exchg":"exchg",
            "last":"price",
            "open":"open",
            "high":"high",
            "low":"low",
            "volume":"total_volume",
            "vol":"volume",
            "total_turnover":"total_turnover",
            "turn_over":"turn_over",
            "open_interest":"open_interest",
            "diff_interest":"diff_interest",
            "trading_date":"trading_date",
            "date":"action_date",
            "time":"action_time",
            "prev_close":"pre_close",
            "settle_price":"settle_price",
            "prev_settlement":"pre_settle",
        }
        for i in [1,2,3,4,5]:
            self.tick_col_map[f"a{i}"] = f"ask_{i}"
            self.tick_col_map[f"b{i}"] = f"bid_{i}"
            self.tick_col_map[f"a{i}_v"] = f"ask_qty_{i}"
            self.tick_col_map[f"b{i}_v"] = f"bid_qty_{i}"
    
    # 米筐与WT中一些代码之间的转换
    def exchgStdToRQ(self,exchg:str) -> str:
        if exchg == 'SSE':
            return "XSHG"
        elif exchg == 'SZSE':
            return "XSHE"
        else:
            return exchg
    
    # 代码标准化处理
    def code_std(self,stdCode:str):
        stdCode = stdCode.upper()
        items = stdCode.split(".")
        exchg = self.exchgStdToRQ(items[0])
        if len(items) == 2:
            # 简单股票代码,格式如SSE.600000
            return items[1] + "." + exchg
        elif items[1] in ["IDX","ETF","STK","OPT"]:
            # 标准股票代码,格式如SSE.IDX.000001
            return items[2] + "." + exchg
        elif len(items) == 3:
            # 标准期货代码,格式如CFFEX.IF.2103
            if items[2] != 'HOT':
                return ''.join(items[1:])
            else:
                return items[1] + "88"
    
    # 获取tick数据df,主要要转换为统一的格式,比如字段名要重命名为WT的
    def get_tick(self,code,start_date=None,end_date=None):
        symbol = self.code_std(code)
        exchange,pid,month = self.parse_code(code)
        df = self.rq.get_price(symbol,start_date=start_date,end_date=end_date,frequency="tick")
        if df is None:
            return None
        df = df.reset_index()
        df["exchg"] = code.split(".")[0]
        df["code"] = code.split(".")[1] + code.split(".")[2]
        #改一下时间的格式
        if "datetime" in df.columns:
            df["datetime"] = pd.to_datetime(df["datetime"])
            df["date"] =  df["datetime"].dt.strftime("%Y%m%d")
            df["time"] =  df["datetime"].dt.strftime("%H%M%S%f")
            df["trading_date"] =  df["trading_date"].dt.strftime("%Y%m%d")
        else:
            df["date"] =  df["date"].dt.strftime("%Y%m%d")
            df["time"] = "000000"
            
        multiplier = self.rq.futures.get_contract_multiplier(pid.upper(),start_date,end_date)["contract_multiplier"].max()
        df["settle_price"] = ((df["total_turnover"] / df["volume"]) * multiplier).fillna(0.0)
        df["turn_over"] = (df["total_turnover"] - df["total_turnover"].shift(1)).fillna(0.0)
        df["vol"] = (df["volume"] - df["volume"].shift(1)).fillna(0.0)
        df["diff_interest"] = (df["open_interest"] - df["open_interest"].shift(1)).fillna(0.0)
        df = df[[col for col in self.tick_col_map.keys()]]
        # 重命名
        df = df.rename(columns=self.tick_col_map)
        return df
    
    # 获取bar数据的df
    def get_bar(self, code, start_date=None,end_date=None,frequency="1m"):
        if frequency not in self.frequency_map.keys():
            print("周期只能为m1、m5或d,回测或实盘中会自动拼接")
        rq_frequency = self.frequency_map[frequency]
        symbol = self.code_std(code)
        df = self.rq.get_price(symbol,start_date=start_date,end_date=end_date,frequency=rq_frequency)
        if df is None:
            return None
        df = df.reset_index()
        #改一下时间的格式
        if "datetime" in df.columns:
            df["datetime"] = pd.to_datetime(df["datetime"])
            df["date"] =  df["datetime"].dt.strftime("%Y%m%d")
            df["time"] =  df["datetime"].dt.strftime("%H%M")
        else:
            df["date"] =  df["date"].dt.strftime("%Y%m%d")
            df["time"] = "000000"
        df = df[[col for col in self.bar_col_map.keys()]]
        df = df.rename(columns=self.bar_col_map)
        return df

补充一个从csv中加载的demo,从csv中获取数据的代码略有不同,主要是接口层面的

class CsvFeed(Ifeed):
    def __init__(self):
        super().__init__()
        self.bar_col_map = {
            "date":"date",
            "time":"time",
            "open":"open",
            "high":"high",
            "low":"low",
            "close":"close",
            "turnover":"money",
            "volume":"vol",
            "hold":"hold",
        }
        self.tick_col_map = {
            "code":"code",
            "exchg":"exchg",
            "price":"price",
            "open":"open",
            "high":"high",
            "low":"low",
            "total_volume":"total_volume",
            "volume":"volume",
            "total_turnover":"total_turnover",
            "open_interest":"open_interest",
            "trade_date":"trading_date",
            "date":"action_date",
            "time":"action_time",
            "settle":"settle_price",
            "turnover":"turn_over",
            "diff_interest":"diff_interest",
            "preclose" : "pre_close",
            "presettle": "pre_settle",
            "preinterest" : "pre_interest",
        }
        for i in [1,2,3,4,5]:
            self.tick_col_map[f"askprice{i}"] = f"ask_{i}"
            self.tick_col_map[f"bidprice{i}"] = f"bid_{i}"
            self.tick_col_map[f"askqty{i}"] = f"ask_qty_{i}"
            self.tick_col_map[f"bidqty{i}"] = f"bid_qty_{i}"
            
    def get_tick(self,code,csv_path):
        df = pd.read_csv(csv_path)
        df = df.reset_index()
        df["exchg"] = code.split(".")[0]
        df["code"] = code.split(".")[1] + code.split(".")[2]
        #改一下时间的格式
        if "datetime" in df.columns:
            df["datetime"] = pd.to_datetime(df["datetime"])
            df["date"] =  df["datetime"].dt.strftime("%Y%m%d")
            df["time"] =  df["datetime"].dt.strftime("%H%M%S%f")
            df["trading_date"] =  df["trading_date"].dt.strftime("%Y%m%d")
        else:
            df["date"] =  df["date"].astype("str")
            df["time"] = "000000"
            
        df["diff_interest"] = 0 # 如果没有的就自己设个0
        
        df = df[[col for col in self.tick_col_map.keys()]]
        df = df.rename(columns=self.tick_col_map)
        return df
    
    def get_bar(self, code, csv_path):
        df = pd.read_csv(csv_path)
        if df is None:
            return None
        df = df.reset_index()
        #改一下时间的格式
        if "datetime" in df.columns:
            df["datetime"] = pd.to_datetime(df["datetime"])
            df["date"] =  df["datetime"].dt.strftime("%Y%m%d")
            df["time"] =  df["datetime"].dt.strftime("%H%M%S")
        else:
            df["date"] =  pd.to_datetime(df["date"]).dt.strftime("%Y%m%d")
            df["time"] = "000000"
        df = df[[col for col in self.bar_col_map.keys()]]
        df = df.rename(columns=self.bar_col_map)
        return df
    
    def store_his_bar(self,storage_path,code,csv_path,frequency="m1"):
        print(f"开始转存{code}")
        if frequency not in self.frequency_map.keys():
            print("周期只能为m1、m5或d,回测或实盘中会自动拼接")
        period = self.period_map[frequency]
        exchange,pid,month = self.parse_code(code)
        if exchange == "CZCE":
            month = month[-3:]
        save_path = os.path.join(storage_path,"his",period,exchange)
        if not os.path.exists(save_path):
            try:
                os.makedirs(save_path)
            except:
                pass
        dsb_name = f"{pid}{month}.dsb"
        dsb_path = os.path.join(save_path,dsb_name)
        df = self.get_bar(code,csv_path)
        if df is None:
            print(f"{code}没有数据")
            return
        self.bar_df_to_dsb(df,dsb_path,frequency)
        
    def store_his_tick(self,storage_path,code,csv_path):
        print(f"开始转存{code}")
        exchange,pid,month = self.parse_code(code)
        if exchange == "CZCE":
            month = month[-3:]
        df = self.get_tick(code,csv_path)
        for index,g_df in df.groupby("trading_date"):
            save_path = os.path.join(storage_path,"his","ticks",exchange,str(index))
            if not os.path.exists(save_path):
                try:
                    os.makedirs(save_path)
                except:
                    pass
            dsb_name = f"{pid}{month}.dsb"
            dsb_path = os.path.join(save_path,f"{pid}{month}.dsb")
            self.tick_df_to_dsb(df,dsb_path)

具体使用

    # 从米筐下载数据
    feed = RqFeed()
    storage_path = "./storage"
    # 输入的代码记得区分大小写
    feed.store_his_bar(storage_path,"SHFE.ni.2201",start_date="20211225",end_date="20220101",frequency="m1",skip_saved=False)
    feed.store_his_tick(storage_path,"SHFE.ni.2201",start_date="20211225",end_date="20220101",skip_saved=False)

    # 从csv中下载数据
    feed = CsvFeed()
    storage_path = "./storage"
    feed.store_his_tick(storage_path, "SHFE.ni.2204","ni2204.csv")
    feed.store_his_bar(storage_path, "SHFE.ni.2204","SHFE.NI.2202_m1.csv",frequency="m1")

    # 读取dsb数据
    dtHelper = WtDataHelper()
    dtHelper.dump_bars(binFolder="./storage/his/min1/SHFE/", csvFolder="min1_csv")
    dtHelper.dump_ticks(binFolder="./storage/his/ticks/SHFE/20211227/", csvFolder="ticks_csv")
Tip

继承Ifeed的子类主要需要实现get_tick与get_bar两个功能,在这两个功能中,需要返回wt格式的DataFarme有一下需要注意的细节:

  • 字段要映射对
  • bar中的time字段,精确到分钟
  • tick则需精确到毫秒,具体参考上面的例子
  • 自己缺少的字段自己要给默认值,但不能没有