Implement async event streaming
This commit is contained in:
@@ -18,6 +18,7 @@ let package = Package(
|
|||||||
.package(url: "https://github.com/apple/swift-argument-parser", from: "1.7.0"),
|
.package(url: "https://github.com/apple/swift-argument-parser", from: "1.7.0"),
|
||||||
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.1.3"),
|
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.1.3"),
|
||||||
.package(url: "https://github.com/apple/swift-log", from: "1.10.1"),
|
.package(url: "https://github.com/apple/swift-log", from: "1.10.1"),
|
||||||
|
.package(url: "https://github.com/apple/swift-system", from: "1.6.4"),
|
||||||
.package(url: "https://github.com/swiftlang/swift-subprocess.git", from: "0.3.0"),
|
.package(url: "https://github.com/swiftlang/swift-subprocess.git", from: "0.3.0"),
|
||||||
.package(url: "https://github.com/tuist/Noora", from: "0.55.1")
|
.package(url: "https://github.com/tuist/Noora", from: "0.55.1")
|
||||||
],
|
],
|
||||||
@@ -28,6 +29,7 @@ let package = Package(
|
|||||||
dependencies: [
|
dependencies: [
|
||||||
"CInotify",
|
"CInotify",
|
||||||
.product(name: "Logging", package: "swift-log"),
|
.product(name: "Logging", package: "swift-log"),
|
||||||
|
.product(name: "SystemPackage", package: "swift-system")
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
.testTarget(
|
.testTarget(
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <sys/inotify.h>
|
#include <sys/inotify.h>
|
||||||
|
#include <unistd.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
|
||||||
static inline int cinotify_deinit(int fd) {
|
static inline int cinotify_deinit(int fd) {
|
||||||
|
|||||||
@@ -1,14 +1,21 @@
|
|||||||
|
import Dispatch
|
||||||
import CInotify
|
import CInotify
|
||||||
|
|
||||||
public actor Inotify {
|
public actor Inotify {
|
||||||
private let fd: Int32
|
private let fd: Int32
|
||||||
private var watches: [Int32: String] = [:]
|
private var watches: [Int32: String] = [:]
|
||||||
|
private var eventReader: any DispatchSourceRead
|
||||||
|
private var eventStream: AsyncStream<RawInotifyEvent>
|
||||||
|
public var events: AsyncCompactMapSequence<AsyncStream<RawInotifyEvent>, InotifyEvent> {
|
||||||
|
self.eventStream.compactMap(self.transform(_:))
|
||||||
|
}
|
||||||
|
|
||||||
public init() throws {
|
public init() throws {
|
||||||
self.fd = inotify_init1(Int32(IN_NONBLOCK | IN_CLOEXEC))
|
self.fd = inotify_init1(Int32(IN_NONBLOCK | IN_CLOEXEC))
|
||||||
guard self.fd >= 0 else {
|
guard self.fd >= 0 else {
|
||||||
throw InotifyError.initFailed(errno: cinotify_get_errno())
|
throw InotifyError.initFailed(errno: cinotify_get_errno())
|
||||||
}
|
}
|
||||||
|
(self.eventReader, self.eventStream) = Self.createEventReader(forFileDescriptor: fd)
|
||||||
}
|
}
|
||||||
|
|
||||||
@discardableResult
|
@discardableResult
|
||||||
@@ -31,4 +38,33 @@ public actor Inotify {
|
|||||||
deinit {
|
deinit {
|
||||||
cinotify_deinit(self.fd)
|
cinotify_deinit(self.fd)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func transform(_ rawEvent: RawInotifyEvent) -> InotifyEvent? {
|
||||||
|
guard let path = self.watches[rawEvent.watchDescriptor] else { return nil }
|
||||||
|
return InotifyEvent.init(from: rawEvent, inDirectory: path)
|
||||||
|
}
|
||||||
|
|
||||||
|
private static func createEventReader(forFileDescriptor fd: Int32) -> (any DispatchSourceRead, AsyncStream<RawInotifyEvent>) {
|
||||||
|
let (stream, continuation) = AsyncStream<RawInotifyEvent>.makeStream(
|
||||||
|
of: RawInotifyEvent.self,
|
||||||
|
bufferingPolicy: .bufferingNewest(512)
|
||||||
|
)
|
||||||
|
|
||||||
|
let reader = DispatchSource.makeReadSource(
|
||||||
|
fileDescriptor: fd,
|
||||||
|
queue: DispatchQueue(label: "Inotify.read", qos: .utility)
|
||||||
|
)
|
||||||
|
|
||||||
|
reader.setEventHandler {
|
||||||
|
for rawEvent in InotifyEventParser.parse(fromFileDescriptor: fd) {
|
||||||
|
continuation.yield(rawEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reader.setCancelHandler {
|
||||||
|
continuation.finish()
|
||||||
|
}
|
||||||
|
reader.activate()
|
||||||
|
|
||||||
|
return (reader, stream)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
26
Sources/Inotify/InotifyEvent.swift
Normal file
26
Sources/Inotify/InotifyEvent.swift
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
import SystemPackage
|
||||||
|
|
||||||
|
public struct InotifyEvent: Sendable, Hashable, CustomStringConvertible {
|
||||||
|
public let watchDescriptor: Int32
|
||||||
|
public let mask: InotifyEventMask
|
||||||
|
public let cookie: UInt32
|
||||||
|
public let path: FilePath
|
||||||
|
|
||||||
|
public var description: String {
|
||||||
|
var parts = ["InotifyEvent(wd: \(watchDescriptor), mask: \(mask), path: \"\(path)\""]
|
||||||
|
if cookie != 0 { parts.append("cookie: \(cookie)") }
|
||||||
|
return parts.joined(separator: ", ") + ")"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension InotifyEvent {
|
||||||
|
public init(from rawEvent: RawInotifyEvent, inDirectory path: String) {
|
||||||
|
let dirPath = FilePath(path)
|
||||||
|
self.init(
|
||||||
|
watchDescriptor: rawEvent.watchDescriptor,
|
||||||
|
mask: rawEvent.mask,
|
||||||
|
cookie: rawEvent.cookie,
|
||||||
|
path: dirPath.appending(rawEvent.name)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
57
Sources/Inotify/InotifyEventParser.swift
Normal file
57
Sources/Inotify/InotifyEventParser.swift
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
import CInotify
|
||||||
|
|
||||||
|
struct InotifyEventParser {
|
||||||
|
static let readBufferSize = 4096
|
||||||
|
|
||||||
|
static func parse(fromFileDescriptor fd: Int32) -> [RawInotifyEvent] {
|
||||||
|
let buffer = UnsafeMutableRawPointer.allocate(
|
||||||
|
byteCount: Self.readBufferSize,
|
||||||
|
alignment: MemoryLayout<inotify_event>.alignment
|
||||||
|
)
|
||||||
|
defer { buffer.deallocate() }
|
||||||
|
|
||||||
|
let bytesRead = read(fd, buffer, readBufferSize)
|
||||||
|
guard bytesRead > 0 else { return [] }
|
||||||
|
|
||||||
|
return Self.parseEventBuffer(buffer, bytesRead: bytesRead)
|
||||||
|
}
|
||||||
|
|
||||||
|
private static func parseEventBuffer(
|
||||||
|
_ buffer: UnsafeMutableRawPointer,
|
||||||
|
bytesRead: Int
|
||||||
|
) -> [RawInotifyEvent] {
|
||||||
|
var events: [RawInotifyEvent] = []
|
||||||
|
var offset = 0
|
||||||
|
|
||||||
|
while offset < bytesRead {
|
||||||
|
let eventPointer = buffer.advanced(by: offset)
|
||||||
|
let rawEvent = eventPointer.assumingMemoryBound(to: inotify_event.self).pointee
|
||||||
|
|
||||||
|
events.append(RawInotifyEvent(
|
||||||
|
watchDescriptor: rawEvent.wd,
|
||||||
|
mask: InotifyEventMask(rawValue: rawEvent.mask),
|
||||||
|
cookie: rawEvent.cookie,
|
||||||
|
name: Self.extractName(from: eventPointer, nameLength: rawEvent.len)
|
||||||
|
))
|
||||||
|
|
||||||
|
offset += Self.eventSize(nameLength: rawEvent.len)
|
||||||
|
}
|
||||||
|
|
||||||
|
return events
|
||||||
|
}
|
||||||
|
|
||||||
|
private static func extractName(
|
||||||
|
from eventPointer: UnsafeMutableRawPointer,
|
||||||
|
nameLength: UInt32
|
||||||
|
) -> String {
|
||||||
|
guard nameLength > 0 else { return "" }
|
||||||
|
let namePointer = eventPointer
|
||||||
|
.advanced(by: MemoryLayout<inotify_event>.size)
|
||||||
|
.assumingMemoryBound(to: CChar.self)
|
||||||
|
return String(cString: namePointer)
|
||||||
|
}
|
||||||
|
|
||||||
|
private static func eventSize(nameLength: UInt32) -> Int {
|
||||||
|
MemoryLayout<inotify_event>.size + Int(nameLength)
|
||||||
|
}
|
||||||
|
}
|
||||||
12
Sources/Inotify/RawInotifyEvent.swift
Normal file
12
Sources/Inotify/RawInotifyEvent.swift
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
public struct RawInotifyEvent: Sendable, Hashable, CustomStringConvertible {
|
||||||
|
public let watchDescriptor: Int32
|
||||||
|
public let mask: InotifyEventMask
|
||||||
|
public let cookie: UInt32
|
||||||
|
public let name: String
|
||||||
|
|
||||||
|
public var description: String {
|
||||||
|
var parts = ["RawInotifyEvent(wd: \(watchDescriptor), mask: \(mask), name: \"\(name)\""]
|
||||||
|
if cookie != 0 { parts.append("cookie: \(cookie)") }
|
||||||
|
return parts.joined(separator: ", ") + ")"
|
||||||
|
}
|
||||||
|
}
|
||||||
118
Tests/InotifyIntegrationTests/EventTests.swift
Normal file
118
Tests/InotifyIntegrationTests/EventTests.swift
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
import Foundation
|
||||||
|
import Testing
|
||||||
|
@testable import Inotify
|
||||||
|
|
||||||
|
@Suite("File Event Detection")
|
||||||
|
struct EventTests {
|
||||||
|
@Test func detectsFileCreation() async throws {
|
||||||
|
try await withTempDir { dir in
|
||||||
|
let filename = "testfile.txt"
|
||||||
|
let events = try await getEventsForTrigger(
|
||||||
|
in: dir,
|
||||||
|
mask: [.create, .closeWrite],
|
||||||
|
) { try createFile(at: "\($0)/\(filename)", contents: "hello") }
|
||||||
|
|
||||||
|
let createEvent = events.first { $0.mask.contains(.create) && $0.path.lastComponent?.string == filename }
|
||||||
|
#expect(createEvent != nil, "Expected CREATE for '\(filename)', got: \(events)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func detectsFileModification() async throws {
|
||||||
|
try await withTempDir { dir in
|
||||||
|
let filepath = "\(dir)/modify-target.txt"
|
||||||
|
try createFile(at: filepath)
|
||||||
|
|
||||||
|
let events = try await getEventsForTrigger(
|
||||||
|
in: dir,
|
||||||
|
mask: .modify,
|
||||||
|
) { _ in try "hello".write(toFile: filepath, atomically: false, encoding: .utf8) }
|
||||||
|
|
||||||
|
let modifyEvent = events.first { $0.mask.contains(.modify) && $0.path.string == filepath }
|
||||||
|
#expect(modifyEvent != nil, "Expected MODIFY for '\(filepath)', got: \(events)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func detectsFileDeletion() async throws {
|
||||||
|
try await withTempDir { dir in
|
||||||
|
let filepath = "\(dir)/delete-me.txt"
|
||||||
|
try createFile(at: filepath)
|
||||||
|
|
||||||
|
let events = try await getEventsForTrigger(
|
||||||
|
in: dir,
|
||||||
|
mask: .delete,
|
||||||
|
) { _ in try FileManager.default.removeItem(atPath: filepath) }
|
||||||
|
|
||||||
|
let deleteEvent = events.first { $0.mask.contains(.delete) && $0.path.string == filepath }
|
||||||
|
#expect(deleteEvent != nil, "Expected DELETE for '\(filepath)', got: \(events)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func detectsSubdirectoryCreationWithIsDirFlag() async throws {
|
||||||
|
try await withTempDir { dir in
|
||||||
|
let folderpath = "\(dir)/subdir-\(UUID())"
|
||||||
|
|
||||||
|
let events = try await getEventsForTrigger(
|
||||||
|
in: dir,
|
||||||
|
mask: .create,
|
||||||
|
) { _ in try FileManager.default.createDirectory(atPath: folderpath, withIntermediateDirectories: false) }
|
||||||
|
|
||||||
|
let createEvent = events.first { $0.mask.contains(.create) && $0.mask.contains(.isDir) && $0.path.string == folderpath }
|
||||||
|
#expect(createEvent != nil, "Expected CREATE for folder '\(folderpath)', got: \(events)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func detectsMoveWithMatchingCookies() async throws {
|
||||||
|
try await withTempDir { dir in
|
||||||
|
let sourceFilePath = "\(dir)/move-src.txt"
|
||||||
|
let destionationFilePath = "\(dir)/move-dst.txt"
|
||||||
|
try createFile(at: sourceFilePath)
|
||||||
|
|
||||||
|
let events = try await getEventsForTrigger(
|
||||||
|
in: dir,
|
||||||
|
mask: .move,
|
||||||
|
) { _ in try FileManager.default.moveItem(atPath: sourceFilePath, toPath: destionationFilePath) }
|
||||||
|
|
||||||
|
let movedFromEvent = events.first { $0.mask.contains(.movedFrom) && $0.path.string == sourceFilePath }
|
||||||
|
#expect(movedFromEvent != nil, "Expected MOVED_FROM for '\(movedFromEvent)', got: \(events)")
|
||||||
|
|
||||||
|
let movedToEvent = events.first { $0.mask.contains(.movedTo) && $0.path.string == destionationFilePath }
|
||||||
|
#expect(movedToEvent != nil, "Expected MOVED_TO for '\(destionationFilePath)', got: \(events)")
|
||||||
|
#expect(movedFromEvent?.cookie == movedToEvent?.cookie)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func eventsArriveInOrder() async throws {
|
||||||
|
try await withTempDir { dir in
|
||||||
|
let filepath = "\(dir)/ordered-test.txt"
|
||||||
|
|
||||||
|
let events = try await getEventsForTrigger(in: dir, mask: [.create, .delete]) { _ in
|
||||||
|
try createFile(at: filepath)
|
||||||
|
try await Task.sleep(for: .milliseconds(50))
|
||||||
|
try FileManager.default.removeItem(atPath: filepath)
|
||||||
|
}
|
||||||
|
|
||||||
|
let createIdx = events.firstIndex { $0.mask.contains(.create) && $0.path.string == filepath }
|
||||||
|
#expect(createIdx != nil)
|
||||||
|
|
||||||
|
let deleteIdx = events.firstIndex { $0.mask.contains(.delete) && $0.path.string == filepath }
|
||||||
|
#expect(deleteIdx != nil)
|
||||||
|
|
||||||
|
if let createIdx, let deleteIdx {
|
||||||
|
#expect(createIdx < deleteIdx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func maskFiltersCorrectly() async throws {
|
||||||
|
try await withTempDir { dir in
|
||||||
|
let filepath = "\(dir)/mask-filter.txt"
|
||||||
|
|
||||||
|
let events = try await getEventsForTrigger(in: dir, mask: .delete) { _ in
|
||||||
|
try createFile(at: filepath)
|
||||||
|
}
|
||||||
|
|
||||||
|
let deleteEvent = events.first { $0.mask.contains(.delete) && $0.path.string == filepath }
|
||||||
|
#expect(deleteEvent == nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
3
Tests/InotifyIntegrationTests/Utilities/createFile.swift
Normal file
3
Tests/InotifyIntegrationTests/Utilities/createFile.swift
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
func createFile(at path: String, contents: String = "") throws {
|
||||||
|
try contents.write(toFile: path, atomically: false, encoding: .utf8)
|
||||||
|
}
|
||||||
@@ -0,0 +1,25 @@
|
|||||||
|
import Inotify
|
||||||
|
|
||||||
|
func getEventsForTrigger(
|
||||||
|
in dir: String,
|
||||||
|
mask: InotifyEventMask,
|
||||||
|
trigger: @escaping (String) async throws -> Void
|
||||||
|
) async throws -> [InotifyEvent] {
|
||||||
|
let watcher = try Inotify()
|
||||||
|
try await watcher.addWatch(path: dir, mask: mask)
|
||||||
|
|
||||||
|
let eventTask = Task {
|
||||||
|
var events: [InotifyEvent] = []
|
||||||
|
for await event in await watcher.events {
|
||||||
|
events.append(event)
|
||||||
|
}
|
||||||
|
return events
|
||||||
|
}
|
||||||
|
|
||||||
|
try await Task.sleep(for: .milliseconds(200))
|
||||||
|
try await trigger(dir)
|
||||||
|
try await Task.sleep(for: .milliseconds(200))
|
||||||
|
|
||||||
|
eventTask.cancel()
|
||||||
|
return await eventTask.value
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user