diff --git a/.gitignore b/.gitignore index 96e81ed..cd6faca 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # Ignoring are files .out +.nfile # Ignoring directory .vscode diff --git a/.gitmodules b/.gitmodules index ada3e15..f439b3c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,9 +1,16 @@ -[submodule "extern/binacpp"] - path = extern/binacpp - url = https://github.com/binance-exchange/binacpp.git -[submodule "extern/libpqxx"] - path = extern/libpqxx +[submodule "_deps/cpp-httplib"] + path = _deps/cpp-httplib + url = https://github.com/yhirose/cpp-httplib.git + branch = master +[submodule "_deps/binacpp"] + path = _deps/binacpp + url = https://github.com/lpdgrl/binacpp.git + branch = master +[submodule "_deps/libpqxx"] + path = _deps/libpqxx url = https://github.com/jtv/libpqxx.git -[submodule "extern/flatbuffers"] - path = extern/flatbuffers + branch = master +[submodule "_deps/flatbuffers"] + path = _deps/flatbuffers url = https://github.com/google/flatbuffers.git + branch = master diff --git a/CMakeLists.txt b/CMakeLists.txt index 9c2a90f..29972d1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,21 +1,17 @@ cmake_minimum_required(VERSION 3.10) project (fhc) -set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD 23) set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(BINACPP_DIR extern/binacpp) -set(LIBPQXX_DIR extern/libpqxx) -set(FLATBUFFERS_DIR extern/flatbuffers) - +set(BINACPP_DIR _deps/binacpp) +set(LIBPQXX_DIR _deps/libpqxx) +set(FLATBUFFERS_DIR _deps/flatbuffers) +set(CPP_HTTPLIB_DIR _deps/cpp-httplib) set(FHC_HEADERS_DIR include) -set(FHC_CPP_DIR src) - -# set(FHC_CPP -# ${FHC_CPP_DIR}/main.cpp -# ) - +set(FHC_SOURCE_DIR src) +set(EXAMPLES_DIR ${CMAKE_CURRENT_SOURCE_DIR}/examples) # Add binacpp add_subdirectory(${BINACPP_DIR}) @@ -25,6 +21,11 @@ add_subdirectory(${LIBPQXX_DIR}) # Add flattbuffers add_subdirectory(${FLATBUFFERS_DIR}) +# Add cpp-httplib +add_subdirectory(${CPP_HTTPLIB_DIR}) + +add_subdirectory(${EXAMPLES_DIR}) + # Generate flatbuffers files set(FLATC ${FLATBUFFERS_DIR}/flatc) set(SCHEMA ${CMAKE_CURRENT_SOURCE_DIR}/schemas/klines.fbs) @@ -33,32 +34,42 @@ set(GENERATED ${CMAKE_CURRENT_SOURCE_DIR}/include/flatbuffers/klines_generated.h add_custom_command( OUTPUT ${GENERATED} COMMAND ${FLATC} --cpp -o ${CMAKE_CURRENT_SOURCE_DIR}/include/flatbuffers/ ${SCHEMA} - DEPENDS ${SCHEMA} + DEPENDS ${SCHEMA} flatc COMMENT "Generating FlatBuffers code from ${SCHEMA}" ) add_custom_target(generate_flatbuffers DEPENDS ${GENERATED}) -# Paths to include -include_directories( - ${CMAKE_SOURCE_DIR}/extern/binacpp/src - FHC_HEADERS_DIR -) - # Target executable file -add_executable(fhc - ${FHC_CPP_DIR}/main.cpp - ${FHC_CPP_DIR}/server/server.cpp - ${FHC_CPP_DIR}/server/request_handler.cpp - ${FHC_CPP_DIR}/base/database.cpp - ${FHC_CPP_DIR}/base/pqxx_adapter.cpp - ${FHC_CPP_DIR}/base/binance_client.cpp +add_library(fhc STATIC + ${FHC_SOURCE_DIR}/server/server.cpp + ${FHC_SOURCE_DIR}/server/request_handler.cpp + ${FHC_SOURCE_DIR}/server/utils.cpp + ${FHC_SOURCE_DIR}/server/httplib_adapter.cpp + ${FHC_SOURCE_DIR}/base/database.cpp + ${FHC_SOURCE_DIR}/base/pqxx_adapter.cpp + ${FHC_SOURCE_DIR}/base/binance_client.cpp + ${FHC_SOURCE_DIR}/base/sql_loader.cpp ) add_dependencies(fhc generate_flatbuffers) -target_link_libraries(fhc PRIVATE +target_include_directories(fhc PUBLIC + ${BINACPP_DIR}/src + ${FHC_HEADERS_DIR} +) + +target_link_libraries(fhc PUBLIC binacpp pqxx flatbuffers + httplib +) + +target_compile_definitions(fhc PRIVATE + CMAKE_CONFIG_PATH="${CMAKE_SOURCE_DIR}/config/" +) + +target_compile_definitions(fhc PRIVATE + CMAKE_SQL_PATH="${CMAKE_SOURCE_DIR}/sql/" ) \ No newline at end of file diff --git a/README.MD b/README.MD new file mode 100644 index 0000000..9afc530 --- /dev/null +++ b/README.MD @@ -0,0 +1,100 @@ +# Binance historical storage +## Цель проекта +Предпологается, что будет использоваться как API для построения сервисов связанных с выводом информации о криптовалюте (пока только добавлены свечи BTCUSDT). + +## Зависимости от third-party +В качестве хранилища данных используется Postgres. + +Библиотека Libpqxx используется для взаимодействия с Postgres. + +Библиотека flatbuffers используется для zero-copy сериализации данных. + +Библиотека http-lib используется для сетевого взаимодействия. В будущем добавлю поддержку boost::beast. + +Библиотека binacpp используется для получения данных с биржи Binance. Ее пришлось форкнуть, так как нужно было добавить поддержку api v3 from Binance и CMake сборку. + +Все используемые библиотеки затянуты в проект при помощи git submodule. + +## Как собрать проект? +Для сборки проекта используется CMake и генератор Ninja. А так же должен быть установлен curl и docker-compose в системе. Проект собирается как STATIC библиотека. + +Файл конфигурации для запуска через docker compose находится в корне репозитория. Он подтягивает образ Postgres и adminer (web-морда для Postgres). + +Например, чтобы установить curl в Arch Linux, введите команду в терминале: +```shell +pacman -Sy curl docker-composes +``` + +Запуск рабочего окружения в docker-compose в корне репозитория: +```shell +sudo docker-compose up -d +``` + +Клонируем репозиторий: +```shell +git clone https://github.com/lpdgrl/FHC.git +``` + +Подтягиваем зависимости: +```shell +git submodule update --init --recursive +``` + +Выполняем сборку в корне репозитория: +```shell +mkdir build && cd build +cmake .. -G Ninja +cmake --build . +``` + +В папке examples находится http-пример для запуска. +Перед первым запуском нужно подготовить конфигурацию в конфиг-файле (.nfile) в директории config. С репозиторием поставляется шаблонный конфиг-файл .nfile-template и для быстрого старта +вы можете его переименовать в .nfile + +```shell +mv .nfile-template .nfile +``` + +Структура конфиг-файла простая: +```shell + DB_HOST=localhost # хост базы данных Postgres + DB_NAME=postgres # имя базы данных + DB_USER=postgres # имя пользователя базы данных + DB_PASSWORD=postgres # пароль базы данных + HTTP_HOST=0.0.0.0 # ip-address http-сервера + HTTP_PORT=8000 # порт http-сервера +``` + +Теперь все готово, и чтобы запустить http-пример, введите в терминал: +```shell +{подставить свой путь}/build/examples/http_example +``` +или, если находитесь в ./build/examples/ + +```shell +./http-example +``` + +## Как использовать? +При первоначальном запуске, база данных будет заполняться свечами BTCUSDT с захардкоженной датой 2025-08-01 по текущую дату. Планирую в будущем, добавить возможность чтобы заполнять базу данных с произвольной даты. + +В строке браузера введите: +```shell +http://localhost:8000/ +``` +и вам отобразится веб-страница с выбором дат для получения flatbuffer со свечами по BTCUSDT за выбранный период. Для проверки, что массив байтов пришел, необходимо открыть в браузере консоль из инструментов разработчика и увидите количество переданных байтов и массив сырых байтов. + +Чтобы получить обновления по свечам (пока только BTCUSDT) введите в браузере: +```shell +http://localhost:8000/api/update +``` +После заполнения базы данных новыми данными на странице отобразится количество добавленных свечей в базу данных либо, что данные в базе данных на текущее время актуальны. + +## Какой функционал реализован? +SQL Loader для загрузки sql-файлов и подстановка в них параметров при помощи механизма libpqxx. + +Передача произвольного GET-обработчика внутрь httplib. Это подробно показано в примере examples\http-example. + +Получение и обновление klines по одной криптовалюте. + +[TODO проекта](TODO.MD) \ No newline at end of file diff --git a/TODO.MD b/TODO.MD new file mode 100644 index 0000000..430b2a9 --- /dev/null +++ b/TODO.MD @@ -0,0 +1,24 @@ +## TODO: +* [x] 1. Сделать конфиги через .envfile окружение или подобное. Сделать файл .envtemplate + +* [x] 2. http запросы (userver). Запросы update и klines. + +* [ ] 3. Миграционный скрипт (ранбук на первый запуск) + +* [x] 4. Хранимые процедуры или sql файл. + +* [x] 5. Оформить репо (readme, examples) + +* [x] 6. Форкнуть binacpp + +* [ ] 7. Выгрузить свечи BTCUSDT, ETHUSDT, SOLUSDT + +* [ ] 8. Выгрузить price по BTC, ETH, SOL + +* [ ] 9. Сделать docker-образ с приложением + +* [ ] 10. Постоянное обновление данных + +* [ ] 11. Прикрутить фронт с flattbuffers + +* [ ] 12. Таблица для хранения symbol криптовалюты которую нужно выгружать\обновлять \ No newline at end of file diff --git a/_deps/binacpp b/_deps/binacpp new file mode 160000 index 0000000..69a4523 --- /dev/null +++ b/_deps/binacpp @@ -0,0 +1 @@ +Subproject commit 69a4523044ba7718d8eb29abeca1edf51721f9eb diff --git a/_deps/cpp-httplib b/_deps/cpp-httplib new file mode 160000 index 0000000..27ee115 --- /dev/null +++ b/_deps/cpp-httplib @@ -0,0 +1 @@ +Subproject commit 27ee115a60522c16c1e153c4e3de777a8f9b794a diff --git a/_deps/flatbuffers b/_deps/flatbuffers new file mode 160000 index 0000000..ac8b124 --- /dev/null +++ b/_deps/flatbuffers @@ -0,0 +1 @@ +Subproject commit ac8b1244965f33254c348578b95a6953c6faf0a6 diff --git a/extern/libpqxx b/_deps/libpqxx similarity index 100% rename from extern/libpqxx rename to _deps/libpqxx diff --git a/config/.nfile-template b/config/.nfile-template new file mode 100644 index 0000000..6baf6fe --- /dev/null +++ b/config/.nfile-template @@ -0,0 +1,6 @@ +DB_HOST=localhost +DB_NAME=postgres +DB_USER=postgres +DB_PASSWORD=postgres +HTTP_HOST=0.0.0.0 +HTTP_PORT=8000 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index d7f4112..ce685b7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,8 +7,8 @@ services: shm_size: 128mb environment: POSTGRES_USER: postgres - POSTGRES_PASSWORD: kis123Oits - POSTGRES_DB: fhc + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres ports: - "5432:5432" diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 0000000..9a3ca94 --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,9 @@ +# Пример CMakeLists.txt в examples/example1/ +add_executable(http-example http_example.cpp) + + +target_link_libraries(http-example PRIVATE fhc) + +target_compile_definitions(http-example PRIVATE + CMAKE_PATH_TO_WEB_FILES="${CMAKE_SOURCE_DIR}/examples/web/" +) \ No newline at end of file diff --git a/examples/http_example.cpp b/examples/http_example.cpp new file mode 100644 index 0000000..0cec5c8 --- /dev/null +++ b/examples/http_example.cpp @@ -0,0 +1,56 @@ +#include "../include/server/server.hpp" + +#include + +void RunServer(); + +int main(int argc, const char** argv) { + RunServer(); + + return EXIT_SUCCESS; +} + +void RunServer() { + using namespace fhc; + server::Server server; + + auto api_klines = [&](const std::unordered_map& params, auto write) { + std::string start_date; + std::string end_date; + + if (auto it_find_start_time = params.find("StartTime"); it_find_start_time != params.end()) { + start_date = it_find_start_time->second; + } + + if (auto it_find_end_time = params.find("EndTime"); it_find_end_time != params.end()) { + end_date = it_find_end_time->second; + } + + if (start_date.empty() || end_date.empty()) { + std::cerr << "StartTime or EndTime is empty!" << std::endl; + } + + flatbuffers::FlatBufferBuilder builder = server.GetKlinesFromStorage(start_date, end_date); + + auto buf = std::make_shared>( + builder.GetBufferPointer(), + builder.GetBufferPointer() + builder.GetSize() + ); + + write(reinterpret_cast(buf->data()), buf->size()); + }; + server.SetHandler("/api/klines", {server::HTTP_METHOD::GET, api_klines, "application/octet-stream"}); + + auto api_klines_update = [&](const std::unordered_map& params, auto write) { + std::string result = server.UpdateKlinesInStorage(); + + write(reinterpret_cast(result.data()), result.size()); + }; + + server.SetHandler("/api/update", {server::HTTP_METHOD::GET, api_klines_update, "text/plain"}); + + server.Init(); + server.SetMountPoint("/", CMAKE_PATH_TO_WEB_FILES); + std::cout << "Start server!" << std::endl; + server.Run(); +} diff --git a/examples/web/index.html b/examples/web/index.html new file mode 100644 index 0000000..e389772 --- /dev/null +++ b/examples/web/index.html @@ -0,0 +1,35 @@ + + + + +Test Klines + + + +

