1
0
Fork 0
Univerxel/src/server/Server.cpp

489 lines
19 KiB
C++

#include "./Server.hpp"
#include "world/Universe.hpp"
#include <signal.h>
#include <chrono>
#include <thread>
#include <Tracy.hpp>
#include "./world/client.hpp"
// Override any LOG_PREFIX defined by earlier includes with this file's own prefix
// (presumably consumed by the LOG_* macros used below - confirm in the logging header).
#undef LOG_PREFIX
#define LOG_PREFIX "Server: "
// CAN_SHARE is true unless the build defines STANDALONE_SERVER.
// It gates creation and use of the in-process client handle (localHandle) in this file.
#ifndef STANDALONE_SERVER
constexpr bool CAN_SHARE = true;
#else
constexpr bool CAN_SHARE = false;
#endif
using namespace world;
// Short alias for the server-side world types (Universe, Elements, client, ...).
namespace ws = world::server;
// Builds a server from its options.
// - host is constructed from options.connection plus three callbacks that forward
//   connect / disconnect / packet events to the matching member functions.
// - serializer is constructed from options.world.folderPath.
// - localHandle is allocated only when sharing is compiled in (CAN_SHARE) and
//   options.allowLocal is set; otherwise it stays null.
Server::Server(config::server::options& options)
    : options(options),
      host(options.connection,
           [this](net::server::Peer* peer) {
               return onConnect(peer);
           },
           [this](net::server::Peer* peer, bool is_app, uint16_t reason) {
               return onDisconnect(peer, is_app, reason);
           },
           [this](net::server::Peer* peer, const memory::read_view &buf, net::PacketFlags flags) {
               return onPacket(peer, buf, flags);
           }),
      serializer(options.world.folderPath),
      localHandle(CAN_SHARE && options.allowLocal ? new server_handle() : nullptr)
{
}
// Emits the "Stopped" log line when the server object is destroyed.
Server::~Server()
{
    LOG_W("Stopped");
}
// Main-loop keep-alive flag, cleared by handle_signal.
// It is written from a signal handler and polled by Server::run, so it must be a
// volatile sig_atomic_t: a plain bool here is undefined behaviour and lets the
// compiler hoist the read out of the loop.
static volatile sig_atomic_t running = 1;

