Wait for RPC call response before we return back to the caller

This commit is contained in:
Simon Fels 2016-06-30 00:10:56 +02:00
commit 520be6e4bc
5 changed files with 197 additions and 16 deletions

View file

@ -44,6 +44,7 @@ set(SOURCES
anbox/common/fd.cpp
anbox/common/fd_sets.h
anbox/common/variable_length_array.h
anbox/common/wait_handle.cpp
anbox/network/message_sender.h
anbox/network/message_receiver.h

View file

@ -62,42 +62,62 @@ void PlatformApiProxy::install(const std::string &path) {
const auto container_path = utils::string_format("%s/%s",
config::container_share_path(), fs::path(path).filename().string());
auto c = std::make_shared<Request<protobuf::bridge::InstallApplication>>();
auto c = std::make_shared<Request<protobuf::bridge::Void>>();
protobuf::bridge::InstallApplication message;
message.set_path(container_path);
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
install_wait_handle_.expect_result();
}
channel_->call_method("install_application",
&message,
c->response.get(),
google::protobuf::NewCallback(this, &PlatformApiProxy::application_installed, c.get()));
install_wait_handle_.wait_for_all();
if (c->response->has_error())
throw std::runtime_error(c->response->error());
}
void PlatformApiProxy::application_installed(Request<protobuf::bridge::InstallApplication> *request) {
DEBUG("");
void PlatformApiProxy::application_installed(Request<protobuf::bridge::Void> *request) {
install_wait_handle_.result_received();
}
void PlatformApiProxy::launch(const std::string &package, const std::string &activity) {
ensure_rpc_channel();
auto c = std::make_shared<Request<protobuf::bridge::LaunchApplication>>();
auto c = std::make_shared<Request<protobuf::bridge::Void>>();
protobuf::bridge::LaunchApplication message;
message.set_package_name(package);
message.set_activity(activity);
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
launch_wait_handle_.expect_result();
}
channel_->call_method("launch_application",
&message,
c->response.get(),
google::protobuf::NewCallback(this, &PlatformApiProxy::application_launched, c.get()));
launch_wait_handle_.wait_for_all();
if (c->response->has_error())
throw std::runtime_error(c->response->error());
}
void PlatformApiProxy::application_launched(Request<protobuf::bridge::LaunchApplication> *request) {
DEBUG("");
void PlatformApiProxy::application_launched(Request<protobuf::bridge::Void> *request) {
launch_wait_handle_.result_received();
}
void PlatformApiProxy::set_dns_servers(const std::string &domain, const std::vector<std::string> &servers) {
ensure_rpc_channel();
auto c = std::make_shared<Request<protobuf::bridge::SetDnsServers>>();
auto c = std::make_shared<Request<protobuf::bridge::Void>>();
protobuf::bridge::SetDnsServers message;
message.set_domain(domain);
@ -107,14 +127,24 @@ void PlatformApiProxy::set_dns_servers(const std::string &domain, const std::vec
server_message->set_address(server);
}
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
set_dns_servers_wait_handle_.expect_result();
}
channel_->call_method("set_dns_servers",
&message,
c->response.get(),
google::protobuf::NewCallback(this, &PlatformApiProxy::dns_servers_set, c.get()));
set_dns_servers_wait_handle_.wait_for_all();
if (c->response->has_error())
throw std::runtime_error(c->response->error());
}
void PlatformApiProxy::dns_servers_set(Request<protobuf::bridge::SetDnsServers> *request) {
DEBUG("");
void PlatformApiProxy::dns_servers_set(Request<protobuf::bridge::Void> *request) {
set_dns_servers_wait_handle_.result_received();
}
} // namespace bridge
} // namespace anbox

View file

@ -19,6 +19,7 @@
#define ANBOX_ANDROID_APPLICATION_MANAGER_H_
#include "anbox/application_manager.h"
#include "anbox/common/wait_handle.h"
#include <memory>
#include <vector>
@ -26,9 +27,7 @@
namespace anbox {
namespace protobuf {
namespace bridge {
class InstallApplication;
class LaunchApplication;
class SetDnsServers;
class Void;
} // namespace bridge
} // namespace protobuf
namespace bridge {
@ -51,15 +50,20 @@ private:
template<typename Response>
struct Request {
Request() : response(std::make_shared<Response>()) { }
Request() : response(std::make_shared<Response>()), success(true) { }
std::shared_ptr<Response> response;
bool success;
};
void application_installed(Request<protobuf::bridge::InstallApplication> *request);
void application_launched(Request<protobuf::bridge::LaunchApplication> *request);
void dns_servers_set(Request<protobuf::bridge::SetDnsServers> *request);
void application_installed(Request<protobuf::bridge::Void> *request);
void application_launched(Request<protobuf::bridge::Void> *request);
void dns_servers_set(Request<protobuf::bridge::Void> *request);
mutable std::mutex mutex_;
std::shared_ptr<RpcChannel> channel_;
common::WaitHandle install_wait_handle_;
common::WaitHandle launch_wait_handle_;
common::WaitHandle set_dns_servers_wait_handle_;
};
} // namespace bridge
} // namespace anbox

View file

@ -0,0 +1,92 @@
/*
* Copyright © 2012-2014 Canonical Ltd.
*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Authored by: Kevin DuBois <kevin.dubois@canonical.com>
* Daniel van Vugt <daniel.van.vugt@canonical.com>
*/
#include "anbox/common/wait_handle.h"
namespace anbox {
namespace common {
WaitHandle::WaitHandle() :
guard(),
wait_condition(),
expecting(0),
received(0)
{
}
WaitHandle::~WaitHandle()
{
}
void WaitHandle::expect_result()
{
std::lock_guard<std::mutex> lock(guard);
expecting++;
}
void WaitHandle::result_received()
{
std::lock_guard<std::mutex> lock(guard);
received++;
wait_condition.notify_all();
}
void WaitHandle::wait_for_all() // wait for all results you expect
{
std::unique_lock<std::mutex> lock(guard);
wait_condition.wait(lock, [&]{ return received == expecting; });
received = 0;
expecting = 0;
}
void WaitHandle::wait_for_pending(std::chrono::milliseconds limit)
{
std::unique_lock<std::mutex> lock(guard);
wait_condition.wait_for(lock, limit, [&]{ return received == expecting; });
}
void WaitHandle::wait_for_one() // wait for any single result
{
std::unique_lock<std::mutex> lock(guard);
wait_condition.wait(lock, [&]{ return received != 0; });
--received;
--expecting;
}
bool WaitHandle::has_result()
{
std::lock_guard<std::mutex> lock(guard);
return received > 0;
}
bool WaitHandle::is_pending()
{
std::unique_lock<std::mutex> lock(guard);
return expecting > 0 && received != expecting;
}
} // namespace common
} // namespace anbox

View file

@ -0,0 +1,54 @@
/*
* Copyright © 2012 Canonical Ltd.
*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Authored by: Thomas Guest <thomas.guest@canonical.com>
* Daniel van Vugt <daniel.van.vugt@canonical.com>
*/
#ifndef ANBOX_COMMON_WAIT_HANDLE_H_
#define ANBOX_COMMON_WAIT_HANDLE_H_
#include <chrono>
#include <condition_variable>
#include <mutex>
namespace anbox {
namespace common {
struct WaitHandle
{
public:
WaitHandle();
~WaitHandle();
void expect_result();
void result_received();
void wait_for_all();
void wait_for_one();
void wait_for_pending(std::chrono::milliseconds limit);
bool has_result();
bool is_pending();
private:
std::mutex guard;
std::condition_variable wait_condition;
int expecting;
int received;
};
} // namespace common
} // namespace anbox
#endif