Skip to content

Commit

Permalink
Merge pull request #244 from JuliaInterop/jet
Browse files Browse the repository at this point in the history
Fix support for 1.11
  • Loading branch information
JamesWrigley authored Jul 9, 2024
2 parents 5174e72 + cf4b89c commit 1a3d67c
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 114 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
name: CI

env:
JULIA_NUM_THREADS: 2
on:
Expand All @@ -9,6 +10,7 @@ on:
branches:
- master
tags: '*'

jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
Expand All @@ -18,6 +20,7 @@ jobs:
matrix:
version:
- '1' # Current stable version
- 'nightly'
os:
- ubuntu-latest
- windows-latest
Expand All @@ -36,6 +39,7 @@ jobs:
- os: ubuntu-latest
arch: x64
version: '1.3'

steps:
- uses: actions/checkout@v4
- uses: julia-actions/setup-julia@v2
Expand All @@ -59,6 +63,7 @@ jobs:
with:
file: lcov.info
token: ${{ secrets.CODECOV_TOKEN }}

docs:
name: Documentation
runs-on: ubuntu-latest
Expand Down
6 changes: 6 additions & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ CurrentModule = ZMQ
This documents notable changes in ZMQ.jl. The format is based on [Keep a
Changelog](https://keepachangelog.com).

## Unreleased

### Added
- Support for creating [`Message`](@ref)'s from the new `Memory` type in Julia
1.11 ([#244]).

## [v1.2.6] - 2024-06-13

### Added
Expand Down
7 changes: 5 additions & 2 deletions src/message.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,15 @@ mutable struct Message <: AbstractArray{UInt8,1}
Message(p, pointer(p.string)+p.offset, sizeof(p))

@doc """
Message(a::Array)
Message(a::T) where T <: DenseVector
Create a message with an array as a buffer (for send). Note: the same
ownership semantics as for [`Message(m::String)`](@ref) apply.
Usually `a` will be a 1D `Array`/`Vector`, but on 1.11+ it can also be a
`Memory`.
"""
Message(a::Array) = Message(a, pointer(a), sizeof(a))
Message(a::T) where T <: DenseVector = Message(a, pointer(a), sizeof(a))

@doc """
Message(io::IOBuffer)
Expand Down
281 changes: 169 additions & 112 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ using ZMQ, Test
@info("Testing with ZMQ version $(ZMQ.version)")

@testset "ZMQ contexts" begin
ctx=Context()
@test ctx isa Context
@test (ctx.io_threads = 2) == 2
@test ctx.io_threads == 2
ZMQ.close(ctx)

#try to create socket with expired context
@test_throws StateError Socket(ctx, PUB)
ctx=Context()
@test ctx isa Context
@test (ctx.io_threads = 2) == 2
@test ctx.io_threads == 2
ZMQ.close(ctx)

#try to create socket with expired context
@test_throws StateError Socket(ctx, PUB)
end

# This test is in its own function to keep it simple and try to trick Julia into
Expand All @@ -31,135 +31,192 @@ end
@testset "ZMQ sockets" begin
context_gc_test()

s=Socket(PUB)
@test s isa Socket
ZMQ.close(s)
s=Socket(PUB)
@test s isa Socket
ZMQ.close(s)

s1=Socket(REP)
s1.sndhwm = 1000
s1.linger = 1
s1.routing_id = "abcd"
s1=Socket(REP)
s1.sndhwm = 1000
s1.linger = 1
s1.routing_id = "abcd"

@test s1.routing_id == "abcd"
@test s1.sndhwm === 1000
@test s1.linger === 1
@test s1.rcvmore === false
@test s1.routing_id == "abcd"
@test s1.sndhwm === 1000
@test s1.linger === 1
@test s1.rcvmore === false

s2=Socket(REQ)
@test s1.type == REP
@test s2.type == REQ
s2=Socket(REQ)
@test s1.type == REP
@test s2.type == REQ

ZMQ.bind(s1, "tcp://*:5555")
ZMQ.connect(s2, "tcp://localhost:5555")
ZMQ.bind(s1, "tcp://*:5555")
ZMQ.connect(s2, "tcp://localhost:5555")

msg = Message("test request")
msg = Message("test request")

# Smoke tests
@test Base.elsize(msg) == 1
@test Base.strides(msg) == (1,)

# Test similar() and copy() fixes in https://github.com/JuliaInterop/ZMQ.jl/pull/165
@test similar(msg, UInt8, 12) isa Vector{UInt8}
@test copy(msg) == codeunits("test request")

ZMQ.send(s2, Message("test request"))
@test unsafe_string(ZMQ.recv(s1)) == "test request"
ZMQ.send(s1, Message("test response"))
@test unsafe_string(ZMQ.recv(s2)) == "test response"

ZMQ.send(s2, "test request 2")
@test ZMQ.recv(s1, String) == "test request 2"
ZMQ.send(s1, Vector(codeunits("test response 2")))
@test String(ZMQ.recv(s2, Vector{UInt8})) == "test response 2"
ZMQ.send(s2, 3.14159)
@test ZMQ.recv(s1, Float64) === 3.14159
ZMQ.send(s1, [314159, 12345])
@test ZMQ.recv(s2, Vector{Int}) == [314159, 12345]

# Test task-blocking behavior
c = Base.Condition()
global msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, Message("test request"))
@test (unsafe_string(ZMQ.recv(s2)) == "test response")
notify(c)
end

# This will hang forver if ZMQ blocks the entire process since
# we'll never switch to the other task
@test unsafe_string(ZMQ.recv(s1)) == "test request"
@test msg_sent == true
ZMQ.send(s1, Message("test response"))
wait(c)

# Test _Message task-blocking behavior, similar to above
c = Base.Condition()
msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, "another test request")
@test ZMQ.recv(s2, String) == "another test response"
notify(c)
end
@test ZMQ.recv(s1, String) == "another test request"
@test msg_sent == true
ZMQ.send(s1, "another test response")
wait(c)

ZMQ.send(s2, Message("another test request"))
msg = ZMQ.recv(s1)
o=convert(IOStream, msg)
seek(o, 0)
@test String(take!(o)) == "another test request"
ZMQ.send(s1) do io
print(io, "buffer ")
print(io, "this")
end
@test String(take!(ZMQ.recv(s2, IOBuffer))) == "buffer this"

@testset "Message AbstractVector interface" begin
m = Message("1")
@test m[1]==0x31
@test (m[1]=0x32) === 0x32
@test unsafe_string(m)=="2"
finalize(m)
end

# ZMQ.close(s1); ZMQ.close(s2) # should happen when context is closed
ZMQ.close(ZMQ._context) # immediately close global context rather than waiting for exit
# Test similar() and copy() fixes in https://github.com/JuliaInterop/ZMQ.jl/pull/165
@test similar(msg, UInt8, 12) isa Vector{UInt8}
@test copy(msg) == codeunits("test request")

ZMQ.send(s2, Message("test request"))
@test unsafe_string(ZMQ.recv(s1)) == "test request"
ZMQ.send(s1, Message("test response"))
@test unsafe_string(ZMQ.recv(s2)) == "test response"

ZMQ.send(s2, "test request 2")
@test ZMQ.recv(s1, String) == "test request 2"
ZMQ.send(s1, Vector(codeunits("test response 2")))
@test String(ZMQ.recv(s2, Vector{UInt8})) == "test response 2"
ZMQ.send(s2, 3.14159)
@test ZMQ.recv(s1, Float64) === 3.14159
ZMQ.send(s1, [314159, 12345])
@test ZMQ.recv(s2, Vector{Int}) == [314159, 12345]

# Test task-blocking behavior
c = Base.Condition()
global msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, Message("test request"))
@test (unsafe_string(ZMQ.recv(s2)) == "test response")
notify(c)
end

# This will hang forver if ZMQ blocks the entire process since
# we'll never switch to the other task
@test unsafe_string(ZMQ.recv(s1)) == "test request"
@test msg_sent == true
ZMQ.send(s1, Message("test response"))
wait(c)

# Test _Message task-blocking behavior, similar to above
c = Base.Condition()
msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, "another test request")
@test ZMQ.recv(s2, String) == "another test response"
notify(c)
end
@test ZMQ.recv(s1, String) == "another test request"
@test msg_sent == true
ZMQ.send(s1, "another test response")
wait(c)

ZMQ.send(s2, Message("another test request"))
msg = ZMQ.recv(s1)
o=convert(IOStream, msg)
seek(o, 0)
@test String(take!(o)) == "another test request"
ZMQ.send(s1) do io
print(io, "buffer ")
print(io, "this")
end
@test String(take!(ZMQ.recv(s2, IOBuffer))) == "buffer this"

@testset "Message AbstractVector interface" begin
m = Message("1")
@test m[1]==0x31
@test (m[1]=0x32) === 0x32
@test unsafe_string(m)=="2"
finalize(m)
end

# ZMQ.close(s1); ZMQ.close(s2) # should happen when context is closed
ZMQ.close(ZMQ._context) # immediately close global context rather than waiting for exit
@test !isopen(s1)
@test !isopen(s2)
end

# Test all the send constructors
@testset "Message" begin
s1 = Socket(PUB)
s2 = Socket(SUB)
ZMQ.subscribe(s2, "")
ZMQ.bind(s1, "tcp://*:5555")
ZMQ.connect(s2, "tcp://localhost:5555")

# Sleep for a bit to prevent the 'slow joiner' problem
sleep(0.5)

# Message(::Int) - construct from buffer size
data = rand(UInt8, 10)
m1 = Message(length(data))
# Note that we don't use copy!() for compatibility with Julia 1.3
for i in eachindex(data)
m1[i] = data[i]
end
ZMQ.send(s1, m1)
@test ZMQ.recv(s2) == data

# Message(::Any, ::Ptr, ::Int) - construct from pointer to existing data
buffer = rand(UInt8, 10)
m2 = Message(buffer, pointer(buffer), length(buffer))
ZMQ.send(s1, m2)
@test ZMQ.recv(s2) == buffer

# Message(::String)
str_msg = "foobar"
m3 = Message(str_msg)
ZMQ.send(s1, m3)
@test String(ZMQ.recv(s2)) == str_msg

# Message(::SubString)
m4 = Message(str_msg[1:3])
ZMQ.send(s1, m4)
@test String(ZMQ.recv(s2)) == str_msg[1:3]

# Message(::DenseVector) - construct from array
buffer2 = rand(UInt8, 10)
m5 = Message(buffer2)
ZMQ.send(s1, m5)
@test ZMQ.recv(s2) == buffer2

# Message(::IOBuffer)
buffer3 = rand(UInt8, 10)
iobuf = IOBuffer(buffer3)
m6 = Message(iobuf)
ZMQ.send(s1, m6)
@test ZMQ.recv(s2) == buffer3

close(s1)
close(s2)
end

@testset "ZMQ resource management" begin
local leaked_req_socket, leaked_rep_socket
ZMQ.Socket(ZMQ.REQ) do req_socket
leaked_req_socket = req_socket
leaked_req_socket = req_socket

ZMQ.Socket(ZMQ.REP) do rep_socket
leaked_rep_socket = rep_socket
ZMQ.Socket(ZMQ.REP) do rep_socket
leaked_rep_socket = rep_socket

ZMQ.bind(rep_socket, "inproc://tester")
ZMQ.connect(req_socket, "inproc://tester")
ZMQ.bind(rep_socket, "inproc://tester")
ZMQ.connect(req_socket, "inproc://tester")

ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.")
@test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you."
ZMQ.send(rep_socket, "Coming, Mr. Bell.")
@test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell."
end
ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.")
@test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you."
ZMQ.send(rep_socket, "Coming, Mr. Bell.")
@test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell."
end

@test !ZMQ.isopen(leaked_rep_socket)
@test !ZMQ.isopen(leaked_rep_socket)
end
@test !ZMQ.isopen(leaked_req_socket)

local leaked_ctx
ZMQ.Context() do ctx
leaked_ctx = ctx
leaked_ctx = ctx

@test isopen(ctx)
@test isopen(ctx)
end
@test !isopen(leaked_ctx)
end
Expand Down

0 comments on commit 1a3d67c

Please sign in to comment.