Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions examples/python/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,47 @@ def handle_tile_request(tile: mapget.TileFeatureLayer):
# Add an attribute layer
attr_layer: mapget.Object = feature.attribute_layers().new_layer("rules")
attr: mapget.Attribute = attr_layer.new_attribute("SPEED_LIMIT")
# TODO: Add Python bindings for validities.
# attr.set_direction(mapget.Direction.POSITIVE)
attr.validity().new_offset_range(
mapget.ValidityGeometryOffsetType.RELATIVE_LENGTH,
0.0,
1.0,
direction=mapget.Direction.POSITIVE)
attr.add_field("speedLimit", 50)

# Add a child feature ID
# TODO: Add Python bindings for relations.
# feature.children().append(tile.new_feature_id("Way", [("wayId", 10)]))
# Add a feature relation
target = tile.new_feature_id("Way", [("wayId", 10)])
relation = feature.add_relation("successor", target)
relation.source_validity().new_complete()

# Attach source-data provenance.
refs = tile.new_source_data_references([
("RawWayLayer", "primary", mapget.SourceDataAddress(0, 128))
])
feature.set_source_data_references(refs)


def handle_source_data_request(tile: mapget.TileSourceDataLayer):
compound = tile.new_compound(2)
compound.set_schema_name("example.RawWay")
compound.set_source_data_address(mapget.SourceDataAddress(0, 128))
compound.add_field("wayId", 0)
compound.add_field("name", "Main St.")
tile.add_root(compound)


def handle_locate_request(request: mapget.LocateRequest):
response = mapget.LocateResponse(request)
response.tile_key = mapget.MapTileKey(
mapget.LayerType.FEATURES,
request.map_id,
"WayLayer",
mapget.TileId(12345),
0)
return [response]


def handle_cache_expired(tile_key: mapget.MapTileKey, expired_at_us: int):
print(f"Cached tile expired: {tile_key} at {expired_at_us}us.")


# Instantiate a data source with a minimal mandatory set
Expand All @@ -71,12 +105,18 @@ def handle_tile_request(tile: mapget.TileFeatureLayer):
}]]
}
]
},
"RawWayLayer": {
"type": "SourceData"
}
}, "mapId": "TestMap"
})

# Set the callback which is invoked when a tile is requested.
ds.on_tile_feature_request(handle_tile_request)
ds.on_tile_sourcedata_request(handle_source_data_request)
ds.on_locate_request(handle_locate_request)
ds.on_cache_expired(handle_cache_expired)

# Parse port as optional first argument
port = 0 # Pick random free port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class RemoteDataSource : public DataSource
DataSourceInfo const& info,
TileLayer::LoadStateCallback loadStateCallback = {}) override;
std::vector<LocateResponse> locate(const mapget::LocateRequest &req) override;
void onCacheExpired(
MapTileKey const& tileKey,
std::chrono::system_clock::time_point expiredAt) override;

private:
// DataSourceInfo is fetched in the constructor
Expand Down Expand Up @@ -97,6 +100,9 @@ class RemoteDataSourceProcess : public DataSource
DataSourceInfo const& info,
TileLayer::LoadStateCallback loadStateCallback = {}) override;
std::vector<LocateResponse> locate(const mapget::LocateRequest &req) override;
void onCacheExpired(
MapTileKey const& tileKey,
std::chrono::system_clock::time_point expiredAt) override;

private:
std::unique_ptr<RemoteDataSource> remoteSource_;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <chrono>
#include <functional>
#include "mapget/model/sourcedatalayer.h"
#include "mapget/model/featurelayer.h"
Expand Down Expand Up @@ -44,6 +45,13 @@ class DataSourceServer : public HttpServer
DataSourceServer&
onLocateRequest(std::function<std::vector<LocateResponse>(LocateRequest const&)> const&);

/**
* Set the callback which will be invoked when a service reports that a
* cached tile for this remote datasource expired and is being refreshed.
*/
DataSourceServer& onCacheExpired(
std::function<void(MapTileKey const&, std::chrono::system_clock::time_point)> const&);

/**
* Get the DataSourceInfo metadata which this instance was constructed with.
*/
Expand Down
36 changes: 36 additions & 0 deletions libs/http-datasource/src/datasource-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,33 @@ std::vector<LocateResponse> RemoteDataSource::locate(const LocateRequest& req)
return responseVector;
}

