md-cluster.c

/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 *
 */

#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "bitmap.h"
#include "md-cluster.h"

#define LVB_SIZE 64
#define NEW_DEV_TIMEOUT 5000

struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
	bool sync_locking_done;
	void (*bast)(void *arg, int mode); /* blocking AST function pointer */
	struct mddev *mddev; /* pointing back to mddev. */
	int mode;
};

struct suspend_info {
	int slot;
	sector_t lo;
	sector_t hi;
	struct list_head list;
};

struct resync_info {
	__le64 lo;
	__le64 hi;
};

/* md_cluster_info flags */
#define MD_CLUSTER_WAITING_FOR_NEWDISK 1
#define MD_CLUSTER_SUSPEND_READ_BALANCING 2
#define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3

/* Lock the send communication. This is done through
 * bit manipulation as opposed to a mutex in order to
 * accommodate lock and hold. See next comment.
 */
#define MD_CLUSTER_SEND_LOCK 4

/* Set when cluster operations (such as adding a disk) must lock the
 * communication channel in order to perform extra operations
 * (update metadata) while no other operation is allowed on the
 * MD. The token needs to be locked and held until the operation
 * completes with a md_update_sb(), which would eventually release
 * the lock.
 */
#define MD_CLUSTER_SEND_LOCKED_ALREADY 5

/* We should only receive messages after the node has joined the cluster and
 * set up all the related state such as the bitmap and personality */
#define MD_CLUSTER_ALREADY_IN_CLUSTER 6
#define MD_CLUSTER_PENDING_RECV_EVENT 7
#define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8

struct md_cluster_info {
	struct mddev *mddev; /* the md device which md_cluster_info belongs to */
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;
	struct completion completion;
	struct mutex recv_mutex;
	struct dlm_lock_resource *bitmap_lockres;
	struct dlm_lock_resource **other_bitmap_lockres;
	struct dlm_lock_resource *resync_lockres;
	struct list_head suspend_list;
	spinlock_t suspend_lock;
	struct md_thread *recovery_thread;
	unsigned long recovery_map;
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;
	struct completion newdisk_completion;
	wait_queue_head_t wait;
	unsigned long state;
	/* record the region in RESYNCING message */
	sector_t sync_low;
	sector_t sync_hi;
};

enum msg_type {
	METADATA_UPDATED = 0,
	RESYNCING,
	NEWDISK,
	REMOVE,
	RE_ADD,
	BITMAP_NEEDS_SYNC,
	CHANGE_CAPACITY,
};

struct cluster_msg {
	__le32 type;
	__le32 slot;
	/* TODO: Unionize this for smaller footprint */
	__le64 low;
	__le64 high;
	char uuid[16];
	__le32 raid_slot;
};
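
/*
 * Completion AST for dlm_lock_sync(): marks the request as done and wakes
 * up the waiter sleeping on the resource's sync_locking wait queue.
 */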
static void sync_ast(void *arg)
{
	struct dlm_lock_resource *res;

	res = arg;
	res->sync_locking_done = true;
	wake_up(&res->sync_locking);
}
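
/*
 * Issue a DLM lock request for the resource and wait for the completion
 * AST. On success the granted mode is recorded in res->mode; the status
 * from the DLM status block is returned.
 */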
static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;
	wait_event(res->sync_locking, res->sync_locking_done);
	res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
	return dlm_lock_sync(res, DLM_LOCK_NL);
}

/*
 * A variation of dlm_lock_sync() whose lock request can be
 * interrupted
 */
static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
				       struct mddev *mddev)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;

	wait_event(res->sync_locking, res->sync_locking_done
				      || kthread_should_stop()
				      || test_bit(MD_CLOSING, &mddev->flags));
	if (!res->sync_locking_done) {
		/*
		 * the convert queue contains the lock request when the request
		 * is interrupted, and sync_ast could still run, so we need to
		 * cancel the request and reset the completion
		 */
		ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
			&res->lksb, res);
		res->sync_locking_done = false;
		if (unlikely(ret != 0))
			pr_info("failed to cancel previous lock request %s return %d\n",
				res->name, ret);
		return -EPERM;
	} else
		res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}
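
/*
 * Allocate and initialise a DLM lock resource for this mddev, optionally
 * with a lock value block (LVB), and take it in NL mode so that later
 * requests can simply convert the existing lock.
 */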
static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
{
	struct dlm_lock_resource *res = NULL;
	int ret, namelen;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;
	init_waitqueue_head(&res->sync_locking);
	res->sync_locking_done = false;
	res->ls = cinfo->lockspace;
	res->mddev = mddev;
	res->mode = DLM_LOCK_IV;
	namelen = strlen(name);
	res->name = kzalloc(namelen + 1, GFP_KERNEL);
	if (!res->name) {
		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
		goto out_err;
	}
	strlcpy(res->name, name, namelen + 1);
	if (with_lvb) {
		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
		if (!res->lksb.sb_lvbptr) {
			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
			goto out_err;
		}
		res->flags = DLM_LKF_VALBLK;
	}

	if (bastfn)
		res->bast = bastfn;

	res->flags |= DLM_LKF_EXPEDITE;

	ret = dlm_lock_sync(res, DLM_LOCK_NL);
	if (ret) {
		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
		goto out_err;
	}
	res->flags &= ~DLM_LKF_EXPEDITE;
	res->flags |= DLM_LKF_CONVERT;

	return res;
out_err:
	kfree(res->lksb.sb_lvbptr);
	kfree(res->name);
	kfree(res);
	return NULL;
}
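
/*
 * Force-unlock a lock resource, wait for the unlock completion and free
 * the resource together with its name and LVB.
 */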
static void lockres_free(struct dlm_lock_resource *res)
{
	int ret = 0;

	if (!res)
		return;

	/*
	 * use the FORCEUNLOCK flag, so we can unlock even if the lock is on
	 * the waiting or convert queue
	 */
	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
		&res->lksb, res);
	if (unlikely(ret != 0))
		pr_err("failed to unlock %s return %d\n", res->name, ret);
	else
		wait_event(res->sync_locking, res->sync_locking_done);

	kfree(res->name);
	kfree(res->lksb.sb_lvbptr);
	kfree(res);
}
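
/*
 * Helpers to store and read the in-progress resync range [lo, hi] in the
 * lock value block of a bitmap lock resource.
 */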
static void add_resync_info(struct dlm_lock_resource *lockres,
			    sector_t lo, sector_t hi)
{
	struct resync_info *ri;

	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
	ri->lo = cpu_to_le64(lo);
	ri->hi = cpu_to_le64(hi);
}

static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
{
	struct resync_info ri;
	struct suspend_info *s = NULL;
	sector_t hi = 0;

	dlm_lock_sync(lockres, DLM_LOCK_CR);
	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
	hi = le64_to_cpu(ri.hi);
	if (hi > 0) {
		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
		if (!s)
			goto out;
		s->hi = hi;
		s->lo = le64_to_cpu(ri.lo);
	}
	dlm_unlock_sync(lockres);
out:
	return s;
}
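
/*
 * Recovery thread: for every slot marked in recovery_map, take that slot's
 * bitmap lock, copy its bitmap into ours, drop the matching suspend_info
 * entry and kick off resync if needed.
 */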
static void recover_bitmaps(struct md_thread *thread)
{
	struct mddev *mddev = thread->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	int slot, ret;
	struct suspend_info *s, *tmp;
	sector_t lo, hi;

	while (cinfo->recovery_map) {
		slot = fls64((u64)cinfo->recovery_map) - 1;

		snprintf(str, 64, "bitmap%04d", slot);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize bitmaps\n");
			goto clear_bit;
		}

		ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
		if (ret) {
			pr_err("md-cluster: Could not DLM lock %s: %d\n",
					str, ret);
			goto clear_bit;
		}
		ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
		if (ret) {
			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
			goto clear_bit;
		}

		/* Clear suspend_area associated with the bitmap */
		spin_lock_irq(&cinfo->suspend_lock);
		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
			if (slot == s->slot) {
				list_del(&s->list);
				kfree(s);
			}
		spin_unlock_irq(&cinfo->suspend_lock);

		if (hi > 0) {
			if (lo < mddev->recovery_cp)
				mddev->recovery_cp = lo;
			/* wake up the thread to continue resync in case resync
			 * is not finished */
			if (mddev->recovery_cp != MaxSector) {
				set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
				md_wakeup_thread(mddev->thread);
			}
		}
clear_bit:
		lockres_free(bm_lockres);
		clear_bit(slot, &cinfo->recovery_map);
	}
}

static void recover_prep(void *arg)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

static void __recover_slot(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(slot, &cinfo->recovery_map);
	if (!cinfo->recovery_thread) {
		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
				mddev, "recover");
		if (!cinfo->recovery_thread) {
			pr_warn("md-cluster: Could not create recovery thread\n");
			return;
		}
	}
	md_wakeup_thread(cinfo->recovery_thread);
}

static void recover_slot(void *arg, struct dlm_slot *slot)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
			mddev->bitmap_info.cluster_name,
			slot->nodeid, slot->slot,
			cinfo->slot_number);
	/* subtract one since DLM slot numbers start at one while
	 * cluster-md slot numbers begin at 0 */
	__recover_slot(mddev, slot->slot - 1);
}

static void recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->slot_number = our_slot;
	/* the completion only needs to be completed when a node joins the
	 * cluster; it doesn't need to run during another node's failure */
	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
		complete(&cinfo->completion);
		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	}
	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

/* These ops are called when a node joins the cluster, and perform lock
 * recovery if a node failure occurs */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};

/*
 * The BAST function for the ack lock resource
 * This function wakes up the receive thread in
 * order to receive and process the message.
 */
static void ack_bast(void *arg, int mode)
{
	struct dlm_lock_resource *res = arg;
	struct md_cluster_info *cinfo = res->mddev->cluster_info;

	if (mode == DLM_LOCK_EX) {
		if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
			md_wakeup_thread(cinfo->recv_thread);
		else
			set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
	}
}
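
/*
 * Suspend-list bookkeeping: each entry records a region that another node
 * is currently resyncing (checked by area_resyncing()).
 */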
static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
{
	struct suspend_info *s, *tmp;

	list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
		if (slot == s->slot) {
			list_del(&s->list);
			kfree(s);
			break;
		}
}

static void remove_suspend_info(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	__remove_suspend_info(cinfo, slot);
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

static void process_suspend_info(struct mddev *mddev,
		int slot, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct suspend_info *s;

	if (!hi) {
		remove_suspend_info(mddev, slot);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
		return;
	}

	/*
	 * The bitmaps are not the same for different nodes. If RESYNCING is
	 * happening on one node, then the node which received the RESYNCING
	 * message will probably perform the resync of the region [lo, hi]
	 * again, so we could reduce resync time a lot if we can ensure that
	 * the bitmaps among the different nodes match up well.
	 *
	 * sync_low/hi is used to record the region which arrived in the
	 * previous RESYNCING message.
	 *
	 * Call bitmap_sync_with_cluster to clear NEEDED_MASK and set
	 * RESYNC_MASK since the resync thread is running on another node,
	 * so we don't need to do the resync again with the same section.
	 */
	bitmap_sync_with_cluster(mddev, cinfo->sync_low,
					cinfo->sync_hi,
					lo, hi);
	cinfo->sync_low = lo;
	cinfo->sync_hi = hi;

	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
	if (!s)
		return;
	s->slot = slot;
	s->lo = lo;
	s->hi = hi;
	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	/* Remove existing entry (if exists) before adding */
	__remove_suspend_info(cinfo, slot);
	list_add(&s->list, &cinfo->suspend_list);
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}
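
/*
 * A peer announced a new device: notify userspace via a uevent and wait
 * (with a timeout) for the local confirmation through new_disk_ack().
 */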
static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
	char disk_uuid[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;
	char event_name[] = "EVENT=ADD_DEVICE";
	char raid_slot[16];
	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
	int len;

	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
	init_completion(&cinfo->newdisk_completion);
	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
	wait_for_completion_timeout(&cinfo->newdisk_completion,
			NEW_DEV_TIMEOUT);
	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
}
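
/*
 * A peer updated the metadata: take a CR lock on no-new-dev and reload the
 * superblock from the device number carried in the message, grabbing
 * reconfig_mutex unless the receive path already signals that it is held.
 */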
static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
	int got_lock = 0;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);

	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	wait_event(mddev->thread->wqueue,
		   (got_lock = mddev_trylock(mddev)) ||
		   test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
	md_reload_sb(mddev, mddev->good_device_nr);
	if (got_lock)
		mddev_unlock(mddev);
}

static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev) {
		set_bit(ClusterRemove, &rdev->flags);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
	} else
		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev && test_bit(Faulty, &rdev->flags))
		clear_bit(Faulty, &rdev->flags);
	else
		pr_warn("%s: %d Could not find disk(%d) which is faulty\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
	int ret = 0;

	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
		"node %d received its own msg\n", le32_to_cpu(msg->slot)))
		return -1;
	switch (le32_to_cpu(msg->type)) {
	case METADATA_UPDATED:
		process_metadata_update(mddev, msg);
		break;
	case CHANGE_CAPACITY:
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk(mddev->gendisk);
		break;
	case RESYNCING:
		process_suspend_info(mddev, le32_to_cpu(msg->slot),
				     le64_to_cpu(msg->low),
				     le64_to_cpu(msg->high));
		break;
	case NEWDISK:
		process_add_new_disk(mddev, msg);
		break;
	case REMOVE:
		process_remove_disk(mddev, msg);
		break;
	case RE_ADD:
		process_readd_disk(mddev, msg);
		break;
	case BITMAP_NEEDS_SYNC:
		__recover_slot(mddev, le32_to_cpu(msg->slot));
		break;
	default:
		ret = -1;
		pr_warn("%s:%d Received unknown message from %d\n",
			__func__, __LINE__, le32_to_cpu(msg->slot));
	}
	return ret;
}

/*
 * thread for receiving message
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;
	int ret;

	mutex_lock(&cinfo->recv_mutex);
	/* get CR on Message */
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md/raid1: failed to get CR on MESSAGE\n");
		mutex_unlock(&cinfo->recv_mutex);
		return;
	}

	/* read the LVB and process this message_lockres */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	ret = process_recvd_msg(thread->mddev, &msg);
	if (ret)
		goto out;

	/* release CR on ack_lockres */
	ret = dlm_unlock_sync(ack_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock ack failed return %d\n", ret);
	/* up-convert to PR on message_lockres */
	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
	if (unlikely(ret != 0))
		pr_info("lock PR on msg failed return %d\n", ret);
	/* get CR on ack_lockres again */
	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	if (unlikely(ret != 0))
		pr_info("lock CR on ack failed return %d\n", ret);
out:
	/* release CR on message_lockres */
	ret = dlm_unlock_sync(message_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock msg failed return %d\n", ret);
	mutex_unlock(&cinfo->recv_mutex);
}

/* lock_token()
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 */
static int lock_token(struct md_cluster_info *cinfo)
{
	int error;

	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
				__func__, __LINE__, error);
	} else {
		/* Lock the receive sequence */
		mutex_lock(&cinfo->recv_mutex);
	}
	return error;
}

/* lock_comm()
 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
 */
static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
{
	int rv, set_bit = 0;
	struct mddev *mddev = cinfo->mddev;

	/*
	 * If the resync thread runs after the raid1d thread, then
	 * process_metadata_update could not continue while raid1d holds
	 * reconfig_mutex (and raid1d is blocked because another node already
	 * got EX on Token and is waiting for EX on Ack), so let resync wake
	 * up the thread in case the flag is set.
	 */
	if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				      &cinfo->state)) {
		rv = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
					      &cinfo->state);
		WARN_ON_ONCE(rv);
		md_wakeup_thread(mddev->thread);
		set_bit = 1;
	}
	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
	rv = lock_token(cinfo);
	if (set_bit)
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return rv;
}
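
/* unlock_comm()
 * Releases the TOKEN lock resource and the send channel taken by
 * lock_comm()/lock_token().
 */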
static void unlock_comm(struct md_cluster_info *cinfo)
{
	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
	mutex_unlock(&cinfo->recv_mutex);
	dlm_unlock_sync(cinfo->token_lockres);
	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
	wake_up(&cinfo->wait);
}

/* __sendmsg()
 * This function performs the actual sending of the message. It is usually
 * called after performing the encompassing operation.
 * The function:
 * 1. Grabs the message lock resource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts the message lock resource to CW
 * 4. Upconverts the ack lock resource from CR to EX. This forces the BAST
 *    on the other nodes and the other nodes read the message. The thread
 *    will wait here until all other nodes have released the ack lock resource.
 * 5. Downconverts the ack lock resource to CR
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error;
	int slot = cinfo->slot_number - 1;

	cmsg->slot = cpu_to_le32(slot);
	/* get EX on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		goto failed_message;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/* down-convert EX to CW on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
				error);
		goto failed_ack;
	}

	/* up-convert CR to EX on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/* down-convert EX to CR on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	error = dlm_unlock_sync(cinfo->message_lockres);
	if (unlikely(error != 0)) {
		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
			error);
		/* in case the message can't be released due to some reason */
		goto failed_ack;
	}
failed_message:
	return error;
}
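
/* sendmsg()
 * Locks the communication channel, sends the message via __sendmsg()
 * and unlocks the channel again.
 */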
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
		   bool mddev_locked)
{
	int ret;

	ret = lock_comm(cinfo, mddev_locked);
	if (!ret) {
		ret = __sendmsg(cinfo, cmsg);
		unlock_comm(cinfo);
	}
	return ret;
}
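
/*
 * Walk every other node's bitmap lock resource: if the lock cannot be taken
 * the peer is resyncing, so record its suspend range; otherwise read its
 * bitmap and lower recovery_cp if that slot still needs recovery.
 */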
static int gather_all_resync_info(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i, ret = 0;
	struct dlm_lock_resource *bm_lockres;
	struct suspend_info *s;
	char str[64];
	sector_t lo, hi;

	for (i = 0; i < total_slots; i++) {
		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres)
			return -ENOMEM;
		if (i == (cinfo->slot_number - 1)) {
			lockres_free(bm_lockres);
			continue;
		}

		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret == -EAGAIN) {
			s = read_resync_info(mddev, bm_lockres);
			if (s) {
				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
						__func__, __LINE__,
						(unsigned long long) s->lo,
						(unsigned long long) s->hi, i);
				spin_lock_irq(&cinfo->suspend_lock);
				s->slot = i;
				list_add(&s->list, &cinfo->suspend_list);
				spin_unlock_irq(&cinfo->suspend_lock);
			}
			ret = 0;
			lockres_free(bm_lockres);
			continue;
		}
		if (ret) {
			lockres_free(bm_lockres);
			goto out;
		}

		/* Read the disk bitmap sb and check if it needs recovery */
		ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
		if (ret) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", i);
			lockres_free(bm_lockres);
			continue;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp)) {
			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
			mddev->recovery_cp = lo;
			md_check_recovery(mddev);
		}

		lockres_free(bm_lockres);
	}
out:
	return ret;
}
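
/*
 * join() - create/join the DLM lockspace for this array and set up all the
 * lock resources and threads used for cluster communication.
 */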
static int join(struct mddev *mddev, int nodes)
{
	struct md_cluster_info *cinfo;
	int ret, ops_rv;
	char str[64];

	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
	if (!cinfo)
		return -ENOMEM;

	INIT_LIST_HEAD(&cinfo->suspend_list);
	spin_lock_init(&cinfo->suspend_lock);
	init_completion(&cinfo->completion);
	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	init_waitqueue_head(&cinfo->wait);
	mutex_init(&cinfo->recv_mutex);

	mddev->cluster_info = cinfo;
	cinfo->mddev = mddev;

	memset(str, 0, 64);
	sprintf(str, "%pU", mddev->uuid);
	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
				DLM_LSFL_FS, LVB_SIZE,
				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
	if (ret)
		goto err;
	wait_for_completion(&cinfo->completion);
	if (nodes < cinfo->slot_number) {
		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
			cinfo->slot_number, nodes);
		ret = -ERANGE;
		goto err;
	}
	/* Initiate the communication resources */
	ret = -ENOMEM;
	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
	if (!cinfo->recv_thread) {
		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
		goto err;
	}
	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
	if (!cinfo->message_lockres)
		goto err;
	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
	if (!cinfo->token_lockres)
		goto err;
	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
	if (!cinfo->no_new_dev_lockres)
		goto err;

	ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (ret) {
		ret = -EAGAIN;
		pr_err("md-cluster: can't join cluster to avoid lock issue\n");
		goto err;
	}
	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
	if (!cinfo->ack_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	/* get sync CR lock on ACK. */
	ret = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (ret)
		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
				ret);
	dlm_unlock_sync(cinfo->token_lockres);
	/* get sync CR lock on no-new-dev. */
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	if (ret)
		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);

	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
	if (!cinfo->bitmap_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
		pr_err("Failed to get bitmap lock\n");
		ret = -EINVAL;
		goto err;
	}

	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
	if (!cinfo->resync_lockres) {
		ret = -ENOMEM;
		goto err;
	}

	return 0;
err:
	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	if (cinfo->lockspace)
		dlm_release_lockspace(cinfo->lockspace, 2);
	mddev->cluster_info = NULL;
	kfree(cinfo);
	return ret;
}
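
/*
 * Called once the array is running: gather resync info from all the other
 * nodes' bitmaps and mark this node as fully joined so pending receive
 * events can be processed.
 */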
static void load_bitmaps(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	/* load all the node's bitmap info for resync */
	if (gather_all_resync_info(mddev, total_slots))
		pr_err("md-cluster: failed to gather all resync info\n");
	set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
	/* wake up recv thread in case something needs to be handled */
	if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
		md_wakeup_thread(cinfo->recv_thread);
}

static void resync_bitmap(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int err;

	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
			__func__, __LINE__, err);
}

static void unlock_all_bitmaps(struct mddev *mddev);

static int leave(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!cinfo)
		return 0;

	/* A BITMAP_NEEDS_SYNC message should be sent when a node
	 * is leaving the cluster with a dirty bitmap. Also, we can
	 * only deliver it when the dlm connection is available */
	if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
		resync_bitmap(mddev);

	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	unlock_all_bitmaps(mddev);
	dlm_release_lockspace(cinfo->lockspace, 2);
	kfree(cinfo);
	return 0;
}

/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, whereas cluster-md
 * wants the number to be from zero, so we subtract one
 */
static int slot_number(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return cinfo->slot_number - 1;
}

/*
 * Check if the communication is already locked, else lock the communication
 * channel.
 * If it is already locked, the token is in EX mode and hence lock_token()
 * should not be called.
 */
static int metadata_update_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret;

	/*
	 * metadata_update_start is always called with the protection of
	 * reconfig_mutex, so set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD here.
	 */
	ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				    &cinfo->state);
	WARN_ON_ONCE(ret);
	md_wakeup_thread(mddev->thread);

	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));

	/* If the token is already locked, return 0 */
	if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
		return 0;
	}

	ret = lock_token(cinfo);
	clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return ret;
}

static int metadata_update_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	/* Pick up a good active device number to send.
	 */
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		ret = __sendmsg(cinfo, &cmsg);
	} else
		pr_warn("md-cluster: No good device id found to send\n");
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
	return ret;
}

static void metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

/*
 * return 0 if all the bitmaps have the same sync_size
 */
int cluster_check_sync_size(struct mddev *mddev)
{
	int i, rv;
	bitmap_super_t *sb;
	unsigned long my_sync_size, sync_size = 0;
	int node_num = mddev->bitmap_info.nodes;
	int current_slot = md_cluster_ops->slot_number(mddev);
	struct bitmap *bitmap = mddev->bitmap;
	char str[64];
	struct dlm_lock_resource *bm_lockres;

	sb = kmap_atomic(bitmap->storage.sb_page);
	my_sync_size = sb->sync_size;
	kunmap_atomic(sb);

	for (i = 0; i < node_num; i++) {
		if (i == current_slot)
			continue;

		bitmap = get_bitmap_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			return -1;
		}

		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the sb.
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize %s\n", str);
			md_bitmap_free(bitmap);
			return -1;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			bitmap_update_sb(bitmap);
		lockres_free(bm_lockres);

		sb = kmap_atomic(bitmap->storage.sb_page);
		if (sync_size == 0)
			sync_size = sb->sync_size;
		else if (sync_size != sb->sync_size) {
			kunmap_atomic(sb);
			md_bitmap_free(bitmap);
			return -1;
		}
		kunmap_atomic(sb);
		md_bitmap_free(bitmap);
	}

	return (my_sync_size == sync_size) ? 0 : -1;
}

/*
 * Updating the size for a clustered raid is a little more complex; we
 * perform it in these steps:
 * 1. hold the token lock and update the superblock on the initiator node.
 * 2. send a METADATA_UPDATED msg to the other nodes.
 * 3. The initiator node continues to check each bitmap's sync_size; if all
 *    bitmaps have the same value of sync_size, then we can set the capacity
 *    and let the other nodes perform it too. If one node can't update
 *    sync_size accordingly, we need to revert to the previous value.
 */
static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	md_update_sb(mddev, 1);
	if (lock_comm(cinfo, 1)) {
		pr_err("%s: lock_comm failed\n", __func__);
		return;
	}

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		/*
		 * We can only change the capacity after all the nodes can do
		 * it, so we need to wait until the other nodes have received
		 * the msg and handled the change
		 */
		ret = __sendmsg(cinfo, &cmsg);
		if (ret) {
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
			unlock_comm(cinfo);
			return;
		}
	} else {
		pr_err("md-cluster: No good device id found to send\n");
		unlock_comm(cinfo);
		return;
	}

	/*
	 * check the sync_size from the other nodes' bitmaps; if sync_size
	 * has already been updated in the other nodes as expected, send an
	 * empty metadata msg to permit the change of capacity
	 */
	if (cluster_check_sync_size(mddev) == 0) {
		memset(&cmsg, 0, sizeof(cmsg));
		cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
			       __func__, __LINE__);
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk(mddev->gendisk);
	} else {
		/* revert to previous sectors */
		ret = mddev->pers->resize(mddev, old_dev_sectors);
		if (!ret)
			revalidate_disk(mddev->gendisk);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
	}
	unlock_comm(cinfo);
}

static int resync_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
}
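
/*
 * Publish the current resync range: store [lo, hi] in our bitmap lock's LVB
 * and broadcast a RESYNCING message so the other nodes can record the
 * suspended region.
 */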
static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct resync_info ri;
	struct cluster_msg cmsg = {0};

	/* do not send zero again if we have sent it before */
	if (hi == 0) {
		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
		if (le64_to_cpu(ri.hi) == 0)
			return 0;
	}

	add_resync_info(cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
	cmsg.type = cpu_to_le32(RESYNCING);
	cmsg.low = cpu_to_le64(lo);
	cmsg.high = cpu_to_le64(hi);

	/*
	 * mddev_lock is held if resync_info_update is called from
	 * resync_finish (md_reap_sync_thread -> resync_finish)
	 */
	if (lo == 0 && hi == 0)
		return sendmsg(cinfo, &cmsg, 1);
	else
		return sendmsg(cinfo, &cmsg, 0);
}

static int resync_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	dlm_unlock_sync(cinfo->resync_lockres);
	return resync_info_update(mddev, 0, 0);
}
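
/*
 * Return 1 if [lo, hi] overlaps a region another node is resyncing (or, for
 * reads, if read balancing is temporarily suspended), 0 otherwise.
 */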
static int area_resyncing(struct mddev *mddev, int direction,
		sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;
	struct suspend_info *s;

	if ((direction == READ) &&
		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
		return 1;

	spin_lock_irq(&cinfo->suspend_lock);
	if (list_empty(&cinfo->suspend_list))
		goto out;
	list_for_each_entry(s, &cinfo->suspend_list, list)
		if (hi > s->lo && lo < s->hi) {
			ret = 1;
			break;
		}
out:
	spin_unlock_irq(&cinfo->suspend_lock);
	return ret;
}

/* add_new_disk() - initiates a disk add
 * However, if this fails before writing md_update_sb(),
 * add_new_disk_cancel() must be called to release the token lock
 */
static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret = 0;
	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
	char *uuid = sb->device_uuid;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(NEWDISK);
	memcpy(cmsg.uuid, uuid, 16);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	if (lock_comm(cinfo, 1))
		return -EAGAIN;
	ret = __sendmsg(cinfo, &cmsg);
	if (ret) {
		unlock_comm(cinfo);
		return ret;
	}
	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
	/* Some node does not "see" the device */
	if (ret == -EAGAIN)
		ret = -ENOENT;
	if (ret)
		unlock_comm(cinfo);
	else {
		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
		 * will run soon after add_new_disk, the below path will be
		 * invoked:
		 *	md_wakeup_thread(mddev->thread)
		 *	-> conf->thread (raid1d)
		 *	-> md_check_recovery -> md_update_sb
		 *	-> metadata_update_start/finish
		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
		 *
		 * For other failure cases, metadata_update_cancel and
		 * add_new_disk_cancel also clear the bit below.
		 */
		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
		wake_up(&cinfo->wait);
	}
	return ret;
}

static void add_new_disk_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

static int new_disk_ack(struct mddev *mddev, bool ack)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
		return -EINVAL;
	}

	if (ack)
		dlm_unlock_sync(cinfo->no_new_dev_lockres);
	complete(&cinfo->newdisk_completion);
	return 0;
}

static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct cluster_msg cmsg = {0};
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(REMOVE);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	return sendmsg(cinfo, &cmsg, 1);
}
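
/*
 * Try to take PW locks on all the other nodes' bitmap lock resources;
 * returns 1 if every lock was obtained and -1 if any attempt failed.
 */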
static int lock_all_bitmaps(struct mddev *mddev)
{
	int slot, my_slot, ret, held = 1, i = 0;
	char str[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) *
					     sizeof(struct dlm_lock_resource *),
					     GFP_KERNEL);
	if (!cinfo->other_bitmap_lockres) {
		pr_err("md: can't alloc mem for other bitmap locks\n");
		return 0;
	}

	my_slot = slot_number(mddev);
	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
		if (slot == my_slot)
			continue;

		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", slot);
		cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
		if (!cinfo->other_bitmap_lockres[i])
			return -ENOMEM;

		cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
		if (ret)
			held = -1;
		i++;
	}

	return held;
}

static void unlock_all_bitmaps(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i;

	/* release the other nodes' bitmap locks if they exist */
	if (cinfo->other_bitmap_lockres) {
		for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
			if (cinfo->other_bitmap_lockres[i]) {
				lockres_free(cinfo->other_bitmap_lockres[i]);
			}
		}
		kfree(cinfo->other_bitmap_lockres);
		cinfo->other_bitmap_lockres = NULL;
	}
}
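
/*
 * Broadcast a RE_ADD message for the device and merge every other node's
 * bitmap into ours, lowering recovery_cp where they still have dirty bits.
 */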
static int gather_bitmaps(struct md_rdev *rdev)
{
	int sn, err;
	sector_t lo, hi;
	struct cluster_msg cmsg = {0};
	struct mddev *mddev = rdev->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(RE_ADD);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		goto out;

	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
		if (sn == (cinfo->slot_number - 1))
			continue;
		err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
		if (err) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", sn);
			goto out;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp))
			mddev->recovery_cp = lo;
	}
out:
	return err;
}

static struct md_cluster_operations cluster_ops = {
	.join = join,
	.leave = leave,
	.slot_number = slot_number,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.resync_info_update = resync_info_update,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk = add_new_disk,
	.add_new_disk_cancel = add_new_disk_cancel,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
	.load_bitmaps = load_bitmaps,
	.gather_bitmaps = gather_bitmaps,
	.lock_all_bitmaps = lock_all_bitmaps,
	.unlock_all_bitmaps = unlock_all_bitmaps,
	.update_size = update_size,
};

static int __init cluster_init(void)
{
	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
	pr_info("Registering Cluster MD functions\n");
	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
	return 0;
}

static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_AUTHOR("SUSE");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");