Browse Source

Shell: Make 'for' loops read their input as an stream

i.e. process an element as it becomes available.
AnotherTest 4 years ago
parent
commit
d4bcc689fb
2 changed files with 101 additions and 15 deletions
  1. 99 15
      Shell/AST.cpp
  2. 2 0
      Shell/AST.h

+ 99 - 15
Shell/AST.cpp

@@ -80,6 +80,20 @@ static inline Vector<Command> join_commands(Vector<Command> left, Vector<Command
     return commands;
     return commands;
 }
 }
 
 
+void Node::for_each_entry(RefPtr<Shell> shell, Function<IterationDecision(RefPtr<Value>)> callback)
+{
+    auto value = run(shell)->resolve_without_cast(shell);
+    if (value->is_job()) {
+        callback(value);
+        return;
+    }
+    auto list = value->resolve_as_list(shell);
+    for (auto& element : list) {
+        if (callback(create<StringValue>(move(element))) == IterationDecision::Break)
+            break;
+    }
+}
+
 void Node::dump(int level) const
 void Node::dump(int level) const
 {
 {
     print_indented(String::format("%s at %d:%d", class_name().characters(), m_position.start_offset, m_position.end_offset), level);
     print_indented(String::format("%s at %d:%d", class_name().characters(), m_position.start_offset, m_position.end_offset), level);
@@ -871,12 +885,10 @@ void Execute::dump(int level) const
     m_command->dump(level + 1);
     m_command->dump(level + 1);
 }
 }
 
 
-RefPtr<Value> Execute::run(RefPtr<Shell> shell)
+void Execute::for_each_entry(RefPtr<Shell> shell, Function<IterationDecision(RefPtr<Value>)> callback)
 {
 {
-    RefPtr<Job> job;
-
     if (m_command->would_execute())
     if (m_command->would_execute())
-        return m_command->run(shell);
+        return m_command->for_each_entry(shell, move(callback));
 
 
     auto commands = shell->expand_aliases(m_command->run(shell)->resolve_as_commands(shell));
     auto commands = shell->expand_aliases(m_command->run(shell)->resolve_as_commands(shell));
 
 
@@ -885,7 +897,7 @@ RefPtr<Value> Execute::run(RefPtr<Shell> shell)
         int rc = pipe(pipefd);
         int rc = pipe(pipefd);
         if (rc < 0) {
         if (rc < 0) {
             dbg() << "Error: cannot pipe(): " << strerror(errno);
             dbg() << "Error: cannot pipe(): " << strerror(errno);
-            return create<StringValue>("");
+            return;
         }
         }
         auto& last_in_commands = commands.last();
         auto& last_in_commands = commands.last();
 
 
@@ -895,14 +907,55 @@ RefPtr<Value> Execute::run(RefPtr<Shell> shell)
         last_in_commands.is_pipe_source = false;
         last_in_commands.is_pipe_source = false;
 
 
         auto notifier = Core::Notifier::construct(pipefd[0], Core::Notifier::Read);
         auto notifier = Core::Notifier::construct(pipefd[0], Core::Notifier::Read);
-        StringBuilder builder;
+        DuplexMemoryStream stream;
+
+        enum {
+            Continue,
+            Break,
+            NothingLeft,
+        };
+        auto check_and_call = [&] {
+            auto ifs = shell->local_variable_or("IFS", "\n");
+
+            if (auto offset = stream.offset_of(ifs.bytes()); offset.has_value()) {
+                auto line_end = offset.value();
+                if (line_end == 0) {
+                    auto rc = stream.discard_or_error(ifs.length());
+                    ASSERT(rc);
+
+                    if (shell->options.inline_exec_keep_empty_segments)
+                        if (callback(create<StringValue>("")) == IterationDecision::Break) {
+                            notifier->set_enabled(false);
+                            // FIXME: Kill all the jobs here.
+                            return Break;
+                        }
+                } else {
+                    auto entry = ByteBuffer::create_uninitialized(line_end + ifs.length());
+                    auto rc = stream.read_or_error(entry);
+                    ASSERT(rc);
+
+                    auto str = StringView(entry.data(), entry.size() - ifs.length());
+                    if (callback(create<StringValue>(str)) == IterationDecision::Break) {
+                        notifier->set_enabled(false);
+                        // FIXME: Kill all the jobs here.
+                        return Break;
+                    }
+                }
 
 
+                return Continue;
+            }
+
+            return NothingLeft;
+        };
         auto try_read = [&] {
         auto try_read = [&] {
-            u8 buffer[4096];
-            size_t remaining_size = 4096;
+            constexpr static auto buffer_size = 4096;
+            u8 buffer[buffer_size];
+            size_t remaining_size = buffer_size;
+
             for (;;) {
             for (;;) {
-                if (remaining_size == 0)
-                    break;
+                if (check_and_call() == Break)
+                    return;
+
                 auto read_size = read(pipefd[0], buffer, remaining_size);
                 auto read_size = read(pipefd[0], buffer, remaining_size);
                 if (read_size < 0) {
                 if (read_size < 0) {
                     if (errno == EINTR)
                     if (errno == EINTR)
@@ -914,10 +967,9 @@ RefPtr<Value> Execute::run(RefPtr<Shell> shell)
                 }
                 }
                 if (read_size == 0)
                 if (read_size == 0)
                     break;
                     break;
-                remaining_size -= read_size;
-            }
 
 
-            builder.append(StringView { buffer, 4096 - remaining_size });
+                stream.write({ buffer, (size_t)read_size });
+            }
         };
         };
 
 
         notifier->on_ready_to_read = [&] {
         notifier->on_ready_to_read = [&] {
@@ -936,7 +988,23 @@ RefPtr<Value> Execute::run(RefPtr<Shell> shell)
             dbg() << "close() failed: " << strerror(errno);
             dbg() << "close() failed: " << strerror(errno);
         }
         }
 
 
-        return create<StringValue>(builder.build(), shell->local_variable_or("IFS", "\n"), shell->options.inline_exec_keep_empty_segments);
+        if (!stream.eof()) {
+            auto action = Continue;
+            do {
+                action = check_and_call();
+                if (action == Break)
+                    return;
+            } while (action == Continue);
+
+            if (!stream.eof()) {
+                auto entry = ByteBuffer::create_uninitialized(stream.remaining());
+                auto rc = stream.read_or_error(entry);
+                ASSERT(rc);
+                callback(create<StringValue>(String::copy(entry)));
+            }
+        }
+
+        return;
     }
     }
 
 
     RefPtr<Job> last_job;
     RefPtr<Job> last_job;
