Revert "First attempt on getting rid of the socket server"
This reverts commit 91d37082ab.
This commit is contained in:
parent
91d37082ab
commit
a6e26f46d6
17 changed files with 162 additions and 253 deletions
1
external/android-emugl/CMakeLists.txt
vendored
1
external/android-emugl/CMakeLists.txt
vendored
|
|
@ -3,7 +3,6 @@
|
|||
set(CMAKE_C_FLAGS "-Wall")
|
||||
|
||||
include_directories(
|
||||
${CMAKE_SOURCE_DIR}/src
|
||||
${CMAKE_SOURCE_DIR}/external/android-emugl/shared
|
||||
${CMAKE_SOURCE_DIR}/external/android-emugl/host/include
|
||||
${CMAKE_SOURCE_DIR}/external/android-emugl/shared/OpenglCodecCommon
|
||||
|
|
|
|||
|
|
@ -21,8 +21,6 @@
|
|||
|
||||
#include "ErrorLog.h"
|
||||
|
||||
#include "anbox/logger.h"
|
||||
|
||||
class IOStream {
|
||||
public:
|
||||
|
||||
|
|
@ -72,7 +70,7 @@ public:
|
|||
}
|
||||
|
||||
int flush() {
|
||||
DEBUG("buf %p free %d buf size %d", m_buf, m_free, m_bufsize);
|
||||
|
||||
if (!m_buf || m_free == m_bufsize) return 0;
|
||||
|
||||
int stat = commitBuffer(m_bufsize - m_free);
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ add_custom_command(
|
|||
set(SOURCES
|
||||
GLESv1Decoder.cpp)
|
||||
|
||||
if ("${cmake_build_type_lower}" STREQUAL "debug")
|
||||
if ("${cmake_build_type_lower}" STREQUAL "trace")
|
||||
set(OPENGL_DEBUG "-DOPENGL_DEBUG_PRINTOUT -DCHECK_GL_ERROR")
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OPENGL_DEBUG}")
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OPENGL_DEBUG}")
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ add_custom_command(
|
|||
set(SOURCES
|
||||
GLESv2Decoder.cpp)
|
||||
|
||||
if ("${cmake_build_type_lower}" STREQUAL "debug")
|
||||
if ("${cmake_build_type_lower}" STREQUAL "trace")
|
||||
set(OPENGL_DEBUG "-DOPENGL_DEBUG_PRINTOUT -DCHECK_GL_ERROR")
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OPENGL_DEBUG}")
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OPENGL_DEBUG}")
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ add_custom_command(
|
|||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
|
||||
DEPENDS emugen)
|
||||
|
||||
if ("${cmake_build_type_lower}" STREQUAL "debug")
|
||||
if ("${cmake_build_type_lower}" STREQUAL "trace")
|
||||
set(OPENGL_DEBUG "-DOPENGL_DEBUG_PRINTOUT -DCHECK_GL_ERROR")
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OPENGL_DEBUG}")
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OPENGL_DEBUG}")
|
||||
|
|
|
|||
|
|
@ -255,17 +255,12 @@ void ColorBuffer::readPixels(int x,
|
|||
GLenum p_format,
|
||||
GLenum p_type,
|
||||
void* pixels) {
|
||||
DEBUG("");
|
||||
|
||||
ScopedHelperContext context(m_helper);
|
||||
if (!context.isOk()) {
|
||||
return;
|
||||
}
|
||||
|
||||
DEBUG("Before bind");
|
||||
|
||||
if (bindFbo(&m_fbo, m_tex)) {
|
||||
DEBUG("Off to GL to read pixels");
|
||||
s_gles2.glReadPixels(x, y, width, height, p_format, p_type, pixels);
|
||||
unbindFbo();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -545,9 +545,7 @@ HandleType FrameBuffer::genHandle()
|
|||
HandleType FrameBuffer::createColorBuffer(int p_width, int p_height,
|
||||
GLenum p_internalFormat)
|
||||
{
|
||||
DEBUG("width %d height %d", p_width, p_height);
|
||||
emugl::Mutex::AutoLock mutex(m_lock);
|
||||
DEBUG("Got lock");
|
||||
HandleType ret = 0;
|
||||
|
||||
ColorBufferPtr cb(ColorBuffer::create(
|
||||
|
|
@ -750,16 +748,10 @@ void FrameBuffer::readColorBuffer(HandleType p_colorbuffer,
|
|||
int x, int y, int width, int height,
|
||||
GLenum format, GLenum type, void *pixels)
|
||||
{
|
||||
DEBUG("handle %d x %d y %d width %d height %d",
|
||||
p_colorbuffer, x, y, width, height);
|
||||
|
||||
emugl::Mutex::AutoLock mutex(m_lock);
|
||||
|
||||
DEBUG("Got lock");
|
||||
|
||||
ColorBufferMap::iterator c( m_colorbuffers.find(p_colorbuffer) );
|
||||
if (c == m_colorbuffers.end()) {
|
||||
DEBUG("Didn't found color buffer");
|
||||
// bad colorbuffer handle
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -73,8 +73,8 @@ LayerManager::~LayerManager() {
|
|||
}
|
||||
|
||||
void LayerManager::post_layer(const LayerInfo &layer) {
|
||||
// if (is_layer_blacklisted(layer.name))
|
||||
// return;
|
||||
if (is_layer_blacklisted(layer.name))
|
||||
return;
|
||||
|
||||
FrameBufferWindow *window = nullptr;
|
||||
for (auto &l : layers_) {
|
||||
|
|
|
|||
|
|
@ -79,6 +79,61 @@ RENDER_APICALL int RENDER_APIENTRY initOpenGLRenderer(
|
|||
set_emugl_crash_reporter(crashfunc);
|
||||
set_emugl_logger(logfuncs.coarse);
|
||||
set_emugl_cxt_logger(logfuncs.fine);
|
||||
//
|
||||
// Fail if renderer is already initialized
|
||||
//
|
||||
if (s_renderThread) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// kUseThread is used to determine whether the RenderWindow should use
|
||||
// a separate thread to manage its subwindow GL/GLES context.
|
||||
// For now, this feature is disabled entirely for the following
|
||||
// reasons:
|
||||
//
|
||||
// - It must be disabled on Windows at all times, otherwise the main window becomes
|
||||
// unresponsive after a few seconds of user interaction (e.g. trying to
|
||||
// move it over the desktop). Probably due to the subtle issues around
|
||||
// input on this platform (input-queue is global, message-queue is
|
||||
// per-thread). Also, this messes considerably the display of the
|
||||
// main window when running the executable under Wine.
|
||||
//
|
||||
// - On Linux/XGL and OSX/Cocoa, this used to be necessary to avoid corruption
|
||||
// issues with the GL state of the main window when using the SDL UI.
|
||||
// After the switch to Qt, this is no longer necessary and may actually cause
|
||||
// undesired interactions between the UI thread and the RenderWindow thread:
|
||||
// for example, in a multi-monitor setup the context might be recreated when
|
||||
// dragging the window between monitors, triggering a Qt-specific callback
|
||||
// in the context of RenderWindow thread, which will become blocked on the UI
|
||||
// thread, which may in turn be blocked on something else.
|
||||
bool kUseThread = false;
|
||||
|
||||
//
|
||||
// initialize the renderer and listen to connections
|
||||
// on a thread in the current process.
|
||||
//
|
||||
s_renderWindow = new RenderWindow(native_display, kUseThread);
|
||||
if (!s_renderWindow) {
|
||||
ERR("Could not create rendering window class");
|
||||
GL_LOG("Could not create rendering window class");
|
||||
return false;
|
||||
}
|
||||
if (!s_renderWindow->isValid()) {
|
||||
ERR("Could not initialize emulated framebuffer\n");
|
||||
delete s_renderWindow;
|
||||
s_renderWindow = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
s_renderThread = RenderServer::create(addr, addrLen);
|
||||
if (!s_renderThread) {
|
||||
return false;
|
||||
}
|
||||
strncpy(s_renderAddr, addr, sizeof(s_renderAddr));
|
||||
|
||||
s_renderThread->start();
|
||||
|
||||
GL_LOG("OpenGL renderer initialized successfully");
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,8 +25,6 @@
|
|||
|
||||
#include "OpenGLESDispatch/EGLDispatch.h"
|
||||
|
||||
#include "anbox/logger.h"
|
||||
|
||||
#include <map>
|
||||
#include <string>
|
||||
|
||||
|
|
@ -238,7 +236,6 @@ static void rcDestroyWindowSurface(uint32_t windowSurface)
|
|||
static uint32_t rcCreateColorBuffer(uint32_t width,
|
||||
uint32_t height, GLenum internalFormat)
|
||||
{
|
||||
DEBUG("");
|
||||
FrameBuffer *fb = FrameBuffer::getFB();
|
||||
if (!fb) {
|
||||
return 0;
|
||||
|
|
@ -354,8 +351,6 @@ static void rcReadColorBuffer(uint32_t colorBuffer,
|
|||
GLint width, GLint height,
|
||||
GLenum format, GLenum type, void* pixels)
|
||||
{
|
||||
DEBUG("");
|
||||
|
||||
FrameBuffer *fb = FrameBuffer::getFB();
|
||||
if (!fb) {
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -26,19 +26,15 @@
|
|||
#include "OpenGLESDispatch/GLESv1Dispatch.h"
|
||||
#include "../../../shared/OpenglCodecCommon/ChecksumCalculatorThreadInfo.h"
|
||||
|
||||
#include "anbox/logger.h"
|
||||
|
||||
#define STREAM_BUFFER_SIZE 4*1024*1024
|
||||
|
||||
RenderThread::RenderThread(IOStream *stream, emugl::Mutex *lock) :
|
||||
emugl::Thread(),
|
||||
m_lock(lock),
|
||||
m_stream(stream) {
|
||||
DEBUG("");
|
||||
}
|
||||
m_stream(stream) {}
|
||||
|
||||
RenderThread::~RenderThread() {
|
||||
DEBUG("");
|
||||
delete m_stream;
|
||||
}
|
||||
|
||||
// static
|
||||
|
|
@ -47,75 +43,78 @@ RenderThread* RenderThread::create(IOStream *stream, emugl::Mutex *lock) {
|
|||
}
|
||||
|
||||
void RenderThread::forceStop() {
|
||||
DEBUG("");
|
||||
m_stream->forceStop();
|
||||
}
|
||||
|
||||
intptr_t RenderThread::main() {
|
||||
RenderThreadInfo tInfo;
|
||||
// Not used below but will store a reference of itself in TLS so that
|
||||
// it can be accessed down the stack in the same thread when decoding
|
||||
// any of the commands.
|
||||
ChecksumCalculatorThreadInfo tChecksumInfo;
|
||||
|
||||
//
|
||||
// initialize decoders
|
||||
//
|
||||
tInfo.m_glDec.initGL(gles1_dispatch_get_proc_func, NULL);
|
||||
tInfo.m_gl2Dec.initGL(gles2_dispatch_get_proc_func, NULL);
|
||||
initRenderControlContext(&tInfo.m_rcDec);
|
||||
|
||||
ReadBuffer readBuf(STREAM_BUFFER_SIZE);
|
||||
|
||||
DEBUG("Started");
|
||||
|
||||
while (1) {
|
||||
|
||||
int stat = readBuf.getData(m_stream);
|
||||
if (stat <= 0) {
|
||||
DEBUG("Connection closed");
|
||||
break;
|
||||
}
|
||||
|
||||
DEBUG("Got %d bytes for decoding", readBuf.validData());
|
||||
|
||||
bool progress;
|
||||
do {
|
||||
progress = false;
|
||||
m_lock->lock();
|
||||
DEBUG("Locked");
|
||||
|
||||
m_lock->lock();
|
||||
//
|
||||
// try to process some of the command buffer using the GLESv1 decoder
|
||||
//
|
||||
size_t last = tInfo.m_glDec.decode(readBuf.buf(), readBuf.validData(), m_stream);
|
||||
if (last > 0) {
|
||||
DEBUG("Ran GL commands");
|
||||
progress = true;
|
||||
readBuf.consume(last);
|
||||
}
|
||||
|
||||
//
|
||||
// try to process some of the command buffer using the GLESv2 decoder
|
||||
//
|
||||
last = tInfo.m_gl2Dec.decode(readBuf.buf(), readBuf.validData(), m_stream);
|
||||
if (last > 0) {
|
||||
DEBUG("Ran GL2 commands");
|
||||
progress = true;
|
||||
readBuf.consume(last);
|
||||
}
|
||||
|
||||
//
|
||||
// try to process some of the command buffer using the
|
||||
// renderControl decoder
|
||||
//
|
||||
last = tInfo.m_rcDec.decode(readBuf.buf(), readBuf.validData(), m_stream);
|
||||
if (last > 0) {
|
||||
DEBUG("Ran RC commands");
|
||||
readBuf.consume(last);
|
||||
progress = true;
|
||||
}
|
||||
|
||||
m_lock->unlock();
|
||||
DEBUG("Unlocked");
|
||||
} while (progress);
|
||||
|
||||
} while( progress );
|
||||
|
||||
}
|
||||
|
||||
DEBUG("Shutting down");
|
||||
|
||||
//
|
||||
// Release references to the current thread's context/surfaces if any
|
||||
//
|
||||
FrameBuffer::getFB()->bindContext(0, 0, 0);
|
||||
if (tInfo.currContext || tInfo.currDrawSurf || tInfo.currReadSurf)
|
||||
ERROR("Exiting with current context/surfaces");
|
||||
if (tInfo.currContext || tInfo.currDrawSurf || tInfo.currReadSurf) {
|
||||
fprintf(stderr, "ERROR: RenderThread exiting with current context/surfaces\n");
|
||||
}
|
||||
|
||||
FrameBuffer::getFB()->drainWindowSurface();
|
||||
|
||||
FrameBuffer::getFB()->drainRenderContext();
|
||||
|
||||
return 0;
|
||||
|
|
|
|||
|
|
@ -71,9 +71,6 @@ void GLRendererServer::start() {
|
|||
log_funcs.coarse = logger_write;
|
||||
log_funcs.fine = logger_write;
|
||||
|
||||
// HACK: This will do nothing but set our log functions
|
||||
initOpenGLRenderer(nullptr, nullptr, 0, log_funcs, logger_write);
|
||||
|
||||
FrameBuffer::initialize(window_creator_->native_display());
|
||||
}
|
||||
} // namespace graphics
|
||||
|
|
|
|||
|
|
@ -26,227 +26,122 @@
|
|||
|
||||
#include <condition_variable>
|
||||
#include <queue>
|
||||
#include <functional>
|
||||
|
||||
namespace {
|
||||
constexpr const size_t default_buffer_size{384};
|
||||
constexpr const size_t max_send_buffer_size{1024};
|
||||
|
||||
class DelayedIOStream : public IOStream {
|
||||
class DirectIOStream : public IOStream {
|
||||
public:
|
||||
typedef std::vector<char> Buffer;
|
||||
|
||||
explicit DelayedIOStream(const std::shared_ptr<anbox::network::SocketMessenger> &messenger,
|
||||
size_t buffer_size = default_buffer_size) :
|
||||
explicit DirectIOStream(const std::shared_ptr<anbox::network::SocketMessenger> &messenger,
|
||||
const size_t &buffer_size = 10000) :
|
||||
IOStream(buffer_size),
|
||||
messenger_(messenger) {
|
||||
// writer_thread_(std::bind(&DelayedIOStream::worker_thread, this)) {
|
||||
}
|
||||
|
||||
virtual ~DelayedIOStream() {
|
||||
DEBUG("");
|
||||
forceStop();
|
||||
DEBUG("Shutting down");
|
||||
virtual ~DirectIOStream() {
|
||||
if (send_buffer_ != nullptr) {
|
||||
free(send_buffer_);
|
||||
send_buffer_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void* allocBuffer(size_t min_size) override {
|
||||
DEBUG("min size %d", min_size);
|
||||
if (buffer_.size() < min_size)
|
||||
buffer_.resize(min_size);
|
||||
return buffer_.data();
|
||||
size_t size = (send_buffer_size_ < min_size ? min_size : send_buffer_size_);
|
||||
if (!send_buffer_)
|
||||
send_buffer_ = (unsigned char *) malloc(size);
|
||||
else if (send_buffer_size_ < size) {
|
||||
unsigned char *p = (unsigned char *)realloc(send_buffer_, size);
|
||||
if (p != NULL) {
|
||||
send_buffer_ = p;
|
||||
send_buffer_size_ = size;
|
||||
} else {
|
||||
free(send_buffer_);
|
||||
send_buffer_ = NULL;
|
||||
send_buffer_size_ = 0;
|
||||
}
|
||||
}
|
||||
return send_buffer_;
|
||||
}
|
||||
|
||||
int commitBuffer(size_t size) override {
|
||||
DEBUG("size %d", size);
|
||||
std::unique_lock<std::mutex> l(read_mutex_);
|
||||
|
||||
#if 0
|
||||
if (buffer_.capacity() <= 2 * size) {
|
||||
buffer_.resize(size);
|
||||
out_queue_.push(std::move(buffer_));
|
||||
} else {
|
||||
out_queue_.push(Buffer(buffer_.data(), buffer_.data() + size));
|
||||
}
|
||||
DEBUG("Submitted data into output queue (%d bytes)", size);
|
||||
can_write_.notify_all();
|
||||
#else
|
||||
ssize_t bytes_left = size;
|
||||
while (bytes_left > 0) {
|
||||
const ssize_t written = messenger_->send_raw(buffer_.data() + (size - bytes_left), bytes_left);
|
||||
if (written < 0 ) {
|
||||
if (errno != EINTR) {
|
||||
ERROR("Failed to write data: %s", std::strerror(errno));
|
||||
break;
|
||||
}
|
||||
WARNING("Socket busy, trying again");
|
||||
} else
|
||||
bytes_left -= written;
|
||||
}
|
||||
|
||||
DEBUG("Sent data to remote (%d bytes)", buffer_.size());
|
||||
#endif
|
||||
return static_cast<int>(size);
|
||||
messenger_->send(reinterpret_cast<const char*>(send_buffer_), size);
|
||||
return size;
|
||||
}
|
||||
|
||||
const unsigned char* readFully(void *buffer, size_t length) override {
|
||||
size_t size = length;
|
||||
auto data = read(buffer, &size);
|
||||
if (size < length)
|
||||
return nullptr;
|
||||
return data;
|
||||
}
|
||||
|
||||
const unsigned char* read(void *buffer, size_t *length) override {
|
||||
std::unique_lock<std::mutex> l(read_mutex_);
|
||||
|
||||
if (stopped_) {
|
||||
DEBUG("Aborting");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (current_read_buffer_left_ == 0 && in_queue_.empty()) {
|
||||
DEBUG("Waiting for data to be available");
|
||||
can_read_.wait(l);
|
||||
|
||||
if (stopped_) {
|
||||
DEBUG("Aborting");
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
DEBUG("Trying to read %d bytes", *length);
|
||||
size_t read = 0;
|
||||
auto buf = static_cast<unsigned char*>(buffer);
|
||||
const auto buffer_end = buf + *length;
|
||||
while (buf != buffer_end) {
|
||||
if (current_read_buffer_left_ == 0) {
|
||||
// If we don't have anymore buffers we need to stop reading here
|
||||
if (in_queue_.empty())
|
||||
break;
|
||||
|
||||
current_read_buffer_ = in_queue_.front();
|
||||
in_queue_.pop();
|
||||
current_read_buffer_left_ = current_read_buffer_.size();
|
||||
}
|
||||
|
||||
const size_t current_size = std::min<size_t>(buffer_end - buf,
|
||||
current_read_buffer_left_);
|
||||
::memcpy(buffer, current_read_buffer_.data() +
|
||||
(current_read_buffer_.size() - current_read_buffer_left_),
|
||||
current_size);
|
||||
|
||||
read += current_size;
|
||||
buf += current_size;
|
||||
current_read_buffer_left_ -= current_size;
|
||||
|
||||
DEBUG("Size %d, left to read %d", current_size, current_read_buffer_left_);
|
||||
}
|
||||
|
||||
if (read == 0)
|
||||
return nullptr;
|
||||
|
||||
*length = read;
|
||||
|
||||
DEBUG("Read %d bytes (buffers left %d)", read, in_queue_.size());
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int writeFully(const void *buffer, size_t length) override {
|
||||
(void) buffer;
|
||||
(void) length;
|
||||
const unsigned char* readFully(void*, size_t) override {
|
||||
ERROR("Not implemented");
|
||||
return -1;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
const unsigned char* read(void *data, size_t *size) override {
|
||||
if (!wait_for_data() || buffer_.size() == 0) {
|
||||
*size = 0;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
auto bytes_to_read = *size;
|
||||
if (bytes_to_read > buffer_.size())
|
||||
bytes_to_read = buffer_.size();
|
||||
|
||||
::memcpy(data, buffer_.data(), bytes_to_read);
|
||||
buffer_.erase(buffer_.begin(), buffer_.begin() + bytes_to_read);
|
||||
|
||||
*size = bytes_to_read;
|
||||
|
||||
return static_cast<const unsigned char*>(data);
|
||||
}
|
||||
|
||||
int writeFully(const void*, size_t) override {
|
||||
ERROR("Not implemented");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void forceStop() override {
|
||||
DEBUG("");
|
||||
stopped_ = true;
|
||||
can_read_.notify_all();
|
||||
can_write_.notify_all();
|
||||
std::unique_lock<std::mutex> l(mutex_);
|
||||
buffer_.clear();
|
||||
}
|
||||
|
||||
void post_data(const Buffer &buffer) {
|
||||
DEBUG("Got data and waiting for lock");
|
||||
std::unique_lock<std::mutex> l(read_mutex_);
|
||||
DEBUG("Received %d bytes", buffer.size());
|
||||
in_queue_.push(std::move(buffer));
|
||||
can_read_.notify_all();
|
||||
void submitData(const std::vector<std::uint8_t> &data) {
|
||||
std::unique_lock<std::mutex> l(mutex_);
|
||||
for (const auto &byte : data)
|
||||
buffer_.push_back(byte);
|
||||
// buffer_.insert(buffer_.end(), data.begin(), data.end());
|
||||
lock_.notify_one();
|
||||
}
|
||||
|
||||
private:
|
||||
void worker_thread() {
|
||||
DEBUG("Running send thread");
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> l(write_mutex_);
|
||||
while (out_queue_.empty() && !stopped_) {
|
||||
can_write_.wait(l, [&]() { return !out_queue_.empty() || stopped_; });
|
||||
DEBUG("Woke up (queue size %d)", out_queue_.size());
|
||||
}
|
||||
bool wait_for_data() {
|
||||
std::unique_lock<std::mutex> l(mutex_);
|
||||
|
||||
if (stopped_)
|
||||
break;
|
||||
if (!l.owns_lock())
|
||||
return false;
|
||||
|
||||
DEBUG("Going to send out %d bytes", out_queue_.front().size());
|
||||
|
||||
auto buffer = out_queue_.front();
|
||||
out_queue_.pop();
|
||||
|
||||
ssize_t bytes_left = buffer.size();
|
||||
while (bytes_left > 0) {
|
||||
const ssize_t written = messenger_->send_raw(buffer.data() + (buffer.size() - bytes_left), bytes_left);
|
||||
if (written < 0 ) {
|
||||
if (errno != EINTR) {
|
||||
ERROR("Failed to write data: %s", std::strerror(errno));
|
||||
break;
|
||||
}
|
||||
WARNING("Socket busy, trying again");
|
||||
} else
|
||||
bytes_left -= written;
|
||||
}
|
||||
DEBUG("Sent %d bytes to client (queue size %d)", buffer.size(), out_queue_.size());
|
||||
}
|
||||
|
||||
DEBUG("Shutting down");
|
||||
lock_.wait(l, [&]() { return !buffer_.empty(); });
|
||||
return true;
|
||||
}
|
||||
|
||||
std::shared_ptr<anbox::network::SocketMessenger> messenger_;
|
||||
Buffer buffer_;
|
||||
std::thread writer_thread_;
|
||||
std::queue<Buffer> out_queue_;
|
||||
Buffer current_write_buffer_;
|
||||
size_t current_write_buffer_left_ = 0;
|
||||
std::queue<Buffer> in_queue_;
|
||||
Buffer current_read_buffer_;
|
||||
size_t current_read_buffer_left_ = 0;
|
||||
std::mutex write_mutex_;
|
||||
std::mutex read_mutex_;
|
||||
std::condition_variable can_write_;
|
||||
std::condition_variable can_read_;
|
||||
bool stopped_ = false;
|
||||
std::mutex mutex_;
|
||||
std::condition_variable lock_;
|
||||
std::vector<std::uint8_t> buffer_;
|
||||
unsigned char *send_buffer_ = nullptr;
|
||||
size_t send_buffer_size_ = 0;
|
||||
};
|
||||
}
|
||||
|
||||
namespace anbox {
|
||||
namespace graphics {
|
||||
emugl::Mutex OpenGlesMessageProcessor::global_lock{};
|
||||
static int next_id = 0;
|
||||
|
||||
OpenGlesMessageProcessor::OpenGlesMessageProcessor(const std::shared_ptr<network::SocketMessenger> &messenger) :
|
||||
messenger_(messenger),
|
||||
id_(next_id++),
|
||||
stream_(std::make_shared<DelayedIOStream>(messenger_)) {
|
||||
stream_(std::make_shared<DirectIOStream>(messenger_)),
|
||||
renderer_(RenderThread::create(stream_.get(), &global_lock)) {
|
||||
|
||||
// We have to read the client flags first before we can continue
|
||||
// processing the actual commands
|
||||
unsigned int client_flags = 0;
|
||||
auto err = messenger_->receive_msg(boost::asio::buffer(&client_flags, sizeof(unsigned int)));
|
||||
if (err)
|
||||
ERROR("%s", err.message());
|
||||
std::array<std::uint8_t, sizeof(unsigned int)> buffer;
|
||||
messenger_->receive_msg(boost::asio::buffer(buffer));
|
||||
|
||||
renderer_.reset(RenderThread::create(stream_.get(), &global_lock));
|
||||
renderer_->start();
|
||||
|
||||
DEBUG("Started new OpenGL ES message processor");
|
||||
}
|
||||
|
||||
OpenGlesMessageProcessor::~OpenGlesMessageProcessor() {
|
||||
|
|
@ -256,9 +151,8 @@ OpenGlesMessageProcessor::~OpenGlesMessageProcessor() {
|
|||
}
|
||||
|
||||
bool OpenGlesMessageProcessor::process_data(const std::vector<std::uint8_t> &data) {
|
||||
DEBUG("[%d] Got %d bytes", id_, data.size());
|
||||
auto stream = std::static_pointer_cast<DelayedIOStream>(stream_);
|
||||
stream->post_data(DelayedIOStream::Buffer(data.data(), data.data() + data.size()));
|
||||
auto stream = std::static_pointer_cast<DirectIOStream>(stream_);
|
||||
stream->submitData(data);
|
||||
return true;
|
||||
}
|
||||
} // namespace graphics
|
||||
|
|
|
|||
|
|
@ -45,7 +45,6 @@ private:
|
|||
static emugl::Mutex global_lock;
|
||||
|
||||
std::shared_ptr<network::SocketMessenger> messenger_;
|
||||
int id_;
|
||||
std::shared_ptr<IOStream> stream_;
|
||||
std::shared_ptr<RenderThread> renderer_;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -74,16 +74,6 @@ Credentials BaseSocketMessenger<stream_protocol>::creds() const {
|
|||
return {cr.pid, cr.uid, cr.gid};
|
||||
}
|
||||
|
||||
template<typename stream_protocol>
|
||||
ssize_t BaseSocketMessenger<stream_protocol>::send_raw(char const* data, size_t length)
|
||||
{
|
||||
VariableLengthArray<serialization_buffer_size> whole_message{length};
|
||||
std::copy(data, data + length, whole_message.data());
|
||||
|
||||
std::unique_lock<std::mutex> lg(message_lock);
|
||||
return ::send(socket_fd, data, length, 0);
|
||||
}
|
||||
|
||||
template<typename stream_protocol>
|
||||
void BaseSocketMessenger<stream_protocol>::send(char const* data, size_t length)
|
||||
{
|
||||
|
|
@ -98,7 +88,6 @@ void BaseSocketMessenger<stream_protocol>::send(char const* data, size_t length)
|
|||
boost::asio::transfer_all());
|
||||
}
|
||||
catch (const boost::system::system_error &err) {
|
||||
DEBUG("Got error: %s", err.what());
|
||||
if (err.code() == boost::asio::error::try_again)
|
||||
continue;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@ public:
|
|||
unsigned short local_port() const override;
|
||||
|
||||
void send(char const* data, size_t length) override;
|
||||
ssize_t send_raw(char const* data, size_t length) override;
|
||||
void async_receive_msg(AnboxReadHandler const& handle, boost::asio::mutable_buffers_1 const &buffer) override;
|
||||
boost::system::error_code receive_msg(boost::asio::mutable_buffers_1 const& buffer) override;
|
||||
size_t available_bytes() override;
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@
|
|||
#define ANBOX_NETWORK_MESSAGE_SENDER_H_
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <cstddef>
|
||||
|
||||
namespace anbox {
|
||||
namespace network {
|
||||
|
|
@ -28,7 +27,6 @@ class MessageSender
|
|||
{
|
||||
public:
|
||||
virtual void send(char const* data, size_t length) = 0;
|
||||
virtual ssize_t send_raw(char const* data, size_t length) = 0;
|
||||
|
||||
protected:
|
||||
MessageSender() = default;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue