device_network.cpp 20 KB


  1. /*
  2. * Copyright 2011-2013 Blender Foundation
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "device/device.h"
  17. #include "device/device_intern.h"
  18. #include "device/device_network.h"
  19. #include "util/util_foreach.h"
  20. #include "util/util_logging.h"
  21. #if defined(WITH_NETWORK)
  22. CCL_NAMESPACE_BEGIN
  23. typedef map<device_ptr, device_ptr> PtrMap;
  24. typedef vector<uint8_t> DataVector;
  25. typedef map<device_ptr, DataVector> DataMap;
  26. /* tile list */
  27. typedef vector<RenderTile> TileList;
  28. /* search a list of tiles and find the one that matches the passed render tile */
  29. static TileList::iterator tile_list_find(TileList &tile_list, RenderTile &tile)
  30. {
  31. for (TileList::iterator it = tile_list.begin(); it != tile_list.end(); ++it)
  32. if (tile.x == it->x && tile.y == it->y && tile.start_sample == it->start_sample)
  33. return it;
  34. return tile_list.end();
  35. }
  36. class NetworkDevice : public Device {
  37. public:
  38. boost::asio::io_service io_service;
  39. tcp::socket socket;
  40. device_ptr mem_counter;
  41. DeviceTask the_task; /* todo: handle multiple tasks */
  42. thread_mutex rpc_lock;
  43. virtual bool show_samples() const
  44. {
  45. return false;
  46. }
  47. NetworkDevice(DeviceInfo &info, Stats &stats, Profiler &profiler, const char *address)
  48. : Device(info, stats, profiler, true), socket(io_service)
  49. {
  50. error_func = NetworkError();
  51. stringstream portstr;
  52. portstr << SERVER_PORT;
  53. tcp::resolver resolver(io_service);
  54. tcp::resolver::query query(address, portstr.str());
  55. tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
  56. tcp::resolver::iterator end;
  57. boost::system::error_code error = boost::asio::error::host_not_found;
  58. while (error && endpoint_iterator != end) {
  59. socket.close();
  60. socket.connect(*endpoint_iterator++, error);
  61. }
  62. if (error)
  63. error_func.network_error(error.message());
  64. mem_counter = 0;
  65. }
  66. ~NetworkDevice()
  67. {
  68. RPCSend snd(socket, &error_func, "stop");
  69. snd.write();
  70. }
  71. virtual BVHLayoutMask get_bvh_layout_mask() const
  72. {
  73. return BVH_LAYOUT_BVH2;
  74. }
  75. void mem_alloc(device_memory &mem)
  76. {
  77. if (mem.name) {
  78. VLOG(1) << "Buffer allocate: " << mem.name << ", "
  79. << string_human_readable_number(mem.memory_size()) << " bytes. ("
  80. << string_human_readable_size(mem.memory_size()) << ")";
  81. }
  82. thread_scoped_lock lock(rpc_lock);
  83. mem.device_pointer = ++mem_counter;
  84. RPCSend snd(socket, &error_func, "mem_alloc");
  85. snd.add(mem);
  86. snd.write();
  87. }
  88. void mem_copy_to(device_memory &mem)
  89. {
  90. thread_scoped_lock lock(rpc_lock);
  91. RPCSend snd(socket, &error_func, "mem_copy_to");
  92. snd.add(mem);
  93. snd.write();
  94. snd.write_buffer(mem.host_pointer, mem.memory_size());
  95. }
  96. void mem_copy_from(device_memory &mem, int y, int w, int h, int elem)
  97. {
  98. thread_scoped_lock lock(rpc_lock);
  99. size_t data_size = mem.memory_size();
  100. RPCSend snd(socket, &error_func, "mem_copy_from");
  101. snd.add(mem);
  102. snd.add(y);
  103. snd.add(w);
  104. snd.add(h);
  105. snd.add(elem);
  106. snd.write();
  107. RPCReceive rcv(socket, &error_func);
  108. rcv.read_buffer(mem.host_pointer, data_size);
  109. }
  110. void mem_zero(device_memory &mem)
  111. {
  112. thread_scoped_lock lock(rpc_lock);
  113. RPCSend snd(socket, &error_func, "mem_zero");
  114. snd.add(mem);
  115. snd.write();
  116. }
  117. void mem_free(device_memory &mem)
  118. {
  119. if (mem.device_pointer) {
  120. thread_scoped_lock lock(rpc_lock);
  121. RPCSend snd(socket, &error_func, "mem_free");
  122. snd.add(mem);
  123. snd.write();
  124. mem.device_pointer = 0;
  125. }
  126. }
  127. void const_copy_to(const char *name, void *host, size_t size)
  128. {
  129. thread_scoped_lock lock(rpc_lock);
  130. RPCSend snd(socket, &error_func, "const_copy_to");
  131. string name_string(name);
  132. snd.add(name_string);
  133. snd.add(size);
  134. snd.write();
  135. snd.write_buffer(host, size);
  136. }
  137. bool load_kernels(const DeviceRequestedFeatures &requested_features)
  138. {
  139. if (error_func.have_error())
  140. return false;
  141. thread_scoped_lock lock(rpc_lock);
  142. RPCSend snd(socket, &error_func, "load_kernels");
  143. snd.add(requested_features.experimental);
  144. snd.add(requested_features.max_closure);
  145. snd.add(requested_features.max_nodes_group);
  146. snd.add(requested_features.nodes_features);
  147. snd.write();
  148. bool result;
  149. RPCReceive rcv(socket, &error_func);
  150. rcv.read(result);
  151. return result;
  152. }
  153. void task_add(DeviceTask &task)
  154. {
  155. thread_scoped_lock lock(rpc_lock);
  156. the_task = task;
  157. RPCSend snd(socket, &error_func, "task_add");
  158. snd.add(task);
  159. snd.write();
  160. }
  161. void task_wait()
  162. {
  163. thread_scoped_lock lock(rpc_lock);
  164. RPCSend snd(socket, &error_func, "task_wait");
  165. snd.write();
  166. lock.unlock();
  167. TileList the_tiles;
  168. /* todo: run this threaded for connecting to multiple clients */
  169. for (;;) {
  170. if (error_func.have_error())
  171. break;
  172. RenderTile tile;
  173. lock.lock();
  174. RPCReceive rcv(socket, &error_func);
  175. if (rcv.name == "acquire_tile") {
  176. lock.unlock();
  177. /* todo: watch out for recursive calls! */
  178. if (the_task.acquire_tile(this, tile)) { /* write return as bool */
  179. the_tiles.push_back(tile);
  180. lock.lock();
  181. RPCSend snd(socket, &error_func, "acquire_tile");
  182. snd.add(tile);
  183. snd.write();
  184. lock.unlock();
  185. }
  186. else {
  187. lock.lock();
  188. RPCSend snd(socket, &error_func, "acquire_tile_none");
  189. snd.write();
  190. lock.unlock();
  191. }
  192. }
  193. else if (rcv.name == "release_tile") {
  194. rcv.read(tile);
  195. lock.unlock();
  196. TileList::iterator it = tile_list_find(the_tiles, tile);
  197. if (it != the_tiles.end()) {
  198. tile.buffers = it->buffers;
  199. the_tiles.erase(it);
  200. }
  201. assert(tile.buffers != NULL);
  202. the_task.release_tile(tile);
  203. lock.lock();
  204. RPCSend snd(socket, &error_func, "release_tile");
  205. snd.write();
  206. lock.unlock();
  207. }
  208. else if (rcv.name == "task_wait_done") {
  209. lock.unlock();
  210. break;
  211. }
  212. else
  213. lock.unlock();
  214. }
  215. }
  216. void task_cancel()
  217. {
  218. thread_scoped_lock lock(rpc_lock);
  219. RPCSend snd(socket, &error_func, "task_cancel");
  220. snd.write();
  221. }
  222. int get_split_task_count(DeviceTask &)
  223. {
  224. return 1;
  225. }
  226. private:
  227. NetworkError error_func;
  228. };
  229. Device *device_network_create(DeviceInfo &info,
  230. Stats &stats,
  231. Profiler &profiler,
  232. const char *address)
  233. {
  234. return new NetworkDevice(info, stats, profiler, address);
  235. }
  236. void device_network_info(vector<DeviceInfo> &devices)
  237. {
  238. DeviceInfo info;
  239. info.type = DEVICE_NETWORK;
  240. info.description = "Network Device";
  241. info.id = "NETWORK";
  242. info.num = 0;
  243. /* todo: get this info from device */
  244. info.has_volume_decoupled = false;
  245. info.has_osl = false;
  246. devices.push_back(info);
  247. }
  248. class DeviceServer {
  249. public:
  250. thread_mutex rpc_lock;
  251. void network_error(const string &message)
  252. {
  253. error_func.network_error(message);
  254. }
  255. bool have_error()
  256. {
  257. return error_func.have_error();
  258. }
  259. DeviceServer(Device *device_, tcp::socket &socket_)
  260. : device(device_), socket(socket_), stop(false), blocked_waiting(false)
  261. {
  262. error_func = NetworkError();
  263. }
  264. void listen()
  265. {
  266. /* receive remote function calls */
  267. for (;;) {
  268. listen_step();
  269. if (stop)
  270. break;
  271. }
  272. }
  273. protected:
  274. void listen_step()
  275. {
  276. thread_scoped_lock lock(rpc_lock);
  277. RPCReceive rcv(socket, &error_func);
  278. if (rcv.name == "stop")
  279. stop = true;
  280. else
  281. process(rcv, lock);
  282. }
  283. /* create a memory buffer for a device buffer and insert it into mem_data */
  284. DataVector &data_vector_insert(device_ptr client_pointer, size_t data_size)
  285. {
  286. /* create a new DataVector and insert it into mem_data */
  287. pair<DataMap::iterator, bool> data_ins = mem_data.insert(
  288. DataMap::value_type(client_pointer, DataVector()));
  289. /* make sure it was a unique insertion */
  290. assert(data_ins.second);
  291. /* get a reference to the inserted vector */
  292. DataVector &data_v = data_ins.first->second;
  293. /* size the vector */
  294. data_v.resize(data_size);
  295. return data_v;
  296. }
  297. DataVector &data_vector_find(device_ptr client_pointer)
  298. {
  299. DataMap::iterator i = mem_data.find(client_pointer);
  300. assert(i != mem_data.end());
  301. return i->second;
  302. }
  303. /* setup mapping and reverse mapping of client_pointer<->real_pointer */
  304. void pointer_mapping_insert(device_ptr client_pointer, device_ptr real_pointer)
  305. {
  306. pair<PtrMap::iterator, bool> mapins;
  307. /* insert mapping from client pointer to our real device pointer */
  308. mapins = ptr_map.insert(PtrMap::value_type(client_pointer, real_pointer));
  309. assert(mapins.second);
  310. /* insert reverse mapping from real our device pointer to client pointer */
  311. mapins = ptr_imap.insert(PtrMap::value_type(real_pointer, client_pointer));
  312. assert(mapins.second);
  313. }
  314. device_ptr device_ptr_from_client_pointer(device_ptr client_pointer)
  315. {
  316. PtrMap::iterator i = ptr_map.find(client_pointer);
  317. assert(i != ptr_map.end());
  318. return i->second;
  319. }
  320. device_ptr device_ptr_from_client_pointer_erase(device_ptr client_pointer)
  321. {
  322. PtrMap::iterator i = ptr_map.find(client_pointer);
  323. assert(i != ptr_map.end());
  324. device_ptr result = i->second;
  325. /* erase the mapping */
  326. ptr_map.erase(i);
  327. /* erase the reverse mapping */
  328. PtrMap::iterator irev = ptr_imap.find(result);
  329. assert(irev != ptr_imap.end());
  330. ptr_imap.erase(irev);
  331. /* erase the data vector */
  332. DataMap::iterator idata = mem_data.find(client_pointer);
  333. assert(idata != mem_data.end());
  334. mem_data.erase(idata);
  335. return result;
  336. }
  337. /* note that the lock must be already acquired upon entry.
  338. * This is necessary because the caller often peeks at
  339. * the header and delegates control to here when it doesn't
  340. * specifically handle the current RPC.
  341. * The lock must be unlocked before returning */
  342. void process(RPCReceive &rcv, thread_scoped_lock &lock)
  343. {
  344. if (rcv.name == "mem_alloc") {
  345. string name;
  346. network_device_memory mem(device);
  347. rcv.read(mem, name);
  348. lock.unlock();
  349. /* Allocate host side data buffer. */
  350. size_t data_size = mem.memory_size();
  351. device_ptr client_pointer = mem.device_pointer;
  352. DataVector &data_v = data_vector_insert(client_pointer, data_size);
  353. mem.host_pointer = (data_size) ? (void *)&(data_v[0]) : 0;
  354. /* Perform the allocation on the actual device. */
  355. device->mem_alloc(mem);
  356. /* Store a mapping to/from client_pointer and real device pointer. */
  357. pointer_mapping_insert(client_pointer, mem.device_pointer);
  358. }
  359. else if (rcv.name == "mem_copy_to") {
  360. string name;
  361. network_device_memory mem(device);
  362. rcv.read(mem, name);
  363. lock.unlock();
  364. size_t data_size = mem.memory_size();
  365. device_ptr client_pointer = mem.device_pointer;
  366. if (client_pointer) {
  367. /* Lookup existing host side data buffer. */
  368. DataVector &data_v = data_vector_find(client_pointer);
  369. mem.host_pointer = (void *)&data_v[0];
  370. /* Translate the client pointer to a real device pointer. */
  371. mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
  372. }
  373. else {
  374. /* Allocate host side data buffer. */
  375. DataVector &data_v = data_vector_insert(client_pointer, data_size);
  376. mem.host_pointer = (data_size) ? (void *)&(data_v[0]) : 0;
  377. }
  378. /* Copy data from network into memory buffer. */
  379. rcv.read_buffer((uint8_t *)mem.host_pointer, data_size);
  380. /* Copy the data from the memory buffer to the device buffer. */
  381. device->mem_copy_to(mem);
  382. if (!client_pointer) {
  383. /* Store a mapping to/from client_pointer and real device pointer. */
  384. pointer_mapping_insert(client_pointer, mem.device_pointer);
  385. }
  386. }
  387. else if (rcv.name == "mem_copy_from") {
  388. string name;
  389. network_device_memory mem(device);
  390. int y, w, h, elem;
  391. rcv.read(mem, name);
  392. rcv.read(y);
  393. rcv.read(w);
  394. rcv.read(h);
  395. rcv.read(elem);
  396. device_ptr client_pointer = mem.device_pointer;
  397. mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
  398. DataVector &data_v = data_vector_find(client_pointer);
  399. mem.host_pointer = (device_ptr) & (data_v[0]);
  400. device->mem_copy_from(mem, y, w, h, elem);
  401. size_t data_size = mem.memory_size();
  402. RPCSend snd(socket, &error_func, "mem_copy_from");
  403. snd.write();
  404. snd.write_buffer((uint8_t *)mem.host_pointer, data_size);
  405. lock.unlock();
  406. }
  407. else if (rcv.name == "mem_zero") {
  408. string name;
  409. network_device_memory mem(device);
  410. rcv.read(mem, name);
  411. lock.unlock();
  412. size_t data_size = mem.memory_size();
  413. device_ptr client_pointer = mem.device_pointer;
  414. if (client_pointer) {
  415. /* Lookup existing host side data buffer. */
  416. DataVector &data_v = data_vector_find(client_pointer);
  417. mem.host_pointer = (void *)&data_v[0];
  418. /* Translate the client pointer to a real device pointer. */
  419. mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
  420. }
  421. else {
  422. /* Allocate host side data buffer. */
  423. DataVector &data_v = data_vector_insert(client_pointer, data_size);
  424. mem.host_pointer = (void *) ? (device_ptr) & (data_v[0]) : 0;
  425. }
  426. /* Zero memory. */
  427. device->mem_zero(mem);
  428. if (!client_pointer) {
  429. /* Store a mapping to/from client_pointer and real device pointer. */
  430. pointer_mapping_insert(client_pointer, mem.device_pointer);
  431. }
  432. }
  433. else if (rcv.name == "mem_free") {
  434. string name;
  435. network_device_memory mem(device);
  436. rcv.read(mem, name);
  437. lock.unlock();
  438. device_ptr client_pointer = mem.device_pointer;
  439. mem.device_pointer = device_ptr_from_client_pointer_erase(client_pointer);
  440. device->mem_free(mem);
  441. }
  442. else if (rcv.name == "const_copy_to") {
  443. string name_string;
  444. size_t size;
  445. rcv.read(name_string);
  446. rcv.read(size);
  447. vector<char> host_vector(size);
  448. rcv.read_buffer(&host_vector[0], size);
  449. lock.unlock();
  450. device->const_copy_to(name_string.c_str(), &host_vector[0], size);
  451. }
  452. else if (rcv.name == "load_kernels") {
  453. DeviceRequestedFeatures requested_features;
  454. rcv.read(requested_features.experimental);
  455. rcv.read(requested_features.max_closure);
  456. rcv.read(requested_features.max_nodes_group);
  457. rcv.read(requested_features.nodes_features);
  458. bool result;
  459. result = device->load_kernels(requested_features);
  460. RPCSend snd(socket, &error_func, "load_kernels");
  461. snd.add(result);
  462. snd.write();
  463. lock.unlock();
  464. }
  465. else if (rcv.name == "task_add") {
  466. DeviceTask task;
  467. rcv.read(task);
  468. lock.unlock();
  469. if (task.buffer)
  470. task.buffer = device_ptr_from_client_pointer(task.buffer);
  471. if (task.rgba_half)
  472. task.rgba_half = device_ptr_from_client_pointer(task.rgba_half);
  473. if (task.rgba_byte)
  474. task.rgba_byte = device_ptr_from_client_pointer(task.rgba_byte);
  475. if (task.shader_input)
  476. task.shader_input = device_ptr_from_client_pointer(task.shader_input);
  477. if (task.shader_output)
  478. task.shader_output = device_ptr_from_client_pointer(task.shader_output);
  479. task.acquire_tile = function_bind(&DeviceServer::task_acquire_tile, this, _1, _2);
  480. task.release_tile = function_bind(&DeviceServer::task_release_tile, this, _1);
  481. task.update_progress_sample = function_bind(&DeviceServer::task_update_progress_sample,
  482. this);
  483. task.update_tile_sample = function_bind(&DeviceServer::task_update_tile_sample, this, _1);
  484. task.get_cancel = function_bind(&DeviceServer::task_get_cancel, this);
  485. device->task_add(task);
  486. }
  487. else if (rcv.name == "task_wait") {
  488. lock.unlock();
  489. blocked_waiting = true;
  490. device->task_wait();
  491. blocked_waiting = false;
  492. lock.lock();
  493. RPCSend snd(socket, &error_func, "task_wait_done");
  494. snd.write();
  495. lock.unlock();
  496. }
  497. else if (rcv.name == "task_cancel") {
  498. lock.unlock();
  499. device->task_cancel();
  500. }
  501. else if (rcv.name == "acquire_tile") {
  502. AcquireEntry entry;
  503. entry.name = rcv.name;
  504. rcv.read(entry.tile);
  505. acquire_queue.push_back(entry);
  506. lock.unlock();
  507. }
  508. else if (rcv.name == "acquire_tile_none") {
  509. AcquireEntry entry;
  510. entry.name = rcv.name;
  511. acquire_queue.push_back(entry);
  512. lock.unlock();
  513. }
  514. else if (rcv.name == "release_tile") {
  515. AcquireEntry entry;
  516. entry.name = rcv.name;
  517. acquire_queue.push_back(entry);
  518. lock.unlock();
  519. }
  520. else {
  521. cout << "Error: unexpected RPC receive call \"" + rcv.name + "\"\n";
  522. lock.unlock();
  523. }
  524. }
  525. bool task_acquire_tile(Device *, RenderTile &tile)
  526. {
  527. thread_scoped_lock acquire_lock(acquire_mutex);
  528. bool result = false;
  529. RPCSend snd(socket, &error_func, "acquire_tile");
  530. snd.write();
  531. do {
  532. if (blocked_waiting)
  533. listen_step();
  534. /* todo: avoid busy wait loop */
  535. thread_scoped_lock lock(rpc_lock);
  536. if (!acquire_queue.empty()) {
  537. AcquireEntry entry = acquire_queue.front();
  538. acquire_queue.pop_front();
  539. if (entry.name == "acquire_tile") {
  540. tile = entry.tile;
  541. if (tile.buffer)
  542. tile.buffer = ptr_map[tile.buffer];
  543. result = true;
  544. break;
  545. }
  546. else if (entry.name == "acquire_tile_none") {
  547. break;
  548. }
  549. else {
  550. cout << "Error: unexpected acquire RPC receive call \"" + entry.name + "\"\n";
  551. }
  552. }
  553. } while (acquire_queue.empty() && !stop && !have_error());
  554. return result;
  555. }
  556. void task_update_progress_sample()
  557. {
  558. ; /* skip */
  559. }
  560. void task_update_tile_sample(RenderTile &)
  561. {
  562. ; /* skip */
  563. }
  564. void task_release_tile(RenderTile &tile)
  565. {
  566. thread_scoped_lock acquire_lock(acquire_mutex);
  567. if (tile.buffer)
  568. tile.buffer = ptr_imap[tile.buffer];
  569. {
  570. thread_scoped_lock lock(rpc_lock);
  571. RPCSend snd(socket, &error_func, "release_tile");
  572. snd.add(tile);
  573. snd.write();
  574. lock.unlock();
  575. }
  576. do {
  577. if (blocked_waiting)
  578. listen_step();
  579. /* todo: avoid busy wait loop */
  580. thread_scoped_lock lock(rpc_lock);
  581. if (!acquire_queue.empty()) {
  582. AcquireEntry entry = acquire_queue.front();
  583. acquire_queue.pop_front();
  584. if (entry.name == "release_tile") {
  585. lock.unlock();
  586. break;
  587. }
  588. else {
  589. cout << "Error: unexpected release RPC receive call \"" + entry.name + "\"\n";
  590. }
  591. }
  592. } while (acquire_queue.empty() && !stop);
  593. }
  594. bool task_get_cancel()
  595. {
  596. return false;
  597. }
  598. /* properties */
  599. Device *device;
  600. tcp::socket &socket;
  601. /* mapping of remote to local pointer */
  602. PtrMap ptr_map;
  603. PtrMap ptr_imap;
  604. DataMap mem_data;
  605. struct AcquireEntry {
  606. string name;
  607. RenderTile tile;
  608. };
  609. thread_mutex acquire_mutex;
  610. list<AcquireEntry> acquire_queue;
  611. bool stop;
  612. bool blocked_waiting;
  613. private:
  614. NetworkError error_func;
  615. /* todo: free memory and device (osl) on network error */
  616. };
  617. void Device::server_run()
  618. {
  619. try {
  620. /* starts thread that responds to discovery requests */
  621. ServerDiscovery discovery;
  622. for (;;) {
  623. /* accept connection */
  624. boost::asio::io_service io_service;
  625. tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), SERVER_PORT));
  626. tcp::socket socket(io_service);
  627. acceptor.accept(socket);
  628. string remote_address = socket.remote_endpoint().address().to_string();
  629. printf("Connected to remote client at: %s\n", remote_address.c_str());
  630. DeviceServer server(this, socket);
  631. server.listen();
  632. printf("Disconnected.\n");
  633. }
  634. }
  635. catch (exception &e) {
  636. fprintf(stderr, "Network server exception: %s\n", e.what());
  637. }
  638. }
  639. CCL_NAMESPACE_END
  640. #endif