1
0
Fork 0
Univerxel/src/client/net/Client.cpp

197 lines
7.0 KiB
C++

#include "Client.hpp"
#include <thread>
using namespace net::client;
namespace net::client {
int connection_callback(picoquic_cnx_t* cnx,
uint64_t stream_id, uint8_t* bytes, size_t length,
picoquic_call_back_event_t fin_or_event, void* callback_ctx, void* v_stream_ctx)
{
auto client = (Client*)callback_ctx;
assert(cnx == nullptr || client->contains(cnx) || fin_or_event == picoquic_callback_close);
(void)cnx;
assert(v_stream_ctx == NULL || ((net::stream_ctx*)v_stream_ctx)->stream_id == stream_id);
return client->connectionCallback(stream_id, bytes, length, fin_or_event, v_stream_ctx);
}
}
Client::Client(const net::address& ct,
std::function<bool(const data::out_view&, net::PacketFlags)> onPacket):
Context(nullptr, nullptr), Connection(nullptr, true, queue::count), onPacket(onPacket)
{
const char *sni = NULL;
sockaddr_storage server_address;
if (const auto addr = ct.toAddress()) {
server_address = addr.value().first;
if (addr.value().second) {
sni = ct.host.c_str();
}
} else {
FATAL("Invalid server address format");
}
LOG_D("Connecting to " << ct);
Connection::setup(Context::getHandle(), (struct sockaddr*) &server_address, sni, connection_callback, this);
openSockets(0, server_address.ss_family);
while (!connected && !disconnected) {
pull();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
if (!connected || disconnected) {
FATAL("Could not contact server at " << ct);
}
}
Client::~Client() {
Connection::release((uint16_t)disconnect_reason::QUIT);
while (connected && !disconnected) {
pull();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
int Client::connectionCallback(uint64_t stream_id, uint8_t* bytes, size_t length,
picoquic_call_back_event_t fin_or_event, void* v_stream_ctx) {
switch (fin_or_event) {
case picoquic_callback_stream_data:
case picoquic_callback_stream_fin: {
assert(stream_ctx::IsServerId(stream_id));
auto stream_ctx = (in_stream_ctx *)v_stream_ctx;
const auto is_fin = fin_or_event == picoquic_callback_stream_fin;
if (stream_ctx == NULL) {
if (is_fin) { // Single frame packet
if (length > 0) {
onPacket(data::out_view(bytes, length), PacketFlags::TINY);
}
reset(stream_id);
break;
}
// New long stream from server
stream_ctx = receive(stream_id);
}
if (length > 0) {
stream_ctx->buffer.write(bytes, length);
}
if (is_fin) {
if (onPacket(data::out_view(stream_ctx->buffer.data.data(), stream_ctx->buffer.data.size()), PacketFlags::NONE)) {
close(stream_ctx);
} else {
LOG_E("??");
return -1;
}
}
break;
}
case picoquic_callback_datagram:
if(!onPacket(data::out_view(bytes, length), PacketFlags::DATAGRAM)) {
return -1;
}
break;
case picoquic_callback_stop_sending: /* Should not happen, treated as reset */
/* Mark stream as abandoned, close the file, etc. */
Connection::reset(stream_id);
/* Fall through */
case picoquic_callback_stream_reset: /* Server reset stream #x */ {
assert(stream_ctx::IsUnidirId(stream_id));
// auto remote_error = picoquic_get_remote_stream_error(cnx, stream_id);
//FIXME: if remote_error callback onError(remote_error)
if (stream_ctx::IsClientId(stream_id)) {
auto stream_ctx = (out_stream_ctx*)v_stream_ctx;
if (stream_ctx == NULL) {
close(stream_ctx);
}
} else {
auto stream_ctx = (in_stream_ctx*)v_stream_ctx;
if (stream_ctx == NULL) {
close(stream_ctx);
}
}
break;
}
case picoquic_callback_stateless_reset:
case picoquic_callback_close: /* Received connection close */
case picoquic_callback_application_close: /* Received application close */ {
struct quit_notify { server_packet_type type; bool is_application; uint16_t reason; };
const auto is_app = fin_or_event == picoquic_callback_application_close;
const auto quit = quit_notify{server_packet_type::QUIT, is_app, getErrorCode(is_app)};
if(onPacket(data::out_view((const uint8_t*)&quit, sizeof(quit)), PacketFlags::DATAGRAM)) {
LOG_W("Connection closed");
disconnected = true;
setCallback(NULL, NULL);
}
break;
}
case picoquic_callback_version_negotiation:
/* The client did not get the right version.
* TODO: some form of negotiation?
*/
LOG_W("Bad protocol version");
break;
case picoquic_callback_stream_gap:
/* This callback is never used. */
break;
case picoquic_callback_prepare_to_send: {
/* Active sending API */
assert(stream_ctx::IsClientId(stream_id));
auto stream_ctx = (out_stream_ctx*)v_stream_ctx;
if (stream_ctx == NULL) {
return -1;
}
if (!stream_ctx->buffer.isDone()) {
size_t available = stream_ctx->buffer.remaining();
int is_fin = 1;
/* The length parameter marks the space available in the packet */
if (available > length) {
available = length;
is_fin = 0;
}
uint8_t* buffer = picoquic_provide_stream_data_buffer(bytes, available, is_fin, !is_fin);
if (buffer == NULL) {
return -1;
}
// MAYBE: try zero copy
stream_ctx->buffer.read(buffer, available);
} else {
LOG_W("Out stream reset");
reset(stream_id);
}
if (stream_ctx->buffer.isDone()) {
//MAYBE: reuse
close(stream_ctx);
}
break;
}
case picoquic_callback_almost_ready:
LOG_D("Connection to the server completed, almost ready");
break;
case picoquic_callback_ready:
/* TODO: Check that the transport parameters are what the sample expects */
LOG_I("Connected to server");
connected = true;
break;
default:
//MAYBE: picoquic_callback_request_alpn_list
//MAYBE: picoquic_callback_set_alpn
//MAYBE: picoquic_callback_pacing_changed
/* unexpected -- just ignore. */
break;
}
return 0;
}