197 lines
7.0 KiB
C++
197 lines
7.0 KiB
C++
#include "Client.hpp"
|
|
|
|
#include <thread>
|
|
|
|
using namespace net::client;
|
|
|
|
namespace net::client {
|
|
int connection_callback(picoquic_cnx_t* cnx,
|
|
uint64_t stream_id, uint8_t* bytes, size_t length,
|
|
picoquic_call_back_event_t fin_or_event, void* callback_ctx, void* v_stream_ctx)
|
|
{
|
|
auto client = (Client*)callback_ctx;
|
|
assert(cnx == nullptr || client->contains(cnx) || fin_or_event == picoquic_callback_close);
|
|
(void)cnx;
|
|
assert(v_stream_ctx == NULL || ((net::stream_ctx*)v_stream_ctx)->stream_id == stream_id);
|
|
return client->connectionCallback(stream_id, bytes, length, fin_or_event, v_stream_ctx);
|
|
}
|
|
}
|
|
|
|
/// Connects to the server described by `ct` and blocks until the connection
/// is either established or reported as disconnected.
///
/// @param ct        Server address; it is resolved through `ct.toAddress()`.
///                  When the second element of the resolved pair is set,
///                  `ct.host` is used as the SNI.
/// @param onPacket  Handler stored by the client and invoked from
///                  connectionCallback for received packets.
///
/// Calls FATAL when the address cannot be resolved or when the wait ends
/// without an established connection.
Client::Client(const address& ct,
               std::function<bool(const data::out_view&, PacketFlags)> onPacket):
    Context(nullptr, nullptr), Connection(nullptr, true, queue::count), onPacket(onPacket)
{
    sockaddr_storage server_address;
    const char* sni = nullptr;

    const auto resolved = ct.toAddress();
    if (!resolved) {
        FATAL("Invalid server address format");
    } else {
        const auto& target = resolved.value();
        server_address = target.first;
        if (target.second) {
            sni = ct.host.c_str();
        }
    }

    LOG_D("Connecting to " << ct);
    Connection::setup(Context::getHandle(), (struct sockaddr*) &server_address, sni, connection_callback, this);

    openSockets(0, server_address.ss_family);

    // Poll until the callback flips one of the two state flags.
    for (;;) {
        if (connected || disconnected) {
            break;
        }
        pull();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    const bool established = connected && !disconnected;
    if (!established) {
        FATAL("Could not contact server at " << ct);
    }
}
|
|
/// Releases the connection with the QUIT reason, then keeps pulling until
/// the client is no longer connected or has been flagged as disconnected.
Client::~Client() {
    Connection::release(static_cast<uint16_t>(disconnect_reason::QUIT));

    for (;;) {
        if (!connected || disconnected) {
            break;
        }
        pull();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
|
|
|
|
int Client::connectionCallback(uint64_t stream_id, uint8_t* bytes, size_t length,
|
|
picoquic_call_back_event_t fin_or_event, void* v_stream_ctx) {
|
|
|
|
switch (fin_or_event) {
|
|
case picoquic_callback_stream_data:
|
|
case picoquic_callback_stream_fin: {
|
|
assert(stream_ctx::IsServerId(stream_id));
|
|
auto stream_ctx = (in_stream_ctx *)v_stream_ctx;
|
|
const auto is_fin = fin_or_event == picoquic_callback_stream_fin;
|
|
|
|
if (stream_ctx == NULL) {
|
|
if (is_fin) { // Single frame packet
|
|
if (length > 0) {
|
|
onPacket(data::out_view(bytes, length), PacketFlags::TINY);
|
|
}
|
|
reset(stream_id);
|
|
break;
|
|
}
|
|
// New long stream from server
|
|
stream_ctx = receive(stream_id);
|
|
}
|
|
|
|
if (length > 0) {
|
|
stream_ctx->buffer.write(bytes, length);
|
|
}
|
|
|
|
if (is_fin) {
|
|
if (onPacket(data::out_view(stream_ctx->buffer.data.data(), stream_ctx->buffer.data.size()), PacketFlags::NONE)) {
|
|
close(stream_ctx);
|
|
} else {
|
|
LOG_E("??");
|
|
return -1;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
case picoquic_callback_datagram:
|
|
if(!onPacket(data::out_view(bytes, length), PacketFlags::DATAGRAM)) {
|
|
return -1;
|
|
}
|
|
break;
|
|
|
|
case picoquic_callback_stop_sending: /* Should not happen, treated as reset */
|
|
/* Mark stream as abandoned, close the file, etc. */
|
|
Connection::reset(stream_id);
|
|
/* Fall through */
|
|
case picoquic_callback_stream_reset: /* Server reset stream #x */ {
|
|
assert(stream_ctx::IsUnidirId(stream_id));
|
|
|
|
// auto remote_error = picoquic_get_remote_stream_error(cnx, stream_id);
|
|
//FIXME: if remote_error callback onError(remote_error)
|
|
if (stream_ctx::IsClientId(stream_id)) {
|
|
auto stream_ctx = (out_stream_ctx*)v_stream_ctx;
|
|
if (stream_ctx == NULL) {
|
|
close(stream_ctx);
|
|
}
|
|
} else {
|
|
auto stream_ctx = (in_stream_ctx*)v_stream_ctx;
|
|
if (stream_ctx == NULL) {
|
|
close(stream_ctx);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
case picoquic_callback_stateless_reset:
|
|
case picoquic_callback_close: /* Received connection close */
|
|
case picoquic_callback_application_close: /* Received application close */ {
|
|
struct quit_notify { server_packet_type type; bool is_application; uint16_t reason; };
|
|
const auto is_app = fin_or_event == picoquic_callback_application_close;
|
|
const auto quit = quit_notify{server_packet_type::QUIT, is_app, getErrorCode(is_app)};
|
|
if(onPacket(data::out_view((const uint8_t*)&quit, sizeof(quit)), PacketFlags::DATAGRAM)) {
|
|
LOG_W("Connection closed");
|
|
disconnected = true;
|
|
setCallback(NULL, NULL);
|
|
}
|
|
break;
|
|
}
|
|
case picoquic_callback_version_negotiation:
|
|
/* The client did not get the right version.
|
|
* TODO: some form of negotiation?
|
|
*/
|
|
LOG_W("Bad protocol version");
|
|
break;
|
|
case picoquic_callback_stream_gap:
|
|
/* This callback is never used. */
|
|
break;
|
|
case picoquic_callback_prepare_to_send: {
|
|
/* Active sending API */
|
|
assert(stream_ctx::IsClientId(stream_id));
|
|
|
|
auto stream_ctx = (out_stream_ctx*)v_stream_ctx;
|
|
if (stream_ctx == NULL) {
|
|
return -1;
|
|
}
|
|
|
|
if (!stream_ctx->buffer.isDone()) {
|
|
size_t available = stream_ctx->buffer.remaining();
|
|
int is_fin = 1;
|
|
|
|
/* The length parameter marks the space available in the packet */
|
|
if (available > length) {
|
|
available = length;
|
|
is_fin = 0;
|
|
}
|
|
uint8_t* buffer = picoquic_provide_stream_data_buffer(bytes, available, is_fin, !is_fin);
|
|
if (buffer == NULL) {
|
|
return -1;
|
|
}
|
|
|
|
// MAYBE: try zero copy
|
|
stream_ctx->buffer.read(buffer, available);
|
|
} else {
|
|
LOG_W("Out stream reset");
|
|
reset(stream_id);
|
|
}
|
|
if (stream_ctx->buffer.isDone()) {
|
|
//MAYBE: reuse
|
|
close(stream_ctx);
|
|
}
|
|
break;
|
|
}
|
|
case picoquic_callback_almost_ready:
|
|
LOG_D("Connection to the server completed, almost ready");
|
|
break;
|
|
case picoquic_callback_ready:
|
|
/* TODO: Check that the transport parameters are what the sample expects */
|
|
LOG_I("Connected to server");
|
|
connected = true;
|
|
break;
|
|
default:
|
|
//MAYBE: picoquic_callback_request_alpn_list
|
|
//MAYBE: picoquic_callback_set_alpn
|
|
//MAYBE: picoquic_callback_pacing_changed
|
|
/* unexpected -- just ignore. */
|
|
break;
|
|
}
|
|
|
|
return 0;
|
|
}
|