1 /*
2 * Copyright (c) 1998-2002 The Jgroup Team.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16 *
17 */
18
19 package jgroup.arm;
20
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.Map;
24 import java.util.TreeMap;
25
26 import jgroup.core.ConfigurationException;
27 import jgroup.core.MemberId;
28 import jgroup.core.View;
29 import jgroup.core.arm.DistributionScheme;
30 import jgroup.core.arm.GroupExistsException;
31 import jgroup.core.arm.RedundancyException;
32 import jgroup.core.arm.UnknownGroupException;
33 import jgroup.relacs.config.AppConfig;
34 import jgroup.relacs.config.Domain;
35 import jgroup.relacs.config.DomainSet;
36 import jgroup.relacs.config.Host;
37 import jgroup.relacs.config.HostSet;
38
39 import org.apache.log4j.Logger;
40
41 /**
42 * ReplicaCount class is used to compute replica allocation on
43 * domains/hosts in the distributed system.
44 *
 * FIXME HEIN: This class needs to implement the DomainListener and
 * HostListener. Be aware that the host set below is a clone (shallow
 * copy) of the original. This means that we should work on the
 * original when adding a new host, but also add the new host to the
 * host set used to represent the various replica counts. That is, when
 * adding a new host, its replica count should be set to zero.
51 *
52 * @author Hein Meling
53 * @since Jgroup 1.2
54 */
55 final class ReplicaCount
56 implements DistributionScheme
57 {
58
59 ////////////////////////////////////////////////////////////////////////////////////////////
60 // Logger
61 ////////////////////////////////////////////////////////////////////////////////////////////
62
63 /** Obtain logger for this class */
64 private static final Logger log = Logger.getLogger(ReplicaCount.class);
65
66
67 ////////////////////////////////////////////////////////////////////////////////////////////
68 // Constants
69 ////////////////////////////////////////////////////////////////////////////////////////////
70
71 /** Zero value for storing in a collection. */
72 private static final Integer ZERO = new Integer(0);
73
74 /**
75 * Key for the <code>Host</code> content map; to access the number of
76 * replicas executing on the associated host.
77 */
78 private static final String REPLICA_COUNT = "ReplicaCount";
79
80 /**
81 * Key for the <code>Domain</code> content map; to obtain access to
82 * the TreeMap of sorted hosts. Sorting is based on the number of
83 * replicas executing on each host.
84 */
85 private static final String REPLICA_COUNT_MAP = "ReplicaCountMap";
86
87
88 ////////////////////////////////////////////////////////////////////////////////////////////
89 // Fields
90 ////////////////////////////////////////////////////////////////////////////////////////////
91
92 /** The set of domains over which we want to assign replicas. */
93 private DomainSet domains;
94
95 /** Table for managing groups and applications */
96 private GroupTable groupTable;
97
98
99 ////////////////////////////////////////////////////////////////////////////////////////////
100 // Constructor
101 ////////////////////////////////////////////////////////////////////////////////////////////
102
103 ReplicaCount(GroupTable groupTable, DomainSet domains)
104 throws ConfigurationException
105 {
106 this.groupTable = groupTable;
107 this.domains = domains;
108
109 for (Domain domain : domains) {
110 HostSet hosts = (HostSet) domain.getHostSet().clone();
111 Map<Integer,HostSet> countMap =
112 Collections.synchronizedMap(new TreeMap<Integer,HostSet>());
113 countMap.put(ZERO, hosts);
114 domain.put(REPLICA_COUNT_MAP, countMap);
115 for (Iterator j = hosts.iterator(); j.hasNext(); ) {
116 Host host = (Host) j.next();
117 host.put(REPLICA_COUNT, ZERO);
118 }
119 }
120 }
121
122
123 ////////////////////////////////////////////////////////////////////////////////////////////
124 // Methods from the DistributionScheme interface
125 ////////////////////////////////////////////////////////////////////////////////////////////
126
127 /**
128 * Assign the host location for the specified application. The
129 * <code>AppConfig</code> object also contains the required redundancy.
130 * <P>
131 * We first try to place replicas in different domains. But if some
132 * domains are unavailable, or the requested redundancy is greater
133 * than the number of domains, this is not possible. In this case we
134 * place replicas in different domains were possible and then in the
135 * same domain as existing replicas. We never place replicas on the
136 * same host.
137 * <P>
138 * FIXME HEIN: This method should prehaps be synchronized on the
139 * 'domains' object in order to avoid concurrent changes to the set
140 * of domains while iterating. For instance if a domain is removed
141 * after we determined that the size was sufficient to satisfy the
142 * given redundancy, the method would behave incorrectly.
143 *
144 * @param app
145 * An <code>AppConfig</code> object specifying the class parameters
146 * and redundancy requirements.
147 * @return
148 * A <code>HostSet</code> specifying the locations where the replicas
149 * should be created.
150 * @exception RedundancyException
151 * Is raised if an the specified redundancy could not be satisfied.
152 * @exception GroupExistsException
153 * Raised if the specified application group already exists (has been
154 * assigned to some hostset).
155 */
156 public HostSet assignReplicas(AppConfig app)
157 throws RedundancyException, GroupExistsException
158 {
159 if (app == null)
160 throw new NullPointerException("No application object specified for assignReplicas()");
161 if (groupTable.contains(app))
162 throw new GroupExistsException("Cannot assign replicas", app.getGroupId());
163
164 /* Get the number of domains in the distributed system. */
165 int numDomains = domains.size();
166 if (numDomains == 0)
167 throw new RedundancyException("No domains available to assign replicas.");
168
169 /* Create an empty HostSet with capacity := redundancy */
170 int redundancy = app.getInitialRedundancy();
171 HostSet assignedHosts = new HostSet(redundancy);
172
173 /*
174 * We exit the outer loop once the redundancy has been satisfied or
175 * the number of domains to avoid is equal to the number of
176 * available domains. In the latter case, we must rollback and
177 * throw an exception (see below.)
178 */
179 for (DomainSet avoid = new DomainSet();
180 redundancy > 0 && avoid.size() < numDomains; ) {
181
182 /* Iterate over domains */
183 for (Iterator i = domains.iterator(); redundancy > 0 && i.hasNext(); ) {
184 Domain domain = (Domain) i.next();
185 if (!avoid.containsDomain(domain)) {
186 Host host = getHost(domain, assignedHosts);
187 /*
188 * If getHost returns null the domain is marked as avoid. This
189 * is because it is either unavailable or all hosts in it has
190 * been used to place replicas of this type.
191 */
192 if (host == null)
193 avoid.addDomain(domain);
194 else {
195 /* We found an available host; adding it to the host set */
196 assignedHosts.addHost(host);
197 redundancy--;
198 }
199 }
200 }
201 }
202
203 /*
204 * If the number of hosts obtained is less than the requested
205 * redundancy we rollback any updates to the system tables and throw
206 * an exception.
207 */
208 if (assignedHosts.size() < redundancy) {
209 removeReplicas(assignedHosts);
210 throw new RedundancyException("Could not satisfy requested redundancy for " + app.getClassName());
211 }
212
213 /*
214 * If the group table already has assigned this application,
215 * a GroupExistsException is thrown.
216 */
217 groupTable.add(app, assignedHosts);
218
219 return assignedHosts;
220 }
221
222
223 public HostSet collocateReplicas(AppConfig app, AppConfig collocateWithApp)
224 throws GroupExistsException, RedundancyException, UnknownGroupException
225 {
226 if (groupTable.contains(app))
227 throw new GroupExistsException("Cannot assign replicas", app.getGroupId());
228
229 if (app.getInitialRedundancy() != collocateWithApp.getInitialRedundancy() ||
230 app.getMinimalRedundancy() != collocateWithApp.getMinimalRedundancy()) {
231 RedundancyException e = new RedundancyException("Cannot collocate replicas.");
232 log.error("Cannot collocate replicas with different redundancy requirements:\n"
233 + app + "\n" + collocateWithApp, e);
234 throw e;
235 }
236 HostSet assignedHosts = groupTable.getAssignedHosts(collocateWithApp);
237 if (assignedHosts.isEmpty()) {
238 throw new UnknownGroupException("Application not previously assigned: "
239 + collocateWithApp.getClassName(), collocateWithApp.getGroupId());
240 }
241 return assignedHosts;
242 }
243
244
245 /**
246 * Remove replica assignment data by updating the content map of
247 * hosts and their corresponding domain content map. In addition
248 *
249 * @param app
250 * The application replicas to be removed.
251 * @throws UnknownGroupException
252 * Raised if the specified application group is unknown.
253 */
254 public HostSet removeReplicas(AppConfig app)
255 throws UnknownGroupException
256 {
257 if (!groupTable.contains(app))
258 throw new UnknownGroupException(app.getGroupId());
259 HostSet hosts = groupTable.getAssignedHosts(app);
260 removeReplicas(hosts);
261 groupTable.remove(app);
262 return hosts;
263 }
264
265
266 /**
267 * Reassign the application replica running on the specified host.
268 *
269 * @param app
270 * The application group to which the replica belongs.
271 * @param host
272 * The host from which to remove the replica.
273 * @return
274 * The host on which a new replica should be created.
275 * @exception UnknownGroupException
276 * The given application group is unknown.
277 * @exception RedundancyException
278 * Raised if the system could not reassign the replica to any host
279 * in the system.
280 */
281 public Host reassignReplica(AppConfig app, Host host)
282 throws UnknownGroupException, RedundancyException
283 {
284 if (!groupTable.contains(app))
285 throw new UnknownGroupException(app.getGroupId());
286 HostSet assignedHosts = groupTable.getAssignedHosts(app);
287
288 /* First try to get a new host within the same domain. */
289 Host newHost = getHost(host.getDomain(), assignedHosts);
290 if (newHost == null) {
291
292 /* Iterate over domains to try get an alternative location. */
293 for (Iterator iter = domains.iterator(); newHost == null && iter.hasNext(); )
294 newHost = getHost((Domain) iter.next(), assignedHosts);
295
296 if (newHost == null)
297 throw new RedundancyException("Could not reassign the given replica " + app + ", away from " + host);
298 }
299
300 /*
301 * Update the host set for the given application to reflect the
302 * reassignment. Note that we may already have removed the host
303 * from the assignedHosts set during the viewChange() method
304 * invocation.
305 */
306 groupTable.removeHost(app, host);
307 groupTable.addHost(app, newHost);
308
309 return newHost;
310 }
311
312
313 /* (non-javadoc)
314 * @see jgroup.core.arm.DistributionScheme#viewChangeEvent(jgroup.core.View)
315 */
316 public void viewChangeEvent(View view)
317 {
318 // Update the GroupTable according to this view change event
319 groupTable.viewChangeEvent(view);
320 /*
321 * Cancel the renew timer for the application given by the provided
322 * view. This is required to prevent the renew mechanism from
323 * triggering, in cases when a new view has been installed, but the
324 * members has recently been initialized, and are thus out of sync
325 * with the renew timer at the RM replicas.
326 */
327 int groupSize = view.size();
328 if (groupSize > 0) {
329 ReplicaPingEvent pingEvent =
330 ReplicaPingEvent.getNewPingEvent(view.getGid(), groupSize);
331 if (pingEvent != null)
332 pingEvent.handle(this);
333 }
334 }
335
336
337 ////////////////////////////////////////////////////////////////////////////////////////////
338 // Private methods
339 ////////////////////////////////////////////////////////////////////////////////////////////
340
341 /**
342 * Remove replica assignment data by updating the content map of
343 * hosts and their corresponding domain content map.
344 *
345 * @param hosts
346 * A <code>HostSet</code> containing the hosts that should have
347 * their content map updated due to this removal.
348 *
349 * @throws IllegalStateException
350 * Raised if the domain of one of the hosts in the set was not
351 * initialized correctly, or if the countMap does not contain the
352 * specified host in the expected position. This exception should
353 * never occur unless invoked at an incorrect time.
354 */
355 private void removeReplicas(final HostSet hosts)
356 {
357 for (Iterator i = hosts.iterator(); i.hasNext(); )
358 removeReplica((Host) i.next());
359 }
360
361
362 /**
363 * Remove a replica assignment from the specified host,
364 * by updating the content map of the host and its
365 * corresponding domain content map.
366 *
367 * @param host
368 * The <code>Host</code> that should have its content map
369 * updated due to this removal.
370 *
371 * @throws IllegalStateException
372 * Raised if the domain of the host was not initialized correctly,
373 * or if the countMap does not contain the specified host in the
374 * expected position. This exception should never occur unless
375 * this method is invoked at an incorrect time.
376 */
377 @SuppressWarnings("unchecked")
378 public static void removeReplica(Host host)
379 {
380 Integer count = (Integer) host.get(REPLICA_COUNT);
381 Integer oldCount = new Integer(count.intValue() - 1);
382 host.put(REPLICA_COUNT, oldCount);
383 Domain domain = host.getDomain();
384 domain.decReplicaCount();
385
386 Map<Integer,HostSet> countMap = (Map) domain.get(REPLICA_COUNT_MAP);
387 if (countMap == null)
388 throw new IllegalStateException("Domain has not been initialized: " + domain);
389
390 /*
391 * Get the host set with the current replica count from which this
392 * host should be removed.
393 */
394 HostSet hostSet = countMap.get(count);
395 if (hostSet == null) {
396 //FIXME HACK: to avoid NPE problem.
397 for (HostSet hosts : countMap.values()) {
398 if (hosts.containsHost(host)) {
399 hostSet = hosts;
400 break;
401 }
402 }
403 }
404 if (!hostSet.removeHost(host))
405 throw new IllegalStateException("Host not in host set for the current replica count.");
406
407 if (hostSet.isEmpty())
408 countMap.remove(count);
409
410 /*
411 * Get the host set with the old replica count to which this host
412 * should be added.
413 */
414 hostSet = countMap.get(oldCount);
415 if (hostSet == null) {
416 /*
417 * There is no other hosts with matching old replica count value,
418 * therefore we create a new (empty) host set and place it in the
419 * sorted countMap, into which the given host will be put.
420 */
421 hostSet = new HostSet();
422 countMap.put(oldCount, hostSet);
423 }
424
425 /*
426 * Add the given host to the set of hosts with the old replica count
427 * value.
428 */
429 hostSet.addHost(host);
430 }
431
432
433 /**
434 * Update the internal tables to include a replica placed on
435 * the given host.
436 */
437 public static void updateHost(Host host)
438 {
439 Domain domain = host.getDomain();
440 /*
441 * We clone the domain host set since we need to remove a
442 * host from it before using it as the avoidHostSet.
443 */
444 HostSet avoidHostSet = (HostSet) domain.getHostSet().clone();
445 /*
446 * Remove the host to place, from the avoidHostSet to force
447 * the getHost() method to update the table correctly, and
448 * return the same host; if not, we are in trouble.
449 */
450 avoidHostSet.removeHost(host);
451 Host newHost = getHost(domain, avoidHostSet);
452 if (newHost == null || !newHost.equals(host)) {
453 log.warn(host.getHostName() + " could not be added to the replica assignment; "
454 + "the avoid set contains these hosts: " + avoidHostSet);
455 } else {
456 if (log.isDebugEnabled())
457 log.debug("Updated host to include one more replica: " + host);
458 }
459 }
460
461 /**
462 * Returns a host from the given domain that is not in the avoid set.
463 * The host returned is a host with the lowest replica count in this
464 * domain.
465 *
466 * @param domain
467 * Domain from which to get a host.
468 * @param avoid
469 * A set of hosts in this domain to avoid. That is, a new replica
470 * should not be placed on the same host. It is the responsibility
471 * of the caller to determine which hosts are already used for this
472 * invocation series. To start an invocation series the avoid set
473 * may be <code>null</code> or just an empty <code>HostSet</code>.
474 * @return
475 * An availble host in the given domain, and not in the avoid set.
476 * If no more hosts are available in this domain, <code>null</code>
477 * is returned.
478 * @throws IllegalStateException
479 * Raised if the specified domain has not been correctly initialized.
480 */
481 @SuppressWarnings("unchecked")
482 private static Host getHost(Domain domain, HostSet avoid)
483 {
484 if (avoid == null)
485 avoid = new HostSet(0);
486
487 Map<Integer,HostSet> countMap = (Map) domain.get(REPLICA_COUNT_MAP);
488 if (countMap == null)
489 throw new IllegalStateException("Domain has not been initialized: " + domain);
490
491 /*
492 * Get the host set in this domain with the lowest replica count.
493 */
494 synchronized (countMap) {
495 for (Iterator cntIter = countMap.keySet().iterator(); cntIter.hasNext(); ) {
496
497 /*
498 * Since the countMap is a sorted map, the keys will be iterated
499 * in sorted order from low to high.
500 */
501 Integer count = (Integer) cntIter.next();
502
503 /* Obtain the host set associated with this count value; the lowest possible. */
504 HostSet hosts = (HostSet) countMap.get(count);
505 /* If this count value contains no hosts, we try the next one. */
506 if (hosts == null)
507 continue;
508
509 synchronized (hosts) {
510 for (Iterator iter = hosts.iterator(); iter.hasNext(); ) {
511 Host host = (Host) iter.next();
512 if (!avoid.containsHost(host) && host.isAvailable()) {
513 /* Found an available host with a low replica count. */
514 foundHost(host, hosts, countMap, count);
515 return host;
516 }
517 }
518 }
519 }
520 }
521
522 /* We could not find any unused and available hosts. */
523 return null;
524 }
525
526
527 private static void foundHost(Host host, HostSet hosts,
528 Map<Integer,HostSet> countMap, Integer count)
529 {
530 /*
531 * Remove this host from its present host set. If the host
532 * set is empty after this removal, we must remove the entry
533 * from the sorted countMap which is stored in the domain
534 * content map.
535 */
536 hosts.removeHost(host);
537 if (hosts.isEmpty())
538 countMap.remove(count);
539
540 /* New replica count value; for the selected host */
541 Integer newCount = new Integer(count.intValue() + 1);
542
543 if (countMap.containsKey(newCount)) {
544
545 /*
546 * Add this host to an existing set of hosts with the new
547 * replica count value.
548 */
549 ((HostSet) countMap.get(newCount)).addHost(host);
550
551 } else {
552
553 /*
554 * There is no other hosts with matching replica count
555 * value, we add our host to a new host set, that we put in
556 * the sorted countMap.
557 */
558 hosts = new HostSet();
559 hosts.addHost(host);
560 countMap.put(newCount, hosts);
561
562 }
563
564 /* Update the count value associated with this host and its domain. */
565 host.put(REPLICA_COUNT, newCount);
566 Domain domain = host.getDomain();
567 domain.incReplicaCount();
568 }
569
570
571 ////////////////////////////////////////////////////////////////////////////////////////////
572 // Object methods
573 ////////////////////////////////////////////////////////////////////////////////////////////
574
575 public String toString()
576 {
577 StringBuilder b = new StringBuilder();
578 for (Iterator iter = groupTable.iterator(); iter.hasNext();) {
579 GroupData gd = (GroupData) iter.next();
580 HostSet hosts = gd.getAssignedHosts();
581 for (Iterator it = hosts.iterator(); it.hasNext();) {
582 Host host = (Host) it.next();
583 Integer count = (Integer) host.get(REPLICA_COUNT);
584 b.append(host);
585 b.append(" has assigned ");
586 b.append(count);
587 b.append(" replicas\n");
588 }
589 }
590 return groupTable.toString();
591 }
592
593
594 ////////////////////////////////////////////////////////////////////////////////////////////
595 // MergingListener methods
596 ////////////////////////////////////////////////////////////////////////////////////////////
597
598 public Object getState(MemberId[] dests)
599 {
600 return domains;
601 }
602
603
604 public void putState(Object status, MemberId[] sources)
605 {
606 if (log.isDebugEnabled()) {
607 log.debug("RC-putstate");
608 for (int i = 0; i < sources.length; i++) {
609 log.debug("sources: " + sources[i]);
610 }
611 }
612 DomainSet rDomains = (DomainSet) status;
613 for (Domain rDomain : rDomains) {
614 Domain lDomain = domains.getDomain(rDomain);
615 if (rDomain.getReplicaCount() > lDomain.getReplicaCount()) {
616 Map<Integer,HostSet> oldcountMap = (Map) lDomain.get(REPLICA_COUNT_MAP);
617 Map<Integer,HostSet> countMap = (Map) rDomain.get(REPLICA_COUNT_MAP);
618 if (log.isDebugEnabled()) {
619 log.debug(lDomain.getAddress().getHostAddress() + " (local): " + lDomain.getReplicaCount());
620 log.debug(rDomain.getAddress().getHostAddress() + " (remote): " + rDomain.getReplicaCount());
621 log.debug("old countmap: " + oldcountMap);
622 log.debug("new countmap: " + countMap);
623 }
624 lDomain.put(REPLICA_COUNT_MAP, countMap);
625 }
626 }
627 }
628
629 } // End of ReplicaCount