In [1]:
import pandas as pd
import numpy as np
import itertools
from functools import reduce
import os
import multiprocessing
import time
import warnings
from tqdm import tqdm
import datetime

# Silence every warning for the rest of the session. Note this also hides
# pandas/numpy RuntimeWarnings (e.g. divide-by-zero) raised by later cells.
warnings.filterwarnings("ignore")

# Console-friendly display settings (wide lines, truncated cells/columns) and
# no SettingWithCopyWarning; set_option accepts (pattern, value) pairs.
pd.set_option(
    "display.width", 1200,
    "display.max_colwidth", 100,
    "display.max_columns", 10,
    "mode.chained_assignment", None,
)

In [2]:
def beforeClosingVolumePercent(trade, start="14:30:00.000", end="15:00:00.000"):
    """Share of total traded volume executed inside a time-of-day window.

    Parameters
    ----------
    trade : pd.DataFrame
        Trade records with a string ``TradeTime`` column and a numeric
        ``TradeQty`` column. The time of day is read from characters 11-22
        of ``TradeTime`` (assumes a ``"YYYY-MM-DD HH:MM:SS.fff"`` layout --
        TODO confirm against the data source).
    start, end : str
        Inclusive window bounds as zero-padded ``"HH:MM:SS.fff"`` strings.
        They are compared lexicographically, which matches chronological
        order for that fixed-width format. Defaults cover 14:30-15:00.

    Returns
    -------
    float
        ``sum(TradeQty inside window) / sum(TradeQty)``, or ``np.nan`` when
        the total volume is zero (including an empty frame).
    """
    trade_time = trade["TradeTime"].str.slice(11, 23)
    in_window = trade_time.between(start, end)  # inclusive on both ends
    total_volume = trade["TradeQty"].sum()
    if total_volume == 0:
        # Avoid 0/0 (NaN with a warning, or ZeroDivisionError on object sums).
        return np.nan
    return trade.loc[in_window, "TradeQty"].sum() / total_volume

In [None]:
# Smoke test: time the indicator on one stock/day file before the batch run.
sample_file = "/ssd/ssd3/data/oneStock_oneFile_TL/20230201/trade/000001.csv"
df = pd.read_csv(sample_file)

t0 = time.time()
res = beforeClosingVolumePercent(df)
elapsed = time.time() - t0

print("cal time: ", elapsed, "s")
print(res)

In [5]:
def pool_func(tick_obj, trade_path_obj):
    """Compute the BCVP indicator for a batch of per-stock trade files.

    Intended as the worker function for ``multiprocessing.Pool.starmap``.

    Parameters
    ----------
    tick_obj : iterable of str
        File names inside ``trade_path_obj`` (e.g. ``"000001.csv"``); the
        first six characters become the row label (stock code).
    trade_path_obj : str
        Directory holding the files. Its *parent* directory name is taken as
        the trading date in ``YYYYMMDD`` form (e.g. ``.../20230201/trade``).

    Returns
    -------
    pd.DataFrame
        Indexed by stock code with columns ``DATE`` (``"YYYY-MM-DD"``) and
        ``BCVP`` (result of ``beforeClosingVolumePercent``, or NaN when the
        file could not be read or processed). If two files share the same
        six-character prefix, the later one's value wins.
    """
    # normpath strips a trailing separator, so ".../20230201/trade/" still
    # resolves to the date directory rather than "trade".
    raw_date = os.path.basename(os.path.dirname(os.path.normpath(trade_path_obj)))
    date_str = raw_date[0:4] + "-" + raw_date[4:6] + "-" + raw_date[6:8]

    # Accumulate in a dict and build the frame once, instead of enlarging a
    # DataFrame one cell at a time.
    bcvp_by_code = {}
    for tick in tqdm(tick_obj):
        code = tick[:6]
        try:
            trade = pd.read_csv(os.path.join(trade_path_obj, tick))
            bcvp_by_code[code] = beforeClosingVolumePercent(trade)
        except Exception:
            # Best effort: an unreadable/malformed file yields NaN instead of
            # aborting the whole batch.
            bcvp_by_code[code] = np.nan

    codes = list(bcvp_by_code)
    return pd.DataFrame(
        {"DATE": [date_str] * len(codes), "BCVP": [bcvp_by_code[c] for c in codes]},
        index=codes,
        columns=["DATE", "BCVP"],
    )


class multi_task_split:
    """Split a sequence into contiguous, near-equal chunks for a process pool.

    ``data`` must support ``len()`` and slicing (e.g. a list of file names);
    ``processes_to_use`` is the caller's upper bound on the number of chunks.
    """

    def __init__(self, data, processes_to_use):
        self.data = data
        self.processes_to_use = processes_to_use

    def num_of_jobs(self):
        """Return the number of chunks/processes to use.

        Capped by the number of items, the requested process count and the
        machine's CPU count, and never negative: it is 0 when there is no
        data or no processes were requested.
        """
        return max(0, min(len(self.data), self.processes_to_use, multiprocessing.cpu_count()))

    def split_args(self):
        """Return a generator of ``num_of_jobs()`` contiguous slices of ``data``.

        The slices cover ``data`` in order; the first ``len(data) % jobs``
        slices hold one extra item, so sizes differ by at most one. When
        ``num_of_jobs()`` is 0 the generator is empty (no ZeroDivisionError).
        """
        jobs = self.num_of_jobs()
        q, r = divmod(len(self.data), jobs) if jobs else (0, 0)
        return (self.data[i * q + min(i, r): (i + 1) * q + min(i + 1, r)] for i in range(jobs))

In [None]:
# Upper bound on worker processes; further capped by file count and CPU cores.
n_use = 24
# Change this to the directory that stores the data. Its parent directory
# name must be the trading date (YYYYMMDD), which pool_func relies on.
trade_path = r"/ssd/ssd3/data/oneStock_oneFile_TL/20230201/trade"
# Sorted so chunk assignment and result row order are reproducible
# (os.listdir order is arbitrary).
stock_pool = sorted(os.listdir(trade_path))
if not stock_pool:
    # Otherwise this surfaces later as ZeroDivisionError / invalid Pool size.
    raise FileNotFoundError("No trade files found in " + trade_path)

splitter = multi_task_split(stock_pool, n_use)
processes_decided = splitter.num_of_jobs()
print("进程数：", processes_decided)
split_args_to_process = list(splitter.split_args())
# One (file-name chunk, directory) argument tuple per worker.
args = [(chunk, trade_path) for chunk in split_args_to_process]

print("#" * 50 + "Multiprocessing Start" + "#" * 50)
t0 = time.time()
# NOTE(review): pool_func is defined in this notebook, which works with the
# 'fork' start method (Linux default); under 'spawn' (macOS/Windows default)
# it would need to live in an importable module.
with multiprocessing.Pool(processes=processes_decided) as pool:
    res = pool.starmap(pool_func, args)
print("cal time: ", time.time() - t0, "s")
res_combined = pd.concat(res, axis=0)
print("cal time: ", time.time() - t0, "s")
print(res_combined)