@@ -946,7 +1014,23 @@ RefPtr<Value> Execute::run(RefPtr<Shell> shell)
         last_job = move(job);
         last_job = move(job);
     }
     }
 
 
-    return create<JobValue>(move(last_job));
+    callback(create<JobValue>(move(last_job)));
+
+    return;
+}
+
+RefPtr<Value> Execute::run(RefPtr<Shell> shell)
+{
+    NonnullRefPtrVector<Value> values;
+    for_each_entry(shell, [&](auto value) {
+        values.append(*value);
+        return IterationDecision::Continue;
+    });
+
+    if (values.size() == 1 && values.first().is_job())
+        return values.first();
+
+    return create<ListValue>(move(values));
 }
 }
 
 
 void Execute::highlight_in_editor(Line::Editor& editor, Shell& shell, HighlightMetadata metadata)
 void Execute::highlight_in_editor(Line::Editor& editor, Shell& shell, HighlightMetadata metadata)

+ 2 - 0
Shell/AST.h

@@ -352,6 +352,7 @@ private:
 class Node : public RefCounted<Node> {
 class Node : public RefCounted<Node> {
 public:
 public:
     virtual void dump(int level) const = 0;
     virtual void dump(int level) const = 0;
+    virtual void for_each_entry(RefPtr<Shell> shell, Function<IterationDecision(RefPtr<Value>)> callback);
     virtual RefPtr<Value> run(RefPtr<Shell>) = 0;
     virtual RefPtr<Value> run(RefPtr<Shell>) = 0;
     virtual void highlight_in_editor(Line::Editor&, Shell&, HighlightMetadata = {}) = 0;
     virtual void highlight_in_editor(Line::Editor&, Shell&, HighlightMetadata = {}) = 0;
     virtual Vector<Line::CompletionSuggestion> complete_for_editor(Shell&, size_t, const HitTestResult&);
     virtual Vector<Line::CompletionSuggestion> complete_for_editor(Shell&, size_t, const HitTestResult&);
@@ -657,6 +658,7 @@ public:
     virtual ~Execute();
     virtual ~Execute();
     void capture_stdout() { m_capture_stdout = true; }
     void capture_stdout() { m_capture_stdout = true; }
     RefPtr<Node> command() { return m_command; }
     RefPtr<Node> command() { return m_command; }
+    virtual void for_each_entry(RefPtr<Shell> shell, Function<IterationDecision(RefPtr<Value>)> callback) override;
 
 
 private:
 private:
     virtual void dump(int level) const override;
     virtual void dump(int level) const override;