md-cluster.c

  1. // SPDX-License-Identifier: GPL-2.0-or-later
  2. /*
  3. * Copyright (C) 2015, SUSE
  4. */
  5. #include <linux/module.h>
  6. #include <linux/kthread.h>
  7. #include <linux/dlm.h>
  8. #include <linux/sched.h>
  9. #include <linux/raid/md_p.h>
  10. #include "md.h"
  11. #include "md-bitmap.h"
  12. #include "md-cluster.h"
  13. #define LVB_SIZE 64
  14. #define NEW_DEV_TIMEOUT 5000
  15. struct dlm_lock_resource {
  16. dlm_lockspace_t *ls;
  17. struct dlm_lksb lksb;
  18. char *name; /* lock name. */
  19. uint32_t flags; /* flags to pass to dlm_lock() */
  20. wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
  21. bool sync_locking_done;
  22. void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
  23. struct mddev *mddev; /* pointing back to mddev. */
  24. int mode;
  25. };
  26. struct resync_info {
  27. __le64 lo;
  28. __le64 hi;
  29. };
  30. /* md_cluster_info flags */
  31. #define MD_CLUSTER_WAITING_FOR_NEWDISK 1
  32. #define MD_CLUSTER_SUSPEND_READ_BALANCING 2
  33. #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3
  34. /* Lock the send communication. This is done through
  35. * bit manipulation as opposed to a mutex in order to
  36. * accommodate lock and hold. See next comment.
  37. */
  38. #define MD_CLUSTER_SEND_LOCK 4
  39. /* Cluster operations (such as adding a disk) must lock the
  40. * communication channel to perform extra operations
  41. * (update metadata) while no other operation is allowed on the
  42. * MD. The token needs to be locked and held until the operation
  43. * completes with a md_update_sb(), which would eventually release
  44. * the lock.
  45. */
  46. #define MD_CLUSTER_SEND_LOCKED_ALREADY 5
  47. /* We should receive messages after the node has joined the cluster and
  48. * set up all the related info such as bitmap and personality */
  49. #define MD_CLUSTER_ALREADY_IN_CLUSTER 6
  50. #define MD_CLUSTER_PENDING_RECV_EVENT 7
  51. #define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8
  52. struct md_cluster_info {
  53. struct mddev *mddev; /* the md device which md_cluster_info belongs to */
  54. /* dlm lock space and resources for clustered raid. */
  55. dlm_lockspace_t *lockspace;
  56. int slot_number;
  57. struct completion completion;
  58. struct mutex recv_mutex;
  59. struct dlm_lock_resource *bitmap_lockres;
  60. struct dlm_lock_resource **other_bitmap_lockres;
  61. struct dlm_lock_resource *resync_lockres;
  62. struct list_head suspend_list;
  63. spinlock_t suspend_lock;
  64. /* record the region which write should be suspended */
  65. sector_t suspend_lo;
  66. sector_t suspend_hi;
  67. int suspend_from; /* the slot which broadcast suspend_lo/hi */
  68. struct md_thread *recovery_thread;
  69. unsigned long recovery_map;
  70. /* communication lock resources */
  71. struct dlm_lock_resource *ack_lockres;
  72. struct dlm_lock_resource *message_lockres;
  73. struct dlm_lock_resource *token_lockres;
  74. struct dlm_lock_resource *no_new_dev_lockres;
  75. struct md_thread *recv_thread;
  76. struct completion newdisk_completion;
  77. wait_queue_head_t wait;
  78. unsigned long state;
  79. /* record the region in RESYNCING message */
  80. sector_t sync_low;
  81. sector_t sync_hi;
  82. };
  83. enum msg_type {
  84. METADATA_UPDATED = 0,
  85. RESYNCING,
  86. NEWDISK,
  87. REMOVE,
  88. RE_ADD,
  89. BITMAP_NEEDS_SYNC,
  90. CHANGE_CAPACITY,
  91. BITMAP_RESIZE,
  92. };
  93. struct cluster_msg {
  94. __le32 type;
  95. __le32 slot;
  96. /* TODO: Unionize this for smaller footprint */
  97. __le64 low;
  98. __le64 high;
  99. char uuid[16];
  100. __le32 raid_slot;
  101. };
  102. static void sync_ast(void *arg)
  103. {
  104. struct dlm_lock_resource *res;
  105. res = arg;
  106. res->sync_locking_done = true;
  107. wake_up(&res->sync_locking);
  108. }
  109. static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
  110. {
  111. int ret = 0;
  112. ret = dlm_lock(res->ls, mode, &res->lksb,
  113. res->flags, res->name, strlen(res->name),
  114. 0, sync_ast, res, res->bast);
  115. if (ret)
  116. return ret;
  117. wait_event(res->sync_locking, res->sync_locking_done);
  118. res->sync_locking_done = false;
  119. if (res->lksb.sb_status == 0)
  120. res->mode = mode;
  121. return res->lksb.sb_status;
  122. }
  123. static int dlm_unlock_sync(struct dlm_lock_resource *res)
  124. {
  125. return dlm_lock_sync(res, DLM_LOCK_NL);
  126. }
  127. /*
  128. * A variation of dlm_lock_sync, which makes the lock request
  129. * interruptible
  130. */
  131. static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
  132. struct mddev *mddev)
  133. {
  134. int ret = 0;
  135. ret = dlm_lock(res->ls, mode, &res->lksb,
  136. res->flags, res->name, strlen(res->name),
  137. 0, sync_ast, res, res->bast);
  138. if (ret)
  139. return ret;
  140. wait_event(res->sync_locking, res->sync_locking_done
  141. || kthread_should_stop()
  142. || test_bit(MD_CLOSING, &mddev->flags));
  143. if (!res->sync_locking_done) {
  144. /*
  145. * the convert queue contains the lock request when the request is
  146. * interrupted, and sync_ast could still be run, so we need to
  147. * cancel the request and reset the completion
  148. */
  149. ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
  150. &res->lksb, res);
  151. res->sync_locking_done = false;
  152. if (unlikely(ret != 0))
  153. pr_info("failed to cancel previous lock request "
  154. "%s return %d\n", res->name, ret);
  155. return -EPERM;
  156. } else
  157. res->sync_locking_done = false;
  158. if (res->lksb.sb_status == 0)
  159. res->mode = mode;
  160. return res->lksb.sb_status;
  161. }
  162. static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
  163. char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
  164. {
  165. struct dlm_lock_resource *res = NULL;
  166. int ret, namelen;
  167. struct md_cluster_info *cinfo = mddev->cluster_info;
  168. res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
  169. if (!res)
  170. return NULL;
  171. init_waitqueue_head(&res->sync_locking);
  172. res->sync_locking_done = false;
  173. res->ls = cinfo->lockspace;
  174. res->mddev = mddev;
  175. res->mode = DLM_LOCK_IV;
  176. namelen = strlen(name);
  177. res->name = kzalloc(namelen + 1, GFP_KERNEL);
  178. if (!res->name) {
  179. pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
  180. goto out_err;
  181. }
  182. strlcpy(res->name, name, namelen + 1);
  183. if (with_lvb) {
  184. res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
  185. if (!res->lksb.sb_lvbptr) {
  186. pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
  187. goto out_err;
  188. }
  189. res->flags = DLM_LKF_VALBLK;
  190. }
  191. if (bastfn)
  192. res->bast = bastfn;
  193. res->flags |= DLM_LKF_EXPEDITE;
  194. ret = dlm_lock_sync(res, DLM_LOCK_NL);
  195. if (ret) {
  196. pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
  197. goto out_err;
  198. }
  199. res->flags &= ~DLM_LKF_EXPEDITE;
  200. res->flags |= DLM_LKF_CONVERT;
  201. return res;
  202. out_err:
  203. kfree(res->lksb.sb_lvbptr);
  204. kfree(res->name);
  205. kfree(res);
  206. return NULL;
  207. }
  208. static void lockres_free(struct dlm_lock_resource *res)
  209. {
  210. int ret = 0;
  211. if (!res)
  212. return;
  213. /*
  214. * use the FORCEUNLOCK flag so we can unlock even if the lock is on the
  215. * waiting or convert queue
  216. */
  217. ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
  218. &res->lksb, res);
  219. if (unlikely(ret != 0))
  220. pr_err("failed to unlock %s return %d\n", res->name, ret);
  221. else
  222. wait_event(res->sync_locking, res->sync_locking_done);
  223. kfree(res->name);
  224. kfree(res->lksb.sb_lvbptr);
  225. kfree(res);
  226. }
  227. static void add_resync_info(struct dlm_lock_resource *lockres,
  228. sector_t lo, sector_t hi)
  229. {
  230. struct resync_info *ri;
  231. ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
  232. ri->lo = cpu_to_le64(lo);
  233. ri->hi = cpu_to_le64(hi);
  234. }
  235. static int read_resync_info(struct mddev *mddev,
  236. struct dlm_lock_resource *lockres)
  237. {
  238. struct resync_info ri;
  239. struct md_cluster_info *cinfo = mddev->cluster_info;
  240. int ret = 0;
  241. dlm_lock_sync(lockres, DLM_LOCK_CR);
  242. memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
  243. if (le64_to_cpu(ri.hi) > 0) {
  244. cinfo->suspend_hi = le64_to_cpu(ri.hi);
  245. cinfo->suspend_lo = le64_to_cpu(ri.lo);
  246. ret = 1;
  247. }
  248. dlm_unlock_sync(lockres);
  249. return ret;
  250. }
  251. static void recover_bitmaps(struct md_thread *thread)
  252. {
  253. struct mddev *mddev = thread->mddev;
  254. struct md_cluster_info *cinfo = mddev->cluster_info;
  255. struct dlm_lock_resource *bm_lockres;
  256. char str[64];
  257. int slot, ret;
  258. sector_t lo, hi;
  259. while (cinfo->recovery_map) {
  260. slot = fls64((u64)cinfo->recovery_map) - 1;
  261. snprintf(str, 64, "bitmap%04d", slot);
  262. bm_lockres = lockres_init(mddev, str, NULL, 1);
  263. if (!bm_lockres) {
  264. pr_err("md-cluster: Cannot initialize bitmaps\n");
  265. goto clear_bit;
  266. }
  267. ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
  268. if (ret) {
  269. pr_err("md-cluster: Could not DLM lock %s: %d\n",
  270. str, ret);
  271. goto clear_bit;
  272. }
  273. ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
  274. if (ret) {
  275. pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
  276. goto clear_bit;
  277. }
  278. /* Clear suspend_area associated with the bitmap */
  279. spin_lock_irq(&cinfo->suspend_lock);
  280. cinfo->suspend_hi = 0;
  281. cinfo->suspend_lo = 0;
  282. cinfo->suspend_from = -1;
  283. spin_unlock_irq(&cinfo->suspend_lock);
  284. /* Kick off a reshape if needed */
  285. if (test_bit(MD_RESYNCING_REMOTE, &mddev->recovery) &&
  286. test_bit(MD_RECOVERY_RESHAPE, &mddev->recovery) &&
  287. mddev->reshape_position != MaxSector)
  288. md_wakeup_thread(mddev->sync_thread);
  289. if (hi > 0) {
  290. if (lo < mddev->recovery_cp)
  291. mddev->recovery_cp = lo;
  292. /* wake up thread to continue resync in case resync
  293. * is not finished */
  294. if (mddev->recovery_cp != MaxSector) {
  295. /*
  296. * clear the REMOTE flag since we will launch
  297. * resync thread in current node.
  298. */
  299. clear_bit(MD_RESYNCING_REMOTE,
  300. &mddev->recovery);
  301. set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
  302. md_wakeup_thread(mddev->thread);
  303. }
  304. }
  305. clear_bit:
  306. lockres_free(bm_lockres);
  307. clear_bit(slot, &cinfo->recovery_map);
  308. }
  309. }
  310. static void recover_prep(void *arg)
  311. {
  312. struct mddev *mddev = arg;
  313. struct md_cluster_info *cinfo = mddev->cluster_info;
  314. set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
  315. }
  316. static void __recover_slot(struct mddev *mddev, int slot)
  317. {
  318. struct md_cluster_info *cinfo = mddev->cluster_info;
  319. set_bit(slot, &cinfo->recovery_map);
  320. if (!cinfo->recovery_thread) {
  321. cinfo->recovery_thread = md_register_thread(recover_bitmaps,
  322. mddev, "recover");
  323. if (!cinfo->recovery_thread) {
  324. pr_warn("md-cluster: Could not create recovery thread\n");
  325. return;
  326. }
  327. }
  328. md_wakeup_thread(cinfo->recovery_thread);
  329. }
  330. static void recover_slot(void *arg, struct dlm_slot *slot)
  331. {
  332. struct mddev *mddev = arg;
  333. struct md_cluster_info *cinfo = mddev->cluster_info;
  334. pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
  335. mddev->bitmap_info.cluster_name,
  336. slot->nodeid, slot->slot,
  337. cinfo->slot_number);
  338. /* deduct one since dlm slots start from one while the slot numbers of
  339. * cluster-md begin with 0 */
  340. __recover_slot(mddev, slot->slot - 1);
  341. }
  342. static void recover_done(void *arg, struct dlm_slot *slots,
  343. int num_slots, int our_slot,
  344. uint32_t generation)
  345. {
  346. struct mddev *mddev = arg;
  347. struct md_cluster_info *cinfo = mddev->cluster_info;
  348. cinfo->slot_number = our_slot;
  349. /* the completion only needs to be completed when a node joins the cluster,
  350. * it doesn't need to run during another node's failure */
  351. if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
  352. complete(&cinfo->completion);
  353. clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
  354. }
  355. clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
  356. }
  357. /* these ops are called when a node joins the cluster, and do lock recovery
  358. * if a node failure occurs */
  359. static const struct dlm_lockspace_ops md_ls_ops = {
  360. .recover_prep = recover_prep,
  361. .recover_slot = recover_slot,
  362. .recover_done = recover_done,
  363. };
  364. /*
  365. * The BAST function for the ack lock resource
  366. * This function wakes up the receive thread in
  367. * order to receive and process the message.
  368. */
  369. static void ack_bast(void *arg, int mode)
  370. {
  371. struct dlm_lock_resource *res = arg;
  372. struct md_cluster_info *cinfo = res->mddev->cluster_info;
  373. if (mode == DLM_LOCK_EX) {
  374. if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
  375. md_wakeup_thread(cinfo->recv_thread);
  376. else
  377. set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
  378. }
  379. }
  380. static void remove_suspend_info(struct mddev *mddev, int slot)
  381. {
  382. struct md_cluster_info *cinfo = mddev->cluster_info;
  383. mddev->pers->quiesce(mddev, 1);
  384. spin_lock_irq(&cinfo->suspend_lock);
  385. cinfo->suspend_hi = 0;
  386. cinfo->suspend_lo = 0;
  387. spin_unlock_irq(&cinfo->suspend_lock);
  388. mddev->pers->quiesce(mddev, 0);
  389. }
  390. static void process_suspend_info(struct mddev *mddev,
  391. int slot, sector_t lo, sector_t hi)
  392. {
  393. struct md_cluster_info *cinfo = mddev->cluster_info;
  394. struct mdp_superblock_1 *sb = NULL;
  395. struct md_rdev *rdev;
  396. if (!hi) {
  397. /*
  398. * clear the REMOTE flag since resync or recovery is finished
  399. * in remote node.
  400. */
  401. clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
  402. remove_suspend_info(mddev, slot);
  403. set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
  404. md_wakeup_thread(mddev->thread);
  405. return;
  406. }
  407. rdev_for_each(rdev, mddev)
  408. if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
  409. sb = page_address(rdev->sb_page);
  410. break;
  411. }
  412. /*
  413. * The bitmaps are not the same for different nodes;
  414. * if RESYNCING is happening in one node, then
  415. * the node which received the RESYNCING message
  416. * probably will perform resync with the region
  417. * [lo, hi] again, so we could reduce resync time
  418. * a lot if we can ensure that the bitmaps among
  419. * different nodes match up well.
  420. *
  421. * sync_low/hi is used to record the region which
  422. * arrived in the previous RESYNCING message.
  423. *
  424. * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK
  425. * and set RESYNC_MASK since the resync thread is running
  426. * in another node, so we don't need to do the resync
  427. * again with the same section.
  428. *
  429. * Skip md_bitmap_sync_with_cluster in case reshape is
  430. * happening, because the reshaping region is small and
  431. * we don't want to trigger lots of WARN.
  432. */
  433. if (sb && !(le32_to_cpu(sb->feature_map) & MD_FEATURE_RESHAPE_ACTIVE))
  434. md_bitmap_sync_with_cluster(mddev, cinfo->sync_low,
  435. cinfo->sync_hi, lo, hi);
  436. cinfo->sync_low = lo;
  437. cinfo->sync_hi = hi;
  438. mddev->pers->quiesce(mddev, 1);
  439. spin_lock_irq(&cinfo->suspend_lock);
  440. cinfo->suspend_from = slot;
  441. cinfo->suspend_lo = lo;
  442. cinfo->suspend_hi = hi;
  443. spin_unlock_irq(&cinfo->suspend_lock);
  444. mddev->pers->quiesce(mddev, 0);
  445. }
  446. static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
  447. {
  448. char disk_uuid[64];
  449. struct md_cluster_info *cinfo = mddev->cluster_info;
  450. char event_name[] = "EVENT=ADD_DEVICE";
  451. char raid_slot[16];
  452. char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
  453. int len;
  454. len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
  455. sprintf(disk_uuid + len, "%pU", cmsg->uuid);
  456. snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
  457. pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
  458. init_completion(&cinfo->newdisk_completion);
  459. set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
  460. kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
  461. wait_for_completion_timeout(&cinfo->newdisk_completion,
  462. NEW_DEV_TIMEOUT);
  463. clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
  464. }
  465. static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
  466. {
  467. int got_lock = 0;
  468. struct md_cluster_info *cinfo = mddev->cluster_info;
  469. mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
  470. dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
  471. wait_event(mddev->thread->wqueue,
  472. (got_lock = mddev_trylock(mddev)) ||
  473. test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
  474. md_reload_sb(mddev, mddev->good_device_nr);
  475. if (got_lock)
  476. mddev_unlock(mddev);
  477. }
  478. static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
  479. {
  480. struct md_rdev *rdev;
  481. rcu_read_lock();
  482. rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
  483. if (rdev) {
  484. set_bit(ClusterRemove, &rdev->flags);
  485. set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
  486. md_wakeup_thread(mddev->thread);
  487. }
  488. else
  489. pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
  490. __func__, __LINE__, le32_to_cpu(msg->raid_slot));
  491. rcu_read_unlock();
  492. }
  493. static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
  494. {
  495. struct md_rdev *rdev;
  496. rcu_read_lock();
  497. rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
  498. if (rdev && test_bit(Faulty, &rdev->flags))
  499. clear_bit(Faulty, &rdev->flags);
  500. else
  501. pr_warn("%s: %d Could not find disk(%d) which is faulty",
  502. __func__, __LINE__, le32_to_cpu(msg->raid_slot));
  503. rcu_read_unlock();
  504. }
  505. static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
  506. {
  507. int ret = 0;
  508. if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
  509. "node %d received it's own msg\n", le32_to_cpu(msg->slot)))
  510. return -1;
  511. switch (le32_to_cpu(msg->type)) {
  512. case METADATA_UPDATED:
  513. process_metadata_update(mddev, msg);
  514. break;
  515. case CHANGE_CAPACITY:
  516. set_capacity(mddev->gendisk, mddev->array_sectors);
  517. revalidate_disk(mddev->gendisk);
  518. break;
  519. case RESYNCING:
  520. set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
  521. process_suspend_info(mddev, le32_to_cpu(msg->slot),
  522. le64_to_cpu(msg->low),
  523. le64_to_cpu(msg->high));
  524. break;
  525. case NEWDISK:
  526. process_add_new_disk(mddev, msg);
  527. break;
  528. case REMOVE:
  529. process_remove_disk(mddev, msg);
  530. break;
  531. case RE_ADD:
  532. process_readd_disk(mddev, msg);
  533. break;
  534. case BITMAP_NEEDS_SYNC:
  535. __recover_slot(mddev, le32_to_cpu(msg->slot));
  536. break;
  537. case BITMAP_RESIZE:
  538. if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
  539. ret = md_bitmap_resize(mddev->bitmap,
  540. le64_to_cpu(msg->high), 0, 0);
  541. break;
  542. default:
  543. ret = -1;
  544. pr_warn("%s:%d Received unknown message from %d\n",
  545. __func__, __LINE__, le32_to_cpu(msg->slot));
  546. }
  547. return ret;
  548. }
  549. /*
  550. * thread for receiving message
  551. */
  552. static void recv_daemon(struct md_thread *thread)
  553. {
  554. struct md_cluster_info *cinfo = thread->mddev->cluster_info;
  555. struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
  556. struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
  557. struct cluster_msg msg;
  558. int ret;
  559. mutex_lock(&cinfo->recv_mutex);
  560. /*get CR on Message*/
  561. if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
  562. pr_err("md/raid1:failed to get CR on MESSAGE\n");
  563. mutex_unlock(&cinfo->recv_mutex);
  564. return;
  565. }
  566. /* read lvb and wake up thread to process this message_lockres */
  567. memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
  568. ret = process_recvd_msg(thread->mddev, &msg);
  569. if (ret)
  570. goto out;
  571. /*release CR on ack_lockres*/
  572. ret = dlm_unlock_sync(ack_lockres);
  573. if (unlikely(ret != 0))
  574. pr_info("unlock ack failed return %d\n", ret);
  575. /*up-convert to PR on message_lockres*/
  576. ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
  577. if (unlikely(ret != 0))
  578. pr_info("lock PR on msg failed return %d\n", ret);
  579. /*get CR on ack_lockres again*/
  580. ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
  581. if (unlikely(ret != 0))
  582. pr_info("lock CR on ack failed return %d\n", ret);
  583. out:
  584. /*release CR on message_lockres*/
  585. ret = dlm_unlock_sync(message_lockres);
  586. if (unlikely(ret != 0))
  587. pr_info("unlock msg failed return %d\n", ret);
  588. mutex_unlock(&cinfo->recv_mutex);
  589. }
  590. /* lock_token()
  591. * Takes the lock on the TOKEN lock resource so no other
  592. * node can communicate while the operation is underway.
  593. */
  594. static int lock_token(struct md_cluster_info *cinfo)
  595. {
  596. int error;
  597. error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
  598. if (error) {
  599. pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
  600. __func__, __LINE__, error);
  601. } else {
  602. /* Lock the receive sequence */
  603. mutex_lock(&cinfo->recv_mutex);
  604. }
  605. return error;
  606. }
  607. /* lock_comm()
  608. * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
  609. */
  610. static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
  611. {
  612. int rv, set_bit = 0;
  613. struct mddev *mddev = cinfo->mddev;
  614. /*
  615. * If the resync thread runs after the raid1d thread, then process_metadata_update
  616. * could not continue if raid1d held reconfig_mutex (and raid1d is blocked
  617. * since another node already got EX on Token and is waiting for EX on Ack),
  618. * so let resync wake up the thread in case the flag is set.
  619. */
  620. if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
  621. &cinfo->state)) {
  622. rv = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
  623. &cinfo->state);
  624. WARN_ON_ONCE(rv);
  625. md_wakeup_thread(mddev->thread);
  626. set_bit = 1;
  627. }
  628. wait_event(cinfo->wait,
  629. !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
  630. rv = lock_token(cinfo);
  631. if (set_bit)
  632. clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  633. return rv;
  634. }
  635. static void unlock_comm(struct md_cluster_info *cinfo)
  636. {
  637. WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
  638. mutex_unlock(&cinfo->recv_mutex);
  639. dlm_unlock_sync(cinfo->token_lockres);
  640. clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
  641. wake_up(&cinfo->wait);
  642. }
  643. /* __sendmsg()
  644. * This function performs the actual sending of the message. This function is
  645. * usually called after performing the encompassing operation
  646. * The function:
  647. * 1. Grabs the message lockresource in EX mode
  648. * 2. Copies the message to the message LVB
  649. * 3. Downconverts message lockresource to CW
  650. * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
  651. * and the other nodes read the message. The thread will wait here until all other
  652. * nodes have released ack lock resource.
  653. * 5. Downconvert ack lockresource to CR
  654. */
  655. static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
  656. {
  657. int error;
  658. int slot = cinfo->slot_number - 1;
  659. cmsg->slot = cpu_to_le32(slot);
  660. /*get EX on Message*/
  661. error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
  662. if (error) {
  663. pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
  664. goto failed_message;
  665. }
  666. memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
  667. sizeof(struct cluster_msg));
  668. /*down-convert EX to CW on Message*/
  669. error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
  670. if (error) {
  671. pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
  672. error);
  673. goto failed_ack;
  674. }
  675. /*up-convert CR to EX on Ack*/
  676. error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
  677. if (error) {
  678. pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
  679. error);
  680. goto failed_ack;
  681. }
  682. /*down-convert EX to CR on Ack*/
  683. error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
  684. if (error) {
  685. pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
  686. error);
  687. goto failed_ack;
  688. }
  689. failed_ack:
  690. error = dlm_unlock_sync(cinfo->message_lockres);
  691. if (unlikely(error != 0)) {
  692. pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
  693. error);
  694. /* in case the message can't be released due to some reason */
  695. goto failed_ack;
  696. }
  697. failed_message:
  698. return error;
  699. }
  700. static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
  701. bool mddev_locked)
  702. {
  703. int ret;
  704. ret = lock_comm(cinfo, mddev_locked);
  705. if (!ret) {
  706. ret = __sendmsg(cinfo, cmsg);
  707. unlock_comm(cinfo);
  708. }
  709. return ret;
  710. }
  711. static int gather_all_resync_info(struct mddev *mddev, int total_slots)
  712. {
  713. struct md_cluster_info *cinfo = mddev->cluster_info;
  714. int i, ret = 0;
  715. struct dlm_lock_resource *bm_lockres;
  716. char str[64];
  717. sector_t lo, hi;
  718. for (i = 0; i < total_slots; i++) {
  719. memset(str, '\0', 64);
  720. snprintf(str, 64, "bitmap%04d", i);
  721. bm_lockres = lockres_init(mddev, str, NULL, 1);
  722. if (!bm_lockres)
  723. return -ENOMEM;
  724. if (i == (cinfo->slot_number - 1)) {
  725. lockres_free(bm_lockres);
  726. continue;
  727. }
  728. bm_lockres->flags |= DLM_LKF_NOQUEUE;
  729. ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
  730. if (ret == -EAGAIN) {
  731. if (read_resync_info(mddev, bm_lockres)) {
  732. pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
  733. __func__, __LINE__,
  734. (unsigned long long) cinfo->suspend_lo,
  735. (unsigned long long) cinfo->suspend_hi,
  736. i);
  737. cinfo->suspend_from = i;
  738. }
  739. ret = 0;
  740. lockres_free(bm_lockres);
  741. continue;
  742. }
  743. if (ret) {
  744. lockres_free(bm_lockres);
  745. goto out;
  746. }
  747. /* Read the disk bitmap sb and check if it needs recovery */
  748. ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
  749. if (ret) {
  750. pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
  751. lockres_free(bm_lockres);
  752. continue;
  753. }
  754. if ((hi > 0) && (lo < mddev->recovery_cp)) {
  755. set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
  756. mddev->recovery_cp = lo;
  757. md_check_recovery(mddev);
  758. }
  759. lockres_free(bm_lockres);
  760. }
  761. out:
  762. return ret;
  763. }
  764. static int join(struct mddev *mddev, int nodes)
  765. {
  766. struct md_cluster_info *cinfo;
  767. int ret, ops_rv;
  768. char str[64];
  769. cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
  770. if (!cinfo)
  771. return -ENOMEM;
  772. INIT_LIST_HEAD(&cinfo->suspend_list);
  773. spin_lock_init(&cinfo->suspend_lock);
  774. init_completion(&cinfo->completion);
  775. set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
  776. init_waitqueue_head(&cinfo->wait);
  777. mutex_init(&cinfo->recv_mutex);
  778. mddev->cluster_info = cinfo;
  779. cinfo->mddev = mddev;
  780. memset(str, 0, 64);
  781. sprintf(str, "%pU", mddev->uuid);
  782. ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
  783. DLM_LSFL_FS, LVB_SIZE,
  784. &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
  785. if (ret)
  786. goto err;
  787. wait_for_completion(&cinfo->completion);
  788. if (nodes < cinfo->slot_number) {
  789. pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
  790. cinfo->slot_number, nodes);
  791. ret = -ERANGE;
  792. goto err;
  793. }
  794. /* Initiate the communication resources */
  795. ret = -ENOMEM;
  796. cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
  797. if (!cinfo->recv_thread) {
  798. pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
  799. goto err;
  800. }
  801. cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
  802. if (!cinfo->message_lockres)
  803. goto err;
  804. cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
  805. if (!cinfo->token_lockres)
  806. goto err;
  807. cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
  808. if (!cinfo->no_new_dev_lockres)
  809. goto err;
  810. ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
  811. if (ret) {
  812. ret = -EAGAIN;
  813. pr_err("md-cluster: can't join cluster to avoid lock issue\n");
  814. goto err;
  815. }
  816. cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
  817. if (!cinfo->ack_lockres) {
  818. ret = -ENOMEM;
  819. goto err;
  820. }
  821. /* get sync CR lock on ACK. */
  822. ret = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
  823. if (ret)
  824. pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", ret);
  825. dlm_unlock_sync(cinfo->token_lockres);
  826. /* get sync CR lock on no-new-dev. */
  827. ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
  828. if (ret) pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
  829. pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
  830. snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
  831. cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
  832. if (!cinfo->bitmap_lockres) {
  833. ret = -ENOMEM;
  834. goto err;
  835. }
  836. if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
  837. pr_err("Failed to get bitmap lock\n");
  838. ret = -EINVAL;
  839. goto err;
  840. }
  841. cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
  842. if (!cinfo->resync_lockres) {
  843. ret = -ENOMEM;
  844. goto err;
  845. }
  846. return 0;
  847. err:
  848. set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  849. md_unregister_thread(&cinfo->recovery_thread);
  850. md_unregister_thread(&cinfo->recv_thread);
  851. lockres_free(cinfo->message_lockres);
  852. lockres_free(cinfo->token_lockres);
  853. lockres_free(cinfo->ack_lockres);
  854. lockres_free(cinfo->no_new_dev_lockres);
  855. lockres_free(cinfo->resync_lockres);
  856. lockres_free(cinfo->bitmap_lockres);
  857. if (cinfo->lockspace)
  858. dlm_release_lockspace(cinfo->lockspace, 2);
  859. mddev->cluster_info = NULL;
  860. kfree(cinfo);
  861. return ret;
  862. }
  863. static void load_bitmaps(struct mddev *mddev, int total_slots)
  864. {
  865. struct md_cluster_info *cinfo = mddev->cluster_info;
  866. /* load all the node's bitmap info for resync */
  867. if (gather_all_resync_info(mddev, total_slots))
  868. pr_err("md-cluster: failed to gather all resyn infos\n");
  869. set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
  870. /* wake up recv thread in case something need to be handled */
  871. if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
  872. md_wakeup_thread(cinfo->recv_thread);
  873. }
  874. static void resync_bitmap(struct mddev *mddev)
  875. {
  876. struct md_cluster_info *cinfo = mddev->cluster_info;
  877. struct cluster_msg cmsg = {0};
  878. int err;
  879. cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
  880. err = sendmsg(cinfo, &cmsg, 1);
  881. if (err)
  882. pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
  883. __func__, __LINE__, err);
  884. }
  885. static void unlock_all_bitmaps(struct mddev *mddev);
  886. static int leave(struct mddev *mddev)
  887. {
  888. struct md_cluster_info *cinfo = mddev->cluster_info;
  889. if (!cinfo)
  890. return 0;
  891. /*
  892. * The BITMAP_NEEDS_SYNC message should be sent when a node
  893. * is leaving the cluster with a dirty bitmap; also, we
  894. * can only deliver it when the dlm connection is available.
  895. *
  896. * Also, we should send BITMAP_NEEDS_SYNC message in
  897. * case reshaping is interrupted.
  898. */
  899. if ((cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) ||
  900. (mddev->reshape_position != MaxSector &&
  901. test_bit(MD_CLOSING, &mddev->flags)))
  902. resync_bitmap(mddev);
  903. set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  904. md_unregister_thread(&cinfo->recovery_thread);
  905. md_unregister_thread(&cinfo->recv_thread);
  906. lockres_free(cinfo->message_lockres);
  907. lockres_free(cinfo->token_lockres);
  908. lockres_free(cinfo->ack_lockres);
  909. lockres_free(cinfo->no_new_dev_lockres);
  910. lockres_free(cinfo->resync_lockres);
  911. lockres_free(cinfo->bitmap_lockres);
  912. unlock_all_bitmaps(mddev);
  913. dlm_release_lockspace(cinfo->lockspace, 2);
  914. kfree(cinfo);
  915. return 0;
  916. }
  917. /* slot_number(): Returns the MD slot number to use
  918. * DLM starts the slot numbers from 1, whereas cluster-md
  919. * wants the number to be from zero, so we deduct one
  920. */
  921. static int slot_number(struct mddev *mddev)
  922. {
  923. struct md_cluster_info *cinfo = mddev->cluster_info;
  924. return cinfo->slot_number - 1;
  925. }
  926. /*
  927. * Check if the communication is already locked, else lock the communication
  928. * channel.
  929. * If it is already locked, token is in EX mode, and hence lock_token()
  930. * should not be called.
  931. */
  932. static int metadata_update_start(struct mddev *mddev)
  933. {
  934. struct md_cluster_info *cinfo = mddev->cluster_info;
  935. int ret;
  936. /*
  937. * metadata_update_start is always called with the protection of
  938. * reconfig_mutex, so set WAITING_FOR_TOKEN here.
  939. */
  940. ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
  941. &cinfo->state);
  942. WARN_ON_ONCE(ret);
  943. md_wakeup_thread(mddev->thread);
  944. wait_event(cinfo->wait,
  945. !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
  946. test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
  947. /* If token is already locked, return 0 */
  948. if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
  949. clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  950. return 0;
  951. }
  952. ret = lock_token(cinfo);
  953. clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  954. return ret;
  955. }
  956. static int metadata_update_finish(struct mddev *mddev)
  957. {
  958. struct md_cluster_info *cinfo = mddev->cluster_info;
  959. struct cluster_msg cmsg;
  960. struct md_rdev *rdev;
  961. int ret = 0;
  962. int raid_slot = -1;
  963. memset(&cmsg, 0, sizeof(cmsg));
  964. cmsg.type = cpu_to_le32(METADATA_UPDATED);
  965. /* Pick up a good active device number to send.
  966. */
  967. rdev_for_each(rdev, mddev)
  968. if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
  969. raid_slot = rdev->desc_nr;
  970. break;
  971. }
  972. if (raid_slot >= 0) {
  973. cmsg.raid_slot = cpu_to_le32(raid_slot);
  974. ret = __sendmsg(cinfo, &cmsg);
  975. } else
  976. pr_warn("md-cluster: No good device id found to send\n");
  977. clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
  978. unlock_comm(cinfo);
  979. return ret;
  980. }
  981. static void metadata_update_cancel(struct mddev *mddev)
  982. {
  983. struct md_cluster_info *cinfo = mddev->cluster_info;
  984. clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
  985. unlock_comm(cinfo);
  986. }
  987. static int update_bitmap_size(struct mddev *mddev, sector_t size)
  988. {
  989. struct md_cluster_info *cinfo = mddev->cluster_info;
  990. struct cluster_msg cmsg = {0};
  991. int ret;
  992. cmsg.type = cpu_to_le32(BITMAP_RESIZE);
  993. cmsg.high = cpu_to_le64(size);
  994. ret = sendmsg(cinfo, &cmsg, 0);
  995. if (ret)
  996. pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
  997. __func__, __LINE__, ret);
  998. return ret;
  999. }
  1000. static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
  1001. {
  1002. struct bitmap_counts *counts;
  1003. char str[64];
  1004. struct dlm_lock_resource *bm_lockres;
  1005. struct bitmap *bitmap = mddev->bitmap;
  1006. unsigned long my_pages = bitmap->counts.pages;
  1007. int i, rv;
  1008. /*
  1009. * We need to ensure all the nodes can grow to a larger
  1010. * bitmap size before doing the reshape.
  1011. */
  1012. rv = update_bitmap_size(mddev, newsize);
  1013. if (rv)
  1014. return rv;
  1015. for (i = 0; i < mddev->bitmap_info.nodes; i++) {
  1016. if (i == md_cluster_ops->slot_number(mddev))
  1017. continue;
  1018. bitmap = get_bitmap_from_slot(mddev, i);
  1019. if (IS_ERR(bitmap)) {
  1020. pr_err("can't get bitmap from slot %d\n", i);
  1021. bitmap = NULL;
  1022. goto out;
  1023. }
  1024. counts = &bitmap->counts;
  1025. /*
  1026. * If we can hold the bitmap lock of one node then
  1027. * the slot is not occupied, update the pages.
  1028. */
  1029. snprintf(str, 64, "bitmap%04d", i);
  1030. bm_lockres = lockres_init(mddev, str, NULL, 1);
  1031. if (!bm_lockres) {
  1032. pr_err("Cannot initialize %s lock\n", str);
  1033. goto out;
  1034. }
  1035. bm_lockres->flags |= DLM_LKF_NOQUEUE;
  1036. rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
  1037. if (!rv)
  1038. counts->pages = my_pages;
  1039. lockres_free(bm_lockres);
  1040. if (my_pages != counts->pages)
  1041. /*
  1042. * Let's revert the bitmap size if one node
  1043. * can't resize bitmap
  1044. */
  1045. goto out;
  1046. md_bitmap_free(bitmap);
  1047. }
  1048. return 0;
  1049. out:
  1050. md_bitmap_free(bitmap);
  1051. update_bitmap_size(mddev, oldsize);
  1052. return -1;
  1053. }
  1054. /*
  1055. * return 0 if all the bitmaps have the same sync_size
  1056. */
  1057. static int cluster_check_sync_size(struct mddev *mddev)
  1058. {
  1059. int i, rv;
  1060. bitmap_super_t *sb;
  1061. unsigned long my_sync_size, sync_size = 0;
  1062. int node_num = mddev->bitmap_info.nodes;
  1063. int current_slot = md_cluster_ops->slot_number(mddev);
  1064. struct bitmap *bitmap = mddev->bitmap;
  1065. char str[64];
  1066. struct dlm_lock_resource *bm_lockres;
  1067. sb = kmap_atomic(bitmap->storage.sb_page);
  1068. my_sync_size = sb->sync_size;
  1069. kunmap_atomic(sb);
  1070. for (i = 0; i < node_num; i++) {
  1071. if (i == current_slot)
  1072. continue;
  1073. bitmap = get_bitmap_from_slot(mddev, i);
  1074. if (IS_ERR(bitmap)) {
  1075. pr_err("can't get bitmap from slot %d\n", i);
  1076. return -1;
  1077. }
  1078. /*
  1079. * If we can hold the bitmap lock of one node then
  1080. * the slot is not occupied, update the sb.
  1081. */
  1082. snprintf(str, 64, "bitmap%04d", i);
  1083. bm_lockres = lockres_init(mddev, str, NULL, 1);
  1084. if (!bm_lockres) {
  1085. pr_err("md-cluster: Cannot initialize %s\n", str);
  1086. md_bitmap_free(bitmap);
  1087. return -1;
  1088. }
  1089. bm_lockres->flags |= DLM_LKF_NOQUEUE;
  1090. rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
  1091. if (!rv)
  1092. md_bitmap_update_sb(bitmap);
  1093. lockres_free(bm_lockres);
  1094. sb = kmap_atomic(bitmap->storage.sb_page);
  1095. if (sync_size == 0)
  1096. sync_size = sb->sync_size;
  1097. else if (sync_size != sb->sync_size) {
  1098. kunmap_atomic(sb);
  1099. md_bitmap_free(bitmap);
  1100. return -1;
  1101. }
  1102. kunmap_atomic(sb);
  1103. md_bitmap_free(bitmap);
  1104. }
  1105. return (my_sync_size == sync_size) ? 0 : -1;
  1106. }
  1107. /*
  1108. * Updating the size for cluster raid is a little more complex; we perform it
  1109. * in these steps:
  1110. * 1. hold the token lock and update the superblock in the initiator node.
  1111. * 2. send a METADATA_UPDATED msg to the other nodes.
  1112. * 3. The initiator node continues to check each bitmap's sync_size; if all
  1113. * bitmaps have the same value of sync_size, then we can set the capacity and
  1114. * let the other nodes perform it. If one node can't update sync_size
  1115. * accordingly, we need to revert to the previous value.
  1116. */
  1117. static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
  1118. {
  1119. struct md_cluster_info *cinfo = mddev->cluster_info;
  1120. struct cluster_msg cmsg;
  1121. struct md_rdev *rdev;
  1122. int ret = 0;
  1123. int raid_slot = -1;
  1124. md_update_sb(mddev, 1);
  1125. if (lock_comm(cinfo, 1)) {
  1126. pr_err("%s: lock_comm failed\n", __func__);
  1127. return;
  1128. }
  1129. memset(&cmsg, 0, sizeof(cmsg));
  1130. cmsg.type = cpu_to_le32(METADATA_UPDATED);
  1131. rdev_for_each(rdev, mddev)
  1132. if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
  1133. raid_slot = rdev->desc_nr;
  1134. break;
  1135. }
  1136. if (raid_slot >= 0) {
  1137. cmsg.raid_slot = cpu_to_le32(raid_slot);
  1138. /*
  1139. * We can only change capacity after all the nodes can do it,
  1140. * so we need to wait until the other nodes have received the msg
  1141. * and handled the change
  1142. */
  1143. ret = __sendmsg(cinfo, &cmsg);
  1144. if (ret) {
  1145. pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
  1146. __func__, __LINE__);
  1147. unlock_comm(cinfo);
  1148. return;
  1149. }
  1150. } else {
  1151. pr_err("md-cluster: No good device id found to send\n");
  1152. unlock_comm(cinfo);
  1153. return;
  1154. }
  1155. /*
  1156. * check the sync_size from the other nodes' bitmaps; if sync_size
  1157. * has already been updated in the other nodes as expected, send a
  1158. * CHANGE_CAPACITY msg to permit the change of capacity
  1159. */
  1160. if (cluster_check_sync_size(mddev) == 0) {
  1161. memset(&cmsg, 0, sizeof(cmsg));
  1162. cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
  1163. ret = __sendmsg(cinfo, &cmsg);
  1164. if (ret)
  1165. pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
  1166. __func__, __LINE__);
  1167. set_capacity(mddev->gendisk, mddev->array_sectors);
  1168. revalidate_disk(mddev->gendisk);
  1169. } else {
  1170. /* revert to previous sectors */
  1171. ret = mddev->pers->resize(mddev, old_dev_sectors);
  1172. if (!ret)
  1173. revalidate_disk(mddev->gendisk);
  1174. ret = __sendmsg(cinfo, &cmsg);
  1175. if (ret)
  1176. pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
  1177. __func__, __LINE__);
  1178. }
  1179. unlock_comm(cinfo);
  1180. }
  1181. static int resync_start(struct mddev *mddev)
  1182. {
  1183. struct md_cluster_info *cinfo = mddev->cluster_info;
  1184. return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
  1185. }
  1186. static void resync_info_get(struct mddev *mddev, sector_t *lo, sector_t *hi)
  1187. {
  1188. struct md_cluster_info *cinfo = mddev->cluster_info;
  1189. spin_lock_irq(&cinfo->suspend_lock);
  1190. *lo = cinfo->suspend_lo;
  1191. *hi = cinfo->suspend_hi;
  1192. spin_unlock_irq(&cinfo->suspend_lock);
  1193. }
  1194. static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
  1195. {
  1196. struct md_cluster_info *cinfo = mddev->cluster_info;
  1197. struct resync_info ri;
  1198. struct cluster_msg cmsg = {0};
  1199. /* do not send zero again, if we have sent before */
  1200. if (hi == 0) {
  1201. memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
  1202. if (le64_to_cpu(ri.hi) == 0)
  1203. return 0;
  1204. }
  1205. add_resync_info(cinfo->bitmap_lockres, lo, hi);
  1206. /* Re-acquire the lock to refresh LVB */
  1207. dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
  1208. cmsg.type = cpu_to_le32(RESYNCING);
  1209. cmsg.low = cpu_to_le64(lo);
  1210. cmsg.high = cpu_to_le64(hi);
  1211. /*
  1212. * mddev_lock is held if resync_info_update is called from
  1213. * resync_finish (md_reap_sync_thread -> resync_finish)
  1214. */
  1215. if (lo == 0 && hi == 0)
  1216. return sendmsg(cinfo, &cmsg, 1);
  1217. else
  1218. return sendmsg(cinfo, &cmsg, 0);
  1219. }
  1220. static int resync_finish(struct mddev *mddev)
  1221. {
  1222. struct md_cluster_info *cinfo = mddev->cluster_info;
  1223. int ret = 0;
  1224. clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
  1225. /*
  1226. * If the resync thread is interrupted so we can't say resync is finished,
  1227. * another node will launch a resync thread to continue.
  1228. */
  1229. if (!test_bit(MD_CLOSING, &mddev->flags))
  1230. ret = resync_info_update(mddev, 0, 0);
  1231. dlm_unlock_sync(cinfo->resync_lockres);
  1232. return ret;
  1233. }
  1234. static int area_resyncing(struct mddev *mddev, int direction,
  1235. sector_t lo, sector_t hi)
  1236. {
  1237. struct md_cluster_info *cinfo = mddev->cluster_info;
  1238. int ret = 0;
  1239. if ((direction == READ) &&
  1240. test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
  1241. return 1;
  1242. spin_lock_irq(&cinfo->suspend_lock);
  1243. if (hi > cinfo->suspend_lo && lo < cinfo->suspend_hi)
  1244. ret = 1;
  1245. spin_unlock_irq(&cinfo->suspend_lock);
  1246. return ret;
  1247. }
  1248. /* add_new_disk() - initiates a disk add
  1249. * However, if this fails before writing md_update_sb(),
  1250. * add_new_disk_cancel() must be called to release token lock
  1251. */
  1252. static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
  1253. {
  1254. struct md_cluster_info *cinfo = mddev->cluster_info;
  1255. struct cluster_msg cmsg;
  1256. int ret = 0;
  1257. struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
  1258. char *uuid = sb->device_uuid;
  1259. memset(&cmsg, 0, sizeof(cmsg));
  1260. cmsg.type = cpu_to_le32(NEWDISK);
  1261. memcpy(cmsg.uuid, uuid, 16);
  1262. cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
  1263. if (lock_comm(cinfo, 1))
  1264. return -EAGAIN;
  1265. ret = __sendmsg(cinfo, &cmsg);
  1266. if (ret) {
  1267. unlock_comm(cinfo);
  1268. return ret;
  1269. }
  1270. cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
  1271. ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
  1272. cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
  1273. /* Some node does not "see" the device */
  1274. if (ret == -EAGAIN)
  1275. ret = -ENOENT;
  1276. if (ret)
  1277. unlock_comm(cinfo);
  1278. else {
  1279. dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
  1280. /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
  1281. * will run soon after add_new_disk, the below path will be
  1282. * invoked:
  1283. * md_wakeup_thread(mddev->thread)
  1284. * -> conf->thread (raid1d)
  1285. * -> md_check_recovery -> md_update_sb
  1286. * -> metadata_update_start/finish
  1287. * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
  1288. *
  1289. * For other failure cases, metadata_update_cancel and
  1290. * add_new_disk_cancel also clear the bit below.
  1291. */
  1292. set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
  1293. wake_up(&cinfo->wait);
  1294. }
  1295. return ret;
  1296. }
  1297. static void add_new_disk_cancel(struct mddev *mddev)
  1298. {
  1299. struct md_cluster_info *cinfo = mddev->cluster_info;
  1300. clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
  1301. unlock_comm(cinfo);
  1302. }
  1303. static int new_disk_ack(struct mddev *mddev, bool ack)
  1304. {
  1305. struct md_cluster_info *cinfo = mddev->cluster_info;
  1306. if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
  1307. pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
  1308. return -EINVAL;
  1309. }
  1310. if (ack)
  1311. dlm_unlock_sync(cinfo->no_new_dev_lockres);
  1312. complete(&cinfo->newdisk_completion);
  1313. return 0;
  1314. }
  1315. static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
  1316. {
  1317. struct cluster_msg cmsg = {0};
  1318. struct md_cluster_info *cinfo = mddev->cluster_info;
  1319. cmsg.type = cpu_to_le32(REMOVE);
  1320. cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
  1321. return sendmsg(cinfo, &cmsg, 1);
  1322. }
  1323. static int lock_all_bitmaps(struct mddev *mddev)
  1324. {
  1325. int slot, my_slot, ret, held = 1, i = 0;
  1326. char str[64];
  1327. struct md_cluster_info *cinfo = mddev->cluster_info;
  1328. cinfo->other_bitmap_lockres =
  1329. kcalloc(mddev->bitmap_info.nodes - 1,
  1330. sizeof(struct dlm_lock_resource *), GFP_KERNEL);
  1331. if (!cinfo->other_bitmap_lockres) {
  1332. pr_err("md: can't alloc mem for other bitmap locks\n");
  1333. return 0;
  1334. }
  1335. my_slot = slot_number(mddev);
  1336. for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
  1337. if (slot == my_slot)
  1338. continue;
  1339. memset(str, '\0', 64);
  1340. snprintf(str, 64, "bitmap%04d", slot);
  1341. cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
  1342. if (!cinfo->other_bitmap_lockres[i])
  1343. return -ENOMEM;
  1344. cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
  1345. ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
  1346. if (ret)
  1347. held = -1;
  1348. i++;
  1349. }
  1350. return held;
  1351. }
  1352. static void unlock_all_bitmaps(struct mddev *mddev)
  1353. {
  1354. struct md_cluster_info *cinfo = mddev->cluster_info;
  1355. int i;
  1356. /* release the other nodes' bitmap locks if they exist */
  1357. if (cinfo->other_bitmap_lockres) {
  1358. for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
  1359. if (cinfo->other_bitmap_lockres[i]) {
  1360. lockres_free(cinfo->other_bitmap_lockres[i]);
  1361. }
  1362. }
  1363. kfree(cinfo->other_bitmap_lockres);
  1364. cinfo->other_bitmap_lockres = NULL;
  1365. }
  1366. }
  1367. static int gather_bitmaps(struct md_rdev *rdev)
  1368. {
  1369. int sn, err;
  1370. sector_t lo, hi;
  1371. struct cluster_msg cmsg = {0};
  1372. struct mddev *mddev = rdev->mddev;
  1373. struct md_cluster_info *cinfo = mddev->cluster_info;
  1374. cmsg.type = cpu_to_le32(RE_ADD);
  1375. cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
  1376. err = sendmsg(cinfo, &cmsg, 1);
  1377. if (err)
  1378. goto out;
  1379. for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
  1380. if (sn == (cinfo->slot_number - 1))
  1381. continue;
  1382. err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
  1383. if (err) {
  1384. pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
  1385. goto out;
  1386. }
  1387. if ((hi > 0) && (lo < mddev->recovery_cp))
  1388. mddev->recovery_cp = lo;
  1389. }
  1390. out:
  1391. return err;
  1392. }
  1393. static struct md_cluster_operations cluster_ops = {
  1394. .join = join,
  1395. .leave = leave,
  1396. .slot_number = slot_number,
  1397. .resync_start = resync_start,
  1398. .resync_finish = resync_finish,
  1399. .resync_info_update = resync_info_update,
  1400. .resync_info_get = resync_info_get,
  1401. .metadata_update_start = metadata_update_start,
  1402. .metadata_update_finish = metadata_update_finish,
  1403. .metadata_update_cancel = metadata_update_cancel,
  1404. .area_resyncing = area_resyncing,
  1405. .add_new_disk = add_new_disk,
  1406. .add_new_disk_cancel = add_new_disk_cancel,
  1407. .new_disk_ack = new_disk_ack,
  1408. .remove_disk = remove_disk,
  1409. .load_bitmaps = load_bitmaps,
  1410. .gather_bitmaps = gather_bitmaps,
  1411. .resize_bitmaps = resize_bitmaps,
  1412. .lock_all_bitmaps = lock_all_bitmaps,
  1413. .unlock_all_bitmaps = unlock_all_bitmaps,
  1414. .update_size = update_size,
  1415. };
  1416. static int __init cluster_init(void)
  1417. {
  1418. pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
  1419. pr_info("Registering Cluster MD functions\n");
  1420. register_md_cluster_operations(&cluster_ops, THIS_MODULE);
  1421. return 0;
  1422. }
  1423. static void cluster_exit(void)
  1424. {
  1425. unregister_md_cluster_operations();
  1426. }
  1427. module_init(cluster_init);
  1428. module_exit(cluster_exit);
  1429. MODULE_AUTHOR("SUSE");
  1430. MODULE_LICENSE("GPL");
  1431. MODULE_DESCRIPTION("Clustering support for MD");