// Signal handler installed by Server::run for SIGINT/SIGTERM: requests loop exit.
// Only performs an async-signal-safe store.
void handle_signal(int) { running = 0; }
// Runs the server until a termination signal is received or the local client
// (if any) clears localHandle->running.
//
// Sequence:
//  1. install SIGINT/SIGTERM handlers and create the universe;
//  2. when a local handle exists, register the local player and publish its spawn;
//  3. tick loop: one full update + upgrade + entity broadcast per tick, then network
//     pulls and up to (upt - 1) extra universe updates until the tick deadline;
//  4. shutdown: announce closing, keep pulling the host for about one second,
//     close the host, remove the local player and destroy the universe.
//
// NOTE(review): options.tps is assumed to be non-zero (it is used as a divisor).
void Server::run() {
    signal(SIGINT, handle_signal);
    signal(SIGTERM, handle_signal);

    universe = std::make_unique<ws::Universe>(options.world, &serializer, localHandle);
    if (localHandle) {
        localHandle->emit = [&](const world::action::packet &packet) { onPacket(packet); };
        localHandle->entityId = universe->addPlayer();
        {
            const auto entry = universe->findHierarchy(localHandle->entityId); assert(entry);
            localHandle->teleport = world::relative_transform{entry->parent, entry->relative};
        }
        localHandle->running = true;
    }

    auto lastUpdate = std::chrono::steady_clock::now();
    // Seconds elapsed since the previous universe update; also moves lastUpdate to `now`.
    // Uses the portable steady_clock name rather than libstdc++'s internal _V2 namespace.
    const auto deltaTime = [&](const std::chrono::steady_clock::time_point &now) {
        std::chrono::duration<float, std::ratio<1>> delta = now - lastUpdate;
        lastUpdate = now;
        return delta.count();
    };

    while (running && (!localHandle || localHandle->running)) {
        FrameMarkStart("Server");
        const auto tickStart = std::chrono::steady_clock::now();
        universe->update(deltaTime(tickStart));
        universe->upgrade();
        broadcastEntitiesChanges(*universe->setElements());

        {
            size_t upt = 1;
            // Clamp to at least one update per tick so the division below cannot be by zero.
            const size_t configuredUpt = options.upt;
            const size_t UPT = configuredUpt > 0 ? configuredUpt : 1;
            const auto nextTick = tickStart + std::chrono::milliseconds(1000 / options.tps);
            const auto uptDelay = std::chrono::milliseconds(1000 / options.tps / UPT);
            do {
                pull();

                const auto now = std::chrono::steady_clock::now();
                if (now >= nextTick)
                    break;

                // Spread the remaining sub-updates over the tick.
                if (upt < UPT && now >= lastUpdate + uptDelay) {
                    universe->update(deltaTime(now));
                    upt++;
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            } while (true);
        }
        FrameMarkEnd("Server");
    }

    broadcastMessage("> Server is closing");
    host.sendBroadcast(memory::read_buffer::Of(net::server_packet_type::QUIT, net::disconnect_reason::CLOSE));
    // Keep servicing the host for a short while before closing it.
    for (size_t i = 0; i < 20; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000 / 20));
        host.pull();
    }
    host.close((uint16_t)net::disconnect_reason::CLOSE);

    if (localHandle) {
        universe->removePlayer(localHandle->entityId);
    }
    universe.reset();
}
void Server::pull() {
ZoneScopedN("Network");
host.pull();
// Send pending data
host.iterPeers([&](net::server::Peer *peer) {
auto ctx = peer->getCtx<ws::client>();
if (!ctx)
return;
auto& edits = ctx->pending.edits;
constexpr auto PARALLEL_EDITS = 1;
if (!edits.empty() && peer->queueSize(net::server::queue::EDIT) <= PARALLEL_EDITS - 1) {
peer->send(memory::read_buffer::Of(net::server_packet_type::EDITS, edits.front()));
edits.pop();
}
auto& chunks = ctx->pending.chunks;
if (!chunks.empty()) {
//TODO: use congestion
constexpr auto PARALLEL_CHUNKS = 5;
size_t i = peer->queueSize(net::server::queue::CHUNK);
node_chunk_pos pending{generational::id(), chunk_pos(0)};
while(i <= (PARALLEL_CHUNKS - 1) && chunks.pop(pending)) {
if (const auto chunk = universe->findChunk(pending)) {
peer->send(serializer.writeChunk(pending, chunk), net::server::queue::CHUNK);
i++;
}
}
if (chunks.empty())
peer->send(serializer.writeChunkReset()); //FIXME: must be received after last chunk
}
});
}
// Host callback for a new connection: logs the peer address and returns true.
// No client context is created here; that happens on the HELLO packet.
bool Server::onConnect(net::server::Peer* peer)
{
    ZoneScopedN("Connect");
    LOG_I("Client connect from " << peer->getAddress());
    return true;
}
// Host callback for a closed or lost connection.
// Logs the event; if the peer had a client context, removes its player from the
// universe, announces the departure, then frees and clears the context.
// Always returns true.
bool Server::onDisconnect(net::server::Peer *peer, bool is_app, uint16_t reason) {
    ZoneScopedN("Disconnect");
    LOG_I((is_app ? "Client disconnect from " : "Client connection lost from ") << peer->getAddress() << " (" << reason << ')');

    auto client = peer->getCtx<ws::client>();
    if (!client)
        return true; // Never registered: nothing to clean up

    //FIXME: save inventory, pos...
    universe->removePlayer(client->entityId);
    broadcastMessage("> " + client->name + " has left our universe");

    delete client;
    peer->ctx = nullptr;
    return true;
}
// Host callback for a packet received from a remote peer.
//
// Peers without a client context may only send HELLO. A valid register request
// creates the context, after which the server sends capabilities, the dictionary,
// the spawn teleport and the current entities. Logged-in peers are dispatched on
// the packet type.
//
// Returns false when the packet is empty or when a not-yet-registered peer sends
// anything other than a valid register HELLO; returns true otherwise.
bool Server::onPacket(net::server::Peer *peer, const memory::read_view &buf, net::PacketFlags) {
    ZoneScopedN("Packet");
    using namespace net;
    auto packet = memory::read_buffer(buf, nullptr);
    client_packet_type type;
    if (!packet.read(type)) {
        LOG_D("Empty packet from " << peer->getAddress());
        return false;
    }

    const auto ctx = peer->getCtx<ws::client>();
    if (!ctx) { // Not logged in
        //TODO: disconnect unauthorized
        if (type != net::client_packet_type::HELLO)
            return false;

        uint8_t helloKind;
        if (!packet.read(helloKind))
            return false;

        switch (helloKind) {
        case 0: { // Register
            //TODO: check player index and get id
            const auto rem = packet.readRemaining();
            const auto remSize = static_cast<size_t>(rem.size());
            if (remSize == 0)
                return false;

            // The payload comes from the network and is not guaranteed to be
            // NUL-terminated: search for the terminator inside the buffer only.
            const auto chars = reinterpret_cast<const char*>(rem.data());
            const char* terminator = std::char_traits<char>::find(chars, remSize, '\0');
            const size_t nameLen = terminator ? static_cast<size_t>(terminator - chars) : remSize;
            // Accept a non-empty name that fills the payload, with or without one trailing NUL.
            if (nameLen == 0 || (nameLen != remSize && nameLen != remSize - 1))
                return false;

            std::string name(chars, nameLen);
            peer->ctx = new ws::client(0, name, universe->addPlayer());
            break;
        }
        //TODO: handle login
        default:
            return false;
        }

        peer->send(serializer.writeCapabilities(options.world.loadDistance, options.world.floodFillLimit));
        //TODO: lock while not received
        peer->send(serializer.writeDict(), net::server::queue::CHUNK);

        const auto newCtx = peer->getCtx<ws::client>();
        {
            const auto entry = universe->findHierarchy(newCtx->entityId); assert(entry);
            peer->send(serializer.writeTeleport(newCtx->entityId, world::relative_transform{entry->parent, entry->relative}));
        }
        broadcastMessage("> " + newCtx->name + " has joined us");
        if (const auto entities = serializeEntitiesFrom(*universe->getElements()); entities.ptr) {
            peer->send(entities, net::server::queue::ENTITY);
        }
        return true;
    }

    switch (type) {
    case client_packet_type::CAPABILITIES: {
        if (!packet.read(ctx->caps.handleEdits)) {
            LOG_T("Bad capabilities");
        }
        /*if (!PREDICTABLE && ctx->handleEdits) {
            LOG_E("Client misread capabilities");
        }*/
        break;
    }
    case client_packet_type::MOVE: {
        // On a malformed or rejected move, send the client back to its server-side position.
        if (relative_pos pos; !packet.read(pos) ||
            !universe->movePlayer(ctx->entityId, pos)) {
            LOG_T("Bad move");
            const auto entry = universe->findHierarchy(ctx->entityId); assert(entry);
            peer->send(serializer.writeTeleport(ctx->entityId, world::relative_transform{entry->parent, entry->relative}));
        }
        break;
    }
    case client_packet_type::FILL_SHAPE: {
        if (const auto fill = packet.read<action::FillShape>()) {
            fillShape(*fill);
        } else {
            LOG_T("Bad fill");
        }
        break;
    }
    case client_packet_type::MESSAGE: {
        const auto ref = packet.readRemaining();
        broadcastMessage(ctx->name + ": " + std::string((const char*)ref.data(), ref.size()));
        break;
    }
    case client_packet_type::REQUEST_REGIONS: {
        if (auto player = universe->getPlayer(ctx->entityId)) {
            area_id id;
            if (!packet.read(id))
                break;

            if (auto node = universe->getElements()->findArea(id)) {
                region_pos rpos;
                auto regions = node->get()->getRegions();
                const auto pos = player->absolute.position;
                while (!packet.isDone() && packet.read(rpos)) {
                    if (auto it_r = regions->find(rpos); it_r != regions->end()) {
                        const auto off = glm::divide(node->absolute.computeChild(glm::multiply(it_r->first, glm::lvec3(REGION_LENGTH * CHUNK_LENGTH))) - pos);
                        if (glm::length2(off) <= glm::pow2(options.world.loadDistance + REGION_LENGTH * 2)) {
                            peer->send(serializer.writeRegion(area_region_ref(id, rpos), it_r->second), net::server::queue::CHUNK);
                        } else {
                            LOG_T("Request out of range region");
                        }
                    } else {
                        LOG_T("Requested region not found");
                    }
                }
            } else {
                LOG_T("Bad region request");
            }
        }
        break;
    }
    case client_packet_type::REQUEST_CHUNKS: {
        if (auto player = universe->getPlayer(ctx->entityId)) {
            const auto pos = player->absolute.position;
            node_id id;
            if (!packet.read<node_id>(id))
                break;

            const auto elements = universe->getElements();
            if (ws::Elements::Is<ws::Elements::Type::Area>(elements->withFlag(id))) {
                if (auto area = elements->findArea(id.val)) {
                    auto &chunks = area->get()->getChunks();
                    chunk_pos cpos;
                    for (size_t i = 0; !packet.isDone() && i < MAX_PENDING_CHUNK_COUNT && packet.read(cpos); i++) {
                        const auto dist = glm::length2(glm::divide(area->absolute.computeChild(glm::multiply(cpos)) - pos));
                        if (dist <= glm::pow2(options.world.loadDistance) && chunks.inRange(cpos)) {
                            if (chunks.findInRange(cpos) != nullptr)
                                ctx->pending.chunks.push(node_chunk_pos{id, cpos}, dist);
                            //MAYBE: else notify client
                        } else {
                            LOG_T("Request out of range chunk");
                        }
                    }
                } else {
                    LOG_T("Bad chunk request");
                }
            } else if (auto part = elements->findPart(id.val)) {
                auto size = part->get();
                // The distance is measured to the part itself, so it is checked once for all chunks.
                const auto dist = glm::length2(glm::divide(part->absolute.position - pos));
                if (dist <= glm::pow2(options.world.loadDistance)) {
                    chunk_pos cpos;
                    for (size_t i = 0; !packet.isDone() && i < MAX_PENDING_CHUNK_COUNT && packet.read(cpos); i++) {
                        if (size->inRange(cpos)) {
                            ctx->pending.chunks.push(node_chunk_pos{id, cpos}, dist);
                        } else {
                            LOG_T("Request out of range chunk");
                        }
                    }
                } else {
                    LOG_T("Request out of range part");
                }
            } else {
                LOG_T("Bad chunk request");
            }

            // Nothing was queued (or everything is already sent): notify the client immediately.
            if (ctx->pending.chunks.empty())
                peer->send(serializer.writeChunkReset());
        }
        break;
    }
    case client_packet_type::REQUEST_ENTITIES: {
        node_id root;
        if (!packet.read(root))
            break;
        LOG_E("Unimplemented");
        break;
    }
    default:
        LOG_T("Bad packet from " << peer->getAddress() << " " << (int)type);
        break;
    }
    return true;
}
// Handles an action emitted by the in-process client through localHandle->emit.
// Supported alternatives: FillShape, Message and Move; anything else is logged.
void Server::onPacket(const world::action::packet &packet) {
    ZoneScopedN("Local Packet");
    assert(localHandle);

    if (const auto fillAction = std::get_if<world::action::FillShape>(&packet)) {
        fillShape(*fillAction);
        return;
    }

    if (const auto chat = std::get_if<world::action::Message>(&packet)) {
        broadcastMessage(localHandle->playerName + ": " + chat->text);
        return;
    }

    if (const auto moveAction = std::get_if<world::action::Move>(&packet)) {
        const bool moved = universe->movePlayer(localHandle->entityId, moveAction->pos);
        if (!moved) {
            LOG_W("Bad move of local player");
        }
        return;
    }

    LOG_W("Bad packet from local client " << packet.index());
}
// Delivers a chat/system message to everyone: queued on the local handle when one
// exists, and broadcast through the host to the remote peers.
void Server::broadcastMessage(const std::string& msg) {
    const bool hasLocalClient = CAN_SHARE && localHandle;
    if (hasLocalClient)
        localHandle->messages.emplace(msg);

    host.sendBroadcast(serializer.writerMessage(msg));
}
// Builds one ENTITIES packet describing the entity changes flagged in `l` and
// broadcasts it on the ENTITY queue. The Removed/Added/Moved flags that were
// reported are cleared in `l` as they are written.
void Server::broadcastEntitiesChanges(world::server::Elements& l) {
memory::write_buffer packet(net::server_packet_type::ENTITIES, sizeof(node_id));
// Pass 1: ids visited by iter_empty that carry the Removed flag.
// Each is pushed with Added and Moved cleared, then its Removed flag is cleared in storage.
l.nodes.iter_empty([&] (const node_id& id) {
if (Elements::Has<Elements::Flag::Removed>(id)) {
const auto flagId = Elements::Set<Elements::Flag::Added, false>(
Elements::Set<Elements::Flag::Moved, false>(id));
packet.push(flagId);
l.nodes.set_flag(Elements::Set<Elements::Flag::Removed, false>(id));
}
});
// Pass 2: hierarchy entries flagged Added or Moved.
l.hierarchy.iter([&](const Elements::hierarchy_entry &entry) {
const auto id = l.withFlag(entry.self);
assert(id);
// Captured before the flags are cleared below; still needed to append the payload.
const auto added = Elements::Has<Elements::Flag::Added>(id);
if (Elements::Has<Elements::Flag::Moved>(id) || added) {
// Id (flags still set) followed by parent, relative transform and velocity.
packet.push(id);
packet.push(ws::Elements::start_point(entry.parent, entry.relative, entry.vel));
l.nodes.set_flag(Elements::Set<Elements::Flag::Added, false>(
Elements::Set<Elements::Flag::Moved, false>(id)));
}
if (added) {
// Newly added entities also carry a type-specific payload.
// NOTE(review): findArea/findPart results are dereferenced without a null check.
switch (Elements::GetType(id)) {
case Elements::Type::Area: {
const auto area = l.findArea(id.val)->get();
packet.push(Area::params{area->getParams().radius, area->getCurvature()});
break;
}
case Elements::Type::Part:
packet.push(l.findPart(id.val)->get()->size);
break;
case Elements::Type::Instance:
packet.push(NodeOf<Instance>::Make(l.findNode(id))->get()->type);
break;
default:
break;
}
}
});
// NOTE(review): isFull() presumably means "something was pushed beyond the header" - confirm in write_buffer.
if (packet.isFull())
host.sendBroadcast(packet.finish(), net::server::queue::ENTITY);
#if TRACY_ENABLE
// Profiling only: plot the summed ENTITY queue size over all peers.
int64_t queued = 0;
host.iterPeers([&](net::server::Peer *peer) { queued += peer->queueSize(net::server::queue::ENTITY); });
TracyPlot("QueuedEntities", queued);
#endif
}
// Serializes a snapshot of the entity hierarchy into an ENTITIES packet.
//
// When `root` is found in the hierarchy, its chain of ancestors is written first
// (flagged Moved, not Added), followed by `root_entry->childs` entries starting at
// root's hierarchy index. When `root` is null-like, the whole hierarchy is written.
// Every listed entry is flagged Added and carries its type-specific payload.
//
// Returns an empty buffer when `root` is set but is not part of the hierarchy.
memory::read_buffer Server::serializeEntitiesFrom(const world::server::Elements& l, node_id root) {
    memory::write_buffer out(net::server_packet_type::ENTITIES, sizeof(node_id));
    generational::id::idx_t first = 0;
    generational::id::idx_t total = l.hierarchy.size();

    const auto rootEntry = l.hierarchy.get(root);
    if (rootEntry) {
        // Read parents in correct order
        node_id ancestor = rootEntry->parent;
        while (ancestor) {
            if (!l.nodes.directly_at(ancestor))
                break;
            const auto ancestorEntry = l.hierarchy.get(ancestor);
            if (!ancestorEntry)
                break;

            const auto asMoved = Elements::Set<Elements::Flag::Moved, true>(ancestor);
            const auto flaggedAncestor = Elements::Set<Elements::Flag::Added, false>(asMoved);
            out.push(flaggedAncestor);
            out.push(ws::Elements::start_point{ancestorEntry->parent, ancestorEntry->relative, ancestorEntry->vel});

            ancestor = ancestorEntry->parent;
        }
        first = l.hierarchy.find(root).value().val;
        total = rootEntry->childs;
    } else if (root) { // Not in hierarchy
        return memory::read_buffer(nullptr, 0);
    }

    for (size_t offset = 0; offset < total; offset++) {
        const auto entry = l.hierarchy.at(first + offset);
        assert(entry);
        const auto id = l.withFlag(entry->self);
        const auto node = l.nodes.directly_at(id);
        assert(node);

        const auto notMoved = Elements::Set<Elements::Flag::Moved, false>(id);
        const auto flaggedId = Elements::Set<Elements::Flag::Added, true>(notMoved);
        out.push(flaggedId);
        out.push(ws::Elements::start_point(entry->parent, entry->relative, entry->vel));

        // Type-specific payload
        switch (Elements::GetType(id)) {
        case Elements::Type::Area: {
            const auto area = NodeOf<ws::Area>::Make(node)->get();
            out.push(Area::params{area->getParams().radius, area->getCurvature()});
            break;
        }
        case Elements::Type::Part:
            out.push(NodeOf<Part>::Make(node)->get()->size);
            break;
        case Elements::Type::Instance:
            out.push(NodeOf<Instance>::Make(node)->get()->type);
            break;
        default:
            break;
        }
    }
    return out.finish();
}
// Applies a fill action to the universe and propagates it to the remote clients.
// Clients that declared caps.handleEdits get the action queued so they can replay it;
// if at least one client did not, the resulting edits are collected and sent to those
// clients on the CHUNK queue.
// A solid fill is rejected when the target volume is not free.
void Server::fillShape(const world::action::FillShape& fill) {
    //TODO: check ray and tool
    const bool solid = fill.val.is_solid();
    if (solid && !universe->isRangeFree(fill.pos, geometry::Volume{action::ToGeometry(fill.shape), fill.radius})) {
        LOG_T("Entity in solid fill area");
        return;
    }

    // NOTE(review): this only warns; the fill still proceeds for non-area targets.
    if (const auto target = universe->getElements()->withFlag(fill.pos.node); !Elements::Is<Elements::Type::Area>(target)) {
        LOG_W("Part edit not supported for now");
    }

    bool needsExplicitEdits = false;
    host.iterPeers([&](net::server::Peer *peer) {
        //MAYBE: only in range
        const auto client = peer->getCtx<ws::client>();
        if (!client)
            return;
        if (client->caps.handleEdits) {
            client->pending.edits.push(fill);
        } else {
            needsExplicitEdits = true;
        }
    });

    if (!needsExplicitEdits) {
        universe->fill(fill);
    } else {
        ws::fill_edits edits;
        universe->fill(fill, &edits);
        if (!edits.empty()) {
            ZoneScopedN("Stupidity");
            auto buffer = serializer.writeEdits(fill.pos.node, edits);
            host.iterPeers([&](net::server::Peer *peer) {
                //MAYBE: only in range
                const auto client = peer->getCtx<ws::client>();
                if (client && !client->caps.handleEdits)
                    peer->send(buffer, net::server::queue::CHUNK);
            });
        }
    }
    //TODO: handle inventory
}