Lock-free структуры данных. Concurrent maps: skip list. Неравномерное распределение запросов

В предыдущих статьях (раз , два) мы рассматривали классический hash map с хеш-таблицей и списком коллизий. Был построен lock-free ordered list, который послужил нам основой для lock-free hash map.
К сожалению, списки характеризуются линейной сложностью поиска O(N) , где N - число элементов в списке, так что наш алгоритм lock-free ordered list сам по себе представляет небольшой интерес при больших N .
Или все же представляет?..

Существует довольно любопытная структура данных - skip-list , основанная на обычном односвязном списке. Впервые она была описана в 1990 году W.Pugh, перевод его оригинальной статьи есть на хабре. Эта структура представляет для нас интерес, так как, имея алгоритм lock-free ordered list, довольно легко реализовать lock-free skip-list.
Для начала - немного о принципах построения skip-list. Skip-list представляет собой многоуровневый список: каждый элемент (иногда называемый башней, tower) имеет некоторую высоту h .


Высота h выбирается случайным образом из диапазона , где Hmax - максимально возможная высота, обычно 16 или 32. Вероятность того, что высота башни равна единице, есть P = 1/2 . Вероятность высоты k есть:

Несмотря на кажущуюся сложность вычисления высоты, её можно рассчитать довольно просто, например, так: h = lsb(rand()) , где lsb - номер младшего значащего бита числа.

Magic 1/2

На самом деле, константа 1/2 - это мое упрощение, в оригинальной статье идет речь о 0 < p < 1 и исследуется поведение skip-list при различных значениях p . Но для практической реализации p = 1/2 - наилучшее значение, мне кажется.


Получается, что чем больше уровень, тем более список на этом уровне разрежен по сравнению с нижележащими. Эта разреженность наряду с вероятностной природой дает оценку сложности поиска O(log N) , - такую же, как для бинарных самобалансирующихся деревьев.
Поиск в skip-list"е довольно прост: начиная с head-башни, которая имеет максимальную высоту, проходим по самому верхнему уровню, пока не найдем элемент с ключом больше искомого (или не упремся в хвост tail), спускаемся на уровень ниже, ищем аналогично в нем и т.д., пока не попадем на уровень 0, самый нижний; пример поиска ключа 34:

Lock-free skip list

Для построения lock-free skip-list у нас уже есть lock-free алгоритм для каждого уровня. Осталось разработать приемы работы с уровнями. Казалось бы, невозможно атомарно вставить узел-башню высотой, скажем, 20, - ведь для этого нужно атомарно поменять 20 указателей! Оказывается, этого и не нужно, достаточно уметь атомарно менять один указатель, - то, что мы уже умеем делать в lock-free list.
Рассмотрим, как происходит вставка в lock-free skip-list. Будем вставлять элемент-башню высотой 5 с ключом 23. Первым этапом мы ищем позицию вставки, двигаясь по уровням сверху вниз. В результате у нас получается массив prev позиций вставки на каждом уровне:


Далее, вставляем новый элемент на уровень 0, самый нижний. Вставку в lock-free список мы уже умеем делать:


Всё, - элемент становится частью списка, его можно найти, он становится достижимым , несмотря на то, что вся башня целиком ещё не вставлена в skip-list.
Далее мы не торопясь вставляем нашу башню на уровни выше, снизу вверх:


Эти вставки имеют уже второстепенное значение, призваны повысить эффективность поиска, но никак не влияют на принципиальную достижимость нового ключа.
Удаление элемента происходит в две фазы: сначала мы находим удаляемый элемент и помечаем его как логически удаленный, используя прием marked pointer. Сложность в том, что для башни мы должны пометить все уровни элемента, начиная с самого верхнего:


После того, как все уровни элемента-башни помечены, делается физическое удаление (точнее говоря, исключение элемента из списка, так как удаление памяти под элемент выполняется Hazard Pointer или RCU), также сверху вниз:


На каждом уровне применяется алгоритм вставки/удаления из обычного lock-free ordered list, который мы уже рассматривали.

Как видим, процедуры вставки/удаления из lock-free skip-list многошаговые, на каждом шаге возможна интерференция с конкурирующими операциями, поэтому при программировании skip-list нужна особая аккуратность. Например, при вставке мы сначала ищем позиции в списках на всех уровнях и формируем массивы prev . Вполне возможно, что в процессе вставки список на каком-то уровне изменится и эти позиции станут невалидными. В этом случае следует обновить prev , то есть найти вставляемый элемент, и продолжить вставку, начиная с уровня, на котором произошел облом.
Более интересна ситуация, когда происходит одновременная вставка ключа K и его удаление. Такое вполне возможно: вставка считается успешно выполненной, когда мы связали элемент на уровне 0 списка. После этого элемент уже достижим и его вполне можно удалить, несмотря на то, что он ещё не до конца вставлен в верхние уровни. Для разрешения коллизии вставки и удаления очень важен порядок: вставка производится снизу вверх, а удаление (точнее, его первая фаза - логическое удаление, marked pointer) - противоходом, сверху вниз. В таком случае процедура вставки обязательно увидит на каком-либо уровне метку удаления и немедленно прекратит свою работу.
Процедура удаления также может быть конкурентной на обоих своих фазах. На фазе логического удаления, когда проставляются метки на башне сверху вниз, нам конкуренция не страшна. А вот на фазе исключения удаляемого элемента из списков любое изменение skip-list"а, то есть нарушение массивов prev и found , определяющих позицию удаляемого элемента, приводит к тому, что нам надо эти массивы сформировать заново. Но метки уже проставлены и функция поиска просто не найдет удаляемый элемент! Для разрешения этой ситуации мы наделяем функцию поиска несвойственной ей работой: при обнаружении помеченного на каком-либо уровне элемента функция поиска исключает (unlink) этот элемент из списка этого уровня, то есть помогает удалять элементы. После исключения функция возобновляет поиск с самого начала, то есть с самого верхнего уровня (здесь возможны вариации, но самое простое - начать с начала). Это типичный пример взаимопомощи, часто встречающийся в lock-free программировании: один поток помогает другому делать его работу. Именно поэтому функция find() во многих lock-free контейнерах не является константной - поиск может изменить контейнер.
Чем ещё характеризуется skip-list? Во-первых, это упорядоченный map, в отличие от hash map, то есть поддерживает операции извлечения минимального extract_min() и максимального extract_max() ключей.
Во-вторых, skip-list прожорлив для схемы Hazard Pointrer (HP): при максимальной высоте башни 32 элементы массивов prev и found , определяющих искомую позицию, должны быть защищены hazard pointer"ами, то есть нам требуется минимум 64 hazard pointer"а (на самом деле, в реализации libcds – 66). Это довольно много для схемы HP, см. подробнее её устройство . Для схемы RCU некоторую сложность представляет реализация метода find() , так как этот метод может удалять элементы, а схема RCU требует, чтобы удаление исключение (unlink) элемента из списка было под критической секцией RCU, а удаление деаллокация памяти - вне критической секции, иначе возможен deadlock.
Интересную практическую задачу представляет собой реализация башен для высоты более 1. Сейчас в реализации skip-list в библиотеке libcds память под башни высотой более 1 распределяется отдельно от памяти под элемент даже для интрузивного варианта. Учитывая вероятностную природу высоты, получается, что для 50% элементов делается распределение памяти, - это может сказаться на производительности, а также негативно влияет на фрагментацию памяти. Есть задумка башни высотой не более h_min распределять одним блоком и только для высоких «дораспределять» память под башню:
template struct tower { tower * next; tower * high_next; // массив для указателей уровней >= Hmin };
Если Hmin = 4 , то при таком построении 93% элементов не потребуют распределения дополнительной памяти под указатели next - high_next для них будет nullptr .

skip-list: библиография


W.Pugh спустя год-два после опубликования своей работы о skip-list представил другую работу, посвященную конкурентной реализации, основанной на аккуратной расстановке блокировок (fine-grained locks).
Наиболее цитируемая работа по lock-free skip-list – это диссертация K.Fraser Practical lock-freedom 2003 года, в которой представлено несколько алгоритмов skip-list как на основе транзакционной памяти, так и на программной реализации MCAS (CAS над несколькими ячейками памяти). Эта работа представляет сейчас, на мой взгляд, чисто теоретический интерес, так как программная эмуляция транзакционной памяти довольно тяжела, равно как и MCAS. Хотя с выходом Haswell реализация транзакционной памяти в железе пошла в массы…
Реализация skip-list в libcds сделана по мотивам неоднократно цитируемой мною монографии The Art of Multiprocessor Programming . «По мотивам» потому, что оригинальный алгоритм приведен для Java и имеет, как мне кажется, несколько ошибок, которые, быть может, были исправлены во втором издании. Или это вовсе не ошибки для Java.
Есть ещё диссертация Фомичева , посвященная вопросу возобновления поиска в случае обнаруженных конфликтов в skip-list. Как я упоминал выше, find() при обнаружении помеченного (marked) элемента пытается удалить его из списка и возобновляет поиcк с самого начала . Фомичев предлагает механизм расстановки back-reference – обратных ссылок, чтобы работу возобновлять с некоторого предыдущего элемента, а не с начала, что, в принципе, должно повлиять на производительность в лучшую сторону. Алгоритм поддержки back-reference в актуальном состоянии довольно сложный и непонятно, будет ли выигрыш или же его съест код поддержки актуальности.

В следующей статье попытаемся рассмотреть другой обширный класс структур данных, являющихся основой для ассоциативных контейнеров, - деревья, точнее, их конкурентную реализацию.

Интересной разновидностью структуры данных эффективной реализации АТД «упорядоченный словарь» является skip^cnucoK (skip list). Эта структура данных организует объекты в произвольном порядке таким

образом, что поиск и обновление в среднем выполняются за 0(log п) времени, где п - количество объектов словаря. Интересно, что понятие усредненной временной сложности, используемое здесь, не зависит от возможного распределения ключей при вводе. Наоборот, оно продиктовано использованием генераторов случайных чисел в реализации процесса ввода, чтрбы облегчить процесс принятия решения о месте вставки нового объекта. Время выполнения в этом случае будет равно среднему значению времени выполнения ввода любых случайных чисел, используемых в качестве входных объектов.

Методы генерации случайных чисел встраиваются в большинство современных компьютеров, поскольку они широко применяются в компьютерных играх, криптографии и компьютерных симуляторах. Некоторые методы, называемые генераторами псевдослучайных чисел (pseudorandom number generators), генерируют псевдослучайно числа по некоторому закону, начиная с так называемого начального числа (seed). Другие методы используют аппаратные средства для извлечения «действительно» случайных чисел. В любом случае компьютер имеет числа, отвечающие требованиям, предъявляемым к случайным числам в приводимом анализе.

Основным преимуществом использования случайных чисел в струк- > туре данных и при создании алгоритма является простота и надежность результативных структур и методов. Таким образом можно-создать простую рандомизированную структуру данных, называемую skip-список, обеспечивающую логарифмические затраты времени для поиска, аналогичные затратам при использовании бинарного поискового алгоритма. Тем не менее предполагаемые временные затраты при skip-списке будут большими, чем у бинарного поиска в поисковой таблице. С другой стороны, при словарном обновлении skip-список намного быстрее поисковых таблиц.

Пусть skip-список S для словаря D состоит из серии списков {iSo, S\, Si, З/j}. Каждый список S\ хранит набор объектов словаря D по ключам в неубывающей последовательности плюс объекты с двумя специальными ключами, записываемыми в виде «-оо» и «+оо», где «-оо» обозначает меньше, а «+оо» - больше любого возможного ключа, который может быть в D. Кроме того, списки в Sотвечают следующим требованиям:

Список S 0 содержит каждый объект словаря D (плюс специальные объекты с ключами «-оо» и «+оо»);

Для / = 1, …, h – 1 список Si содержит (в дополнение к «-оо» и «+оо») случайно сгенерированный набор объектов из списка S t _ ь

Список S h содержит только «-оо» и «+оо».

Образец такого skip-списка приведен на рис. 8.9, наглядно представляющем списодс S со списком в основании и списками S\, …, S^ над ним. Высоту (height) списка S обозначим как h.

Рис. 8.9. Пример skip-списка

Интуитивно списки организованы таким образом; чтобы?/+/ содержал хотя бы каждый второй объект 5/. Как будет показано при рассмотрении метода ввода, объекты в St+\ выбираются произвольно из объектов в Si с таким расчетом, чтобы каждый выбираемый объект 5/ входил в 5/ + \ с вероятностью 1/2. Образно говоря, помещать или нет объект из в Si + 1, определяем в зависимости от того, какой стороной упадет подкидываемая монетка - орлом или решкой. Таким образом, можно считать, что S\ содержит около я/2 объектов, S2 - я/4, и, соответственно, Sj - около п/2′ объектов. Другими словами, высота h списка S может составлять около logn. Хотя при этом деление пополам числа объектов от одного списка к другому не является обязательным требованием в виде явно выраженного свойства skip-списка. Наоборот, случайный выбор важнее.

Используя позиционную абстракцию применительно к спискам и деревьям, можно рассматривать skip-список как двухмерную коллекцию позиций, организованных горизонтально в уровни (levels) и вертикально в башни (towers). Каждый уровень - это список S^ а каждая башня - набор позиций, хранящих один и тот же объект и расположенных друг над другом в списках. Позиции skip-списка можно проходить с помощью следующих операций:

after(/?): возвращает позицию, следующую за р ца том же уровне; before(/?): возвращает позицию, предшествующую р на том же уровне; below(/?): возвращает позицию, расположенную под р в той же башне; above(/?): возвращает позицию, расположенную над р в той же башне.

Установим, что перечисленные выше операции должны возвращать null, если запрашиваемой позиции не существует. Не вдаваясь в подробности, заметим, что skip-список легко реализуется с помощью связной структуры таким образом, что методы прохода требуют 0(1) времени (при наличии в списке позиции р). Такая связная структура представляет собой коллекцию из h двусвязных списков, объедийёйй^ й^айШ, в свою очередь являющиеся двусвязными списками.

Структура skip-списка позволяет применять простые поисковые алгоритмы для словарей. В действительности все поисковые алгоритмы skip-списка основываются на достаточно элегантном методе SkipSearch, принимающем ключ к и находящим объект в skip-списке S с наибольшим ключом (который может оказаться «-оо»), меньшим или равным к. Допустим, имеется искомый ключ к. Метод SkipSearch устанавливает позицию р в самой верхней левой позиции в skip-списке S. То есть р устанавливается на позиции специального объекта с ключом «-оо» в S^. Затем выполняется следующее:

1) если S.below(/?) равен null, то поиск заканчивается - на уровень ниже обнаружен наибольший объект в Sс ключом, меньшим или равным искомому ключу к. В противном случае опускаемся на один уровень в данной башне, устанавливая р S.be\ow(p);

2) из позиции р продвигаемся вправо (вперед) до тех пор, пока не окажемся в крайней правой позиции текущего уровня, где кеу(/?) < к. Такой шаг называется сканированием вперед (scan forward). Указанная позиция существует всегда, поскольку каждый уровень имеет специальные ключи «+оо» и «-оо». И на самом деле, после шага вправо по текущему уровню р может остаться на исходном месте. В любом случае после этого снова повторяется предыдущее действие.

Рис. 8.10. Пример поиска в skip-списке. Обследованные во время поиска ключа 50 позиции выделены штриховым обозначением

Фрагмент кода 8.4 содержит описание поискового алгоритма skip-списка. Имея такой метод, несложно реализовать операцию findElement(/:). Выполняется операция р^- SkipSearch(A;) и проверяется равенство key{p)~ k. При ^равенстве возвращается /?.element. В противном случае возвращается сигнальное сообщение NO_SUCH_KEY.

Алгоритм SkipSearch(/:):

Input: поисковый кл?оч к.

Output: позиция р в S, объект которой имеет наибольший ключ, меньший или равный к.

Допустим, что р - самая верхняя левая позиция в S (состоящей как минимум из двух уровней), while below (р) * null do

р below(p) {просканировать вниз} while key(after(p)) < к do

Let p after(p) {просканировать вперед} return p

Фрагмент кода 8.4. Алгоритм поиска в skip-списке S

Оказывается, предполагаемое время выполнения алгоритма SkipSearch составляет 0(log п). Прежде чем обосновать это, приведем реализацию методов обновления skip-списка

7 ответов

Поскольку вы упомянули Список, который является как индексируемым (я предполагаю, что вы хотите скорейшего извлечения), так и для того, чтобы разрешать дубликаты, я бы посоветовал вам использовать пользовательский набор с LinkedList или ArrayList.

Вам нужно иметь базовый набор, например, HashSet и продолжать добавлять к нему значения. Если вы сталкиваетесь с дубликатом, значение этого набора должно указывать на список. Итак, у вас будет и быстрый поиск, и, конечно же, вы сохраните свои объекты в виде коллекции psuedo.

Это должно дать вам хорошую эффективность для извлечения. В идеале, если ваши Ключи не дублируются, вы достигнете O (1) в качестве скорости поиска.

Этот ответ задерживается на 3 года, но я надеюсь, что он будет полезен тем, кто хочет получить список пропусков Java с этого момента:)

Это решение позволяет дублировать, как вы просили. Я следую примерно руководству здесь http://igoro.com/archive/skip-lists-are-fascinating , поэтому сложности похожи на то, что удалить стоит O (nlogn) - как я это делал "Не пытайтесь использовать двусвязные узлы, я предполагаю, что это приведет к тому, что удалить до O (logn).

Код содержит: интерфейс, список пропуска, реализующий интерфейс, и класс node. Он также является общим.

Вы можете настроить параметр LEVELS для производительности, но помните компромисс между пространством и временем.

> search(T data); } public class SkipList> implements SkippableList { public static void main(String args) { SkipList sl = new SkipList<>(); int data = {4,2,7,0,9,1,3,7,3,4,5,6,0,2,8}; for (int i: data) { sl.insert(i); } sl.print(); sl.search(4); sl.delete(9); sl.print(); sl.insert(69); sl.print(); sl.search(69); } private final SkipNode head = new SkipNode<> SkipNode = new SkipNode<>(data); for (int i = 0; i < LEVELS; i++) { if (rand.nextInt((int) Math.pow(2, i)) == 0) { //insert with prob = 1/(2^i) insert(SkipNode, i); } } } @Override public boolean delete(T target) { System.out.println("Deleting " + target.toString()); SkipNode victim = search(target, false); if (victim == null) return false; victim.data = null; for (int i = 0; i < LEVELS; i++) { head.refreshAfterDelete(i); } System.out.println(); return true; } @Override public SkipNode search(T data) { return search(data, true); } @Override public void print() { for (int i = 0; i < LEVELS; i++) { head.print(i); } System.out.println(); } private void insert(SkipNode result = null; for (int i = LEVELS-1; i >= 0; i--) { if ((result = head.search(data, i, print)) != null) { if (print) { System.out.println("Found " + data.toString() + " at level " + i + ", so stoppped"); System.out.println(); } break; } } return result; } } class SkipNode> next = (SkipNode current = this.getNext(level); while (current != null && current.getNext(level) != null) { if (current.getNext(level).data == null) { SkipNode result = null; SkipNode < 1) { if (current.data.equals(data)) { result = current; break; } current = current.getNext(level); } return result; } void insert(SkipNode SkipNode, int level) { SkipNode current = this.getNext(level); if (current == null) { this.setNext(SkipNode, level); return; } if (SkipNode.data.compareTo(current.data) < 1) { this.setNext(SkipNode, level); SkipNode.setNext(current, level); return; } while (current.getNext(level) != null && current.data.compareTo(SkipNode.data) < 1 && current.getNext(level).data.compareTo(SkipNode.data) < 1) { current = current.getNext(level); } SkipNode successor = current.getNext(level); current.setNext(SkipNode, level); SkipNode.setNext(successor, level); } void print(int level) { System.out.print("level " + level + ": ["); int length = 0; SkipNode current = this.getNext(level); while (current != null) { length++; System.out.print(current.data.toString() + " "); current = current.getNext(level); } System.out.println("], length: " + length); } }

Вы можете использовать приведенное ниже код, чтобы сделать свой собственный базовый скипист :

1) Сделайте start и конец до represent start and end of skip list .

2) Add the nodes и assign pointers до следующего based on

If(node is even) then ,assign a fast lane pointer with next pointer else assign only pointer to next node

Java-код для базового списка пропуска (вы можете добавить дополнительные функции):

Public class MyClass { public static void main(String args) { Skiplist skiplist=new Skiplist(); Node n1=new Node(); Node n2=new Node(); Node n3=new Node(); Node n4=new Node(); Node n5=new Node(); Node n6=new Node(); n1.setData(1); n2.setData(2); n3.setData(3); n4.setData(4); n5.setData(5); n6.setData(6); skiplist.insert(n1); skiplist.insert(n2); skiplist.insert(n3); skiplist.insert(n4); skiplist.insert(n5); skiplist.insert(n6); /*print all nodes*/ skiplist.display(); System.out.println(); /* print only fast lane node*/ skiplist.displayFast(); } } class Node{ private int data; private Node one_next; //contain pointer to next node private Node two_next; //pointer to node after the very next node public int getData() { return data; } public void setData(int data) { this.data = data; } public Node getOne_next() { return one_next; } public void setOne_next(Node one_next) { this.one_next = one_next; } public Node getTwo_next() { return two_next; } public void setTwo_next(Node two_next) { this.two_next = two_next; } } class Skiplist{ Node start; //start pointer to skip list Node head; Node temp_next; //pointer to store last used fast lane node Node end; //end of skip list int length; public Skiplist(){ start=new Node(); end=new Node(); length=0; temp_next=start; } public void insert(Node node){ /*if skip list is empty */ if(length==0){ start.setOne_next(node); node.setOne_next(end); temp_next.setTwo_next(end); head=start; length++; } else{ length++; Node temp=start.getOne_next(); Node prev=start; while(temp != end){ prev=temp; temp=temp.getOne_next(); } /*add a fast lane pointer for even no of nodes*/ if(length%2==0){ prev.setOne_next(node); node.setOne_next(end); temp_next.setTwo_next(node); temp_next=node; node.setTwo_next(end); } /*odd no of node will not contain fast lane pointer*/ else{ prev.setOne_next(node); node.setOne_next(end); } } } public void display(){ System.out.println("--Simple Traversal--"); Node temp=start.getOne_next(); while(temp != end){ System.out.print(temp.getData()+"=>"); temp=temp.getOne_next(); } } public void displayFast(){ System.out.println("--Fast Lane Traversal--"); Node temp=start.getTwo_next(); while(temp !=end){ System.out.print(temp.getData()+"==>"); temp=temp.getTwo_next(); } } }

Вывод:

Простой обход -

1 = > 2 = > 3 = > 4 = > 5 = > 6 = >

Быстрая перемотка дорожки -

2 == > 4 == > 6 == >

Когда вы создаете ConcurrentSkipListSet , вы передаете компаратор в конструктор.

New ConcurrentSkipListSet<>(new ExampleComparator()); public class ExampleComparator implements Comparator {//your impl }

Вы можете создать компаратор, который сделает ваш SkipListSet вести себя как обычный список.

Я не утверждаю, что это моя собственная реализация. Я просто не могу вспомнить, где я его нашел. Если вы знаете, дайте мне знать, и я обновлю. Это работает для меня хорошо:

Public class SkipList> implements Iterable { Node _head = new Node<>(null, 33); private final Random rand = new Random(); private int _levels = 1; private AtomicInteger size = new AtomicInteger(0); ///

/// Inserts a value into the skip list. /// public void insert(T value) { // Determine the level of the new node. Generate a random number R. The // number of // 1-bits before we encounter the first 0-bit is the level of the node. // Since R is // 32-bit, the level can be at most 32. int level = 0; size.incrementAndGet(); for (int R = rand.nextInt(); (R & 1) == 1; R >>= 1) { level++; if (level == _levels) { _levels++; break; } } // Insert this node into the skip list Node newNode = new Node<>(value, level + 1); Node cur = _head; for (int i = _levels - 1; i >= 0; i--) { for (; cur.next[i] != null; cur = cur.next[i]) { if (cur.next[i].getValue().compareTo(value) > 0) break; } if (i <= level) { newNode.next[i] = cur.next[i]; cur.next[i] = newNode; } } } /// /// Returns whether a particular value already exists in the skip list /// public boolean contains(T value) { Node cur = _head; for (int i = _levels - 1; i >= 0; i--) { for (; cur.next[i] != null; cur = cur.next[i]) { if (cur.next[i].getValue().compareTo(value) > 0) break; if (cur.next[i].getValue().compareTo(value) == 0) return true; } } return false; } /// /// Attempts to remove one occurence of a particular value from the skip /// list. Returns /// whether the value was found in the skip list. /// public boolean remove(T value) { Node cur = _head; boolean found = false; for (int i = _levels - 1; i >= 0; i--) { for (; cur.next[i] != null; cur = cur.next[i]) { if (cur.next[i].getValue().compareTo(value) == 0) { found = true; cur.next[i] = cur.next[i].next[i]; break; } if (cur.next[i].getValue().compareTo(value) > 0) break; } } if (found) size.decrementAndGet(); return found; } @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public Iterator iterator() { return new SkipListIterator(this, 0); } public int size() { return size.get(); } public Double toArray() { Double a = new Double; int i = 0; for (T t: this) { a[i] = (Double) t; i++; } return a; } } class Node> { public Node next; public N value; @SuppressWarnings("unchecked") public Node(N value, int level) { this.value = value; next = new Node; } public N getValue() { return value; } public Node getNext() { return next; } public Node getNext(int level) { return next; } public void setNext(Node next) { this.next = next; } } class SkipListIterator> implements Iterator { SkipList list; Node current; int level; public SkipListIterator(SkipList list, int level) { this.list = list; this.current = list._head; this.level = level; } public boolean hasNext() { return current.getNext(level) != null; } public E next() { current = current.getNext(level); return current.getValue(); } public void remove() throws UnsupportedOperationException { throw new UnsupportedOperationException(); } }

Исправлена ​​ошибка в реализации, предоставляемой @PoweredByRice. Он выбросил NPE для случаев, когда удаленный node был первым node. Другие обновления включают переименованные имена переменных и обратную печать порядка списка пропусков.

Import java.util.Random; interface SkippableList> { int LEVELS = 5; boolean delete(T target); void print(); void insert(T data); SkipNode search(T data); } class SkipNode> { N data; @SuppressWarnings("unchecked") SkipNode next = (SkipNode) new SkipNode; SkipNode(N data) { this.data = data; } void refreshAfterDelete(int level) { SkipNode current = this; while (current != null && current.getNext(level) != null) { if (current.getNext(level).data == null) { SkipNode successor = current.getNext(level).getNext(level); current.setNext(successor, level); return; } current = current.getNext(level); } } void setNext(SkipNode next, int level) { this.next = next; } SkipNode getNext(int level) { return this.next; } SkipNode search(N data, int level, boolean print) { if (print) { System.out.print("Searching for: " + data + " at "); print(level); } SkipNode result = null; SkipNode current = this.getNext(level); while (current != null && current.data.compareTo(data) < 1) { if (current.data.equals(data)) { result = current; break; } current = current.getNext(level); } return result; } void insert(SkipNode skipNode, int level) { SkipNode current = this.getNext(level); if (current == null) { this.setNext(skipNode, level); return; } if (skipNode.data.compareTo(current.data) < 1) { this.setNext(skipNode, level); skipNode.setNext(current, level); return; } while (current.getNext(level) != null && current.data.compareTo(skipNode.data) < 1 && current.getNext(level).data.compareTo(skipNode.data) < 1) { current = current.getNext(level); } SkipNode successor = current.getNext(level); current.setNext(skipNode, level); skipNode.setNext(successor, level); } void print(int level) { System.out.print("level " + level + ": [ "); int length = 0; SkipNode current = this.getNext(level); while (current != null) { length++; System.out.print(current.data + " "); current = current.getNext(level); } System.out.println("], length: " + length); } } public class SkipList> implements SkippableList { private final SkipNode head = new SkipNode<>(null); private final Random rand = new Random(); @Override public void insert(T data) { SkipNode skipNode = new SkipNode<>(data); for (int i = 0; i < LEVELS; i++) { if (rand.nextInt((int) Math.pow(2, i)) == 0) { //insert with prob = 1/(2^i) insert(skipNode, i); } } } @Override public boolean delete(T target) { System.out.println("Deleting " + target); SkipNode victim = search(target, true); if (victim == null) return false; victim.data = null; for (int i = 0; i < LEVELS; i++) { head.refreshAfterDelete(i); } System.out.println("deleted..."); return true; } @Override public SkipNode search(T data) { return search(data, true); } @Override public void print() { for (int i = LEVELS-1; i >= 0 ; i--) { head.print(i); } System.out.println(); } private void insert(SkipNode SkipNode, int level) { head.insert(SkipNode, level); } private SkipNode search(T data, boolean print) { SkipNode result = null; for (int i = LEVELS-1; i >= 0; i--) { if ((result = head.search(data, i, print)) != null) { if (print) { System.out.println("Found " + data.toString() + " at level " + i + ", so stopped"); System.out.println(); } break; } } return result; } public static void main(String args) { SkipList sl = new SkipList<>(); int data = {4,2,7,0,9,1,3,7,3,4,5,6,0,2,8}; for (int i: data) { sl.insert(i); } sl.print(); sl.search(4); sl.delete(4); System.out.println("Inserting 10"); sl.insert(10); sl.print(); sl.search(10); } }

Существует довольно распространённое мнение, что выполнение различных тестовых заданий помогает очень быстро поднять свой профессиональный уровень. Я и сам люблю иногда откопать какое-нить мудреное тестовое и порешать его, чтобы быть постоянно в тонусе, как говорится. Как-то я выполнял конкурсное задание на стажировку в одну компанию, задачка показалась мне забавной и интересной, вот её краткий текст:
Представьте, что ваш коллега-нытик пришел рассказать о своей непростой задаче - ему нужно не просто упорядочить по возрастанию набор целых чисел, а выдать все элементы упорядоченного набора с L-го по R-й включительно!
Вы заявили, что это элементарная задача и, чтобы написать решение на языке C#, вам нужно десять минут. Ну, или час. Или два. Или шоколадка «Алёнка»

Предполагается, что в наборе допускаются дубликаты, и количество элементов будет не больше, чем 10^6.

К оценке решения есть несколько комментариев:

Ваш код будут оценивать и тестировать три программиста:
  • Билл будет запускать ваше решение на тестах размером не больше 10Кб.
  • В тестах Стивена количество запросов будет не больше 10^5, при этом количество запросов на добавление будет не больше 100.
  • В тестах Марка количество запросов будет не больше 10^5.
Решение может быть очень интересным, поэтому я посчитал нужным его описать.

Решение

Пусть у нас есть абстрактное хранилище.

Обозначим Add(e) - добавление элемента в хранилище, а Range(l, r) - взятие среза с l по r элемент.

Тривиальный вариант хранилища может быть таким:

  • Основой хранилища будет упорядоченный по возрастанию динамический массив.
  • При каждой вставке бинарным поиском находится позицию, в которую необходимо вставить элемент.
  • При запросе Range(l, r) будем просто брать срез массива из l по r.
Оценим сложность подхода, основанного на динамическом массиве.

C Range(l, r) - взятие среза можно оценить, как O(r - l).

C Add(e) - вставка в худшем случае будет работать за O(n), где n - количество элементов. При n ~ 10^6, вставка - узкое место. Ниже в статье будет предложен улучшенный вариант хранилища.

Пример исходного кода можно посмотреть .

Skiplist

Skiplist - это рандомизированная альтернатива деревьям поиска, в основе которой лежит несколько связных списков. Была изобретена William Pugh в 1989 году. Операции поиска, вставки и удаления выполняются за логарифмически случайное время.

Эта структура данных не получила широкой известности (кстати, и на хабре достаточно обзорно о ней написано), хотя у нее хорошие асимптотические оценки. Любопытства ради захотелось ее реализовать, тем более имелась подходящая задача.

Пусть у нас есть отсортированный односвязный список:

В худшем случае поиск выполняется за O(n). Как его можно ускорить?

В одной из видео-лекций, которые я пересматривал, когда занимался задачкой, приводился замечательный пример про экспресс-линии в Нью-Йорке:

  • Скоростные линии соединяют некоторые станции
  • Обычные линии соединяют все станции
  • Есть обычные линии между общими станциями скоростных линий
Идея в следующем: мы можем добавить некоторое количество уровней с некоторым количеством узлов в них, при этом узлы в уровнях должны быть равномерно распределены. Рассмотрим пример, когда на каждом из вышележащих уровней располагается половина элементов из нижележащего уровня:


На примере изображен идеальный SkipList, в реальности всё выглядит подобным образом, но немного не так:)

Поиск

Так происходит поиск. Предположим, что мы ищем 72-й элемент:

Вставка

Со вставкой все немного сложнее. Для того чтобы вставить элемент, нам необходимо понять, куда его вставить в самом нижнем списке и при этом протолкнуть его на какое-то количество вышележащих уровней. Но на какое количество уровней нужно проталкивать каждый конкретный элемент?

Решать это предлагается так: при вставке мы добавляем новый элемент в самый нижний уровень и начинаем подкидывать монетку, пока она выпадает, мы проталкиваем элемент на следующий вышележащий уровень.
Попробуем вставить элемент - 67. Сначала найдем, куда в нижележащем списке его нужно вставить:

Предположил, что монетка выпала два раза подряд. Значит нужно протолкнуть элемент на два уровня вверх:

Доступ по индексу

Для доступа по индексу предлагается сделать следующее: каждый переход пометить количеством узлов, которое лежит под ним:

После того, как мы получаем доступ к i-му элементу (кстати, получаем мы его за O(ln n)), сделать срез не представляется трудным.

Пусть необходимо найти Range(5, 7). Сначала получим элемент по индексу пять:


А теперь Range(5, 7):

О реализации

Кажется естественной реализация, когда узел SkipList выглядит следующим образом:
SkipListNode {
int Element;
SkipListNode Next;
int Width;
}
Собственно, так и сделано .

Но в C# в массиве хранится еще и его длина, а хотелось сделать это за как можно меньшее количество памяти (как оказалось, в условиях задачи все оценивалось не так строго). При этом хотелось сделать так, чтобы реализация SkipList и расширенного RB Tree занимала примерно одинаковое количество памяти.

Ответ, как уменьшить потребление памяти, неожиданно был найден при ближайшем рассмотрении из пакета java.util.concurrent.

Двумерный skiplist

Пусть в одном измерении будет односвязный список из всех элементов. Во втором же будут лежать «экспресс-линии» для переходов со ссылками на нижний уровень.
ListNode {
int Element;
ListNode Next;
}

Lane {
Lane Right;
Lane Down;
ListNode Node;
}

Нечестная монетка

Еще для снижения потребления памяти можно воспользоваться «нечестной» монеткой: уменьшить вероятность проталкивания элемента на следующий уровень. В статье William Pugh рассматривался срез из нескольких значений вероятности проталкивания. При рассмотрении значений ½ и ¼ на практике получилось примерно одинаковое время поиска при уменьшении потребления памяти.

Немного о генерации случайных чисел

Копаясь в потрохах ConcurrentSkipListMap, заметил, что случайные числа генерируются следующим образом:
int randomLevel() {
int x = randomSeed;
x ^= x << 13;
x ^= x >>> 17;
randomSeed = x ^= x << 5;
if ((x & 0x80000001) != 0)
return 0;
int level = 1;
while (((x >>>= 1) & 1) != 0) ++level;
return level;
}
Подробнее о генерации псевдослучайных чисел с помощью XOR можно почитать в этой статьe . Особого увеличения скорости я не заметил, поэтому, не использовал его.

Исходник получившегося хранилища можно глянуть .

Все вместе можно забрать с googlecode.com (проект Pagination).

Тесты

Было использовано три типа хранилища:
  1. ArrayBased (динамический массив)
  2. SkipListBased (SkipList c параметром ¼)
  3. RBTreeBased (красно-черное дерево: реализация моего знакомого, который выполнял аналогичное задание).
Было проведено три вида тестов на вставку 10^6 элементов:
  1. Упорядоченные по возрастанию элементы
  2. Упорядоченные по убыванию элементы
  3. Случайные элементы
Тесты проводились на машине с i5, 8gb ram, ssd и Windows 7 x64.
Результаты:
Array RBTree SkipList
Random 127033 ms 1020 ms 1737 ms
Ordered 108 ms 457 ms 536 ms
Ordered by descending 256337 ms 358 ms 407 ms
Вполне ожидаемые результаты. Видно, что вставка в массив, когда элемент вставляется куда-нибудь, кроме как в конец, работает медленнее всего. При этом SkipList медленнее RBTree.

Также были проведены замеры: сколько каждое хранилище занимает в памяти при вставленных в него 10^6 элементах. Использовался студийный профайлер, для простоты запускался такой код:

var storage = ...
for (var i = 0; i < 1000000; ++i)
storage.Add(i);
Результаты:
Array RBTree SkipList
Total bytes allocated 8,389,066 bytes 24,000,060 bytes 23,985,598 bytes
Ещё вполне ожидаемые результаты: хранилище на динамическом массиве заняло наименьшее количество памяти, а SkipList и RBTree заняли примерно одинаковый объем.

Хеппи-энд с «Алёнкой»

Коллега-нытик, по условию задачи, проспорил мне шоколадку. Моё решение зачли с максимальным баллом. Надеюсь, кому-нибудь данная статья будет полезна. Если есть какие-то вопросы - буду рад ответить.

P.S.: я был на стажировке в компании СКБ Контур. Это чтобы не отвечать на одинаковые вопросы =)

СКИП – это система контроля исполнения поручений на базе MS SharePoint, позволяющая полностью автоматизировать процесс работы с поручениями. Это продуманное, коробочное решение для организации работы с поручениями. Оно подходит как для работы в крупных и территориально-распределенных компаниях, так и для компаний среднего бизнеса за счет возможности осуществления максимально гибкой настройки всех модулей.

Система СКИП базируется на платформе Microsoft SharePoint, что автоматически означает возможность ее интеграции с продуктами Microsoft, в том числе Microsoft Office.

Функциональные возможности системы

Система СКИП является «коробочным» решением и в этом варианте содержит базовый набор функциональных возможностей, необходимых для автоматизации работы с поручениями:

  • Назначение, исполнение, контроль поручения;
  • Отслеживание статуса исполнения поручения;
  • Возможность создавать вложенные («дочерние») поручения.

Список поручений с цветовой индикацией

При этом представленный функционал реализован таким образом, чтобы предоставить пользователю максимально широкие возможности по работе с системой:

  • Каталогизация поручений (одно поручение может располагаться в различных папках);
  • Фильтрация списков поручений;
  • Экспорт списков поручений в MS Excel;
  • Отчеты по исполнительской дисциплине;
  • Цветовая индикация поручений в зависимости от срока исполнения и статуса поручения;
  • Возможность прикрепления произвольного количества файлов любых форматов к Карточке поручения;
  • Интеграция с календарями Outlook;
  • Настраиваемые оповещения о назначении и ходе работы с поручением;
  • Система замещения сотрудников на период отпуска или командировки;
  • Создание периодических поручений (или поручений с расписанием) для мероприятий, которые имеют определенный период (совещания, встречи и т.д.);
  • Отображение сроков исполнения поручений на диаграмме Ганта;
  • и прочее

Список поручений с диаграммой Ганта

Похожие публикации