FlatBuffers Raw Test

+
+ Start Date: + End Date: + +
+ + + + + diff --git a/extern/binacpp b/extern/binacpp deleted file mode 160000 index 49a5139..0000000 --- a/extern/binacpp +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 49a51392ca1b8c72194a5a5b7bb9621413d3214d diff --git a/extern/flatbuffers b/extern/flatbuffers deleted file mode 160000 index 4c0eecd..0000000 --- a/extern/flatbuffers +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 4c0eecd25adee9aa4df363d6487049f61a4b1c7d diff --git a/include/base/istorage_adapter.hpp b/include/base/istorage_adapter.hpp index d2e4978..0423b12 100644 --- a/include/base/istorage_adapter.hpp +++ b/include/base/istorage_adapter.hpp @@ -16,8 +16,8 @@ namespace fhc::base::interface { virtual bool IsConnected() const = 0; virtual void Execute(const std::string& query) const = 0; - virtual std::vector> Query(const std::string& query) const = 0; - + // virtual std::vector> Query(const std::string& query, Args&& args) const = 0; + // virtual std::vector> Query(const std::string& query) const = 0; virtual ~IStorageAdapter() = default; }; } diff --git a/include/base/pqxx_adapter.hpp b/include/base/pqxx_adapter.hpp index 932a445..9294333 100644 --- a/include/base/pqxx_adapter.hpp +++ b/include/base/pqxx_adapter.hpp @@ -9,15 +9,51 @@ namespace fhc::base::interface { void Connect(const std::string& options) override; void Disconnect() override; - + void Execute(const std::string& query) const override; - std::vector> Query(const std::string& query) const; + template + void ExecuteWithParams(const std::string& query, Args&&... args) const; + + template + std::vector> Query(const std::string& query, Args&&... args) const; + bool IsConnected() const override; ~PqxxAdapter() = default; private: + template + pqxx::params MakeParams(Args&&... args) const; + std::vector> PqxxResultToVectorStr(pqxx::result& result) const; + std::unique_ptr connection_; }; + + template + pqxx::params PqxxAdapter::MakeParams(Args&&... args) const { + pqxx::params p; + (p.append(std::forward(args)), ...); + + return p; + } + + template + void PqxxAdapter::ExecuteWithParams(const std::string& query, Args&&... args) const { + pqxx::work txn(*connection_); + pqxx::result r = txn.exec(query, MakeParams(std::forward(args)...)); + txn.commit(); + } + + template + std::vector> PqxxAdapter::Query(const std::string& query, Args&&... args) const { + // TODO: Switch to nontransaction object + + pqxx::work txn(*connection_); + pqxx::result r = txn.exec(query, MakeParams(std::forward(args)...)); + txn.commit(); + + std::vector> result(std::move(PqxxResultToVectorStr(r))); + return result; + } } diff --git a/include/base/sql_loader.hpp b/include/base/sql_loader.hpp new file mode 100644 index 0000000..996f465 --- /dev/null +++ b/include/base/sql_loader.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace fhc::base::sql_loader { + // static const std::string SQL_DIR_PATH("/sql/"); + static const std::string SQL_FILE_SUFFIX(".sql"); + + class SqlLoader { + public: + SqlLoader() = default; + void LoadSQL(const std::string& file_name); + std::string_view GetSQL(const std::string& sql_query_name) const; + + private: + std::unordered_map queries_; + }; +} diff --git a/include/server/httplib_adapter.hpp b/include/server/httplib_adapter.hpp new file mode 100644 index 0000000..7da751f --- /dev/null +++ b/include/server/httplib_adapter.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include + +#include + +#include "ihttp_server.hpp" + +namespace fhc::server::http_adapter { + class HttpLibAdapter : public IHttpServer { + public: + HttpLibAdapter() = default; + + void Init(HttpConfig config) override; + void Run() override; + void Stop() override; + + void Post(const std::string&, RequestHandler handler) override; + + void SetMountPoint(const std::string& mount_point, const std::string& path_mount_point) override; + + void Get(const std::string& path, StreamResponse::Handler handler, + const std::string& content_type) override; + + const std::string& GetPort() const override; + int GetIp() const override; + + ~HttpLibAdapter() = default; + private: + std::unique_ptr http_svr_; + HttpConfig http_config_; + }; +} diff --git a/include/server/ihttp_server.hpp b/include/server/ihttp_server.hpp new file mode 100644 index 0000000..815cf7f --- /dev/null +++ b/include/server/ihttp_server.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include +#include + +namespace fhc::server::http_adapter { + struct HttpConfig { + std::string ip; + int port; + }; + + struct Request { + std::string path; + std::unordered_map params; + }; + + struct StreamResponse { + using WriteFunc = std::function; + using Handler = std::function, WriteFunc write)>; + }; + + class IHttpServer { + public: + using RequestHandler = std::function& params, + std::string& response_body, + std::string& content_type)>; + + virtual void Init(HttpConfig) = 0; + virtual void Run() = 0; + virtual void Stop() = 0; + + virtual void Get(const std::string& path, StreamResponse::Handler handler, + const std::string& content_type) = 0; + virtual void Post(const std::string&, RequestHandler) = 0; + + virtual void SetMountPoint(const std::string&, const std::string&) = 0; + + virtual const std::string& GetPort() const = 0; + virtual int GetIp() const = 0; + + virtual ~IHttpServer() = default; + }; +} diff --git a/include/server/request_handler.hpp b/include/server/request_handler.hpp index 9a4b773..a857437 100644 --- a/include/server/request_handler.hpp +++ b/include/server/request_handler.hpp @@ -3,8 +3,13 @@ #include "../base/pqxx_adapter.hpp" #include "../base/binance_client.hpp" #include "../flatbuffers/klines_generated.h" +#include "../base/sql_loader.hpp" + namespace fhc::server { + namespace krn = std::chrono; + + using RespFromQuery = std::vector>; struct TimeStamp { int year; @@ -15,24 +20,42 @@ namespace fhc::server { class RequestHandler { public: RequestHandler() = delete; - RequestHandler(std::shared_ptr db_adapter); - + RequestHandler(std::shared_ptr db_adapter, + std::shared_ptr sql_loader); + + void InitDataBase() const; void ExecuteQuery(const std::string& query) const; + bool IsTableExists(const std::string& table) const; - std::vector GetKlines(const std::string& symbol, const std::string& interval, uint64_t start_time, uint64_t end_time) const; - - void SendToFront(TimeStamp timestamp_start, TimeStamp timestamp_end) const; - - void PopulateDatabase() const; + flatbuffers::FlatBufferBuilder GetKlinesFromStorage(const std::string& timestamp_start, const std::string& timestamp_end) const; + std::string UpdateKlinesInStorage() const; + + void PopulateDatabase(TimeStamp timestamp) const; ~RequestHandler() = default; private: + std::uint64_t StrToUint64(const std::string& str) const; + krn::year_month_day StrDateToEpochUnixTime(const std::string& str_date) const; + std::uint64_t StrStartDateToEpochUnixTime(const std::string& str_start_date) const; + std::uint64_t StrEndDateToEpochUnixTime(const std::string& str_end_date) const; + + std::vector GetKlines( + const std::string& symbol, + const std::string& interval, + uint64_t start_time, + uint64_t end_time) const; + std::uint64_t GetTimeFromBinance() const; + std::uint64_t BinanceTimeToEpochUnixTime(std::uint64_t binance_time) const; std::uint64_t EpochUnixTimeToBinanceTime(std::uint64_t unix_time) const; std::uint64_t GetTimeSinceEpoch(TimeStamp timestamp) const; - - std::shared_ptr db_adapter_; - std::unique_ptr binance_client_; + + flatbuffers::FlatBufferBuilder CreateFlatBuffer(const RespFromQuery& resp) const; + + private: + std::shared_ptr db_adapter_; + std::shared_ptr sql_loader_; + std::unique_ptr binance_client_; }; } \ No newline at end of file diff --git a/include/server/server.hpp b/include/server/server.hpp index 8bf7b7f..f52582f 100644 --- a/include/server/server.hpp +++ b/include/server/server.hpp @@ -1,11 +1,29 @@ #pragma once #include "../base/database.hpp" +#include "../base/sql_loader.hpp" #include "../server/request_handler.hpp" +#include "../server/utils.hpp" +#include "../server/httplib_adapter.hpp" + +#include namespace fhc::server { + enum class HTTP_METHOD { + GET, + POST, + }; + class Server final { public: + using Handler = http_adapter::StreamResponse::Handler; + + struct HTTPHandler { + HTTP_METHOD http_method; + Handler handler; + std::string content_type; + }; + Server() = default; void Init(); @@ -13,12 +31,24 @@ namespace fhc::server { void Stop(); bool IsRun() const; + void SetHandler(const std::string& path, HTTPHandler http_handler); + void RemoveHandler(const std::string& path); + void SetMountPoint(const std::string& root, const std::string& path_to_files) const; + + flatbuffers::FlatBufferBuilder GetKlinesFromStorage(const std::string& start_date, const std::string& end_date) const; + + std::string UpdateKlinesInStorage() const; + ~Server() = default; private: - void InitializationDatabase(); + void AddHandlerToHttpSvr() const; std::unique_ptr db_; - std::unique_ptr handler_; + std::unique_ptr request_handler_; + std::unique_ptr http_svr_; + std::shared_ptr sql_loader_; + bool running_ = false; + std::unordered_map handlers_; }; } diff --git a/include/server/utils.hpp b/include/server/utils.hpp new file mode 100644 index 0000000..09d3991 --- /dev/null +++ b/include/server/utils.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include +#include +#include +#include + +namespace fhc::server::utils { + struct Config { + std::string host; + std::string name; + std::string user; + std::string password; + std::string http_host; + int http_port; + + std::string operator()() const { + return {"host=" + host + " dbname=" + name + " user=" + user + " password=" + password}; + } + }; + + static const std::string CONFIG_FILE(".nfile"); + + static const char SIGN_EQUAL = '='; + + Config ReadConfig(); +}; \ No newline at end of file diff --git a/sql/create_table_crypto_klines.sql b/sql/create_table_crypto_klines.sql new file mode 100644 index 0000000..8e3012b --- /dev/null +++ b/sql/create_table_crypto_klines.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS crypto_klines ( + id SERIAL PRIMARY KEY, + symbol VARCHAR(10) NOT NULL, + open_time BIGINT NOT NULL, + open NUMERIC(18,8) NOT NULL, + high NUMERIC(18,8) NOT NULL, + low NUMERIC(18,8) NOT NULL, + close NUMERIC(18,8) NOT NULL, + volume NUMERIC(24,10) NOT NULL, + close_time BIGINT NOT NULL, + quote_asset_volume NUMERIC(24,10) NOT NULL, + trades_count INTEGER NOT NULL, + taker_buy_base_volume NUMERIC(24,10) NOT NULL, + taker_buy_quote_volume NUMERIC(24,10) NOT NULL, + UNIQUE (symbol, open_time, close_time) +) \ No newline at end of file diff --git a/sql/insert_kline.sql b/sql/insert_kline.sql new file mode 100644 index 0000000..0cc9e35 --- /dev/null +++ b/sql/insert_kline.sql @@ -0,0 +1,3 @@ +INSERT INTO crypto_klines +(symbol, open_time, open, high, low, close, volume, close_time, quote_asset_volume, trades_count, taker_buy_base_volume, taker_buy_quote_volume) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) \ No newline at end of file diff --git a/sql/select_exists_crypto_klines.sql b/sql/select_exists_crypto_klines.sql new file mode 100644 index 0000000..6247527 --- /dev/null +++ b/sql/select_exists_crypto_klines.sql @@ -0,0 +1 @@ +SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'crypto_klines') \ No newline at end of file diff --git a/sql/select_klines.sql b/sql/select_klines.sql new file mode 100644 index 0000000..842f820 --- /dev/null +++ b/sql/select_klines.sql @@ -0,0 +1 @@ +SELECT * FROM crypto_klines WHERE open_time >= $1 AND close_time <= $2 \ No newline at end of file diff --git a/sql/select_last_kline.sql b/sql/select_last_kline.sql new file mode 100644 index 0000000..4c421eb --- /dev/null +++ b/sql/select_last_kline.sql @@ -0,0 +1 @@ +SELECT open_time FROM crypto_klines ORDER BY open_time DESC LIMIT 1 \ No newline at end of file diff --git a/src/base/pqxx_adapter.cpp b/src/base/pqxx_adapter.cpp index 7a68aaa..b0d9899 100644 --- a/src/base/pqxx_adapter.cpp +++ b/src/base/pqxx_adapter.cpp @@ -42,31 +42,24 @@ namespace fhc::base::interface { txn.commit(); } - std::vector> PqxxAdapter::Query(const std::string& query) const { - // TODO: Switch to nontransaction object - std::vector> result; - pqxx::work txn(*connection_); + bool PqxxAdapter::IsConnected() const { + return connection_ && connection_->is_open(); + } - pqxx::result r = txn.exec(query); - txn.commit(); + std::vector> PqxxAdapter::PqxxResultToVectorStr(pqxx::result& result) const { + std::vector> res; - if (r.size() > 0) { - result.reserve(r.size()); + if (result.size() > 0) { + res.reserve(result.size()); } - - for (size_t i = 0; i < r.size(); ++i) { + for (size_t i = 0; i < result.size(); ++i) { std::vector row; - for (size_t j = 0; j < r[i].size(); ++j) { - row.emplace_back(r[i][j].c_str()); + for (size_t j = 0; j < result[i].size(); ++j) { + row.emplace_back(result[i][j].as()); } - result.push_back(std::move(row)); + res.push_back(std::move(row)); } - - return result; - } - - bool PqxxAdapter::IsConnected() const { - return connection_ && connection_->is_open(); + return res; } } \ No newline at end of file diff --git a/src/base/sql_loader.cpp b/src/base/sql_loader.cpp new file mode 100644 index 0000000..ba149f2 --- /dev/null +++ b/src/base/sql_loader.cpp @@ -0,0 +1,23 @@ +#include "../../include/base/sql_loader.hpp" + +namespace fhc::base::sql_loader { + void SqlLoader::LoadSQL(const std::string& file_name) { + std::ifstream sql_file(CMAKE_SQL_PATH + file_name + SQL_FILE_SUFFIX, std::ios::in); + + if (!sql_file.is_open()) { + std::cerr << "File " << file_name << " doesn't opened!" << std::endl; + } + + std::string line; + while (std::getline(sql_file, line)) { + queries_[file_name] += line; + } + } + + std::string_view SqlLoader::GetSQL(const std::string& sql_query_name) const { + if (auto it_sql = queries_.find(sql_query_name); it_sql != queries_.end()) { + return it_sql->second; + } + return {}; + } +} diff --git a/src/main.cpp b/src/main.cpp deleted file mode 100644 index 322a750..0000000 --- a/src/main.cpp +++ /dev/null @@ -1,11 +0,0 @@ -#include "../include/server/server.hpp" - -int main() { - - - fhc::server::Server server; - server.Init(); - server.Run(); - - return EXIT_SUCCESS; -} \ No newline at end of file diff --git a/src/server/httplib_adapter.cpp b/src/server/httplib_adapter.cpp new file mode 100644 index 0000000..18a7e59 --- /dev/null +++ b/src/server/httplib_adapter.cpp @@ -0,0 +1,56 @@ +#include "../../include/server/httplib_adapter.hpp" + +namespace fhc::server::http_adapter { + + void HttpLibAdapter::Init(HttpConfig config) { + http_svr_ = std::make_unique(); + http_config_ = config; + } + + + void HttpLibAdapter::Post(const std::string&, RequestHandler handler) { + + } + + void HttpLibAdapter::Get(const std::string& path, StreamResponse::Handler handler, + const std::string& content_type) + { + // TODO: Add message if path is empty + if (path.empty()) { + return; + } + + http_svr_->Get(path, [handler, content_type](const httplib::Request& request, httplib::Response& response) { + std::unordered_map params; + for (const auto& param : request.params) { + params[param.first] = param.second; + } + + auto write = [&](const char* data, size_t size) { + response.set_content(data, size, content_type); + }; + + handler(params, write); + }); + } + + void HttpLibAdapter::SetMountPoint(const std::string& mount_point, const std::string& path_mount_point) { + http_svr_->set_mount_point(mount_point, path_mount_point); + } + + void HttpLibAdapter::Run() { + http_svr_->listen(http_config_.ip, http_config_.port); + } + + void HttpLibAdapter::Stop() { + http_svr_->stop(); + } + + const std::string& HttpLibAdapter::GetPort() const { + return http_config_.ip; + } + + int HttpLibAdapter::GetIp() const { + return http_config_.port; + } +} \ No newline at end of file diff --git a/src/server/request_handler.cpp b/src/server/request_handler.cpp index a8e9bea..392ad87 100644 --- a/src/server/request_handler.cpp +++ b/src/server/request_handler.cpp @@ -2,29 +2,44 @@ namespace fhc::server { - RequestHandler::RequestHandler(std::shared_ptr db_adapter) : db_adapter_(db_adapter) { + RequestHandler::RequestHandler(std::shared_ptr db_adapter, + std::shared_ptr sql_loader) + : db_adapter_(db_adapter) + , sql_loader_(sql_loader) + { std::string api_key; std::string secret_key; - binance_client_ = std::make_unique(api_key, secret_key); + binance_client_ = std::make_unique(api_key, secret_key); } + void RequestHandler::InitDataBase() const { + auto query = sql_loader_->GetSQL("create_table_crypto_klines"s); + db_adapter_->Execute(query.data()); + } + + bool RequestHandler::IsTableExists([[maybe_unused]] const std::string& table) const { + RespFromQuery result; + const auto query = sql_loader_->GetSQL("select_exists_crypto_klines"s); + + auto pqxx_ptr = dynamic_cast(db_adapter_.get()); + if ( pqxx_ptr != nullptr) { + result = pqxx_ptr->Query(query.data()); + } + + return result[0][0] == "t"; + } + void RequestHandler::ExecuteQuery(const std::string& query) const { db_adapter_->Execute(query); } - std::vector RequestHandler::GetKlines(const std::string& symbol, const std::string& interval, uint64_t start_time, uint64_t end_time) const{ + std::vector RequestHandler::GetKlines(const std::string& symbol, const std::string& interval, uint64_t start_time, uint64_t end_time) const{ auto result = binance_client_->GetKlines(symbol, interval, start_time, end_time); return result; } std::uint64_t RequestHandler::GetTimeFromBinance() const { - std::uint64_t server_time = 0; - - std::istringstream in_str(binance_client_->GetServerTime()); - - in_str >> server_time; - - return server_time; + return StrToUint64(binance_client_->GetServerTime()); } std::uint64_t RequestHandler::BinanceTimeToEpochUnixTime(std::uint64_t binance_time) const { @@ -42,25 +57,136 @@ namespace fhc::server { krn::year_month_day ymd{krn::year{year}, krn::month{month}, krn::day{day}}; auto days_since_epoch = krn::sys_days{ymd}; - auto time_point = days_since_epoch + 0h + 0min + 0s; - return krn::duration_cast(time_point.time_since_epoch()).count(); + auto time_point = days_since_epoch + 0h + 0min; + return krn::duration_cast(time_point.time_since_epoch()).count(); + } + + krn::year_month_day RequestHandler::StrDateToEpochUnixTime(const std::string& str_date) const { + krn::year_month_day ymd; + std::istringstream iss(str_date); + + iss >> krn::parse("%F", ymd); + return ymd; } - void RequestHandler::SendToFront(TimeStamp timestamp_start, TimeStamp timestamp_end) const { - auto start_time = EpochUnixTimeToBinanceTime(GetTimeSinceEpoch(timestamp_start)); - auto end_time = EpochUnixTimeToBinanceTime(GetTimeSinceEpoch(timestamp_end)); + std::uint64_t RequestHandler::StrStartDateToEpochUnixTime(const std::string& str_start_date) const { + krn::year_month_day ymd = StrDateToEpochUnixTime(str_start_date); + + return krn::duration_cast(krn::time_point((krn::sys_days{ymd}) + 0h + 0min) + .time_since_epoch()).count(); + } + + std::uint64_t RequestHandler::StrEndDateToEpochUnixTime(const std::string& str_end_date) const { + krn::year_month_day ymd = StrDateToEpochUnixTime(str_end_date); + + return krn::duration_cast(krn::time_point((krn::sys_days{ymd}) + 23h + 59min + 59s) + .time_since_epoch()).count(); + } + + std::uint64_t RequestHandler::StrToUint64(const std::string& str) const { + std::istringstream s(str); + return *std::istream_iterator(s); + } + + std::string RequestHandler::UpdateKlinesInStorage() const { + RespFromQuery result; + const auto query = sql_loader_->GetSQL("select_last_kline"s); + + auto pqxx_ptr = dynamic_cast(db_adapter_.get()); + if ( pqxx_ptr != nullptr) { + result = pqxx_ptr->Query(query.data()); + } + + // TODO: Add check if result is empty + uint64_t storage_last_open_time = StrToUint64(result[0][0]); + + uint64_t binance_server_time = GetTimeFromBinance(); + + if ((binance_server_time - storage_last_open_time) <= 60000 ) { + return {"Storage and Binance has equal data. Doesn't need updates."}; + } + + int count_insert_klines = 0; + std::string symbol_crypto{"BTCUSDT"s}; + std::string interval{"1m"s}; + + // TODO: Replace all magic number on the constant + for (auto start_time = storage_last_open_time + 60000; // + 60000 to avoid taking the current kline again in the request + start_time < binance_server_time; + start_time += 59999999) { + + using namespace std; + + // TODO: Replace all magic number on the constant + auto end_time = start_time + 60000000; + auto result = GetKlines(symbol_crypto, interval, start_time, end_time); + + for (const auto& kline : result) { + // TODO: Switch this to prepared statemens (libpqxx). + // Looks it INSERT INTO crypto_klines (..., ...) VALUES ($1, $2, $3, $...) + auto insert_query = sql_loader_->GetSQL("insert_kline"s); + try { + pqxx_ptr->ExecuteWithParams(insert_query.data(), + kline.symbol, + to_string(kline.open_time), + to_string(kline.open), + to_string(kline.high), + to_string(kline.low), + to_string(kline.close), + to_string(kline.volume), + to_string(kline.close_time), + to_string(kline.quote_asset_volume), + to_string(kline.trades_count), + to_string(kline.taker_buy_base_volume), + to_string(kline.taker_buy_quote_volume)); + } catch (pqxx::failure& e) { + std::cout << e.what() << std::endl; + } + + count_insert_klines++; + + std::cout << "Insert " << count_insert_klines << " " << + kline.symbol << " " << + to_string(kline.open_time) << " " << + to_string(kline.open) << " " << + to_string(kline.high) << " " << + to_string(kline.low) << " " << + to_string(kline.close) << " " << + to_string(kline.volume) << " " << + to_string(kline.close_time) << " " << + to_string(kline.quote_asset_volume) << " " << + to_string(kline.trades_count) << " " << + to_string(kline.taker_buy_base_volume) << " " << + to_string(kline.taker_buy_quote_volume) << std::endl; + } + } + + return {"Insert klines " + std::to_string(count_insert_klines)}; + } + + flatbuffers::FlatBufferBuilder RequestHandler::GetKlinesFromStorage(const std::string& timestamp_start, const std::string& timestamp_end) const { + // TODO: What's going on here? + auto start_time = EpochUnixTimeToBinanceTime(BinanceTimeToEpochUnixTime(StrStartDateToEpochUnixTime(timestamp_start))); + auto end_time = EpochUnixTimeToBinanceTime(BinanceTimeToEpochUnixTime(StrEndDateToEpochUnixTime(timestamp_end))); // TODO: In futuring need use pqxx::esc for blocking sql injection - std::string query = ("SELECT * FROM crypto_klines " - "WHERE open_time >= " + std::to_string(start_time) + - " AND close_time <= " + std::to_string(end_time)); - auto result = db_adapter_->Query(query); + const auto query = sql_loader_->GetSQL("select_klines"s); + RespFromQuery result; + + // TODO: Remove dynamic_cast + if (auto pqxx_ptr = dynamic_cast(db_adapter_.get()); pqxx_ptr != nullptr) { + result = pqxx_ptr->Query(query.data(), std::to_string(start_time), std::to_string(end_time)); + } - // Starting serialization data + return CreateFlatBuffer(result); + } + + flatbuffers::FlatBufferBuilder RequestHandler::CreateFlatBuffer(const RespFromQuery& resp) const { + // Starting serialization data flatbuffers::FlatBufferBuilder builder(1024); std::vector> kline_offsets; - for (const auto& row : result) { + for (const auto& row : resp) { auto symbol = builder.CreateString(row[1]); auto kline = marketdata::CreateKline( builder, @@ -87,45 +213,70 @@ namespace fhc::server { builder.Finish(kline_list); - // Ready to save in the file or send over network - uint8_t* buf = builder.GetBufferPointer(); - int size = builder.GetSize(); + return builder; } - void RequestHandler::PopulateDatabase() const { - std::string symbol_crypto{"BTCUSDT"}; - std::string interval{"1m"}; + void RequestHandler::PopulateDatabase(TimeStamp timestamp) const { + // TODO: Separate it into a unique entity + std::string symbol_crypto{"BTCUSDT"s}; + std::string interval{"1m"s}; - auto since_begins_year_2025 = EpochUnixTimeToBinanceTime(GetTimeSinceEpoch({2025, 1, 1})); + auto since_begins_year_2025 = GetTimeSinceEpoch(timestamp); auto current_milliseconds = GetTimeFromBinance(); + // TODO: Remove dynamic_cast + auto pqxx_adapter_ptr = dynamic_cast(db_adapter_.get()); + if (pqxx_adapter_ptr == nullptr) { + std::cerr << "Db adapter is not Pqxx Adapter!" << std::endl; + } + + int count_insert_klines = 0; // TODO: Replace all magic number on the constant for (long long unsigned int start_time = since_begins_year_2025; start_time < current_milliseconds; start_time += 59999999) { + using namespace std; + + // TODO: Replace all magic number on the constant auto end_time = start_time + 60000000; auto result = GetKlines(symbol_crypto, interval, start_time, end_time); for (const auto& kline : result) { - // TODO: Switch this to prepared statemens (libpqxx). - // Looks it INSERT INTO crypto_klines (..., ...) VALUES ($1, $2, $3, $...) - std::string query = - "INSERT INTO crypto_klines " - "(symbol, open_time, open, high, low, close, volume, close_time, " - "quote_asset_volume, trades_count, taker_buy_base_volume, taker_buy_quote_volume) " - "VALUES ('" + kline.symbol + "'," + - std::to_string(kline.open_time) + "," + - std::to_string(kline.open) + "," + - std::to_string(kline.high) + "," + - std::to_string(kline.low) + "," + - std::to_string(kline.close) + "," + - std::to_string(kline.volume) + "," + - std::to_string(kline.close_time) + "," + - std::to_string(kline.quote_asset_volume) + "," + - std::to_string(kline.trades_count) + "," + - std::to_string(kline.taker_buy_base_volume) + "," + - std::to_string(kline.taker_buy_quote_volume) + - ");"; - std::cout << query << std::endl; - ExecuteQuery(query); + // TODO: Switch this to prepared statemens (libpqxx). + // Looks it INSERT INTO crypto_klines (..., ...) VALUES ($1, $2, $3, $...) + auto query = sql_loader_->GetSQL("insert_kline"s); + try { + pqxx_adapter_ptr->ExecuteWithParams(query.data(), + kline.symbol, + to_string(kline.open_time), + to_string(kline.open), + to_string(kline.high), + to_string(kline.low), + to_string(kline.close), + to_string(kline.volume), + to_string(kline.close_time), + to_string(kline.quote_asset_volume), + to_string(kline.trades_count), + to_string(kline.taker_buy_base_volume), + to_string(kline.taker_buy_quote_volume) + ); + } catch (pqxx::failure& e) { + std::cout << e.what() << std::endl; + } + + std::cout << "Insert " << count_insert_klines << " " << + kline.symbol << " " << + to_string(kline.open_time) << " " << + to_string(kline.open) << " " << + to_string(kline.high) << " " << + to_string(kline.low) << " " << + to_string(kline.close) << " " << + to_string(kline.volume) << " " << + to_string(kline.close_time) << " " << + to_string(kline.quote_asset_volume) << " " << + to_string(kline.trades_count) << " " << + to_string(kline.taker_buy_base_volume) << " " << + to_string(kline.taker_buy_quote_volume) << std::endl; + + count_insert_klines++; } } } diff --git a/src/server/server.cpp b/src/server/server.cpp index 53b25d6..6401b64 100644 --- a/src/server/server.cpp +++ b/src/server/server.cpp @@ -4,9 +4,30 @@ namespace fhc::server { using namespace std::literals; void Server::Init() { - std::string options = "host=localhost dbname=fhc user=postgres password=kis123Oits"s; + utils::Config config = utils::ReadConfig(); + std::string options(config()); + db_ = std::make_unique(options); - handler_ = std::make_unique(db_->GetAdapter()); + + http_svr_ = std::make_unique(); + sql_loader_ = std::make_shared(); + + request_handler_ = std::make_unique(db_->GetAdapter(), sql_loader_); + + sql_loader_->LoadSQL("create_table_crypto_klines"s); + sql_loader_->LoadSQL("select_klines"s); + sql_loader_->LoadSQL("insert_kline"s); + sql_loader_->LoadSQL("select_last_kline"s); + sql_loader_->LoadSQL("select_exists_crypto_klines"s); + + // TODO: Read settings from config + http_svr_->Init({config.http_host, config.http_port}); + + AddHandlerToHttpSvr(); + } + + void Server::SetMountPoint(const std::string& root, const std::string& path_to_files) const { + http_svr_->SetMountPoint(root, path_to_files); } void Server::Run() { @@ -14,46 +35,64 @@ namespace fhc::server { if (db_ != nullptr) { db_->Connect(); } - + + if (!request_handler_->IsTableExists("crypto_klines"s)) { + request_handler_->InitDataBase(); + request_handler_->PopulateDatabase({2025, 8, 1}); + } + + http_svr_->Run(); + // TODO: Some need in future running_ = true; - handler_->SendToFront({2025, 10, 01}, {2025, 10, 30}); - - // TODO: Add select when this run - // InitializationDatabase(); - // handler_->PopulateDatabase(); } - void Server::Stop() { + void Server::Stop() { // TODO: Add throw and catch exceptions if db_ is nullptr if (db_ != nullptr) { db_->Disconnect(); } + + if (http_svr_ != nullptr) { + http_svr_->Stop(); + } + + running_ = false; } bool Server::IsRun() const { return running_; } - void Server::InitializationDatabase() { - std::string create_table = R"( - CREATE TABLE crypto_klines ( - id SERIAL PRIMARY KEY, - symbol VARCHAR(10) NOT NULL, -- торговая пара, например BTCUSDT - open_time BIGINT NOT NULL, -- время открытия свечи (в мс) - open NUMERIC(18,8) NOT NULL, -- цена открытия - high NUMERIC(18,8) NOT NULL, -- максимальная цена - low NUMERIC(18,8) NOT NULL, -- минимальная цена - close NUMERIC(18,8) NOT NULL, -- цена закрытия - volume NUMERIC(24,10) NOT NULL, -- объём торгов - close_time BIGINT NOT NULL, -- время закрытия свечи (в мс) - quote_asset_volume NUMERIC(24,10) NOT NULL, -- объём в котируемом активе - trades_count INTEGER NOT NULL, -- количество сделок - taker_buy_base_volume NUMERIC(24,10) NOT NULL, -- объём покупок у тейкеров (в базовом активе) - taker_buy_quote_volume NUMERIC(24,10) NOT NULL, -- объём покупок у тейкеров (в котируемом активе) - UNIQUE (symbol, open_time) -- уникальность по паре и времени открытия - ); - )"; - handler_->ExecuteQuery(create_table); + + flatbuffers::FlatBufferBuilder Server::GetKlinesFromStorage(const std::string& start_date, const std::string& end_date) const { + return request_handler_->GetKlinesFromStorage(start_date, end_date); + } + + std::string Server::UpdateKlinesInStorage() const { + return request_handler_->UpdateKlinesInStorage(); + } + + void Server::SetHandler(const std::string& path, HTTPHandler http_handler) { + handlers_[path] = http_handler; + } + + void Server::RemoveHandler(const std::string& path) { + // TODO: Needs add substitution handler with response 404 when deleted handler in Server hash-map + if (auto it_find_path = handlers_.find(path); it_find_path != handlers_.end()) { + handlers_.erase(path); + } + } + + void Server::AddHandlerToHttpSvr() const { + if (!handlers_.empty()) { + for (const auto& [path, http_handler] : handlers_) { + switch (http_handler.http_method) { + case HTTP_METHOD::GET: + http_svr_->Get(path, http_handler.handler, http_handler.content_type); + break; + } + } + } } } diff --git a/src/server/utils.cpp b/src/server/utils.cpp new file mode 100644 index 0000000..9eb51fa --- /dev/null +++ b/src/server/utils.cpp @@ -0,0 +1,34 @@ +#include "../../include/server/utils.hpp" +#include + +namespace fhc::server::utils { + Config ReadConfig() { + std::unordered_map result; + + auto parent_dir = std::filesystem::current_path().parent_path().parent_path(); + + + std::ifstream file_config(CMAKE_CONFIG_PATH + CONFIG_FILE, std::ios::in); + + if (!file_config.is_open()) { + std::cerr << "File " << CONFIG_FILE << " doesn't opened!" << std::endl; + return {}; + } + + std::string line; + while (std::getline(file_config, line)) { + result[line.substr(0, line.find_first_of(SIGN_EQUAL))] = line.substr(line.find_first_of(SIGN_EQUAL) + 1, line.size() - 1); + } + + Config config{ + .host = result["DB_HOST"], + .name = result["DB_NAME"], + .user = result["DB_USER"], + .password = result["DB_PASSWORD"], + .http_host = result["HTTP_HOST"], + .http_port = std::stoi(result["HTTP_PORT"]) + }; + + return config; + } +} \ No newline at end of file