GVKun编程网logo

subprocess.call(subprocess.call详解)

3

本文将介绍subprocess.call的详细情况,特别是关于subprocess.call详解的相关信息。我们将通过案例分析、数据研究等多种方式,帮助您更全面地了解这个主题,同时也将涉及一些关于an

本文将介绍subprocess.call的详细情况,特别是关于subprocess.call详解的相关信息。我们将通过案例分析、数据研究等多种方式,帮助您更全面地了解这个主题,同时也将涉及一些关于android – subprocess.CalledProcessError返回非零退出状态1、C ++ forksubprocess,请求subprocess列表,在Linux中杀死进程、Python subprocess 模块-call() 实例源码、Python subprocess 模块-CalledProcessError() 实例源码的知识。

本文目录一览:

subprocess.call(subprocess.call详解)

subprocess.call(subprocess.call详解)

我是subprocess.call函数的新手,并且尝试了同一调用的不同组合,但无法正常工作。

我正在尝试执行以下命令:

cmd = 'sort -k1,1 -k4,4n -k5,5n '+outpath+fnametempout+' > '+outpath+fnameout
print cmd

如果我尝试拨打电话,则会收到错误消息:

cmd = cmd.split(" ")
print cmd
subprocess.call(cmd)

我得到的错误是:

sort: stat failed: >: No such file or directory

android – subprocess.CalledProcessError返回非零退出状态1

android – subprocess.CalledProcessError返回非零退出状态1

在运行kivy时,我收到了这个错误.我已经安装了蚂蚁.

subprocess.CalledProcessError: Command '['ant', 'debug']' returned non-zero exit status 1

谁能告诉我为什么会出现这个错误?由于Python或蚂蚁或其他任何东西?

解决方法:

我收到了这个错误,但真正的错误略高于此:

Buildfile: /Users/rallen/Documents/Devel/python/kivy/python-for-android/dist/default/build.xml

BUILD Failed
/Users/rallen/Documents/Devel/python/kivy/python-for-android/dist/default/build.xml:6: **Source resource does not exist: /Users/rallen/Documents/Devel/python/kivy/python-for-android/dist/default/project.properties**

Total time: 0 seconds
Traceback (most recent call last):
  File "./build.py", line 412, in <module>
    make_package(args)
  File "./build.py", line 336, in make_package
    subprocess.check_call([ANT, arg])
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py", line 511, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['ant', 'debug']' returned non-zero exit status 1

谷歌搜索了一下,我找到了

https://groups.google.com/forum/#!topic/kivy-users/igYampuxxCU

据说只是创建dist / default / project.properties并放

target=android-14

在那个文件中.我实际上使用了android-19,但这让我超越了这个问题.

C ++ forksubprocess,请求subprocess列表,在Linux中杀死进程

C ++ forksubprocess,请求subprocess列表,在Linux中杀死进程

我试图创build一个subprocess,发送subprocess命令“LISTALL”。 subprocess然后应该向系统发出命令ps并将该列表返回给父进程。 父进程应该select一个进程并杀死它。 这是我迄今为止,但我只是得到它运行麻烦。

#include <stdio.h> #include <unistd.h> #include <cstring> #include <stdlib.h> #include <iostream> #include <sys/wait.h> char* getlistofProcesses(const char* cmd) { FILE* pipe = popen(cmd,"r"); if (!pipe) return (char*)"ERROR"; char buffer[128]; char *result = new char[1024]; while(!feof(pipe)) { if(fgets(buffer,128,pipe) != NULL) strcat(result,buffer); } pclose(pipe); return result; } int spawnGEdit() { pid_t gPid = fork(); if(gPid == 0) { execl("gedit","gedit",NULL); exit(-1); } else { } return 0; } int main(int argc,char **argv) { int P2C[2]; int C2P[2]; pipe(P2C); pipe(C2P); pid_t cPid = fork(); char cmd[50]; char* listofProcesses = new char[1024]; spawnGEdit(); if (cPid == 0) { close(P2C[1]); close(C2P[0]); read(P2C[0],cmd,10); if(strcmp(cmd,"LISTALL") == 0) { write(C2P[1],getlistofProcesses("ps"),1024); close(P2C[0]); close(C2P[1]); } } else if (cPid > 0) { close(C2P[1]); close(P2C[0]); write(P2C[1],"LISTALL",10); wait(NULL); read(C2P[0],listofProcesses,1024); printf("%s",listofProcesses); //Todo //get user input of a PID //kill the PID close(C2P[0]); close(P2C[1]); } else { // fork Failed printf("Forking Failed!n"); exit(1); } return 0; }

这些是我在尝试编译时遇到的错误:

