Каналы

Каналы - это объекты помогающие наладить межзадачную коммуникацию. Под задачей здесь я буду иметь ввиду любой асинхронно выполняемый код. Канал - это разделяемый объект, доступный белее, чем одной задаче. Задачи могут работать в рамках одного процесса или на разных процессах, для этих двух вариантов тип объекта канала будет отличаться: локально доступный объект должен быть Channel, а в другом процессе - RemoteChannel. Независимо от типа канал предоставляет единый интерфейс. В него могут писать сразу несколько писателей и его могу читать несколько читателей. Организован он в виде очереди. Функция push!() посылает новое значение в канал. take!() забирает значение, удаляя его из канала. Есть и другие функции работы с каналами.

Асинхронная работа в рамках одного процесса

Разберем искусственный пример - копирование файла.

Этот пример я позаимствовал из отличной книги (которую горячо рекомендую всем) Андрея Александреску "Язык программирования D". Только там автор, рассчитывает за счет многопоточности достичь большей скорости копирования файлов (при определенных условиях). В моем же случае - пока хоть бы работало - уже хорошо. Поэтому я беру все величины "с потолка", лишь для демонстрации.

Одна задача читает данные из файла. Каждый раз, прочитав кусок данных, задача создает в памяти буфер, куда помещает прочитанные данные. Буфер она посылает в канал. Канал имеет ограниченную емкость, например, десять элементов. Когда канал наполняется, функция push!() будет блокировать выполнение, пока канал не станет способен принимать следующую порцию данных.

Вторая задача в цикле будет пытаться читать данные из канала и записывать в предварительно открытый файл. Функция take!() блокирует выполнение, если в канале нет данных. При поступлении из канала специального значения, которое нельзя спутать с данными читающая задача будет завершается, закрыв файл.

Создадим тестовый файл с тысячью строками:

#bash:
$ for i in `seq 1 1000`; do echo ">>>>>>>>>>>> line: $i <<<<<<<<<<<<<" >> zz.txt; done
$ wc -l zz.txt
1000 zz.txt

Запустим julia:

$ julia

Функция f принимает параметры: канал, из которого будет читать данные, имя файла для записи, и stop - специальное значение для выхода. В моем варианте значение stop должно быть типом - просто я так придумал обозначить значение, которое нельзя спутать с данными.

julia> function f( c::Channel, file::AbstractString, stop::Type )
        io = open( file, "w" )
        info("File $file is opened")
        while true
         data=take!(c)
         typeof(data)<:Type && break
         write( io, data)
        end
        close(io)
        info("$file - writed lines... ")
        `wc -l $file`|>readstring|>info
        info("Bay!")
       end

Создадим тип, означающий остановку:

julia> abstract Stop

Создадим канал вместимостью 10 элементов:

julia> c1 = Channel( 10 )

Теперь нужно запустить асинхронную задачу:

julia> @schedule f( c1, "zz2.txt", Stop )
INFO: File zz2.txt is opened
Task (runnable) @0x00007f4174103820

Макрос @schedule оборачивает вызов функции в асинхронную задачу и ставит ее в встроенный планировщик заданий Julia. Теперь задача ждет, пока в канале не появятся данные.

Теперь займемся отправкой данных из основной задачи. Откроем файл для чтения:

julia> f1 = open( "zz.txt" )
IOStream(<file zz.txt>)

Будем читать порции данных из файла и отправлять в канал. Буферами, в данном случае являются массивы байт UInt8[](возвращаемое значение функции read), но так как, создавая канал, мы не указывали конкретный тип для него, то он может хранить объекты любого типа.

julia> while !eof(f1)
        push!( c1, read( f1, 32))
       end

Отправим асинхронной задаче сигнал завершения:

julia> push!( c1, Stop)
INFO: zz2.txt - writed lines...
Stop
INFO: 1000 zz2.txt

INFO: Bay!

Задача завершилась, отчитавшись о количестве строк, которые она записала в файл zz2.txt.

На что нужно обратить внимание. На каждой итерации пишущая задача создает новый буфер. Единожды созданные буферы не перезаписываются (иначе начнутся фокусы). Объект канала непосредственно разделяется между локальными задачами, обеспечивая атомарность вставки и удаления данных в условиях многопоточности.

Канал предоставляет интерфейс итератора

Да, так и есть. Читающую функцию можно переписать, избавившись от велосипеда - "специального значения для остановки":

julia> function f( c::Channel, file::AbstractString )
        io = open( file, "w" )
        info("File $file is opened")
        for data in c
         write( io, data)
        end
        close(io)
        info("$file - writed lines... ")
        `wc -l $file`|>readstring|>info
        info("Bay!")
       end

Пока канал открыт или пока в нем есть данные - итерации будут пытаться читать данные из канала, блокируя свой поток выполнения, если данных приходится подождать.

Воспроизведем прошлый пример:

julia> f1 = open( "zz.txt" )
IOStream(<file zz.txt>)

julia> c1 = Channel( 10 )
Channel{Any}(sz_max:10,sz_curr:0)

julia> @schedule f( c1, "zz2.txt" )
INFO: File zz2.txt is opened
Task (runnable) @0x00007f9cf2052bf0

julia> while !eof(f1)
        push!( c1, read( f1, 32))
       end

julia> close(c1)
INFO: zz2.txt - writed lines...

julia> INFO: 1000 zz2.txt
julia> INFO: Bay!

Для закрытия канала мы воспользовались функцией close(). Попытка записать в закрытый канал, если бы такая была, привела бы к ошибке, но читать из закрытого канала - можно, пока в нем есть данные. Чтение пустого закрытого канала тоже приводит к ошибке. Для получения информации о возможности прочитать из канала следующий объект можно пользоваться функцией isready(), или функцией wait(), чтобы дождаться пока он станет доступен. Но интерфейс итератора скрывает эти подробности от нас.

... продолжение, возможно, скоро будет...

results matching ""

    No results matching ""