Каналы
Каналы - это объекты помогающие наладить межзадачную коммуникацию. Под задачей здесь я буду иметь ввиду любой асинхронно выполняемый код. Канал - это разделяемый объект, доступный белее, чем одной задаче. Задачи могут работать в рамках одного процесса или на разных процессах, для этих двух вариантов тип объекта канала будет отличаться: локально доступный объект должен быть Channel
, а в другом процессе - RemoteChannel
. Независимо от типа канал предоставляет единый интерфейс. В него могут писать сразу несколько писателей и его могу читать несколько читателей. Организован он в виде очереди. Функция push!()
посылает новое значение в канал. take!()
забирает значение, удаляя его из канала. Есть и другие функции работы с каналами.
Асинхронная работа в рамках одного процесса
Разберем искусственный пример - копирование файла.
Этот пример я позаимствовал из отличной книги (которую горячо рекомендую всем) Андрея Александреску "Язык программирования D". Только там автор, рассчитывает за счет многопоточности достичь большей скорости копирования файлов (при определенных условиях). В моем же случае - пока хоть бы работало - уже хорошо. Поэтому я беру все величины "с потолка", лишь для демонстрации.
Одна задача читает данные из файла. Каждый раз, прочитав кусок данных, задача создает в памяти буфер, куда помещает прочитанные данные. Буфер она посылает в канал. Канал имеет ограниченную емкость, например, десять элементов. Когда канал наполняется, функция push!()
будет блокировать выполнение, пока канал не станет способен принимать следующую порцию данных.
Вторая задача в цикле будет пытаться читать данные из канала и записывать в предварительно открытый файл. Функция take!()
блокирует выполнение, если в канале нет данных. При поступлении из канала специального значения, которое нельзя спутать с данными читающая задача будет завершается, закрыв файл.
Создадим тестовый файл с тысячью строками:
#bash:
$ for i in `seq 1 1000`; do echo ">>>>>>>>>>>> line: $i <<<<<<<<<<<<<" >> zz.txt; done
$ wc -l zz.txt
1000 zz.txt
Запустим julia:
$ julia
Функция f
принимает параметры: канал, из которого будет читать данные, имя файла для записи, и stop
- специальное значение для выхода. В моем варианте значение stop
должно быть типом - просто я так придумал обозначить значение, которое нельзя спутать с данными.
julia> function f( c::Channel, file::AbstractString, stop::Type )
io = open( file, "w" )
info("File $file is opened")
while true
data=take!(c)
typeof(data)<:Type && break
write( io, data)
end
close(io)
info("$file - writed lines... ")
`wc -l $file`|>readstring|>info
info("Bay!")
end
Создадим тип, означающий остановку:
julia> abstract Stop
Создадим канал вместимостью 10 элементов:
julia> c1 = Channel( 10 )
Теперь нужно запустить асинхронную задачу:
julia> @schedule f( c1, "zz2.txt", Stop )
INFO: File zz2.txt is opened
Task (runnable) @0x00007f4174103820
Макрос @schedule
оборачивает вызов функции в асинхронную задачу и ставит ее в встроенный планировщик заданий Julia. Теперь задача ждет, пока в канале не появятся данные.
Теперь займемся отправкой данных из основной задачи. Откроем файл для чтения:
julia> f1 = open( "zz.txt" )
IOStream(<file zz.txt>)
Будем читать порции данных из файла и отправлять в канал. Буферами, в данном случае являются массивы байт UInt8[]
(возвращаемое значение функции read
), но так как, создавая канал, мы не указывали конкретный тип для него, то он может хранить объекты любого типа.
julia> while !eof(f1)
push!( c1, read( f1, 32))
end
Отправим асинхронной задаче сигнал завершения:
julia> push!( c1, Stop)
INFO: zz2.txt - writed lines...
Stop
INFO: 1000 zz2.txt
INFO: Bay!
Задача завершилась, отчитавшись о количестве строк, которые она записала в файл zz2.txt.
На что нужно обратить внимание. На каждой итерации пишущая задача создает новый буфер. Единожды созданные буферы не перезаписываются (иначе начнутся фокусы). Объект канала непосредственно разделяется между локальными задачами, обеспечивая атомарность вставки и удаления данных в условиях многопоточности.
Канал предоставляет интерфейс итератора
Да, так и есть. Читающую функцию можно переписать, избавившись от велосипеда - "специального значения для остановки":
julia> function f( c::Channel, file::AbstractString )
io = open( file, "w" )
info("File $file is opened")
for data in c
write( io, data)
end
close(io)
info("$file - writed lines... ")
`wc -l $file`|>readstring|>info
info("Bay!")
end
Пока канал открыт или пока в нем есть данные - итерации будут пытаться читать данные из канала, блокируя свой поток выполнения, если данных приходится подождать.
Воспроизведем прошлый пример:
julia> f1 = open( "zz.txt" )
IOStream(<file zz.txt>)
julia> c1 = Channel( 10 )
Channel{Any}(sz_max:10,sz_curr:0)
julia> @schedule f( c1, "zz2.txt" )
INFO: File zz2.txt is opened
Task (runnable) @0x00007f9cf2052bf0
julia> while !eof(f1)
push!( c1, read( f1, 32))
end
julia> close(c1)
INFO: zz2.txt - writed lines...
julia> INFO: 1000 zz2.txt
julia> INFO: Bay!
Для закрытия канала мы воспользовались функцией close()
. Попытка записать в закрытый канал, если бы такая была, привела бы к ошибке, но читать из закрытого канала - можно, пока в нем есть данные. Чтение пустого закрытого канала тоже приводит к ошибке. Для получения информации о возможности прочитать из канала следующий объект можно пользоваться функцией isready()
, или функцией wait()
, чтобы дождаться пока он станет доступен. Но интерфейс итератора скрывает эти подробности от нас.
... продолжение, возможно, скоро будет...