/tmp/cciTPIOZ.o: In function `getlistofProcesses(char const*)': test.cpp:(.text+0x53): undefined reference to `operator new[](unsigned long)' /tmp/cciTPIOZ.o: In function `main': test.cpp:(.text+0x166): undefined reference to `operator new[](unsigned long)' /tmp/cciTPIOZ.o: In function `__static_initialization_and_destruction_0(int,int)': test.cpp:(.text+0x2c0): undefined reference to `std::ios_base::Init::Init()' test.cpp:(.text+0x2cf): undefined reference to `std::ios_base::Init::~Init()' collect2: error: ld returned 1 exit status

我正在编译:

cc test.cpp -o test

有pipe道,叉子,dup2的问题

clone()系统调用是否最终依赖于fork函数?

具体来说,fork()如何在Linux中处理从malloc()dynamic分配的内存?

C多个进程写入1个pipe道

使用pipe道发送多个string到subprocess

从pipe道读取()保证在EOF之前提供所有primefaces写入的数据?

subprocess在使用共享内存时挂起?

如何复制当前进程?

父母怎样才能用一根pipe子给n个孩子发消息?

PHP:pcntl_fork()真的做什么?

编译错误行号:9,53,64可以通过使用这些解决:

第9行: FILE* pipe = popen(cmd.data(),"r");

第53行: write(C2P[1],getlistofProcesses("ps").data(),1024);

第64行: printf("%s",listofProcesses.data());

原因:这些popen,write,printf需要char *作为它们的参数,但是你传递它们的std :: string。 你必须使用std :: string.data()函数,因为它返回指向由std::string对象表示的字符数组的指针。

对于你在第63行的错误,请参考这个 。

PS: – 对于您所编辑的问题:

第10行: if (!pipe) return (char*)"ERROR";

第12行: char *result = new char[1024];

第53行:(在第7行中更改) char* getlistofProcesses(const char* cmd)

有点建议:使用wait(NULL); 在读取listofProcesses和exit(0);之前的父进程中exit(0); 在子进程结束时。

工作代码:

#include <stdio.h> #include <unistd.h> #include <cstring> #include <stdlib.h> #include <iostream> #include <sys/wait.h> char* getlistofProcesses(const char* cmd) { FILE* pipe = popen(cmd,1024); close(P2C[0]); close(C2P[1]); } exit(0); } else if (cPid > 0) { close(C2P[1]); close(P2C[0]); write(P2C[1],listofProcesses); //Todo //get user input of a PID //kill the PID close(C2P[0]); close(P2C[1]); } else { // fork Failed printf("Forking Failed!n"); exit(1); } return 0; }

Python subprocess 模块-call() 实例源码

Python subprocess 模块-call() 实例源码

Python subprocess 模块,call() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.call()

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def configure_analyst_opsvm():
    ''''''
    Configures Anaylyst for OPSVM
    ''''''
    if not service_running(''plumgrid''):
        restart_pg()
    opsvm_ip = pg_gw_context._pg_dir_context()[''opsvm_ip'']
    NS_ENTER = (''/opt/local/bin/nsenter -t $(ps ho pid --ppid $(cat ''
                ''/var/run/libvirt/lxc/plumgrid.pid)) -m -n -u -i -p '')
    sigmund_stop = NS_ENTER + ''/usr/bin/service plumgrid-sigmund stop''
    sigmund_status = NS_ENTER \\
        + ''/usr/bin/service plumgrid-sigmund status''
    sigmund_autoboot = NS_ENTER \\
        + ''/usr/bin/sigmund-configure --ip {0} --start --autoboot'' \\
        .format(opsvm_ip)
    try:
        status = subprocess.check_output(sigmund_status, shell=True)
        if ''start/running'' in status:
            if subprocess.call(sigmund_stop, shell=True):
                log(''plumgrid-sigmund Couldn\\''t be stopped!'')
                return
        subprocess.check_call(sigmund_autoboot, shell=True)
        status = subprocess.check_output(sigmund_status, shell=True)
    except:
        log(''plumgrid-sigmund Couldn\\''t be started!'')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def cmp_pkgrevno(package, revno, pkgcache=None):
    """Compare supplied revno with the revno of the installed package

    *  1 => Installed revno is greater than supplied arg
    *  0 => Installed revno is the same as supplied arg
    * -1 => Installed revno is less than supplied arg

    This function imports apt_cache function from charmhelpers.fetch if
    the pkgcache argument is None. Be sure to add charmhelpers.fetch if
    you call this function,or pass an apt_pkg.Cache() instance.
    """
    import apt_pkg
    if not pkgcache:
        from charmhelpers.fetch import apt_cache
        pkgcache = apt_cache()
    pkg = pkgcache[package]
    return apt_pkg.version_compare(pkg.current_ver.ver_str, revno)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def log(message, level=None):
    """Write a message to the juju log"""
    command = [''juju-log'']
    if level:
        command += [''-l'', level]
    if not isinstance(message, six.string_types):
        message = repr(message)
    command += [message]
    # Missing juju-log should not cause failures in unit tests
    # Send log output to stderr
    try:
        subprocess.call(command)
    except OSError as e:
        if e.errno == errno.ENOENT:
            if level:
                message = "{}: {}".format(level, message)
            message = "juju-log: {}".format(message)
            print(message, file=sys.stderr)
        else:
            raise
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function."""
        self._caught_signal = None
        # Register handler for SIGTERM,then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        self.func(*args, **kwargs)

        # Restore old signal handler
        signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def install_update():
    """If a newer release is available,download and install it.

    :returns: ``True`` if an update is installed,else ``False``

    """
    update_data = wf().cached_data(''__workflow_update_status'', max_age=0)

    if not update_data or not update_data.get(''available''):
        wf().logger.info(''No update available'')
        return False

    local_file = download_workflow(update_data[''download_url''])

    wf().logger.info(''Installing updated workflow ...'')
    subprocess.call([''open'', local_file])

    update_data[''available''] = False
    wf().cache_data(''__workflow_update_status'', update_data)
    return True
项目:subtitle-synchronization    作者:AlbertoSabater    | 项目源码 | 文件源码
def getAudio(freq, audio_files=None):

    files = os.listdir(data_dir)
    p = re.compile(''.*\\.[mkv|avi]'')
    files = [ f for f in files if p.match(f) ]

    if audio_files:
        files = [ f for f in files if os.path.splitext(f)[0] in audio_files]

    audio_dirs = []
    for f in files:
        name, extension = os.path.splitext(f)
        command = "ffmpeg -i {0}{1}{2} -ab 160k -ac 2 -ar {3} -vn {0}{1}_{3}.wav".format(data_dir, name, extension, freq)
        audio_dirs.append(data_dir + name + ''_'' + str(freq) + ''.wav'')
        subprocess.call(command, shell=True)

    return audio_dirs

# Convert timestamp to seconds
项目:Pillage    作者:kostrin    | 项目源码 | 文件源码
def webEnum(self, args):
        print "INFO: Performing nmap http script scan for {}:{}".format(args[0],args[1])
        nmapSCAN = "nmap -sV -Pn -vv -p {} --script=''(httP* or ssl*) and not (dos or fuzzer or brute)'' -oN {}_http.nmap {}".format(args[1],args[0],args[0])
        subprocess.check_output(nmapSCAN, shell=True)

        print "INFO: Performing nikto scan on {}:{}".format(args[0],args[1])
        script="nikto -host {} -port {} -C all >> {}_nikto_{}.txt".format(args[0],args[1],args[1])
        subprocess.check_output(script, shell=True)

        ''''''
        print "INFO: Performing dirb scan on {}:{}".format(args[0],args[1])
        dirbList="/usr/share/wordlists/dirbuster/directory-list-2.3-small.txt"
        script="dirb {}://{}:{} {} -S -w >> {}_dirb_{}.txt".format(args[2],args[0],args[1],dirbList,args[1])
        subprocess.call(script,shell=True)
        ''''''
        print "INFO: Finished http module for {}:{}".format(args[0],args[1])
项目:AVSR-Deep-Speech    作者:pandeydivesh15    | 项目源码 | 文件源码
def convert_mp4(video_dir, audio_dir):
    ''''''
    Args: 
        1. video_dir:   Directory for all video files
        2. audio_dir:   Directory where all converted files will be stored.
    ''''''

    # Get all file names
    video_file_names = sorted(glob.glob(video_dir + "*.mp4"))
    # Extract actual names of file,also remove any extensions
    video_names = map(lambda x : x.split(''/'')[-1].split(".")[0], video_file_names)

    # Command for converting video to audio
    command = "ffmpeg -i " + video_dir + "{0}.mp4 -ab 96k -ar 44100 -vn " + audio_dir + "{0}.wav"

    for name in video_names:
        subprocess.call(command.format(name), shell=True)
项目:docklet    作者:unias    | 项目源码 | 文件源码
def delete_container(self, lxc_name):
        logger.info ("delete container:%s" % lxc_name)
        if self.imgmgr.deleteFS(lxc_name):
            Container_Collector.billing_increment(lxc_name)
            self.historymgr.log(lxc_name,"Delete")
            logger.info("delete container %s success" % lxc_name)
            return [True, "delete container success"]
        else:
            logger.info("delete container %s Failed" % lxc_name)
            return [False, "delete container Failed"]
        #status = subprocess.call([self.libpath+"/lxc_control.sh","delete",lxc_name])
        #if int(status) == 1:
        #    logger.error("delete container %s Failed" % lxc_name)
        #    return [False,"delete container Failed"]
        #else:
        #    logger.info ("delete container %s success" % lxc_name)
        #    return [True,"delete container success"]

    # start container,if running,restart it
项目:docklet    作者:unias    | 项目源码 | 文件源码
def recover_container(self, lxc_name):
        logger.info ("recover container:%s" % lxc_name)
        #status = subprocess.call([self.libpath+"/lxc_control.sh","status",lxc_name])
        [success, status] = self.container_status(lxc_name)
        if not success:
            return [False, status]
        self.imgmgr.checkFS(lxc_name)
        if status == ''stopped'':
            logger.info("%s stopped,recover it to running" % lxc_name)
            if self.start_container(lxc_name)[0]:
                self.historymgr.log(lxc_name,"Recover")
                if self.start_services(lxc_name)[0]:
                    logger.info("%s recover success" % lxc_name)
                    return [True, "recover success"]
                else:
                    logger.error("%s recover Failed with services not start" % lxc_name)
                    return [False, "recover Failed for services not start"]
            else:
                logger.error("%s recover Failed for container starting Failed" % lxc_name)
                return [False, "recover Failed for container starting Failed"]
        else:
            logger.info("%s recover success" % lxc_name)
            return [True, "recover success"]
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def service(action, service_name, **kwargs):
    """Control a system service.

    :param action: the action to take on the service
    :param service_name: the name of the service to perform th action on
    :param **kwargs: additional params to be passed to the service command in
                    the form of key=value.
    """
    if init_is_systemd():
        cmd = [''systemctl'', action, service_name]
    else:
        cmd = [''service'', action]
        for key, value in six.iteritems(kwargs):
            parameter = ''%s=%s'' % (key, value)
            cmd.append(parameter)
    return subprocess.call(cmd) == 0
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def log(message, file=sys.stderr)
        else:
            raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def service(action, value)
            cmd.append(parameter)
    return subprocess.call(cmd) == 0
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def load_prevIoUs(self, path=None):
        """Load prevIoUs copy of config from disk.

        In normal usage you don''t need to call this method directly - it
        is called automatically at object initialization.

        :param path:

            File path from which to load the prevIoUs config. If `None`,
            config is loaded from the default location. If `path` is
            specified,subsequent `save()` calls will write to the same
            path.

        """
        self.path = path or self.path
        with open(self.path) as f:
            self._prev_dict = json.load(f)
        for k, v in copy.deepcopy(self._prev_dict).items():
            if k not in self:
                self[k] = v
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _run_apt_command(cmd, fatal=False):
    """Run an apt command with optional retries.

    :param: cmd: str: The apt command to run.
    :param: fatal: bool: Whether the command''s output should be checked and
        retried.
    """
    # Provide DEBIAN_FRONTEND=noninteractive if not present in the environment.
    cmd_env = {
        ''DEBIAN_FRONTEND'': os.environ.get(''DEBIAN_FRONTEND'', ''noninteractive'')}

    if fatal:
        _run_with_retries(
            cmd, cmd_env=cmd_env, retry_exitcodes=(1, APT_NO_LOCK,),
            retry_message="Couldn''t acquire DPKG lock")
    else:
        env = os.environ.copy()
        env.update(cmd_env)
        subprocess.call(cmd, env=env)
项目:ICGan-tensorflow    作者:zhangqianhui    | 项目源码 | 文件源码
def download_mnist(dirpath):
    data_dir = os.path.join(dirpath, ''mnist'')
    if os.path.exists(data_dir):
        print(''Found MNIST - skip'')
        return
    else:
        os.mkdir(data_dir)
    url_base = ''http://yann.lecun.com/exdb/mnist/''
    file_names = [''train-images-idx3-ubyte.gz'',''train-labels-idx1-ubyte.gz'',''t10k-images-idx3-ubyte.gz'',''t10k-labels-idx1-ubyte.gz'']
    for file_name in file_names:
        url = (url_base+file_name).format(**locals())
        print(url)
        out_path = os.path.join(data_dir,file_name)
        cmd = [''curl'', url, ''-o'', out_path]
        print(''Downloading '', file_name)
        subprocess.call(cmd)
        cmd = [''gzip'', ''-d'', out_path]
        print(''Decompressing '', file_name)
        subprocess.call(cmd)
项目:rivalcfg    作者:flozz    | 项目源码 | 文件源码
def run(self):
        _install.run(self)
        print("Installing udev rules...")
        if not os.path.isdir("/etc/udev/rules.d"):
            print("WARNING: udev rules have not been installed (/etc/udev/rules.d is not a directory)")
            return
        try:
            shutil.copy("./rivalcfg/data/99-steelseries-rival.rules", "/etc/udev/rules.d/")
        except IOError:
            print("WARNING: udev rules have not been installed (permission denied)")
            return
        try:
            subprocess.call(["udevadm", "trigger"])
        except OSError:
            print("WARNING: unable to update udev rules,please run the ''udevadm trigger'' command")
            return
        print("Done!")
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def set_data_field(record, field_name, field_val):
    assert(len(record.samples) == 1)
    new_format = record.FORMAT
    new_fields = new_format.split('':'')
    if not(field_name in new_fields):
        new_fields = new_fields + [field_name]
        new_format = '':''.join(new_fields)

    sample_call = get_record_sample_call(record)
    data = sample_call.data
    data_dict = data._asdict()
    data_dict[field_name] = field_val

    new_sample_vals = []
    for field in new_fields:
        new_sample_vals.append(data_dict[field])

    # Note - the old way of passing the fields to pyVCF is memory intensive
    # because a fresh type is allocated for each call to make_calldata_tuple
    #data_instantiator = vcf.model.make_calldata_tuple(new_fields)
    #data = data_instantiator(*new_sample_vals)
    data = FakeNamedTuple(new_fields, new_sample_vals)
    sample_call.data = data
    record.samples[0] = sample_call
    record.FORMAT = new_format
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def combine_vcfs(output_filename, input_vcf_filenames):
    tmp_filename = output_filename + ".tmp"

    for (i,fn) in enumerate(input_vcf_filenames):
        if i == 0:
            args = ''cat '' + fn
            subprocess.check_call(args + " > " + tmp_filename, shell=True)
        else:
            args = ''grep -v "^#" '' + fn
            ret = subprocess.call(args + " >> " + tmp_filename, shell=True)
            if ret == 2:
                raise Exception("grep call Failed: " + args)

    # Sort and index the files
    tk_tabix.sort_vcf(tmp_filename, output_filename)
    tk_tabix.index_vcf(output_filename)

    os.remove(tmp_filename)
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def run_cmake(self):
        print("Running CMake")
        build_dir_cmd_out = subprocess.call(
            ["mkdir", "build"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)
        if build_dir_cmd_out != 0:
            print("Can\\''t setup CMake build directory.")
            return

        if self.cmake_build_info["build_dir"].is_dir():
            try:
                subprocess.check_output(
                    self.cmake_cmd_info["cmake_cmd"],
                    cwd=str(self.cmake_build_info["build_dir"]))
            except subprocess.CalledProcessError as e:
                print(e.output)
            if not self.cmake_build_info["comp_data_cmake"].is_file():
                print("Couldn''t setup CMake Project")
                return
        else:
            print("Couldn''t setup CMake Project")
            return
项目:benchmarks    作者:tensorflow    | 项目源码 | 文件源码
def DeletePods(pod_name, yaml_file):
  """Deletes pods based on the given kubernetes config.

  Args:
    pod_name: ''name-prefix'' selector for the pods.
    yaml_file: kubernetes yaml config.

  Raises:
    TimeoutError: if jobs didn''t terminate for a long time.
  """
  command = [_KUBECTL, ''delete'', ''--filename=%s'' % yaml_file]
  logging.info(''Deleting pods: %s'', '' ''.join(command))
  subprocess.call(command)

  def CheckPodsAreTerminated():
    return not _GetPodNames(pod_name)
  if not _WaitUntil(100, CheckPodsAreTerminated):
    raise TimeoutError(
        ''Timed out waiting for %s pod to terminate.'' % pod_name)
项目:dl4mt-multi    作者:nyu-dl    | 项目源码 | 文件源码
def call_script(model_path, config, proto, eval_script,
                num_process=10, normalize=True):
    ''''''
    open pipe and call the script
    ''''''
    try:
        subprocess.call(
            " python {} {} --config={} "
            " --proto={} "
            " -p {} {}".format(
                eval_script, model_path,
                proto, num_process,
                '' -n '' if normalize else ''''),
            shell=True)
    except:
        traceback.print_exc(file=sys.stdout)
        print ''error in call_bleu_script()''
    print model_path
项目:silverchain    作者:tomokinakamaru    | 项目源码 | 文件源码
def draw(self, graph, highlight=None):
        gv = ''''
        gv += ''digraph G{} {{\\n''.format(self._n)
        gv += ''  layout=neato;\\n''
        gv += self._ranks
        for s, d in graph.edges_iter():
            gv += ''  "{}" -> "{}"''.format(s, d)
            if (s, d) == highlight:
                gv += ''[color=red,penwidth=3]''
            gv += '';\\n''
        gv += ''}''

        fname = self.WSDIR + ''/{0:04d}''.format(self._n)
        with open(fname + ''.dot'', ''w'') as f:
            print(gv, file=f)

        cmd = ''''
        cmd += ''/usr/local/bin/neato''
        cmd += '' -Tpng {f}.dot -o{f}.png''.format(f=fname)
        subprocess.call(cmd, shell=True)

        self._n += 1
项目:ipwb    作者:oduwsdl    | 项目源码 | 文件源码
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding"""
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # OSError if ipfs not installed,redundant of below
        # subprocess.call([''ipfs'',''--version''],stdout=open(devnull,''wb''))

        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except:
        logError(''UnkNown error in retrieving daemon status'')
        logError(sys.exc_info()[0])
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def grab(bBox=None):
    if sys.platform == "darwin":
        f, file = tempfile.mkstemp(''.png'')
        os.close(f)
        subprocess.call([''screencapture'', ''-x'', file])
        im = Image.open(file)
        im.load()
        os.unlink(file)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB,32-bit line padding,origo in lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    if bBox:
        im = im.crop(bBox)
    return im
项目:sat6_scripts    作者:RedHatSatellite    | 项目源码 | 文件源码
def publish_cv(dryrun):
    print "Running Content View Publish..."

    # Set the initial state
    good_publish = False

    if not dryrun:
        rc = subprocess.call([''/usr/local/bin/publish_content_views'', ''-q'', ''-a''])
    else:
        msg = "Dry run - not actually performing publish"
        helpers.log_msg(msg, ''WARNING'')
        rc = subprocess.call([''/usr/local/bin/publish_content_views'', ''-a'', ''-d''])

    if rc == 0:
        good_publish = True

    return good_publish
项目:sat6_scripts    作者:RedHatSatellite    | 项目源码 | 文件源码
def promote_cv(dryrun, lifecycle):
    print "Running Content View Promotion to " + lifecycle + "..."

    # Set the initial state
    good_promote = False

    if not dryrun:
        rc = subprocess.call([''/usr/local/bin/promote_content_views'', ''-e'', lifecycle])
    else:
        msg = "Dry run - not actually performing promotion"
        helpers.log_msg(msg, ''WARNING'')
        rc = subprocess.call([''/usr/local/bin/promote_content_views'', lifecycle])

    if rc == 0:
        good_promote = True

    return good_promote
项目:sat6_scripts    作者:RedHatSatellite    | 项目源码 | 文件源码
def postModule(moduleTar, moduleInputDir, pfserver, pfmodpath, pfuser, pftoken):
    """
    Function to push puppet modules using curl to Artifiactory repository
    """
    # Remove module''s extension (.tar.gz)
    puppetModuleNameNoExt = splitext(moduleTar)[0]

    # Remove the path from the module
    puppetModuleName = puppetModuleNameNoExt.split(''/'')[-1]

    # Split the module name into the required parts
    puppetModuleNameList = puppetModuleName.split(''-'')
    author = puppetModuleNameList[0]
    moduleName = puppetModuleNameList[1]
    version = puppetModuleNameList[2]

    url = "http://" + pfserver + pfmodpath + "/" + author + "/" + moduleName + "/" + moduleTar
    fileName = moduleInputDir + "/" + moduleTar

    # Put the files using curl (need to clean this up)
    authtoken = pfuser + ":" + pftoken
    subprocess.call([''curl'', ''-u'', authtoken, ''-XPUT'', ''-T'', fileName])
项目:bap-ida-python    作者:BinaryAnalysisPlatform    | 项目源码 | 文件源码
def _setup_headers(self, bap):
        "pass type information from IDA to BAP"
        # this is very fragile,and may break in case
        # if we have several BAP instances,especially
        # when they are running on different binaries.
        # Will leave it as it is until issue #588 is
        # resolved in the upstream
        with self.tmpfile("h") as out:
            ida.output_types(out)
            subprocess.call(bap, [
                ''--api-add'', ''c:"{0}"''.format(out.name),
            ])

        def cleanup():
            subprocess.call(bap, [
                "--api-remove", "c:{0}".
                format(os.path.basename(out.name))
            ])
        self.on_cleanup(cleanup)
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def graceful_exit(tmpdir, keep_data_files=False, proc=None, pkill_cmd=None):
    #kill process if any,but keep in on try so doesn''t prevent directory clean-up
    try:
        if proc: 
            proc.terminate()
            log.debug("Sent terminate to powstream process %s" % proc.pid)
    except: 
        pass

    #if they are still not down,force them down
    try:
        if pkill_cmd:
            time.sleep("2")
            call([pkill_cmd, ''-9'', ''-f'', tmpdir ], shell=False)
    except:
        pass

    #clean directory
    try:
        cleanup_dir(tmpdir, keep_data_files=keep_data_files)
    except:
        pass

##
# Removes a data file
项目:deployfish    作者:caltechads    | 项目源码 | 文件源码
def __render(self):
        """
        Build the argument list for the ``register_task_deFinition()`` call.

        :rtype: dict
        """
        r = {}
        r[''family''] = self.family
        r[''networkMode''] = self.networkMode
        if self.taskRoleArn:
            r[''taskRoleArn''] = self.taskRoleArn
        r[''containerDeFinitions''] = [c.render() for c in self.containers]
        volumes = self.__get_volumes()
        if volumes:
            r[''volumes''] = volumes
        return r
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def _openDownloadFile(self, buildId, suffix):
        (tmpFd, tmpName) = mkstemp()
        url = self._makeUrl(buildId, suffix)
        try:
            os.close(tmpFd)
            env = { k:v for (k,v) in os.environ.items() if k in self.__whiteList }
            env["BOB_LOCAL_ARTIFACT"] = tmpName
            env["BOB_REMOTE_ARTIFACT"] = url
            ret = subprocess.call(["/bin/bash", "-ec", self.__downloadCmd],
                stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                cwd="/tmp", env=env)
            if ret == 0:
                ret = tmpName
                tmpName = None
                return CustomDownloader(ret)
            else:
                raise ArtifactDownloadError("Failed (exit {})".format(ret))
        finally:
            if tmpName is not None: os.unlink(tmpName)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def doHelp(availableCommands, argv, bobroot):
    parser = argparse.ArgumentParser(prog="bob help",
        description="display help information about command.")
    # Help without a command parameter gets handled by the main argument parser
    # in pym/bob/scripts.py.
    parser.add_argument(''command'', help="Command to get help for")

    args = parser.parse_args(argv)

    if args.command in availableCommands:
        manPage = "bob-" + args.command
        manSection = "1"
    else:
        manPage = "bob" + args.command
        manSection = "7"

    inSourceLoc = os.path.join(bobroot, "doc", "_build", "man", manPage+"."+manSection)
    if os.path.isfile(inSourceLoc):
        ret = subprocess.call(["man", inSourceLoc])
    else:
        ret = subprocess.call(["man", manSection, manPage])

    sys.exit(ret)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def _scanDir(self, workspace, dir):
        self.__dir = dir
        dir = os.path.join(workspace, dir)
        try:
            remotes = subprocess.check_output(["git", "remote", "-v"],
                cwd=dir, universal_newlines=True).split("\\n")
            remotes = (r[:-8].split("\\t") for r in remotes if r.endswith("(fetch)"))
            self.__remotes = { remote:url for (remote,url) in remotes }

            self.__commit = subprocess.check_output(["git", "rev-parse", "HEAD"], universal_newlines=True).strip()
            self.__description = subprocess.check_output(
                ["git", "describe", "--always", "--dirty"], universal_newlines=True).strip()
            self.__dirty = subprocess.call(["git", "diff-index", "--quiet", "HEAD", "--"],
                cwd=dir) != 0
        except subprocess.CalledProcessError as e:
            raise BuildError("Git audit Failed: " + str(e))
        except OSError as e:
            raise BuildError("Error calling git: " + str(e))
项目:demcoreg    作者:dshean    | 项目源码 | 文件源码
def get_nlcd_fn(datadir=None):
    """Calls external shell script `get_nlcd.sh` to fetch:

    2011 Land Use Land Cover (nlcd) grids,30 m

    http://www.mrlc.gov/nlcd11_leg.PHP
    """
    if datadir is None:
        datadir = iolib.get_datadir()
    #This is original filename,which requires ~17 GB
    #nlcd_fn = os.path.join(datadir,''nlcd_2011_landcover_2011_edition_2014_10_10/nlcd_2011_landcover_2011_edition_2014_10_10.img'')
    #get_nlcd.sh Now creates a compressed GTiff,which is 1.1 GB
    nlcd_fn = os.path.join(datadir, ''nlcd_2011_landcover_2011_edition_2014_10_10/nlcd_2011_landcover_2011_edition_2014_10_10.tif'')
    if not os.path.exists(nlcd_fn):
        cmd = [''get_nlcd.sh'',]
        subprocess.call(cmd)
    return nlcd_fn
项目:demcoreg    作者:dshean    | 项目源码 | 文件源码
def get_bareground_fn(datadir=None):
    """Calls external shell script `get_bareground.sh` to fetch:

    ~2010 global bare ground,30 m

    Note: unzipped file size is 64 GB! Original products are uncompressed,and tiles are available globally (including empty data over ocean)

    The shell script will compress all downloaded tiles using lossless LZW compression.

    http://landcover.usgs.gov/glc/BareGroundDescriptionAndDownloads.PHP
    """
    if datadir is None:
        datadir = iolib.get_datadir()
    bg_fn = os.path.join(datadir, ''bare2010/bare2010.vrt'')
    if not os.path.exists(bg_fn):
        cmd = [''get_bareground.sh'',]
        subprocess.call(cmd)
    return bg_fn 

#Download latest global RGI glacier db
项目:demcoreg    作者:dshean    | 项目源码 | 文件源码
def get_glacier_poly(datadir=None):
    """Calls external shell script `get_rgi.sh` to fetch:

    Randolph Glacier Inventory (RGI) glacier outline shapefiles 

    Full RGI database: rgi50.zip is 410 MB

    The shell script will unzip and merge regional shp into single global shp

    http://www.glims.org/RGI/
    """
    if datadir is None:
        datadir = iolib.get_datadir()
    #rgi_fn = os.path.join(datadir,''rgi50/regions/rgi50_merge.shp'')
    #Update to rgi60,should have this returned from get_rgi.sh
    rgi_fn = os.path.join(datadir, ''rgi60/regions/rgi60_merge.shp'')
    if not os.path.exists(rgi_fn):
        cmd = [''get_rgi.sh'',]
        subprocess.call(cmd)
    return rgi_fn 

#Update glacier polygons
项目:factotum    作者:Denubis    | 项目源码 | 文件源码
def safeInstall():
    FACTORIOPATH = getFactorioPath()

    try:
        if not os.path.isdir("%s" % (FACTORIOPATH) ):       

            if os.access("%s/.." % (FACTORIOPATH), os.W_OK):
                os.mkdir(FACTORIOPATH, 0o777)
            else:
                subprocess.call([''sudo'', ''mkdir'', ''-p'', FACTORIOPATH])
                subprocess.call([''sudo'', ''chown'', getpass.getuser(), FACTORIOPATH])


            os.mkdir(os.path.join(FACTORIOPATH, "saves"))
            os.mkdir(os.path.join(FACTORIOPATH, "config"))
            with open("%s/.bashrc" % (os.path.expanduser("~")), "r+") as bashrc:
                lines = bashrc.read()

                if lines.find("eval \\"$(_FACTOTUM_COMPLETE=source factotum)\\"\\n") == -1:
                    bashrc.write("eval \\"$(_FACTOTUM_COMPLETE=source factotum)\\"\\n")
                    print("You''ll want to restart your shell for command autocompletion. Tab is your friend.")
        updateFactorio()
    except IOError as e:
        print("Cannot make %s. Please check permissions. Error %s" % (FACTORIOPATH, e))
        sys.exit(1)
项目:VPP_P4    作者:RuchaMarathe    | 项目源码 | 文件源码
def send_pkts_and_capture(port_interface_mapping, port_packet_list):
    '''''' sends packets to P4 and captures by sniffing ''''''
    queue = Queue.Queue()
    thd = threading.Thread(name="sniff_thread",
                           target=lambda: sniff_record(queue, port_interface_mapping))
    thd.start()
    # gives time for time to start sniffing... so packets are sniffed once sniff call begins
    time.sleep(1)
    for x in port_packet_list:
        port_num = x[''port'']
        iface_name = port_interface_mapping[''port2intf''][port_num]
        sendp(x[''packet''], iface=iface_name)
    thd.join()
    pack = queue.get(True)
    Packet_list = []
    for p in pack:
        eth = p.sniffed_on
        port_no = port_interface_mapping[''intf_port_names''][eth]
        Packet_list.append({''port'': port_no, ''packet'': p})
    return Packet_list
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def __call__(self, **kwargs):
        self._caught_signal = None
        # Register handler for SIGTERM, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def editPipeline(args, config):
        pipelinedbutils = Pipelinedbutils(config)

        request = json.loads(pipelinedbutils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)

        _, tmp = mkstemp()
        with open(tmp, ''w'') as f:
            f.write("{data}".format(data=json.dumps(request, indent=4)))

        if "EDITOR" in os.environ.keys():
            editor = os.environ["EDITOR"]
        else:
            editor = "/usr/bin/nano"

        if subprocess.call([editor, tmp]) == 0:
            with open(tmp, ''r'') as f:
                request = json.load(f)

            pipelinedbutils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
        else:
            print "ERROR: there was a problem editing the request"
            exit(-1)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def service(action, service_name):
    """Control a system service"""
    if init_is_systemd():
        cmd = [''systemctl'', action]
    return subprocess.call(cmd) == 0
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def restart_on_change_helper(lambda_f, restart_map, stopstart=False,
                             restart_functions=None):
    """Helper function to perform the restart_on_change function.

    This is provided for decorators to restart services if files described
    in the restart_map have changed after an invocation of lambda_f().

    @param lambda_f: function to call.
    @param restart_map: {file: [service,...]}
    @param stopstart: whether to stop,start or restart a service
    @param restart_functions: nonstandard functions to use to restart services
                              {svc: func,...}
    @returns result of lambda_f()
    """
    if restart_functions is None:
        restart_functions = {}
    checksums = {path: path_hash(path) for path in restart_map}
    r = lambda_f()
    # create a list of lists of the services to restart
    restarts = [restart_map[path]
                for path in restart_map
                if path_hash(path) != checksums[path]]
    # create a flat list of ordered services without duplicates from lists
    services_list = list(OrderedDict.fromkeys(itertools.chain(*restarts)))
    if services_list:
        actions = (''stop'', ''start'') if stopstart else (''restart'',)
        for service_name in services_list:
            if service_name in restart_functions:
                restart_functions[service_name](service_name)
            else:
                for action in actions:
                    service(action, service_name)
    return r
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit"""
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = [''relation-set'']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend((''-r'', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should,but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available,since otherwise we''ll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin,but that feature is broken in 1.23.2: Bug #1454678.
        with tempfile.NamedTemporaryFile(delete=False) as settings_file:
            settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
        subprocess.check_call(
            relation_cmd_line + ["--file", settings_file.name])
        os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append(''{}=''.format(key))
            else:
                relation_cmd_line.append(''{}={}''.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def status_set(workload_state, message):
    """Set the workload state with a message

    Use status-set to set the workload state with a message which is visible
    to the user via juju status. If the status-set command is not found then
    assume this is juju < 1.23 and juju-log the message unstead.

    workload_state -- valid juju workload state.
    message        -- status update message
    """
    valid_states = [''maintenance'', ''blocked'', ''waiting'', ''active'']
    if workload_state not in valid_states:
        raise ValueError(
            ''{!r} is not a valid workload state''.format(workload_state)
        )
    cmd = [''status-set'', workload_state, message]
    try:
        ret = subprocess.call(cmd)
        if ret == 0:
            return
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
    log_message = ''status-set Failed: {} {}''.format(workload_state,
                                                    message)
    log(log_message, level=''INFO'')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def port_has_listener(address, port):
    """
    Returns True if the address:port is open and being listened to,
    else False.

    @param address: an IP address or hostname
    @param port: integer port

    Note calls ''zc'' via a subprocess shell
    """
    cmd = [''nc'', ''-z'', address, str(port)]
    result = subprocess.call(cmd)
    return not(bool(result))
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def disable_ipv6():
    """
    disable ufw IPv6 support in /etc/default/ufw
    """
    exit_code = subprocess.call([''sed'', ''-i'', ''s/IPV6=.*/IPV6=no/g'',
                                 ''/etc/default/ufw''])
    if exit_code == 0:
        hookenv.log(''IPv6 support in ufw disabled'', level=''INFO'')
    else:
        hookenv.log("Couldn''t disable IPv6 support in ufw", level="ERROR")
        raise UFWError("Couldn''t disable IPv6 support in ufw")
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def apt_mark(packages, mark, fatal=False):
    """Flag one or more packages using apt-mark"""
    log("Marking {} as {}".format(packages, mark))
    cmd = [''apt-mark'', mark]
    if isinstance(packages, six.string_types):
        cmd.append(packages)
    else:
        cmd.extend(packages)

    if fatal:
        subprocess.check_call(cmd, universal_newlines=True)
    else:
        subprocess.call(cmd, universal_newlines=True)
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def _python_cmd(*args):
    """
    Return True if the command succeeded.
    """
    args = (sys.executable,) + args
    return subprocess.call(args) == 0
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def when_i_am_elected_leader(self, event):
        """Callback when this host gets elected leader."""

        # set running state
        self.prevIoUsly_running = True

        self.LOG.info("Monasca Transform service running on %s "
                      "has been elected leader" % str(self.my_host_name))

        if CONF.service.spark_python_files:
            pyfiles = (" --py-files %s"
                       % CONF.service.spark_python_files)
        else:
            pyfiles = ''''

        event_logging_dest = ''''
        if (CONF.service.spark_event_logging_enabled and
                CONF.service.spark_event_logging_dest):
            event_logging_dest = (
                "--conf spark.eventLog.dir="
                "file://%s" %
                CONF.service.spark_event_logging_dest)

        # Build the command to start the Spark driver
        spark_cmd = "".join((
            "export SPARK_HOME=",
            CONF.service.spark_home,
            " && ",
            "spark-submit --master ",
            CONF.service.spark_master_list,
            " --conf spark.eventLog.enabled=",
            CONF.service.spark_event_logging_enabled,
            event_logging_dest,
            " --jars " + CONF.service.spark_jars_list,
            pyfiles,
            " " + CONF.service.spark_driver))

        # Start the Spark driver
        # (specify shell=True in order to
        #  correctly handle wildcards in the spark_cmd)
        subprocess.call(spark_cmd, shell=True)

Python subprocess 模块-CalledProcessError() 实例源码

Python subprocess 模块-CalledProcessError() 实例源码

Python subprocess 模块,CalledProcessError() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.CalledProcessError()

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def service_running(service_name):
    """Determine whether a system service is running"""
    if init_is_systemd():
        return service(''is-active'', service_name)
    else:
        try:
            output = subprocess.check_output(
                [''service'', service_name, ''status''],
                stderr=subprocess.STDOUT).decode(''UTF-8'')
        except subprocess.CalledProcessError:
            return False
        else:
            # This works for upstart scripts where the ''service'' command
            # returns a consistent string to represent running ''start/running''
            if ("start/running" in output or "is running" in output or
                    "up and running" in output):
                return True
            # Check System V scripts init script return codes
            if service_name in systemv_services_running():
                return True
            return False
项目:rca-evaluation    作者:sieve-microservices    | 项目源码 | 文件源码
def run_command(args, wait=False):

    try:
        if (wait):
            p = subprocess.Popen(
                args, 
                stdout = subprocess.PIPE)
            p.wait()
        else:
            p = subprocess.Popen(
                args, 
                stdin = None, stdout = None, stderr = None, close_fds = True)

        (result, error) = p.communicate()

    except subprocess.CalledProcessError as e:
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s,error code = %s\\n" 
            % (e.output, e.returncode))

    return result
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def is_crm_leader(resource, retry=False):
    """
    Returns True if the charm calling this is the elected corosync leader,
    as returned by calling the external "crm" command.

    We allow this operation to be retried to avoid the possibility of getting a
    false negative. See LP #1396246 for more info.
    """
    if resource == DC_RESOURCE_NAME:
        return is_crm_dc()
    cmd = [''crm'', ''resource'', ''show'', resource]
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError:
        status = None

    if status and get_unit_hostname() in status:
        return True

    if status and "resource %s is NOT running" % (resource) in status:
        raise CRMResourceNotFound("CRM resource %s not found" % (resource))

    return False
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def import_key(keyid):
    key = keyid.strip()
    if (key.startswith(''-----BEGIN PGP PUBLIC KEY BLOCK-----'') and
            key.endswith(''-----END PGP PUBLIC KEY BLOCK-----'')):
        juju_log("PGP key found (looks like ASCII Armor format)", level=DEBUG)
        juju_log("Importing ASCII Armor PGP key", level=DEBUG)
        with tempfile.NamedTemporaryFile() as keyfile:
            with open(keyfile.name, ''w'') as fd:
                fd.write(key)
                fd.write("\\n")

            cmd = [''apt-key'', ''add'', keyfile.name]
            try:
                subprocess.check_call(cmd)
            except subprocess.CalledProcessError:
                error_out("Error importing PGP key ''%s''" % key)
    else:
        juju_log("PGP key found (looks like Radix64 format)", level=DEBUG)
        juju_log("Importing PGP key from keyserver", level=DEBUG)
        cmd = [''apt-key'', ''adv'', ''--keyserver'',
               ''hkp://keyserver.ubuntu.com:80'', ''--recv-keys'', key]
        try:
            subprocess.check_call(cmd)
        except subprocess.CalledProcessError:
            error_out("Error importing PGP key ''%s''" % key)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _git_update_requirements(venv, package_dir, reqs_dir):
    """
    Update from global requirements.

    Update an OpenStack git directory''s requirements.txt and
    test-requirements.txt from global-requirements.txt.
    """
    orig_dir = os.getcwd()
    os.chdir(reqs_dir)
    python = os.path.join(venv, ''bin/python'')
    cmd = [python, ''update.py'', package_dir]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        package = os.path.basename(package_dir)
        error_out("Error updating {} from "
                  "global-requirements.txt".format(package))
    os.chdir(orig_dir)
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def _clean_check(cmd, target):
    """
    Run the command to download target. If the command fails,clean up before
    re-raising the error.
    """
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        if os.access(target, os.F_OK):
            os.unlink(target)
        raise
项目:rca-evaluation    作者:sieve-microservices    | 项目源码 | 文件源码
def run_command(command, wait=False):

    try:
        if (wait):

            p = subprocess.Popen(
                [command], 
                stdout = subprocess.PIPE,
                shell = True)
            p.wait()
        else:
            p = subprocess.Popen(
                [command], 
                shell = True, e.returncode))

    return result
项目:microbar    作者:Bengt    | 项目源码 | 文件源码
def install_python(version, arch, home):
    print("Installing Python", version, "for", "bit architecture to", home)
    if exists(home):
        return

    path = download_python(version, arch)
    print("Installing", path, "to", home)
    success = False
    for cmd in INSTALL_CMD[version]:
        cmd = [part.format(home=home, path=path) for part in cmd]
        print("Running:", " ".join(cmd))
        try:
            check_call(cmd)
        except CalledProcessError as exc:
            print("Failed command", cmd, "with:", exc)
            if exists("install.log"):
                with open("install.log") as fh:
                    print(fh.read())
        else:
            success = True
    if success:
        print("Installation complete!")
    else:
        print("Installation Failed")
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def stopScheduler():
        try:
            subprocess.check_call(["sudo", "service", "supervisor", "stop"])

        except subprocess.CalledProcessError as e:
            print "ERROR: Couldn''t stop the scheduler (supervisor): {reason}".format(reason=e)
            exit(-1)

        try:
            subprocess.check_call(["sudo", "rabbitmq-server", "stop"])

        except subprocess.CalledProcessError as e:
            print "ERROR: Couldn''t stop the scheduler (rabbitmq): {reason}".format(reason=e)
            exit(-1)

        print "Scheduler stopped successfully!"
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def fstab_mount(mountpoint):
    """Mount filesystem using fstab"""
    cmd_args = [''mount'', mountpoint]
    try:
        subprocess.check_output(cmd_args)
    except subprocess.CalledProcessError as e:
        log(''Error unmounting {}\\n{}''.format(mountpoint, e.output))
        return False
    return True
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def relation_get(attribute=None, unit=None, rid=None):
    """Get relation information"""
    _args = [''relation-get'', ''--format=json'']
    if rid:
        _args.append(''-r'')
        _args.append(rid)
    _args.append(attribute or ''-'')
    if unit:
        _args.append(unit)
    try:
        return json.loads(subprocess.check_output(_args).decode(''UTF-8''))
    except ValueError:
        return None
    except CalledProcessError as e:
        if e.returncode == 2:
            return None
        raise
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller
    """
    cmd = [''crm'', ''status'']
    try:
        status = subprocess.check_output(cmd, "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    current_dc = ''''
    for line in status.split(''\\n''):
        if line.startswith(''Current DC''):
            # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
            current_dc = line.split('':'')[1].split()[0]
    if current_dc == get_unit_hostname():
        return True
    elif current_dc == ''NONE'':
        raise CRMDCNotFound(''Current DC: NONE'')

    return False
项目:sensor21    作者:21dotco    | 项目源码 | 文件源码
def run(daemon):
        if daemon:
            pid_file = ''./sensor21.pid''
            if os.path.isfile(pid_file):
                pid = int(open(pid_file).read())
                os.remove(pid_file)
                try:
                    p = psutil.Process(pid)
                    p.terminate()
                except:
                    pass
            try:
                p = subprocess.Popen([''python3'', ''sensor21-server.py''])
                open(pid_file, ''w'').write(str(p.pid))
            except subprocess.CalledProcessError:
                raise ValueError("error starting sensor21-server.py daemon")
        else:
            print("Server running...")
            app.run(host=''::'', port=5002)
项目:docklet    作者:unias    | 项目源码 | 文件源码
def del_addr(linkname, address):
        try:
            subprocess.run([''ip'', ''address'', ''del'', address, ''dev'', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, str(linkname)]
        except subprocess.CalledProcessError as suberror:
            return [False, "delete address Failed : %s" % suberror.stdout.decode(''utf-8'')]


# ovs-vsctl list-br
# ovs-vsctl br-exists <Bridge>
# ovs-vsctl add-br <Bridge>
# ovs-vsctl del-br <Bridge>
# ovs-vsctl list-ports <Bridge>
# ovs-vsctl del-port <Bridge> <Port>
# ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
# ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal
# ovs-vsctl port-to-br <Port>
# ovs-vsctl set Port <Port> tag=<ID>
# ovs-vsctl clear Port <Port> tag
项目:python-twelve-tone    作者:accraze    | 项目源码 | 文件源码
def install_python(version, exc)
            if exists("install.log"):
                with open("install.log") as fh:
                    print(fh.read())
        else:
            success = True
    if success:
        print("Installation complete!")
    else:
        print("Installation Failed")
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def try_initialize_swauth():
    if is_leader() and config(''auth-type'') == ''swauth'':
        if leader_get(''swauth-init'') is not True:
            try:
                admin_key = config(''swauth-admin-key'')
                if admin_key == '''' or admin_key is None:
                    admin_key = leader_get(''swauth-admin-key'')
                    if admin_key is None:
                        admin_key = uuid.uuid4()
                leader_set({''swauth-admin-key'': admin_key})

                bind_port = config(''bind-port'')
                bind_port = determine_api_port(bind_port, singlenode_mode=True)
                subprocess.check_call([
                    ''swauth-prep'',
                    ''-A'',
                    ''http://localhost:{}/auth''.format(bind_port),
                    ''-K'', admin_key])
                leader_set({''swauth-init'': True})
            except subprocess.CalledProcessError:
                log("had a problem initializing swauth!")
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def relation_get(attribute=None, ''--format=json'']
    if rid:
        _args.append(''-r'')
        _args.append(rid)
    _args.append(attribute or ''-'')
    if unit:
        _args.append(unit)
    try:
        return json.loads(subprocess.check_output(_args).decode(''UTF-8''))
    except ValueError:
        return None
    except CalledProcessError as e:
        if e.returncode == 2:
            return None
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _git_update_requirements(venv, package_dir]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        package = os.path.basename(package_dir)
        error_out("Error updating {} from "
                  "global-requirements.txt".format(package))
    os.chdir(orig_dir)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def test_failure(self):
        """Ensure that action_fail is called on failure."""
        self.config.return_value = "swauth"
        self.action_get.return_value = "test"
        self.determine_api_port.return_value = 8070
        self.CalledProcessError = ValueError

        self.check_call.side_effect = subprocess.CalledProcessError(
            0, "hi", "no")
        actions.add_user.add_user()
        self.leader_get.assert_called_with("swauth-admin-key")
        calls = [call("account"), call("username"), call("password")]
        self.action_get.assert_has_calls(calls)
        self.action_set.assert_not_called()

        self.action_fail.assert_called_once_with(
            ''Adding user test Failed with: "Command \\''hi\\'' returned non-zero ''
            ''exit status 0"'')
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def relation_get(attribute=None, ''--format=json'']
    if rid:
        _args.append(''-r'')
        _args.append(rid)
    _args.append(attribute or ''-'')
    if unit:
        _args.append(unit)
    try:
        return json.loads(subprocess.check_output(_args).decode(''UTF-8''))
    except ValueError:
        return None
    except CalledProcessError as e:
        if e.returncode == 2:
            return None
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def ensure_compliance(self):
        """Ensures that the modules are not loaded."""
        if not self.modules:
            return

        try:
            loaded_modules = self._get_loaded_modules()
            non_compliant_modules = []
            for module in self.modules:
                if module in loaded_modules:
                    log("Module ''%s'' is enabled but should not be." %
                        (module), level=INFO)
                    non_compliant_modules.append(module)

            if len(non_compliant_modules) == 0:
                return

            for module in non_compliant_modules:
                self._disable_module(module)
            self._restart_apache()
        except subprocess.CalledProcessError as e:
            log(''Error occurred auditing apache module compliance. ''
                ''This may have been already reported. ''
                ''Output is: %s'' % e.output, level=ERROR)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller
    """
    cmd = [''crm'', "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    current_dc = ''''
    for line in status.split(''\\n''):
        if line.startswith(''Current DC''):
            # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
            current_dc = line.split('':'')[1].split()[0]
    if current_dc == get_unit_hostname():
        return True
    elif current_dc == ''NONE'':
        raise CRMDCNotFound(''Current DC: NONE'')

    return False
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def is_crm_leader(resource, "utf-8")
    except subprocess.CalledProcessError:
        status = None

    if status and get_unit_hostname() in status:
        return True

    if status and "resource %s is NOT running" % (resource) in status:
        raise CRMResourceNotFound("CRM resource %s not found" % (resource))

    return False
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _git_update_requirements(venv, package_dir]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        package = os.path.basename(package_dir)
        error_out("Error updating {} from "
                  "global-requirements.txt".format(package))
    os.chdir(orig_dir)
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def test_RTagsDaemonStartClean(self):
        try:
            os.chdir("clean")
        except OSError:
            print("Test Error: Couldn''t cd into ''dirty'' test directory.")
            raise
        self.assertFalse(self.cmake_build_info["build_dir"].is_dir())
        self.plugin.setup_rtags_daemon()
        try:
            rtags_daemon_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_status"])
        except subprocess.CalledProcessError as e:
            print(e.output)
        self.assertTrue(
            len("*********************************\\nfileids\\n*********************************\\n*********************************\\nheadererrors\\n*********************************\\n*********************************\\ninfo\\n*********************************\\nRunning a release build\\nsocketFile: /Users/phillipbonhomme/.rdm\\ndataDir: /Users/phillipbonhomme/.cache/rtags/\\noptions: 0x14jobCount: 4\\nrpVisitFileTimeout: 60000\\nrpIndexDataMessageTimeout: 60000\\nrpConnectTimeout: 0\\nrpConnectTimeout: 0\\ndefaultArguments: List<String>(-ferror-limit=50,-Wall,-fspell-checking,-Wno-unkNown-warning-option\\")\\nincludePaths: List<Source::Include>(\\")\\ndefines: List<Source::Define>(-DRTAGS=\\")\\nignoredCompilers: Set<Path>(\\")\\n*********************************\\njobs\\n*********************************\\n"
                ) <= len(str(rtags_daemon_status)))
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def test_RTagsDaemonStartDirty(self):
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn''t cd into ''dirty'' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.plugin.setup_rtags_daemon()
        try:
            rtags_daemon_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_status"])
        except subprocess.CalledProcessError as e:
            print(e.output)
        self.assertTrue(
            len("*********************************\\nfileids\\n*********************************\\n*********************************\\nheadererrors\\n*********************************\\n*********************************\\ninfo\\n*********************************\\nRunning a release build\\nsocketFile: /Users/phillipbonhomme/.rdm\\ndataDir: /Users/phillipbonhomme/.cache/rtags/\\noptions: 0x14jobCount: 4\\nrpVisitFileTimeout: 60000\\nrpIndexDataMessageTimeout: 60000\\nrpConnectTimeout: 0\\nrpConnectTimeout: 0\\ndefaultArguments: List<String>(-ferror-limit=50,-Wno-unkNown-warning-option\\")\\nincludePaths: List<Source::Include>(\\")\\ndefines: List<Source::Define>(-DRTAGS=\\")\\nignoredCompilers: Set<Path>(\\")\\n*********************************\\njobs\\n*********************************\\n"
                ) <= len(str(rtags_daemon_status)))
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def test_RTagsClientStartDirty(self):
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn''t cd into ''dirty'' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
        self.plugin.setup_rtags_daemon()
        self.plugin.connect_rtags_client()
        try:
            rtags_client_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_file_status"] +
                [str(src_info["cpp"])])
        except subprocess.CalledProcessError as e:
            print(e.output)
        self.assertTrue(str(rtags_client_status).find("managed"))
        try:
            rtags_client_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_file_status"] +
                [str(src_info["test_cpp"])])
        except subprocess.CalledProcessError as e:
            print(e.output)
        self.assertTrue(str(rtags_client_status).find("managed"))
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def test_RTagsClientSetFile(self):
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn''t cd into ''dirty'' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
        self.plugin.setup_rtags_daemon()
        self.plugin.connect_rtags_client()
        self.plugin.rtags_set_file([str(src_info["cpp"])])
        try:
            rtags_client_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_file_status"] +
                [str(src_info["cpp"])])
        except subprocess.CalledProcessError as e:
            print(e.output)
        self.assertTrue(str(rtags_client_status).find("managed"))
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def test_RTagsClientUpdateBuffers(self):
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn''t cd into ''dirty'' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
        self.plugin.setup_rtags_daemon()
        self.plugin.connect_rtags_client()
        self.plugin.update_rtags_buffers(
            [str(src_info["test_cpp"]),
             str(src_info["cpp"])])
        try:
            rtags_client_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_buffers"])
        except subprocess.CalledProcessError as e:
            print(e.output)
        filepath = os.getcwd() + str(src_info["test_cpp"])
        self.assertTrue(str(rtags_client_status).find(filepath))
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def run_cmake(self):
        print("Running CMake")
        build_dir_cmd_out = subprocess.call(
            ["mkdir", "build"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)
        if build_dir_cmd_out != 0:
            print("Can\\''t setup CMake build directory.")
            return

        if self.cmake_build_info["build_dir"].is_dir():
            try:
                subprocess.check_output(
                    self.cmake_cmd_info["cmake_cmd"],
                    cwd=str(self.cmake_build_info["build_dir"]))
            except subprocess.CalledProcessError as e:
                print(e.output)
            if not self.cmake_build_info["comp_data_cmake"].is_file():
                print("Couldn''t setup CMake Project")
                return
        else:
            print("Couldn''t setup CMake Project")
            return
项目:benchmarks    作者:tensorflow    | 项目源码 | 文件源码
def _PrintLogs(pod_name_prefix, job_name):
  """Prints pod logs.

  If a pod has been restarted,prints logs from prevIoUs run. Otherwise,
  prints the logs from current run. We print logs for pods selected
  based on pod_name_prefix and job_name.

  Args:
    pod_name_prefix: value of ''name-prefix'' selector.
    job_name: value of ''job'' selector.
  """
  for pod_name in _GetPodNames(pod_name_prefix, job_name):
    try:
      # Get prevIoUs logs.
      logs_command = [_KUBECTL, ''logs'', ''-p'', pod_name]
      logging.info(''Command to get logs: %s'', '' ''.join(logs_command))
      output = subprocess.check_output(logs_command, universal_newlines=True)
    except subprocess.CalledProcessError:
      # We Couldn''t get prevIoUs logs,so we will try to get current logs.
      logs_command = [_KUBECTL, universal_newlines=True)
    print(''%s logs:'' % pod_name)
    print(output)
项目:extracts    作者:openmaptiles    | 项目源码 | 文件源码
def process_extract(extract):

        extract_file = os.path.join(target_dir, extract.extract + ''.mbtiles'')
        print(''Create extract {}''.format(extract_file))

        # Instead of patching copy over the patch source as target and
        # write directly to it (since that works concurrently).
        patch_src = args[''--patch-from'']
        if patch_src:
            print(''Use patch from {} as base''.format(patch_src))
            shutil.copyfile(patch_src, extract_file)

        try:
            create_extract(extract, source_file, extract_file)
        except subprocess.CalledProcessError as e:
            # Failing extracts should not interrupt
            # the entire process
            print(e, file=sys.stderr)
            return

        print(''Update Metadata {}''.format(extract_file))
        update_Metadata(extract_file, extract.Metadata(extract_file))
项目:foxbms-setup    作者:foxBMS    | 项目源码 | 文件源码
def get_main_git_path():
    """Gets the remote URL of the setup repository.

    Returns:
        string: remote URL of the setup-repository.
    """
    try:
        repository_basepath = subprocess.check_output(
            ''git config --get remote.origin.url''.split('' ''))
    except subprocess.CalledProcessError as err:
        setup_dir_path = os.path.dirname(os.path.realpath(__file__))
        err_msg = ''''''
\\''{}\\'' is not a git repository.
Did you download a .zip file from GitHub?

Use
    \\''git clone https://github.com/foxBMS/foxBMS-setup\\''
to download the foxBMS-setup repository.
        ''''''.format(setup_dir_path)
        logging.error(err_msg)
        sys.exit(1)
    repository_basepath, repository_name = repository_basepath.rsplit(''/'', 1)
    return repository_basepath, repository_name
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def callgit(self, workspacePath, *args):
        cmdLine = [''git'']
        cmdLine.extend(args)
        try:
            output = subprocess.check_output(cmdLine, cwd=os.path.join(os.getcwd(), self.__dir),
                universal_newlines=True, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            raise BuildError("git error:\\n Directory: ''{}''\\n Command: ''{}''\\n''{}''".format(
                os.path.join(workspacePath, " ".join(cmdLine), e.output.rstrip()))
        return output

    # Get GitSCM status. The purpose of this function is to return the status of the given directory
    #
    # return values:
    #  - error: The SCM is in a error state. Use this if git returned a error code.
    #  - dirty: SCM is dirty. Could be: modified files,switched to another branch/tag/commit/repo,unpushed commits.
    #  - clean: Same branch/tag/commit as specified in the recipe and no local changes.
    #  - empty: Directory is not existing.
    #
    # This function is called when build with --clean-checkout. ''error'' and ''dirty'' SCMs are moved to attic,
    # while empty and clean directories are not.
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def _scanDir(self, workspace, dir):
        self.__dir = dir
        dir = os.path.join(workspace, dir)
        try:
            remotes = subprocess.check_output(["git", "remote", "-v"],
                cwd=dir, universal_newlines=True).split("\\n")
            remotes = (r[:-8].split("\\t") for r in remotes if r.endswith("(fetch)"))
            self.__remotes = { remote:url for (remote,url) in remotes }

            self.__commit = subprocess.check_output(["git", "rev-parse", "HEAD"], universal_newlines=True).strip()
            self.__description = subprocess.check_output(
                ["git", "describe", "--always", "--dirty"], universal_newlines=True).strip()
            self.__dirty = subprocess.call(["git", "diff-index", "--quiet", "HEAD", "--"],
                cwd=dir) != 0
        except subprocess.CalledProcessError as e:
            raise BuildError("Git audit Failed: " + str(e))
        except OSError as e:
            raise BuildError("Error calling git: " + str(e))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def callSubversion(self, *args):
        cmdLine = [''svn'']
        cmdLine.extend(args)

        try:
            output = subprocess.check_output(cmdLine, cwd=workspacePath, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            raise BuildError("svn error:\\n Directory: ''{}''\\n Command: ''{}''\\n''{}''".format(
                os.path.join(workspacePath, e.output.rstrip()))
        return output

    # Get SvnSCM status. The purpose of this function is to return the status of the given directory
    #
    # return values:
    #  - error: the scm is in a error state. Use this if svn call returns a error code.
    #  - dirty: SCM is dirty. Could be: modified files,switched to another URL or revision
    #  - clean: same URL and revision as specified in the recipe and no local changes.
    #  - empty: directory is not existing
    #
    # This function is called when build with --clean-checkout. ''error'' and ''dirty'' scm''s are moved to attic,
    # while empty and clean directories are not.
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def cli_call(arg_list, expect_success=True, env=os.environ.copy()):
    """Executes a CLI command in a subprocess and return the results.

    Args:
        arg_list: a list command arguments
        expect_success: use False to return even if an error occurred
                        when executing the command
        env:

    Returns: (string,string,int) output message,error message,return code

    """
    p = subprocess.Popen(arg_list,
                         stderr=subprocess.PIPE, env=env)
    output, error = p.communicate()
    if p.returncode != 0:
        if output:
            print("Output:\\n" + str(output))
        if error:
            print("Error Message:\\n" + str(error))
        if expect_success:
            raise subprocess.CalledProcessError(
                p.returncode, arg_list, output)
    return output, error, p.returncode
项目:astrobase    作者:waqasbhatti    | 项目源码 | 文件源码
def gunzip_sqlitecurve(sqlitecurve):
    ''''''This just uncompresses the sqlitecurve in gzip format.

    FIXME: this doesn''t work with gzip < 1.6 or non-GNU gzip (probably).

    ''''''

    # -k to keep the input .gz just in case something explodes
    cmd = ''gunzip -k %s'' % sqlitecurve

    try:
        procout = subprocess.check_output(cmd, shell=True)
        return sqlitecurve.replace(''.gz'','''')
    except subprocess.CalledProcessError:
        LOGERROR(''Could not uncompress %s'' % sqlitecurve)
        return None



###############################################
## DECIDE WHICH COMPRESSION FUNCTIONS TO USE ##
###############################################
项目:messier    作者:conorsch    | 项目源码 | 文件源码
def available_vms(self, vms=None):
        """
        List all VMs regardless of state,filtering if requested via the <vms>
        parameter provider by the CLI.
        """
        try:
            possible_vms = [vm for vm in self.v.status()]
        except CalledProcessError, e:
            # Todo: Exception handling here assumes Vagrantfile is missing.
            # Vagrant seems to return 1 for many different errors,and finding
            # documentation for specific return codes has proven difficult.
            raise VagrantfileNotFound

        if vms:
            wanted_vms = [vm for vm in possible_vms if vm.name in vms]
            possible_vms = wanted_vms

        return possible_vms
项目:messier    作者:conorsch    | 项目源码 | 文件源码
def destroy_vms(self):
        """
        Destroy target VMs. Operates on all available VMs if none are specified.
        """
        for vm in self.vms:
            # Vagrant will return 1 if VM to be destroyed does not exist.
            if vm.state != "not_created":
                self.v.destroy(vm_name=vm.name)
                # Destroy a second time because the vagrant-digitalocean plugin
                # doesn''t clean up after itself:
                # https://github.com/smdahlen/vagrant-digitalocean/issues/194
                if vm.provider == "digital_ocean":
                    try:
                        self.v.destroy(vm_name=vm.name)
                    except CalledProcessError:
                        pass
项目:sphinxcontrib-versioning    作者:Robpol86    | 项目源码 | 文件源码
def test_error_bad_path(tmpdir):
    """Test handling of bad paths.

    :param tmpdir: pytest fixture.
    """
    with pytest.raises(CalledProcessError) as exc:
        pytest.run(tmpdir, [''sphinx-versioning'', ''-N'', ''-c'', ''unkNown'', ''build'', ''.'', str(tmpdir)])
    assert ''Directory "unkNown" does not exist.'' in exc.value.output

    tmpdir.ensure(''is_file'')
    with pytest.raises(CalledProcessError) as exc:
        pytest.run(tmpdir, ''is_file'', str(tmpdir)])
    assert ''Directory "is_file" is a file.'' in exc.value.output

    with pytest.raises(CalledProcessError) as exc:
        pytest.run(tmpdir, str(tmpdir)])
    assert ''Failed to find local git repository root in {}.''.format(repr(str(tmpdir))) in exc.value.output

    repo = tmpdir.ensure_dir(''repo'')
    pytest.run(repo, [''git'', ''init''])
    empty = tmpdir.ensure_dir(''empty1857'')
    with pytest.raises(CalledProcessError) as exc:
        pytest.run(repo, ''-g'', str(empty), str(tmpdir)])
    assert ''Failed to find local git repository root in'' in exc.value.output
    assert ''empty1857'' in exc.value.output
项目:sphinxcontrib-versioning    作者:Robpol86    | 项目源码 | 文件源码
def test_new_branch_tags(tmpdir, local_light, fail):
    """Test with new branches and tags unkNown to local repo.

    :param tmpdir: pytest fixture.
    :param local_light: conftest fixture.
    :param bool fail: Fail by not fetching.
    """
    remotes = [r for r in list_remote(str(local_light)) if r[1] == ''ob_at'']

    # Fail.
    sha = remotes[0][0]
    target = tmpdir.ensure_dir(''exported'', sha)
    if fail:
        with pytest.raises(CalledProcessError):
            export(str(local_light), sha, str(target))
        return

    # Fetch.
    fetch_commits(str(local_light), remotes)

    # Export.
    export(str(local_light), str(target))
    files = [f.relto(target) for f in target.listdir()]
    assert files == [''README'']
    assert target.join(''README'').read() == ''new''
项目:sphinxcontrib-versioning    作者:Robpol86    | 项目源码 | 文件源码
def get_root(directory):
    """Get root directory of the local git repo from any subdirectory within it.

    :raise GitError: If git command fails (dir not a git repo?).

    :param str directory: subdirectory in the local repo.

    :return: Root directory of repository.
    :rtype: str
    """
    command = [''git'', ''rev-parse'', ''--show-toplevel'']
    try:
        output = run_command(directory, command, env_var=False)
    except CalledProcessError as exc:
        raise GitError(''Failed to find local git repository root in {}.''.format(repr(directory)), exc.output)
    if IS_WINDOWS:
        output = output.replace(''/'', ''\\\\'')
    return output.strip()
项目:sphinxcontrib-versioning    作者:Robpol86    | 项目源码 | 文件源码
def fetch_commits(local_root, remotes):
    """Fetch from origin.

    :raise CalledProcessError: Unhandled git command failure.

    :param str local_root: Local path to git root directory.
    :param iter remotes: Output of list_remote().
    """
    # Fetch all kNown branches.
    command = [''git'', ''fetch'', ''origin'']
    run_command(local_root, command)

    # Fetch new branches/tags.
    for sha, name, kind in remotes:
        try:
            run_command(local_root, ''reflog'', sha])
        except CalledProcessError:
            run_command(local_root, command + [''refs/{0}/{1}''.format(kind, name)])
            run_command(local_root, sha])
项目:runcommands    作者:wylee    | 项目源码 | 文件源码
def version_getter(config):
    """Get tag associated with HEAD; fall back to SHA1.

    If HEAD is tagged,return the tag name; otherwise fall back to
    HEAD''s short SHA1 hash.

    .. note:: Only annotated tags are considered.

    Todo: Support non-annotated tags?

    """
    try:
        check_output([''git'', ''--is-inside-work-tree''], stderr=DEVNULL)
    except CalledProcessError:
        return None
    encoding = getpreferredencoding(do_setlocale=False)
    try:
        version = check_output([''git'', ''describe'', ''--exact-match''], stderr=DEVNULL)
    except CalledProcessError:
        version = check_output([''git'', ''--short'', ''HEAD''])
    version = version.decode(encoding).strip()
    return version
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def my_thread():
  global files,path,timeout,options
  myname= threading.currentThread().getName()
  while files:
     #create command to run
     nextfile=files.pop() 
     #print name of thread and command being run
     print(''Thread {0} starts processing {1}''.format(myname,nextfile))
     f=path + nextfile + options
     try:
        #timeout interrupts frozen command,shell=True does''nt open a console
        subprocess.check_call(args= f , shell=True, timeout=timeout)
     except subprocess.TimeoutExpired:
        print(''Thread {0} Processing {0} took too long'' .format(myname,nextfile))
     except subprocess.CalledProcessError as e: 
        print (''Thread {0} Processing {1} returned error {2}:{3}''.format(myname,nextfile,e.returncode,e.output))
     except Exception as e:
        print (''Thread {0} Processing {1} returned error {2}''.format(myname,type(e).__name__))
  print (''thread {0} stopped''.format(myname))
项目:detox    作者:tox-dev    | 项目源码 | 文件源码
def invoke(command, success_codes=(0,)):
    try:
        output = subprocess.check_output(command, stderr=subprocess.STDOUT)
        status = 0
    except subprocess.CalledProcessError as error:
        output = error.output
        status = error.returncode
    output = output.decode(''utf-8'')
    if status not in success_codes:
        raise Exception(
            ''Command %r return exit code %d and output: """%s""".'' % (
                command,
                status,
                output,
            )
        )
    return status, output
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def upgrade_charm():
    if is_leader():
        # if we are upgrading,then the old version might have used the
        # HEAT_PATH/encryption-key. So we grab the key from that,and put it in
        # leader settings to ensure that the key remains the same during an
        # upgrade.
        encryption_path = os.path.join(HEAT_PATH, ''encryption-key'')
        if os.path.isfile(encryption_path):
            with open(encryption_path, ''r'') as f:
                encryption_key = f.read()
            try:
                leader_set({''heat-auth-encryption-key'': encryption_key})
            except subprocess.CalledProcessError as e:
                log("upgrade: leader_set: heat-auth-encryption-key Failed,"
                    " didn''t delete the existing file: {}.\\n"
                    "Error was: ".format(encryption_path, str(e)),
                    level=WARNING)
            else:
                # Now we just delete the file
                os.remove(encryption_path)
    leader_elected()
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def __init__(self, fqArchiveUrl, filtersDir, outputPrefix, outputUrl, diskSize, diskType, logsPath, container, scriptUrl, tag, cores, mem, preemptible):
        super(Pipelinestep, self).__init__()

        fqFileName = os.path.basename(fqArchiveUrl)
        fqInputs = "{fqArchive}:{fqFileName}".format(fqArchive=fqArchiveUrl, fqFileName=fqFileName)

        try:
            filtersDirContents = subprocess.check_output(["gsutil", "ls", filtersDir])

        except subprocess.CalledProcessError as e:
            print "ERROR: Couldn''t get a listing of filter files!  -- {reason}".format(reason=e)
            exit(-1)

        bfInputs = [x for x in filtersDirContents.split(''\\n'') if re.match(''^.*\\.bf$'', x) or re.match(''^.*\\.txt'', x)]
        bfInputs.append(fqInputs)

        inputs = ",".join(["{url}:{filename}".format(url=x, filename=os.path.basename(x)) for x in bfInputs])
        outputs = "{outputPrefix}*:{outDir}".format(outputPrefix=outputPrefix, outDir=outputUrl)
        env = "INPUT_FILE={fqFileName},OUTPUT_PREFIX={outputPrefix},FILTERS_LIST={filtersList}".format(fqFileName=fqFileName, outputPrefix=outputPrefix, filtersList='',''.join([os.path.basename(x) for x in bfInputs if re.match(''^.*\\.bf$'', x)]))

        self._step = Pipelineschema("biobloomcategorizer",
                                                   self._pipelinesConfig,
                                                   logsPath,
                                                   container,
                                                   scriptUrl=scriptUrl,
                                                   cores=cores,
                                                   mem=mem,
                                                   diskSize=diskSize,
                                                   diskType=diskType,
                                                   inputs=inputs,
                                                   outputs=outputs,
                                                   env=env,
                                                   tag=tag,
                                                   preemptible=preemptible)
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def getJobLogs(args, config):  # Todo: reimplement
        pipelinedbutils = Pipelinedbutils(config)

        jobInfo = pipelinedbutils.getJobInfo(select=["stdout_log", "stderr_log", "gcs_log_path"],
                                             where={"job_id": args.jobId})

        with open(os.devnull, ''w'') as fnull:
            if args.stdout:
                try:
                    stdoutLogFile = subprocess.check_output(
                        ["gsutil", "cat", os.path.join(jobInfo[0].gcs_log_path, jobInfo[0].stdout_log)], stderr=fnull)
                except subprocess.CalledProcessError as e:
                    print "ERROR: Couldn''t get the stdout log : {reason}".format(reason=e)
                    exit(-1)

                print "STDOUT:\\n"
                print stdoutLogFile
                print "---------\\n"

            if args.stderr:
                try:
                    stderrLogFile = subprocess.check_output(
                        ["gsutil", "-q", jobInfo[0].stderr_log)],
                        stderr=fnull)
                except subprocess.CalledProcessError as e:
                    print "ERROR: Couldn''t get the stderr log : {reason}".format(reason=e)
                    exit(-1)

                print "STDERR:\\n"
                print stderrLogFile
                print "---------\\n"

        pipelinedbutils.closeConnection()

关于subprocess.callsubprocess.call详解的问题我们已经讲解完毕,感谢您的阅读,如果还想了解更多关于android – subprocess.CalledProcessError返回非零退出状态1、C ++ forksubprocess,请求subprocess列表,在Linux中杀死进程、Python subprocess 模块-call() 实例源码、Python subprocess 模块-CalledProcessError() 实例源码等相关内容,可以在本站寻找。

本文标签: