for (;;) {" 678 E e = p.item;" 679 sb.append(e == this ? "(this Collection)" : e);" 680 p = p.next;" 681 if (p == null)" 682 return sb.append(']').toString();" 683 sb.append(',').append(' ');" 684 }" 685 } finally {" 686 fullyUnlock();" 687 }" 688 }" 689 " 690 /**" 691 * Atomically removes all of the elements from this queue." 692 * The queue will be empty after this call returns." 693 */" 694 public void clear() {" 695 fullyLock();" 696 try {" 697 for (Node<E> p, h = head; (p = h.next) != null; h = p) {" 698 h.next = h;" 699 p.item = null;" 700 }" 701 head = last;" 702 // assert head.item == null && head.next == null;" 703 if (count.getAndSet(0) == capacity)" 704 notFull.signal();" 705 } finally {" 706 fullyUnlock();" 707 }" 708 }" 709 " 710 /**" 711 * @throws UnsupportedOperationException {@inheritDoc}" 712 * @throws ClassCastException {@inheritDoc}" 713 * @throws NullPointerException {@inheritDoc}" 714 * @throws IllegalArgumentException {@inheritDoc}" 715 */" 716 public int drainTo(Collection<? super E> c) {" 717 return drainTo(c, Integer.MAX_VALUE);" 718 }" 719 " 720 /**" 721 * @throws UnsupportedOperationException {@inheritDoc}" 722 * @throws ClassCastException {@inheritDoc}" 723 * @throws NullPointerException {@inheritDoc}" 724 * @throws IllegalArgumentException {@inheritDoc}" 725 */" 726 public int drainTo(Collection<? super E> c, int maxElements) {" 727 if (c == null)" 728 throw new NullPointerException();" 729 if (c == this)" 730 throw new IllegalArgumentException();" 731 boolean signalNotFull = false;" 732 final ReentrantLock takeLock = this.takeLock;" 733 takeLock.lock();" 734 try {" 735 int n = Math.min(maxElements, count.get());" 736 // count.get provides visibility to first n Nodes" 737 Node<E> h = head;" 738 int i = 0;" 739 try {" 740 while (i < n) {" 741 Node<E> p = h.next;" 742 c.add(p.item);" 743 p.item = null;" 744 h.next = h;" 745 h = p;" 746 ++i;" 747 }" 748 return n;" 749 } finally {" 750 // Restore invariants even if c.add() threw" 751 if (i > 0) {" 752 // assert h.item == null;" 753 head = h;" 754 signalNotFull = (count.getAndAdd(-i) == capacity);" 755 }" 756 }" 757 } finally {" 758 takeLock.unlock();" 759 if (signalNotFull)" 760 signalNotFull();" 761 }" 762 }" 763 " 764 /**" 765 * Returns an iterator over the elements in this queue in proper sequence." 766 * The elements will be returned in order from first (head) to last (tail)." 767 *" 768 * <p>The returned iterator is a "weakly consistent" iterator that" 769 * will never throw {@link java.util.ConcurrentModificationException" 770 * ConcurrentModificationException}, and guarantees to traverse" 771 * elements as they existed upon construction of the iterator, and" 772 * may (but is not guaranteed to) reflect any modifications" 773 * subsequent to construction." 774 *" 775 * @return an iterator over the elements in this queue in proper sequence" 776 */" 777 public Iterator<E> iterator() {" 778 return new Itr();" 779 }" 780 " 781 private class Itr implements Iterator<E> {" 782 /*" 783 * Basic weakly-consistent iterator. At all times hold the next" 784 * item to hand out so that if hasNext() reports true, we will" 785 * still have it to return even if lost race with a take etc." 786 */" 787 private Node<E> current;" 788 private Node<E> lastRet;" 789 private E currentElement;" 790 " 791 Itr() {" 792 fullyLock();" 793 try {" 794 current = head.next;" 795 if (current != null)" 796 currentElement = current.item;" 797 } finally {" 798 fullyUnlock();" 799 }" 800 }" 801 " 802 public boolean hasNext() {" 803 return current != null;" 804 }" 805 " 806 /**" 807 * Returns the next live successor of p, or null if no such." 808 *" 809 * Unlike other traversal methods, iterators need to handle both:" 810 * - dequeued nodes (p.next == p)" 811 * - (possibly multiple) interior removed nodes (p.item == null)" 812 */" 813 private Node<E> nextNode(Node<E> p) {" 814 for (;;) {" 815 Node<E> s = p.next;" 816 if (s == p)" 817 return head.next;" 818 if (s == null || s.item != null)" 819 return s;" 820 p = s;" 821 }" 822 }" 823 " 824 public E next() {" 825 fullyLock();" 826 try {" 827 if (current == null)" 828 throw new NoSuchElementException();" 829 E x = currentElement;" 830 lastRet = current;" 831 current = nextNode(current);" 832 currentElement = (current == null) ? null : current.item;" 833 return x;" 834 } finally {" 835 fullyUnlock();" 836 }" 837 }" 838 " 839 public void remove() {" 840 if (lastRet == null)" 841 throw new IllegalStateException();" 842 fullyLock();" 843 try {" 844 Node<E> node = lastRet;" 845 lastRet = null;" 846 for (Node<E> trail = head, p = trail.next;" 847 p != null;" 848 trail = p, p = p.next) {" 849 if (p == node) {" 850 unlink(p, trail);" 851 break;" 852 }" 853 }" 854 } finally {" 855 fullyUnlock();" 856 }" 857 }" 858 }" 859 " 860 /**" 861 * Save the state to a stream (that is, serialize it)." 862 *" 863 * @serialData The capacity is emitted (int), followed by all of" 864 * its elements (each an {@code Object}) in the proper order," 865 * followed by a null" 866 * @param s the stream" 867 */" 868 private void writeObject(java.io.ObjectOutputStream s)" 869 throws java.io.IOException {" 870 " 871 fullyLock();" 872 try {" 873 // Write out any hidden stuff, plus capacity" 874 s.defaultWriteObject();" 875 " 876 // Write out all elements in the proper order." 877 for (Node<E> p = head.next; p != null; p = p.next)" 878 s.writeObject(p.item);" 879 " 880 // Use trailing null as sentinel" 881 s.writeObject(null);" 882 } finally {" 883 fullyUnlock();" 884 }" 885 }" 886 " 887 /**" 888 * Reconstitute this queue instance from a stream (that is," 889 * deserialize it)." 890 *" 891 * @param s the stream" 892 */" 893 private void readObject(java.io.ObjectInputStream s)" 894 throws java.io.IOException, ClassNotFoundException {" 895 // Read in capacity, and any hidden stuff" 896 s.defaultReadObject();" 897 " 898 count.set(0);" 899 last = head = new Node<E>(null);" 900 " 901 // Read in all elements and place in queue" 902 for (;;) {" 903 @SuppressWarnings("unchecked")" 904 E item = (E)s.readObject();" 905 if (item == null)" 906 break;" 907 add(item);" 908 }" 909 }" 910 }" ! Save This Page! Home » openjdk-7 » java » util » concurrent » [javadoc | source]! ! 340 /*" 341 * Note that count is used in wait guard even though it is" 342 * not protected by lock. This works because count can" 343 * only decrease at this point (all other puts are shut" 344 * out by lock), and we (or some other waiting put) are" 345 * signalled if it ever changes from capacity. Similarly" 346 * for all other uses of count in other wait guards." 347 */" 348 while (count.get() == capacity) {" 349 notFull.await();" 350 }" 351 enqueue(node);" 352 c = count.getAndIncrement();" 353 if (c + 1 < capacity)" 354 notFull.signal();" 355 } finally {" 356 putLock.unlock();" 357 }" 358 if (c == 0)" 359 signalNotEmpty();" 360 }" 361 " 362 /**" 363 * Inserts the specified element at the tail of this queue, waiting if" 364 * necessary up to the specified wait time for space to become available." 365 *" 366 * @return {@code true} if successful, or {@code false} if" 367 * the specified waiting time elapses before space is available." 368 * @throws InterruptedException {@inheritDoc}" 369 * @throws NullPointerException {@inheritDoc}" 370 */" 371 public boolean offer(E e, long timeout, TimeUnit unit)" 372 throws InterruptedException {" 373 " 374 if (e == null) throw new NullPointerException();" 375 long nanos = unit.toNanos(timeout);" 376 int c = -1;" 377 final ReentrantLock putLock = this.putLock;" 378 final AtomicInteger count = this.count;" 379 putLock.lockInterruptibly();" 380 try {" 381 while (count.get() == capacity) {" 382 if (nanos <= 0)" 383 return false;" 384 nanos = notFull.awaitNanos(nanos);" 385 }" 386 enqueue(new Node<E>(e));" 387 c = count.getAndIncrement();" 388 if (c + 1 < capacity)" 389 notFull.signal();" 390 } finally {" 391 putLock.unlock();" 392 }" 393 if (c == 0)" 394 signalNotEmpty();" 395 return true;" 396 }" 397 " 398 /**" 399 * Inserts the specified element at the tail of this queue if it is" 400 * possible to do so immediately without exceeding the queue's capacity," 401 * returning {@code true} upon success and {@code false} if this queue" 402 * is full." 403 * When using a capacity-restricted queue, this method is generally" 404 * preferable to method {@link BlockingQueue#add add}, which can fail to" 405 * insert an element only by throwing an exception." 406 *" 407 * @throws NullPointerException if the specified element is null" 408 */" 409 public boolean offer(E e) {" 410 if (e == null) throw new NullPointerException();" 411 final AtomicInteger count = this.count;" 412 if (count.get() == capacity)" 413 return false;" 414 int c = -1;" 415 Node<E> node = new Node(e);" 416 final ReentrantLock putLock = this.putLock;" 417 putLock.lock();" 418 try {" 419 if (count.get() < capacity) {" 420 enqueue(node);" 421 c = count.getAndIncrement();" 422 if (c + 1 < capacity)" 423 notFull.signal();" 424 }" 425 } finally {" 426 putLock.unlock();" 427 }" 428 if (c == 0)" 429 signalNotEmpty();" 430 return c >= 0;" 431 }" 432 " 433 " 434 public E take() throws InterruptedException {" 435 E x;" 436 int c = -1;" 437 final AtomicInteger count = this.count;" 438 final ReentrantLock takeLock = this.takeLock;" 439 takeLock.lockInterruptibly();" 440 try {" 441 while (count.get() == 0) {" 442 notEmpty.await();" 443 }" 444 x = dequeue();" 445 c = count.getAndDecrement();" 446 if (c > 1)" 447 notEmpty.signal();" 448 } finally {" 449 takeLock.unlock();" 450 }" 451 if (c == capacity)" 452 signalNotFull();" 453 return x;" 454 }" 455 " 456 public E poll(long timeout, TimeUnit unit) throws InterruptedException {" 457 E x = null;" 458 int c = -1;" 459 long nanos = unit.toNanos(timeout);" 460 final AtomicInteger count = this.count;" 461 final ReentrantLock takeLock = this.takeLock;" 462 takeLock.lockInterruptibly();" 463 try {" 464 while (count.get() == 0) {" 465 if (nanos <= 0)" 466 return null;" 467 nanos = notEmpty.awaitNanos(nanos);" 468 }" 469 x = dequeue();" 470 c = count.getAndDecrement();" 471 if (c > 1)" 472 notEmpty.signal();" 473 } finally {" 474 takeLock.unlock();" 475 }" 476 if (c == capacity)" 477 signalNotFull();" 478 return x;" 479 }" 480 " 481 public E poll() {" 482 final AtomicInteger count = this.count;" 483 if (count.get() == 0)" 484 return null;" 485 E x = null;" 486 int c = -1;" 487 final ReentrantLock takeLock = this.takeLock;" 488 takeLock.lock();" 489 try {" 490 if (count.get() > 0) {" 491 x = dequeue();" 492 c = count.getAndDecrement();" 493 if (c > 1)" ! 494 notEmpty.signal();" 495 }" 496 } finally {" 497 takeLock.unlock();" 498 }" 499 if (c == capacity)" 500 signalNotFull();" 501 return x;" 502 }" 503 " 504 public E peek() {" 505 if (count.get() == 0)" 506 return null;" 507 final ReentrantLock takeLock = this.takeLock;" 508 takeLock.lock();" 509 try {" 510 Node<E> first = head.next;" 511 if (first == null)" 512 return null;" 513 else" 514 return first.item;" 515 } finally {" 516 takeLock.unlock();" 517 }" 518 }" 519 " 520 /**" 521 * Unlinks interior Node p with predecessor trail." 522 */" 523 void unlink(Node<E> p, Node<E> trail) {" 524 // assert isFullyLocked();" 525 // p.next is not changed, to allow iterators that are" 526 // traversing p to maintain their weak-consistency guarantee." 527 p.item = null;" 528 trail.next = p.next;" 529 if (last == p)" 530 last = trail;" 531 if (count.getAndDecrement() == capacity)" 532 notFull.signal();" 533 }" 534 " 535 /**" 536 * Removes a single instance of the specified element from this queue," 537 * if it is present. More formally, removes an element {@code e} such" 538 * that {@code o.equals(e)}, if this queue contains one or more such" 539 * elements." 540 * Returns {@code true} if this queue contained the specified element" 541 * (or equivalently, if this queue changed as a result of the call)." 542 *" 543 * @param o element to be removed from this queue, if present" 544 * @return {@code true} if this queue changed as a result of the call" 545 */" 546 public boolean remove(Object o) {" 547 if (o == null) return false;" 548 fullyLock();" 549 try {" 550 for (Node<E> trail = head, p = trail.next;" 551 p != null;" 552 trail = p, p = p.next) {" 553 if (o.equals(p.item)) {" 554 unlink(p, trail);" 555 return true;" 556 }" 557 }" 558 return false;" 559 } finally {" 560 fullyUnlock();" 561 }" 562 }" 563 " 564 /**" 565 * Returns {@code true} if this queue contains the specified element." 566 * More formally, returns {@code true} if and only if this queue contains" 567 * at least one element {@code e} such that {@code o.equals(e)}." 568 *" 569 * @param o object to be checked for containment in this queue" 570 * @return {@code true} if this queue contains the specified element" 571 */" 572 public boolean contains(Object o) {" 573 if (o == null) return false;" 574 fullyLock();" 575 try {" 576 for (Node<E> p = head.next; p != null; p = p.next)" 577 if (o.equals(p.item))" 578 return true;" 579 return false;" 580 } finally {" 581 fullyUnlock();" 582 }" 583 }" 584 " 585 /**" 586 * Returns an array containing all of the elements in this queue, in" 587 * proper sequence." 588 *" 589 * <p>The returned array will be "safe" in that no references to it are" 590 * maintained by this queue. (In other words, this method must allocate" 591 * a new array). The caller is thus free to modify the returned array." 592 *" 593 * <p>This method acts as bridge between array-based and collection-based" 594 * APIs." 595 *" 596 * @return an array containing all of the elements in this queue" 597 */" 598 public Object[] toArray() {" 599 fullyLock();" 600 try {" 601 int size = count.get();" 602 Object[] a = new Object[size];" 603 int k = 0;" 604 for (Node<E> p = head.next; p != null; p = p.next)" 605 a[k++] = p.item;" 606 return a;" 607 } finally {" 608 fullyUnlock();" 609 }" 610 }" 611 " 612 /**" 613 * Returns an array containing all of the elements in this queue, in" 614 * proper sequence; the runtime type of the returned array is that of" 615 * the specified array. If the queue fits in the specified array, it" 616 * is returned therein. Otherwise, a new array is allocated with the" 617 * runtime type of the specified array and the size of this queue." 618 *" 619 * <p>If this queue fits in the specified array with room to spare" 620 * (i.e., the array has more elements than this queue), the element in" 621 * the array immediately following the end of the queue is set to" 622 * {@code null}." 623 *" 624 * <p>Like the {@link #toArray()} method, this method acts as bridge between" 625 * array-based and collection-based APIs. Further, this method allows" 626 * precise control over the runtime type of the output array, and may," 627 * under certain circumstances, be used to save allocation costs." 628 *" 629 * <p>Suppose {@code x} is a queue known to contain only strings." 630 * The following code can be used to dump the queue into a newly" 631 * allocated array of {@code String}:" 632 *" 633 * <pre>" 634 * String[] y = x.toArray(new String[0]);</pre>" 635 *" 636 * Note that {@code toArray(new Object[0])} is identical in function to" 637 * {@code toArray()}." 638 *" 639 * @param a the array into which the elements of the queue are to" 640 * be stored, if it is big enough; otherwise, a new array of the" 641 * same runtime type is allocated for this purpose" 642 * @return an array containing all of the elements in this queue" 643 * @throws ArrayStoreException if the runtime type of the specified array" 644 * is not a supertype of the runtime type of every element in" 645 * this queue" 646 * @throws NullPointerException if the specified array is null" 647 */" 648 @SuppressWarnings("unchecked")" 649 public <T> T[] toArray(T[] a) {" 650 fullyLock();" 651 try {" 652 int size = count.get();" 653 if (a.length < size)" 654 a = (T[])java.lang.reflect.Array.newInstance" 655 (a.getClass().getComponentType(), size);" 656 " 657 int k = 0;" 658 for (Node<E> p = head.next; p != null; p = p.next)" 659 a[k++] = (T)p.item;" 660 if (a.length > k)" 661 a[k] = null;" 662 return a;" 663 } finally {" 664 fullyUnlock();" 665 }" 666 }" 667 " 668 public String toString() {" 669 fullyLock();" 670 try {" 671 Node<E> p = head.next;" 672 if (p == null)" 673 return "[]";" 674 Save This Page! Home » openjdk-7 » java » util » concurrent » [javadoc | source]! 1 /*" 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER." 3 *" 4 * This code is free software; you can redistribute it and/or modify it" 5 * under the terms of the GNU General Public License version 2 only, as" 6 * published by the Free Software Foundation. Oracle designates this" 7 * particular file as subject to the "Classpath" exception as provided" 8 * by Oracle in the LICENSE file that accompanied this code." 9 *" 10 * This code is distributed in the hope that it will be useful, but WITHOUT" 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or" 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License" 13 * version 2 for more details (a copy is included in the LICENSE file that" 14 * accompanied this code)." 15 *" 16 * You should have received a copy of the GNU General Public License version" 17 * 2 along with this work; if not, write to the Free Software Foundation," 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA." 19 *" 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA" 21 * or visit www.oracle.com if you need additional information or have any" 22 * questions." 23 */" 24 " 25 /*" 26 * This file is available under and governed by the GNU General Public" 27 * License version 2 only, as published by the Free Software Foundation." 28 * However, the following notice accompanied the original version of this" 29 * file:" 30 *" 31 * Written by Doug Lea with assistance from members of JCP JSR-166" 32 * Expert Group and released to the public domain, as explained at" 33 * http://creativecommons.org/publicdomain/zero/1.0/" 34 */" 35 " 36 package java.util.concurrent;" 37 " 38 import java.util.concurrent.atomic.AtomicInteger;" 39 import java.util.concurrent.locks.Condition;" 40 import java.util.concurrent.locks.ReentrantLock;" 41 import java.util.AbstractQueue;" 42 import java.util.Collection;" 43 import java.util.Iterator;" 44 import java.util.NoSuchElementException;" 45 " 46 /**" 47 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on" 48 * linked nodes." 49 * This queue orders elements FIFO (first-in-first-out)." 50 * The <em>head</em> of the queue is that element that has been on the" 51 * queue the longest time." 52 * The <em>tail</em> of the queue is that element that has been on the" 53 * queue the shortest time. New elements" 54 * are inserted at the tail of the queue, and the queue retrieval" 55 * operations obtain elements at the head of the queue." 56 * Linked queues typically have higher throughput than array-based queues but" 57 * less predictable performance in most concurrent applications." 58 *" 59 * <p> The optional capacity bound constructor argument serves as a" 60 * way to prevent excessive queue expansion. The capacity, if unspecified," 61 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are" 62 * dynamically created upon each insertion unless this would bring the" 63 * queue above capacity." 64 *" 65 * <p>This class and its iterator implement all of the" 66 * <em>optional</em> methods of the {@link Collection} and {@link" 67 * Iterator} interfaces." 68 *" 69 * <p>This class is a member of the" 70 * <a href="{@docRoot}/../technotes/guides/collections/index.html">" 71 * Java Collections Framework</a>." 72 *" 73 * @since 1.5" 74 * @author Doug Lea" 75 * @param <E> the type of elements held in this collection" 76 *" 77 */" 78 public class LinkedBlockingQueue<E> extends AbstractQueue<E>" 79 implements BlockingQueue<E>, java.io.Serializable {" 80 private static final long serialVersionUID = -6903933977591709194L;" 81 " 82 /*" 83 * A variant of the "two lock queue" algorithm. The putLock gates" 84 * entry to put (and offer), and has an associated condition for" 85 * waiting puts. Similarly for the takeLock. The "count" field" 86 * that they both rely on is maintained as an atomic to avoid" 87 * needing to get both locks in most cases. Also, to minimize need" 88 * for puts to get takeLock and vice-versa, cascading notifies are" 89 * used. When a put notices that it has enabled at least one take," 90 * it signals taker. That taker in turn signals others if more" 91 * items have been entered since the signal. And symmetrically for" 92 * takes signalling puts. Operations such as remove(Object) and" 93 * iterators acquire both locks." 94 *" 95 * Visibility between writers and readers is provided as follows:" 96 *" 97 * Whenever an element is enqueued, the putLock is acquired and" 98 * count updated. A subsequent reader guarantees visibility to the" 99 * enqueued Node by either acquiring the putLock (via fullyLock)" 100 * or by acquiring the takeLock, and then reading n = count.get();" 101 * this gives visibility to the first n items." 102 *" 103 * To implement weakly consistent iterators, it appears we need to" 104 * keep all Nodes GC-reachable from a predecessor dequeued Node." 105 * That would cause two problems:" 106 * - allow a rogue Iterator to cause unbounded memory retention" 107 * - cause cross-generational linking of old Nodes to new Nodes if" 108 * a Node was tenured while live, which generational GCs have a" 109 * hard time dealing with, causing repeated major collections." 110 * However, only non-deleted Nodes need to be reachable from" 111 * dequeued Nodes, and reachability does not necessarily have to" 112 * be of the kind understood by the GC. We use the trick of" 113 * linking a Node that has just been dequeued to itself. Such a" 114 * self-link implicitly means to advance to head.next." 115 */" 116 " 117 /**" 118 * Linked list node class" 119 */" 120 static class Node<E> {" 121 E item;" 122 " 123 /**" 124 * One of:" 125 * - the real successor Node" 126 * - this Node, meaning the successor is head.next" 127 * - null, meaning there is no successor (this is the last node)" 128 */" 129 Node<E> next;" 130 " 131 Node(E x) { item = x; }" 132 }" 133 " 134 /** The capacity bound, or Integer.MAX_VALUE if none */" 135 private final int capacity;" 136 " 137 /** Current number of elements */" 138 private final AtomicInteger count = new AtomicInteger(0);" 139 " 140 /**" 141 * Head of linked list." 142 * Invariant: head.item == null" 143 */" 144 private transient Node<E> head;" 145 " 146 /**" 147 * Tail of linked list." 148 * Invariant: last.next == null" 149 */" 150 private transient Node<E> last;" 151 " 152 /** Lock held by take, poll, etc */" 153 private final ReentrantLock takeLock = new ReentrantLock();" 154 " 155 /** Wait queue for waiting takes */" 156 private final Condition notEmpty = takeLock.newCondition();" 157 " 158 /** Lock held by put, offer, etc */" 159 private final ReentrantLock putLock = new ReentrantLock();" 160 " 161 /** Wait queue for waiting puts */" 162 private final Condition notFull = putLock.newCondition();" 163 " 164 /**" 165 * Signals a waiting take. Called only from put/offer (which do not" 166 * otherwise ordinarily lock takeLock.)" 167 */" 168 private void signalNotEmpty() {" 169 final ReentrantLock takeLock = this.takeLock;" 170 takeLock.lock();" 171 try {" 172 notEmpty.signal();" 173 } finally {" 174 takeLock.unlock();" 175 }" 176 }" 177 " 178 /**" 179 * Signals a waiting put. Called only from take/poll." 180 */" 181 private void signalNotFull() {" 182 final ReentrantLock putLock = this.putLock;" 183 putLock.lock();" 184 try {" 185 notFull.signal();" 186 } finally {" 187 putLock.unlock();" 188 }" 189 }" 190 " 191 /**" 192 * Links node at end of queue." 193 *" 194 * @param node the node" 195 */" 196 private void enqueue(Node<E> node) {" 197 // assert putLock.isHeldByCurrentThread();" 198 // assert last.next == null;" 199 last = last.next = node;" 200 }" 201 " 202 /**" 203 * Removes a node from head of queue." 204 *" 205 * @return the node" 206 */" 207 private E dequeue() {" 208 // assert takeLock.isHeldByCurrentThread();" 209 // assert head.item == null;" 210 Node<E> h = head;" 211 Node<E> first = h.next;" 212 h.next = h; // help GC" 213 head = first;" 214 E x = first.item;" 215 first.item = null;" 216 return x;" 217 }" 218 " 219 /**" 220 * Lock to prevent both puts and takes." 221 */" 222 void fullyLock() {" 223 putLock.lock();" 224 takeLock.lock();" 225 }" 226 " 227 /**" 228 * Unlock to allow both puts and takes." 229 */" 230 void fullyUnlock() {" 231 takeLock.unlock();" 232 putLock.unlock();" 233 }" 234 " 235 // /**" 236 // * Tells whether both locks are held by current thread." 237 // */" 238 // boolean isFullyLocked() {" 239 // return (putLock.isHeldByCurrentThread() &&" 240 // takeLock.isHeldByCurrentThread());" 241 // }" 242 " 243 /**" 244 * Creates a {@code LinkedBlockingQueue} with a capacity of" 245 * {@link Integer#MAX_VALUE}." 246 */" 247 public LinkedBlockingQueue() {" 248 this(Integer.MAX_VALUE);" 249 }" 250 " 251 /**" 252 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity." 253 *" 254 * @param capacity the capacity of this queue" 255 * @throws IllegalArgumentException if {@code capacity} is not greater" 256 * than zero" 257 */" 258 public LinkedBlockingQueue(int capacity) {" 259 if (capacity <= 0) throw new IllegalArgumentException();" 260 this.capacity = capacity;" 261 last = head = new Node<E>(null);" 262 }" 263 " 264 /**" 265 * Creates a {@code LinkedBlockingQueue} with a capacity of" 266 * {@link Integer#MAX_VALUE}, initially containing the elements of the" 267 * given collection," 268 * added in traversal order of the collection's iterator." 269 *" 270 * @param c the collection of elements to initially contain" 271 * @throws NullPointerException if the specified collection or any" 272 * of its elements are null" 273 */" 274 public LinkedBlockingQueue(Collection<? extends E> c) {" 275 this(Integer.MAX_VALUE);" 276 final ReentrantLock putLock = this.putLock;" 277 putLock.lock(); // Never contended, but necessary for visibility" 278 try {" 279 int n = 0;" 280 for (E e : c) {" 281 if (e == null)" 282 throw new NullPointerException();" 283 if (n == capacity)" 284 throw new IllegalStateException("Queue full");" 285 enqueue(new Node<E>(e));" 286 ++n;" 287 }" 288 count.set(n);" 289 } finally {" 290 putLock.unlock();" 291 }" 292 }" 293 " 294 " 295 // this doc comment is overridden to remove the reference to collections" 296 // greater in size than Integer.MAX_VALUE" 297 /**" 298 * Returns the number of elements in this queue." 299 *" 300 * @return the number of elements in this queue" 301 */" 302 public int size() {" 303 return count.get();" 304 }" 305 " 306 // this doc comment is a modified copy of the inherited doc comment," 307 // without the reference to unlimited queues." 308 /**" 309 * Returns the number of additional elements that this queue can ideally" 310 * (in the absence of memory or resource constraints) accept without" 311 * blocking. This is always equal to the initial capacity of this queue" 312 * less the current {@code size} of this queue." 313 *" 314 * <p>Note that you <em>cannot</em> always tell if an attempt to insert" 315 * an element will succeed by inspecting {@code remainingCapacity}" 316 * because it may be the case that another thread is about to" 317 * insert or remove an element." 318 */" 319 public int remainingCapacity() {" 320 return capacity - count.get();" 321 }" 322 " 323 /**" 324 * Inserts the specified element at the tail of this queue, waiting if" 325 * necessary for space to become available." 326 *" 327 * @throws InterruptedException {@inheritDoc}" 328 * @throws NullPointerException {@inheritDoc}" 329 */" 330 public void put(E e) throws InterruptedException {" 331 if (e == null) throw new NullPointerException();" 332 // Note: convention in all put/take/etc is to preset local var" 333 // holding count negative to indicate failure unless set." 334 int c = -1;" 335 Node<E> node = new Node(e);" 336 final ReentrantLock putLock = this.putLock;" 337 final AtomicInteger count = this.count;" 338 putLock.lockInterruptibly();" 339 try {