void RemoteDataSource::onCacheExpired(
MapTileKey const& tileKey,
std::chrono::system_clock::time_point expiredAt)
{
auto& client = httpClients_[(nextClient_++) % httpClients_.size()];

auto cacheExpiredReq = drogon::HttpRequest::newHttpRequest();
cacheExpiredReq->setMethod(drogon::Post);
cacheExpiredReq->setPath("/cache-expired");
cacheExpiredReq->setContentTypeCode(drogon::CT_APPLICATION_JSON);
cacheExpiredReq->setBody(nlohmann::json{
{"tileKey", tileKey.toString()},
{"expiredAt", std::chrono::duration_cast<std::chrono::microseconds>(
expiredAt.time_since_epoch()).count()},
}.dump());

auto [resultCode, response] = client->sendRequest(cacheExpiredReq);
if (resultCode != drogon::ReqResult::Ok || !response || (int)response->statusCode() >= 300) {
log().warn(
"Failed to notify remote data source about cache expiry for {}: {}",
tileKey.toString(),
resultCode != drogon::ReqResult::Ok
? drogon::to_string(resultCode)
: response ? fmt::format("HTTP {}", (int)response->statusCode()) : "No remote response");
}
}

std::shared_ptr<RemoteDataSource> RemoteDataSource::fromHostPort(const std::string& hostPort)
{
auto delimiterPos = hostPort.find(':');
Expand Down Expand Up @@ -269,4 +296,13 @@ std::vector<LocateResponse> RemoteDataSourceProcess::locate(const LocateRequest&
return remoteSource_->locate(req);
}

void RemoteDataSourceProcess::onCacheExpired(
MapTileKey const& tileKey,
std::chrono::system_clock::time_point expiredAt)
{
if (!remoteSource_)
raise("Remote data source is not initialized.");
remoteSource_->onCacheExpired(tileKey, expiredAt);
}

}
37 changes: 37 additions & 0 deletions libs/http-datasource/src/datasource-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <drogon/HttpAppFramework.h>
#include <drogon/HttpResponse.h>

#include <chrono>
#include <memory>
#include <stdexcept>
#include <string>
Expand All @@ -26,6 +27,7 @@
throw std::runtime_error("TileSourceDataLayer callback is unset!");
};
std::function<std::vector<LocateResponse>(const LocateRequest&)> locateCallback_;
std::function<void(MapTileKey const&, std::chrono::system_clock::time_point)> cacheExpiredCallback_;
std::shared_ptr<StringPool> strings_;

explicit Impl(DataSourceInfo info) : info_(std::move(info)), strings_(std::make_shared<StringPool>(info_.nodeId_))
Expand Down Expand Up @@ -59,6 +61,13 @@
return *this;
}

DataSourceServer& DataSourceServer::onCacheExpired(
const std::function<void(MapTileKey const&, std::chrono::system_clock::time_point)>& callback)
{
impl_->cacheExpiredCallback_ = callback;
return *this;
}

DataSourceInfo const& DataSourceServer::info() { return impl_->info_; }

void DataSourceServer::setup(drogon::HttpAppFramework& app)
Expand Down Expand Up @@ -194,6 +203,34 @@
}
},
{drogon::Post});

app.registerHandler(
"/cache-expired",
[this](const drogon::HttpRequestPtr& req, std::function<void(const drogon::HttpResponsePtr&)>&& callback)

Check warning on line 209 in libs/http-datasource/src/datasource-server.cpp

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This lambda has 23 lines, which is greater than the 20 lines authorized. Split it into several lambdas or functions, or make it a named function.

See more on https://sonarcloud.io/project/issues?id=ndsev_mapget&issues=AZ4H0svRqmm_xGZ5OHun&open=AZ4H0svRqmm_xGZ5OHun&pullRequest=164
{
try {
if (impl_->cacheExpiredCallback_) {
auto const body = nlohmann::json::parse(std::string(req->body()));
auto const tileKey = MapTileKey(body.at("tileKey").get<std::string>());
auto const expiredAtUs = body.at("expiredAt").get<int64_t>();
auto const expiredAt = std::chrono::system_clock::time_point{
std::chrono::microseconds{expiredAtUs}};
impl_->cacheExpiredCallback_(tileKey, expiredAt);
}

auto resp = drogon::HttpResponse::newHttpResponse();
resp->setStatusCode(drogon::k204NoContent);
callback(resp);
}
catch (std::exception const& e) {

Check warning on line 225 in libs/http-datasource/src/datasource-server.cpp

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Catch a more specific exception instead of a generic one.

See more on https://sonarcloud.io/project/issues?id=ndsev_mapget&issues=AZ4H0svRqmm_xGZ5OHuo&open=AZ4H0svRqmm_xGZ5OHuo&pullRequest=164
auto resp = drogon::HttpResponse::newHttpResponse();
resp->setStatusCode(drogon::k400BadRequest);
resp->setContentTypeCode(drogon::CT_TEXT_PLAIN);
resp->setBody(std::string("Invalid request: ") + e.what());
callback(resp);
}
},
{drogon::Post});
}

} // namespace mapget
Loading
Loading